CoreNEURON
mpispike.cpp
Go to the documentation of this file.
1 /*
2 # =============================================================================
3 # Copyright (c) 2016 - 2021 Blue Brain Project/EPFL
4 #
5 # See top-level LICENSE file for details.
6 # =============================================================================.
7 */
8 
9 #include "coreneuron/nrnconf.h"
10 /* do not want the redef in the dynamic load case */
12 #include "coreneuron/mpi/nrnmpi.h"
14 #include "nrnmpi.hpp"
17 
18 #include <mpi.h>
19 
20 #include <cstring>
21 
22 namespace coreneuron {
23 extern MPI_Comm nrnmpi_comm;
24 
25 static int np;
26 static int* displs{nullptr};
27 static int* byteovfl{nullptr}; /* for the compressed transfer method */
28 static MPI_Datatype spike_type;
29 
30 static void* emalloc(size_t size) {
31  void* memptr = malloc(size);
32  assert(memptr);
33  return memptr;
34 }
35 
36 // Register type NRNMPI_Spike
38  NRNMPI_Spike s;
39  int block_lengths[2] = {1, 1};
40  MPI_Aint addresses[3];
41 
42  MPI_Get_address(&s, &addresses[0]);
43  MPI_Get_address(&(s.gid), &addresses[1]);
44  MPI_Get_address(&(s.spiketime), &addresses[2]);
45 
46  MPI_Aint displacements[2] = {addresses[1] - addresses[0], addresses[2] - addresses[0]};
47 
48  MPI_Datatype typelist[2] = {MPI_INT, MPI_DOUBLE};
49  MPI_Type_create_struct(2, block_lengths, displacements, typelist, &spike_type);
50  MPI_Type_commit(&spike_type);
51 }
52 
53 #if nrn_spikebuf_size > 0
54 
55 static MPI_Datatype spikebuf_type;
56 
57 // Register type NRNMPI_Spikebuf
58 static void make_spikebuf_type() {
59  NRNMPI_Spikebuf s;
60  int block_lengths[3] = {1, nrn_spikebuf_size, nrn_spikebuf_size};
61  MPI_Datatype typelist[3] = {MPI_INT, MPI_INT, MPI_DOUBLE};
62 
63  MPI_Aint addresses[4];
64  MPI_Get_address(&s, &addresses[0]);
65  MPI_Get_address(&(s.nspike), &addresses[1]);
66  MPI_Get_address(&(s.gid[0]), &addresses[2]);
67  MPI_Get_address(&(s.spiketime[0]), &addresses[3]);
68 
69  MPI_Aint displacements[3] = {addresses[1] - addresses[0],
70  addresses[2] - addresses[0],
71  addresses[3] - addresses[0]};
72 
73  MPI_Type_create_struct(3, block_lengths, displacements, typelist, &spikebuf_type);
74  MPI_Type_commit(&spikebuf_type);
75 }
76 #endif
77 
79  MPI_Barrier(nrnmpi_comm);
80 }
81 
83  NRNMPI_Spike* spikeout,
84  int icapacity,
85  NRNMPI_Spike** spikein,
86  int& ovfl,
87  int nout,
88  NRNMPI_Spikebuf* spbufout,
89  NRNMPI_Spikebuf* spbufin) {
90  nrn_assert(spikein);
91  Instrumentor::phase_begin("spike-exchange");
92 
93  {
94  Instrumentor::phase p("imbalance");
96  }
97 
98  Instrumentor::phase_begin("communication");
99  if (!displs) {
101  displs = (int*) emalloc(np * sizeof(int));
102  displs[0] = 0;
103 #if nrn_spikebuf_size > 0
104  make_spikebuf_type();
105 #endif
106  }
107 #if nrn_spikebuf_size == 0
108  MPI_Allgather(&nout, 1, MPI_INT, nin, 1, MPI_INT, nrnmpi_comm);
109  int n = nin[0];
110  for (int i = 1; i < np; ++i) {
111  displs[i] = n;
112  n += nin[i];
113  }
114  if (n) {
115  if (icapacity < n) {
116  icapacity = n + 10;
117  free(*spikein);
118  *spikein = (NRNMPI_Spike*) emalloc(icapacity * sizeof(NRNMPI_Spike));
119  }
120  MPI_Allgatherv(spikeout, nout, spike_type, *spikein, nin, displs, spike_type, nrnmpi_comm);
121  }
122 #else
123  MPI_Allgather(spbufout, 1, spikebuf_type, spbufin, 1, spikebuf_type, nrnmpi_comm);
124  int novfl = 0;
125  int n = spbufin[0].nspike;
126  if (n > nrn_spikebuf_size) {
127  nin[0] = n - nrn_spikebuf_size;
128  novfl += nin[0];
129  } else {
130  nin[0] = 0;
131  }
132  for (int i = 1; i < np; ++i) {
133  displs[i] = novfl;
134  int n1 = spbufin[i].nspike;
135  n += n1;
136  if (n1 > nrn_spikebuf_size) {
137  nin[i] = n1 - nrn_spikebuf_size;
138  novfl += nin[i];
139  } else {
140  nin[i] = 0;
141  }
142  }
143  if (novfl) {
144  if (icapacity < novfl) {
145  icapacity = novfl + 10;
146  free(*spikein);
147  *spikein = (NRNMPI_Spike*) emalloc(icapacity * sizeof(NRNMPI_Spike));
148  }
149  int n1 = (nout > nrn_spikebuf_size) ? nout - nrn_spikebuf_size : 0;
150  MPI_Allgatherv(spikeout, n1, spike_type, *spikein, nin, displs, spike_type, nrnmpi_comm);
151  }
152  ovfl = novfl;
153 #endif
154  Instrumentor::phase_end("communication");
155  Instrumentor::phase_end("spike-exchange");
156  return n;
157 }
158 
159 /*
160 The compressed spike format is restricted to the fixed step method and is
161 a sequence of unsigned char.
162 nspike = buf[0]*256 + buf[1]
163 a sequence of spiketime, localgid pairs. There are nspike of them.
164  spiketime is relative to the last transfer time in units of dt.
165  note that this requires a mindelay < 256*dt.
166  localgid is an unsigned int, unsigned short,
167  or unsigned char in size depending on the range and thus takes
168  4, 2, or 1 byte respectively. To be machine independent we do our
169  own byte coding. When the localgid range is smaller than the true
170  gid range, the gid->PreSyn are remapped into
171  hostid specific maps. If there are not many holes, i.e just about every
172  spike from a source machine is delivered to some cell on a
173  target machine, then instead of a hash map, a vector is used.
174 The allgather sends the first part of the buf and the allgatherv buffer
175 sends any overflow.
176 */
178  unsigned char*& spfixin_ovfl,
179  int send_nspike,
180  int* nin,
181  int ovfl_capacity,
182  unsigned char* spikeout_fixed,
183  int ag_send_size,
184  unsigned char* spikein_fixed,
185  int& ovfl) {
186  if (!displs) {
188  displs = (int*) emalloc(np * sizeof(int));
189  displs[0] = 0;
190  }
191  if (!byteovfl) {
192  byteovfl = (int*) emalloc(np * sizeof(int));
193  }
194  MPI_Allgather(
195  spikeout_fixed, ag_send_size, MPI_BYTE, spikein_fixed, ag_send_size, MPI_BYTE, nrnmpi_comm);
196  int novfl = 0;
197  int ntot = 0;
198  int bstot = 0;
199  for (int i = 0; i < np; ++i) {
200  displs[i] = bstot;
201  int idx = i * ag_send_size;
202  int n = spikein_fixed[idx++] * 256;
203  n += spikein_fixed[idx++];
204  ntot += n;
205  nin[i] = n;
206  if (n > send_nspike) {
207  int bs = 2 + n * (1 + localgid_size) - ag_send_size;
208  byteovfl[i] = bs;
209  bstot += bs;
210  novfl += n - send_nspike;
211  } else {
212  byteovfl[i] = 0;
213  }
214  }
215  if (novfl) {
216  if (ovfl_capacity < novfl) {
217  ovfl_capacity = novfl + 10;
218  free(spfixin_ovfl);
219  spfixin_ovfl = (unsigned char*) emalloc(ovfl_capacity * (1 + localgid_size) *
220  sizeof(unsigned char));
221  }
222  int bs = byteovfl[nrnmpi_myid_];
223  /*
224  note that the spikeout_fixed buffer is one since the overflow
225  is contiguous to the first part. But the spfixin_ovfl is
226  completely separate from the spikein_fixed since the latter
227  dynamically changes its size during a run.
228  */
229  MPI_Allgatherv(spikeout_fixed + ag_send_size,
230  bs,
231  MPI_BYTE,
232  spfixin_ovfl,
233  byteovfl,
234  displs,
235  MPI_BYTE,
236  nrnmpi_comm);
237  }
238  ovfl = novfl;
239  return ntot;
240 }
241 
243  int result;
244  MPI_Allreduce(&x, &result, 1, MPI_INT, MPI_MAX, nrnmpi_comm);
245  return result;
246 }
247 
248 extern void nrnmpi_int_alltoall_impl(int* s, int* r, int n) {
249  MPI_Alltoall(s, n, MPI_INT, r, n, MPI_INT, nrnmpi_comm);
250 }
251 
252 extern void nrnmpi_int_alltoallv_impl(const int* s,
253  const int* scnt,
254  const int* sdispl,
255  int* r,
256  int* rcnt,
257  int* rdispl) {
258  MPI_Alltoallv(s, scnt, sdispl, MPI_INT, r, rcnt, rdispl, MPI_INT, nrnmpi_comm);
259 }
260 
261 extern void nrnmpi_dbl_alltoallv_impl(double* s,
262  int* scnt,
263  int* sdispl,
264  double* r,
265  int* rcnt,
266  int* rdispl) {
267  MPI_Alltoallv(s, scnt, sdispl, MPI_DOUBLE, r, rcnt, rdispl, MPI_DOUBLE, nrnmpi_comm);
268 }
269 
270 /* following are for the partrans */
271 
272 void nrnmpi_int_allgather_impl(int* s, int* r, int n) {
273  MPI_Allgather(s, n, MPI_INT, r, n, MPI_INT, nrnmpi_comm);
274 }
275 
276 double nrnmpi_dbl_allmin_impl(double x) {
277  double result;
278  MPI_Allreduce(&x, &result, 1, MPI_DOUBLE, MPI_MIN, nrnmpi_comm);
279  return result;
280 }
281 
282 double nrnmpi_dbl_allmax_impl(double x) {
283  double result;
284  MPI_Allreduce(&x, &result, 1, MPI_DOUBLE, MPI_MAX, nrnmpi_comm);
285  return result;
286 }
287 
289  MPI_Barrier(nrnmpi_comm);
290 }
291 
292 double nrnmpi_dbl_allreduce_impl(double x, int type) {
293  double result;
294  MPI_Op tt;
295  if (type == 1) {
296  tt = MPI_SUM;
297  } else if (type == 2) {
298  tt = MPI_MAX;
299  } else {
300  tt = MPI_MIN;
301  }
302  MPI_Allreduce(&x, &result, 1, MPI_DOUBLE, tt, nrnmpi_comm);
303  return result;
304 }
305 
306 void nrnmpi_dbl_allreduce_vec_impl(double* src, double* dest, int cnt, int type) {
307  MPI_Op tt;
308  assert(src != dest);
309  if (type == 1) {
310  tt = MPI_SUM;
311  } else if (type == 2) {
312  tt = MPI_MAX;
313  } else {
314  tt = MPI_MIN;
315  }
316  MPI_Allreduce(src, dest, cnt, MPI_DOUBLE, tt, nrnmpi_comm);
317  return;
318 }
319 
320 void nrnmpi_long_allreduce_vec_impl(long* src, long* dest, int cnt, int type) {
321  MPI_Op tt;
322  assert(src != dest);
323  if (type == 1) {
324  tt = MPI_SUM;
325  } else if (type == 2) {
326  tt = MPI_MAX;
327  } else {
328  tt = MPI_MIN;
329  }
330  MPI_Allreduce(src, dest, cnt, MPI_LONG, tt, nrnmpi_comm);
331  return;
332 }
333 
334 #if NRN_MULTISEND
335 
336 static MPI_Comm multisend_comm;
337 
338 void nrnmpi_multisend_comm_impl() {
339  if (!multisend_comm) {
340  MPI_Comm_dup(MPI_COMM_WORLD, &multisend_comm);
341  }
342 }
343 
344 void nrnmpi_multisend_impl(NRNMPI_Spike* spk, int n, int* hosts) {
345  MPI_Request r;
346  for (int i = 0; i < n; ++i) {
347  MPI_Isend(spk, 1, spike_type, hosts[i], 1, multisend_comm, &r);
348  MPI_Request_free(&r);
349  }
350 }
351 
352 int nrnmpi_multisend_single_advance_impl(NRNMPI_Spike* spk) {
353  int flag = 0;
354  MPI_Status status;
355  MPI_Iprobe(MPI_ANY_SOURCE, 1, multisend_comm, &flag, &status);
356  if (flag) {
357  MPI_Recv(spk, 1, spike_type, MPI_ANY_SOURCE, 1, multisend_comm, &status);
358  }
359  return flag;
360 }
361 
362 int nrnmpi_multisend_conserve_impl(int nsend, int nrecv) {
363  int tcnts[2];
364  tcnts[0] = nsend - nrecv;
365  MPI_Allreduce(tcnts, tcnts + 1, 1, MPI_INT, MPI_SUM, multisend_comm);
366  return tcnts[1];
367 }
368 
369 #endif /*NRN_MULTISEND*/
370 } // namespace coreneuron
coreneuron::NRNMPI_Spike::spiketime
double spiketime
Definition: nrnmpi.h:33
nrn_spikebuf_size
#define nrn_spikebuf_size
Definition: nrnmpi.h:19
coreneuron::nrnmpi_numprocs_
int nrnmpi_numprocs_
Definition: nrnmpi.cpp:25
coreneuron::nrnmpi_dbl_allmin_impl
double nrnmpi_dbl_allmin_impl(double x)
Definition: mpispike.cpp:276
nrnmpidec.h
coreneuron::nrnmpi_barrier_impl
void nrnmpi_barrier_impl()
Definition: mpispike.cpp:288
coreneuron::Instrumentor::phase_begin
static void phase_begin(const char *name)
Definition: profiler_interface.h:308
profiler_interface.h
coreneuron::NRNMPI_Spike::gid
int gid
Definition: nrnmpi.h:32
coreneuron
THIS FILE IS AUTO GENERATED DONT MODIFY IT.
Definition: corenrn_parameters.cpp:12
coreneuron::nrnmpi_spike_exchange_compressed_impl
int nrnmpi_spike_exchange_compressed_impl(int localgid_size, unsigned char *&spfixin_ovfl, int send_nspike, int *nin, int ovfl_capacity, unsigned char *spikeout_fixed, int ag_send_size, unsigned char *spikein_fixed, int &ovfl)
Definition: mpispike.cpp:177
coreneuron::nrnmpi_dbl_allmax_impl
double nrnmpi_dbl_allmax_impl(double x)
Definition: mpispike.cpp:282
coreneuron::i
int i
Definition: cellorder.cpp:485
coreneuron::nrnmpi_spike_initialize
void nrnmpi_spike_initialize()
Definition: mpispike.cpp:37
coreneuron::nrnmpi_myid_
int nrnmpi_myid_
Definition: nrnmpi.cpp:26
coreneuron::nrnmpi_dbl_allreduce_vec_impl
void nrnmpi_dbl_allreduce_vec_impl(double *src, double *dest, int cnt, int type)
Definition: mpispike.cpp:306
coreneuron::nrnmpi_comm
MPI_Comm nrnmpi_comm
Definition: nrnmpi.cpp:24
nrnmpiuse.h
coreneuron::displs
static int * displs
Definition: mpispike.cpp:26
coreneuron::Instrumentor::phase
Definition: profiler_interface.h:289
cnt
#define cnt
Definition: tqueue.hpp:44
coreneuron::nrnmpi_int_alltoall_impl
void nrnmpi_int_alltoall_impl(int *s, int *r, int n)
Definition: mpispike.cpp:248
coreneuron::NRNMPI_Spikebuf::nspike
int nspike
Definition: nrnmpi.h:24
coreneuron::nrnmpi_int_alltoallv_impl
void nrnmpi_int_alltoallv_impl(const int *s, const int *scnt, const int *sdispl, int *r, int *rcnt, int *rdispl)
Definition: mpispike.cpp:252
coreneuron::nrnmpi_long_allreduce_vec_impl
void nrnmpi_long_allreduce_vec_impl(long *src, long *dest, int cnt, int type)
Definition: mpispike.cpp:320
coreneuron::NRNMPI_Spikebuf
Definition: nrnmpi.h:23
nrnconf.h
coreneuron::nrnmpi_spike_exchange_impl
int nrnmpi_spike_exchange_impl(int *nin, NRNMPI_Spike *spikeout, int icapacity, NRNMPI_Spike **spikein, int &ovfl, int nout, NRNMPI_Spikebuf *spbufout, NRNMPI_Spikebuf *spbufin)
Definition: mpispike.cpp:82
coreneuron::nrnmpi_dbl_alltoallv_impl
void nrnmpi_dbl_alltoallv_impl(double *s, int *scnt, int *sdispl, double *r, int *rcnt, int *rdispl)
Definition: mpispike.cpp:261
coreneuron::spike_type
static MPI_Datatype spike_type
Definition: mpispike.cpp:28
coreneuron::emalloc
static void * emalloc(size_t size)
Definition: mpispike.cpp:30
coreneuron::nrnmpi_int_allgather_impl
void nrnmpi_int_allgather_impl(int *s, int *r, int n)
Definition: mpispike.cpp:272
coreneuron::byteovfl
static int * byteovfl
Definition: mpispike.cpp:27
coreneuron::nrnmpi_dbl_allreduce_impl
double nrnmpi_dbl_allreduce_impl(double x, int type)
Definition: mpispike.cpp:292
coreneuron::np
static int np
Definition: mpispike.cpp:25
coreneuron::nrnmpi_int_allmax_impl
int nrnmpi_int_allmax_impl(int x)
Definition: mpispike.cpp:242
nrnmpi.hpp
coreneuron::wait_before_spike_exchange
void wait_before_spike_exchange()
Definition: mpispike.cpp:78
nrn_assert
#define nrn_assert(x)
assert()-like macro, independent of NDEBUG status
Definition: nrn_assert.h:33
coreneuron::NRNMPI_Spike
Definition: nrnmpi.h:31
coreneuron::Instrumentor::phase_end
static void phase_end(const char *name)
Definition: profiler_interface.h:312
nrn_assert.h
nrnmpi.h