
Commit 35a663b

Introduce VLM pipeline support (#3095)
Implement VisualLanguageModelServable and its dependencies to support multimodal pipelines in HttpLLMCalculator.
1 parent 74d4527 commit 35a663b

17 files changed: +430 -19 lines changed
demos/common/export_models/export_model.py

Lines changed: 2 additions & 0 deletions
@@ -38,6 +38,7 @@ def add_common_arguments(parser):
 subparsers = parser.add_subparsers(help='subcommand help', required=True, dest='task')
 parser_text = subparsers.add_parser('text_generation', help='export model for chat and completion endpoints')
 add_common_arguments(parser_text)
+parser_text.add_argument('--pipeline_type', default='CONTINUOUS_BATCHING', help='Type of the pipeline to be used. Can be either CONTINUOUS_BATCHING or VISUAL_LANGUAGE_MODEL.', dest='pipeline_type')
 parser_text.add_argument('--kv_cache_precision', default=None, choices=["u8"], help='u8 or empty (model default). Reduced kv cache precision to u8 lowers the cache size consumption.', dest='kv_cache_precision')
 parser_text.add_argument('--enable_prefix_caching', action='store_true', help='This algorithm is used to cache the prompt tokens.', dest='enable_prefix_caching')
 parser_text.add_argument('--disable_dynamic_split_fuse', action='store_false', help='The maximum number of tokens that can be batched together.', dest='dynamic_split_fuse')
@@ -143,6 +144,7 @@ def add_common_arguments(parser):
         }
         node_options: {
           [type.googleapis.com/mediapipe.LLMCalculatorOptions]: {
+            pipeline_type: {{pipeline_type}},
             models_path: "{{model_path}}",
             plugin_config: '{ {% if kv_cache_precision %}"KV_CACHE_PRECISION": "{{kv_cache_precision}}"{% endif %}}',
             enable_prefix_caching: {% if not enable_prefix_caching %}false{% else %} true{% endif%},
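
Note: with the new --pipeline_type argument, an export run like the one in prepare_llm_models.sh below (--pipeline_type VISUAL_LANGUAGE_MODEL --kv_cache_precision u8) should render the template above into node_options roughly as follows; this is a sketch based only on the template in this diff, with models_path left as a placeholder and the remaining fields elided:

node_options: {
  [type.googleapis.com/mediapipe.LLMCalculatorOptions]: {
    pipeline_type: VISUAL_LANGUAGE_MODEL,
    models_path: "<model_path>",
    plugin_config: '{ "KV_CACHE_PRECISION": "u8"}',
    ...
  }
}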

prepare_llm_models.sh

Lines changed: 6 additions & 5 deletions
@@ -20,10 +20,11 @@ if [ -z "$1" ]; then
     exit 1
 fi
 
+CB_MODEL="facebook/opt-125m"
 EMBEDDING_MODEL="thenlper/gte-small"
 RERANK_MODEL="BAAI/bge-reranker-base"
 VLM_MODEL="OpenGVLab/InternVL2-1B"
-if [ -d "$1/facebook/opt-125m" ] && [ -d "$1/$EMBEDDING_MODEL" ] && [ -d "$1/$RERANK_MODEL" ]; then
+if [ -d "$1/$CB_MODEL" ] && [ -d "$1/$EMBEDDING_MODEL" ] && [ -d "$1/$RERANK_MODEL" ] && [ -d "$1/$VLM_MODEL" ]; then
     echo "Models directory $1 exists. Skipping downloading models."
     exit 0
 fi
@@ -44,16 +45,16 @@ else
 fi
 mkdir -p $1
 
-if [ -d "$1/facebook/opt-125m" ]; then
-    echo "Models directory $1/facebook/opt-125m exists. Skipping downloading models."
+if [ -d "$1/$CB_MODEL" ]; then
+    echo "Models directory $1/$CB_MODEL exists. Skipping downloading models."
 else
-    python3 demos/common/export_models/export_model.py text_generation --source_model facebook/opt-125m --weight-format int8 --model_repository_path $1
+    python3 demos/common/export_models/export_model.py text_generation --source_model "$CB_MODEL" --weight-format int8 --model_repository_path $1
 fi
 
 if [ -d "$1/$VLM_MODEL" ]; then
     echo "Models directory $1/$VLM_MODEL exists. Skipping downloading models."
 else
-    python3 demos/common/export_models/export_model.py text_generation --source_model "$VLM_MODEL" --weight-format int4 --kv_cache_precision u8 --model_repository_path $1
+    python3 demos/common/export_models/export_model.py text_generation --pipeline_type VISUAL_LANGUAGE_MODEL --source_model "$VLM_MODEL" --weight-format int4 --kv_cache_precision u8 --model_repository_path $1
 fi
 
 if [ -d "$1/$EMBEDDING_MODEL" ]; then

src/BUILD

Lines changed: 1 addition & 0 deletions
@@ -1896,6 +1896,7 @@ cc_test(
         "test/llmnode_test.cpp",
         "test/llmtemplate_test.cpp",
         "test/text_streamer_test.cpp",
+        "test/llm/visual_language_model/complete_flow_test.cpp",
     ],
     "//:disable_python" : [],
 }),

src/llm/BUILD

Lines changed: 4 additions & 2 deletions
@@ -93,8 +93,10 @@ cc_library(
 cc_library(
     name = "genai_servables",
     hdrs = ["servable.hpp", "servable_initializer.hpp",
-            "continuous_batching/servable.hpp", "continuous_batching/llm_executor.hpp", "continuous_batching/servable_initializer.hpp"],
-    srcs = ["servable.cpp", "servable_initializer.cpp", "continuous_batching/servable.cpp", "continuous_batching/servable_initializer.cpp"],
+            "continuous_batching/servable.hpp", "continuous_batching/llm_executor.hpp", "continuous_batching/servable_initializer.hpp",
+            "visual_language_model/servable.hpp"],
+    srcs = ["servable.cpp", "servable_initializer.cpp", "continuous_batching/servable.cpp", "continuous_batching/servable_initializer.cpp",
+            "visual_language_model/servable.cpp"],
     deps = [
         "//third_party:openvino",
         "@mediapipe//mediapipe/framework:calculator_framework",

src/llm/apis/openai_completions.cpp

Lines changed: 14 additions & 0 deletions
@@ -139,10 +139,14 @@ absl::Status OpenAIChatCompletionsHandler::parseMessages() {
         auto& obj = it->value.GetArray()[i];
         if (!obj.IsObject())
             return absl::InvalidArgumentError("Message is not a JSON object");
+        // Add a new message to the chat history
+        request.chatHistory.push_back({});
         for (auto member = obj.MemberBegin(); member != obj.MemberEnd(); member++) {
             if (!member->name.IsString())
                 return absl::InvalidArgumentError("Invalid message structure");
             if (member->value.IsString()) {
+                // Add a new field to the last message in the history
+                request.chatHistory.back().insert({member->name.GetString(), member->value.GetString()});
                 continue;
             } else {
                 if (member->name.GetString() == std::string("content") && member->value.IsArray()) {
@@ -191,7 +195,13 @@ absl::Status OpenAIChatCompletionsHandler::parseMessages() {
                         return absl::InvalidArgumentError("Unsupported content type");
                     }
                 }
+                // Pull the text out of the nested structure into the "content" field; for image data
+                // the whole "content" value is erased, since images are stored separately in request.images
                 member->value = contentText;
+                // Add a new field to the last message in the history if the content is text
+                if (member->value.IsString()) {
+                    request.chatHistory.back().insert({member->name.GetString(), member->value.GetString()});
+                }
             } else {
                 return absl::InvalidArgumentError("Invalid message structure - content should be string or array");
             }
@@ -214,6 +224,10 @@ const std::vector<ov::Tensor> OpenAIChatCompletionsHandler::getImages() const {
     return request.images;
 }
 
+const ov::genai::ChatHistory& OpenAIChatCompletionsHandler::getChatHistory() const {
+    return request.chatHistory;
+}
+
 absl::Status OpenAIChatCompletionsHandler::parseChatCompletionsPart(uint32_t maxTokensLimit) {
     // messages: [{role: content}, {role: content}, ...]; required
     auto status = parseMessages();
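
A standalone illustration of the chat-history bookkeeping added above (not OVMS code): parseMessages() pushes one history entry per message and copies string fields into it, while image parts are kept out of the history and stored in request.images. The sketch assumes ov::genai::ChatHistory behaves like a vector of string-to-string maps, which is what the push_back({}) / back().insert({...}) calls in this diff rely on.

#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

// Stand-in for ov::genai::ChatHistory (assumption: a sequence of string->string message maps).
using ChatHistoryLike = std::vector<std::unordered_map<std::string, std::string>>;

int main() {
    // Roughly what parseMessages() would accumulate for:
    //   "messages": [{"role": "user", "content": "What is on this image?"}]
    // (an image_url part inside "content" would go to request.images instead of the history)
    ChatHistoryLike history;
    history.push_back({});  // new message -> new history entry
    history.back().insert({"role", "user"});
    history.back().insert({"content", "What is on this image?"});

    for (const auto& message : history)
        for (const auto& [field, value] : message)
            std::cout << field << ": " << value << "\n";
    return 0;
}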

src/llm/apis/openai_completions.hpp

Lines changed: 2 additions & 0 deletions
@@ -63,6 +63,7 @@ struct CompletionUsageStatistics {
 
 // Class that maps OpenAI request content and provides methods to create GenerationConfig from it.
 struct OpenAIChatCompletionsRequest {
+    ov::genai::ChatHistory chatHistory;
     std::string processedJson;
     std::vector<ov::Tensor> images;
     std::optional<std::string> prompt{std::nullopt};
@@ -189,6 +190,7 @@
     StreamOptions getStreamOptions() const;
     const std::string& getProcessedJson() const;
     const std::vector<ov::Tensor> getImages() const;
+    const ov::genai::ChatHistory& getChatHistory() const;
 
     bool isStream() const;
     std::string getModel() const;

src/llm/continuous_batching/servable.cpp

Lines changed: 11 additions & 3 deletions
@@ -48,6 +48,13 @@ void ContinuousBatchingServable::notifyExecutorThread() {
     properties->llmExecutorWrapper->notifyNewRequestArrived();
 }
 
+absl::Status ContinuousBatchingServable::addRequestToPipeline(std::shared_ptr<ContinuousBatchingServableExecutionContext>& executionContext) {
+    executionContext->generationHandle = properties->pipeline->add_request(currentRequestId++,  // to be removed from API?
+        executionContext->inputIds,
+        executionContext->apiHandler->createGenerationConfig());
+    return absl::OkStatus();
+}
+
 // Node resources interface start
 std::shared_ptr<GenAiServableExecutionContext> ContinuousBatchingServable::createExecutionContext() {
     return std::make_shared<ContinuousBatchingServableExecutionContext>();
@@ -63,9 +70,10 @@ absl::Status ContinuousBatchingServable::scheduleExecution(std::shared_ptr<GenAi
         return absl::CancelledError();
     }
 
-    cbExecutionContext->generationHandle = properties->pipeline->add_request(currentRequestId++,  // to be removed from API?
-        cbExecutionContext->inputIds,
-        cbExecutionContext->apiHandler->createGenerationConfig());
+    auto status = addRequestToPipeline(cbExecutionContext);
+    if (!status.ok()) {
+        return status;
+    }
 
     cbExecutionContext->payload.client->registerDisconnectionCallback([genHandle = cbExecutionContext->generationHandle]() {
         genHandle->stop();

src/llm/continuous_batching/servable.hpp

Lines changed: 5 additions & 2 deletions
@@ -37,16 +37,19 @@ struct ContinuousBatchingServableProperties : public GenAiServableProperties {
 };
 
 class ContinuousBatchingServable : public GenAiServable {
-    std::shared_ptr<ContinuousBatchingServableProperties> properties;
-
 protected:
+    std::shared_ptr<ContinuousBatchingServableProperties> properties;
     void notifyExecutorThread();
 
 public:
     ContinuousBatchingServable() {
         properties = std::make_shared<ContinuousBatchingServableProperties>();
     }
 
+    // The addRequestToPipeline implementation can be specific to different servables built on the Continuous Batching engine.
+    // This method is used in scheduleExecution and MUST fill generationHandle in the executionContext.
+    virtual absl::Status addRequestToPipeline(std::shared_ptr<ContinuousBatchingServableExecutionContext>& executionContext);
+
     // Interface methods
     std::shared_ptr<GenAiServableExecutionContext> createExecutionContext() override;
     std::shared_ptr<GenAiServableProperties> getProperties() override;
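
The virtual hook above is the extension point the VLM servable builds on: scheduleExecution() stays in the base class and only delegates request admission to addRequestToPipeline(). The actual override lives in src/llm/visual_language_model/servable.cpp, which is not part of the excerpt shown here; the standalone sketch below (simplified, hypothetical types) only illustrates the pattern.

#include <iostream>
#include <memory>

// Simplified stand-ins; the real OVMS types carry pipeline handles, input ids, images, etc.
struct ExecContext { int generationHandle = -1; };

class BaseServable {
public:
    virtual ~BaseServable() = default;
    // Contract (mirroring servable.hpp): must fill generationHandle in the context.
    virtual bool addRequestToPipeline(std::shared_ptr<ExecContext>& ctx) {
        ctx->generationHandle = 1;  // text-only admission path
        return true;
    }
    // Shared scheduling logic: derived servables customize only how a request enters the pipeline.
    bool scheduleExecution(std::shared_ptr<ExecContext>& ctx) {
        if (!addRequestToPipeline(ctx))
            return false;
        std::cout << "scheduled, handle=" << ctx->generationHandle << "\n";
        return true;
    }
};

class VisualServableSketch : public BaseServable {
public:
    bool addRequestToPipeline(std::shared_ptr<ExecContext>& ctx) override {
        // Hypothetically: an admission path that also passes image tensors to the pipeline.
        ctx->generationHandle = 2;
        return true;
    }
};

int main() {
    auto ctx = std::make_shared<ExecContext>();
    VisualServableSketch vlm;
    vlm.scheduleExecution(ctx);  // prints: scheduled, handle=2
    return 0;
}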

src/llm/continuous_batching/servable_initializer.cpp

Lines changed: 0 additions & 4 deletions
@@ -68,8 +68,6 @@ Status ContinuousBatchingServableInitializer::initializeExperimental(std::shared
     if (!status.ok()) {
         return status;
     }
-
-    servable = std::make_shared<ContinuousBatchingServable>();
     auto properties = std::static_pointer_cast<ContinuousBatchingServableProperties>(servable->getProperties());
     properties->modelsPath = getBasePath();
 
@@ -132,8 +130,6 @@ Status ContinuousBatchingServableInitializer::initialize(std::shared_ptr<GenAiSe
     if (!status.ok()) {
         return status;
     }
-
-    servable = std::make_shared<ContinuousBatchingServable>();
     auto properties = std::static_pointer_cast<ContinuousBatchingServableProperties>(servable->getProperties());
 
     properties->modelsPath = getBasePath();
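
Note: removing the make_shared calls here means the initializer no longer constructs the servable; it only populates one that the caller has already created. The caller now decides which concrete type to instantiate (ContinuousBatchingServable or VisualLanguageModelServable), as the initializeGenAiServable changes in src/llm/servable_initializer.cpp below show.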

src/llm/servable_initializer.cpp

Lines changed: 17 additions & 1 deletion
@@ -32,9 +32,10 @@
 #include "../logging.hpp"
 #include "../mediapipe_internal/mediapipe_utils.hpp"
 #include "../status.hpp"
+#include "continuous_batching/servable.hpp"
 #include "continuous_batching/servable_initializer.hpp"
-#include "servable.hpp"
 #include "servable_initializer.hpp"
+#include "visual_language_model/servable.hpp"
 
 namespace ovms {
 
@@ -146,19 +147,34 @@ Status initializeGenAiServable(std::shared_ptr<GenAiServable>& servable, const :
     Status status;
     if (nodeOptions.has_models_path()) { // Stable initialization
         if (nodeOptions.pipeline_type() == mediapipe::LLMCalculatorOptions::CONTINUOUS_BATCHING) {
+            SPDLOG_LOGGER_INFO(modelmanager_logger, "Initializing Continuous Batching servable");
             ContinuousBatchingServableInitializer cbServableInitializer;
+            servable = std::make_shared<ContinuousBatchingServable>();
             status = cbServableInitializer.initialize(servable, nodeOptions, graphPath);
             if (status != StatusCode::OK) {
                 SPDLOG_LOGGER_ERROR(modelmanager_logger, "Error during LLM node resources initialization: {}", status.string());
                 return status;
             }
+        } else if (nodeOptions.pipeline_type() == mediapipe::LLMCalculatorOptions::VISUAL_LANGUAGE_MODEL) {
+            // VLM uses CB engine, so initialization part is shared (both servables share the same properties),
+            // therefore we can use CB servable initializer to initialize VLM servable
+            SPDLOG_LOGGER_INFO(modelmanager_logger, "Initializing Visual Language Model servable");
+            ContinuousBatchingServableInitializer cbServableInitializer;
+            servable = std::make_shared<VisualLanguageModelServable>();
+            status = cbServableInitializer.initialize(servable, nodeOptions, graphPath);
+            if (status != StatusCode::OK) {
+                SPDLOG_LOGGER_ERROR(modelmanager_logger, "Error during LLM node resources initialization: {}", status.string());
+                return status;
+            }
+
         } else {
             SPDLOG_LOGGER_ERROR(modelmanager_logger, "LLM node options do not contain any recognized pipeline configuration.");
             return StatusCode::INTERNAL_ERROR;
         }
     } else {
         if (nodeOptions.has_continuous_batching_pipeline_config()) { // Experimental initialization
             ContinuousBatchingServableInitializer cbServableInitializer;
+            servable = std::make_shared<ContinuousBatchingServable>();
             status = cbServableInitializer.initializeExperimental(servable, nodeOptions, graphPath);
         } else {
             SPDLOG_LOGGER_ERROR(modelmanager_logger, "LLM node options do not contain any recognized pipeline configuration.");