File: N:\mfix\model\des\sendrecvnode_mod.f
1
2
3
4
5
6
7
8 module sendrecvnode
9
10 PRIVATE
11
12 PUBLIC :: INIT_DES_COLLECT_gDATA, DES_COLLECT_gDATA
13 PUBLIC :: DES_SETNODEINDICES, DES_EXCHANGENODE
14
15 interface DES_COLLECT_gDATA
16 module procedure DES_COLLECT_gDATA_db1
17 module procedure DES_COLLECT_gDATA_db2
18 end interface DES_COLLECT_gDATA
19
20 integer :: itotalneigh, itotalindx
21
22 integer, allocatable :: itoproc(:)
23 integer, allocatable :: istartsend(:)
24 integer, allocatable :: istartrecv(:)
25
26
27
28 integer, allocatable :: isendnodes(:)
29 integer, allocatable :: irecvnodes(:)
30
31 double precision, allocatable :: dsendnodebuf(:)
32 double precision, allocatable :: drecvnodebuf(:)
33
34 integer, allocatable :: irecvreqnode(:)
35 integer, allocatable :: isendreqnode(:)
36
37 contains
38
39
40
41
42
43
44
45
46 subroutine INIT_DES_COLLECT_gDATA
47
48 use compar, only: istart1, istart2, iend1, iend2
49 use compar, only: jstart1, jstart2, jend1, jend2
50 use compar, only: kstart1, kstart2, kend1, kend2
51 use compar, only: mype, nodesi, nodesj, nodesk, dead_cell_at
52 use desgrid, only: IofPROC, JofPROC, KofPROC
53 use desgrid, only: procIJK
54 use discretelement, only: des_periodic_walls_x, des_periodic_walls_y, des_periodic_walls_z
55 use functions, only: funijk, wall_at
56
57 implicit none
58
59
60
61 integer :: lijkproc,liproc,ljproc,lkproc
62 integer :: li,lj,lk
63 integer :: li2,lj2,lk2
64
65 integer :: liproc_start, liproc_end
66 integer :: ljproc_start, ljproc_end
67 integer :: lkproc_start, lkproc_end
68
69 integer :: lci,lcj,lck,lproc,lcount
70 integer :: linode_start,linode_end, linode
71 integer :: ljnode_start,ljnode_end, ljnode
72 integer :: lknode_start,lknode_end, lknode
73 logical :: lpresent
74
75 integer, allocatable :: iprocsumindx(:)
76
77
78
79
80 = iofproc(mype)
81 ljproc = jofproc(mype)
82 lkproc = kofproc(mype)
83
84
85 if(des_periodic_walls_x .and. nodesi > 1) then
86 liproc_start=liproc-1
87 liproc_end=liproc+1
88 else
89 liproc_start =max(liproc-1,0)
90 liproc_end=min(liproc+1,nodesi-1)
91 end if
92
93 if(des_periodic_walls_y .and. nodesj > 1) then
94 ljproc_start=ljproc-1
95 ljproc_end=ljproc+1
96 else
97 ljproc_start =max(ljproc-1,0)
98 ljproc_end=min(ljproc+1,nodesj-1)
99 end if
100
101 if(des_periodic_walls_z .and. nodesk > 1) then
102 lkproc_start=lkproc-1
103 lkproc_end=lkproc+1
104 else
105 lkproc_start =max(lkproc-1,0)
106 lkproc_end=min(lkproc+1,nodesk-1)
107 end if
108
109 itotalneigh = (liproc_end-liproc_start+1)*&
110 (ljproc_end-ljproc_start+1)*(lkproc_end-lkproc_start+1)-1
111
112
113 allocate (itoproc(itotalneigh))
114 allocate (iprocsumindx(itotalneigh))
115 allocate (istartsend(itotalneigh+1))
116 allocate (istartrecv(itotalneigh+1))
117 allocate (isendreqnode(itotalneigh))
118 allocate (irecvreqnode(itotalneigh))
119
120
121
122 = 0
123 itoproc(:)=-1
124 iprocsumindx(:) =0
125
126 do lk = lkproc_start, lkproc_end
127 do lj = ljproc_start, ljproc_end
128 do li = liproc_start, liproc_end
129
130 li2 = mod(li,nodesi); if(li2 < 0) li2 = nodesi-1
131 lj2 = mod(lj,nodesj); if(lj2 < 0) lj2 = nodesj-1
132 lk2 = mod(lk,nodesk); if(lk2 < 0) lk2 = nodesk-1
133
134 lijkproc = procijk(li2,lj2,lk2)
135
136 if (lijkproc.eq.mype) cycle
137
138
139 = .false.
140 do lproc = 1,itotalneigh
141 if (lijkproc .eq.itoproc(lproc)) then
142 lpresent = .true.
143 exit
144 end if
145 end do
146 if(.not.lpresent) then
147 itotalneigh = itotalneigh + 1
148 lproc = itotalneigh
149 end if
150
151 itoproc(lproc) = lijkproc
152
153 lci=(liproc-li)
154 if(lci == 1) then
155 linode_start = iStart2
156 linode_end = iStart2
157 elseif(lci == -1) then
158 linode_start = iEnd2
159 linode_end = iEnd2
160 else
161 linode_start = iStart1
162 linode_end = iEnd1
163 endif
164
165 lcj=(ljproc-lj)
166 if(lcj == 1) then
167 ljnode_start = jStart2
168 ljnode_end = jStart2
169 elseif(lcj == -1) then
170 ljnode_start = jEnd2
171 ljnode_end = jEnd2
172 else
173 ljnode_start = jStart1
174 ljnode_end=jEnd1
175 endif
176
177 lck=(lkproc-lk)
178 if(lck == 1) then
179 lknode_start = kStart2
180 lknode_end = kStart2
181 elseif(lck == -1) then
182 lknode_start = kEnd2
183 lknode_end = kEnd2
184 else
185 lknode_start = kStart1
186 lknode_end=kEnd1
187 endif
188
189 do lknode = lknode_start,lknode_end
190 do linode = linode_start,linode_end
191 do ljnode = ljnode_start,ljnode_end
192 IF(DEAD_CELL_AT(linode,ljnode,lknode)) CYCLE
193 IF(WALL_AT(FUNIJK(linode,ljnode,lknode))) CYCLE
194 iprocsumindx(lproc) = iprocsumindx(lproc) + 1
195 end do
196 end do
197 end do
198 end do
199 end do
200 end do
201
202
203
204 do lproc =1,itotalneigh+1
205 istartsend(lproc)=sum(iprocsumindx(1:lproc-1))+1
206 end do
207 itotalindx=istartsend(itotalneigh+1)-1
208
209
210 allocate(isendnodes(itotalindx))
211 allocate(dsendnodebuf(itotalindx))
212
213
214 (:)=0
215 do lk = lkproc_start,lkproc_end
216 do lj = ljproc_start,ljproc_end
217 do li = liproc_start,liproc_end
218 li2 = mod(li,nodesi);if(li2.lt.0)li2=nodesi-1
219 lj2 = mod(lj,nodesj);if(lj2.lt.0)lj2=nodesj-1
220 lk2 = mod(lk,nodesk);if(lk2.lt.0)lk2=nodesk-1
221 lijkproc = procijk(li2,lj2,lk2)
222 if (lijkproc.eq.mype) cycle
223
224 do lproc =1,itotalneigh
225 if(lijkproc.eq.itoproc(lproc)) then
226 exit
227 end if
228 end do
229
230 lci=(liproc-li)
231 if(lci == 1) then
232 linode_start = iStart2
233 linode_end = iStart2
234 elseif(lci == -1) then
235 linode_start = iEnd2
236 linode_end = iEnd2
237 else
238 linode_start = iStart1
239 linode_end = iEnd1
240 endif
241
242 lcj=(ljproc-lj)
243 if(lcj == 1) then
244 ljnode_start = jStart2
245 ljnode_end = jStart2
246 elseif(lcj == -1) then
247 ljnode_start = jEnd2
248 ljnode_end = jEnd2
249 else
250 ljnode_start = jStart1
251 ljnode_end=jEnd1
252 endif
253
254 lck=(lkproc-lk)
255 if(lck == 1) then
256 lknode_start = kStart2
257 lknode_end = kStart2
258 elseif(lck == -1) then
259 lknode_start = kEnd2
260 lknode_end = kEnd2
261 else
262 lknode_start = kStart1
263 lknode_end=kEnd1
264 endif
265
266 lcount = istartsend(lproc)+iprocsumindx(lproc)
267 do lknode = lknode_start,lknode_end
268 do linode = linode_start,linode_end
269 do ljnode = ljnode_start,ljnode_end
270 IF(DEAD_CELL_AT(linode,ljnode,lknode)) CYCLE
271 IF(WALL_AT(FUNIJK(linode,ljnode,lknode))) CYCLE
272 isendnodes(lcount)=funijk(linode,ljnode,lknode)
273 iprocsumindx(lproc)=iprocsumindx(lproc)+1
274 lcount = lcount+1
275 end do
276 end do
277 end do
278
279 end do
280 end do
281 end do
282
283
284
285 (:) =0
286 do lk = lkproc_start, lkproc_end
287 do lj = ljproc_start, ljproc_end
288 do li = liproc_start, liproc_end
289
290 li2 = mod(li,nodesi); if(li2 < 0) li2 = nodesi-1
291 lj2 = mod(lj,nodesj); if(lj2 < 0) lj2 = nodesj-1
292 lk2 = mod(lk,nodesk); if(lk2 < 0) lk2 = nodesk-1
293
294 lijkproc = procijk(li2,lj2,lk2)
295
296 if (lijkproc.eq.mype) cycle
297
298
299 do lproc = 1,itotalneigh
300 if(lijkproc .eq. itoproc(lproc)) exit
301 end do
302
303 lci=(liproc-li);lcj=(ljproc-lj);lck=(lkproc-lk)
304
305 linode_start = istart1; linode_end=iend1
306 ljnode_start = jstart1; ljnode_end=jend1
307 lknode_start = kstart1; lknode_end=kend1
308 if(lci.eq. 1) linode_end = iStart1
309 if(lci.eq.-1) linode_start = iEnd1
310 if(lcj.eq. 1) ljnode_end = jStart1
311 if(lcj.eq.-1) ljnode_start = jEnd1
312 if(lck.eq. 1) lknode_end = kStart1
313 if(lck.eq.-1) lknode_start = kEnd1
314
315 do lknode = lknode_start,lknode_end
316 do linode = linode_start,linode_end
317 do ljnode = ljnode_start,ljnode_end
318 IF(DEAD_CELL_AT(linode,ljnode,lknode)) CYCLE
319 IF(WALL_AT(FUNIJK(linode,ljnode,lknode))) CYCLE
320 iprocsumindx(lproc) = iprocsumindx(lproc) + 1
321 end do
322 end do
323 end do
324 end do
325 end do
326 end do
327
328
329 do lproc =1,itotalneigh+1
330 istartrecv(lproc)=sum(iprocsumindx(1:lproc-1))+1
331 end do
332 itotalindx=istartrecv(itotalneigh+1)-1
333
334 allocate(irecvnodes(itotalindx))
335 allocate(drecvnodebuf(itotalindx))
336
337
338 (:)=0
339 do lk = lkproc_start,lkproc_end
340 do lj = ljproc_start,ljproc_end
341 do li = liproc_start,liproc_end
342
343 li2 = mod(li,nodesi);if(li2.lt.0)li2=nodesi-1
344 lj2 = mod(lj,nodesj);if(lj2.lt.0)lj2=nodesj-1
345 lk2 = mod(lk,nodesk);if(lk2.lt.0)lk2=nodesk-1
346 lijkproc = procijk(li2,lj2,lk2)
347
348 if (lijkproc.eq.mype) cycle
349
350
351 do lproc =1,itotalneigh
352 if(lijkproc.eq.itoproc(lproc)) then
353 exit
354 end if
355 end do
356
357 lci=(liproc-li);lcj=(ljproc-lj);lck=(lkproc-lk)
358
359
360 = istart1; linode_end=iend1
361 ljnode_start = jstart1; ljnode_end=jend1
362 lknode_start = kstart1; lknode_end=kend1
363
364 if(lci.eq. 1) linode_end = iStart1
365 if(lci.eq.-1) linode_start = iEnd1
366 if(lcj.eq. 1) ljnode_end = jStart1
367 if(lcj.eq.-1) ljnode_start = jEnd1
368 if(lck.eq. 1) lknode_end = kStart1
369 if(lck.eq.-1) lknode_start = kEnd1
370
371 lcount = istartrecv(lproc)+iprocsumindx(lproc)
372 do lknode = lknode_start,lknode_end
373 do linode = linode_start,linode_end
374 do ljnode = ljnode_start,ljnode_end
375 IF(DEAD_CELL_AT(linode,ljnode,lknode)) CYCLE
376 IF(WALL_AT(FUNIJK(linode,ljnode,lknode))) CYCLE
377 irecvnodes(lcount)=funijk(linode,ljnode,lknode)
378 iprocsumindx(lproc)=iprocsumindx(lproc)+1
379 lcount = lcount+1
380 end do
381 end do
382 end do
383
384 end do
385 end do
386 end do
387
388
389
390 RETURN
391 end subroutine INIT_DES_COLLECT_gDATA
392
393
394
395
396
397
398
399
400
401
402
403 subroutine DES_COLLECT_gDATA_db1(pvar)
404
405 use desmpi_wrapper, only: DES_MPI_WAIT
406 use desmpi_wrapper, only: DES_MPI_iSEND
407 use desmpi_wrapper, only: DES_MPI_iRECV
408 use parallel_mpi, only: MPI_Check
409 use compar, only: myPE, numPEs
410
411 implicit none
412
413
414
415 double precision, intent(inout) :: pvar(:)
416
417
418
419 character(len=80), parameter :: name = 'des_exchangenode'
420 integer :: lindx,lcount,lcount2,lneigh,ltag,lerr
421 integer :: lstart,lend,ltotal
422
423
424
425 do lcount = 1,itotalneigh
426 lneigh = itoproc(lcount)
427 lstart = istartsend(lcount);lend=istartsend(lcount+1)-1
428 do lcount2 = lstart,lend
429 dsendnodebuf(lcount2) = pvar(isendnodes(lcount2))
430 end do
431
432 ltag = message_tag(lneigh,mype)
433 lstart = istartrecv(lcount);lend=istartrecv(lcount+1)-1
434 ltotal = lend-lstart+1
435 call des_mpi_irecv(drecvnodebuf(lstart:lend),ltotal, &
436 lneigh,ltag,irecvreqnode(lcount),lerr)
437 call mpi_check( name //':mpi_irecv ', lerr )
438
439 ltag = message_tag(mype,lneigh)
440 lstart = istartsend(lcount);lend=istartsend(lcount+1)-1
441 ltotal = lend-lstart+1
442 call des_mpi_isend(dsendnodebuf(lstart:lend),ltotal, &
443 lneigh,ltag,isendreqnode(lcount),lerr)
444 call mpi_check( name //':mpi_irecv ', lerr )
445 end do
446
447 do lcount = 1,itotalneigh
448 call des_mpi_wait(isendreqnode(lcount),lerr)
449 call mpi_check( name //':mpi_wait-send', lerr )
450 call des_mpi_wait(irecvreqnode(lcount),lerr)
451 call mpi_check( name //':mpi_wait-recv', lerr )
452 end do
453
454
455
456 do lcount = 1,itotalindx
457 lindx = irecvnodes(lcount)
458 pvar(lindx) = pvar(lindx) + drecvnodebuf(lcount)
459 end do
460
461 return
462
463 contains
464
465 integer function message_tag(lsource,ldest)
466 implicit none
467 integer, intent(in) :: lsource,ldest
468 message_tag = lsource+numpes*ldest+200
469 end function message_tag
470
471 end subroutine DES_COLLECT_gDATA_db1
472
473
474
475
476
477
478
479
480 subroutine DES_COLLECT_gDATA_db2(pvar)
481
482 implicit none
483
484
485
486 double precision, intent(inout) :: pvar(:,:)
487 integer :: lc
488
489 do lc=lbound(pVAR,2), ubound(pVAR,2)
490 call des_collect_gDATA_db1(pVAR(:,lc))
491 enddo
492 return
493 end subroutine des_collect_gdata_db2
494
495
496
497
498
499
500
501
502
503
504
505
506
507 subroutine des_setnodeindices
508 use compar, only: mype, nodesi, nodesj, nodesk, dead_cell_at
509 use desgrid, only: IofPROC, JofPROC, KofPROC
510 use desgrid, only: procIJK
511 use discretelement, only: des_periodic_walls_x, des_periodic_walls_y, des_periodic_walls_z
512 use compar, only: istart2, iend1
513 use compar, only: jstart2, jend1
514 use compar, only: kstart2, kend1
515 use functions, only: funijk
516
517 implicit none
518
519
520
521 integer :: lijkproc,liproc,ljproc,lkproc
522 integer :: li,lj,lk
523 integer :: li2,lj2,lk2
524 integer :: liproc_start,liproc_end
525 integer :: ljproc_start,ljproc_end
526 integer :: lkproc_start,lkproc_end
527 integer :: lci,lcj,lck,lproc,lcount
528 integer :: linode_start,linode_end, linode
529 integer :: ljnode_start,ljnode_end, ljnode
530 integer :: lknode_start,lknode_end, lknode
531 logical :: lpresent
532 integer, allocatable :: iprocsumindx(:)
533
534
535
536 = iofproc(mype)
537 ljproc = jofproc(mype)
538 lkproc = kofproc(mype)
539
540
541 if(des_periodic_walls_x.and.nodesi.gt.1) then
542 liproc_start=liproc-1
543 liproc_end=liproc+1
544 else
545 liproc_start =max(liproc-1,0)
546 liproc_end=min(liproc+1,nodesi-1)
547 end if
548 if(des_periodic_walls_y.and.nodesj.gt.1) then
549 ljproc_start=ljproc-1
550 ljproc_end=ljproc+1
551 else
552 ljproc_start =max(ljproc-1,0)
553 ljproc_end=min(ljproc+1,nodesj-1)
554 end if
555 if(des_periodic_walls_z.and.nodesk.gt.1) then
556 lkproc_start=lkproc-1
557 lkproc_end=lkproc+1
558 else
559 lkproc_start =max(lkproc-1,0)
560 lkproc_end=min(lkproc+1,nodesk-1)
561 end if
562 itotalneigh = (liproc_end-liproc_start+1)*&
563 (ljproc_end-ljproc_start+1)*(lkproc_end-lkproc_start+1)-1
564
565
566 allocate (itoproc(itotalneigh))
567 allocate (iprocsumindx(itotalneigh))
568 allocate (istartsend(itotalneigh+1))
569 allocate (irecvreqnode(itotalneigh))
570 allocate (isendreqnode(itotalneigh))
571
572
573
574 = 0
575 itoproc(:)=-1
576 iprocsumindx(:) =0
577 do lk = lkproc_start,lkproc_end
578 do lj = ljproc_start,ljproc_end
579 do li = liproc_start,liproc_end
580 li2 = mod(li,nodesi);if(li2.lt.0)li2=nodesi-1
581 lj2 = mod(lj,nodesj);if(lj2.lt.0)lj2=nodesj-1
582 lk2 = mod(lk,nodesk);if(lk2.lt.0)lk2=nodesk-1
583 lijkproc = procijk(li2,lj2,lk2)
584 if (lijkproc.eq.mype) cycle
585
586 = .false.
587 do lproc = 1,itotalneigh
588 if (lijkproc .eq.itoproc(lproc)) then
589 lpresent = .true.
590 exit
591 end if
592 end do
593 if(.not.lpresent) then
594 itotalneigh = itotalneigh + 1
595 lproc = itotalneigh
596 end if
597 itoproc(lproc) = lijkproc
598 lci=(liproc-li);lcj=(ljproc-lj);lck=(lkproc-lk)
599 linode_start = istart2; linode_end=iend1
600 ljnode_start = jstart2; ljnode_end=jend1
601 lknode_start = kstart2; lknode_end=kend1
602 if(lci.eq.1) linode_end = istart2
603 if(lci.eq.-1) linode_start = iend1
604 if(lcj.eq.1) ljnode_end = jstart2
605 if(lcj.eq.-1) ljnode_start = jend1
606 if(lck.eq.1) lknode_end = kstart2
607 if(lck.eq.-1) lknode_start = kend1
608 do lknode = lknode_start,lknode_end
609 do linode = linode_start,linode_end
610 do ljnode = ljnode_start,ljnode_end
611 IF(DEAD_CELL_AT(linode,ljnode,lknode)) CYCLE
612 iprocsumindx(lproc) = iprocsumindx(lproc) + 1
613 end do
614 end do
615 end do
616 end do
617 end do
618 end do
619
620 do lproc =1,itotalneigh+1
621 istartsend(lproc)=sum(iprocsumindx(1:lproc-1))+1
622 end do
623 itotalindx=istartsend(itotalneigh+1)-1
624
625
626 allocate (isendnodes(itotalindx))
627 allocate (dsendnodebuf(itotalindx))
628 allocate (drecvnodebuf(itotalindx))
629
630
631 (:)=0
632 do lk = lkproc_start,lkproc_end
633 do lj = ljproc_start,ljproc_end
634 do li = liproc_start,liproc_end
635 li2 = mod(li,nodesi);if(li2.lt.0)li2=nodesi-1
636 lj2 = mod(lj,nodesj);if(lj2.lt.0)lj2=nodesj-1
637 lk2 = mod(lk,nodesk);if(lk2.lt.0)lk2=nodesk-1
638 lijkproc = procijk(li2,lj2,lk2)
639 if (lijkproc.eq.mype) cycle
640
641 do lproc =1,itotalneigh
642 if(lijkproc.eq.itoproc(lproc)) then
643 exit
644 end if
645 end do
646 lci=(liproc-li);lcj=(ljproc-lj);lck=(lkproc-lk)
647 linode_start = istart2; linode_end=iend1
648 ljnode_start = jstart2; ljnode_end=jend1
649 lknode_start = kstart2; lknode_end=kend1
650 if(lci.eq.1) linode_end = istart2
651 if(lci.eq.-1) linode_start = iend1
652 if(lcj.eq.1) ljnode_end = jstart2
653 if(lcj.eq.-1) ljnode_start = jend1
654 if(lck.eq.1) lknode_end = kstart2
655 if(lck.eq.-1) lknode_start = kend1
656 lcount = istartsend(lproc)+iprocsumindx(lproc)
657 do lknode = lknode_start,lknode_end
658 do linode = linode_start,linode_end
659 do ljnode = ljnode_start,ljnode_end
660 IF(DEAD_CELL_AT(linode,ljnode,lknode)) CYCLE
661 isendnodes(lcount)=funijk(linode,ljnode,lknode)
662 iprocsumindx(lproc)=iprocsumindx(lproc)+1
663 lcount = lcount+1
664 end do
665 end do
666 end do
667 end do
668 end do
669 end do
670
671
672 end subroutine des_setnodeindices
673
674
675
676
677
678
679
680
681
682
683 subroutine des_exchangenode(pvar,padd)
684
685 use desmpi_wrapper, only: DES_MPI_WAIT
686 use desmpi_wrapper, only: DES_MPI_iSEND
687 use desmpi_wrapper, only: DES_MPI_iRECV
688 use parallel_mpi, only: MPI_Check
689 use compar, only: myPE, numPEs
690
691
692 implicit none
693
694
695
696 double precision,dimension(:),intent(inout) ::pvar
697 logical:: padd
698
699
700
701 character(len=80), parameter :: name = 'des_exchangenode'
702 integer :: lindx,lcount,lcount2,lneigh,ltag,lerr
703 integer :: lstart,lend,ltotal
704
705
706
707 do lcount = 1,itotalneigh
708 lneigh = itoproc(lcount)
709 lstart = istartsend(lcount);lend=istartsend(lcount+1)-1
710 do lcount2 = lstart,lend
711 dsendnodebuf(lcount2) = pvar(isendnodes(lcount2))
712 end do
713 ltag = message_tag(lneigh,mype)
714 ltotal = lend-lstart+1
715 call des_mpi_irecv(drecvnodebuf(lstart:lend),ltotal, &
716 lneigh,ltag,irecvreqnode(lcount),lerr)
717 call mpi_check( name //':mpi_irecv ', lerr )
718 ltag = message_tag(mype,lneigh)
719 call des_mpi_isend(dsendnodebuf(lstart:lend),ltotal, &
720 lneigh,ltag,isendreqnode(lcount),lerr)
721 call mpi_check( name //':mpi_irecv ', lerr )
722 end do
723
724 do lcount = 1,itotalneigh
725 call des_mpi_wait(isendreqnode(lcount),lerr)
726 call mpi_check( name //':mpi_wait-send', lerr )
727 call des_mpi_wait(irecvreqnode(lcount),lerr)
728 call mpi_check( name //':mpi_wait-recv', lerr )
729 end do
730
731
732 if (padd) then
733 do lcount = 1,itotalindx
734 lindx = isendnodes(lcount)
735 pvar(lindx) = pvar(lindx) + drecvnodebuf(lcount)
736 end do
737 else
738 do lcount = 1,itotalindx
739 lindx = isendnodes(lcount)
740 pvar(lindx) = drecvnodebuf(lcount)
741 end do
742 end if
743 return
744
745 contains
746
747 integer function message_tag(lsource,ldest)
748 implicit none
749 integer, intent(in) :: lsource,ldest
750 message_tag = lsource+numpes*ldest+200
751 end function message_tag
752
753 end subroutine des_exchangenode
754
755
756
757
758
759 subroutine des_dbgnodesr()
760
761
762 use indices, only: i_of, j_of, k_of
763 use compar, only: mype
764
765 implicit none
766
767
768
769 character (255) filename
770 integer ijk
771 integer lcount,lcount2,lstart,lend
772
773
774
775 write(filename,'("dbg_nodesr",I4.4,".dat")') mype
776 open(44,file=filename,convert='big_endian')
777 do lcount = 1,itotalneigh
778 lstart = istartsend(lcount);lend=istartsend(lcount+1)-1
779 write(44,"(2/,72('*'))")
780 write(44,1100) myPE, itoproc(lcount)
781 write(44,"(/2x,'Start:',I6)") lstart
782 write(44,"( 2x,'End: ',I6)") lend
783 write(44,"(72('-'))")
784 do lcount2 = lstart,lend
785 ijk = isendnodes(lcount2)
786 write(44,1000)'SEND', i_of(ijk),j_of(ijk),k_of(ijk),ijk
787 end do
788 write(44,"(72('-'))")
789 end do
790
791 if(allocated(irecvnodes)) then
792 do lcount = 1,itotalneigh
793 lstart = istartrecv(lcount);lend=istartrecv(lcount+1)-1
794 write(44,"(2/,72('*'))")
795 write(44,1100) itoproc(lcount), myPE
796 write(44,"(/2x,'Start:',I6)") lstart
797 write(44,"( 2x,'End: ',I6)") lend
798 write(44,"(72('-'))")
799 do lcount2 = lstart,lend
800 ijk = irecvnodes(lcount2)
801 write(44,1000) 'RECV', i_of(ijk),j_of(ijk),k_of(ijk),ijk
802 end do
803 write(44,"(72('-'))")
804 end do
805 end if
806
807 close (44)
808
809 1100 FORMAT(2x,'Send Proc ',I2,' --> Recv Proc' I2)
810 1000 FORMAT(3x,A,': (',I3,',',I3,',',I3,') :: ',I7)
811
812 end subroutine des_dbgnodesr
813
814 end module
815
816
817