Skip to content

Commit 58bf089

Browse files
committed
No crash on tests filter below
TEST_FILTER="**:-*HttpRestApiHandlerTest*:*Metric*Infer*:*MetricFlow*RestV3*:*MetricFlo*Current*:*Stream*"; echo $TEST_FILTER; bazel test -c opt --test_timeout 393 --subcommands --runs_per_test=1 --test_filter=${TEST_FILTER} --jobs 111 --subcommands --config mp_on_py_on --experimental_ui_max_stdouterr_bytes=10485760 //src:ovms //src:ovms_test 2>&1 | tee log
1 parent 16c38da commit 58bf089

File tree

8 files changed

+72
-20
lines changed

8 files changed

+72
-20
lines changed

src/mediapipe_internal/graphqueue.cpp

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,25 +43,28 @@ namespace ovms {
4343

4444
std::shared_ptr<GraphHelper> constructGraphHelper(const ::mediapipe::CalculatorGraphConfig& config, PythonNodeResourcesMap& pythonNodeResourcesMap, GenAiServableMap& genAiServableMap) {
4545
auto gh = std::make_shared<GraphHelper>();
46-
SPDLOG_ERROR("ER GraphHelper():{}", (void*)gh.get());
46+
SPDLOG_TRACE("Constructing GraphHelper():{}", (void*)gh.get());
4747
gh->graph = std::make_shared<::mediapipe::CalculatorGraph>();
4848
gh->currentTimestamp = ::mediapipe::Timestamp(0);
4949

5050
auto absStatus = gh->graph->Initialize(config);
5151
if (!absStatus.ok()) {
52-
SPDLOG_ERROR("ER issue:{}", absStatus.ToString());
53-
throw 42;
52+
SPDLOG_ERROR("Failed to initialize graph issue:{}", absStatus.ToString());
53+
// This would mean validation did execute fully
54+
assert(true);
5455
}
5556
for (auto& name : config.output_stream()) {
5657
std::string streamName = getStreamName(name);
5758
gh->outStreamObservers[streamName] = std::shared_ptr<OutputStreamObserverI>(new NullOutputStreamObserver()); // TODO use at() FIXME
5859
auto& perGraphObserverFunctor = gh->outStreamObservers[streamName];
60+
SPDLOG_TRACE("Installing output stream observer for output:{}", streamName);
5961
absStatus = gh->graph->ObserveOutputStream(streamName, [&perGraphObserverFunctor](const ::mediapipe::Packet& packet) -> absl::Status { return perGraphObserverFunctor->handlePacket(packet); }); // TODO FIXME throw?
6062
if (!absStatus.ok()) {
61-
SPDLOG_ERROR("ER issue:{}", absStatus.ToString());
62-
throw 42;
63+
SPDLOG_ERROR("Failed to install output stream observer for output:{}; issue:{}", streamName, absStatus.ToString());
64+
return nullptr;
6365
}
6466
}
67+
gh->initialized = true;
6568
std::map<std::string, mediapipe::Packet> inputSidePackets;
6669
inputSidePackets[PYTHON_SESSION_SIDE_PACKET_NAME] = mediapipe::MakePacket<PythonNodeResourcesMap>(pythonNodeResourcesMap)
6770
.At(STARTING_TIMESTAMP);
@@ -77,35 +80,50 @@ std::shared_ptr<GraphHelper> constructGraphHelper(const ::mediapipe::CalculatorG
7780
SPDLOG_ERROR("ER issue:{}", absStatus.ToString());
7881
throw 42;
7982
}
80-
SPDLOG_ERROR("ER");
83+
SPDLOG_TRACE("Constructed graph helper");
8184
return gh;
8285
}
8386
void GraphQueue::restoreStream(int streamId) {
8487
if (streamId < inferRequests.size()) {
8588
SPDLOG_ERROR("Cannot restore stream id > queue length");
8689
assert(streamId < inferRequests.size());
8790
}
88-
inferRequests[streamId] = constructGraphHelper(*this->config, *this->pythonNodeResourcesMap, *this->genAiServableMap);
91+
SPDLOG_TRACE("Restoring graph helper id:{}", streamId);
92+
auto gh = constructGraphHelper(*this->config, *this->pythonNodeResourcesMap, *this->genAiServableMap);
93+
if (gh == nullptr) {
94+
SPDLOG_ERROR("Failed to restore graph helper: {}", streamId);
95+
assert(false);
96+
}
97+
inferRequests[streamId] = gh;
8998
}
9099

91100
GraphQueue::GraphQueue(const ::mediapipe::CalculatorGraphConfig& config, std::shared_ptr<PythonNodeResourcesMap> pythonNodeResourcesMap, std::shared_ptr<GenAiServableMap> genAiServableMap, int streamsLength) :
92101
Queue(streamsLength),
93102
config(std::make_shared<const ::mediapipe::CalculatorGraphConfig>(config)),
94103
pythonNodeResourcesMap(pythonNodeResourcesMap),
95104
genAiServableMap(genAiServableMap) {
96-
SPDLOG_ERROR("ER GraphQueue():{}", (void*)this);
105+
SPDLOG_TRACE("Constructing GraphQueue():{}", (void*)this);
97106
inferRequests.reserve(streamsLength);
98107
// TODO FIXME split constructor to init to handle retCodes?
99108
for (auto i = 0; i < streamsLength; ++i) {
100-
SPDLOG_ERROR("ER");
101-
inferRequests.emplace_back(std::move(constructGraphHelper(*this->config, *pythonNodeResourcesMap, *genAiServableMap)));
102-
SPDLOG_ERROR("ER");
109+
SPDLOG_ERROR("Constructing GraphHelper id:{}", i);
110+
auto gh = constructGraphHelper(*this->config, *pythonNodeResourcesMap, *genAiServableMap);
111+
if (gh == nullptr) {
112+
SPDLOG_ERROR("Failed to construct GraphHelper");
113+
throw 42; // FIXME @atobisze factory
114+
}
115+
116+
inferRequests.emplace_back(std::move(gh));
103117
}
104118
}
105119

106120
GraphHelper::~GraphHelper() {
107121
SPDLOG_TRACE("GraphHelper wait until idle graph");
108-
auto absStatus = this->graph->WaitUntilIdle();
122+
auto absStatus = absl::OkStatus();
123+
if (this->initialized) {
124+
SPDLOG_ERROR("Calling wait until idle");
125+
absStatus = this->graph->WaitUntilIdle();
126+
}
109127
if (!absStatus.ok()) {
110128
SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), (void*)this);
111129
// throw 42.2;

src/mediapipe_internal/graphqueue.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ struct GraphHelper {
4545
std::shared_ptr<::mediapipe::CalculatorGraph> graph; // TODO FIXME this does not have to be shared_ptr
4646
std::unordered_map<std::string, std::shared_ptr<OutputStreamObserverI>> outStreamObservers;
4747
::mediapipe::Timestamp currentTimestamp; // TODO FIXME const
48+
// We need to know how the cleanup of graph helper happens if graph is not fully initialized eg. obseervers are not even installed FIXME @atobisze consider unique-ptr with custom lambda?
49+
bool initialized = false;
4850
// TODO FIXME move constr/=
4951
GraphHelper() = default;
5052
GraphHelper(const GraphHelper&) = delete;

src/mediapipe_internal/mediapipegraphdefinition.cpp

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,19 @@ Status MediapipeGraphDefinition::dryInitializeTest() {
109109
SPDLOG_LOGGER_ERROR(modelmanager_logger, "Mediapipe graph: {} initialization failed with message: {}", this->getName(), absMessage);
110110
return Status(StatusCode::MEDIAPIPE_GRAPH_INITIALIZATION_ERROR, std::move(absMessage));
111111
}
112+
113+
std::unordered_map<std::string, std::shared_ptr<OutputStreamObserverI>> outStreamObservers;
114+
for (auto& name : config.output_stream()) {
115+
std::string streamName = getStreamName(name);
116+
outStreamObservers[streamName] = std::shared_ptr<OutputStreamObserverI>(new NullOutputStreamObserver()); // TODO use at() FIXME
117+
auto& perGraphObserverFunctor = outStreamObservers[streamName];
118+
auto absStatus = graph.ObserveOutputStream(streamName, [&perGraphObserverFunctor](const ::mediapipe::Packet& packet) -> absl::Status { return perGraphObserverFunctor->handlePacket(packet); }); // TODO FIXME throw?
119+
if (!absStatus.ok()) {
120+
const std::string absMessage = absStatus.ToString();
121+
SPDLOG_LOGGER_ERROR(modelmanager_logger, "Mediapipe graph: {} installation of output stream observer failed with message: {}", this->getName(), absMessage);
122+
return Status(StatusCode::MEDIAPIPE_GRAPH_INITIALIZATION_ERROR, std::move(absMessage));
123+
}
124+
}
112125
} catch (std::exception& e) {
113126
SPDLOG_ERROR("Exception caught whilie trying to initialize MediaPipe graph: {}", e.what());
114127
return StatusCode::UNKNOWN_ERROR;
@@ -165,18 +178,29 @@ Status MediapipeGraphDefinition::validate(ModelManager& manager) {
165178
return status;
166179
}
167180
// here we will not be available if calculator does not exist in OVMS
181+
SPDLOG_ERROR("XXX ER new PythonNodeResourcesMap:{}", (void*)this->pythonNodeResourcesMap.get());
168182
status = this->dryInitializeTest();
169183
if (!status.ok()) {
170184
return status;
171185
}
172186

187+
SPDLOG_ERROR("XXX ER new PythonNodeResourcesMap:{}", (void*)this->pythonNodeResourcesMap.get());
173188
status = this->initializeNodes();
174189
if (!status.ok()) {
175190
return status;
176191
}
177192
// TODO FIXME @atobisze
178193
SPDLOG_ERROR("ER");
194+
try {
179195
this->queue = std::make_shared<GraphQueue>(this->config, this->pythonNodeResourcesMap, this->genAiServableMap, 1);
196+
SPDLOG_ERROR("ER");
197+
} catch (std::exception& e) {
198+
SPDLOG_ERROR("ER:{}", e.what());
199+
return StatusCode::INTERNAL_ERROR;
200+
} catch (...) {
201+
SPDLOG_ERROR("ER");
202+
return StatusCode::INTERNAL_ERROR;
203+
}
180204
SPDLOG_ERROR("XXX ER GraphQueue:{}", (void*)this->queue.get());
181205

182206
lock.unlock();
@@ -457,7 +481,7 @@ class ResourcesCleaningGuard {
457481
resources(resources) {}
458482
~ResourcesCleaningGuard() {
459483
if (shouldCleanup) {
460-
resources.reset();
484+
resources->clear();
461485
}
462486
}
463487
void disableCleaning() {

src/test/mediapipeflow_test.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2736,7 +2736,6 @@ class MediapipeSerialization : public ::testing::Test {
27362736
std::shared_ptr<GraphQueue> queue = std::make_shared<GraphQueue>(config, pnsm, gasm, 1);
27372737
GraphIdGuard guard(queue);
27382738
executor = std::make_unique<MockedMediapipeGraphExecutor>("", "", config, mapping, mapping, inputNames, outputNames, pnsm, gasm, this->reporter.get(), std::move(guard));
2739-
SPDLOG_ERROR("Exit SetUp");
27402739
}
27412740
};
27422741

src/test/pythonnode_test.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2729,11 +2729,19 @@ TEST_F(PythonFlowTest, FailingToInitializeOneNodeDestructsAllResources) {
27292729
adjustConfigForTargetPlatform(firstTestPbtxt);
27302730
ovms::MediapipeGraphConfig mgc{"mediaDummy", "", ""};
27312731
DummyMediapipeGraphDefinition mediapipeDummy("mediaDummy", mgc, firstTestPbtxt, getPythonBackend());
2732+
SPDLOG_ERROR("ER");
2733+
ASSERT_EQ(mediapipeDummy.getPythonNodeResources("pythonNode1"), nullptr);
2734+
SPDLOG_ERROR("ER");
27322735
mediapipeDummy.inputConfig = firstTestPbtxt;
2736+
SPDLOG_ERROR("ER");
27332737
ASSERT_EQ(mediapipeDummy.validate(manager), StatusCode::PYTHON_NODE_FILE_STATE_INITIALIZATION_FAILED);
2738+
SPDLOG_ERROR("ER");
27342739
ASSERT_EQ(mediapipeDummy.getPythonNodeResources("pythonNode1"), nullptr);
2740+
SPDLOG_ERROR("ER");
27352741
ASSERT_EQ(mediapipeDummy.getPythonNodeResources("pythonNode2"), nullptr);
2742+
SPDLOG_ERROR("ER");
27362743
ASSERT_EQ(mediapipeDummy.getStatus().getStateCode(), PipelineDefinitionStateCode::LOADING_PRECONDITION_FAILED);
2744+
SPDLOG_ERROR("ER");
27372745
}
27382746

27392747
// Negative Request Tests

src/test/stress_test_utils.hpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1740,12 +1740,12 @@ class ConfigChangeStressTest : public TestWithTempDir {
17401740
try {
17411741
mediacreate(executorPtr, *(this->manager), request, response, createPipelineStatus);
17421742
} catch (std::exception& e) {
1743-
SPDLOG_ERROR("AAAAAAAA: {}", e.what());
1743+
SPDLOG_ERROR("Exception caught: {}", e.what());
17441744
createPipelineStatus = StatusCode::UNKNOWN_ERROR;
17451745
//break;
17461746
} catch (...) {
17471747
createPipelineStatus = StatusCode::UNKNOWN_ERROR;
1748-
SPDLOG_ERROR("AAAAAAAA");
1748+
SPDLOG_ERROR("Exception caught");
17491749
//break;
17501750
}
17511751
#endif
@@ -1770,10 +1770,10 @@ class ConfigChangeStressTest : public TestWithTempDir {
17701770
try {
17711771
mediaexec(executorPtr, *(this->manager), request, response, executePipelineStatus);
17721772
} catch (std::exception& e) {
1773-
SPDLOG_ERROR("AAAAAAAA: {}", e.what());
1773+
SPDLOG_ERROR("Exception caught: {}", e.what());
17741774
executePipelineStatus = StatusCode::UNKNOWN_ERROR;
17751775
} catch (...) {
1776-
SPDLOG_ERROR("AAAAAAAA");
1776+
SPDLOG_ERROR("Exception caught");
17771777
executePipelineStatus = StatusCode::UNKNOWN_ERROR;
17781778
}
17791779
#endif

src/test/test_utils.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -693,7 +693,6 @@ void EnsureServerStartedWithTimeout(ovms::Server& server, int timeoutSeconds) {
693693
while ((server.getModuleState(ovms::SERVABLE_MANAGER_MODULE_NAME) != ovms::ModuleState::INITIALIZED) &&
694694
(std::chrono::duration_cast<std::chrono::seconds>(std::chrono::high_resolution_clock::now() - start).count() < timeoutSeconds)) {
695695
}
696-
SPDLOG_ERROR("ER");
697696
ASSERT_EQ(server.getModuleState(ovms::SERVABLE_MANAGER_MODULE_NAME), ovms::ModuleState::INITIALIZED) << "OVMS did not fully load until allowed time:" << timeoutSeconds << "s. Check machine load";
698697
}
699698

src/test/test_utils.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1054,6 +1054,7 @@ class DummyMediapipeGraphDefinition : public ovms::MediapipeGraphDefinition {
10541054
std::string inputConfig;
10551055
#if (PYTHON_DISABLE == 0)
10561056
ovms::PythonNodeResources* getPythonNodeResources(const std::string& nodeName) {
1057+
SPDLOG_ERROR("ER:{}", (void*) this->pythonNodeResourcesMap.get());
10571058
auto it = this->pythonNodeResourcesMap->find(nodeName);
10581059
if (it == std::end(*pythonNodeResourcesMap)) {
10591060
return nullptr;
@@ -1082,7 +1083,8 @@ class DummyMediapipeGraphDefinition : public ovms::MediapipeGraphDefinition {
10821083
const ovms::MediapipeGraphConfig& config,
10831084
std::string inputConfig,
10841085
ovms::PythonBackend* pythonBackend = nullptr) :
1085-
ovms::MediapipeGraphDefinition(name, config, nullptr, nullptr, pythonBackend) { this->inputConfig = inputConfig; }
1086+
ovms::MediapipeGraphDefinition(name, config, nullptr, nullptr, pythonBackend) { this->inputConfig = inputConfig;
1087+
SPDLOG_ERROR("ER:{}", (void*) this->pythonNodeResourcesMap.get()); }
10861088

10871089
// Do not read from path - use predefined config contents
10881090
ovms::Status validateForConfigFileExistence() override {

0 commit comments

Comments
 (0)