Skip to content

Commit 8c2967c

Browse files
committed
Use POSIX I/O to move data section when header extent grows
1 parent ef097c8 commit 8c2967c

File tree

1 file changed

+159
-13
lines changed

1 file changed

+159
-13
lines changed

src/drivers/ncmpio/ncmpio_enddef.c

Lines changed: 159 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,132 @@
3636
*/
3737
#define MOVE_UNIT 16777216
3838

39+
#define USE_POSIX_IO_TO_MOVE
40+
#ifdef USE_POSIX_IO_TO_MOVE
41+
/*----< move_file_block() >-------------------------------------------------*/
42+
/* Call POSIX I/O subroutines to move data */
43+
#include <fcntl.h> /* open() */
44+
#include <sys/types.h> /* open() */
45+
#include <sys/stat.h> /* open() */
46+
#include <unistd.h> /* pread(), pwrite(), close() */
47+
48+
static int
49+
move_file_block(NC *ncp,
50+
MPI_Offset to, /* destination starting file offset */
51+
MPI_Offset from, /* source starting file offset */
52+
MPI_Offset nbytes) /* amount to be moved */
53+
{
54+
int fd, rank, nprocs, status=NC_NOERR, do_open;
55+
void *buf;
56+
size_t num_moves, mv_amnt, p_units;
57+
off_t off_last, off_from, off_to;
58+
char *path = ncmpii_remove_file_system_type_prefix(ncp->path);
59+
60+
rank = ncp->rank;
61+
nprocs = ncp->nprocs;
62+
63+
/* buf will be used as a temporal buffer to move data in chunks, i.e.
64+
* read a chunk and later write to the new location
65+
*/
66+
buf = NCI_Malloc(MOVE_UNIT);
67+
if (buf == NULL) DEBUG_RETURN_ERROR(NC_ENOMEM)
68+
69+
p_units = MOVE_UNIT * nprocs;
70+
num_moves = nbytes / p_units;
71+
if (nbytes % p_units) num_moves++;
72+
off_last = (num_moves - 1) * p_units + rank * MOVE_UNIT;
73+
off_from = from + off_last;
74+
off_to = to + off_last;
75+
mv_amnt = nbytes % p_units;
76+
if (mv_amnt == 0 && nbytes > 0) mv_amnt = p_units;
77+
78+
/* determine the subset of processes that have data to move */
79+
do_open = 0;
80+
if (nbytes >= p_units)
81+
do_open = 1;
82+
else {
83+
int n_units = nbytes / MOVE_UNIT;
84+
if (nbytes % MOVE_UNIT) n_units++;
85+
if (rank < n_units) do_open = 1;
86+
}
87+
88+
if (do_open && (fd = open(path, O_RDWR)) == -1) {
89+
fprintf(stderr,"Error at %s line %d: open file %s (%s)\n",
90+
__func__,__LINE__,path,strerror(errno));
91+
DEBUG_RETURN_ERROR(NC_EFILE)
92+
}
93+
94+
/* move the data section starting from its tail toward its beginning */
95+
while (nbytes > 0) {
96+
size_t chunk_size;
97+
ssize_t get_size, put_size;
98+
99+
if (mv_amnt == p_units) {
100+
/* each rank moves amount of chunk_size */
101+
chunk_size = MOVE_UNIT;
102+
}
103+
else {
104+
/* when total move amount is less than p_units */
105+
size_t num_chunks = mv_amnt / MOVE_UNIT;
106+
if (mv_amnt % MOVE_UNIT) num_chunks++;
107+
if (rank < num_chunks) {
108+
chunk_size = MOVE_UNIT;
109+
if (rank == num_chunks - 1 && mv_amnt % MOVE_UNIT > 0)
110+
chunk_size = mv_amnt % MOVE_UNIT;
111+
assert(chunk_size > 0);
112+
}
113+
else
114+
chunk_size = 0;
115+
}
116+
117+
if (chunk_size > 0) {
118+
/* read from file at off_from for amount of chunk_size */
119+
get_size = pread(fd, buf, chunk_size, off_from);
120+
if (get_size < 0) {
121+
fprintf(stderr,
122+
"Error at %s line %d: pread file %s offset %ld size %zd (%s)\n",
123+
__func__,__LINE__,path,off_from,chunk_size,strerror(errno));
124+
DEBUG_RETURN_ERROR(NC_EREAD)
125+
get_size = 0;
126+
}
127+
ncp->get_size += get_size;
128+
}
129+
else
130+
get_size = 0;
131+
132+
/* to prevent from one rank's write run faster than other's read */
133+
if (ncp->nprocs > 1) MPI_Barrier(ncp->comm);
134+
135+
if (get_size > 0) {
136+
/* Write to new location at off_to for amount of get_size. Assuming
137+
* the call to MPI_Get_count() above returns the actual amount of
138+
* data read from the file, i.e. get_size.
139+
*/
140+
put_size = pwrite(fd, buf, get_size, off_to);
141+
if (put_size < 0) {
142+
fprintf(stderr,
143+
"Error at %s line %d: pwrite file %s offset %ld size %zd (%s)\n",
144+
__func__,__LINE__,path,off_to,get_size,strerror(errno));
145+
DEBUG_RETURN_ERROR(NC_EREAD)
146+
put_size = 0;
147+
}
148+
ncp->put_size += put_size;
149+
}
150+
151+
/* move on to the next round */
152+
mv_amnt = p_units;
153+
off_from -= mv_amnt;
154+
off_to -= mv_amnt;
155+
nbytes -= mv_amnt;
156+
}
157+
158+
if (do_open && close(fd) == -1)
159+
DEBUG_RETURN_ERROR(NC_EFILE)
160+
161+
NCI_Free(buf);
162+
return status;
163+
}
164+
#else
39165
/*----< move_file_block() >-------------------------------------------------*/
40166
/* Call MPI I/O subroutines to move data */
41167
static int
@@ -54,17 +180,14 @@ move_file_block(NC *ncp,
54180
rank = ncp->rank;
55181
nprocs = ncp->nprocs;
56182

57-
/* To make sure all processes finish their I/O before any process starts to
58-
* read, it is necessary to call MPI_Barrier.
183+
/* collective_fh can be used in either MPI independent or collective I/O
184+
* APIs to move data, within this subroutine.
59185
*/
60-
if (ncp->nprocs > 1) MPI_Barrier(ncp->comm);
61-
62-
/* moving file blocks must be done in collective mode, ignoring NC_HCOLL */
63186
fh = ncp->collective_fh;
64187

65-
/* make fileview entire file visible */
66-
TRACE_IO(MPI_File_set_view)(fh, 0, MPI_BYTE, MPI_BYTE, "native",
67-
MPI_INFO_NULL);
188+
/* MPI-IO fileview has been reset in ncmpi_redef() to make the entire file
189+
* visible
190+
*/
68191

69192
/* Use MPI collective I/O subroutines to move data, only if nproc > 1 and
70193
* MPI-IO hint "romio_no_indep_rw" is set to true. Otherwise, use MPI
@@ -116,12 +239,13 @@ move_file_block(NC *ncp,
116239
* Thus we initialize it above to work around.
117240
*/
118241
memset(&mpistatus, 0, sizeof(MPI_Status));
242+
mpireturn = MPI_SUCCESS;
119243

120244
/* read from file at off_from for amount of chunk_size */
121245
if (do_coll)
122246
TRACE_IO(MPI_File_read_at_all)(fh, off_from, buf, chunk_size,
123247
MPI_BYTE, &mpistatus);
124-
else
248+
else if (chunk_size > 0)
125249
TRACE_IO(MPI_File_read_at)(fh, off_from, buf, chunk_size,
126250
MPI_BYTE, &mpistatus);
127251
if (mpireturn != MPI_SUCCESS) {
@@ -130,7 +254,7 @@ move_file_block(NC *ncp,
130254
DEBUG_ASSIGN_ERROR(status, NC_EREAD)
131255
get_size = chunk_size;
132256
}
133-
else {
257+
else if (chunk_size > 0) {
134258
/* for zero-length read, MPI_Get_count may report incorrect result
135259
* for some MPICH version, due to the uninitialized MPI_Status
136260
* object passed to MPI-IO calls. Thus we initialize it above to
@@ -155,6 +279,7 @@ move_file_block(NC *ncp,
155279
* Thus we initialize it above to work around.
156280
*/
157281
memset(&mpistatus, 0, sizeof(MPI_Status));
282+
mpireturn = MPI_SUCCESS;
158283

159284
/* Write to new location at off_to for amount of get_size. Assuming the
160285
* call to MPI_Get_count() above returns the actual amount of data read
@@ -164,7 +289,7 @@ move_file_block(NC *ncp,
164289
TRACE_IO(MPI_File_write_at_all)(fh, off_to, buf,
165290
get_size /* NOT chunk_size */,
166291
MPI_BYTE, &mpistatus);
167-
else
292+
else if (get_size > 0)
168293
TRACE_IO(MPI_File_write_at)(fh, off_to, buf,
169294
get_size /* NOT chunk_size */,
170295
MPI_BYTE, &mpistatus);
@@ -173,7 +298,7 @@ move_file_block(NC *ncp,
173298
if (status == NC_NOERR && err == NC_EFILE)
174299
DEBUG_ASSIGN_ERROR(status, NC_EWRITE)
175300
}
176-
else {
301+
else if (get_size > 0) {
177302
/* update the number of bytes written since file open.
178303
* Because each rank reads and writes no more than one chunk_size
179304
* at a time and chunk_size is < NC_MAX_INT, it is OK to call
@@ -197,6 +322,7 @@ move_file_block(NC *ncp,
197322
NCI_Free(buf);
198323
return status;
199324
}
325+
#endif
200326

201327
/*----< move_record_vars() >-------------------------------------------------*/
202328
/* Move the record variables from lower offsets (old) to higher offsets. */
@@ -1266,6 +1392,12 @@ ncmpio__enddef(void *ncdp,
12661392
* data section can be moved as a single contiguous block to a
12671393
* higher file offset.
12681394
*/
1395+
1396+
/* Make sure all processes finish their I/O before any process
1397+
* starts to read the data section.
1398+
*/
1399+
if (ncp->nprocs > 1) MPI_Barrier(ncp->comm);
1400+
12691401
/* amount of data section to be moved */
12701402
nbytes = ncp->old->begin_rec - ncp->old->begin_var
12711403
+ ncp->old->recsize * ncp->old->numrecs;
@@ -1281,6 +1413,12 @@ ncmpio__enddef(void *ncdp,
12811413
* record variable section must be moved to a higher file
12821414
* offset.
12831415
*/
1416+
1417+
/* Make sure all processes finish their I/O before any process
1418+
* starts to read the data section.
1419+
*/
1420+
if (ncp->nprocs > 1) MPI_Barrier(ncp->comm);
1421+
12841422
if (ncp->vars.num_rec_vars == ncp->old->vars.num_rec_vars) {
12851423
/* no new record variable has been added, then the entire
12861424
* record variable section can be moved as a single
@@ -1310,6 +1448,11 @@ ncmpio__enddef(void *ncdp,
13101448
* variable section must be moved to a higher file offset.
13111449
*/
13121450

1451+
/* Make sure all processes finish their I/O before any process
1452+
* starts to read the data section.
1453+
*/
1454+
if (!mov_done && ncp->nprocs > 1) MPI_Barrier(ncp->comm);
1455+
13131456
/* First, find the size of fix-sized variable section, i.e.
13141457
* from the last fix-sized variable's begin and len. Note there
13151458
* may be some free space at the end of fix-sized variable
@@ -1333,7 +1476,10 @@ ncmpio__enddef(void *ncdp,
13331476
}
13341477
}
13351478

1336-
/* to prevent some ranks run faster than others */
1479+
/* to prevent some ranks run faster than others and start to read
1480+
* after exiting ncmpi_enddef(), while some processes are still moving
1481+
* the data section
1482+
*/
13371483
if (mov_done && ncp->nprocs > 1) MPI_Barrier(ncp->comm);
13381484

13391485
} /* ... ncp->old != NULL */

0 commit comments

Comments
 (0)