59 static int n_xtra_cons_check_;
62 static int xtra_cons_hist_[MAXNCONS + 1];
74 static unsigned long enq2_find_time_;
75 static unsigned long enq2_enqueue_time_;
78 #define PHASE2BUFFER_SIZE 2048 // power of 2
79 #define PHASE2BUFFER_MASK (PHASE2BUFFER_SIZE - 1)
86 #define MULTISEND_RECEIVEBUFFER_SIZE 10000
87 class Multisend_ReceiveBuffer {
89 Multisend_ReceiveBuffer();
90 virtual ~Multisend_ReceiveBuffer();
92 void incoming(
int gid,
double spiketime);
95 int size_{MULTISEND_RECEIVEBUFFER_SIZE};
99 int nsend_{}, nrecv_{};
101 NRNMPI_Spike** buffer_{};
105 InputPreSyn** psbuf_{};
110 int phase2_nsend_cell_{}, phase2_nsend_{};
111 Phase2Buffer* phase2_buffer_{};
114 #define MULTISEND_INTERVAL 2
115 static Multisend_ReceiveBuffer* multisend_receive_buffer[MULTISEND_INTERVAL];
116 static int current_rbuf, next_rbuf;
117 #if MULTISEND_INTERVAL == 2
122 static int* targets_phase1_;
123 static int* targets_phase2_;
126 int i = ps->multisend_index_;
130 int* ranks = targets_phase1_ +
i;
132 int cnt_phase1 = ranks[1];
135 spk.gid = ps->output_index_;
137 if (next_rbuf == 1) {
141 multisend_receive_buffer[next_rbuf]->nsend_ +=
cnt;
142 multisend_receive_buffer[next_rbuf]->nsend_cell_ += 1;
143 nrnmpi_multisend(&spk, cnt_phase1, ranks);
150 static void multisend_send_phase2(InputPreSyn* ps,
int gid,
double t) {
151 int i = ps->multisend_phase2_index_;
154 int* ranks = targets_phase2_ +
i;
155 int cnt_phase2 = ranks[0];
160 nrnmpi_multisend(&spk, cnt_phase2, ranks);
163 Multisend_ReceiveBuffer::Multisend_ReceiveBuffer()
165 new NRNMPI_Spike*[size_]
169 new InputPreSyn*[size_]
172 , phase2_buffer_{
new Phase2Buffer[PHASE2BUFFER_SIZE]} {}
174 Multisend_ReceiveBuffer::~Multisend_ReceiveBuffer() {
176 for (
int i = 0;
i < count_; ++
i) {
182 delete[] phase2_buffer_;
184 void Multisend_ReceiveBuffer::init(
int index) {
186 nsend_cell_ = nsend_ = nrecv_ = maxcount_ = 0;
188 for (
int i = 0;
i < count_; ++
i) {
193 phase2_head_ = phase2_tail_ = 0;
194 phase2_nsend_cell_ = phase2_nsend_ = 0;
196 void Multisend_ReceiveBuffer::incoming(
int gid,
double spiketime) {
201 if (count_ >= size_) {
203 NRNMPI_Spike** newbuf =
new NRNMPI_Spike*[size_];
204 for (
int i = 0;
i < count_; ++
i) {
205 newbuf[
i] = buffer_[
i];
211 psbuf_ =
new InputPreSyn*[size_];
214 NRNMPI_Spike* spk =
new NRNMPI_Spike();
216 spk->spiketime = spiketime;
217 buffer_[count_++] = spk;
218 if (maxcount_ < count_) {
225 void Multisend_ReceiveBuffer::enqueue() {
231 for (
int i = 0;
i < count_; ++
i) {
232 NRNMPI_Spike* spk = buffer_[
i];
234 auto gid2in_it =
gid2in.find(spk->gid);
235 assert(gid2in_it !=
gid2in.end());
236 InputPreSyn* ps = gid2in_it->second;
238 if (
use_phase2_ && ps->multisend_phase2_index_ >= 0) {
239 Phase2Buffer& pb = phase2_buffer_[phase2_head_++];
240 phase2_head_ &= PHASE2BUFFER_MASK;
241 assert(phase2_head_ != phase2_tail_);
243 pb.spiketime = spk->spiketime;
261 void Multisend_ReceiveBuffer::enqueue1() {
266 for (
int i = 0;
i < count_; ++
i) {
267 NRNMPI_Spike* spk = buffer_[
i];
269 auto gid2in_it =
gid2in.find(spk->gid);
270 assert(gid2in_it !=
gid2in.end());
271 InputPreSyn* ps = gid2in_it->second;
273 if (
use_phase2_ && ps->multisend_phase2_index_ >= 0) {
274 Phase2Buffer& pb = phase2_buffer_[phase2_head_++];
275 phase2_head_ &= PHASE2BUFFER_MASK;
276 assert(phase2_head_ != phase2_tail_);
278 pb.spiketime = spk->spiketime;
286 void Multisend_ReceiveBuffer::enqueue2() {
291 for (
int i = 0;
i < count_; ++
i) {
292 NRNMPI_Spike* spk = buffer_[
i];
293 InputPreSyn* ps = psbuf_[
i];
304 void Multisend_ReceiveBuffer::phase2send() {
305 while (phase2_head_ != phase2_tail_) {
306 Phase2Buffer& pb = phase2_buffer_[phase2_tail_++];
307 phase2_tail_ &= PHASE2BUFFER_MASK;
312 multisend_send_phase2(pb.ps, gid, pb.spiketime);
316 static int max_ntarget_host;
320 static int max_multisend_targets;
324 multisend_receive_buffer[
i]->init(
i);
329 enq2_find_time_ = enq2_enqueue_time_ = 0;
331 n_xtra_cons_check_ = 0;
333 for (
int i = 0;
i <= MAXNCONS; ++
i) {
334 xtra_cons_hist_[
i] = 0;
339 static int multisend_advance() {
342 while (nrnmpi_multisend_single_advance(&spk)) {
345 #if MULTISEND_INTERVAL == 2
351 multisend_receive_buffer[j]->incoming(spk.gid, spk.spiketime);
361 multisend_receive_buffer[current_rbuf]->enqueue();
372 int& s = multisend_receive_buffer[current_rbuf]->nsend_;
373 int& r = multisend_receive_buffer[current_rbuf]->nrecv_;
375 #if NRN_MULTISEND & 1
382 while (nrnmpi_multisend_conserve(s, r) != 0) {
392 multisend_receive_buffer[current_rbuf]->enqueue();
395 multisend_receive_buffer[current_rbuf]->enqueue1();
396 multisend_receive_buffer[current_rbuf]->enqueue2();
399 multisend_receive_buffer[current_rbuf]->enqueue();
400 s = r = multisend_receive_buffer[current_rbuf]->nsend_cell_ = 0;
402 multisend_receive_buffer[current_rbuf]->phase2_nsend_cell_ = 0;
403 multisend_receive_buffer[current_rbuf]->phase2_nsend_ = 0;
406 enq2_enqueue_time_ = 0;
407 #endif // ENQUEUE == 2
410 #if MULTISEND_INTERVAL == 2
413 current_rbuf = next_rbuf;
414 next_rbuf = ((next_rbuf + 1) & 1);
420 if (targets_phase1_) {
421 delete[] targets_phase1_;
422 targets_phase1_ =
nullptr;
425 if (targets_phase2_) {
426 delete[] targets_phase2_;
427 targets_phase2_ =
nullptr;
438 nrnmpi_multisend_comm();
449 max_ntarget_host = 0;
450 max_multisend_targets = 0;
455 if (!multisend_receive_buffer[0]) {
456 multisend_receive_buffer[0] =
new Multisend_ReceiveBuffer();
458 #if MULTISEND_INTERVAL == 2
460 multisend_receive_buffer[1] =
new Multisend_ReceiveBuffer();
464 #endif // NRN_MULTISEND