openCARP
Doxygen code documentation for the open cardiac electrophysiology simulator openCARP
async_io.cc
1 // ----------------------------------------------------------------------------
2 // openCARP is an open cardiac electrophysiology simulator.
3 //
4 // Copyright (C) 2022 openCARP project
5 //
6 // This program is licensed under the openCARP Academic Public License (APL)
7 // v1.0: You can use and redistribute it and/or modify it in non-commercial
8 // academic environments under the terms of APL as published by the openCARP
9 // project v1.0, or (at your option) any later version. Commercial use requires
10 // a commercial license (info@opencarp.org).
11 //
12 // This program is distributed without any warranty; see the openCARP APL for
13 // more details.
14 //
15 // You should have received a copy of the openCARP APL along with this program
16 // and can find it online: http://www.opencarp.org/license
17 // ----------------------------------------------------------------------------
18 
27 #include "basics.h"
28 #include "sim_utils.h"
29 #include "fem.h"
30 #include "physics.h"
31 #include "async_io.h"
32 
33 namespace opencarp {
34 namespace async {
35 
36 void IO_poll_for_output(async_IO_queue & io_queue)
37 {
38  int rank = get_rank();
39 
40 #if 0
41  int size = get_size();
42 
43  for(int pid = 0; pid < size; pid++) {
44  if(rank == pid)
45  printf("IO rank %d / %d: polling ..\n", rank+1, size);
46 
47  MPI_Barrier(PETSC_COMM_WORLD);
48  }
49 #endif
50 
51  bool do_continue = true;
52  int cmd_buff = 0;
53  MPI_Status status;
54 
55  while(do_continue) {
56  if(!rank)
57  MPI_Recv(&cmd_buff, 1, MPI_INT, 0, ASYNC_TAG, user_globals::IO_Intercomm, &status);
58 
59  MPI_Bcast(&cmd_buff, 1, MPI_INT, 0, PETSC_COMM_WORLD);
60 
61  switch(cmd_buff) {
62  case ASYNC_CMD_EXIT:
63  do_continue = false;
64  break;
65 
66  case ASYNC_CMD_REGISTER_OUTPUT:
67  IO_register_output(io_queue);
68  break;
69 
70  case ASYNC_CMD_OUTPUT:
71  IO_do_output(io_queue);
72  break;
73 
74  default: break;
75  }
76  }
77 
78  for(IGBheader* igb : io_queue.IGBs) {
79  if(igb->fileptr()) fclose((FILE*)igb->fileptr());
80  delete igb;
81  }
82 
83 #if 0
84  for(int pid = 0; pid < size; pid++) {
85  if(rank == pid)
86  printf("IO rank %d / %d: exiting ..\n", rank+1, size);
87 
88  MPI_Barrier(PETSC_COMM_WORLD);
89  }
90 #endif
91 }
92 
93 /// this function sends the exit flag from a compute node to an io node.
94 void COMPUTE_send_exit_flag()
95 {
96  int rank = get_rank();
97 
98  if(!rank) {
99  int msg = ASYNC_CMD_EXIT;
100  MPI_Send(&msg, 1, MPI_INT, 0, ASYNC_TAG, user_globals::IO_Intercomm);
101  }
102 }
103 
104 int COMPUTE_register_output(const SF::vector<mesh_int_t> & idx,
105  const int dpn,
106  const char* name,
107  const char* units)
108 {
109  intercomm_layout il;
110  il.setup(user_globals::IO_Intercomm);
111
112  long int loc_size = idx.size();
113  SF::vector<long int> layout;
114  layout_from_count(loc_size, layout, PETSC_COMM_WORLD);
115 
116  if(il.loc_rank == 0) {
117  char header[2048];
118 
119  timer_manager & tm = *user_globals::tm_manager;
120  const int num_io = tm.timers[iotm_spacedt]->numIOs;
121  const double dimt = tm.end - tm.start;
122 
123  snprintf(header, sizeof header, "%d %d %lf %s %s", dpn, num_io, dimt, name, units);
124 
125  int msg = ASYNC_CMD_REGISTER_OUTPUT;
126  MPI_Send(&msg, 1, MPI_INT, 0, ASYNC_TAG, user_globals::IO_Intercomm);
127 
128  MPI_Send(header, 2048, MPI_CHAR, 0, ASYNC_TAG, user_globals::IO_Intercomm);
129  MPI_Send(layout.data(), layout.size(), MPI_LONG, 0, ASYNC_TAG, user_globals::IO_Intercomm);
130  }
131 
132  MPI_Barrier(PETSC_COMM_WORLD);
133 
134  int receive_rank = COMPUTE_get_receive_rank(il);
135  MPI_Send(idx.data(), idx.size()*sizeof(mesh_int_t), MPI_BYTE, receive_rank,
136  ASYNC_TAG, user_globals::IO_Intercomm);
137
138  int id = -1;
139 
140  if(il.loc_rank == 0) {
141  MPI_Status status;
142  MPI_Recv(&id, 1, MPI_INT, 0, ASYNC_TAG, user_globals::IO_Intercomm, &status);
143  }
144 
145  MPI_Bcast(&id, 1, MPI_INT, 0, PETSC_COMM_WORLD);
146  return id;
147 }
148 
149 void IO_prepare_sort(const SF::vector<mesh_int_t> & inp_idx,
150  SF::commgraph<size_t> & grph,
151  SF::vector<mesh_int_t> & perm_before_comm,
152  SF::vector<mesh_int_t> & perm_after_comm)
153 {
154  int size = get_size(), rank = get_rank();
155  SF::vector<mesh_int_t> idx(inp_idx);
156 
157  // determine global min and max indices
158  mesh_int_t gmax = SF::global_max(idx, PETSC_COMM_WORLD);
159  mesh_int_t gmin = SF::global_min(idx, PETSC_COMM_WORLD);
160 
161  // block size
162  mesh_int_t bsize = (gmax - gmin) / size + 1;
163 
164  // distribute tuples uniquely and linearly ascending across the ranks ----------------
165  SF::vector<mesh_int_t> dest(idx.size());
166  SF::interval(perm_before_comm, 0, idx.size());
167 
168  // find a destination for every tuple
169  for(size_t i=0; i<dest.size(); i++)
170  dest[i] = (idx[i] - gmin) / bsize;
171 
172  // find permutation to sort tuples in the send buffer
173  binary_sort_copy(dest, perm_before_comm);
174 
175  // fill send buffer
176  SF::vector<mesh_int_t> snd_idx(idx.size());
177  for(size_t i=0; i<perm_before_comm.size(); i++)
178  snd_idx[i] = idx[perm_before_comm[i]];
179 
180  // communicate
181  grph.configure(dest, PETSC_COMM_WORLD);
182 
183  size_t rsize = sum(grph.rcnt);
184  SF::vector<mesh_int_t> recv_idx(rsize);
185 
186  MPI_Exchange(grph, snd_idx, recv_idx, PETSC_COMM_WORLD);
187 
188  // sort the received values locally
189  SF::interval(perm_after_comm, 0, recv_idx.size());
190  binary_sort_copy(recv_idx, perm_after_comm);
191 }
192 
193 
194 IGBheader* IO_open_igb(const int numIOs, const double dimt,
195  const size_t gsize, const int dpn,
196  const char* name, const char* units)
197 {
198  IGBheader* pigb = new IGBheader();
199  IGBheader & igb = *pigb;
200  int err = 0;
201 
202  igb.x(gsize / dpn);
203  igb.dim_x(igb.x()-1);
204  igb.inc_x(1);
205 
206  igb.y(1); igb.z(1);
207  igb.t(numIOs);
208  igb.dim_t(dimt);
209 
210  switch(dpn) {
211  default:
212  case 1: igb.type(IGB_FLOAT); break;
213  case 3: igb.type(IGB_VEC3_f); break;
214  case 4: igb.type(IGB_VEC4_f); break;
215  case 9: igb.type(IGB_VEC9_f); break;
216  }
217 
218  igb.unites_x("um"); igb.unites_y("um"); igb.unites_z("um");
219  igb.unites_t("ms");
220  igb.unites(units);
221 
222  igb.inc_t(param_globals::spacedt);
223 
224  if(get_rank() == 0) {
225  FILE_SPEC file = f_open(name, "wb");
226  if(file != NULL) {
227  igb.fileptr(file->fd);
228  igb.write();
229  delete file;
230  }
231  else err++;
232  }
233 
234  err = get_global(err, MPI_SUM);
235  if(err) {
236  log_msg(0,5,0, "%s error: Could not set up data output! Aborting!", __func__);
237  EXIT(1);
238  }
239 
240  return pigb;
241 }
242 
243 void IO_register_output(async_IO_queue & io_queue)
244 {
245  set_dir(OUTPUT);
246 
247  intercomm_layout il;
248  il.setup(user_globals::IO_Intercomm);
249
250  char header[2048];
251  SF::vector<long int> data_layout(il.rem_size+1);
252  MPI_Status status;
253 
254  if(il.loc_rank == 0) {
255  MPI_Recv(header, 2048, MPI_CHAR, 0, ASYNC_TAG, user_globals::IO_Intercomm, &status);
256  MPI_Recv(data_layout.data(), data_layout.size(), MPI_LONG, 0, ASYNC_TAG, user_globals::IO_Intercomm, &status);
257  }
258 
259  MPI_Bcast(header, 2048, MPI_CHAR, 0, PETSC_COMM_WORLD);
260  MPI_Bcast(data_layout.data(), data_layout.size(), MPI_LONG, 0, PETSC_COMM_WORLD);
261 
262  int dpn = 0, numIOs = 0;
263  double dimt = 0.0;
264 
265  char name_str[2048], units_str[2048];
266  sscanf(header, "%d %d %lf %s %s", &dpn, &numIOs, &dimt, name_str, units_str);
267 
268  size_t gsize = data_layout[data_layout.size()-1];
269  IGBheader* igb = IO_open_igb(numIOs, dimt, gsize, dpn, name_str, units_str);
270 
271  SF::vector<int> senders;
272  IO_get_sender_ranks(il, senders);
273 
274  size_t num_recv = 0;
275  for(int s : senders)
276  num_recv += data_layout[s+1] - data_layout[s];
277 
278  SF::vector<mesh_int_t> idx_buff(num_recv);
279 
280  SF::vector<MPI_Request> req (senders.size());
281  SF::vector<MPI_Status> stat(senders.size());
282 
283  for(size_t i=0, dsp=0; i<senders.size(); i++) {
284  int send_rank = senders[i];
285  size_t send_size = data_layout[send_rank+1] - data_layout[send_rank];
286  MPI_Irecv(idx_buff.data() + dsp, send_size*sizeof(mesh_int_t), MPI_BYTE, send_rank,
287  ASYNC_TAG, user_globals::IO_Intercomm, req.data()+i);
288  dsp += send_size;
289  }
290 
291  MPI_Waitall(senders.size(), req.data(), stat.data());
292 
293  SF::commgraph<size_t> cg;
294  SF::vector<mesh_int_t> pafter, pbefore;
295  IO_prepare_sort(idx_buff, cg, pbefore, pafter);
296 
297  int id = io_queue.add(igb, data_layout, cg, pbefore, pafter);
298 
299  if(il.loc_rank == 0)
300  MPI_Send(&id, 1, MPI_INT, 0, ASYNC_TAG, user_globals::IO_Intercomm);
301 }
302 
303 /// get the IO node rank that will receive our data chunk
304 int COMPUTE_get_receive_rank(const intercomm_layout & il)
305 {
306  SF::vector<int> remote_dist;
307  SF::divide(il.loc_size, il.rem_size, remote_dist);
308 
309  int remote_idx = 0;
310  int rank_cnt = remote_dist[remote_idx];
311 
312  while(remote_idx < int(remote_dist.size()) && il.loc_rank >= rank_cnt) {
313  remote_idx++;
314  rank_cnt += remote_dist[remote_idx];
315  }
316 
317  return remote_idx;
318 }
319 /// get the compute node ranks that will send their data chunk to us
320 void IO_get_sender_ranks(const intercomm_layout & il, SF::vector<int> & sender)
321 {
322  SF::vector<int> recv_cnt, recv_dsp, recv_ranks;
323  SF::divide(il.rem_size, il.loc_size, recv_cnt);
324  SF::dsp_from_cnt(recv_cnt, recv_dsp);
325 
326  sender.resize(0);
327  sender.reserve(recv_cnt[il.loc_rank]);
328 
329  int start = recv_dsp[il.loc_rank], stop = recv_dsp[il.loc_rank+1];
330  for(int i=start; i<stop; i++)
331  sender.push_back(i);
332 }
333 
334 void IO_sort_data(SF::vector<float> & data,
335  const SF::vector<mesh_int_t> & perm_b,
336  const SF::vector<mesh_int_t> & perm_a,
337  SF::commgraph<size_t> & cg)
338 {
339  SF::vector<float> snd_data(data.size());
340  for(size_t i=0; i<perm_b.size(); i++)
341  snd_data[i] = data[perm_b[i]];
342 
343  SF::vector<float> recv_data(perm_a.size());
344  SF::MPI_Exchange(cg, snd_data, recv_data, PETSC_COMM_WORLD);
345 
346  data.resize(perm_a.size());
347  for(size_t i=0; i<perm_a.size(); i++)
348  data[i] = recv_data[perm_a[i]];
349 }
350 
351 void IO_do_output(async_IO_queue & io_queue)
352 {
353  intercomm_layout il;
354  il.setup(user_globals::IO_Intercomm);
355
356  MPI_Status status;
357  int id = -1;
358 
359  if(il.loc_rank == 0)
360  MPI_Recv(&id, 1, MPI_INT, 0, ASYNC_TAG, user_globals::IO_Intercomm, &status);
361  MPI_Bcast(&id, 1, MPI_INT, 0, PETSC_COMM_WORLD);
362 
363  assert(id > -1 && id < int(io_queue.IGBs.size()));
364 
365  IGBheader* igb = io_queue.IGBs[id];
366  SF::vector<long int> & layout = io_queue.layouts[id];
367  SF::commgraph<size_t> & cg = io_queue.cg[id];
368  SF::vector<mesh_int_t> & perm_b = io_queue.perm_b[id];
369  SF::vector<mesh_int_t> & perm_a = io_queue.perm_a[id];
370 
371  SF::vector<int> senders;
372  IO_get_sender_ranks(il, senders);
373 
374  size_t num_recv = 0;
375  for(int s : senders)
376  num_recv += layout[s+1] - layout[s];
377 
378  SF::vector<float> buff(num_recv);
379  SF::vector<MPI_Request> req (senders.size());
380  SF::vector<MPI_Status> stat(senders.size());
381 
382  for(size_t i=0, dsp=0; i<senders.size(); i++) {
383  int send_rank = senders[i];
384  size_t send_size = layout[send_rank+1] - layout[send_rank];
385 
386  MPI_Irecv(buff.data() + dsp, send_size, MPI_FLOAT, send_rank,
387  ASYNC_TAG, user_globals::IO_Intercomm, req.data()+i);
388  dsp += send_size;
389  }
390 
391  MPI_Waitall(senders.size(), req.data(), stat.data());
392  IO_sort_data(buff, perm_b, perm_a, cg);
393  SF::root_write((FILE*)igb->fileptr(), buff, PETSC_COMM_WORLD);
394 }
395 
396 void COMPUTE_do_output(SF_real* dat, const int lsize, const int IO_id)
397 {
398  intercomm_layout il;
399  il.setup(user_globals::IO_Intercomm);
400
401  if(il.loc_rank == 0) {
402  int msg = ASYNC_CMD_OUTPUT;
403  MPI_Send(&msg, 1, MPI_INT, 0, ASYNC_TAG, user_globals::IO_Intercomm);
404 
405  msg = IO_id;
406  MPI_Send(&msg, 1, MPI_INT, 0, ASYNC_TAG, user_globals::IO_Intercomm);
407  }
408 
409  int receive_rank = COMPUTE_get_receive_rank(il);
410  SF::vector<float> data;
411  data.assign(dat, dat+lsize);
412 
413  MPI_Send(data.data(), data.size(), MPI_FLOAT, receive_rank, ASYNC_TAG, user_globals::IO_Intercomm);
414 }
415 
416 void COMPUTE_do_output(SF_real* dat, const SF::vector<mesh_int_t> & idx, const int IO_id)
417 {
418  intercomm_layout il;
419  il.setup(user_globals::IO_Intercomm);
420
421  if(il.loc_rank == 0) {
422  int msg = ASYNC_CMD_OUTPUT;
423  MPI_Send(&msg, 1, MPI_INT, 0, ASYNC_TAG, user_globals::IO_Intercomm);
424 
425  msg = IO_id;
426  MPI_Send(&msg, 1, MPI_INT, 0, ASYNC_TAG, user_globals::IO_Intercomm);
427  }
428 
429  int receive_rank = COMPUTE_get_receive_rank(il);
430 
431  SF::vector<float> data;
432  data.resize(idx.size()); data.resize(0);
433 
434  for(mesh_int_t ii : idx)
435  data.push_back(dat[ii]);
436 
437  MPI_Send(data.data(), data.size(), MPI_FLOAT, receive_rank, ASYNC_TAG, user_globals::IO_Intercomm);
438 }
439 
440 
441 }}
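
For orientation, the compute-side entry points above are used in a fixed sequence: register an output once (collectively on the compute world), stream one data slice per output step, and finally send the exit command, while the IO ranks block inside IO_poll_for_output() until that command arrives. The sketch below is not part of the file; it assumes the simulator startup has already split the ranks into compute and IO worlds and created user_globals::IO_Intercomm, and the file name, units, index and data buffers are placeholders.

// Hedged sketch of the compute-side call sequence (assumptions as stated above).
#include "async_io.h"

void example_compute_output(const SF::vector<mesh_int_t> & local_idx,  // placeholder: local node indices
                            SF_real* local_data,                       // placeholder: one value per index
                            const int num_steps)
{
  using namespace opencarp::async;

  // register the output once; dpn = 1 selects the scalar IGB_FLOAT layout in IO_open_igb()
  int io_id = COMPUTE_register_output(local_idx, 1, "vm.igb", "mV");

  // stream one slice per output step to the IO world
  for(int step = 0; step < num_steps; step++)
    COMPUTE_do_output(local_data, int(local_idx.size()), io_id);

  // tell the IO world to leave its polling loop and close the IGB files
  COMPUTE_send_exit_flag();
}

// IO-world counterpart: the IO ranks simply block in the polling loop.
void example_io_entry()
{
  opencarp::async::async_IO_queue io_queue;
  opencarp::async::IO_poll_for_output(io_queue);  // returns after ASYNC_CMD_EXIT
}

Note that COMPUTE_register_output() is collective over PETSC_COMM_WORLD (it builds the data layout and passes an MPI_Barrier), so every compute rank must call it, even if its local index set is empty.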