Skip to content

Commit 17eb5e5

Browse files
Do not flush if the backend does not support Span API (#1863)
* Do not flush if the backend does not support Span API * Fix flushing logic
1 parent c128403 commit 17eb5e5

File tree

7 files changed

+68
-34
lines changed

7 files changed

+68
-34
lines changed

include/openPMD/IO/IOTask.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -558,6 +558,7 @@ struct OPENPMDAPI_EXPORT
558558
}
559559

560560
// in parameters
561+
bool queryOnly = false; // query if the backend supports this
561562
Offset offset;
562563
Extent extent;
563564
Datatype dtype = Datatype::UNDEFINED;

include/openPMD/RecordComponent.tpp

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -85,14 +85,6 @@ RecordComponent::storeChunk(Offset o, Extent e, F &&createBuffer)
8585
{
8686
verifyChunk<T>(o, e);
8787

88-
/*
89-
* The openPMD backend might not yet know about this dataset.
90-
* Flush the openPMD hierarchy to the backend without flushing any actual
91-
* data yet.
92-
*/
93-
seriesFlush_impl</* flush_entire_series = */ false>(
94-
{FlushLevel::SkeletonOnly});
95-
9688
size_t size = 1;
9789
for (auto ext : e)
9890
{
@@ -102,33 +94,61 @@ RecordComponent::storeChunk(Offset o, Extent e, F &&createBuffer)
10294
* Flushing the skeleton does not create datasets,
10395
* so we might need to do it now.
10496
*/
105-
if (!written())
97+
auto &rc = get();
98+
if (!rc.m_dataset.has_value())
10699
{
107-
auto &rc = get();
108-
if (!rc.m_dataset.has_value())
109-
{
110-
throw error::WrongAPIUsage(
111-
"[RecordComponent] Must specify dataset type and extent before "
112-
"using storeChunk() (see RecordComponent::resetDataset()).");
113-
}
114-
Parameter<Operation::CREATE_DATASET> dCreate(rc.m_dataset.value());
115-
dCreate.name = Attributable::get().m_writable.ownKeyWithinParent;
116-
IOHandler()->enqueue(IOTask(this, dCreate));
100+
throw error::WrongAPIUsage(
101+
"[RecordComponent] Must specify dataset type and extent before "
102+
"using storeChunk() (see RecordComponent::resetDataset()).");
117103
}
118104

105+
Parameter<Operation::GET_BUFFER_VIEW> query;
106+
query.queryOnly = true;
107+
IOHandler()->enqueue(IOTask(this, query));
108+
IOHandler()->flush(internal::defaultFlushParams);
109+
119110
Parameter<Operation::GET_BUFFER_VIEW> getBufferView;
120111
getBufferView.offset = o;
121112
getBufferView.extent = e;
122113
getBufferView.dtype = getDatatype();
123-
IOHandler()->enqueue(IOTask(this, getBufferView));
124-
IOHandler()->flush(internal::defaultFlushParams);
125-
auto &out = *getBufferView.out;
126-
if (!out.backendManagedBuffer)
114+
115+
if (query.out->backendManagedBuffer)
116+
{
117+
// Need to initialize the dataset for the Span API
118+
// But this is a non-collective call and initializing the dataset is
119+
// collective in HDF5 So we do this only in backends that actually
120+
// support the Span API (i.e. ADIOS2) which do not share this
121+
// restriction
122+
// TODO: Add some form of collective ::commitDefinitions() call to
123+
// RecordComponents to be called by users before the Span API
124+
if (!written())
125+
{
126+
/*
127+
* The openPMD backend might not yet know about this dataset.
128+
* Flush the openPMD hierarchy to the backend without flushing any
129+
* actual data yet.
130+
*/
131+
seriesFlush_impl</* flush_entire_series = */ false>(
132+
{FlushLevel::SkeletonOnly});
133+
Parameter<Operation::CREATE_DATASET> dCreate(rc.m_dataset.value());
134+
dCreate.name = Attributable::get().m_writable.ownKeyWithinParent;
135+
IOHandler()->enqueue(IOTask(this, dCreate));
136+
137+
setWritten(true, EnqueueAsynchronously::OnlyAsync);
138+
}
139+
140+
IOHandler()->enqueue(IOTask(this, getBufferView));
141+
IOHandler()->flush(internal::defaultFlushParams);
142+
}
143+
144+
// The backend might still refuse the operation even if backend managed
145+
// buffers are generally supported, so check again
146+
if (!getBufferView.out->backendManagedBuffer)
127147
{
128148
// note that data might have either
129149
// type shared_ptr<T> or shared_ptr<T[]>
130150
auto data = std::forward<F>(createBuffer)(size);
131-
out.ptr = static_cast<void *>(data.get());
151+
getBufferView.out->ptr = static_cast<void *>(data.get());
132152
if (size > 0)
133153
{
134154
storeChunk(std::move(data), std::move(o), std::move(e));

include/openPMD/backend/Attributable.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -589,9 +589,10 @@ OPENPMD_protected
589589
{
590590
return writable().written;
591591
}
592-
enum class EnqueueAsynchronously : bool
592+
enum class EnqueueAsynchronously : uint8_t
593593
{
594-
Yes,
594+
OnlyAsync,
595+
Both,
595596
No
596597
};
597598
/*

src/IO/ADIOS/ADIOS2IOHandler.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1316,6 +1316,12 @@ void ADIOS2IOHandlerImpl::getBufferView(
13161316
parameters.out->backendManagedBuffer = false;
13171317
return;
13181318
}
1319+
else if (parameters.queryOnly)
1320+
{
1321+
parameters.out->backendManagedBuffer = true;
1322+
return;
1323+
}
1324+
13191325
setAndGetFilePosition(writable);
13201326
auto file = refreshFileFromParent(writable, /* preferParentFile = */ false);
13211327
detail::ADIOS2File &ba = getFileData(file, IfFileNotOpen::ThrowError);

src/Iteration.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ void Iteration::flushFileBased(
256256
* If it was written before, then in the context of another iteration.
257257
*/
258258
auto &attr = s.get().m_rankTable.m_attributable;
259-
attr.setWritten(false, Attributable::EnqueueAsynchronously::Yes);
259+
attr.setWritten(false, Attributable::EnqueueAsynchronously::Both);
260260
s.get()
261261
.m_rankTable.m_attributable.get()
262262
.m_writable.abstractFilePosition.reset();
@@ -853,7 +853,7 @@ auto Iteration::beginStep(
853853
{
854854
bool previous = series.iterations.written();
855855
series.iterations.setWritten(
856-
false, Attributable::EnqueueAsynchronously::Yes);
856+
false, Attributable::EnqueueAsynchronously::Both);
857857
auto oldStatus = IOHandl->m_seriesStatus;
858858
IOHandl->m_seriesStatus = internal::SeriesStatus::Parsing;
859859
try
@@ -870,7 +870,7 @@ auto Iteration::beginStep(
870870
}
871871
IOHandl->m_seriesStatus = oldStatus;
872872
series.iterations.setWritten(
873-
previous, Attributable::EnqueueAsynchronously::Yes);
873+
previous, Attributable::EnqueueAsynchronously::Both);
874874
}
875875
else if (thisObject.has_value())
876876
{

src/Series.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1503,9 +1503,9 @@ void Series::flushFileBased(
15031503
* current iteration by the backend)
15041504
*/
15051505
this->setWritten(
1506-
false, Attributable::EnqueueAsynchronously::Yes);
1506+
false, Attributable::EnqueueAsynchronously::Both);
15071507
series.iterations.setWritten(
1508-
false, Attributable::EnqueueAsynchronously::Yes);
1508+
false, Attributable::EnqueueAsynchronously::Both);
15091509

15101510
setDirty(dirty() || it->second.dirty());
15111511
std::string filename = iterationFilename(it->first);
@@ -1996,7 +1996,7 @@ void Series::readOneIterationFileBased(std::string const &filePath)
19961996
setWritten(false, Attributable::EnqueueAsynchronously::No);
19971997
setIterationEncoding_internal(
19981998
encoding_out, internal::default_or_explicit::explicit_);
1999-
setWritten(old_written, Attributable::EnqueueAsynchronously::Yes);
1999+
setWritten(old_written, Attributable::EnqueueAsynchronously::Both);
20002000
}
20012001
else
20022002
throw std::runtime_error(

src/backend/Attributable.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -524,12 +524,18 @@ void Attributable::setWritten(bool val, EnqueueAsynchronously ea)
524524
switch (ea)
525525
{
526526

527-
case EnqueueAsynchronously::Yes: {
527+
case EnqueueAsynchronously::OnlyAsync: {
528528
Parameter<Operation::SET_WRITTEN> param;
529529
param.target_status = val;
530530
IOHandler()->enqueue(IOTask(this, param));
531+
return;
532+
}
533+
case EnqueueAsynchronously::Both: {
534+
Parameter<Operation::SET_WRITTEN> param;
535+
param.target_status = val;
536+
IOHandler()->enqueue(IOTask(this, param));
537+
break;
531538
}
532-
break;
533539
case EnqueueAsynchronously::No:
534540
break;
535541
}

0 commit comments

Comments
 (0)