From 53bf65f0bb07b024ba8876491ad69cf5a0a1f3e1 Mon Sep 17 00:00:00 2001 From: atobisze Date: Thu, 16 Jan 2025 15:07:24 +0100 Subject: [PATCH] Check --- src/BUILD | 1 + src/mediapipe_internal/mediapipegraphdefinition.cpp | 5 +++++ src/mediapipe_internal/mediapipegraphdefinition.hpp | 2 ++ src/mediapipe_internal/mediapipegraphexecutor.hpp | 9 +++++++++ 4 files changed, 17 insertions(+) diff --git a/src/BUILD b/src/BUILD index 74a1ce2459..dcb811bf77 100644 --- a/src/BUILD +++ b/src/BUILD @@ -701,6 +701,7 @@ cc_library( "@mediapipe//mediapipe/calculators/tensor:image_to_tensor_calculator", "@mediapipe//mediapipe/modules/holistic_landmark:holistic_landmark_cpu", "libovmsmediapipe_utils", + "@mediapipe//mediapipe/framework:thread_pool_executor", "@mediapipe//mediapipe/calculators/geti/inference:inference_calculators", "@mediapipe//mediapipe/calculators/geti/utils:utils", "@mediapipe//mediapipe/calculators/geti/utils:emptylabel_calculators", diff --git a/src/mediapipe_internal/mediapipegraphdefinition.cpp b/src/mediapipe_internal/mediapipegraphdefinition.cpp index d3bd6b8bb4..426fa36f90 100644 --- a/src/mediapipe_internal/mediapipegraphdefinition.cpp +++ b/src/mediapipe_internal/mediapipegraphdefinition.cpp @@ -191,6 +191,10 @@ MediapipeGraphDefinition::MediapipeGraphDefinition(const std::string name, reporter(std::make_unique(metricConfig, registry, name)) { mgconfig = config; passKfsRequestFlag = false; + if (!sharedThreadPool) { + SPDLOG_ERROR("Created shared Thread Pool XXX"); + sharedThreadPool = std::make_shared(std::thread::hardware_concurrency()); // TODO FIXME should be in MP factory + } } Status MediapipeGraphDefinition::createInputsInfo() { @@ -474,4 +478,5 @@ Status MediapipeGraphDefinition::initializeNodes() { } return StatusCode::OK; } +std::shared_ptr sharedThreadPool; // TODO FIXME should be in MP factory } // namespace ovms diff --git a/src/mediapipe_internal/mediapipegraphdefinition.hpp b/src/mediapipe_internal/mediapipegraphdefinition.hpp index cd86bdb7f9..0ae2766284 100644 --- a/src/mediapipe_internal/mediapipegraphdefinition.hpp +++ b/src/mediapipe_internal/mediapipegraphdefinition.hpp @@ -37,6 +37,7 @@ #include "mediapipe/framework/calculator_graph.h" #include "mediapipe/framework/port/parse_text_proto.h" #include "mediapipe/framework/port/status.h" +#include "mediapipe/framework/thread_pool_executor.h" #pragma GCC diagnostic pop #pragma warning(pop) @@ -57,6 +58,7 @@ class GenAiServable; using PythonNodeResourcesMap = std::unordered_map>; using GenAiServableMap = std::unordered_map>; +extern std::shared_ptr sharedThreadPool; class MediapipeGraphDefinition { friend MediapipeGraphDefinitionUnloadGuard; diff --git a/src/mediapipe_internal/mediapipegraphexecutor.hpp b/src/mediapipe_internal/mediapipegraphexecutor.hpp index c785adb9a8..d5fc28ed07 100644 --- a/src/mediapipe_internal/mediapipegraphexecutor.hpp +++ b/src/mediapipe_internal/mediapipegraphexecutor.hpp @@ -24,6 +24,7 @@ #include #include "../execution_context.hpp" +#include "../logging.hpp" #include "../model_metric_reporter.hpp" #include "../profiler.hpp" #include "../status.hpp" @@ -109,6 +110,9 @@ class MediapipeGraphExecutor { MetricCounterGuard failedRequestsGuard(this->mediapipeServableMetricReporter->getRequestsMetric(executionContext, false)); MetricGaugeGuard currentGraphsGuard(this->mediapipeServableMetricReporter->currentGraphs.get()); ::mediapipe::CalculatorGraph graph; + SPDLOG_ERROR("SetExecutor XXX"); + std::ignore = graph.SetExecutor("", sharedThreadPool); // TODO FIXME + SPDLOG_ERROR("Start unary KServe request mediapipe graph: {} initializationXXXbegin", this->name); MP_RETURN_ON_FAIL(graph.Initialize(this->config), std::string("failed initialization of MediaPipe graph: ") + this->name, StatusCode::MEDIAPIPE_GRAPH_INITIALIZATION_ERROR); enum : unsigned int { PROCESS, @@ -116,6 +120,7 @@ class MediapipeGraphExecutor { }; Timer timer; timer.start(PROCESS); + SPDLOG_ERROR("Start unary KServe request mediapipe graph: {} initializationXXXend", this->name); std::unordered_map outputPollers; for (auto& name : this->outputNames) { if (name.empty()) { @@ -136,7 +141,9 @@ class MediapipeGraphExecutor { inputSidePackets[PYTHON_SESSION_SIDE_PACKET_TAG] = mediapipe::MakePacket(this->pythonNodeResourcesMap).At(STARTING_TIMESTAMP); inputSidePackets[LLM_SESSION_SIDE_PACKET_TAG] = mediapipe::MakePacket(this->llmNodeResourcesMap).At(STARTING_TIMESTAMP); #endif + SPDLOG_ERROR("Start unary KServe request mediapipe graph: {} startRunXXXbegin", this->name); MP_RETURN_ON_FAIL(graph.StartRun(inputSidePackets), std::string("start MediaPipe graph: ") + this->name, StatusCode::MEDIAPIPE_GRAPH_START_ERROR); + SPDLOG_ERROR("Start unary KServe request mediapipe graph: {} startRunXXXend", this->name); ::mediapipe::Packet packet; std::set outputPollersWithReceivedPacket; @@ -231,7 +238,9 @@ class MediapipeGraphExecutor { { OVMS_PROFILE_SCOPE("Mediapipe graph initialization"); // Init + SPDLOG_DEBUG("Start unary KServe request mediapipe graph: {} initializationXXX", this->name); MP_RETURN_ON_FAIL(graph.Initialize(this->config), "graph initialization", StatusCode::MEDIAPIPE_GRAPH_INITIALIZATION_ERROR); + SPDLOG_DEBUG("Start unary KServe request mediapipe graph: {} initializationXXX ended", this->name); } enum : unsigned int { PROCESS,