CoreNEURON
netpar.cpp
Go to the documentation of this file.
1 /*
2 # =============================================================================
3 # Copyright (c) 2016 - 2022 Blue Brain Project/EPFL
4 #
5 # See top-level LICENSE file for details.
6 # =============================================================================.
7 */
8 
9 #include <cstdio>
10 #include <cstdlib>
11 #include <map>
12 #include <mutex>
13 #include <vector>
14 
15 #include "coreneuron/nrnconf.h"
19 
29 
// Spike-exchange state that only exists in MPI builds.
30 #if NRNMPI
31 #include "coreneuron/mpi/nrnmpi.h"
// Number of bytes used to encode one gid in the compressed spike buffers:
// sizeof(unsigned char) when mk_localgid_rep() enables local gids, otherwise
// sizeof(unsigned int) (see nrnmpi_spike_compress()).
33 int localgid_size_;
// Value of the nspike argument last accepted by nrnmpi_spike_compress(); the
// receive loops read at most this many spikes per rank from spikein_fixed.
34 int ag_send_nspike;
35 namespace coreneuron {
// One int per rank (allocated in alloc_mpi_space()); after an exchange the
// receive loops read entry i as the number of spikes sent by rank i.
36 int* nrnmpi_nin_;
37 }
// Capacity of spfixin_ovfl_, counted in (1 + localgid_size_)-byte spike records.
38 int ovfl_capacity;
// Capacity of spikein, counted in NRNMPI_Spike elements.
39 int icapacity;
// Outgoing compressed buffer: bytes 0-1 hold the spike count (high byte first),
// followed by one (time byte, gid bytes) record per spike.
40 unsigned char* spikeout_fixed;
// Incoming compressed spikes that did not fit in the fixed-size per-rank slots.
41 unsigned char* spfixin_ovfl_;
// Incoming compressed buffer: ag_send_size bytes for each rank.
42 unsigned char* spikein_fixed;
// Size in bytes of one rank's slot: 2 + ag_send_nspike * (1 + localgid_size_).
43 int ag_send_size;
// Passed to both exchange calls; afterwards read as the number of spikes held
// in the overflow storage.
44 int ovfl;
// Number of spikes queued for sending since the last exchange.
45 int nout;
48 #endif
49 
50 namespace coreneuron {
51 class PreSyn;
52 class InputPreSyn;
53 
55 
56 #if NRNMPI
// Reference time for compressed spikes: each spike time is sent as a one-byte
// offset, in units of dt, from this value.
57 static double t_exchange_;
58 static double dt1_; // 1/dt
59 
// Uncompressed outgoing / incoming spike arrays (allocated in alloc_mpi_space()).
60 NRNMPI_Spike* spikeout;
61 NRNMPI_Spike* spikein;
62 
63 void nrn_timeout(int);
64 void nrn_spike_exchange(NrnThread*);
65 void nrn2ncs_outputevent(int netcon_output_index, double firetime);
66 
67 // for compressed gid info during spike exchange
69 void nrn_outputevent(unsigned char localgid, double firetime);
// localmaps[rank] maps a one-byte local gid index of that rank to the
// InputPreSyn on this rank that listens to it (built by mk_localgid_rep()).
70 std::vector<std::map<int, InputPreSyn*>> localmaps;
71 
72 static int ocapacity_; // for spikeout
// NOTE(review): the next comment looks detached from the declaration it once
// described (it reads like the continuation of the note on mindelay_) - confirm.
73 // require it to be smaller than min_interprocessor_delay.
74 static double wt_; // wait time for nrnmpi_spike_exchange
75 static double wt1_; // time to find the PreSyns and send the spikes.
// True while compressed spike exchange is selected (set by nrnmpi_spike_compress()).
76 static bool use_compress_;
// Allocated size of spikeout_fixed in bytes.
77 static int spfixout_capacity_;
// Next write position in spikeout_fixed; reset to 2 so the count header is skipped.
78 static int idxout_;
79 static void nrn_spike_exchange_compressed(NrnThread*);
80 
81 #endif // NRNMPI
82 
// Set to true by set_mindelay(); the output-event and exchange functions do
// nothing while it is false.
83 static bool active_ = false;
// Interval at which each NetParEvent re-schedules itself (see NetParEvent::send()).
84 static double usable_mindelay_;
// Minimum NetCon delay computed by set_mindelay() (global minimum when reduced over ranks).
// NOTE(review): the trailing comment below is cut off mid-sentence in this file.
85 static double mindelay_; // the one actually used. Some of our optional algorithms
// Last maxdelay passed to set_mindelay(); nrn_need_npe() defaults it to 100 when it is 0.
86 static double last_maxstep_arg_;
87 static std::vector<NetParEvent> npe_; // nrn_nthread of them
88 
89 #if NRNMPI
90 // for combination of threads and mpi.
// Guards nout and the outgoing spike buffers in the *_outputevent functions.
91 static OMP_Mutex mut;
92 #endif
93 
/// Allocate the initial buffers used by the uncompressed MPI spike exchange.
///
/// Does nothing unless MPI is enabled at run time and `spikeout` is still
/// unallocated, so calling it repeatedly is harmless. On the first effective
/// call it reserves 100 outgoing and 100 incoming NRNMPI_Spike records (the
/// {gid, spiketime} struct from nrnmpi.h) and one int per rank in
/// `nrnmpi_nin_`. When `nrn_spikebuf_size > 0` it also allocates the
/// fixed-size spike buffers `spbufout` and `spbufin`.
static void alloc_mpi_space() {
#if NRNMPI
    if (corenrn_param.mpi_enable && !spikeout) {
        ocapacity_ = 100;
        spikeout = (NRNMPI_Spike*) emalloc(ocapacity_ * sizeof(NRNMPI_Spike));
        icapacity = 100;
        // Use emalloc like every other buffer here instead of an unchecked
        // malloc, so this allocation gets the same failure handling.
        // NOTE(review): assumes emalloc is malloc-compatible, because spikein
        // may be reallocated by the exchange routine - confirm.
        spikein = (NRNMPI_Spike*) emalloc(icapacity * sizeof(NRNMPI_Spike));
        nrnmpi_nin_ = (int*) emalloc(nrnmpi_numprocs * sizeof(int));
#if nrn_spikebuf_size > 0
        spbufout = (NRNMPI_Spikebuf*) emalloc(sizeof(NRNMPI_Spikebuf));
        spbufin = (NRNMPI_Spikebuf*) emalloc(nrnmpi_numprocs * sizeof(NRNMPI_Spikebuf));
#endif
    }
#endif
}
111 
113  : ithread_(-1) // no thread assigned yet; the exchange initialisation sets the real index
114  , wx_(0.) // reset to 0 again whenever the exchange is (re)initialised
115  , ws_(0.) {} // NOTE(review): wx_/ws_ look like accumulated timings - confirm in netcon.hpp
116 
117 void NetParEvent::send(double tt, NetCvode* nc, NrnThread* nt) {
118  nc->event(tt + usable_mindelay_, this, nt);
119 }
120 
// Handle delivery of the periodic exchange event on thread `nt` at time `tt`.
121 void NetParEvent::deliver(double tt, NetCvode* nc, NrnThread* nt) {
// Flag the thread so its stepping loop stops at this point.
123  nt->_stop_stepping = 1;
// Record the delivery time as the thread's current time.
124  nt->_t = tt;
// Re-arm: schedule the next occurrence one usable_mindelay_ later.
125  send(tt, nc, nt);
126 }
127 
128 void NetParEvent::pr(const char* m, double tt, NetCvode*) {
129  printf("%s NetParEvent %d t=%.15g tt-t=%g\n", m, ithread_, tt, tt - nrn_threads[ithread_]._t);
130 }
131 
132 #if NRNMPI
133 inline static void sppk(unsigned char* c, int gid) {
134  for (int i = localgid_size_ - 1; i >= 0; --i) {
135  c[i] = gid & 255;
136  gid >>= 8;
137  }
138 }
139 inline static int spupk(unsigned char* c) {
140  int gid = *c++;
141  for (int i = 1; i < localgid_size_; ++i) {
142  gid <<= 8;
143  gid += *c++;
144  }
145  return gid;
146 }
147 
148 void nrn_outputevent(unsigned char localgid, double firetime) {
149  if (!active_) {
150  return;
151  }
152  std::lock_guard<OMP_Mutex> lock(mut);
153  nout++;
154  int i = idxout_;
155  idxout_ += 2;
156  if (idxout_ >= spfixout_capacity_) {
157  spfixout_capacity_ *= 2;
158  spikeout_fixed = (unsigned char*) erealloc(spikeout_fixed,
159  spfixout_capacity_ * sizeof(unsigned char));
160  }
161  spikeout_fixed[i++] = (unsigned char) ((firetime - t_exchange_) * dt1_ + .5);
162  spikeout_fixed[i] = localgid;
163  // printf("%d idx=%d lgid=%d firetime=%g t_exchange_=%g [0]=%d [1]=%d\n", nrnmpi_myid, i,
164  // (int)localgid, firetime, t_exchange_, (int)spikeout_fixed[i-1], (int)spikeout_fixed[i]);
165 }
166 
167 void nrn2ncs_outputevent(int gid, double firetime) {
168  if (!active_) {
169  return;
170  }
171  std::lock_guard<OMP_Mutex> lock(mut);
172  if (use_compress_) {
173  nout++;
174  int i = idxout_;
175  idxout_ += 1 + localgid_size_;
176  if (idxout_ >= spfixout_capacity_) {
177  spfixout_capacity_ *= 2;
178  spikeout_fixed = (unsigned char*) erealloc(spikeout_fixed,
179  spfixout_capacity_ * sizeof(unsigned char));
180  }
181  // printf("%d nrnncs_outputevent %d %.20g %.20g %d\n", nrnmpi_myid, gid, firetime,
182  // t_exchange_,
183  //(int)((unsigned char)((firetime - t_exchange_)*dt1_ + .5)));
184  spikeout_fixed[i++] = (unsigned char) ((firetime - t_exchange_) * dt1_ + .5);
185  // printf("%d idx=%d firetime=%g t_exchange_=%g spfixout=%d\n", nrnmpi_myid, i, firetime,
186  // t_exchange_, (int)spikeout_fixed[i-1]);
187  sppk(spikeout_fixed + i, gid);
188  // printf("%d idx=%d gid=%d spupk=%d\n", nrnmpi_myid, i, gid, spupk(spikeout_fixed+i));
189  } else {
190 #if nrn_spikebuf_size == 0
191  int i = nout++;
192  if (i >= ocapacity_) {
193  ocapacity_ *= 2;
194  spikeout = (NRNMPI_Spike*) erealloc(spikeout, ocapacity_ * sizeof(NRNMPI_Spike));
195  }
196  // printf("%d cell %d in slot %d fired at %g\n", nrnmpi_myid, gid, i, firetime);
197  spikeout[i].gid = gid;
198  spikeout[i].spiketime = firetime;
199 #else
200  int i = nout++;
201  if (i >= nrn_spikebuf_size) {
202  i -= nrn_spikebuf_size;
203  if (i >= ocapacity_) {
204  ocapacity_ *= 2;
205  spikeout = (NRNMPI_Spike*) hoc_Erealloc(spikeout,
206  ocapacity_ * sizeof(NRNMPI_Spike));
207  hoc_malchk();
208  }
209  spikeout[i].gid = gid;
210  spikeout[i].spiketime = firetime;
211  } else {
212  spbufout->gid[i] = gid;
213  spbufout->spiketime[i] = firetime;
214  }
215 #endif
216  }
217  // printf("%d cell %d in slot %d fired at %g\n", nrnmpi_myid, gid, i, firetime);
218 }
219 #endif // NRNMPI
220 
221 static bool nrn_need_npe() {
222  if (active_ || nrn_nthread > 1) {
223  if (last_maxstep_arg_ == 0) {
224  last_maxstep_arg_ = 100.;
225  }
226  return true;
227  } else {
228  if (!npe_.empty()) {
229  npe_.clear();
230  npe_.shrink_to_fit();
231  }
232  return false;
233  }
234 }
235 
236 #define TBUFSIZE 0
237 
// Initialisation of the spike exchange: validates the usable minimum delay,
// (re)creates one NetParEvent per thread and resets the outgoing spike state.
239  // printf("nrn_spike_exchange_init\n");
// Nothing to set up when no NetParEvent is required.
240  if (!nrn_need_npe()) {
241  return;
242  }
243  alloc_mpi_space();
245 #if NRN_MULTISEND
246  if (use_multisend_ && n_multisend_interval == 2) {
247  usable_mindelay_ *= 0.5;
248  }
249 #endif
// With several threads the usable delay is shortened by one time step.
250  if (nrn_nthread > 1) {
251  usable_mindelay_ -= dt;
252  }
// A usable delay below dt cannot work: rank 0 raises the error, other ranks just return.
253  if ((usable_mindelay_ < 1e-9) || (usable_mindelay_ < dt)) {
254  if (nrnmpi_myid == 0) {
255  hoc_execerror("usable mindelay is 0", "(or less than dt for fixed step method)");
256  } else {
257  return;
258  }
259  }
260 
261 #if TBUFSIZE
262  itbuf_ = 0;
263 #endif
264 
265 #if NRN_MULTISEND
266  if (use_multisend_) {
268  }
269 #endif
270 
// Keep exactly one NetParEvent per thread.
271  if (npe_.size() != static_cast<std::size_t>(nrn_nthread)) {
272  if (!npe_.empty()) {
273  npe_.clear();
274  npe_.shrink_to_fit();
275  }
276  npe_.resize(nrn_nthread);
277  }
// Reset every event and schedule its first occurrence relative to the current time t.
278  for (int i = 0; i < nrn_nthread; ++i) {
279  npe_[i].ithread_ = i;
280  npe_[i].wx_ = 0.;
281  npe_[i].ws_ = 0.;
282  npe_[i].send(t, net_cvode_instance, nrn_threads + i);
283  }
284 #if NRNMPI
286  if (use_compress_) {
// Skip the two-byte spike count header of the compressed send buffer.
287  idxout_ = 2;
288  t_exchange_ = t;
289  dt1_ = rev_dt;
// Round the delay down to whole time steps and cap it so that a spike time
// offset, counted in steps, still fits into one byte.
290  usable_mindelay_ = floor(mindelay_ * dt1_ + 1e-9) * dt;
291  if (usable_mindelay_ * dt1_ >= 255.) {
292  usable_mindelay_ = 255. / dt1_;
293  }
294  assert(usable_mindelay_ >= dt && (usable_mindelay_ * dt1_) <= 255.);
295  } else {
296 #if nrn_spikebuf_size > 0
297  if (spbufout) {
298  spbufout->nspike = 0;
299  }
300 #endif
301  }
// No spikes are queued at the start.
302  nout = 0;
303  }
304 #endif // NRNMPI
305  // if (nrnmpi_myid == 0){printf("usable_mindelay_ = %g\n", usable_mindelay_);}
306 }
307 
308 #if NRNMPI
// Exchange the spikes queued since the last call with all other ranks and
// deliver every received spike that has a matching InputPreSyn in gid2in.
// Delegates to nrn_spike_exchange_compressed() when compression is enabled.
// The time spent in the exchange call goes to wt_, the delivery time to wt1_.
309 void nrn_spike_exchange(NrnThread* nt) {
310  Instrumentor::phase p_spike_exchange("spike-exchange");
311  if (!active_) {
312  return;
313  }
314 #if NRN_MULTISEND
315  if (use_multisend_) {
317  return;
318  }
319 #endif
320  if (use_compress_) {
321  nrn_spike_exchange_compressed(nt);
322  return;
323  }
324 #if TBUFSIZE
325  nrnmpi_barrier();
326 #endif
327 
328 #if nrn_spikebuf_size > 0
329  spbufout->nspike = nout;
330 #endif
331  double wt = nrn_wtime();
332 
// n is the return value of the exchange; 0 means there is nothing to deliver.
333  int n = nrnmpi_spike_exchange(
334  nrnmpi_nin_, spikeout, icapacity, &spikein, ovfl, nout, spbufout, spbufin);
335 
336  wt_ = nrn_wtime() - wt;
337  wt = nrn_wtime();
338 #if TBUFSIZE
339  tbuf_[itbuf_++] = (unsigned long) nout;
340  tbuf_[itbuf_++] = (unsigned long) n;
341 #endif
342 
343  errno = 0;
344  // if (n > 0) {
345  // printf("%d nrn_spike_exchange sent %d received %d\n", nrnmpi_myid, nout, n);
346  //}
// The outgoing queue has been sent; start collecting afresh.
347  nout = 0;
348  if (n == 0) {
349  return;
350  }
351 #if nrn_spikebuf_size > 0
// First deliver the spikes that arrived in the fixed-size per-rank buffers ...
352  for (int i = 0; i < nrnmpi_numprocs; ++i) {
353  int nn = spbufin[i].nspike;
354  if (nn > nrn_spikebuf_size) {
355  nn = nrn_spikebuf_size;
356  }
357  for (int j = 0; j < nn; ++j) {
358  auto gid2in_it = gid2in.find(spbufin[i].gid[j]);
359  if (gid2in_it != gid2in.end()) {
360  InputPreSyn* ps = gid2in_it->second;
361  ps->send(spbufin[i].spiketime[j], net_cvode_instance, nt);
362  }
363  }
364  }
// ... then only the overflow spikes remain in spikein.
365  n = ovfl;
366 #endif // nrn_spikebuf_size > 0
// Deliver the spikes stored in spikein; gids without a local target are ignored.
367  for (int i = 0; i < n; ++i) {
368  auto gid2in_it = gid2in.find(spikein[i].gid);
369  if (gid2in_it != gid2in.end()) {
370  InputPreSyn* ps = gid2in_it->second;
371  ps->send(spikein[i].spiketime, net_cvode_instance, nt);
372  }
373  }
375  wt1_ = nrn_wtime() - wt;
376 }
377 
378 void nrn_spike_exchange_compressed(NrnThread* nt) {
379  if (!active_) {
380  return;
381  }
382 #if TBUFSIZE
383  nrnmpi_barrier();
384 #endif
385 
386  assert(nout < 0x10000);
387  spikeout_fixed[1] = (unsigned char) (nout & 0xff);
388  spikeout_fixed[0] = (unsigned char) (nout >> 8);
389 
390  double wt = nrn_wtime();
391 
392  int n = nrnmpi_spike_exchange_compressed(localgid_size_,
393  spfixin_ovfl_,
394  ag_send_nspike,
395  nrnmpi_nin_,
396  ovfl_capacity,
397  spikeout_fixed,
398  ag_send_size,
399  spikein_fixed,
400  ovfl);
401  wt_ = nrn_wtime() - wt;
402  wt = nrn_wtime();
403 #if TBUFSIZE
404  tbuf_[itbuf_++] = (unsigned long) nout;
405  tbuf_[itbuf_++] = (unsigned long) n;
406 #endif
407  errno = 0;
408  // if (n > 0) {
409  // printf("%d nrn_spike_exchange sent %d received %d\n", nrnmpi_myid, nout, n);
410  //}
411  nout = 0;
412  idxout_ = 2;
413  if (n == 0) {
414  t_exchange_ = nrn_threads->_t;
415  return;
416  }
417  if (nrn_use_localgid_) {
418  int idxov = 0;
419  for (int i = 0; i < nrnmpi_numprocs; ++i) {
420  int j, nnn;
421  int nn = nrnmpi_nin_[i];
422  if (nn) {
423  if (i == nrnmpi_myid) { // skip but may need to increment idxov.
424  if (nn > ag_send_nspike) {
425  idxov += (nn - ag_send_nspike) * (1 + localgid_size_);
426  }
427  continue;
428  }
429  std::map<int, InputPreSyn*> gps = localmaps[i];
430  if (nn > ag_send_nspike) {
431  nnn = ag_send_nspike;
432  } else {
433  nnn = nn;
434  }
435  int idx = 2 + i * ag_send_size;
436  for (j = 0; j < nnn; ++j) {
437  // order is (firetime,gid) pairs.
438  double firetime = spikein_fixed[idx++] * dt + t_exchange_;
439  int lgid = (int) spikein_fixed[idx];
440  idx += localgid_size_;
441  auto gid2in_it = gps.find(lgid);
442  if (gid2in_it != gps.end()) {
443  InputPreSyn* ps = gid2in_it->second;
444  ps->send(firetime + 1e-10, net_cvode_instance, nt);
445  }
446  }
447  for (; j < nn; ++j) {
448  double firetime = spfixin_ovfl_[idxov++] * dt + t_exchange_;
449  int lgid = (int) spfixin_ovfl_[idxov];
450  idxov += localgid_size_;
451  auto gid2in_it = gps.find(lgid);
452  if (gid2in_it != gps.end()) {
453  InputPreSyn* ps = gid2in_it->second;
454  ps->send(firetime + 1e-10, net_cvode_instance, nt);
455  }
456  }
457  }
458  }
459  } else {
460  for (int i = 0; i < nrnmpi_numprocs; ++i) {
461  int nn = nrnmpi_nin_[i];
462  if (nn > ag_send_nspike) {
463  nn = ag_send_nspike;
464  }
465  int idx = 2 + i * ag_send_size;
466  for (int j = 0; j < nn; ++j) {
467  // order is (firetime,gid) pairs.
468  double firetime = spikein_fixed[idx++] * dt + t_exchange_;
469  int gid = spupk(spikein_fixed + idx);
470  idx += localgid_size_;
471  auto gid2in_it = gid2in.find(gid);
472  if (gid2in_it != gid2in.end()) {
473  InputPreSyn* ps = gid2in_it->second;
474  ps->send(firetime + 1e-10, net_cvode_instance, nt);
475  }
476  }
477  }
478  n = ovfl;
479  int idx = 0;
480  for (int i = 0; i < n; ++i) {
481  double firetime = spfixin_ovfl_[idx++] * dt + t_exchange_;
482  int gid = spupk(spfixin_ovfl_ + idx);
483  idx += localgid_size_;
484  auto gid2in_it = gid2in.find(gid);
485  if (gid2in_it != gid2in.end()) {
486  InputPreSyn* ps = gid2in_it->second;
487  ps->send(firetime + 1e-10, net_cvode_instance, nt);
488  }
489  }
490  }
491  // In case of multiple threads some above ps->send events put
492  // NetCon events into interthread buffers. Some of those may
493  // need to be delivered early enough that the interthread buffers
494  // need transfer to the thread event queues before the next dqueue_bin
495  // while loop in deliver_net_events. So enqueue now...
497  t_exchange_ = nrn_threads->_t;
498  wt1_ = nrn_wtime() - wt;
499 }
500 
501 static void mk_localgid_rep() {
502  // how many gids are there on this machine
503  // and can they be compressed into one byte
504  int ngid = 0;
505  for (const auto& gid2out_elem: gid2out) {
506  if (gid2out_elem.second->output_index_ >= 0) {
507  ++ngid;
508  }
509  }
510 
511  int ngidmax = nrnmpi_int_allmax(ngid);
512  if (ngidmax > 256) {
513  // do not compress
514  return;
515  }
516  localgid_size_ = sizeof(unsigned char);
517  nrn_use_localgid_ = true;
518 
519  // allocate Allgather receive buffer (send is the nrnmpi_myid one)
520  int* rbuf = new int[nrnmpi_numprocs * (ngidmax + 1)];
521  int* sbuf = new int[ngidmax + 1];
522 
523  sbuf[0] = ngid;
524  ++sbuf;
525  ngid = 0;
526  // define the local gid and fill with the gids on this machine
527  for (const auto& gid2out_elem: gid2out) {
528  if (gid2out_elem.second->output_index_ >= 0) {
529  gid2out_elem.second->localgid_ = (unsigned char) ngid;
530  sbuf[ngid] = gid2out_elem.second->output_index_;
531  ++ngid;
532  }
533  }
534  --sbuf;
535 
536  // exchange everything
537  nrnmpi_int_allgather(sbuf, rbuf, ngidmax + 1);
538  delete[] sbuf;
539  errno = 0;
540 
541  // create the maps
542  // there is a lot of potential for efficiency here. i.e. use of
543  // perfect hash functions, or even simple Vectors.
544  localmaps.clear();
545  localmaps.resize(nrnmpi_numprocs);
546 
547  // fill in the maps
548  for (int i = 0; i < nrnmpi_numprocs; ++i)
549  if (i != nrnmpi_myid) {
550  sbuf = rbuf + i * (ngidmax + 1);
551  ngid = *(sbuf++);
552  for (int k = 0; k < ngid; ++k) {
553  auto gid2in_it = gid2in.find(int(sbuf[k]));
554  if (gid2in_it != gid2in.end()) {
555  localmaps[i][k] = gid2in_it->second;
556  }
557  }
558  }
559 
560  // cleanup
561  delete[] rbuf;
562 }
563 
564 #endif // NRNMPI
565 
566 // may stimulate a gid for a cell not owned by this cpu. This allows
567 // us to run single cells or subnets and stimulate exactly according to
568 // their input in a full parallel net simulation.
569 // For some purposes, it may be useful to simulate a spike from a
570 // cell that does exist and would normally send its own spike, eg.
571 // recurrent stimulation. This can be useful in debugging where the
572 // spike raster comes from another implementation and one wants to
573 // get complete control of all input spikes without the confounding
574 // effects of output spikes from the simulated cells. In this case
575 // set the third arg to 1 and set the output cell thresholds very
576 // high so that they do not themselves generate spikes.
577 // Can only be called by thread 0 because of the ps->send.
578 void nrn_fake_fire(int gid, double spiketime, int fake_out) {
579  auto gid2in_it = gid2in.find(gid);
580  if (gid2in_it != gid2in.end()) {
581  InputPreSyn* psi = gid2in_it->second;
582  assert(psi);
583  // printf("nrn_fake_fire %d %g\n", gid, spiketime);
584  psi->send(spiketime, net_cvode_instance, nrn_threads);
585  } else if (fake_out) {
586  std::map<int, PreSyn*>::iterator gid2out_it;
587  gid2out_it = gid2out.find(gid);
588  if (gid2out_it != gid2out.end()) {
589  PreSyn* ps = gid2out_it->second;
590  assert(ps);
591  // printf("nrn_fake_fire fake_out %d %g\n", gid, spiketime);
592  ps->send(spiketime, net_cvode_instance, nrn_threads);
593  }
594  }
595 }
596 
// Timeout value handed to nrn_timeout() while a solve is running.
static int timeout_ = 0;

/// Store a new timeout value and return the one that was in effect before.
int nrn_set_timeout(int timeout) {
    const int previous = timeout_;
    timeout_ = timeout;
    return previous;
}
603 
// Integrate the network up to `tstop` and, on rank 0 unless quiet output is
// requested, print the elapsed wall-clock time as "Solver Time".
604 void BBS_netpar_solve(double tstop) {
605  double time = nrn_wtime();
606 
607 #if NRNMPI
609  tstopunset;
// The minimum delay must not be smaller than one time step (with a 1e-10 tolerance).
610  double mt = dt;
611  double md = mindelay_ - 1e-10;
612  if (md < mt) {
613  if (nrnmpi_myid == 0) {
614  hoc_execerror("mindelay is 0", "(or less than dt for fixed step method)");
615  } else {
616  return;
617  }
618  }
619 
// Arm the timeout for the duration of the integration and disarm it afterwards.
620  nrn_timeout(timeout_);
// tstop is stretched by a relative 1e-11 in this branch.
622  ncs2nrn_integrate(tstop * (1. + 1e-11));
624  nrn_timeout(0);
625  if (!npe_.empty()) {
626  npe_[0].wx_ = npe_[0].ws_ = 0.;
627  };
628  // printf("%d netpar_solve exit t=%g tstop=%g mindelay_=%g\n",nrnmpi_myid, t, tstop,
629  // mindelay_);
// Wait until every rank has finished integrating.
630  nrnmpi_barrier();
631  } else
632 #endif
633  {
634  ncs2nrn_integrate(tstop);
635  }
636  tstopunset;
637 
638  if (nrnmpi_myid == 0 && !corenrn_param.is_quiet()) {
639  printf("\nSolver Time : %g\n", nrn_wtime() - time);
640  }
641 }
642 
// Compute the minimum NetCon delay relevant for spike exchange, store it in
// mindelay_ and return it. `maxdelay` is the starting value (upper bound) and
// is also remembered in last_maxstep_arg_.
643 double set_mindelay(double maxdelay) {
644  double mindelay = maxdelay;
645  last_maxstep_arg_ = maxdelay;
646 
647  // if all==1 then minimum delay of all NetCon no matter the source.
648  // except if src in same thread as NetCon
649  int all = (nrn_nthread > 1);
650  // minimum delay of all NetCon having an InputPreSyn source
651 
652  /** we have removed nt_ from PreSyn. Build local map of PreSyn
653  * and NrnThread which will be used to find out if src in same thread as NetCon */
654  std::map<PreSyn*, NrnThread*> presynmap;
655 
656  for (int ith = 0; ith < nrn_nthread; ++ith) {
657  NrnThread& nt = nrn_threads[ith];
658  for (int i = 0; i < nt.n_presyn; ++i) {
659  presynmap[nt.presyns + i] = nrn_threads + ith;
660  }
661  }
662 
663  for (int ith = 0; ith < nrn_nthread; ++ith) {
664  NrnThread& nt = nrn_threads[ith];
665  // if single thread or file transfer then definitely empty.
666  std::vector<int>& negsrcgid_tid = nrnthreads_netcon_negsrcgid_tid[ith];
// Next unread entry of negsrcgid_tid; consumed once per NetCon with gid < -1.
667  size_t i_tid = 0;
668  for (int i = 0; i < nt.n_netcon; ++i) {
669  NetCon* nc = nt.netcons + i;
670  bool chk = false; // ignore nc.delay_
671  int gid = nrnthreads_netcon_srcgid[ith][i];
// The source is looked up in this thread unless a different thread id was recorded.
672  int tid = ith;
673  if (!negsrcgid_tid.empty() && gid < -1) {
674  tid = negsrcgid_tid[i_tid++];
675  }
676  PreSyn* ps;
677  InputPreSyn* psi;
678  netpar_tid_gid2ps(tid, gid, &ps, &psi);
679  if (psi) {
680  chk = true;
681  } else if (all) {
682  chk = true;
683  // but ignore if src in same thread as NetCon
684  if (ps && presynmap[ps] == &nt) {
685  chk = false;
686  }
687  }
688  if (chk && nc->delay_ < mindelay) {
689  mindelay = nc->delay_;
690  }
691  }
692  }
693 
694 #if NRNMPI
696  active_ = true;
// A compressed spike time is one byte, so the delay may span at most 255 steps.
697  if (use_compress_) {
698  if (mindelay / dt > 255) {
699  mindelay = 255 * dt;
700  }
701  }
702 
703  // printf("%d netpar_mindelay local %g now calling nrnmpi_mindelay\n", nrnmpi_myid,
704  // mindelay);
705  // double st = time();
// Reduce to the smallest value over all ranks.
706  mindelay_ = nrnmpi_dbl_allmin(mindelay);
707  // add_wait_time(st);
708  // printf("%d local min=%g global min=%g\n", nrnmpi_myid, mindelay, mindelay_);
709  errno = 0;
710  } else
711 #endif // NRNMPI
712  {
713  mindelay_ = mindelay;
714  }
715  return mindelay_;
716 }
717 
718 /* 08-Nov-2010
719 The workhorse for spike exchange on up to 10K machines is MPI_Allgather
720 but as the number of machines becomes far greater than the fanout per
721 cell we have been exploring a class of exchange methods called multisend
722 where the spikes only go to those machines that need them and there is
723 overlap between communication and computation. The number of variants of
724 multisend has grown so that some method selection function is needed
725 that makes sense.
726 
727 The situation that needs to be captured by xchng_meth is
728 
729 Allgather
730 multisend implemented as MPI_ISend
731 multisend DCMF (only for Blue Gene/P)
732 multisend record_replay (only for Blue Gene/P with recordreplay_v1r4m2.patch)
733 
734 Note that Allgather allows spike compression and an allgather spike buffer
735  with size chosen at setup time. All methods allow bin queueing.
736 
737 All the multisend methods should allow two phase multisend.
738 
739 Note that, in principle, MPI_ISend allows the source to send the index
740  of the target PreSyn to avoid a hash table lookup (even with a two phase
741  variant)
742 
743 RecordReplay should be best on the BG/P. The whole point is to make the
744 spike transfer initiation as lowcost as possible since that is what causes
745 most load imbalance. I.e. since 10K more spikes arrive than are sent, spikes
746 received per processor per interval are much more statistically
747 balanced than spikes sent per processor per interval. And presently
748 DCMF multisend injects 10000 messages per spike into the network which
749 is quite expensive. record replay avoids this overhead and the idea of
750 two phase multisend distributes the injection.
751 */
752 
// Configure compressed spike exchange.
//   nspike       < 0: change nothing; == 0: turn compression off;
//                > 0: turn it on with that many spikes per rank in the fixed slot.
//   gid_compress when turning on, try to use one-byte local gids (mk_localgid_rep()).
//   xchng_meth   must be 0 here; a positive value selects multisend when
//                NRN_MULTISEND is compiled in.
// Returns ag_send_nspike from the visible MPI paths (0 after turning off),
// and 0 in the final fallback branch or when multisend is selected.
753 int nrnmpi_spike_compress(int nspike, bool gid_compress, int xchng_meth) {
754 #if NRNMPI
756 #if NRN_MULTISEND
757  if (xchng_meth > 0) {
758  use_multisend_ = 1;
759  return 0;
760  }
761 #endif
762  nrn_assert(xchng_meth == 0);
// Any non-negative request first discards the previous compression buffers.
763  if (nspike >= 0) {
764  ag_send_nspike = 0;
765  if (spikeout_fixed) {
766  free(spikeout_fixed);
767  spikeout_fixed = nullptr;
768  }
769  if (spikein_fixed) {
770  free(spikein_fixed);
771  spikein_fixed = nullptr;
772  }
773  if (spfixin_ovfl_) {
774  free(spfixin_ovfl_);
775  spfixin_ovfl_ = nullptr;
776  }
777  localmaps.clear();
778  }
779  if (nspike == 0) { // turn off
780  use_compress_ = false;
781  nrn_use_localgid_ = false;
782  } else if (nspike > 0) { // turn on
783  use_compress_ = true;
784  ag_send_nspike = nspike;
785  nrn_use_localgid_ = false;
786  if (gid_compress) {
787  // we can only do this after everything is set up
788  mk_localgid_rep();
// NOTE(review): the message says "more than 255" while mk_localgid_rep() only
// gives up above 256 gids per rank - confirm which number is intended.
789  if (!nrn_use_localgid_ && nrnmpi_myid == 0) {
790  printf(
791  "Notice: gid compression did not succeed. Probably more than 255 cells on "
792  "one "
793  "cpu.\n");
794  }
795  }
// Without local gids every spike carries a full-width gid.
796  if (!nrn_use_localgid_) {
797  localgid_size_ = sizeof(unsigned int);
798  }
// Per-rank slot: 2 count bytes plus ag_send_nspike (time byte + gid) records.
799  ag_send_size = 2 + ag_send_nspike * (1 + localgid_size_);
// The send buffer starts with room for 50 records beyond one slot.
800  spfixout_capacity_ = ag_send_size + 50 * (1 + localgid_size_);
801  spikeout_fixed = (unsigned char*) emalloc(spfixout_capacity_);
802  spikein_fixed = (unsigned char*) emalloc(nrnmpi_numprocs * ag_send_size);
803  ovfl_capacity = 100;
804  spfixin_ovfl_ = (unsigned char*) emalloc(ovfl_capacity * (1 + localgid_size_));
805  }
806  return ag_send_nspike;
807  } else
808 #endif
809  {
810  return 0;
811  }
812 }
813 } // namespace coreneuron
coreneuron::NrnThread::netcons
NetCon * netcons
Definition: multicore.hpp:87
coreneuron::nrnmpi_int_allgather
mpi_function< cnrn_make_integral_constant_t(nrnmpi_int_allgather_impl)> nrnmpi_int_allgather
Definition: nrnmpidec.cpp:30
coreneuron::NetCon::delay_
double delay_
Definition: netcon.hpp:50
coreneuron::nrn_use_localgid_
bool nrn_use_localgid_
coreneuron::usable_mindelay_
static double usable_mindelay_
Definition: netpar.cpp:84
coreneuron::nrn2ncs_outputevent
void nrn2ncs_outputevent(int netcon_output_index, double firetime)
multisend.hpp
coreneuron::nrn_spike_exchange
void nrn_spike_exchange(NrnThread *nt)
nrn_spikebuf_size
#define nrn_spikebuf_size
Definition: nrnmpi.h:19
coreneuron::corenrn_parameters::is_quiet
bool is_quiet()
Definition: corenrn_parameters.hpp:109
coreneuron::NetParEvent::deliver
virtual void deliver(double, NetCvode *, NrnThread *) override
Definition: netpar.cpp:121
coreneuron::nrn_nthread
int nrn_nthread
Definition: multicore.cpp:55
OMP_Mutex
Definition: nrnmutdec.hpp:55
utils.hpp
coreneuron::rev_dt
int rev_dt
Definition: register_mech.cpp:23
coreneuron::nrnthreads_netcon_srcgid
std::vector< int * > nrnthreads_netcon_srcgid
Only for setup vector of netcon source gids.
Definition: nrn_setup.cpp:164
coreneuron::nrnmpi_barrier
mpi_function< cnrn_make_integral_constant_t(nrnmpi_barrier_impl)> nrnmpi_barrier
Definition: nrnmpidec.cpp:42
coreneuron::timeout_
static int timeout_
Definition: netpar.cpp:597
coreneuron::nrnmpi_numprocs
int nrnmpi_numprocs
Definition: nrnmpi_def_cinc.cpp:10
coreneuron::NrnThread::_t
double _t
Definition: multicore.hpp:76
nrnoc_aux.hpp
coreneuron::mut
static OMP_Mutex mut
Definition: nrn_setup.cpp:152
coreneuron::NrnThread::presyns
PreSyn * presyns
Definition: multicore.hpp:83
coreneuron::BBS_netpar_solve
void BBS_netpar_solve(double tstop)
Definition: netpar.cpp:604
coreneuron::PreSyn::send
virtual void send(double sendtime, NetCvode *, NrnThread *) override
Definition: netcvode.cpp:409
nrnmpidec.h
netcvode.hpp
coreneuron::hoc_execerror
void hoc_execerror(const char *s1, const char *s2)
Definition: nrnoc_aux.cpp:39
coreneuron::NetCvode::event
TQItem * event(double tdeliver, DiscreteEvent *, NrnThread *)
Definition: netcvode.cpp:216
tstopunset
#define tstopunset
Definition: nrnconf.h:45
coreneuron::NetParEvent::NetParEvent
NetParEvent()
Definition: netpar.cpp:112
profiler_interface.h
coreneuron::use_multisend_
bool use_multisend_
Definition: multisend.cpp:53
coreneuron::ncs2nrn_integrate
void ncs2nrn_integrate(double tstop)
Definition: netcvode.cpp:488
coreneuron::nrnmpi_spike_exchange_compressed
mpi_function< cnrn_make_integral_constant_t(nrnmpi_spike_exchange_compressed_impl)> nrnmpi_spike_exchange_compressed
Definition: nrnmpidec.cpp:27
coreneuron::nrn_set_timeout
int nrn_set_timeout(int timeout)
Definition: netpar.cpp:598
coreneuron
THIS FILE IS AUTO GENERATED DONT MODIFY IT.
Definition: corenrn_parameters.cpp:12
coreneuron::t
double t
Definition: register_mech.cpp:22
corenrn_parameters.hpp
coreneuron::i
int i
Definition: cellorder.cpp:485
coreneuron::nrn_multisend_receive
void nrn_multisend_receive(NrnThread *)
ivocvect.hpp
nrniv_decl.h
coreneuron::PreSyn
Definition: netcon.hpp:104
coreneuron::dt
double dt
Definition: register_mech.cpp:22
coreneuron::NetParEvent::ithread_
int ithread_
Definition: netcon.hpp:151
coreneuron::nrn_multithread_job
void nrn_multithread_job(F &&job, Args &&... args)
Definition: multicore.hpp:161
coreneuron::nrnmpi_spike_compress
int nrnmpi_spike_compress(int nspike, bool gid_compress, int xchng_meth)
Definition: netpar.cpp:753
coreneuron::NetCon
Definition: netcon.hpp:47
coreneuron::NrnThread::_stop_stepping
int _stop_stepping
Definition: multicore.hpp:100
coreneuron::n_multisend_interval
int n_multisend_interval
Definition: multisend.cpp:55
coreneuron::active_
static bool active_
Definition: netpar.cpp:83
coreneuron::nrn_multisend_init
void nrn_multisend_init()
nrnmpi.hpp
coreneuron::NrnThread::n_presyn
int n_presyn
Definition: multicore.hpp:94
coreneuron::alloc_mpi_space
static void alloc_mpi_space()
Allocate space for spikes: 200 structs of {int gid; double time} coming from nrnmpi....
Definition: netpar.cpp:96
coreneuron::gid2in
std::map< int, InputPreSyn * > gid2in
Definition: nrn_setup.cpp:158
coreneuron::mindelay_
static double mindelay_
Definition: netpar.cpp:85
coreneuron::NrnThread
Definition: multicore.hpp:75
coreneuron::nrnthreads_netcon_negsrcgid_tid
std::vector< std::vector< int > > nrnthreads_netcon_negsrcgid_tid
If a nrnthreads_netcon_srcgid is negative, need to determine the thread when in order to use the corr...
Definition: nrn_setup.cpp:168
coreneuron::set_mindelay
double set_mindelay(double maxdelay)
Definition: netpar.cpp:643
netcon.hpp
coreneuron::corenrn_param
corenrn_parameters corenrn_param
Printing method.
Definition: corenrn_parameters.cpp:268
coreneuron::nrn_fake_fire
void nrn_fake_fire(int gid, double spiketime, int fake_out)
Definition: netpar.cpp:578
coreneuron::NRNMPI_Spikebuf::nspike
int nspike
Definition: nrnmpi.h:24
coreneuron::NRNMPI_Spikebuf::spiketime
double spiketime[nrn_spikebuf_size]
Definition: nrnmpi.h:26
coreneuron::NetParEvent::send
virtual void send(double, NetCvode *, NrnThread *) override
Definition: netpar.cpp:117
coreneuron::nrn_threads
NrnThread * nrn_threads
Definition: multicore.cpp:56
coreneuron::nrn_spike_exchange_init
void nrn_spike_exchange_init()
Definition: netpar.cpp:238
coreneuron::netpar_tid_gid2ps
void netpar_tid_gid2ps(int tid, int gid, PreSyn **ps, InputPreSyn **psi)
Definition: nrn_setup.cpp:225
coreneuron::coreneuron::phase
phase
Reading phase number.
Definition: nrn_setup.hpp:53
coreneuron::NRNMPI_Spikebuf::gid
int gid[nrn_spikebuf_size]
Definition: nrnmpi.h:25
coreneuron::NRNMPI_Spikebuf
Definition: nrnmpi.h:23
nrnconf.h
coreneuron::hoc_malchk
void hoc_malchk(void)
Definition: nrnoc_aux.cpp:83
coreneuron::NetCvode::deliver_events
void deliver_events(double til, NrnThread *)
Definition: netcvode.cpp:331
coreneuron::nrnmpi_dbl_allmin
mpi_function< cnrn_make_integral_constant_t(nrnmpi_dbl_allmin_impl)> nrnmpi_dbl_allmin
Definition: nrnmpidec.cpp:38
coreneuron::interthread_enqueue
void interthread_enqueue(NrnThread *nt)
Definition: netcvode.cpp:138
coreneuron::net_cvode_instance
NetCvode * net_cvode_instance
Definition: netcvode.cpp:35
coreneuron::nrn_outputevent
void nrn_outputevent(unsigned char, double)
coreneuron::gid2out
std::map< int, PreSyn * > gid2out
Maps for ouput and input presyns.
Definition: nrn_setup.cpp:157
coreneuron::emalloc
static void * emalloc(size_t size)
Definition: mpispike.cpp:30
multicore.hpp
coreneuron::nrn_wtime
double nrn_wtime()
Definition: utils.cpp:22
coreneuron::InputPreSyn
Definition: netcon.hpp:132
coreneuron::corenrn_parameters_data::mpi_enable
bool mpi_enable
Initialization seed for random number generator (int)
Definition: corenrn_parameters.hpp:59
coreneuron::last_maxstep_arg_
static double last_maxstep_arg_
Definition: netpar.cpp:86
coreneuron::nrnmpi_spike_exchange
mpi_function< cnrn_make_integral_constant_t(nrnmpi_spike_exchange_impl)> nrnmpi_spike_exchange
Definition: nrnmpidec.cpp:24
coreneuron::InputPreSyn::send
virtual void send(double sendtime, NetCvode *, NrnThread *) override
Definition: netcvode.cpp:442
coreneuron::NrnThread::n_netcon
int n_netcon
Definition: multicore.hpp:92
coreneuron::erealloc
void * erealloc(void *ptr, size_t size)
Definition: nrnoc_aux.cpp:94
coreneuron::NetParEvent::pr
virtual void pr(const char *, double t, NetCvode *) override
Definition: netpar.cpp:128
coreneuron::nrnmpi_myid
int nrnmpi_myid
Definition: nrnmpi_def_cinc.cpp:11
coreneuron::npe_
static std::vector< NetParEvent > npe_
Definition: netpar.cpp:87
coreneuron::NetCvode
Definition: netcvode.hpp:59
coreneuron::nrn_need_npe
static bool nrn_need_npe()
Definition: netpar.cpp:221
nrn_assert
#define nrn_assert(x)
assert()-like macro, independent of NDEBUG status
Definition: nrn_assert.h:33
coreneuron::NRNMPI_Spike
Definition: nrnmpi.h:31
nrn_assert.h
nrnmpi.h
coreneuron::nrnmpi_int_allmax
mpi_function< cnrn_make_integral_constant_t(nrnmpi_int_allmax_impl)> nrnmpi_int_allmax
Definition: nrnmpidec.cpp:28