Skip to content

Commit 0db5de7

Browse files
committed
Tests do not crash
[ FAILED ] 23 tests, listed below: [ FAILED ] MetricFlowTest.GrpcModelInfer [ FAILED ] MetricFlowTest.RestModelInfer [ FAILED ] MetricFlowTest.RestV3Unary [ FAILED ] MediapipeNegativeFrameworkTest.NoOutputPacketProduced [ FAILED ] HttpOpenAIHandlerTest.Unary [ FAILED ] HttpOpenAIHandlerTest.UnaryWithHeaders [ FAILED ] PythonFlowTest.PythonNodeFileDoesNotExist [ FAILED ] PythonFlowTest.PythonNodeClassDoesNotExist [ FAILED ] PythonFlowTest.PythonNodeExecuteNotImplemented [ FAILED ] PythonFlowTest.PythonNodeNameAlreadyExist [ FAILED ] PythonFlowTest.PythonNodeInitFailed [ FAILED ] PythonFlowTest.PythonNodeInitFailedImportOutsideTheClassError [ FAILED ] PythonFlowTest.PythonNodeInitException [ FAILED ] PythonFlowTest.PythonNodeOptionsMissing [ FAILED ] PythonFlowTest.PythonNodeNameMissing [ FAILED ] PythonFlowTest.PythonNodeNameDoesNotExist [ FAILED ] PythonFlowTest.PythonNodeInitMembers [ FAILED ] PythonFlowTest.PythonNodePassArgumentsToConstructor [ FAILED ] PythonFlowTest.PythonCalculatorTestSingleInSingleOutMultiRunWithErrors [ FAILED ] PythonFlowTest.Negative_NodeFiresProcessWithoutAllInputs [ FAILED ] PythonFlowTest.Positive_NodeFiresProcessWithoutAllInputs [ FAILED ] StreamingTest.SingleStreamSend1Receive3 [ FAILED ] StreamingTest.InvalidGraph
1 parent 58bf089 commit 0db5de7

11 files changed

+297
-63
lines changed

src/BUILD

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ cc_library(
174174
"@mediapipe//mediapipe/framework:calculator_graph",
175175
"//src/python:libovmspythonmodule", # TODO not splitted
176176
"//src/llm:genai_servables", # TODO split!
177+
"//src:assert",
177178
],
178179
copts = [],
179180
visibility = ["//visibility:public",],
@@ -808,6 +809,16 @@ cc_library(
808809
copts = COPTS_ADJUSTED + COPTS_OV_TRACE,
809810
linkopts = LINKOPTS_ADJUSTED,
810811
)
812+
cc_library(
813+
name = "assert",
814+
hdrs = ["assert.hpp"],
815+
deps = [
816+
],
817+
visibility = ["//visibility:public",],
818+
local_defines = COMMON_LOCAL_DEFINES,
819+
copts = COPTS_ADJUSTED + COPTS_OV_TRACE,
820+
linkopts = LINKOPTS_ADJUSTED,
821+
)
811822
cc_library(
812823
name = "notifyreceiver",
813824
hdrs = ["notifyreceiver.hpp"],
@@ -2777,7 +2788,6 @@ cc_test(
27772788
"test/reranknode_test.cpp",
27782789
"test/rerank_handler_test.cpp",
27792790
"test/rerank_chunking_test.cpp",
2780-
"test/streaming_test.cpp", # Mediapipe enabled
27812791
"test/mediapipe_validation_test.cpp", # Mediapipe enabled
27822792
"test/get_mediapipe_graph_metadata_response_test.cpp",
27832793
"test/mediapipe_framework_test.cpp",
@@ -2950,6 +2960,7 @@ cc_test(
29502960
"//src/test/mediapipe/calculators:mediapipe_test_calculators",
29512961
"@mediapipe//mediapipe/calculators/ovms:ovms_calculator",
29522962
"@mediapipe//mediapipe/framework:calculator_runner",
2963+
"//src:streaming_test", # Mediapipe enabled
29532964
],
29542965
"//:disable_mediapipe" :
29552966
[
@@ -3036,6 +3047,22 @@ cc_library(
30363047
],
30373048
copts = COPTS_TESTS,
30383049
)
3050+
cc_library(
3051+
name = "streaming_test",
3052+
linkstatic = 1,
3053+
alwayslink = True,
3054+
srcs = ["test/streaming_test.cpp"],
3055+
data = [ # FIXME @atobisze
3056+
],
3057+
linkopts = LINKOPTS_ADJUSTED,
3058+
deps = [
3059+
"//src:ovms_lib",
3060+
"//src:test_utils",
3061+
"@com_google_googletest//:gtest",
3062+
],
3063+
local_defines = COMMON_LOCAL_DEFINES,
3064+
copts = COPTS_TESTS,
3065+
)
30393066
cc_library(
30403067
name = "openvino_remote_tensors_tests",
30413068
linkstatic = 1,

src/assert.hpp

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#pragma once
2+
//*****************************************************************************
3+
// Copyright 2025 Intel Corporation
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
//*****************************************************************************
17+
#define ASSERT_ALWAYS(cond) \
18+
do { \
19+
if (!(cond)) { \
20+
std::cerr << "Assertion failed: (" #cond "), function " << __func__ \
21+
<< ", file " << __FILE__ << ", line " << __LINE__ << "." \
22+
<< std::endl; \
23+
std::abort(); \
24+
} \
25+
} while (false)
26+

src/mediapipe_internal/graphqueue.cpp

Lines changed: 19 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "../queue.hpp"
3030
#include "src/python/pythonnoderesources.hpp"
3131
#include "src/llm/servable.hpp"
32+
#include "src/assert.hpp"
3233

3334
#include "mediapipe/framework/calculator_graph.h"
3435
#include "mediapipe/framework/port/status.h"
@@ -42,16 +43,13 @@ const std::string LLM_SESSION_SIDE_PACKET_NAME = "llm";
4243
namespace ovms {
4344

4445
std::shared_ptr<GraphHelper> constructGraphHelper(const ::mediapipe::CalculatorGraphConfig& config, PythonNodeResourcesMap& pythonNodeResourcesMap, GenAiServableMap& genAiServableMap) {
45-
auto gh = std::make_shared<GraphHelper>();
4646
SPDLOG_TRACE("Constructing GraphHelper():{}", (void*)gh.get());
47-
gh->graph = std::make_shared<::mediapipe::CalculatorGraph>();
48-
gh->currentTimestamp = ::mediapipe::Timestamp(0);
49-
47+
auto gh = std::make_shared<GraphHelper>();
5048
auto absStatus = gh->graph->Initialize(config);
5149
if (!absStatus.ok()) {
5250
SPDLOG_ERROR("Failed to initialize graph issue:{}", absStatus.ToString());
5351
// This would mean validation did execute fully
54-
assert(true);
52+
ASSERT_ALWAYS(true);
5553
}
5654
for (auto& name : config.output_stream()) {
5755
std::string streamName = getStreamName(name);
@@ -86,13 +84,13 @@ std::shared_ptr<GraphHelper> constructGraphHelper(const ::mediapipe::CalculatorG
8684
void GraphQueue::restoreStream(int streamId) {
8785
if (streamId < inferRequests.size()) {
8886
SPDLOG_ERROR("Cannot restore stream id > queue length");
89-
assert(streamId < inferRequests.size());
87+
ASSERT_ALWAYS(streamId < inferRequests.size());
9088
}
9189
SPDLOG_TRACE("Restoring graph helper id:{}", streamId);
9290
auto gh = constructGraphHelper(*this->config, *this->pythonNodeResourcesMap, *this->genAiServableMap);
9391
if (gh == nullptr) {
9492
SPDLOG_ERROR("Failed to restore graph helper: {}", streamId);
95-
assert(false);
93+
ASSERT_ALWAYS(false);
9694
}
9795
inferRequests[streamId] = gh;
9896
}
@@ -116,12 +114,12 @@ GraphQueue::GraphQueue(const ::mediapipe::CalculatorGraphConfig& config, std::sh
116114
inferRequests.emplace_back(std::move(gh));
117115
}
118116
}
119-
120-
GraphHelper::~GraphHelper() {
121-
SPDLOG_TRACE("GraphHelper wait until idle graph");
117+
void GraphHelper::closeGraph() {
118+
SPDLOG_ERROR("ER");
119+
ASSERT_ALWAYS(this->graph.get() != nullptr);
122120
auto absStatus = absl::OkStatus();
123121
if (this->initialized) {
124-
SPDLOG_ERROR("Calling wait until idle");
122+
SPDLOG_ERROR("Calling wait until idle graph:{}", (void*)this->graph.get());
125123
absStatus = this->graph->WaitUntilIdle();
126124
}
127125
if (!absStatus.ok()) {
@@ -145,6 +143,16 @@ GraphHelper::~GraphHelper() {
145143
// throw 42.2;
146144
}
147145
SPDLOG_ERROR("ER");
146+
147+
}
148+
149+
GraphHelper::GraphHelper() :
150+
graph(std::make_shared<::mediapipe::CalculatorGraph>()),
151+
currentTimestamp(::mediapipe::Timestamp(0)) {}
152+
153+
GraphHelper::~GraphHelper() {
154+
SPDLOG_TRACE("GraphHelper wait until idle graph");
155+
closeGraph();
148156
this->graph.reset();
149157
SPDLOG_ERROR("ER ~GraphHelper:{}", (void*) this);
150158
}
@@ -155,31 +163,6 @@ GraphQueue::~GraphQueue() {
155163
SPDLOG_TRACE("GraphQueue wait until idle graph");
156164
graphHelper.reset();
157165
SPDLOG_ERROR("ER");
158-
continue;
159-
auto absStatus = graphHelper->graph->WaitUntilIdle();
160-
if (!absStatus.ok()) {
161-
SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), (void*)this);
162-
// throw 42.2;
163-
}
164-
absStatus = graphHelper->graph->CloseAllPacketSources();
165-
if (!absStatus.ok()) {
166-
SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), (void*)this);
167-
// throw "as";
168-
}
169-
SPDLOG_TRACE("GraphQueue wait until done graph");
170-
absStatus = graphHelper->graph->WaitUntilDone();
171-
if (!absStatus.ok()) {
172-
SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), (void*)this);
173-
// throw 42.2;
174-
}
175-
graphHelper->graph->Cancel();
176-
if (!absStatus.ok()) {
177-
SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), (void*)this);
178-
// throw 42.2;
179-
}
180-
SPDLOG_ERROR("ER");
181-
graphHelper->graph.reset();
182-
SPDLOG_ERROR("ER");
183166
}
184167
SPDLOG_ERROR("ER ~GraphQueue:{}", (void*)this);
185168
}

src/mediapipe_internal/graphqueue.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ struct GraphHelper {
4848
// We need to know how the cleanup of graph helper happens if graph is not fully initialized eg. obseervers are not even installed FIXME @atobisze consider unique-ptr with custom lambda?
4949
bool initialized = false;
5050
// TODO FIXME move constr/=
51-
GraphHelper() = default;
51+
GraphHelper();
5252
GraphHelper(const GraphHelper&) = delete;
5353
GraphHelper& operator=(const GraphHelper&) = delete;
5454
GraphHelper(GraphHelper&& gh) :
@@ -57,6 +57,7 @@ struct GraphHelper {
5757
currentTimestamp(gh.currentTimestamp) {}
5858
GraphHelper& operator=(GraphHelper&& gh) = default;
5959
~GraphHelper();
60+
void closeGraph();
6061
};
6162
// we need to keep Graph alive during MP reload hence shared_ptr
6263
//class GraphQueue : public Queue<std::shared_ptr<::mediapipe::CalculatorGraph>> {
@@ -77,6 +78,7 @@ struct GraphIdGuard {
7778
const int id;
7879
std::shared_ptr<GraphHelper> gh;
7980
bool success = false;
81+
bool disarm = false; // FIXME @atobisze WA until steaming is implemented
8082
// TODO FIXME shared_ptr
8183
::mediapipe::CalculatorGraph& graph;
8284
GraphIdGuard(std::shared_ptr<GraphQueue>& queue) :

src/mediapipe_internal/mediapipegraphexecutor.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
#include "../python/python_backend.hpp"
3232
#endif
3333

34+
#include "../assert.hpp"
35+
3436
namespace ovms {
3537

3638
MediapipeGraphExecutor::MediapipeGraphExecutor(
@@ -58,7 +60,12 @@ MediapipeGraphExecutor::MediapipeGraphExecutor(
5860
pythonBackend(pythonBackend),
5961
currentStreamTimestamp(STARTING_TIMESTAMP),
6062
mediapipeServableMetricReporter(mediapipeServableMetricReporter),
61-
guard(std::move(guard)) {}
63+
guard(std::move(guard)) {
64+
ASSERT_ALWAYS(pythonNodeResourcesMap.get() != nullptr);
65+
ASSERT_ALWAYS(llmNodeResourcesMap.get() != nullptr);
66+
SPDLOG_ERROR("XXX ER: {}", (void*)this->pythonNodeResourcesMap.get());
67+
SPDLOG_ERROR("XXX ER: {}", (void*)this->llmNodeResourcesMap.get());
68+
}
6269

6370
const std::string MediapipeGraphExecutor::PYTHON_SIDE_PACKET_NAME = "py";
6471
const std::string MediapipeGraphExecutor::LLM_SESSION_PACKET_NAME = "llm";

src/mediapipe_internal/mediapipegraphexecutor.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,7 @@ class MediapipeGraphExecutor {
275275
Status inferStream(const RequestType& req, ReaderWriterType& serverReaderWriter, ExecutionContext executionContext) {
276276
OVMS_PROFILE_FUNCTION();
277277
SPDLOG_DEBUG("Start MediapipeGraphExecutor::inferEx mediapipe graph: {} execution", this->name);
278+
this->guard.success = true; // FIXME @atobisze WA until fully implemented streaming
278279
std::mutex sendMutex;
279280
try {
280281
MetricGaugeGuard currentGraphs(this->mediapipeServableMetricReporter->currentGraphs.get());
@@ -290,10 +291,12 @@ class MediapipeGraphExecutor {
290291
};
291292
Timer<TIMER_END2> timer;
292293
timer.start(PROCESS);
294+
SPDLOG_ERROR("ER");
293295
{
294296
OVMS_PROFILE_SCOPE("Mediapipe graph installing packet observers");
295297
// Installing observers
296298
for (const auto& outputName : this->outputNames) {
299+
SPDLOG_ERROR("ER");
297300
MP_RETURN_ON_FAIL(graph.ObserveOutputStream(outputName, [&serverReaderWriter, &sendMutex, &outputName, &executionContext, this](const ::mediapipe::Packet& packet) -> absl::Status {
298301
OVMS_PROFILE_SCOPE("Mediapipe Packet Ready Callback");
299302
try {
@@ -322,16 +325,21 @@ class MediapipeGraphExecutor {
322325
}
323326
}
324327

328+
SPDLOG_ERROR("ER");
325329
std::map<std::string, mediapipe::Packet> inputSidePackets;
326330
{
327331
OVMS_PROFILE_SCOPE("Mediapipe graph creating input side packets");
332+
SPDLOG_ERROR("ER");
328333
OVMS_RETURN_ON_FAIL(deserializeInputSidePacketsFromFirstRequestImpl(inputSidePackets, req));
334+
SPDLOG_ERROR("ER: {}", (void*)this->pythonNodeResourcesMap.get());
329335
#if (PYTHON_DISABLE == 0)
330336
inputSidePackets[PYTHON_SIDE_PACKET_NAME] = mediapipe::MakePacket<PythonNodeResourcesMap>(*this->pythonNodeResourcesMap)
331337
.At(STARTING_TIMESTAMP);
338+
SPDLOG_ERROR("ER");
332339
inputSidePackets[LLM_SESSION_PACKET_NAME] = mediapipe::MakePacket<GenAiServableMap>(*this->llmNodeResourcesMap).At(STARTING_TIMESTAMP);
333340
#endif
334341
}
342+
SPDLOG_ERROR("ER");
335343

336344
{
337345
OVMS_PROFILE_SCOPE("Mediapipe graph start run");

src/test/mediapipe/calculators/streaming_test_calculator.cpp

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,14 @@ class AddOneSingleStreamTestCalculator : public CalculatorBase {
4646

4747
absl::Status Process(CalculatorContext* cc) final {
4848
LOG(INFO) << "AddOneSingleStreamTestCalculator::Process";
49+
if (cc->Inputs().NumEntries() == 0) {
50+
LOG(ERROR) << "exit 1";
51+
return tool::StatusStop();
52+
}
53+
for (const std::string& tag : cc->Inputs().GetTags()) {
54+
if (cc->Inputs().Tag(tag).IsEmpty()) {
55+
LOG(ERROR) << "OpenVINOInferenceCalculator expects all input packets at the same time for each call to Process()."; RET_CHECK(false); }
56+
}
4957
ov::Tensor input = cc->Inputs().Index(0).Get<ov::Tensor>();
5058
ov::Tensor output(input.get_element_type(), input.get_shape());
5159
for (size_t i = 0; i < input.get_byte_size() / sizeof(float); i++) {
@@ -85,6 +93,14 @@ class AddOne3CycleIterationsTestCalculator : public CalculatorBase {
8593

8694
absl::Status Process(CalculatorContext* cc) final {
8795
LOG(INFO) << "AddOne3CycleIterationsTestCalculator::Process";
96+
if (cc->Inputs().NumEntries() == 0) {
97+
LOG(ERROR) << "exit 1";
98+
return tool::StatusStop();
99+
}
100+
for (const std::string& tag : cc->Inputs().GetTags()) {
101+
if (cc->Inputs().Tag(tag).IsEmpty()) {
102+
LOG(ERROR) << "OpenVINOInferenceCalculator expects all input packets at the same time for each call to Process()."; RET_CHECK(false); }
103+
}
88104
if (++cycle_iteration > 3) {
89105
return absl::OkStatus();
90106
}
@@ -132,6 +148,14 @@ class AddNumbersMultiInputsOutputsTestCalculator : public CalculatorBase {
132148

133149
absl::Status Process(CalculatorContext* cc) final {
134150
LOG(INFO) << "AddNumbersMultiInputsOutputsTestCalculator::Process";
151+
if (cc->Inputs().NumEntries() == 0) {
152+
LOG(ERROR) << "exit 1";
153+
return tool::StatusStop();
154+
}
155+
for (const std::string& tag : cc->Inputs().GetTags()) {
156+
if (cc->Inputs().Tag(tag).IsEmpty()) {
157+
LOG(ERROR) << "OpenVINOInferenceCalculator expects all input packets at the same time for each call to Process()."; RET_CHECK(false); }
158+
}
135159
ov::Tensor input1 = cc->Inputs().Index(0).Get<ov::Tensor>();
136160
LOG(INFO) << "AddNumbersMultiInputsOutputsTestCalculator::Process";
137161
ov::Tensor input2 = cc->Inputs().Index(1).Get<ov::Tensor>();
@@ -200,6 +224,14 @@ class AddSidePacketToSingleStreamTestCalculator : public CalculatorBase {
200224

201225
absl::Status Process(CalculatorContext* cc) final {
202226
LOG(INFO) << "AddSidePacketToSingleStreamTestCalculator::Process";
227+
if (cc->Inputs().NumEntries() == 0) {
228+
LOG(ERROR) << "exit 1";
229+
return tool::StatusStop();
230+
}
231+
for (const std::string& tag : cc->Inputs().GetTags()) {
232+
if (cc->Inputs().Tag(tag).IsEmpty()) {
233+
LOG(ERROR) << "OpenVINOInferenceCalculator expects all input packets at the same time for each call to Process()."; RET_CHECK(false); }
234+
}
203235
ov::Tensor input = cc->Inputs().Index(0).Get<ov::Tensor>();
204236
ov::Tensor output(input.get_element_type(), input.get_shape());
205237
int64_t valueToAdd = cc->InputSidePackets().Index(0).Get<int64_t>();

0 commit comments

Comments
 (0)