MFIX  2016-1
sendrecvnode_mod.f
Go to the documentation of this file.
1 !----------------------------------------------------------------------!
2 ! Module: MPI_PACK_DES !
3 ! Author: Pradeep Gopalakrishnan, J.Musser !
4 ! !
5 ! Purpose: Contains routines for passing data interpolted to ghost !
6 ! cells to the owner cell via summation. !
7 !----------------------------------------------------------------------!
8  module sendrecvnode
9 
10  PRIVATE
11 
12  PUBLIC :: init_des_collect_gdata, des_collect_gdata
14 
15  interface des_collect_gdata
16  module procedure des_collect_gdata_db1
17  module procedure des_collect_gdata_db2
18  end interface des_collect_gdata
19 
20  integer :: itotalneigh, itotalindx
21 
22  integer, allocatable :: itoproc(:)
23  integer, allocatable :: istartsend(:)
24  integer, allocatable :: istartrecv(:)
25 
26 ! Following variables are used to exchange grid index values when
27 ! des_interp_on is true
28  integer, allocatable :: isendnodes(:)
29  integer, allocatable :: irecvnodes(:)
30 
31  double precision, allocatable :: dsendnodebuf(:)
32  double precision, allocatable :: drecvnodebuf(:)
33 
34  integer, allocatable :: irecvreqnode(:)
35  integer, allocatable :: isendreqnode(:)
36 
37  contains
38 
39 !----------------------------------------------------------------------!
40 ! Subroutine: INIT_DES_COLLECT_gDATA !
41 ! Author: J.Musser !
42 ! !
43 ! Purpose: Setup the send/recv schedules for summing ghost cell data !
44 ! back into the owner cell for DES interpolation. !
45 !----------------------------------------------------------------------!
46  subroutine init_des_collect_gdata
47 
48  use compar, only: istart1, istart2, iend1, iend2
49  use compar, only: jstart1, jstart2, jend1, jend2
50  use compar, only: kstart1, kstart2, kend1, kend2
52  use desgrid, only: iofproc, jofproc, kofproc
53  use desgrid, only: procijk
54  use discretelement, only: des_periodic_walls_x, des_periodic_walls_y, des_periodic_walls_z
55  use functions, only: funijk, wall_at
56 
57  implicit none
58 
59 ! Local variables
60 !-----------------------------------------------
61  integer :: lijkproc,liproc,ljproc,lkproc
62  integer :: li,lj,lk
63  integer :: li2,lj2,lk2
64 
65  integer :: liproc_start, liproc_end
66  integer :: ljproc_start, ljproc_end
67  integer :: lkproc_start, lkproc_end
68 
69  integer :: lci,lcj,lck,lproc,lcount
70  integer :: linode_start,linode_end, linode
71  integer :: ljnode_start,ljnode_end, ljnode
72  integer :: lknode_start,lknode_end, lknode
73  logical :: lpresent
74 
75  integer, allocatable :: iprocsumindx(:)
76 
77 !-----------------------------------------------
78 
79 ! set flags for interprocessor boundaries
80  liproc = iofproc(mype)
81  ljproc = jofproc(mype)
82  lkproc = kofproc(mype)
83 
84 ! if not periodic then limit the processor
85  if(des_periodic_walls_x .and. nodesi > 1) then
86  liproc_start=liproc-1
87  liproc_end=liproc+1
88  else
89  liproc_start =max(liproc-1,0)
90  liproc_end=min(liproc+1,nodesi-1)
91  end if
92 
93  if(des_periodic_walls_y .and. nodesj > 1) then
94  ljproc_start=ljproc-1
95  ljproc_end=ljproc+1
96  else
97  ljproc_start =max(ljproc-1,0)
98  ljproc_end=min(ljproc+1,nodesj-1)
99  end if
100 
101  if(des_periodic_walls_z .and. nodesk > 1) then
102  lkproc_start=lkproc-1
103  lkproc_end=lkproc+1
104  else
105  lkproc_start =max(lkproc-1,0)
106  lkproc_end=min(lkproc+1,nodesk-1)
107  end if
108 
109  itotalneigh = (liproc_end-liproc_start+1)*&
110  (ljproc_end-ljproc_start+1)*(lkproc_end-lkproc_start+1)-1
111 
112 ! allocate the variables
113  allocate (itoproc(itotalneigh))
114  allocate (iprocsumindx(itotalneigh))
115  allocate (istartsend(itotalneigh+1))
116  allocate (istartrecv(itotalneigh+1))
117  allocate (isendreqnode(itotalneigh))
118  allocate (irecvreqnode(itotalneigh))
119 
120 ! First loop to count the total index for each processor and count the
121 ! neighbour processor
122  itotalneigh = 0
123  itoproc(:)=-1
124  iprocsumindx(:) =0
125 
126  do lk = lkproc_start, lkproc_end
127  do lj = ljproc_start, ljproc_end
128  do li = liproc_start, liproc_end
129 
130  li2 = mod(li,nodesi); if(li2 < 0) li2 = nodesi-1
131  lj2 = mod(lj,nodesj); if(lj2 < 0) lj2 = nodesj-1
132  lk2 = mod(lk,nodesk); if(lk2 < 0) lk2 = nodesk-1
133 
134  lijkproc = procijk(li2,lj2,lk2)
135 
136  if (lijkproc.eq.mype) cycle
137 
138 ! check if the processor exits in the previous list
139  lpresent = .false.
140  do lproc = 1,itotalneigh
141  if (lijkproc .eq.itoproc(lproc)) then
142  lpresent = .true.
143  exit
144  end if
145  end do
146  if(.not.lpresent) then
147  itotalneigh = itotalneigh + 1
148  lproc = itotalneigh
149  end if
150 
151  itoproc(lproc) = lijkproc
152 
153  lci=(liproc-li)
154  if(lci == 1) then
155  linode_start = istart2
156  linode_end = istart2
157  elseif(lci == -1) then
158  linode_start = iend2
159  linode_end = iend2
160  else
161  linode_start = istart1
162  linode_end = iend1
163  endif
164 
165  lcj=(ljproc-lj)
166  if(lcj == 1) then
167  ljnode_start = jstart2
168  ljnode_end = jstart2
169  elseif(lcj == -1) then
170  ljnode_start = jend2
171  ljnode_end = jend2
172  else
173  ljnode_start = jstart1
174  ljnode_end=jend1
175  endif
176 
177  lck=(lkproc-lk)
178  if(lck == 1) then
179  lknode_start = kstart2
180  lknode_end = kstart2
181  elseif(lck == -1) then
182  lknode_start = kend2
183  lknode_end = kend2
184  else
185  lknode_start = kstart1
186  lknode_end=kend1
187  endif
188 
189  do lknode = lknode_start,lknode_end
190  do linode = linode_start,linode_end
191  do ljnode = ljnode_start,ljnode_end
192  IF(dead_cell_at(linode,ljnode,lknode)) cycle
193  IF(wall_at(funijk(linode,ljnode,lknode))) cycle
194  iprocsumindx(lproc) = iprocsumindx(lproc) + 1
195  end do
196  end do
197  end do
198  end do
199  end do
200  end do
201 
202 
203 !assign the start index
204  do lproc =1,itotalneigh+1
205  istartsend(lproc)=sum(iprocsumindx(1:lproc-1))+1
206  end do
207  itotalindx=istartsend(itotalneigh+1)-1
208 
209 ! allocate the variables
210  allocate(isendnodes(itotalindx))
211  allocate(dsendnodebuf(itotalindx))
212 
213 ! second loop to assign actual index for send map
214  iprocsumindx(:)=0
215  do lk = lkproc_start,lkproc_end
216  do lj = ljproc_start,ljproc_end
217  do li = liproc_start,liproc_end
218  li2 = mod(li,nodesi);if(li2.lt.0)li2=nodesi-1
219  lj2 = mod(lj,nodesj);if(lj2.lt.0)lj2=nodesj-1
220  lk2 = mod(lk,nodesk);if(lk2.lt.0)lk2=nodesk-1
221  lijkproc = procijk(li2,lj2,lk2)
222  if (lijkproc.eq.mype) cycle
223 ! find the index of the neighbour
224  do lproc =1,itotalneigh
225  if(lijkproc.eq.itoproc(lproc)) then
226  exit
227  end if
228  end do
229 
230  lci=(liproc-li)
231  if(lci == 1) then
232  linode_start = istart2
233  linode_end = istart2
234  elseif(lci == -1) then
235  linode_start = iend2
236  linode_end = iend2
237  else
238  linode_start = istart1
239  linode_end = iend1
240  endif
241 
242  lcj=(ljproc-lj)
243  if(lcj == 1) then
244  ljnode_start = jstart2
245  ljnode_end = jstart2
246  elseif(lcj == -1) then
247  ljnode_start = jend2
248  ljnode_end = jend2
249  else
250  ljnode_start = jstart1
251  ljnode_end=jend1
252  endif
253 
254  lck=(lkproc-lk)
255  if(lck == 1) then
256  lknode_start = kstart2
257  lknode_end = kstart2
258  elseif(lck == -1) then
259  lknode_start = kend2
260  lknode_end = kend2
261  else
262  lknode_start = kstart1
263  lknode_end=kend1
264  endif
265 
266  lcount = istartsend(lproc)+iprocsumindx(lproc)
267  do lknode = lknode_start,lknode_end
268  do linode = linode_start,linode_end
269  do ljnode = ljnode_start,ljnode_end
270  IF(dead_cell_at(linode,ljnode,lknode)) cycle
271  IF(wall_at(funijk(linode,ljnode,lknode))) cycle
272  isendnodes(lcount)=funijk(linode,ljnode,lknode)
273  iprocsumindx(lproc)=iprocsumindx(lproc)+1
274  lcount = lcount+1
275  end do
276  end do
277  end do
278 
279  end do
280  end do
281  end do
282 
283 ! Build the recv schedule
284 
285  iprocsumindx(:) =0
286  do lk = lkproc_start, lkproc_end
287  do lj = ljproc_start, ljproc_end
288  do li = liproc_start, liproc_end
289 
290  li2 = mod(li,nodesi); if(li2 < 0) li2 = nodesi-1
291  lj2 = mod(lj,nodesj); if(lj2 < 0) lj2 = nodesj-1
292  lk2 = mod(lk,nodesk); if(lk2 < 0) lk2 = nodesk-1
293 
294  lijkproc = procijk(li2,lj2,lk2)
295 
296  if (lijkproc.eq.mype) cycle
297 
298 ! check if the processor exits in the previous list
299  do lproc = 1,itotalneigh
300  if(lijkproc .eq. itoproc(lproc)) exit
301  end do
302 
303  lci=(liproc-li);lcj=(ljproc-lj);lck=(lkproc-lk)
304 
305  linode_start = istart1; linode_end=iend1
306  ljnode_start = jstart1; ljnode_end=jend1
307  lknode_start = kstart1; lknode_end=kend1
308  if(lci.eq. 1) linode_end = istart1
309  if(lci.eq.-1) linode_start = iend1
310  if(lcj.eq. 1) ljnode_end = jstart1
311  if(lcj.eq.-1) ljnode_start = jend1
312  if(lck.eq. 1) lknode_end = kstart1
313  if(lck.eq.-1) lknode_start = kend1
314 
315  do lknode = lknode_start,lknode_end
316  do linode = linode_start,linode_end
317  do ljnode = ljnode_start,ljnode_end
318  IF(dead_cell_at(linode,ljnode,lknode)) cycle
319  IF(wall_at(funijk(linode,ljnode,lknode))) cycle
320  iprocsumindx(lproc) = iprocsumindx(lproc) + 1
321  end do
322  end do
323  end do
324  end do
325  end do
326  end do
327 
328 !assign the start index
329  do lproc =1,itotalneigh+1
330  istartrecv(lproc)=sum(iprocsumindx(1:lproc-1))+1
331  end do
332  itotalindx=istartrecv(itotalneigh+1)-1
333 
334  allocate(irecvnodes(itotalindx))
335  allocate(drecvnodebuf(itotalindx))
336 
337 ! second loop to assign actual index
338  iprocsumindx(:)=0
339  do lk = lkproc_start,lkproc_end
340  do lj = ljproc_start,ljproc_end
341  do li = liproc_start,liproc_end
342 
343  li2 = mod(li,nodesi);if(li2.lt.0)li2=nodesi-1
344  lj2 = mod(lj,nodesj);if(lj2.lt.0)lj2=nodesj-1
345  lk2 = mod(lk,nodesk);if(lk2.lt.0)lk2=nodesk-1
346  lijkproc = procijk(li2,lj2,lk2)
347 
348  if (lijkproc.eq.mype) cycle
349 
350 ! find the index of the neighbour
351  do lproc =1,itotalneigh
352  if(lijkproc.eq.itoproc(lproc)) then
353  exit
354  end if
355  end do
356 
357  lci=(liproc-li);lcj=(ljproc-lj);lck=(lkproc-lk)
358 
359 ! Set up the receive map
360  linode_start = istart1; linode_end=iend1
361  ljnode_start = jstart1; ljnode_end=jend1
362  lknode_start = kstart1; lknode_end=kend1
363 
364  if(lci.eq. 1) linode_end = istart1
365  if(lci.eq.-1) linode_start = iend1
366  if(lcj.eq. 1) ljnode_end = jstart1
367  if(lcj.eq.-1) ljnode_start = jend1
368  if(lck.eq. 1) lknode_end = kstart1
369  if(lck.eq.-1) lknode_start = kend1
370 
371  lcount = istartrecv(lproc)+iprocsumindx(lproc)
372  do lknode = lknode_start,lknode_end
373  do linode = linode_start,linode_end
374  do ljnode = ljnode_start,ljnode_end
375  IF(dead_cell_at(linode,ljnode,lknode)) cycle
376  IF(wall_at(funijk(linode,ljnode,lknode))) cycle
377  irecvnodes(lcount)=funijk(linode,ljnode,lknode)
378  iprocsumindx(lproc)=iprocsumindx(lproc)+1
379  lcount = lcount+1
380  end do
381  end do
382  end do
383 
384  end do
385  end do
386  end do
387 
388 ! call des_dbgnodesr()
389 
390  RETURN
391  end subroutine init_des_collect_gdata
392 
393 
394 !----------------------------------------------------------------------!
395 ! Subroutine: DES_COLLECT_gDATA_db1 !
396 ! Author: J.Musser !
397 ! !
398 ! Purpose: Conduct a reverse halo type exchange were data that was !
399 ! stored in ghost cells is summed into the 'real' cell. This is !
400 ! needed for DES interpolation where data is interpolted into ghost !
401 ! cells. !
402 !----------------------------------------------------------------------!
403  subroutine des_collect_gdata_db1(pvar)
405  use desmpi_wrapper, only: des_mpi_wait
406  use desmpi_wrapper, only: des_mpi_isend
407  use desmpi_wrapper, only: des_mpi_irecv
408  use parallel_mpi, only: mpi_check
409  use compar, only: mype, numpes
410 !-----------------------------------------------
411  implicit none
412 !-----------------------------------------------
413 ! dummy variables
414 !-----------------------------------------------
415  double precision, intent(inout) :: pvar(:)
416 !-----------------------------------------------
417 ! local variables
418 !-----------------------------------------------
419  character(len=80), parameter :: name = 'des_exchangenode'
420  integer :: lindx,lcount,lcount2,lneigh,ltag,lerr
421  integer :: lstart,lend,ltotal
422 !-----------------------------------------------
423 
424 ! steps pack the buffer call isend and irecv
425  do lcount = 1,itotalneigh
426  lneigh = itoproc(lcount)
427  lstart = istartsend(lcount);lend=istartsend(lcount+1)-1
428  do lcount2 = lstart,lend
429  dsendnodebuf(lcount2) = pvar(isendnodes(lcount2))
430  end do
431 
432  ltag = message_tag(lneigh,mype)
433  lstart = istartrecv(lcount);lend=istartrecv(lcount+1)-1
434  ltotal = lend-lstart+1
435  call des_mpi_irecv(drecvnodebuf(lstart:lend),ltotal, &
436  lneigh,ltag,irecvreqnode(lcount),lerr)
437  call mpi_check( name //':mpi_irecv ', lerr )
438 
439  ltag = message_tag(mype,lneigh)
440  lstart = istartsend(lcount);lend=istartsend(lcount+1)-1
441  ltotal = lend-lstart+1
442  call des_mpi_isend(dsendnodebuf(lstart:lend),ltotal, &
443  lneigh,ltag,isendreqnode(lcount),lerr)
444  call mpi_check( name //':mpi_irecv ', lerr )
445  end do
446 ! call mpi wait to complete the exchange
447  do lcount = 1,itotalneigh
448  call des_mpi_wait(isendreqnode(lcount),lerr)
449  call mpi_check( name //':mpi_wait-send', lerr )
450  call des_mpi_wait(irecvreqnode(lcount),lerr)
451  call mpi_check( name //':mpi_wait-recv', lerr )
452  end do
453 
454 ! after receiving the buffer the values are either added or
455 ! replaced based on the flag
456  do lcount = 1,itotalindx
457  lindx = irecvnodes(lcount)
458  pvar(lindx) = pvar(lindx) + drecvnodebuf(lcount)
459  end do
460 
461  return
462 
463  contains
464 
465  integer function message_tag(lsource,ldest)
466  implicit none
467  integer, intent(in) :: lsource,ldest
468  message_tag = lsource+numpes*ldest+200
469  end function message_tag
470 
471  end subroutine des_collect_gdata_db1
472 
473 
474 !----------------------------------------------------------------------!
475 ! Subroutine: DES_COLLECT_gDATA_db2 !
476 ! Author: J.Musser !
477 ! !
478 ! Purpose: Wrapper for 2D arrays. See DES_COLLECT_gDATA_db1. !
479 !----------------------------------------------------------------------!
480  subroutine des_collect_gdata_db2(pvar)
481 
482  implicit none
483 
484 ! dummy arguments
485 !-----------------------------------------------
486  double precision, intent(inout) :: pvar(:,:)
487  integer :: lc
488 
489  do lc=lbound(pvar,2), ubound(pvar,2)
490  call des_collect_gdata_db1(pvar(:,lc))
491  enddo
492  return
493  end subroutine des_collect_gdata_db2
494 
495 
496 
497 !!############################################################################################
498 !!############################################################################################
499 !!############################################################################################
500 !!############################################################################################
501 
502 !------------------------------------------------------------------------
503 ! Subroutine : des_setnodesindices
504 ! Purpose : allocates and initializes the variables related
505 ! to send and recv for grid node values
506 !------------------------------------------------------------------------
507  subroutine des_setnodeindices
509  use desgrid, only: iofproc, jofproc, kofproc
510  use desgrid, only: procijk
511  use discretelement, only: des_periodic_walls_x, des_periodic_walls_y, des_periodic_walls_z
512  use compar, only: istart2, iend1
513  use compar, only: jstart2, jend1
514  use compar, only: kstart2, kend1
515  use functions, only: funijk
516 !-----------------------------------------------
517  implicit none
518 !-----------------------------------------------
519 ! Local variables
520 !-----------------------------------------------
521  integer :: lijkproc,liproc,ljproc,lkproc
522  integer :: li,lj,lk
523  integer :: li2,lj2,lk2
524  integer :: liproc_start,liproc_end
525  integer :: ljproc_start,ljproc_end
526  integer :: lkproc_start,lkproc_end
527  integer :: lci,lcj,lck,lproc,lcount
528  integer :: linode_start,linode_end, linode
529  integer :: ljnode_start,ljnode_end, ljnode
530  integer :: lknode_start,lknode_end, lknode
531  logical :: lpresent
532  integer, allocatable :: iprocsumindx(:)
533 !-----------------------------------------------
534 
535 ! set flags for interprocessor boundaries and set the corresponding to proc
536  liproc = iofproc(mype)
537  ljproc = jofproc(mype)
538  lkproc = kofproc(mype)
539 
540 ! if not periodic then limit the processor
541  if(des_periodic_walls_x.and.nodesi.gt.1) then
542  liproc_start=liproc-1
543  liproc_end=liproc+1
544  else
545  liproc_start =max(liproc-1,0)
546  liproc_end=min(liproc+1,nodesi-1)
547  end if
548  if(des_periodic_walls_y.and.nodesj.gt.1) then
549  ljproc_start=ljproc-1
550  ljproc_end=ljproc+1
551  else
552  ljproc_start =max(ljproc-1,0)
553  ljproc_end=min(ljproc+1,nodesj-1)
554  end if
555  if(des_periodic_walls_z.and.nodesk.gt.1) then
556  lkproc_start=lkproc-1
557  lkproc_end=lkproc+1
558  else
559  lkproc_start =max(lkproc-1,0)
560  lkproc_end=min(lkproc+1,nodesk-1)
561  end if
562  itotalneigh = (liproc_end-liproc_start+1)*&
563  (ljproc_end-ljproc_start+1)*(lkproc_end-lkproc_start+1)-1
564 
565 ! allocate the variables
566  allocate (itoproc(itotalneigh))
567  allocate (iprocsumindx(itotalneigh))
568  allocate (istartsend(itotalneigh+1))
569  allocate (irecvreqnode(itotalneigh))
570  allocate (isendreqnode(itotalneigh))
571 
572 ! First loop to count the total index for each processor and count the
573 ! neighbour processor
574  itotalneigh = 0
575  itoproc(:)=-1
576  iprocsumindx(:) =0
577  do lk = lkproc_start,lkproc_end
578  do lj = ljproc_start,ljproc_end
579  do li = liproc_start,liproc_end
580  li2 = mod(li,nodesi);if(li2.lt.0)li2=nodesi-1
581  lj2 = mod(lj,nodesj);if(lj2.lt.0)lj2=nodesj-1
582  lk2 = mod(lk,nodesk);if(lk2.lt.0)lk2=nodesk-1
583  lijkproc = procijk(li2,lj2,lk2)
584  if (lijkproc.eq.mype) cycle
585 ! check if the processor exits in the previous list
586  lpresent = .false.
587  do lproc = 1,itotalneigh
588  if (lijkproc .eq.itoproc(lproc)) then
589  lpresent = .true.
590  exit
591  end if
592  end do
593  if(.not.lpresent) then
594  itotalneigh = itotalneigh + 1
595  lproc = itotalneigh
596  end if
597  itoproc(lproc) = lijkproc
598  lci=(liproc-li);lcj=(ljproc-lj);lck=(lkproc-lk)
599  linode_start = istart2; linode_end=iend1
600  ljnode_start = jstart2; ljnode_end=jend1
601  lknode_start = kstart2; lknode_end=kend1
602  if(lci.eq.1) linode_end = istart2
603  if(lci.eq.-1) linode_start = iend1
604  if(lcj.eq.1) ljnode_end = jstart2
605  if(lcj.eq.-1) ljnode_start = jend1
606  if(lck.eq.1) lknode_end = kstart2
607  if(lck.eq.-1) lknode_start = kend1
608  do lknode = lknode_start,lknode_end
609  do linode = linode_start,linode_end
610  do ljnode = ljnode_start,ljnode_end
611  IF(dead_cell_at(linode,ljnode,lknode)) cycle
612  iprocsumindx(lproc) = iprocsumindx(lproc) + 1
613  end do
614  end do
615  end do
616  end do
617  end do
618  end do
619 !assign the start index
620  do lproc =1,itotalneigh+1
621  istartsend(lproc)=sum(iprocsumindx(1:lproc-1))+1
622  end do
623  itotalindx=istartsend(itotalneigh+1)-1
624 
625 ! allocate the variables
626  allocate (isendnodes(itotalindx))
627  allocate (dsendnodebuf(itotalindx))
628  allocate (drecvnodebuf(itotalindx))
629 
630 ! second loop to assign actual index
631  iprocsumindx(:)=0
632  do lk = lkproc_start,lkproc_end
633  do lj = ljproc_start,ljproc_end
634  do li = liproc_start,liproc_end
635  li2 = mod(li,nodesi);if(li2.lt.0)li2=nodesi-1
636  lj2 = mod(lj,nodesj);if(lj2.lt.0)lj2=nodesj-1
637  lk2 = mod(lk,nodesk);if(lk2.lt.0)lk2=nodesk-1
638  lijkproc = procijk(li2,lj2,lk2)
639  if (lijkproc.eq.mype) cycle
640 ! find the index of the neighbour
641  do lproc =1,itotalneigh
642  if(lijkproc.eq.itoproc(lproc)) then
643  exit
644  end if
645  end do
646  lci=(liproc-li);lcj=(ljproc-lj);lck=(lkproc-lk)
647  linode_start = istart2; linode_end=iend1
648  ljnode_start = jstart2; ljnode_end=jend1
649  lknode_start = kstart2; lknode_end=kend1
650  if(lci.eq.1) linode_end = istart2
651  if(lci.eq.-1) linode_start = iend1
652  if(lcj.eq.1) ljnode_end = jstart2
653  if(lcj.eq.-1) ljnode_start = jend1
654  if(lck.eq.1) lknode_end = kstart2
655  if(lck.eq.-1) lknode_start = kend1
656  lcount = istartsend(lproc)+iprocsumindx(lproc)
657  do lknode = lknode_start,lknode_end
658  do linode = linode_start,linode_end
659  do ljnode = ljnode_start,ljnode_end
660  IF(dead_cell_at(linode,ljnode,lknode)) cycle
661  isendnodes(lcount)=funijk(linode,ljnode,lknode)
662  iprocsumindx(lproc)=iprocsumindx(lproc)+1
663  lcount = lcount+1
664  end do
665  end do
666  end do
667  end do
668  end do
669  end do
670 
671 ! call des_dbgnodesr()
672  end subroutine des_setnodeindices
673 
674 !------------------------------------------------------------------------
675 ! Subroutine : des_exchangenode
676 ! Purpose : calls send and recv to exchange the node values and
677 ! adds based on the flag
678 ! to send and recv for grid node values
679 ! Parameters : pvar - variable that has to be exchanged
680 ! padd - if true node values will be added
681 ! else node values will be replaced
682 !------------------------------------------------------------------------
683  subroutine des_exchangenode(pvar,padd)
685  use desmpi_wrapper, only: des_mpi_wait
686  use desmpi_wrapper, only: des_mpi_isend
687  use desmpi_wrapper, only: des_mpi_irecv
688  use parallel_mpi, only: mpi_check
689  use compar, only: mype, numpes
690 
691 !-----------------------------------------------
692  implicit none
693 !-----------------------------------------------
694 ! dummy variables
695 !-----------------------------------------------
696  double precision,dimension(:),intent(inout) ::pvar
697  logical:: padd
698 !-----------------------------------------------
699 ! local variables
700 !-----------------------------------------------
701  character(len=80), parameter :: name = 'des_exchangenode'
702  integer :: lindx,lcount,lcount2,lneigh,ltag,lerr
703  integer :: lstart,lend,ltotal
704 !-----------------------------------------------
705 
706 ! steps pack the buffer call isend and irecv
707  do lcount = 1,itotalneigh
708  lneigh = itoproc(lcount)
709  lstart = istartsend(lcount);lend=istartsend(lcount+1)-1
710  do lcount2 = lstart,lend
711  dsendnodebuf(lcount2) = pvar(isendnodes(lcount2))
712  end do
713  ltag = message_tag(lneigh,mype)
714  ltotal = lend-lstart+1
715  call des_mpi_irecv(drecvnodebuf(lstart:lend),ltotal, &
716  lneigh,ltag,irecvreqnode(lcount),lerr)
717  call mpi_check( name //':mpi_irecv ', lerr )
718  ltag = message_tag(mype,lneigh)
719  call des_mpi_isend(dsendnodebuf(lstart:lend),ltotal, &
720  lneigh,ltag,isendreqnode(lcount),lerr)
721  call mpi_check( name //':mpi_irecv ', lerr )
722  end do
723 ! call mpi wait to complete the exchange
724  do lcount = 1,itotalneigh
725  call des_mpi_wait(isendreqnode(lcount),lerr)
726  call mpi_check( name //':mpi_wait-send', lerr )
727  call des_mpi_wait(irecvreqnode(lcount),lerr)
728  call mpi_check( name //':mpi_wait-recv', lerr )
729  end do
730 ! after receiving the buffer the values are either added or
731 ! replaced based on the flag
732  if (padd) then
733  do lcount = 1,itotalindx
734  lindx = isendnodes(lcount)
735  pvar(lindx) = pvar(lindx) + drecvnodebuf(lcount)
736  end do
737  else
738  do lcount = 1,itotalindx
739  lindx = isendnodes(lcount)
740  pvar(lindx) = drecvnodebuf(lcount)
741  end do
742  end if
743  return
744 
745  contains
746 
747  integer function message_tag(lsource,ldest)
748  implicit none
749  integer, intent(in) :: lsource,ldest
750  message_tag = lsource+numpes*ldest+200
751  end function message_tag
752 
753  end subroutine des_exchangenode
754 
755 !------------------------------------------------------------------------
756 ! Subroutine : des_dbgnodesr
757 ! Purpose : For debugging prints the indices
758 !------------------------------------------------------------------------
759  subroutine des_dbgnodesr()
760 !-----------------------------------------------
761 
762  use indices, only: i_of, j_of, k_of
763  use compar, only: mype
764 
765  implicit none
766 !-----------------------------------------------
767 ! local variables
768 !-----------------------------------------------
769  character (255) filename
770  integer ijk
771  integer lcount,lcount2,lstart,lend
772 !-----------------------------------------------
773 
774 ! pradeep remove print the flags
775  write(filename,'("dbg_nodesr",I4.4,".dat")') mype
776  open(44,file=filename,convert='big_endian')
777  do lcount = 1,itotalneigh
778  lstart = istartsend(lcount);lend=istartsend(lcount+1)-1
779  write(44,"(2/,72('*'))")
780  write(44,1100) mype, itoproc(lcount)
781  write(44,"(/2x,'Start:',I6)") lstart
782  write(44,"( 2x,'End: ',I6)") lend
783  write(44,"(72('-'))")
784  do lcount2 = lstart,lend
785  ijk = isendnodes(lcount2)
786  write(44,1000)'SEND', i_of(ijk),j_of(ijk),k_of(ijk),ijk
787  end do
788  write(44,"(72('-'))")
789  end do
790 
791  if(allocated(irecvnodes)) then
792  do lcount = 1,itotalneigh
793  lstart = istartrecv(lcount);lend=istartrecv(lcount+1)-1
794  write(44,"(2/,72('*'))")
795  write(44,1100) itoproc(lcount), mype
796  write(44,"(/2x,'Start:',I6)") lstart
797  write(44,"( 2x,'End: ',I6)") lend
798  write(44,"(72('-'))")
799  do lcount2 = lstart,lend
800  ijk = irecvnodes(lcount2)
801  write(44,1000) 'RECV', i_of(ijk),j_of(ijk),k_of(ijk),ijk
802  end do
803  write(44,"(72('-'))")
804  end do
805  end if
806 
807  close (44)
808 
809  1100 FORMAT(2x,'Send Proc ',i2,' --> Recv Proc' i2)
810  1000 FORMAT(3x,a,': (',i3,',',i3,',',i3,') :: ',i7)
811 
812  end subroutine des_dbgnodesr
813 
814  end module
815 
816 
integer jend2
Definition: compar_mod.f:80
integer function procijk(fi, fj, fk)
Definition: desgrid_mod.f:88
integer, dimension(:), allocatable i_of
Definition: indices_mod.f:45
integer kend1
Definition: compar_mod.f:80
integer istart1
Definition: compar_mod.f:80
integer iend1
Definition: compar_mod.f:80
integer istart2
Definition: compar_mod.f:80
integer iend2
Definition: compar_mod.f:80
subroutine des_mpi_wait(preq, perr)
integer function iofproc(fijk)
Definition: desgrid_mod.f:101
subroutine, public des_exchangenode(pvar, padd)
integer kend2
Definition: compar_mod.f:80
subroutine des_collect_gdata_db1(pvar)
integer kstart2
Definition: compar_mod.f:80
integer kstart1
Definition: compar_mod.f:80
integer numpes
Definition: compar_mod.f:24
subroutine, public des_setnodeindices
integer, dimension(:), allocatable k_of
Definition: indices_mod.f:47
logical, dimension(:,:,:), allocatable dead_cell_at
Definition: compar_mod.f:127
integer function kofproc(fijk)
Definition: desgrid_mod.f:127
integer, dimension(:), allocatable j_of
Definition: indices_mod.f:46
integer jstart2
Definition: compar_mod.f:80
integer function message_tag(lsource, ldest, lrecvface)
integer function jofproc(fijk)
Definition: desgrid_mod.f:114
subroutine, public init_des_collect_gdata
subroutine mpi_check(msg, ierr)
integer mype
Definition: compar_mod.f:24
integer nodesj
Definition: compar_mod.f:37
integer nodesk
Definition: compar_mod.f:37
integer nodesi
Definition: compar_mod.f:37
integer jend1
Definition: compar_mod.f:80
integer jstart1
Definition: compar_mod.f:80