Line data Source code
1 : #ifndef UTIL_MPI_H
2 : #define UTIL_MPI_H
3 :
4 : #include <vector>
5 : #include <AMReX_ParallelDescriptor.H>
6 : #include <AMReX_ParallelReduce.H>
7 : #include <AMReX_ParallelContext.H>
8 :
9 :
10 : namespace Util
11 : {
12 : namespace MPI
13 : {
14 :
15 :
16 :
17 : template <class T>
18 0 : int Allgather(std::vector<T>& a_data)
19 : {
20 : // Gather information about how many sites were found on each processor
21 : int nprocs;
22 0 : MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
23 0 : int my_num = a_data.size();
24 0 : int num = my_num;
25 :
26 : // Communicate the total array size to everyone
27 0 : amrex::ParallelAllReduce::Sum(num, MPI_COMM_WORLD);
28 : // Temporary buffers to receive data from all procs
29 0 : std::vector<T> a_data_all(num);
30 : // Send information about how many sites on each proc to all procs
31 0 : std::vector<int> nsites_procs(nprocs);
32 0 : MPI_Allgather(&my_num, 1, amrex::ParallelDescriptor::Mpi_typemap<int>::type(),
33 0 : nsites_procs.data(), 1, amrex::ParallelDescriptor::Mpi_typemap<int>::type(),
34 : MPI_COMM_WORLD);
35 : // Calculate the offset for each
36 0 : std::vector<int> nsites_disp(nprocs);
37 0 : for (int i = 0; i < nprocs; i++)
38 : {
39 0 : nsites_disp[i] = 0;
40 0 : for (int j = 0; j < i; j++) nsites_disp[i] += nsites_procs[j];
41 : }
42 : // Store the MPI datatype for each
43 0 : MPI_Datatype mpi_type = amrex::ParallelDescriptor::Mpi_typemap<T>::type();
44 0 : MPI_Allgatherv(
45 0 : a_data.data(), my_num, mpi_type,
46 0 : a_data_all.data(), nsites_procs.data(), nsites_disp.data(), mpi_type,
47 : MPI_COMM_WORLD);
48 : // Swap out the data so the buffers are no longer needed.
49 0 : a_data.swap(a_data_all);
50 0 : a_data_all.clear();
51 0 : return 0;
52 0 : }
53 :
54 : inline void Bcast( std::string &s,
55 : int root = amrex::ParallelDescriptor::IOProcessorNumber())
56 : {
57 : using namespace amrex;
58 :
59 : int me = ParallelDescriptor::MyProc();
60 :
61 : // --- Broadcast length ---
62 : int len = (me == root) ? static_cast<int>(s.size()) : 0;
63 : ParallelDescriptor::Bcast(&len, 1, root);
64 :
65 : // --- Resize on receivers ---
66 : if (me != root)
67 : s.resize(len);
68 :
69 : // --- Broadcast character data ---
70 : if (len > 0)
71 : ParallelDescriptor::Bcast(s.data(), len, root);
72 : }
73 :
74 :
75 :
76 :
77 :
78 0 : inline void Allgather( const std::string &local,
79 : std::vector<std::string> &out)
80 : {
81 : using namespace amrex;
82 :
83 0 : int nprocs = ParallelDescriptor::NProcs();
84 0 : int me = ParallelDescriptor::MyProc();
85 :
86 : // Step 1: gather lengths from all ranks
87 0 : int local_len = static_cast<int>(local.size());
88 0 : std::vector<int> lengths(nprocs);
89 :
90 0 : for (int p = 0; p < nprocs; ++p) {
91 0 : int len = (me == p) ? local_len : 0;
92 0 : ParallelDescriptor::Bcast(&len, 1, p);
93 0 : lengths[p] = len;
94 : }
95 :
96 : // Step 2: compute offsets
97 0 : std::vector<int> offsets(nprocs, 0);
98 0 : int total_len = lengths[0];
99 0 : for (int i = 1; i < nprocs; ++i) {
100 0 : offsets[i] = offsets[i-1] + lengths[i-1];
101 0 : total_len += lengths[i];
102 : }
103 :
104 : // Step 3: prepare local buffer
105 0 : std::vector<char> local_buf(local_len);
106 0 : if (local_len > 0)
107 0 : std::memcpy(local_buf.data(), local.data(), local_len);
108 :
109 : // Step 4: broadcast character buffers from each rank
110 0 : std::vector<char> all_buf(total_len);
111 0 : for (int p = 0; p < nprocs; ++p) {
112 0 : int len = lengths[p];
113 0 : if (me == p) {
114 : // my buffer is already local_buf
115 0 : if (len > 0)
116 0 : std::memcpy(all_buf.data() + offsets[p], local_buf.data(), len);
117 : }
118 : // broadcast this rank's characters to everyone
119 0 : ParallelDescriptor::Bcast(all_buf.data() + offsets[p], len, p);
120 : }
121 :
122 : // Step 5: unpack strings
123 0 : out.resize(nprocs);
124 0 : for (int i = 0; i < nprocs; ++i) {
125 0 : out[i] = std::string(all_buf.data() + offsets[i], lengths[i]);
126 : }
127 0 : }
128 :
129 :
130 :
131 :
132 : }
133 : }
134 :
135 :
136 :
137 :
138 : #endif
|