CoreNEURON
output_spikes.cpp
Go to the documentation of this file.
1 /*
2 # =============================================================================
3 # Copyright (c) 2016 - 2021 Blue Brain Project/EPFL
4 #
5 # See top-level LICENSE file for details.
6 # =============================================================================.
7 */
8 
9 #include <iostream>
10 #include <sstream>
11 #include <cstring>
12 #include <stdexcept> // std::lenght_error
13 #include <vector>
14 #include <algorithm>
15 #include <numeric>
16 #include <limits>
17 
18 #include "coreneuron/nrnconf.h"
21 #include "coreneuron/mpi/nrnmpi.h"
27 #ifdef ENABLE_SONATA_REPORTS
28 #include "bbp/sonata/reports.h"
29 #endif // ENABLE_SONATA_REPORTS
30 
31 /**
32  * @brief Return all spike vectors to NEURON
33  *
34  * @param spiketvec - vector of spikes at the end of CORENEURON simulation
35  * @param spikegidvec - vector of gids at the end of CORENEURON simulation
36  * @return true if we are in embedded_run and NEURON has successfully retrieved the vectors
37  */
38 static bool all_spikes_return(std::vector<double>& spiketvec, std::vector<int>& spikegidvec) {
40  (*nrn2core_all_spike_vectors_return_)(spiketvec, spikegidvec);
41 }
42 
43 namespace coreneuron {
44 /// --> Coreneuron as SpikeBuffer class
45 std::vector<double> spikevec_time;
46 std::vector<int> spikevec_gid;
47 
48 static OMP_Mutex mut;
49 
50 void mk_spikevec_buffer(int sz) {
51  try {
52  spikevec_time.reserve(sz);
53  spikevec_gid.reserve(sz);
54  } catch (const std::length_error& le) {
55  std::cerr << "Lenght error" << le.what() << std::endl;
56  }
57 }
58 
59 void spikevec_lock() {
60  mut.lock();
61 }
62 
64  mut.unlock();
65 }
66 
67 static void local_spikevec_sort(std::vector<double>& isvect,
68  std::vector<int>& isvecg,
69  std::vector<double>& osvect,
70  std::vector<int>& osvecg) {
71  osvect.resize(isvect.size());
72  osvecg.resize(isvecg.size());
73  // first build a permutation vector
74  std::vector<std::size_t> perm(isvect.size());
75  std::iota(perm.begin(), perm.end(), 0);
76  // sort by gid (second predicate first)
77  std::stable_sort(perm.begin(), perm.end(), [&](std::size_t i, std::size_t j) {
78  return isvecg[i] < isvecg[j];
79  });
80  // then sort by time
81  std::stable_sort(perm.begin(), perm.end(), [&](std::size_t i, std::size_t j) {
82  return isvect[i] < isvect[j];
83  });
84  // now apply permutation to time and gid output vectors
85  std::transform(perm.begin(), perm.end(), osvect.begin(), [&](std::size_t i) {
86  return isvect[i];
87  });
88  std::transform(perm.begin(), perm.end(), osvecg.begin(), [&](std::size_t i) {
89  return isvecg[i];
90  });
91 }
92 
93 #if NRNMPI
94 
95 static void sort_spikes(std::vector<double>& spikevec_time, std::vector<int>& spikevec_gid) {
96  double lmin_time = std::numeric_limits<double>::max();
97  double lmax_time = std::numeric_limits<double>::min();
98  if (!spikevec_time.empty()) {
99  lmin_time = *(std::min_element(spikevec_time.begin(), spikevec_time.end()));
100  lmax_time = *(std::max_element(spikevec_time.begin(), spikevec_time.end()));
101  }
102  double min_time = nrnmpi_dbl_allmin(lmin_time);
103  double max_time = nrnmpi_dbl_allmax(lmax_time);
104 
105  // allocate send and receive counts and displacements for MPI_Alltoallv
106  std::vector<int> snd_cnts(nrnmpi_numprocs);
107  std::vector<int> rcv_cnts(nrnmpi_numprocs);
108  std::vector<int> snd_dsps(nrnmpi_numprocs);
109  std::vector<int> rcv_dsps(nrnmpi_numprocs);
110 
111  double bin_t = (max_time - min_time) / nrnmpi_numprocs;
112  bin_t = bin_t ? bin_t : 1;
113  // first find number of spikes in each time window
114  for (const auto& st: spikevec_time) {
115  int idx = (int) (st - min_time) / bin_t;
116  snd_cnts[idx]++;
117  }
118  for (int i = 1; i < nrnmpi_numprocs; i++) {
119  snd_dsps[i] = snd_dsps[i - 1] + snd_cnts[i - 1];
120  }
121 
122  // now let each rank know how many spikes they will receive
123  // and get in turn all the buffer sizes to receive
124  nrnmpi_int_alltoall(&snd_cnts[0], &rcv_cnts[0], 1);
125  for (int i = 1; i < nrnmpi_numprocs; i++) {
126  rcv_dsps[i] = rcv_dsps[i - 1] + rcv_cnts[i - 1];
127  }
128  std::size_t new_sz = 0;
129  for (const auto& r: rcv_cnts) {
130  new_sz += r;
131  }
132  // prepare new sorted vectors
133  std::vector<double> svt_buf(new_sz, 0.0);
134  std::vector<int> svg_buf(new_sz, 0);
135 
136  // now exchange data
138  &snd_cnts[0],
139  &snd_dsps[0],
140  svt_buf.data(),
141  &rcv_cnts[0],
142  &rcv_dsps[0]);
144  &snd_cnts[0],
145  &snd_dsps[0],
146  svg_buf.data(),
147  &rcv_cnts[0],
148  &rcv_dsps[0]);
149 
150  local_spikevec_sort(svt_buf, svg_buf, spikevec_time, spikevec_gid);
151 }
152 
153 #ifdef ENABLE_SONATA_REPORTS
154 /** Split spikevec_time and spikevec_gid by populations
155  * Add spike data with population name and gid offset tolibsonatareport API
156  */
157 void output_spike_populations(const SpikesInfo& spikes_info) {
158  // Write spikes with default population name and offset
159  if (spikes_info.population_info.empty()) {
160  sonata_add_spikes_population("All",
161  0,
162  spikevec_time.data(),
163  spikevec_time.size(),
164  spikevec_gid.data(),
165  spikevec_gid.size());
166  return;
167  }
168  int n_populations = spikes_info.population_info.size();
169  for (int idx = 0; idx < n_populations; idx++) {
170  const auto& curr_pop = spikes_info.population_info[idx];
171  std::string population_name = curr_pop.first;
172  int population_offset = curr_pop.second;
173  int gid_lower = population_offset;
174  int gid_upper = std::numeric_limits<int>::max();
175  if (idx != n_populations - 1) {
176  gid_upper = spikes_info.population_info[idx + 1].second - 1;
177  }
178  std::vector<double> pop_spikevec_time;
179  std::vector<int> pop_spikevec_gid;
180  for (int j = 0; j < spikevec_gid.size(); j++) {
181  if (spikevec_gid[j] >= gid_lower && spikevec_gid[j] <= gid_upper) {
182  pop_spikevec_time.push_back(spikevec_time[j]);
183  pop_spikevec_gid.push_back(spikevec_gid[j]);
184  }
185  }
186  sonata_add_spikes_population(population_name.data(),
187  population_offset,
188  pop_spikevec_time.data(),
189  pop_spikevec_time.size(),
190  pop_spikevec_gid.data(),
191  pop_spikevec_gid.size());
192  }
193 }
194 #endif // ENABLE_SONATA_REPORTS
195 
196 /** Write generated spikes to out.dat using mpi parallel i/o.
197  * \todo : MPI related code should be factored into nrnmpi.c
198  * Check spike record length which is set to 64 chars
199  */
200 static void output_spikes_parallel(const char* outpath, const SpikesInfo& spikes_info) {
201  std::stringstream ss;
202  ss << outpath << "/out.dat";
203  std::string fname = ss.str();
204 
205  // remove if file already exist
206  if (nrnmpi_myid == 0) {
207  remove(fname.c_str());
208  }
209 #ifdef ENABLE_SONATA_REPORTS
210  sonata_create_spikefile(outpath, spikes_info.file_name.data());
211  output_spike_populations(spikes_info);
212  sonata_write_spike_populations();
213  sonata_close_spikefile();
214 #endif // ENABLE_SONATA_REPORTS
215 
216  sort_spikes(spikevec_time, spikevec_gid);
217  nrnmpi_barrier();
218 
219  // each spike record in the file is time + gid (64 chars sufficient)
220  const int SPIKE_RECORD_LEN = 64;
221  size_t num_spikes = spikevec_gid.size();
222  size_t num_bytes = (sizeof(char) * num_spikes * SPIKE_RECORD_LEN);
223  char* spike_data = (char*) malloc(num_bytes);
224 
225  if (spike_data == nullptr) {
226  printf("Error while writing spikes due to memory allocation\n");
227  return;
228  }
229 
230  // empty if no spikes
231  strcpy(spike_data, "");
232 
233  // populate buffer with all spike entries
234  char spike_entry[SPIKE_RECORD_LEN];
235  size_t spike_data_offset = 0;
236  for (size_t i = 0; i < num_spikes; i++) {
237  int spike_entry_chars =
238  snprintf(spike_entry, 64, "%.8g\t%d\n", spikevec_time[i], spikevec_gid[i]);
239  spike_data_offset =
240  strcat_at_pos(spike_data, spike_data_offset, spike_entry, spike_entry_chars);
241  }
242 
243  // calculate offset into global file. note that we don't write
244  // all num_bytes but only "populated" buffer
245  size_t num_chars = strlen(spike_data);
246 
247  nrnmpi_write_file(fname, spike_data, num_chars);
248 
249  free(spike_data);
250 }
251 #endif
252 
253 static void output_spikes_serial(const char* outpath) {
254  std::stringstream ss;
255  ss << outpath << "/out.dat";
256  std::string fname = ss.str();
257 
258  // reserve some space for sorted spikevec buffers
259  std::vector<double> sorted_spikevec_time(spikevec_time.size());
260  std::vector<int> sorted_spikevec_gid(spikevec_gid.size());
261  local_spikevec_sort(spikevec_time, spikevec_gid, sorted_spikevec_time, sorted_spikevec_gid);
262 
263  // remove if file already exist
264  remove(fname.c_str());
265 
266  FILE* f = fopen(fname.c_str(), "w");
267  if (!f && nrnmpi_myid == 0) {
268  std::cout << "WARNING: Could not open file for writing spikes." << std::endl;
269  return;
270  }
271 
272  for (std::size_t i = 0; i < sorted_spikevec_gid.size(); ++i)
273  if (sorted_spikevec_gid[i] > -1)
274  fprintf(f, "%.8g\t%d\n", sorted_spikevec_time[i], sorted_spikevec_gid[i]);
275 
276  fclose(f);
277 }
278 
279 void output_spikes(const char* outpath, const SpikesInfo& spikes_info) {
280  // try to transfer spikes to NEURON. If successfull, don't write out.dat
283  return;
284  }
285 #if NRNMPI
287  output_spikes_parallel(outpath, spikes_info);
288  } else
289 #endif
290  {
291  output_spikes_serial(outpath);
292  }
294 }
295 
297  auto spikevec_time_capacity = spikevec_time.capacity();
298  auto spikevec_gid_capacity = spikevec_gid.capacity();
299  spikevec_time.clear();
300  spikevec_gid.clear();
301  spikevec_time.reserve(spikevec_time_capacity);
302  spikevec_gid.reserve(spikevec_gid_capacity);
303 }
304 
305 void validation(std::vector<std::pair<double, int>>& res) {
306  for (unsigned i = 0; i < spikevec_gid.size(); ++i)
307  if (spikevec_gid[i] > -1)
308  res.push_back(std::make_pair(spikevec_time[i], spikevec_gid[i]));
309 }
310 } // namespace coreneuron
all_spikes_return
static bool all_spikes_return(std::vector< double > &spiketvec, std::vector< int > &spikegidvec)
Return all spike vectors to NEURON.
Definition: output_spikes.cpp:38
coreneuron::nrnmpi_write_file
mpi_function< cnrn_make_integral_constant_t(nrnmpi_write_file_impl)> nrnmpi_write_file
Definition: nrnmpidec.cpp:20
coreneuron::nrnmpi_int_alltoall
mpi_function< cnrn_make_integral_constant_t(nrnmpi_int_alltoall_impl)> nrnmpi_int_alltoall
Definition: nrnmpidec.cpp:32
coreneuron::spikevec_gid
std::vector< int > spikevec_gid
Definition: output_spikes.cpp:46
OMP_Mutex
Definition: nrnmutdec.hpp:55
string_utils.h
Utility functions for strings.
nrn2core_direct.h
coreneuron::local_spikevec_sort
static void local_spikevec_sort(std::vector< double > &isvect, std::vector< int > &isvecg, std::vector< double > &osvect, std::vector< int > &osvecg)
Definition: output_spikes.cpp:67
coreneuron::nrnmpi_barrier
mpi_function< cnrn_make_integral_constant_t(nrnmpi_barrier_impl)> nrnmpi_barrier
Definition: nrnmpidec.cpp:42
coreneuron::nrnmpi_numprocs
int nrnmpi_numprocs
Definition: nrnmpi_def_cinc.cpp:10
coreneuron::mut
static OMP_Mutex mut
Definition: nrn_setup.cpp:152
output_spikes.hpp
nrn2core_all_spike_vectors_return_
int(* nrn2core_all_spike_vectors_return_)(std::vector< double > &spikevec, std::vector< int > &gidvec)
Definition: nrn_setup.cpp:71
nrnmpidec.h
strcat_at_pos
unsigned strcat_at_pos(char *dest, unsigned start_position, char *src, unsigned src_length)
Appends a copy of the source string to the destination string.
Definition: string_utils.cpp:11
coreneuron::nrnmpi_int_alltoallv
mpi_function< cnrn_make_integral_constant_t(nrnmpi_int_alltoallv_impl)> nrnmpi_int_alltoallv
Definition: nrnmpidec.cpp:34
coreneuron
THIS FILE IS AUTO GENERATED DONT MODIFY IT.
Definition: corenrn_parameters.cpp:12
coreneuron::nrnmpi_initialized
mpi_function< cnrn_make_integral_constant_t(nrnmpi_initialized_impl)> nrnmpi_initialized
Definition: nrnmpidec.cpp:50
corenrn_parameters.hpp
coreneuron::i
int i
Definition: cellorder.cpp:485
OMP_Mutex::unlock
void unlock()
Definition: nrnmutdec.hpp:74
coreneuron::clear_spike_vectors
void clear_spike_vectors()
Definition: output_spikes.cpp:296
nrnmpi.hpp
coreneuron::spikevec_time
std::vector< double > spikevec_time
--> Coreneuron as SpikeBuffer class
Definition: output_spikes.cpp:45
coreneuron::corenrn_param
corenrn_parameters corenrn_param
Printing method.
Definition: corenrn_parameters.cpp:268
OMP_Mutex::lock
void lock()
Definition: nrnmutdec.hpp:72
nrnconf.h
coreneuron::output_spikes_serial
static void output_spikes_serial(const char *outpath)
Definition: output_spikes.cpp:253
coreneuron::nrnmpi_dbl_alltoallv
mpi_function< cnrn_make_integral_constant_t(nrnmpi_dbl_alltoallv_impl)> nrnmpi_dbl_alltoallv
Definition: nrnmpidec.cpp:36
coreneuron::validation
void validation(std::vector< std::pair< double, int >> &res)
Definition: output_spikes.cpp:305
coreneuron::nrnmpi_dbl_allmin
mpi_function< cnrn_make_integral_constant_t(nrnmpi_dbl_allmin_impl)> nrnmpi_dbl_allmin
Definition: nrnmpidec.cpp:38
coreneuron::nrnmpi_dbl_allmax
mpi_function< cnrn_make_integral_constant_t(nrnmpi_dbl_allmax_impl)> nrnmpi_dbl_allmax
Definition: nrnmpidec.cpp:40
coreneuron::SpikesInfo
Definition: nrnreport.hpp:42
nrnmutdec.hpp
corenrn_embedded
bool corenrn_embedded
--> Coreneuron
Definition: nrn_setup.cpp:46
coreneuron::corenrn_parameters_data::mpi_enable
bool mpi_enable
Initialization seed for random number generator (int)
Definition: corenrn_parameters.hpp:59
coreneuron::spikevec_unlock
void spikevec_unlock()
Definition: output_spikes.cpp:63
coreneuron::nrnmpi_myid
int nrnmpi_myid
Definition: nrnmpi_def_cinc.cpp:11
coreneuron::output_spikes
void output_spikes(const char *outpath, const SpikesInfo &spikes_info)
Definition: output_spikes.cpp:279
coreneuron::spikevec_lock
void spikevec_lock()
Definition: output_spikes.cpp:59
coreneuron::mk_spikevec_buffer
void mk_spikevec_buffer(int sz)
Definition: output_spikes.cpp:50
nrnmpi.h