File: N:\mfix\model\des\sendrecvnode_mod.f

1     !----------------------------------------------------------------------!
2     !  Module: MPI_PACK_DES                                                !
3     !  Author: Pradeep Gopalakrishnan, J.Musser                            !
4     !                                                                      !
5     !  Purpose: Contains routines for passing data interpolted to ghost    !
6     !     cells to the owner cell via summation.                           !
7     !----------------------------------------------------------------------!
8           module sendrecvnode
9     
10           PRIVATE
11     
12           PUBLIC :: INIT_DES_COLLECT_gDATA, DES_COLLECT_gDATA
13           PUBLIC :: DES_SETNODEINDICES, DES_EXCHANGENODE
14     
15           interface DES_COLLECT_gDATA
16             module procedure DES_COLLECT_gDATA_db1
17             module procedure DES_COLLECT_gDATA_db2
18           end interface DES_COLLECT_gDATA
19     
20           integer :: itotalneigh, itotalindx
21     
22           integer, allocatable :: itoproc(:)
23           integer, allocatable :: istartsend(:)
24           integer, allocatable :: istartrecv(:)
25     
26     ! Following variables are used to exchange grid index values when
27     ! des_interp_on is true
28           integer, allocatable :: isendnodes(:)
29           integer, allocatable :: irecvnodes(:)
30     
31           double precision, allocatable :: dsendnodebuf(:)
32           double precision, allocatable :: drecvnodebuf(:)
33     
34           integer, allocatable :: irecvreqnode(:)
35           integer, allocatable :: isendreqnode(:)
36     
37           contains
38     
39     !----------------------------------------------------------------------!
40     !  Subroutine: INIT_DES_COLLECT_gDATA                                  !
41     !  Author: J.Musser                                                    !
42     !                                                                      !
43     ! Purpose: Setup the send/recv schedules for summing ghost cell data   !
44     !    back into the owner cell for DES interpolation.                   !
45     !----------------------------------------------------------------------!
46           subroutine INIT_DES_COLLECT_gDATA
47     
48           use compar, only: istart1, istart2, iend1, iend2
49           use compar, only: jstart1, jstart2, jend1, jend2
50           use compar, only: kstart1, kstart2, kend1, kend2
51           use compar, only: mype, nodesi, nodesj, nodesk, dead_cell_at
52           use desgrid, only: IofPROC, JofPROC, KofPROC
53           use desgrid, only: procIJK
54           use discretelement, only: des_periodic_walls_x, des_periodic_walls_y, des_periodic_walls_z
55           use functions, only: funijk, wall_at
56     
57           implicit none
58     
59     ! Local variables
60     !-----------------------------------------------
61           integer :: lijkproc,liproc,ljproc,lkproc
62           integer :: li,lj,lk
63           integer :: li2,lj2,lk2
64     
65           integer :: liproc_start, liproc_end
66           integer :: ljproc_start, ljproc_end
67           integer :: lkproc_start, lkproc_end
68     
69           integer :: lci,lcj,lck,lproc,lcount
70           integer :: linode_start,linode_end, linode
71           integer :: ljnode_start,ljnode_end, ljnode
72           integer :: lknode_start,lknode_end, lknode
73           logical :: lpresent
74     
75           integer, allocatable :: iprocsumindx(:)
76     
77     !-----------------------------------------------
78     
79     ! set flags for interprocessor boundaries
80           liproc = iofproc(mype)
81           ljproc = jofproc(mype)
82           lkproc = kofproc(mype)
83     
84     ! if not periodic then limit the processor
85           if(des_periodic_walls_x .and. nodesi > 1) then
86              liproc_start=liproc-1
87              liproc_end=liproc+1
88           else
89              liproc_start =max(liproc-1,0)
90              liproc_end=min(liproc+1,nodesi-1)
91           end if
92     
93           if(des_periodic_walls_y .and. nodesj > 1) then
94              ljproc_start=ljproc-1
95              ljproc_end=ljproc+1
96           else
97              ljproc_start =max(ljproc-1,0)
98              ljproc_end=min(ljproc+1,nodesj-1)
99           end if
100     
101           if(des_periodic_walls_z .and. nodesk > 1) then
102              lkproc_start=lkproc-1
103              lkproc_end=lkproc+1
104           else
105              lkproc_start =max(lkproc-1,0)
106              lkproc_end=min(lkproc+1,nodesk-1)
107           end if
108     
109           itotalneigh = (liproc_end-liproc_start+1)*&
110              (ljproc_end-ljproc_start+1)*(lkproc_end-lkproc_start+1)-1
111     
112     ! allocate the variables
113           allocate (itoproc(itotalneigh))
114           allocate (iprocsumindx(itotalneigh))
115           allocate (istartsend(itotalneigh+1))
116           allocate (istartrecv(itotalneigh+1))
117           allocate (isendreqnode(itotalneigh))
118           allocate (irecvreqnode(itotalneigh))
119     
120     ! First loop to count the total index for each processor and count the
121     ! neighbour processor
122           itotalneigh = 0
123           itoproc(:)=-1
124           iprocsumindx(:) =0
125     
126           do lk = lkproc_start, lkproc_end
127           do lj = ljproc_start, ljproc_end
128           do li = liproc_start, liproc_end
129     
130              li2 = mod(li,nodesi); if(li2 < 0) li2 = nodesi-1
131              lj2 = mod(lj,nodesj); if(lj2 < 0) lj2 = nodesj-1
132              lk2 = mod(lk,nodesk); if(lk2 < 0) lk2 = nodesk-1
133     
134              lijkproc = procijk(li2,lj2,lk2)
135     
136              if (lijkproc.eq.mype) cycle
137     
138     ! check if the processor exits in the previous list
139              lpresent = .false.
140              do lproc = 1,itotalneigh
141                 if (lijkproc .eq.itoproc(lproc)) then
142                    lpresent = .true.
143                    exit
144                 end if
145              end do
146              if(.not.lpresent) then
147                 itotalneigh = itotalneigh + 1
148                 lproc = itotalneigh
149              end if
150     
151              itoproc(lproc) = lijkproc
152     
153              lci=(liproc-li)
154              if(lci == 1) then
155                 linode_start = iStart2
156                 linode_end = iStart2
157              elseif(lci == -1) then
158                 linode_start = iEnd2
159                 linode_end = iEnd2
160              else
161                 linode_start = iStart1
162                 linode_end = iEnd1
163              endif
164     
165              lcj=(ljproc-lj)
166              if(lcj == 1) then
167                 ljnode_start = jStart2
168                 ljnode_end = jStart2
169              elseif(lcj == -1) then
170                 ljnode_start = jEnd2
171                 ljnode_end = jEnd2
172              else
173                 ljnode_start = jStart1
174                 ljnode_end=jEnd1
175              endif
176     
177              lck=(lkproc-lk)
178              if(lck == 1) then
179                 lknode_start = kStart2
180                 lknode_end = kStart2
181              elseif(lck == -1) then
182                 lknode_start = kEnd2
183                 lknode_end = kEnd2
184              else
185                 lknode_start = kStart1
186                 lknode_end=kEnd1
187              endif
188     
189              do lknode = lknode_start,lknode_end
190              do linode = linode_start,linode_end
191              do ljnode = ljnode_start,ljnode_end
192                 IF(DEAD_CELL_AT(linode,ljnode,lknode)) CYCLE
193                 IF(WALL_AT(FUNIJK(linode,ljnode,lknode))) CYCLE
194                 iprocsumindx(lproc) = iprocsumindx(lproc) + 1
195              end do
196              end do
197              end do
198           end do
199           end do
200           end do
201     
202     
203     !assign the start index
204           do lproc =1,itotalneigh+1
205              istartsend(lproc)=sum(iprocsumindx(1:lproc-1))+1
206           end do
207           itotalindx=istartsend(itotalneigh+1)-1
208     
209     ! allocate the variables
210           allocate(isendnodes(itotalindx))
211           allocate(dsendnodebuf(itotalindx))
212     
213     ! second loop to assign actual index for send map
214           iprocsumindx(:)=0
215           do lk = lkproc_start,lkproc_end
216           do lj = ljproc_start,ljproc_end
217           do li = liproc_start,liproc_end
218              li2 = mod(li,nodesi);if(li2.lt.0)li2=nodesi-1
219              lj2 = mod(lj,nodesj);if(lj2.lt.0)lj2=nodesj-1
220              lk2 = mod(lk,nodesk);if(lk2.lt.0)lk2=nodesk-1
221              lijkproc = procijk(li2,lj2,lk2)
222              if (lijkproc.eq.mype) cycle
223     ! find the index of the neighbour
224              do lproc =1,itotalneigh
225                 if(lijkproc.eq.itoproc(lproc)) then
226                    exit
227                 end if
228              end do
229     
230              lci=(liproc-li)
231              if(lci == 1) then
232                 linode_start = iStart2
233                 linode_end = iStart2
234              elseif(lci == -1) then
235                 linode_start = iEnd2
236                 linode_end = iEnd2
237              else
238                 linode_start = iStart1
239                 linode_end = iEnd1
240              endif
241     
242              lcj=(ljproc-lj)
243              if(lcj == 1) then
244                 ljnode_start = jStart2
245                 ljnode_end = jStart2
246              elseif(lcj == -1) then
247                 ljnode_start = jEnd2
248                 ljnode_end = jEnd2
249              else
250                 ljnode_start = jStart1
251                 ljnode_end=jEnd1
252              endif
253     
254              lck=(lkproc-lk)
255              if(lck == 1) then
256                 lknode_start = kStart2
257                 lknode_end = kStart2
258              elseif(lck == -1) then
259                 lknode_start = kEnd2
260                 lknode_end = kEnd2
261              else
262                 lknode_start = kStart1
263                 lknode_end=kEnd1
264              endif
265     
266              lcount = istartsend(lproc)+iprocsumindx(lproc)
267              do lknode = lknode_start,lknode_end
268              do linode = linode_start,linode_end
269              do ljnode = ljnode_start,ljnode_end
270                 IF(DEAD_CELL_AT(linode,ljnode,lknode)) CYCLE
271                 IF(WALL_AT(FUNIJK(linode,ljnode,lknode))) CYCLE
272                 isendnodes(lcount)=funijk(linode,ljnode,lknode)
273                 iprocsumindx(lproc)=iprocsumindx(lproc)+1
274                 lcount = lcount+1
275              end do
276              end do
277              end do
278     
279           end do
280           end do
281           end do
282     
283     ! Build the recv schedule
284     
285           iprocsumindx(:) =0
286           do lk = lkproc_start, lkproc_end
287           do lj = ljproc_start, ljproc_end
288           do li = liproc_start, liproc_end
289     
290              li2 = mod(li,nodesi); if(li2 < 0) li2 = nodesi-1
291              lj2 = mod(lj,nodesj); if(lj2 < 0) lj2 = nodesj-1
292              lk2 = mod(lk,nodesk); if(lk2 < 0) lk2 = nodesk-1
293     
294              lijkproc = procijk(li2,lj2,lk2)
295     
296              if (lijkproc.eq.mype) cycle
297     
298     ! check if the processor exits in the previous list
299              do lproc = 1,itotalneigh
300                 if(lijkproc .eq. itoproc(lproc)) exit
301              end do
302     
303              lci=(liproc-li);lcj=(ljproc-lj);lck=(lkproc-lk)
304     
305              linode_start = istart1; linode_end=iend1
306              ljnode_start = jstart1; ljnode_end=jend1
307              lknode_start = kstart1; lknode_end=kend1
308              if(lci.eq. 1) linode_end = iStart1
309              if(lci.eq.-1) linode_start = iEnd1
310              if(lcj.eq. 1) ljnode_end = jStart1
311              if(lcj.eq.-1) ljnode_start = jEnd1
312              if(lck.eq. 1) lknode_end = kStart1
313              if(lck.eq.-1) lknode_start = kEnd1
314     
315              do lknode = lknode_start,lknode_end
316              do linode = linode_start,linode_end
317              do ljnode = ljnode_start,ljnode_end
318                 IF(DEAD_CELL_AT(linode,ljnode,lknode)) CYCLE
319                 IF(WALL_AT(FUNIJK(linode,ljnode,lknode))) CYCLE
320                 iprocsumindx(lproc) = iprocsumindx(lproc) + 1
321              end do
322              end do
323              end do
324           end do
325           end do
326           end do
327     
328     !assign the start index
329           do lproc =1,itotalneigh+1
330              istartrecv(lproc)=sum(iprocsumindx(1:lproc-1))+1
331           end do
332           itotalindx=istartrecv(itotalneigh+1)-1
333     
334           allocate(irecvnodes(itotalindx))
335           allocate(drecvnodebuf(itotalindx))
336     
337     ! second loop to assign actual index
338           iprocsumindx(:)=0
339           do lk = lkproc_start,lkproc_end
340           do lj = ljproc_start,ljproc_end
341           do li = liproc_start,liproc_end
342     
343              li2 = mod(li,nodesi);if(li2.lt.0)li2=nodesi-1
344              lj2 = mod(lj,nodesj);if(lj2.lt.0)lj2=nodesj-1
345              lk2 = mod(lk,nodesk);if(lk2.lt.0)lk2=nodesk-1
346              lijkproc = procijk(li2,lj2,lk2)
347     
348              if (lijkproc.eq.mype) cycle
349     
350     ! find the index of the neighbour
351              do lproc =1,itotalneigh
352                 if(lijkproc.eq.itoproc(lproc)) then
353                    exit
354                 end if
355              end do
356     
357              lci=(liproc-li);lcj=(ljproc-lj);lck=(lkproc-lk)
358     
359     ! Set up the receive map
360              linode_start = istart1; linode_end=iend1
361              ljnode_start = jstart1; ljnode_end=jend1
362              lknode_start = kstart1; lknode_end=kend1
363     
364              if(lci.eq. 1) linode_end = iStart1
365              if(lci.eq.-1) linode_start = iEnd1
366              if(lcj.eq. 1) ljnode_end = jStart1
367              if(lcj.eq.-1) ljnode_start = jEnd1
368              if(lck.eq. 1) lknode_end = kStart1
369              if(lck.eq.-1) lknode_start = kEnd1
370     
371              lcount = istartrecv(lproc)+iprocsumindx(lproc)
372              do lknode = lknode_start,lknode_end
373              do linode = linode_start,linode_end
374              do ljnode = ljnode_start,ljnode_end
375                 IF(DEAD_CELL_AT(linode,ljnode,lknode)) CYCLE
376                 IF(WALL_AT(FUNIJK(linode,ljnode,lknode))) CYCLE
377                 irecvnodes(lcount)=funijk(linode,ljnode,lknode)
378                 iprocsumindx(lproc)=iprocsumindx(lproc)+1
379                 lcount = lcount+1
380              end do
381              end do
382              end do
383     
384           end do
385           end do
386           end do
387     
388     !      call des_dbgnodesr()
389     
390           RETURN
391           end subroutine INIT_DES_COLLECT_gDATA
392     
393     
394     !----------------------------------------------------------------------!
395     !  Subroutine: DES_COLLECT_gDATA_db1                                   !
396     !  Author: J.Musser                                                    !
397     !                                                                      !
398     ! Purpose: Conduct a reverse halo type exchange were data that was     !
399     !    stored in ghost cells is summed into the 'real' cell. This is     !
400     !    needed for DES interpolation where data is interpolted into ghost !
401     !    cells.                                                            !
402     !----------------------------------------------------------------------!
403           subroutine DES_COLLECT_gDATA_db1(pvar)
404     
405           use desmpi_wrapper, only: DES_MPI_WAIT
406           use desmpi_wrapper, only: DES_MPI_iSEND
407           use desmpi_wrapper, only: DES_MPI_iRECV
408           use parallel_mpi, only: MPI_Check
409           use compar, only: myPE, numPEs
410     !-----------------------------------------------
411           implicit none
412     !-----------------------------------------------
413     ! dummy variables
414     !-----------------------------------------------
415           double precision, intent(inout) :: pvar(:)
416     !-----------------------------------------------
417     ! local variables
418     !-----------------------------------------------
419           character(len=80), parameter :: name = 'des_exchangenode'
420           integer :: lindx,lcount,lcount2,lneigh,ltag,lerr
421           integer :: lstart,lend,ltotal
422     !-----------------------------------------------
423     
424     ! steps pack the buffer call isend and irecv
425           do lcount = 1,itotalneigh
426              lneigh = itoproc(lcount)
427              lstart = istartsend(lcount);lend=istartsend(lcount+1)-1
428              do lcount2 = lstart,lend
429                 dsendnodebuf(lcount2) = pvar(isendnodes(lcount2))
430              end do
431     
432              ltag = message_tag(lneigh,mype)
433              lstart = istartrecv(lcount);lend=istartrecv(lcount+1)-1
434              ltotal = lend-lstart+1
435              call des_mpi_irecv(drecvnodebuf(lstart:lend),ltotal, &
436                                 lneigh,ltag,irecvreqnode(lcount),lerr)
437              call mpi_check( name //':mpi_irecv ', lerr )
438     
439              ltag = message_tag(mype,lneigh)
440              lstart = istartsend(lcount);lend=istartsend(lcount+1)-1
441              ltotal = lend-lstart+1
442              call des_mpi_isend(dsendnodebuf(lstart:lend),ltotal, &
443                                 lneigh,ltag,isendreqnode(lcount),lerr)
444              call mpi_check( name //':mpi_irecv ', lerr )
445           end do
446     ! call mpi wait to complete the exchange
447           do lcount = 1,itotalneigh
448              call des_mpi_wait(isendreqnode(lcount),lerr)
449              call mpi_check( name //':mpi_wait-send', lerr )
450              call des_mpi_wait(irecvreqnode(lcount),lerr)
451              call mpi_check( name //':mpi_wait-recv', lerr )
452           end do
453     
454     ! after receiving the buffer the values are either added or
455     ! replaced based on the flag
456           do lcount = 1,itotalindx
457              lindx = irecvnodes(lcount)
458              pvar(lindx) = pvar(lindx) + drecvnodebuf(lcount)
459           end do
460     
461           return
462     
463           contains
464     
465           integer function message_tag(lsource,ldest)
466              implicit none
467              integer, intent(in) :: lsource,ldest
468              message_tag = lsource+numpes*ldest+200
469           end function message_tag
470     
471           end subroutine DES_COLLECT_gDATA_db1
472     
473     
474     !----------------------------------------------------------------------!
475     !  Subroutine: DES_COLLECT_gDATA_db2                                   !
476     !  Author: J.Musser                                                    !
477     !                                                                      !
478     ! Purpose: Wrapper for 2D arrays. See DES_COLLECT_gDATA_db1.           !
479     !----------------------------------------------------------------------!
480           subroutine DES_COLLECT_gDATA_db2(pvar)
481     
482           implicit none
483     
484     ! dummy arguments 
485     !-----------------------------------------------
486           double precision, intent(inout) :: pvar(:,:)
487           integer :: lc
488     
489           do lc=lbound(pVAR,2), ubound(pVAR,2)
490              call des_collect_gDATA_db1(pVAR(:,lc))
491           enddo
492           return
493           end subroutine des_collect_gdata_db2
494     
495     
496     
497     !!############################################################################################
498     !!############################################################################################
499     !!############################################################################################
500     !!############################################################################################
501     
502     !------------------------------------------------------------------------
503     ! Subroutine       : des_setnodesindices
504     ! Purpose          : allocates and initializes the variables related
505     !                    to send and recv for grid node values
506     !------------------------------------------------------------------------
507           subroutine des_setnodeindices
508           use compar, only: mype, nodesi, nodesj, nodesk, dead_cell_at
509           use desgrid, only: IofPROC, JofPROC, KofPROC
510           use desgrid, only: procIJK
511           use discretelement, only: des_periodic_walls_x, des_periodic_walls_y, des_periodic_walls_z
512           use compar, only: istart2, iend1
513           use compar, only: jstart2, jend1
514           use compar, only: kstart2, kend1
515           use functions, only: funijk
516     !-----------------------------------------------
517           implicit none
518     !-----------------------------------------------
519     ! Local variables
520     !-----------------------------------------------
521           integer :: lijkproc,liproc,ljproc,lkproc
522           integer :: li,lj,lk
523           integer :: li2,lj2,lk2
524           integer :: liproc_start,liproc_end
525           integer :: ljproc_start,ljproc_end
526           integer :: lkproc_start,lkproc_end
527           integer :: lci,lcj,lck,lproc,lcount
528           integer :: linode_start,linode_end, linode
529           integer :: ljnode_start,ljnode_end, ljnode
530           integer :: lknode_start,lknode_end, lknode
531           logical :: lpresent
532           integer, allocatable :: iprocsumindx(:)
533     !-----------------------------------------------
534     
535     ! set flags for interprocessor boundaries and set the corresponding to proc
536           liproc = iofproc(mype)
537           ljproc = jofproc(mype)
538           lkproc = kofproc(mype)
539     
540     ! if not periodic then limit the processor
541           if(des_periodic_walls_x.and.nodesi.gt.1) then
542              liproc_start=liproc-1
543              liproc_end=liproc+1
544           else
545              liproc_start =max(liproc-1,0)
546              liproc_end=min(liproc+1,nodesi-1)
547           end if
548           if(des_periodic_walls_y.and.nodesj.gt.1) then
549              ljproc_start=ljproc-1
550              ljproc_end=ljproc+1
551           else
552              ljproc_start =max(ljproc-1,0)
553              ljproc_end=min(ljproc+1,nodesj-1)
554           end if
555           if(des_periodic_walls_z.and.nodesk.gt.1) then
556              lkproc_start=lkproc-1
557              lkproc_end=lkproc+1
558           else
559              lkproc_start =max(lkproc-1,0)
560              lkproc_end=min(lkproc+1,nodesk-1)
561           end if
562           itotalneigh = (liproc_end-liproc_start+1)*&
563              (ljproc_end-ljproc_start+1)*(lkproc_end-lkproc_start+1)-1
564     
565     ! allocate the variables
566           allocate (itoproc(itotalneigh))
567           allocate (iprocsumindx(itotalneigh))
568           allocate (istartsend(itotalneigh+1))
569           allocate (irecvreqnode(itotalneigh))
570           allocate (isendreqnode(itotalneigh))
571     
572     ! First loop to count the total index for each processor and count the
573     ! neighbour processor
574           itotalneigh = 0
575           itoproc(:)=-1
576           iprocsumindx(:) =0
577           do lk = lkproc_start,lkproc_end
578           do lj = ljproc_start,ljproc_end
579           do li = liproc_start,liproc_end
580              li2 = mod(li,nodesi);if(li2.lt.0)li2=nodesi-1
581              lj2 = mod(lj,nodesj);if(lj2.lt.0)lj2=nodesj-1
582              lk2 = mod(lk,nodesk);if(lk2.lt.0)lk2=nodesk-1
583              lijkproc = procijk(li2,lj2,lk2)
584              if (lijkproc.eq.mype) cycle
585     ! check if the processor exits in the previous list
586              lpresent = .false.
587              do lproc = 1,itotalneigh
588                 if (lijkproc .eq.itoproc(lproc)) then
589                    lpresent = .true.
590                    exit
591                 end if
592              end do
593              if(.not.lpresent) then
594                 itotalneigh = itotalneigh + 1
595                 lproc = itotalneigh
596              end if
597              itoproc(lproc) = lijkproc
598              lci=(liproc-li);lcj=(ljproc-lj);lck=(lkproc-lk)
599              linode_start = istart2; linode_end=iend1
600              ljnode_start = jstart2; ljnode_end=jend1
601              lknode_start = kstart2; lknode_end=kend1
602              if(lci.eq.1) linode_end = istart2
603              if(lci.eq.-1)  linode_start = iend1
604              if(lcj.eq.1) ljnode_end = jstart2
605              if(lcj.eq.-1)  ljnode_start = jend1
606              if(lck.eq.1) lknode_end = kstart2
607              if(lck.eq.-1)  lknode_start = kend1
608              do lknode = lknode_start,lknode_end
609              do linode = linode_start,linode_end
610              do ljnode = ljnode_start,ljnode_end
611                 IF(DEAD_CELL_AT(linode,ljnode,lknode)) CYCLE
612                 iprocsumindx(lproc) = iprocsumindx(lproc) + 1
613              end do
614              end do
615              end do
616           end do
617           end do
618           end do
619     !assign the start index
620           do lproc =1,itotalneigh+1
621              istartsend(lproc)=sum(iprocsumindx(1:lproc-1))+1
622           end do
623           itotalindx=istartsend(itotalneigh+1)-1
624     
625     ! allocate the variables
626           allocate (isendnodes(itotalindx))
627           allocate (dsendnodebuf(itotalindx))
628           allocate (drecvnodebuf(itotalindx))
629     
630     ! second loop to assign actual index
631           iprocsumindx(:)=0
632           do lk = lkproc_start,lkproc_end
633           do lj = ljproc_start,ljproc_end
634           do li = liproc_start,liproc_end
635              li2 = mod(li,nodesi);if(li2.lt.0)li2=nodesi-1
636              lj2 = mod(lj,nodesj);if(lj2.lt.0)lj2=nodesj-1
637              lk2 = mod(lk,nodesk);if(lk2.lt.0)lk2=nodesk-1
638              lijkproc = procijk(li2,lj2,lk2)
639              if (lijkproc.eq.mype) cycle
640     ! find the index of the neighbour
641              do lproc =1,itotalneigh
642                 if(lijkproc.eq.itoproc(lproc)) then
643                    exit
644                 end if
645              end do
646              lci=(liproc-li);lcj=(ljproc-lj);lck=(lkproc-lk)
647              linode_start = istart2; linode_end=iend1
648              ljnode_start = jstart2; ljnode_end=jend1
649              lknode_start = kstart2; lknode_end=kend1
650              if(lci.eq.1) linode_end = istart2
651              if(lci.eq.-1)  linode_start = iend1
652              if(lcj.eq.1) ljnode_end = jstart2
653              if(lcj.eq.-1)  ljnode_start = jend1
654              if(lck.eq.1) lknode_end = kstart2
655              if(lck.eq.-1)  lknode_start = kend1
656              lcount = istartsend(lproc)+iprocsumindx(lproc)
657              do lknode = lknode_start,lknode_end
658              do linode = linode_start,linode_end
659              do ljnode = ljnode_start,ljnode_end
660                 IF(DEAD_CELL_AT(linode,ljnode,lknode)) CYCLE
661                 isendnodes(lcount)=funijk(linode,ljnode,lknode)
662                 iprocsumindx(lproc)=iprocsumindx(lproc)+1
663                 lcount = lcount+1
664              end do
665              end do
666              end do
667           end do
668           end do
669           end do
670     
671     !     call  des_dbgnodesr()
672           end subroutine des_setnodeindices
673     
674     !------------------------------------------------------------------------
675     ! Subroutine       : des_exchangenode
676     ! Purpose          : calls send and recv to exchange the node values and
677     !                    adds based on the flag
678     !                    to send and recv for grid node values
679     ! Parameters       : pvar - variable that has to be exchanged
680     !                    padd - if true node values will be added
681     !                           else node values will be replaced
682     !------------------------------------------------------------------------
683           subroutine des_exchangenode(pvar,padd)
684     
685           use desmpi_wrapper, only: DES_MPI_WAIT
686           use desmpi_wrapper, only: DES_MPI_iSEND
687           use desmpi_wrapper, only: DES_MPI_iRECV
688           use parallel_mpi, only: MPI_Check
689           use compar, only: myPE, numPEs
690     
691     !-----------------------------------------------
692           implicit none
693     !-----------------------------------------------
694     ! dummy variables
695     !-----------------------------------------------
696           double precision,dimension(:),intent(inout) ::pvar
697           logical:: padd
698     !-----------------------------------------------
699     ! local variables
700     !-----------------------------------------------
701           character(len=80), parameter :: name = 'des_exchangenode'
702           integer :: lindx,lcount,lcount2,lneigh,ltag,lerr
703           integer :: lstart,lend,ltotal
704     !-----------------------------------------------
705     
706     ! steps pack the buffer call isend and irecv
707           do lcount = 1,itotalneigh
708              lneigh = itoproc(lcount)
709              lstart = istartsend(lcount);lend=istartsend(lcount+1)-1
710              do lcount2 = lstart,lend
711                 dsendnodebuf(lcount2) = pvar(isendnodes(lcount2))
712              end do
713              ltag = message_tag(lneigh,mype)
714              ltotal = lend-lstart+1
715              call des_mpi_irecv(drecvnodebuf(lstart:lend),ltotal, &
716                                 lneigh,ltag,irecvreqnode(lcount),lerr)
717              call mpi_check( name //':mpi_irecv ', lerr )
718              ltag = message_tag(mype,lneigh)
719              call des_mpi_isend(dsendnodebuf(lstart:lend),ltotal, &
720                                 lneigh,ltag,isendreqnode(lcount),lerr)
721              call mpi_check( name //':mpi_irecv ', lerr )
722           end do
723     ! call mpi wait to complete the exchange
724           do lcount = 1,itotalneigh
725              call des_mpi_wait(isendreqnode(lcount),lerr)
726              call mpi_check( name //':mpi_wait-send', lerr )
727              call des_mpi_wait(irecvreqnode(lcount),lerr)
728              call mpi_check( name //':mpi_wait-recv', lerr )
729           end do
730     ! after receiving the buffer the values are either added or
731     ! replaced based on the flag
732           if (padd) then
733              do lcount = 1,itotalindx
734                 lindx = isendnodes(lcount)
735                 pvar(lindx) = pvar(lindx) + drecvnodebuf(lcount)
736              end do
737           else
738              do lcount = 1,itotalindx
739                 lindx = isendnodes(lcount)
740                 pvar(lindx) = drecvnodebuf(lcount)
741              end do
742           end if
743           return
744     
745           contains
746     
747             integer function message_tag(lsource,ldest)
748               implicit none
749               integer, intent(in) :: lsource,ldest
750               message_tag = lsource+numpes*ldest+200
751             end function message_tag
752     
753           end subroutine des_exchangenode
754     
755     !------------------------------------------------------------------------
756     ! Subroutine       : des_dbgnodesr
757     ! Purpose          : For debugging prints the indices
758     !------------------------------------------------------------------------
759           subroutine des_dbgnodesr()
760     !-----------------------------------------------
761     
762           use indices, only: i_of, j_of, k_of
763           use compar, only: mype
764     
765           implicit none
766     !-----------------------------------------------
767     ! local variables
768     !-----------------------------------------------
769           character (255) filename
770           integer ijk
771           integer lcount,lcount2,lstart,lend
772     !-----------------------------------------------
773     
774     ! pradeep remove print the flags
775           write(filename,'("dbg_nodesr",I4.4,".dat")') mype
776           open(44,file=filename,convert='big_endian')
777           do lcount = 1,itotalneigh
778              lstart = istartsend(lcount);lend=istartsend(lcount+1)-1
779              write(44,"(2/,72('*'))")
780              write(44,1100) myPE, itoproc(lcount)
781              write(44,"(/2x,'Start:',I6)") lstart
782              write(44,"( 2x,'End:  ',I6)") lend
783              write(44,"(72('-'))")
784              do lcount2 = lstart,lend
785                 ijk = isendnodes(lcount2)
786                 write(44,1000)'SEND', i_of(ijk),j_of(ijk),k_of(ijk),ijk
787              end do
788              write(44,"(72('-'))")
789           end do
790     
791           if(allocated(irecvnodes)) then
792              do lcount = 1,itotalneigh
793                 lstart = istartrecv(lcount);lend=istartrecv(lcount+1)-1
794                 write(44,"(2/,72('*'))")
795                 write(44,1100) itoproc(lcount), myPE
796                 write(44,"(/2x,'Start:',I6)") lstart
797                 write(44,"( 2x,'End:  ',I6)") lend
798                 write(44,"(72('-'))")
799                 do lcount2 = lstart,lend
800                    ijk = irecvnodes(lcount2)
801                    write(44,1000) 'RECV', i_of(ijk),j_of(ijk),k_of(ijk),ijk
802                 end do
803                 write(44,"(72('-'))")
804              end do
805           end if
806     
807           close (44)
808     
809      1100 FORMAT(2x,'Send Proc ',I2,'   -->   Recv Proc' I2)
810      1000 FORMAT(3x,A,': (',I3,',',I3,',',I3,') :: ',I7)
811     
812           end subroutine des_dbgnodesr
813     
814           end module
815     
816     
817