CoreNEURON
multisend.cpp
Go to the documentation of this file.
1 /*
2 # =============================================================================
3 # Copyright (c) 2016 - 2021 Blue Brain Project/EPFL
4 #
5 # See top-level LICENSE file for details.
6 # =============================================================================
7 */
8 
14 
15 /*
16 Overall exchange strategy
17 
18 When a cell spikes, it immediately does a multisend of
19 (int gid, double spiketime) to all the target machines that have
20 cells that need to receive this spike by spiketime + delay.
21 The MPI implementation does not block due to use of MPI_Isend.
22 
23 In order to minimize the number of nrnmpi_multisend_conserve tests
24 (and potentially abandon them altogether if I can ever guarantee
25 that exchange time is less than half the computation time), I divide the
26 minimum delay integration intervals into two equal subintervals.
27 So if a spike is generated in an even subinterval, I do not have
28 to include it in the conservation check until the end of the next even
29 subinterval.
30 
31 When a spike is received (generally MPI_Iprobe, MPI_Recv) it is placed in
32 even or odd buffers (depending on whether the coded gid is positive or negative)
33 
34 At the end of a computation subinterval the even or odd buffer spikes
35 are enqueued in the priority queue after checking that the number
36 of spikes received is equal to the number of spikes sent.
37 */
38 
39 // The initial idea behind the optional phase2 is to avoid the large overhead of
40 // initiating a send of the up to 10k list of target hosts when a cell fires.
41 // I.e. when there are a small number of cells on a processor, this causes
42 // load balance problems.
43 // Load balance should be better if the send is distributed to a much smaller
44 // set of targets, which, when they receive the spike, pass it on to a neighbor
45 // set. A non-exclusive alternative to this is the use of RECORD_REPLAY
46 // which gives a very fast initiation but we have not been able to get that
47 // to complete in the sense of all the targets receiving their spikes before
48 // the conservation step.
49 // We expect that phase2 will work best in combination with ENQUEUE=2
50 // which has the greatest amount of overlap between computation
51 // and communication.
52 namespace coreneuron {
56 
57 #if NRN_MULTISEND
58 
// Number of extra conservation-check iterations performed beyond the first
// (incremented while nrnmpi_multisend_conserve keeps returning nonzero).
static int n_xtra_cons_check_;
#define MAXNCONS 10
#if MAXNCONS
// Histogram of conservation retries per exchange, buckets 0..MAXNCONS.
static int xtra_cons_hist_[MAXNCONS + 1];
#endif
64 
// ENQUEUE 0 means to Multisend_ReceiveBuffer buffer -> InputPreSyn.send
// ENQUEUE 1 means to Multisend_ReceiveBuffer buffer -> psbuf -> InputPreSyn.send
// ENQUEUE 2 means to Multisend_ReceiveBuffer.incoming -> InputPreSyn.send
// Note that ENQUEUE 2 gives more overlap between computation and exchange
// since the enqueuing takes place during computation except for those
// remaining during conservation.
#define ENQUEUE 2

#if ENQUEUE == 2
// Timing accumulators for the ENQUEUE==2 path; reset in nrn_multisend_init()
// and at the end of nrn_multisend_receive().
static unsigned long enq2_find_time_;
static unsigned long enq2_enqueue_time_;  // includes enq2_find_time_
#endif
// Capacity of the phase-2 retransmission ring buffer.
#define PHASE2BUFFER_SIZE 2048  // power of 2
#define PHASE2BUFFER_MASK (PHASE2BUFFER_SIZE - 1)
// One spike pending phase-2 forwarding: its source InputPreSyn, spike time,
// and (uncoded) gid.
struct Phase2Buffer {
    InputPreSyn* ps;
    double spiketime;
    int gid;
};
85 
#define MULTISEND_RECEIVEBUFFER_SIZE 10000
// Holds the spikes received during one (even or odd) computation subinterval
// until they are enqueued into the priority queue.
class Multisend_ReceiveBuffer {
  public:
    Multisend_ReceiveBuffer();
    virtual ~Multisend_ReceiveBuffer();
    // Reset counters and discard buffered spikes; index 0 = even, 1 = odd.
    void init(int index);
    // Append one received (gid, spiketime); grows buffer_ (2x) on demand.
    void incoming(int gid, double spiketime);
    // Deliver all buffered spikes via InputPreSyn::send, then phase2send().
    void enqueue();
    int index_{};
    int size_{MULTISEND_RECEIVEBUFFER_SIZE};  // current capacity of buffer_
    int count_{};                             // number of buffered spikes
    int maxcount_{};                          // high-water mark of count_
    bool busy_{};                             // guard against reentrant use
    int nsend_{}, nrecv_{};  // for checking conservation
    int nsend_cell_{};       // cells that spiked this interval.
    NRNMPI_Spike** buffer_{};

    // Split enqueue used when ENQUEUE == 1: enqueue1() resolves gids into
    // psbuf_, enqueue2() performs the actual delivery.
    void enqueue1();
    void enqueue2();
    InputPreSyn** psbuf_{};

    // Retransmit pending spikes to this rank's phase-2 neighbor set.
    void phase2send();
    int phase2_head_{};
    int phase2_tail_{};
    int phase2_nsend_cell_{}, phase2_nsend_{};
    Phase2Buffer* phase2_buffer_{};
};
113 
// Two buffers: one per even/odd subinterval of the minimum-delay interval.
#define MULTISEND_INTERVAL 2
static Multisend_ReceiveBuffer* multisend_receive_buffer[MULTISEND_INTERVAL];
static int current_rbuf, next_rbuf;
#if MULTISEND_INTERVAL == 2
// note that if a spike is supposed to be received by multisend_receive_buffer[1]
// then during transmission its gid is complemented.
#endif

// Flattened target-rank tables filled by nrn_multisend_setup_targets()
// and indexed by PreSyn::multisend_index_ / InputPreSyn::multisend_phase2_index_.
static int* targets_phase1_;
static int* targets_phase2_;
124 
125 void nrn_multisend_send(PreSyn* ps, double t, NrnThread* nt) {
126  int i = ps->multisend_index_;
127  if (i >= 0) {
128  // format is cnt, cnt_phase1, array of target ranks.
129  // Valid for one or two phase.
130  int* ranks = targets_phase1_ + i;
131  int cnt = ranks[0];
132  int cnt_phase1 = ranks[1];
133  ranks += 2;
134  NRNMPI_Spike spk;
135  spk.gid = ps->output_index_;
136  spk.spiketime = t;
137  if (next_rbuf == 1) {
138  spk.gid = ~spk.gid;
139  }
140  if (nt == nrn_threads) {
141  multisend_receive_buffer[next_rbuf]->nsend_ += cnt;
142  multisend_receive_buffer[next_rbuf]->nsend_cell_ += 1;
143  nrnmpi_multisend(&spk, cnt_phase1, ranks);
144  } else {
145  assert(0);
146  }
147  }
148 }
149 
150 static void multisend_send_phase2(InputPreSyn* ps, int gid, double t) {
151  int i = ps->multisend_phase2_index_;
152  assert(i >= 0);
153  // format is cnt_phase2, array of target ranks
154  int* ranks = targets_phase2_ + i;
155  int cnt_phase2 = ranks[0];
156  ranks += 1;
157  NRNMPI_Spike spk;
158  spk.gid = gid;
159  spk.spiketime = t;
160  nrnmpi_multisend(&spk, cnt_phase2, ranks);
161 }
162 
163 Multisend_ReceiveBuffer::Multisend_ReceiveBuffer()
164  : buffer_ {
165  new NRNMPI_Spike*[size_]
166 }
167 #if ENQUEUE == 1
168 , psbuf_ {
169  new InputPreSyn*[size_]
170 }
171 #endif
172 , phase2_buffer_{new Phase2Buffer[PHASE2BUFFER_SIZE]} {}
173 
174 Multisend_ReceiveBuffer::~Multisend_ReceiveBuffer() {
175  nrn_assert(!busy_);
176  for (int i = 0; i < count_; ++i) {
177  delete buffer_[i];
178  }
179  delete[] buffer_;
180  if (psbuf_)
181  delete[] psbuf_;
182  delete[] phase2_buffer_;
183 }
184 void Multisend_ReceiveBuffer::init(int index) {
185  index_ = index;
186  nsend_cell_ = nsend_ = nrecv_ = maxcount_ = 0;
187  busy_ = false;
188  for (int i = 0; i < count_; ++i) {
189  delete buffer_[i];
190  }
191  count_ = 0;
192 
193  phase2_head_ = phase2_tail_ = 0;
194  phase2_nsend_cell_ = phase2_nsend_ = 0;
195 }
196 void Multisend_ReceiveBuffer::incoming(int gid, double spiketime) {
197  // printf("%d %p.incoming %g %g %d\n", nrnmpi_myid, this, t, spk->spiketime, spk->gid);
198  nrn_assert(!busy_);
199  busy_ = true;
200 
201  if (count_ >= size_) {
202  size_ *= 2;
203  NRNMPI_Spike** newbuf = new NRNMPI_Spike*[size_];
204  for (int i = 0; i < count_; ++i) {
205  newbuf[i] = buffer_[i];
206  }
207  delete[] buffer_;
208  buffer_ = newbuf;
209  if (psbuf_) {
210  delete[] psbuf_;
211  psbuf_ = new InputPreSyn*[size_];
212  }
213  }
214  NRNMPI_Spike* spk = new NRNMPI_Spike();
215  spk->gid = gid;
216  spk->spiketime = spiketime;
217  buffer_[count_++] = spk;
218  if (maxcount_ < count_) {
219  maxcount_ = count_;
220  }
221 
222  ++nrecv_;
223  busy_ = false;
224 }
225 void Multisend_ReceiveBuffer::enqueue() {
226  // printf("%d %p.enqueue count=%d t=%g nrecv=%d nsend=%d\n", nrnmpi_myid, this, t, count_,
227  // nrecv_, nsend_);
228  nrn_assert(!busy_);
229  busy_ = true;
230 
231  for (int i = 0; i < count_; ++i) {
232  NRNMPI_Spike* spk = buffer_[i];
233 
234  auto gid2in_it = gid2in.find(spk->gid);
235  assert(gid2in_it != gid2in.end());
236  InputPreSyn* ps = gid2in_it->second;
237 
238  if (use_phase2_ && ps->multisend_phase2_index_ >= 0) {
239  Phase2Buffer& pb = phase2_buffer_[phase2_head_++];
240  phase2_head_ &= PHASE2BUFFER_MASK;
241  assert(phase2_head_ != phase2_tail_);
242  pb.ps = ps;
243  pb.spiketime = spk->spiketime;
244  pb.gid = spk->gid;
245  }
246 
247  ps->send(spk->spiketime, net_cvode_instance, nrn_threads);
248  delete spk;
249  }
250 
251  count_ = 0;
252 #if ENQUEUE != 2
253  nrecv_ = 0;
254  nsend_ = 0;
255  nsend_cell_ = 0;
256 #endif
257  busy_ = false;
258  phase2send();
259 }
260 
261 void Multisend_ReceiveBuffer::enqueue1() {
262  // printf("%d %lx.enqueue count=%d t=%g nrecv=%d nsend=%d\n", nrnmpi_myid, (long)this, t,
263  // count_, nrecv_, nsend_);
264  nrn_assert(!busy_);
265  busy_ = true;
266  for (int i = 0; i < count_; ++i) {
267  NRNMPI_Spike* spk = buffer_[i];
268 
269  auto gid2in_it = gid2in.find(spk->gid);
270  assert(gid2in_it != gid2in.end());
271  InputPreSyn* ps = gid2in_it->second;
272  psbuf_[i] = ps;
273  if (use_phase2_ && ps->multisend_phase2_index_ >= 0) {
274  Phase2Buffer& pb = phase2_buffer_[phase2_head_++];
275  phase2_head_ &= PHASE2BUFFER_MASK;
276  assert(phase2_head_ != phase2_tail_);
277  pb.ps = ps;
278  pb.spiketime = spk->spiketime;
279  pb.gid = spk->gid;
280  }
281  }
282  busy_ = false;
283  phase2send();
284 }
285 
286 void Multisend_ReceiveBuffer::enqueue2() {
287  // printf("%d %lx.enqueue count=%d t=%g nrecv=%d nsend=%d\n", nrnmpi_myid, (long)this, t,
288  // count_, nrecv_, nsend_);
289  nrn_assert(!busy_);
290  busy_ = false;
291  for (int i = 0; i < count_; ++i) {
292  NRNMPI_Spike* spk = buffer_[i];
293  InputPreSyn* ps = psbuf_[i];
294  ps->send(spk->spiketime, net_cvode_instance, nrn_threads);
295  delete spk;
296  }
297  count_ = 0;
298  nrecv_ = 0;
299  nsend_ = 0;
300  nsend_cell_ = 0;
301  busy_ = false;
302 }
303 
304 void Multisend_ReceiveBuffer::phase2send() {
305  while (phase2_head_ != phase2_tail_) {
306  Phase2Buffer& pb = phase2_buffer_[phase2_tail_++];
307  phase2_tail_ &= PHASE2BUFFER_MASK;
308  int gid = pb.gid;
309  if (index_) {
310  gid = ~gid;
311  }
312  multisend_send_phase2(pb.ps, gid, pb.spiketime);
313  }
314 }
315 
// Largest number of target hosts over all cells on this rank.
static int max_ntarget_host;
// For one phase sending, max_multisend_targets is max_ntarget_host.
// For two phase sending, it is the maximum of all the
// ntarget_hosts_phase1 and ntarget_hosts_phase2.
static int max_multisend_targets;
321 
322 void nrn_multisend_init() {
323  for (int i = 0; i < n_multisend_interval; ++i) {
324  multisend_receive_buffer[i]->init(i);
325  }
326  current_rbuf = 0;
327  next_rbuf = n_multisend_interval - 1;
328 #if ENQUEUE == 2
329  enq2_find_time_ = enq2_enqueue_time_ = 0;
330 #endif
331  n_xtra_cons_check_ = 0;
332 #if MAXNCONS
333  for (int i = 0; i <= MAXNCONS; ++i) {
334  xtra_cons_hist_[i] = 0;
335  }
336 #endif // MAXNCONS
337 }
338 
339 static int multisend_advance() {
340  NRNMPI_Spike spk;
341  int i = 0;
342  while (nrnmpi_multisend_single_advance(&spk)) {
343  i += 1;
344  int j = 0;
345 #if MULTISEND_INTERVAL == 2
346  if (spk.gid < 0) {
347  spk.gid = ~spk.gid;
348  j = 1;
349  }
350 #endif
351  multisend_receive_buffer[j]->incoming(spk.gid, spk.spiketime);
352  }
353  return i;
354 }
355 
#if NRN_MULTISEND
// Drain arrived spikes; with ENQUEUE==2 they are also delivered immediately,
// overlapping communication with computation.
void nrn_multisend_advance() {
    if (!use_multisend_) {
        return;
    }
    multisend_advance();
#if ENQUEUE == 2
    multisend_receive_buffer[current_rbuf]->enqueue();
#endif
}
#endif
366 
// End-of-subinterval processing: wait until spike conservation holds
// (number received equals number sent), enqueue the current buffer's
// spikes, then swap the even/odd buffers.
void nrn_multisend_receive(NrnThread* nt) {
    // nrn_spike_exchange();
    assert(nt == nrn_threads);
    // double w1, w2;
    int ncons = 0;
    // aliases for the current buffer's conservation counters
    int& s = multisend_receive_buffer[current_rbuf]->nsend_;
    int& r = multisend_receive_buffer[current_rbuf]->nrecv_;
// w1 = nrn_wtime();
#if NRN_MULTISEND & 1
    if (use_multisend_) {
        nrnmpi_barrier();
        // with two phase we expect conservation to hold and ncons should
        // be 0.
        while (nrnmpi_multisend_conserve(s, r) != 0) {
            ++ncons;
        }
    }
#endif
    // w1 = nrn_wtime() - w1;
    // w2 = nrn_wtime();

#if ENQUEUE == 0
    multisend_receive_buffer[current_rbuf]->enqueue();
#endif
#if ENQUEUE == 1
    multisend_receive_buffer[current_rbuf]->enqueue1();
    multisend_receive_buffer[current_rbuf]->enqueue2();
#endif
#if ENQUEUE == 2
    // Most spikes were already delivered during nrn_multisend_advance();
    // this drains the stragglers and clears the counters for the interval.
    multisend_receive_buffer[current_rbuf]->enqueue();
    s = r = multisend_receive_buffer[current_rbuf]->nsend_cell_ = 0;

    multisend_receive_buffer[current_rbuf]->phase2_nsend_cell_ = 0;
    multisend_receive_buffer[current_rbuf]->phase2_nsend_ = 0;

    enq2_find_time_ = 0;
    enq2_enqueue_time_ = 0;
#endif  // ENQUEUE == 2
// wt1_ = nrn_wtime() - w2;
// wt_ = w1;
#if MULTISEND_INTERVAL == 2
    // printf("%d reverse buffers %g\n", nrnmpi_myid, t);
    // swap even/odd: the next subinterval's buffer becomes current
    if (n_multisend_interval == 2) {
        current_rbuf = next_rbuf;
        next_rbuf = ((next_rbuf + 1) & 1);
    }
#endif
}
418 
419 void nrn_multisend_cleanup() {
420  if (targets_phase1_) {
421  delete[] targets_phase1_;
422  targets_phase1_ = nullptr;
423  }
424 
425  if (targets_phase2_) {
426  delete[] targets_phase2_;
427  targets_phase2_ = nullptr;
428  }
429 
430  // cleanup MultisendReceiveBuffer here as well
431 }
432 
433 void nrn_multisend_setup() {
435  if (!use_multisend_) {
436  return;
437  }
438  nrnmpi_multisend_comm();
439  // if (nrnmpi_myid == 0) printf("multisend_setup()\n");
440  // although we only care about the set of hosts that gid2out_
441  // sends spikes to (source centric). We do not want to send
442  // the entire list of gid2in (which may be 10000 times larger
443  // than gid2out) from every machine to every machine.
444  // so we accomplish the task in two phases the first of which
445  // involves allgather with a total receive buffer size of number
446  // of cells (even that is too large and we will split it up
447  // into chunks). And the second, an
448  // allreduce with receive buffer size of number of hosts.
449  max_ntarget_host = 0;
450  max_multisend_targets = 0;
451 
452  // completely new algorithm does one and two phase.
453  nrn_multisend_setup_targets(use_phase2_, targets_phase1_, targets_phase2_);
454 
455  if (!multisend_receive_buffer[0]) {
456  multisend_receive_buffer[0] = new Multisend_ReceiveBuffer();
457  }
458 #if MULTISEND_INTERVAL == 2
459  if (n_multisend_interval == 2 && !multisend_receive_buffer[1]) {
460  multisend_receive_buffer[1] = new Multisend_ReceiveBuffer();
461  }
462 #endif
463 }
464 #endif // NRN_MULTISEND
465 } // namespace coreneuron
coreneuron::nrn_multisend_send
void nrn_multisend_send(PreSyn *, double t, NrnThread *)
multisend.hpp
coreneuron::nrnmpi_barrier
mpi_function< cnrn_make_integral_constant_t(nrnmpi_barrier_impl)> nrnmpi_barrier
Definition: nrnmpidec.cpp:42
netcvode.hpp
coreneuron::use_multisend_
bool use_multisend_
Definition: multisend.cpp:53
coreneuron
THIS FILE IS AUTO GENERATED DONT MODIFY IT.
Definition: corenrn_parameters.cpp:12
coreneuron::t
double t
Definition: register_mech.cpp:22
coreneuron::i
int i
Definition: cellorder.cpp:485
coreneuron::nrn_multisend_receive
void nrn_multisend_receive(NrnThread *)
nrniv_decl.h
i
#define i
Definition: md1redef.h:19
coreneuron::nrn_multisend_cleanup
void nrn_multisend_cleanup()
coreneuron::nrn_multisend_setup_targets
void nrn_multisend_setup_targets(bool use_phase2, int *&targets_phase1, int *&targets_phase2)
coreneuron::n_multisend_interval
int n_multisend_interval
Definition: multisend.cpp:55
coreneuron::nrn_multisend_init
void nrn_multisend_init()
coreneuron::gid2in
std::map< int, InputPreSyn * > gid2in
Definition: nrn_setup.cpp:158
cnt
#define cnt
Definition: tqueue.hpp:44
netcon.hpp
coreneuron::nrn_threads
NrnThread * nrn_threads
Definition: multicore.cpp:56
coreneuron::net_cvode_instance
NetCvode * net_cvode_instance
Definition: netcvode.cpp:35
coreneuron::nrn_multisend_advance
void nrn_multisend_advance()
multicore.hpp
coreneuron::InputPreSyn
Definition: netcon.hpp:132
coreneuron::nrn_multisend_setup
void nrn_multisend_setup()
nrn_assert
#define nrn_assert(x)
assert()-like macro, independent of NDEBUG status
Definition: nrn_assert.h:33
coreneuron::use_phase2_
bool use_phase2_
Definition: multisend.cpp:54