I'm having a problem with data corruption when using allgather
, but only one of the HPC systems we use. I think the problem is very likely somewhere in the infiniband stack, but I'm at a complete loss to track it down.
We can trigger the error with the following simple test script:
from mpi4py import MPI
NCOORD = 3600
class State:
def __init__(self, coords):
self.coords = coords
def main():
world = MPI.COMM_WORLD
rank = world.Get_rank()
N = world.Get_size()
with open(f"output_{rank}.txt", "w") as outfile:
if rank == 0:
# generate N random states
states = []
for i in range(N):
coords = np.random.randn(NCOORD, 3)
state = State(coords)
states.append(state)
# broadcast all states to each node
world.bcast(states, root=0)
# scatter a single state to each node
state = world.scatter(states, root=0)
else:
states = world.bcast(None, root=0)
state = world.scatter(None, root=0)
while True:
results = world.allgather(state)
for i, (s1, s2) in enumerate(zip(states, results)):
if (s1.coords != s2.coords).any():
print(f"Coordinates do not match on rank {rank}", file=outfile)
print(f"position {i}", file=outfile)
print("expected:", file=outfile)
print(s1.coords, file=outfile)
print("got:", file=outfile)
print(s2.coords, file=outfile)
mask = (s1.coords != s2.coords)
x1_diff = s1.coords[mask]
x2_diff = s2.coords[mask]
print("diff expected", file=outfile)
print(x1_diff, file=outfile)
print("diff got", file=outfile)
print(x2_diff, file=outfile)
raise ValueError()
print("success")
if __name__ == "__main__":
main()
This runs when ranks are confined to a single node, but fails whenever ranks span multiple nodes. The errors are either the ValueError
triggered when the data does not match what is expected, an unknown pickle protocol error, or a pickle data truncated error. In each case, the data is apparently being corrupted.
Typically, one of the ranks on a node will complete successfully, whereas the remaining ranks will recieve garbage data. This makes me think this is some kind of data race.
The same script works fine on the other HPC system that we use, which makes me think it is a some kind of problem with the network stack, rather than with mpi4py. However, I have been getting pushback from our system adminstrators because the following equivalent c program runs without issue:
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <sys/time.h>
double **alloc_darray2d(int, int);
void free_darray2d(double **, int, int);
double ***alloc_darray3d(int, int, int);
void free_darray3d(double ***, int, int, int);
double ***alloc_parray2d(int, int);
void free_parray2d(double ***, int, int);
void gauss_rng(double *, int, double);
int main(int argc, char *argv[]) {
int i, j, p, myid, numprocs, n, ntot, iter, found;
double *sbuf, *rbuf;
int ncoord = 3600;
int seed = -1, maxiter = 1000000;
unsigned int lseed;
double ***states, **state, ***results;
double sigma = 1.0;
const double randmax = (double)RAND_MAX + 1.0;
struct timeval curtime;
char fname[25];
FILE *fd;
MPI_Init(NULL, NULL);
MPI_Comm_rank(MPI_COMM_WORLD, &myid);
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
/* allocate 2d array of doubles state[ncoord][3] contiguously in memory */
state = alloc_darray2d(ncoord, 3);
/* allocate 3d array of doubles states[numprocs][ncoord][3] contiguously in memory */
states = alloc_darray3d(numprocs, ncoord, 3);
n = ncoord*3;
ntot = n*numprocs;
if (states == NULL) { fprintf(stderr, "error: failed to allocate states array\n"); }
if (myid == 0) {
/* configure the random number generator */
if (argc >= 2) {
seed = atoi(argv[1]);
if (argc == 3) {
maxiter = atoi(argv[2]);
}
}
if (seed >= 0) {
lseed = seed;
} else {
/* randomly seed rng using the current microseconds within the current second */
gettimeofday(&curtime, NULL);
lseed = curtime.tv_usec;
}
srandom(lseed);
/* fill the states array with gaussian distributed random numbers */
for (p = 0; p < numprocs; p++) {
for (i = 0; i < ncoord; i++) {
for (j = 0; j < 3; j++) {
/* random returns integer between 0 and RAND_MAX, divide
by randmax gives number within [0,1) */
states[p][i][j] = random()/randmax;
}
}
}
gauss_rng(&states[0][0][0], ntot, sigma);
}
/* scatter states array into state array at each process */
MPI_Bcast(&states[0][0][0], ntot, MPI_DOUBLE, 0, MPI_COMM_WORLD);
MPI_Scatter(&states[0][0][0], n, MPI_DOUBLE, &state[0][0], n, MPI_DOUBLE, 0, MPI_COMM_WORLD);
/* gather state array from each process into results array */
results = alloc_darray3d(numprocs, ncoord, 3);
found = 0;
for (iter = 1; iter <= maxiter; iter++) {
MPI_Allgather(&state[0][0], n, MPI_DOUBLE, &results[0][0][0], n, MPI_DOUBLE, MPI_COMM_WORLD);
snprintf(fname, 24, "scattergather.%d", myid);
fd = fopen(fname, "w");
/* compare states and results arrays */
for (p = 0; p < numprocs; p++) {
for (i = 0; i < ncoord; i++) {
if (states[p][i][0] != results[p][i][0]
|| states[p][i][1] != results[p][i][1]
|| states[p][i][2] != results[p][i][2]) {
fprintf(fd, "states and results differ on process %d at position %d:\n", myid, p);
fprintf(fd, "states[%d][%d][0] = %g\nresults[%d][%d][0] = %g\n", p, i, p, i, states[p][i][0], results[p][i][0]);
fprintf(fd, "states[%d][%d][1] = %g\nresults[%d][%d][1] = %g\n", p, i, p, i, states[p][i][1], results[p][i][1]);
fprintf(fd, "states[%d][%d][2] = %g\nresults[%d][%d][2] = %g\n", p, i, p, i, states[p][i][2], results[p][i][2]);
found = 1;
}
}
}
if (found == 1) { break; }
}
fclose(fd);
MPI_Finalize();
}
#include <math.h>
void gauss_rng(double *z, int n, double sigma) {
/*
gauss_rng converts an array z of length n filled with uniformly
distributed random numbers in [0,1) into gaussian distributed
random numbers with width sigma.
*/
double sq;
const double pi2 = 8.0*atan(1.0);
int i;
if ((n/2)*2 != n) {
fprintf(stderr, "error: n in gauss_rng must be even\n");
return;
}
for (i = 0; i < n; i += 2) {
sq = sigma*sqrt(-2.0*log(1.0 - z[i]));
z[i] = sq*sin(pi2*z[i + 1]);
z[i + 1] = sq*cos(pi2*z[i + 1]);
}
}
/* allocate a double 2d array with subscript range
dm[0,...,nx-1][0,...,ny-1] */
double **alloc_darray2d(int nx, int ny) {
int i;
double **dm;
dm = (double **)malloc((size_t) nx*sizeof(double*));
if (dm == NULL) { return NULL; }
dm[0] = (double *)malloc((size_t) nx*ny*sizeof(double));
if (dm[0] == NULL) {
free((void *) dm);
return NULL;
}
for (i = 1; i < nx; i++) dm[i] = dm[i - 1] + ny;
/* return pointer to array of pointers to rows */
return dm;
}
void free_darray2d(double **dm, int nx, int ny) {
free((void *) dm[0]);
free((void *) dm);
}
/* allocate a 2d array of pointers to doubles with subscript range
pm[0,...,nx-1][0,...,ny-1] */
double ***alloc_parray2d(int nx, int ny) {
int i;
double ***pm;
pm = (double ***)malloc((size_t) nx*sizeof(double**));
if (pm == NULL) { return NULL; }
pm[0]=(double **)malloc((size_t) nx*ny*sizeof(double*));
if (pm[0] == NULL) {
free((void *) pm);
return NULL;
}
for (i = 1; i < nx; i++) pm[i] = pm[i - 1] + ny;
/* return pointer to array of pointers to rows */
return pm;
}
void free_parray2d(double ***pm,int nx, int ny) {
free((void *) pm[0]);
free((void *) pm);
}
/* allocate 3d array of doubles with subscript range
dm[0,...,nx-1][0,...,ny-1][0,...,nz-1] */
double ***alloc_darray3d(int nx, int ny, int nz) {
int i, j;
double ***dm;
/* first, allocate 2d array of pointers to doubles */
dm = alloc_parray2d(nx, ny);
if (dm == NULL) return NULL;
/* allocate memory for the whole thing */
dm[0][0] = (double *)malloc((size_t) nx*ny*nz*sizeof(double));
if (dm[0][0] == NULL) {
free_parray2d(dm, nx, ny);
return NULL;
}
/* set the pointers inside the matrix */
for (i = 0; i < nx; i++) {
for (j = 1; j < ny; j++) {
dm[i][j] = dm[i][j - 1] + nz;
}
if (i < nx - 1) dm[i + 1][0] = dm[i][ny - 1] + nz;
}
/* return pointer to array of matrix of pointers */
return dm;
}
void free_darray3d(double ***dm, int nx, int ny, int nz) {
free((void *) dm[0][0]);
free_parray2d(dm, nx, ny);