Skip to content

Commit 16c38da

Browse files
committed
Stress MP tests work
1 parent 3fd0ab3 commit 16c38da

13 files changed

+293
-69
lines changed

src/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ cc_library(
154154
copts = [],
155155
visibility = ["//visibility:public",],
156156
linkopts = [],
157+
local_defines = COMMON_LOCAL_DEFINES,
157158
)
158159
cc_library(
159160
name = "mediapipe_internal_graphqueue",

src/mediapipe_internal/graphqueue.cpp

Lines changed: 87 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -40,57 +40,104 @@ const std::string PYTHON_SESSION_SIDE_PACKET_NAME = "py";
4040
const std::string LLM_SESSION_SIDE_PACKET_NAME = "llm";
4141
} // namespace
4242
namespace ovms {
43+
44+
/**
 * Creates a GraphHelper holding a freshly initialized and started MediaPipe graph.
 *
 * Steps performed, in order:
 *  1. allocate the helper and its CalculatorGraph, reset the timestamp to 0,
 *  2. Initialize() the graph with `config`,
 *  3. register a NullOutputStreamObserver for every output stream of `config`
 *     and hook it up with ObserveOutputStream(),
 *  4. StartRun() the graph with the Python and LLM side packets.
 *
 * @param config                 graph configuration passed to Initialize()
 * @param pythonNodeResourcesMap copied into the "py" input side packet
 * @param genAiServableMap       copied into the "llm" input side packet
 * @return shared pointer to the started helper
 * @throws int (value 42) when Initialize, ObserveOutputStream or StartRun
 *         reports a non-OK status
 */
std::shared_ptr<GraphHelper> constructGraphHelper(const ::mediapipe::CalculatorGraphConfig& config, PythonNodeResourcesMap& pythonNodeResourcesMap, GenAiServableMap& genAiServableMap) {
    auto gh = std::make_shared<GraphHelper>();
    SPDLOG_ERROR("ER GraphHelper():{}", static_cast<void*>(gh.get()));
    gh->graph = std::make_shared<::mediapipe::CalculatorGraph>();
    gh->currentTimestamp = ::mediapipe::Timestamp(0);

    auto absStatus = gh->graph->Initialize(config);
    if (!absStatus.ok()) {
        SPDLOG_ERROR("ER issue:{}", absStatus.ToString());
        throw 42;
    }
    for (const auto& name : config.output_stream()) {
        const std::string streamName = getStreamName(name);
        // Single lookup: operator[] creates the slot, then the placeholder observer is stored in it.
        auto& perGraphObserverFunctor = gh->outStreamObservers[streamName];
        perGraphObserverFunctor = std::make_shared<NullOutputStreamObserver>();  // TODO use at() FIXME
        // The callback captures a reference to the slot stored inside gh->outStreamObservers,
        // so it dispatches to whatever observer the slot holds when a packet arrives.
        // The slot must therefore stay alive for as long as the graph may invoke the callback.
        absStatus = gh->graph->ObserveOutputStream(streamName, [&perGraphObserverFunctor](const ::mediapipe::Packet& packet) -> absl::Status { return perGraphObserverFunctor->handlePacket(packet); });  // TODO FIXME throw?
        if (!absStatus.ok()) {
            SPDLOG_ERROR("ER issue:{}", absStatus.ToString());
            throw 42;
        }
    }
    std::map<std::string, mediapipe::Packet> inputSidePackets;
    inputSidePackets[PYTHON_SESSION_SIDE_PACKET_NAME] = mediapipe::MakePacket<PythonNodeResourcesMap>(pythonNodeResourcesMap)
                                                            .At(STARTING_TIMESTAMP);
    inputSidePackets[LLM_SESSION_SIDE_PACKET_NAME] = mediapipe::MakePacket<GenAiServableMap>(genAiServableMap).At(STARTING_TIMESTAMP);
    // Iterate by const reference; only the key is logged, so copying each packet is unnecessary.
    for (const auto& sidePacket : inputSidePackets) {
        SPDLOG_ERROR("k:{} v", sidePacket.first);
    }
    SPDLOG_ERROR("ER");
    absStatus = gh->graph->StartRun(inputSidePackets);
    SPDLOG_ERROR("ER");
    if (!absStatus.ok()) {
        // begin() of an empty map must not be dereferenced; only log the first key when one exists.
        if (pythonNodeResourcesMap.empty()) {
            SPDLOG_ERROR("Input sidePackets size:{}, python map size:{} key:<none> side packet name:{}", inputSidePackets.size(), pythonNodeResourcesMap.size(), PYTHON_SESSION_SIDE_PACKET_NAME);
        } else {
            SPDLOG_ERROR("Input sidePackets size:{}, python map size:{} key:{} side packet name:{}", inputSidePackets.size(), pythonNodeResourcesMap.size(), pythonNodeResourcesMap.begin()->first, PYTHON_SESSION_SIDE_PACKET_NAME);
        }
        SPDLOG_ERROR("ER issue:{}", absStatus.ToString());
        throw 42;
    }
    SPDLOG_ERROR("ER");
    return gh;
}
83+
void GraphQueue::restoreStream(int streamId) {
84+
if (streamId < inferRequests.size()) {
85+
SPDLOG_ERROR("Cannot restore stream id > queue length");
86+
assert(streamId < inferRequests.size());
87+
}
88+
inferRequests[streamId] = constructGraphHelper(*this->config, *this->pythonNodeResourcesMap, *this->genAiServableMap);
89+
}
90+
4391
/**
 * Builds a queue of `streamsLength` started graph helpers.
 *
 * The passed `config` is copied into a shared, immutable instance kept by the
 * queue so that helpers can be rebuilt later from the same configuration.
 *
 * NOTE: constructGraphHelper() throws on failure, so this constructor may throw
 * part-way through filling the queue.
 *
 * @param config                 graph configuration, copied
 * @param pythonNodeResourcesMap shared map dereferenced for every helper; must not be null
 * @param genAiServableMap       shared map dereferenced for every helper; must not be null
 * @param streamsLength          number of helpers to create
 */
GraphQueue::GraphQueue(const ::mediapipe::CalculatorGraphConfig& config, std::shared_ptr<PythonNodeResourcesMap> pythonNodeResourcesMap, std::shared_ptr<GenAiServableMap> genAiServableMap, int streamsLength) :
    Queue(streamsLength),
    config(std::make_shared<const ::mediapipe::CalculatorGraphConfig>(config)),
    pythonNodeResourcesMap(pythonNodeResourcesMap),
    genAiServableMap(genAiServableMap) {
    SPDLOG_ERROR("ER GraphQueue():{}", static_cast<void*>(this));
    inferRequests.reserve(streamsLength);
    // TODO FIXME split constructor to init to handle retCodes?
    for (int i = 0; i < streamsLength; ++i) {
        SPDLOG_ERROR("ER");
        // The call already yields a temporary; wrapping it in std::move would be redundant.
        inferRequests.emplace_back(constructGraphHelper(*this->config, *this->pythonNodeResourcesMap, *this->genAiServableMap));
        SPDLOG_ERROR("ER");
    }
}
85105

86-
SPDLOG_ERROR("ER");
87-
inferRequests.emplace_back(std::move(gh));
88-
SPDLOG_ERROR("ER");
106+
/**
 * Shuts the owned graph down before releasing it.
 *
 * Sequence: WaitUntilIdle -> CloseAllPacketSources -> WaitUntilDone -> Cancel,
 * then the shared_ptr is reset. A failing step is logged and the shutdown
 * continues; nothing is thrown from the destructor.
 *
 * A helper without a graph (for example one that never had a graph assigned)
 * has nothing to shut down and is skipped.
 */
GraphHelper::~GraphHelper() {
    if (!this->graph) {
        // Nothing to tear down; dereferencing a null graph would crash.
        SPDLOG_ERROR("ER ~GraphHelper:{}", static_cast<void*>(this));
        return;
    }
    SPDLOG_TRACE("GraphHelper wait until idle graph");
    auto absStatus = this->graph->WaitUntilIdle();
    if (!absStatus.ok()) {
        SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), static_cast<void*>(this));
        // throw 42.2;
    }
    absStatus = this->graph->CloseAllPacketSources();
    if (!absStatus.ok()) {
        SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), static_cast<void*>(this));
        // throw "as";
    }
    SPDLOG_TRACE("GraphQueue wait until done graph");
    absStatus = this->graph->WaitUntilDone();
    if (!absStatus.ok()) {
        SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), static_cast<void*>(this));
        // throw 42.2;
    }
    // Cancel() yields no status, so there is nothing new to check afterwards;
    // re-testing absStatus here would only repeat the WaitUntilDone report.
    this->graph->Cancel();
    SPDLOG_ERROR("ER");
    this->graph.reset();
    SPDLOG_ERROR("ER ~GraphHelper:{}", static_cast<void*>(this));
}
133+
91134
GraphQueue::~GraphQueue() {
92-
SPDLOG_ERROR("ER Destroy graph queue:{}", (void*)this);
135+
SPDLOG_ERROR("ER ~GraphQueue:{}", (void*)this);
93136
for (auto& graphHelper : inferRequests) {
137+
SPDLOG_TRACE("GraphQueue wait until idle graph");
138+
graphHelper.reset();
139+
SPDLOG_ERROR("ER");
140+
continue;
94141
auto absStatus = graphHelper->graph->WaitUntilIdle();
95142
if (!absStatus.ok()) {
96143
SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), (void*)this);
@@ -101,6 +148,7 @@ GraphQueue::~GraphQueue() {
101148
SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), (void*)this);
102149
// throw "as";
103150
}
151+
SPDLOG_TRACE("GraphQueue wait until done graph");
104152
absStatus = graphHelper->graph->WaitUntilDone();
105153
if (!absStatus.ok()) {
106154
SPDLOG_ERROR("ER issue:{} {}", absStatus.ToString(), (void*)this);
@@ -115,6 +163,7 @@ GraphQueue::~GraphQueue() {
115163
graphHelper->graph.reset();
116164
SPDLOG_ERROR("ER");
117165
}
118-
SPDLOG_ERROR("ER Destroy graph queue:{}", (void*)this);
166+
SPDLOG_ERROR("ER ~GraphQueue:{}", (void*)this);
119167
}
168+
// TODO FIXME @atobisze move to destructor
120169
} // namespace ovms

src/mediapipe_internal/graphqueue.hpp

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,40 +54,55 @@ struct GraphHelper {
5454
outStreamObservers(std::move(gh.outStreamObservers)),
5555
currentTimestamp(gh.currentTimestamp) {}
5656
GraphHelper& operator=(GraphHelper&& gh) = default;
57+
~GraphHelper();
5758
};
5859
// we need to keep Graph alive during MP reload hence shared_ptr
5960
//class GraphQueue : public Queue<std::shared_ptr<::mediapipe::CalculatorGraph>> {
6061
/**
 * Queue of shared GraphHelper instances, one per stream slot.
 *
 * Besides the helpers themselves, the queue keeps the graph configuration and
 * the resource maps used to build them, so that a single slot can be rebuilt
 * on demand through restoreStream().
 */
class GraphQueue : public Queue<std::shared_ptr<GraphHelper>> {
private:
    // Immutable copy of the graph configuration the helpers are built from.
    const std::shared_ptr<const ::mediapipe::CalculatorGraphConfig> config;
    // Resource maps handed to every constructed helper.
    std::shared_ptr<PythonNodeResourcesMap> pythonNodeResourcesMap;
    std::shared_ptr<GenAiServableMap> genAiServableMap;

public:
    /// Creates `streamsLength` graph helpers from `config` and the given resource maps.
    GraphQueue(const ::mediapipe::CalculatorGraphConfig& config, std::shared_ptr<PythonNodeResourcesMap> pythonNodeResourcesMap, std::shared_ptr<GenAiServableMap> genAiServableMap, int streamsLength);
    ~GraphQueue();
    // FIXME @atobisze exception handling, make friend with guard? Need to ensure it is only called by someone having that id
    /// Rebuilds the helper stored under `streamId`.
    void restoreStream(int streamId);
};
6872

6973
struct GraphIdGuard {
7074
std::weak_ptr<GraphQueue> weakQueue;
7175
const int id;
7276
std::shared_ptr<GraphHelper> gh;
77+
bool success = false;
7378
// TODO FIXME shared_ptr
7479
::mediapipe::CalculatorGraph& graph;
7580
GraphIdGuard(std::shared_ptr<GraphQueue>& queue) :
7681
weakQueue(queue),
82+
// TODO unloading graph will be blocked until all waiting requests will get their stream
7783
id(queue->getIdleStream().get()),
7884
gh((queue->getInferRequest(id))),
7985
graph(*gh->graph) {
80-
SPDLOG_ERROR("ER Guard construct this:{}", (void*)this);
86+
SPDLOG_ERROR("ER Guard construct this:{}, id:{}", (void*)this, this->id);
8187
}
8288
GraphIdGuard(GraphIdGuard&&) = default;
8389
GraphIdGuard(const GraphIdGuard&) = delete;
8490
~GraphIdGuard() {
8591
auto existingQueue = weakQueue.lock();
8692
SPDLOG_ERROR("ER DEstroy Guard begin qu:{}", (void*)existingQueue.get());
87-
if (existingQueue)
93+
if (existingQueue) {
94+
SPDLOG_ERROR("ER returning stream id:{}", this->id);
95+
if (!success) {
96+
SPDLOG_ERROR("ER restoring stream id:{}", this->id);
97+
existingQueue->restoreStream(this->id);
98+
}
8899
existingQueue->returnStream(this->id);
89-
SPDLOG_ERROR("ER Destroy Guard end qu:{}", (void*)existingQueue.get());
90-
SPDLOG_ERROR("ER Guard destroy this:{}", (void*)this);
100+
SPDLOG_ERROR("ER returned stream id:{}", this->id);
101+
} else {
102+
SPDLOG_ERROR("ER XXX nonexistingqueue");
103+
}
104+
//SPDLOG_ERROR("ER Destroy Guard end qu:{}", (void*)existingQueue.get());
105+
SPDLOG_ERROR("ER Guard destroy this:{}, graph:{}, ghCount:{}", (void*)this, (void*)&graph, gh.use_count());
91106
}
92107
};
93108
} // namespace ovms

src/mediapipe_internal/mediapipefactory.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,15 @@ Status MediapipeFactory::createDefinition(const std::string& pipelineName,
6969
}
7070
std::shared_ptr<MediapipeGraphDefinition> graphDefinition = std::make_shared<MediapipeGraphDefinition>(pipelineName, config, manager.getMetricRegistry(), &manager.getMetricConfig(), pythonBackend);
7171
auto stat = graphDefinition->validate(manager);
72+
SPDLOG_ERROR("ER");
7273
if (stat.getCode() == StatusCode::MEDIAPIPE_GRAPH_NAME_OCCUPIED) {
7374
return stat;
7475
}
76+
SPDLOG_ERROR("ER");
7577
std::unique_lock lock(definitionsMtx);
78+
SPDLOG_ERROR("ER");
7679
definitions.insert({pipelineName, std::move(graphDefinition)});
80+
SPDLOG_ERROR("ER");
7781
return stat;
7882
}
7983

@@ -113,6 +117,7 @@ Status MediapipeFactory::create(std::shared_ptr<MediapipeGraphExecutor>& pipelin
113117
return StatusCode::MEDIAPIPE_DEFINITION_NAME_MISSING;
114118
}
115119
auto& definition = *definitions.at(name);
120+
lock.unlock();
116121
return definition.create(pipeline);
117122
}
118123

0 commit comments

Comments
 (0)