From f4c83f276a66705ce22aee2439388bc50840a73c Mon Sep 17 00:00:00 2001 From: Iman Tabrizian Date: Thu, 12 Sep 2024 03:01:40 +0000 Subject: [PATCH 01/13] Add back 24.05 response sender path --- src/pb_stub.cc | 73 ++++++++++++--- src/pb_stub.h | 4 +- src/python_be.cc | 212 ++++++++++++++++++++++++++++++++++++++++-- src/response_sender.h | 4 +- 4 files changed, 269 insertions(+), 24 deletions(-) diff --git a/src/pb_stub.cc b/src/pb_stub.cc index 007e7f29..a4e051e9 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -657,11 +657,7 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) IPCMessage::Create(shm_pool_, false /* Inline response */); execute_response->Command() = PYTHONSTUB_ExecuteResponse; - AllocatedSharedMemory response_batch = - shm_pool_->Construct(); - ResponseBatch* response_batch_shm_ptr = - reinterpret_cast(response_batch.data_.get()); - execute_response->Args() = response_batch.handle_; + std::optional> response_batch; bool has_exception = false; std::string error_string; std::unique_ptr error_string_shm; @@ -669,11 +665,8 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) ScopedDefer execute_finalize([this] { stub_message_queue_->Pop(); }); ScopedDefer _( [this, &execute_response] { SendIPCMessage(execute_response); }); - + py::object execute_return; try { - response_batch_shm_ptr->has_error = false; - response_batch_shm_ptr->is_error_set = false; - if (!py::hasattr(model_instance_, "execute")) { std::string message = "Python model " + model_context_.PythonModelPath() + " does not implement `execute` method."; @@ -683,7 +676,7 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) { NVTX_RANGE(nvtx_, "PyExecute " + name_); - py::object execute_return = + execute_return = model_instance_.attr("execute")(py_request_list); bool is_coroutine = py::module::import("asyncio") @@ -696,10 +689,10 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) } else { py::object coroutine_return = RunCoroutine(execute_return, false /* in_background */); - ProcessReturnedResponses(py_request_list, coroutine_return); + ProcessReturnedResponses(py_request_list, coroutine_return, response_batch); } } else { - ProcessReturnedResponses(py_request_list, execute_return); + ProcessReturnedResponses(py_request_list, execute_return, response_batch); } } } @@ -719,6 +712,12 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) "', message: ") + error_string; LOG_ERROR << err_message.c_str(); + if (!response_batch) { + response_batch = shm_pool_->Construct(sizeof(ResponseBatch)); + } + ResponseBatch* response_batch_shm_ptr = reinterpret_cast(response_batch.value().data_.get()); + + response_batch_shm_ptr = reinterpret_cast(response_batch.value().data_.get()); response_batch_shm_ptr->has_error = true; error_string_shm = PbString::Create(shm_pool_, err_message); response_batch_shm_ptr->error = error_string_shm->ShmHandle(); @@ -732,11 +731,35 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) request->GetResponseSender()->Close(); } } + + if (!response_batch) { + response_batch = shm_pool_->Construct(sizeof(ResponseBatch)); + ResponseBatch* response_batch_shm_ptr =reinterpret_cast(response_batch.value().data_.get()); + response_batch_shm_ptr->batch_size = 0; + } + ResponseBatch* response_batch_shm_ptr =reinterpret_cast(response_batch.value().data_.get()); + response_batch_shm_ptr->has_error = false; + response_batch_shm_ptr->is_error_set = false; + execute_response->Args() = response_batch.value().handle_; + _.Complete(); + execute_finalize.Complete(); 
+} + +void +Stub::ProcessResponse(InferResponse* response) +{ + response->SaveToSharedMemory(shm_pool_, false /* copy_gpu */); + + for (auto& output_tensor : response->OutputTensors()) { + if (!output_tensor->IsCPU()) { + gpu_tensors_.push_back(output_tensor); + } + } } void Stub::ProcessReturnedResponses( - py::list py_requests, py::object py_responses_obj) + py::list py_requests, py::object py_responses_obj, std::optional>& response_batch) { // Return if there is nothing to process. if (py::isinstance(py_responses_obj)) { @@ -784,12 +807,32 @@ Stub::ProcessReturnedResponses( "return list, found type '" + std::string(py::str(py_responses[i].get_type())) + "'."); } + std::shared_ptr response = py_responses[i].cast>(); - request->GetResponseSender()->Send( - response, TRITONSERVER_RESPONSE_COMPLETE_FINAL); + request->GetResponseSender()->UpdateStateAndCounters(response, TRITONSERVER_RESPONSE_COMPLETE_FINAL); } } + response_batch = std::move(shm_pool_->Construct( + requests_size * sizeof(bi::managed_external_buffer::handle_t) + + sizeof(ResponseBatch))); + ResponseBatch* response_batch_shm_ptr = + reinterpret_cast(response_batch.value().data_.get()); + + bi::managed_external_buffer::handle_t* responses_shm_handle = + reinterpret_cast( + response_batch.value().data_.get() + sizeof(ResponseBatch)); + + for (size_t i = 0; i < responses_size; i++) { + // Check the return type of execute function. + InferRequest* infer_request = py_requests[i].cast(); + InferResponse* infer_response = py_responses[i].cast(); + infer_response->PruneOutputTensors( + infer_request->RequestedOutputNames()); + ProcessResponse(infer_response); + responses_shm_handle[i] = infer_response->ShmHandle(); + } + response_batch_shm_ptr->batch_size = requests_size; } py::object diff --git a/src/pb_stub.h b/src/pb_stub.h index 9ed74d9a..85a2783a 100644 --- a/src/pb_stub.h +++ b/src/pb_stub.h @@ -254,7 +254,9 @@ class Stub { void ProcessRequests(RequestBatch* request_batch_shm_ptr); void ProcessReturnedResponses( - py::list py_requests, py::object py_responses_obj); + py::list py_requests, py::object py_responses_obj, std::optional>& response_batch); + + void ProcessResponse(InferResponse* response); py::object GetAsyncEventLoop(); diff --git a/src/python_be.cc b/src/python_be.cc index 761abdbf..78b306b9 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -1229,7 +1229,7 @@ ModelInstanceState::ProcessRequests( ipc_message->Command() = PYTHONSTUB_CommandType::PYTHONSTUB_ExecuteRequest; ipc_message->Args() = request_batch.handle_; received_message_ = nullptr; - ScopedDefer _([this] { + ScopedDefer execute_finalize([this] { // Push a dummy message to signal the thread to terminate. 
Stub()->StubMessageQueue()->Push(DUMMY_MESSAGE); }); @@ -1240,8 +1240,12 @@ ModelInstanceState::ProcessRequests( cv_.wait(guard, [this] { return received_message_ != nullptr; }); } - AllocatedSharedMemory response_batch = - Stub()->ShmPool()->Load(received_message_->Args()); + + AllocatedSharedMemory response_batch = Stub()->ShmPool()->Load(received_message_->Args()); + + ResponseBatch* response_batch_shm_ptr = + reinterpret_cast(response_batch.data_.get()); + received_message_.reset(); uint64_t compute_end_ns = 0; @@ -1249,10 +1253,10 @@ ModelInstanceState::ProcessRequests( reporter.SetComputeEndNs(compute_end_ns); reporter.SetBatchStatistics(total_batch_size); - if (response_batch.data_->has_error) { - if (response_batch.data_->is_error_set) { + if (response_batch_shm_ptr->has_error) { + if (response_batch_shm_ptr->is_error_set) { auto error = PbString::LoadFromSharedMemory( - Stub()->ShmPool(), response_batch.data_->error); + Stub()->ShmPool(), response_batch_shm_ptr->error); return TRITONSERVER_ErrorNew( TRITONSERVER_ERROR_INTERNAL, error->String().c_str()); } @@ -1261,6 +1265,202 @@ ModelInstanceState::ProcessRequests( TRITONSERVER_ERROR_INTERNAL, "Failed to process the requests."); } + if (response_batch_shm_ptr->batch_size > 0) { + std::shared_ptr> responses( + new std::vector()); + responses->reserve(request_count); + for (size_t i = 0; i < request_count; i++) { + TRITONBACKEND_Response* response; + auto err = TRITONBACKEND_ResponseNew(&response, requests[i]); + if (err == nullptr) { + responses->emplace_back(response); + } else { + responses->emplace_back(nullptr); + LOG_MESSAGE(TRITONSERVER_LOG_ERROR, "Fail to create response"); + TRITONSERVER_ErrorDelete(err); + } + } + bi::managed_external_buffer::handle_t* response_shm_handle = + reinterpret_cast( + response_batch.data_.get() + sizeof(ResponseBatch)); + + // If the output provided by the model is in GPU, we will pass the list of + // buffers provided by Triton to the stub process. + // bool has_gpu_output = false; + std::vector requires_deferred_callback; + + std::vector> shm_responses; + std::vector, void*>>> + gpu_output_buffers(request_count); + GPUBuffersHelper gpu_buffer_helper; + + for (uint32_t r = 0; r < request_count; ++r) { + NVTX_RANGE(nvtx_, "LoadingResponse " + Name()); + TRITONBACKEND_Response* response = (*responses)[r]; + TRITONBACKEND_Request* request = requests[r]; + uint32_t requested_output_count = 0; + requires_deferred_callback.push_back(false); + + shm_responses.emplace_back(nullptr); + std::unique_ptr& infer_response = shm_responses.back(); + try { + if (pb_infer_requests[r]->ReleaseFlags() == + TRITONSERVER_REQUEST_RELEASE_RESCHEDULE) { + // For rescheduled requests, we do not need to send a response. + LOG_IF_ERROR( + TRITONBACKEND_ResponseDelete((*responses)[r]), + "failed to delete response"); + (*responses)[r] = nullptr; + continue; + } + infer_response = InferResponse::LoadFromSharedMemory( + Stub()->ShmPool(), response_shm_handle[r], + false /* open_cuda_handle */); + if (infer_response->HasError()) { + TRITONSERVER_Error* err = TRITONSERVER_ErrorNew( + infer_response->Error()->Code(), + infer_response->Error()->Message().c_str()); + + LOG_IF_ERROR( + TRITONBACKEND_ResponseSend( + (*responses)[r], TRITONSERVER_RESPONSE_COMPLETE_FINAL, err), + "failed sending response"); + TRITONSERVER_ErrorDelete(err); + (*responses)[r] = nullptr; + + // Reset the release flags for the request. 
+ pb_infer_requests[r]->SetReleaseFlags( + TRITONSERVER_REQUEST_RELEASE_ALL); + + // If has_error is true, we do not look at the response tensors. + continue; + } + } + catch (const PythonBackendException& pb_exception) { + TRITONSERVER_Error* err = TRITONSERVER_ErrorNew( + TRITONSERVER_ERROR_INTERNAL, pb_exception.what()); + LOG_IF_ERROR( + TRITONBACKEND_ResponseSend( + (*responses)[r], TRITONSERVER_RESPONSE_COMPLETE_FINAL, err), + "failed sending response"); + TRITONSERVER_ErrorDelete(err); + (*responses)[r] = nullptr; + + // Reset the release flags for the request. + pb_infer_requests[r]->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL); + + continue; + } + + GUARDED_RESPOND_IF_ERROR( + responses, r, + TRITONBACKEND_RequestOutputCount(request, &requested_output_count)); + + std::set requested_output_names; + for (size_t j = 0; j < requested_output_count; ++j) { + const char* output_name; + GUARDED_RESPOND_IF_ERROR( + responses, r, + TRITONBACKEND_RequestOutputName(request, j, &output_name)); + requested_output_names.insert(output_name); + } + + bool require_deferred_callback = false; + +#ifdef TRITON_ENABLE_GPU + for (auto& output_tensor : infer_response->OutputTensors()) { + if (output_tensor->MemoryType() == TRITONSERVER_MEMORY_GPU) { + // Attempt to use the cuda shared memory pool for GPU tensor. + ShareCUDAMemoryPool(output_tensor->MemoryTypeId()); + } + } +#endif // TRITON_ENABLE_GPU + + gpu_output_buffers[r] = + std::vector, void*>>{}; + infer_response->Send( + response, CudaStream(), require_deferred_callback, + TRITONSERVER_RESPONSE_COMPLETE_FINAL, Stub()->ShmPool(), + gpu_buffer_helper, gpu_output_buffers[r], requested_output_names); + + requires_deferred_callback[r] = require_deferred_callback; + + if (requires_deferred_callback[r]) { + // has_gpu_output = true; + } + } + + // Finalize the execute. + execute_finalize.Complete(); + } + + // If the output tensor is in GPU, there will be a second round trip + // required for filling the GPU buffers provided by the main process. +// if (has_gpu_output) { +// ipc_message->Command() = +// PYTHONSTUB_CommandType::PYTHONSTUB_LoadGPUBuffers; +// gpu_buffer_helper.Complete(Stub()->ShmPool()); +// ipc_message->Args() = gpu_buffer_helper.ShmHandle(); +// SendMessageAndReceiveResponse( +// ipc_message->ShmHandle(), response_message, restart, responses, +// requests, 0); + +// bool cuda_copy = false; + +// uint32_t response_index = 0; +// for (auto& gpu_output_buffer : gpu_output_buffers) { +// for (auto& buffer_memory_pair : gpu_output_buffer) { +// auto& pb_memory = buffer_memory_pair.first; +// void* pointer = buffer_memory_pair.second; +// bool cuda_used = false; + +// if (pb_memory->MemoryType() == TRITONSERVER_MEMORY_CPU) { +// GUARDED_RESPOND_IF_ERROR( +// responses, response_index, +// CopyBuffer( +// "Failed to copy the output tensor to buffer.", +// TRITONSERVER_MEMORY_CPU, 0, TRITONSERVER_MEMORY_CPU, 0, +// pb_memory->ByteSize(), pb_memory->DataPtr(), pointer, +// CudaStream(), &cuda_used)); +// cuda_copy |= cuda_used; +// } else if ( +// (pb_memory->MemoryType() == TRITONSERVER_MEMORY_GPU) && +// pb_memory->UseCUDASharedPool() && +// (pb_memory->DataPtr() != pointer)) { +// // If the data pointer from pb_memory is not the same as the +// // pointer, it means that the Triton-provided buffer is not used +// // during tensor transfer. Instead, an intermediate buffer that uses +// // CUDA shared memory pool is used. In this case, we need to copy +// // the data from the intermediate buffer back to the Triton-provided +// // buffer. 
+// GUARDED_RESPOND_IF_ERROR( +// responses, response_index, +// CopyBuffer( +// "Failed to copy the output tensor to buffer.", +// TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(), +// TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(), +// pb_memory->ByteSize(), pb_memory->DataPtr(), pointer, +// CudaStream(), &cuda_used)); +// cuda_copy |= cuda_used; +// } +// } +// response_index++; +// #ifdef TRITON_ENABLE_GPU +// if (cuda_copy) { +// cudaStreamSynchronize(stream_); +// } +// #endif // TRITON_ENABLE_GPU +// } +// } + +// bls_defer.Complete(); +// for (uint32_t r = 0; r < request_count; ++r) { +// if (requires_deferred_callback[r]) { +// shm_responses[r]->DeferredSendCallback(); +// } +// } +// } + return nullptr; // success } diff --git a/src/response_sender.h b/src/response_sender.h index 69f416c2..6b0258fd 100644 --- a/src/response_sender.h +++ b/src/response_sender.h @@ -46,13 +46,13 @@ class ResponseSender { ~ResponseSender(); void Send(std::shared_ptr response, const uint32_t flags); bool IsCancelled(); + void UpdateStateAndCounters( + const std::shared_ptr& response, const uint32_t flags); // Can be useful at stopping the model from sending any more responses. void Close(); private: - void UpdateStateAndCounters( - const std::shared_ptr& response, const uint32_t flags); void DeleteResponseFactory(); intptr_t request_address_; From b36b55d983925685244706fe3b176a3c1eaa02cd Mon Sep 17 00:00:00 2001 From: Iman Tabrizian Date: Mon, 16 Sep 2024 14:25:01 +0000 Subject: [PATCH 02/13] Improve perf --- src/infer_request.cc | 2 +- src/ipc_message.cc | 19 ++++++++++ src/ipc_message.h | 7 ++++ src/pb_stub.cc | 24 ++++++------ src/python_be.cc | 85 ++++++++++++++---------------------------- src/response_sender.cc | 14 +++---- 6 files changed, 76 insertions(+), 75 deletions(-) diff --git a/src/infer_request.cc b/src/infer_request.cc index 8a95b524..e5733662 100644 --- a/src/infer_request.cc +++ b/src/infer_request.cc @@ -484,7 +484,7 @@ InferRequest::Exec(const bool is_decoupled) { bi::scoped_lock lock{ *(ipc_message->ResponseMutex())}; - stub->SendIPCMessage(ipc_message); + stub->SendIPCUtilsMessage(ipc_message); ipc_message->ResponseCondition()->wait(lock); } diff --git a/src/ipc_message.cc b/src/ipc_message.cc index ea1dc5b0..1b813214 100644 --- a/src/ipc_message.cc +++ b/src/ipc_message.cc @@ -56,6 +56,19 @@ IPCMessage::Create( new IPCMessage(ipc_message_shm, response_mutex_shm, response_cond_shm)); } +std::unique_ptr +IPCMessage::Create(IPCMessageShm* ipc_message_shm, + bi::managed_external_buffer::handle_t& message_handle) +{ + return std::unique_ptr(new IPCMessage(ipc_message_shm, message_handle)); +} + + AllocatedSharedMemory& +IPCMessage::GetAllocatedSharedMemory() +{ + return ipc_message_shm_; +} + std::unique_ptr IPCMessage::LoadFromSharedMemory( std::unique_ptr& shm_pool, @@ -133,4 +146,10 @@ IPCMessage::IPCMessage( ipc_message_handle_ = ipc_message_shm_.handle_; } +IPCMessage::IPCMessage(IPCMessageShm* ipc_message_shm, bi::managed_external_buffer::handle_t& handle) +{ + ipc_message_handle_ = handle; + ipc_message_shm_ptr_ = ipc_message_shm; +} + }}}; // namespace triton::backend::python diff --git a/src/ipc_message.h b/src/ipc_message.h index 8e762b8f..c7d0ae9d 100644 --- a/src/ipc_message.h +++ b/src/ipc_message.h @@ -97,6 +97,10 @@ class IPCMessage { static std::unique_ptr Create( const std::unique_ptr& shm_pool, bool inline_response); + + static std::unique_ptr + Create(IPCMessageShm* ipc_message_shm, + bi::managed_external_buffer::handle_t& message_handle); static 
std::unique_ptr LoadFromSharedMemory( std::unique_ptr& shm_pool, bi::managed_external_buffer::handle_t message_handle); @@ -108,6 +112,7 @@ class IPCMessage { bi::interprocess_mutex* ResponseMutex(); bi::managed_external_buffer::handle_t& Args(); bi::managed_external_buffer::handle_t ShmHandle(); + AllocatedSharedMemory& GetAllocatedSharedMemory(); private: AllocatedSharedMemory ipc_message_shm_; @@ -129,6 +134,8 @@ class IPCMessage { AllocatedSharedMemory& ipc_message_shm, AllocatedSharedMemory& response_mutex_shm, AllocatedSharedMemory& response_cond_shm); + + IPCMessage(IPCMessageShm* ipc_message_shm, bi::managed_external_buffer::handle_t& handle); }; }}}; // namespace triton::backend::python diff --git a/src/pb_stub.cc b/src/pb_stub.cc index a4e051e9..e6c93214 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -653,9 +653,8 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) { py::list py_request_list = LoadRequestsFromSharedMemory(request_batch_shm_ptr); - std::unique_ptr execute_response = - IPCMessage::Create(shm_pool_, false /* Inline response */); - execute_response->Command() = PYTHONSTUB_ExecuteResponse; + std::unique_ptr execute_response; + // IPCMessage::Create(shm_pool_, false /* Inline response */); std::optional> response_batch; bool has_exception = false; @@ -713,9 +712,9 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) error_string; LOG_ERROR << err_message.c_str(); if (!response_batch) { - response_batch = shm_pool_->Construct(sizeof(ResponseBatch)); + response_batch = shm_pool_->Construct(sizeof(ResponseBatch) + sizeof(IPCMessageShm)); } - ResponseBatch* response_batch_shm_ptr = reinterpret_cast(response_batch.value().data_.get()); + ResponseBatch* response_batch_shm_ptr = reinterpret_cast(response_batch.value().data_.get() + sizeof(IPCMessageShm)); response_batch_shm_ptr = reinterpret_cast(response_batch.value().data_.get()); response_batch_shm_ptr->has_error = true; @@ -733,14 +732,17 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) } if (!response_batch) { - response_batch = shm_pool_->Construct(sizeof(ResponseBatch)); - ResponseBatch* response_batch_shm_ptr =reinterpret_cast(response_batch.value().data_.get()); + response_batch = shm_pool_->Construct(sizeof(ResponseBatch) + sizeof(IPCMessageShm)); + ResponseBatch* response_batch_shm_ptr =reinterpret_cast(response_batch.value().data_.get() + sizeof(IPCMessageShm)); response_batch_shm_ptr->batch_size = 0; } - ResponseBatch* response_batch_shm_ptr =reinterpret_cast(response_batch.value().data_.get()); + ResponseBatch* response_batch_shm_ptr = reinterpret_cast(response_batch.value().data_.get() + sizeof(IPCMessageShm)); response_batch_shm_ptr->has_error = false; response_batch_shm_ptr->is_error_set = false; + execute_response = IPCMessage::Create(reinterpret_cast(response_batch.value().data_.get()), response_batch.value().handle_); execute_response->Args() = response_batch.value().handle_; + execute_response->InlineResponse() = false; + execute_response->Command() = PYTHONSTUB_ExecuteResponse; _.Complete(); execute_finalize.Complete(); } @@ -813,15 +815,15 @@ Stub::ProcessReturnedResponses( request->GetResponseSender()->UpdateStateAndCounters(response, TRITONSERVER_RESPONSE_COMPLETE_FINAL); } } - response_batch = std::move(shm_pool_->Construct( + response_batch = std::move(shm_pool_->Construct(sizeof(IPCMessageShm) + requests_size * sizeof(bi::managed_external_buffer::handle_t) + sizeof(ResponseBatch))); ResponseBatch* response_batch_shm_ptr = - 
reinterpret_cast(response_batch.value().data_.get()); + reinterpret_cast(response_batch.value().data_.get() + sizeof(IPCMessageShm)); bi::managed_external_buffer::handle_t* responses_shm_handle = reinterpret_cast( - response_batch.value().data_.get() + sizeof(ResponseBatch)); + response_batch.value().data_.get() + sizeof(ResponseBatch) + sizeof(IPCMessageShm)); for (size_t i = 0; i < responses_size; i++) { // Check the return type of execute function. diff --git a/src/python_be.cc b/src/python_be.cc index 78b306b9..361e1401 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -290,8 +290,8 @@ ModelInstanceState::SaveRequestsToSharedMemory( request, &request_timeout)); std::unique_ptr infer_request; - TRITONBACKEND_ResponseFactory* factory_ptr; - RETURN_IF_ERROR(TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request)); + TRITONBACKEND_ResponseFactory* factory_ptr = nullptr; + // RETURN_IF_ERROR(TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request)); infer_request = std::make_unique( id, correlation_id, pb_input_tensors, requested_output_names, @@ -322,8 +322,6 @@ ModelInstanceState::LaunchStubProcess() thread_pool_ = std::make_unique( model_state->StateForBackend()->thread_pool_size); - queue_monitor_thread_ = true; - queue_monitor_ = std::thread(&ModelInstanceState::MessageQueueMonitor, this); request_executor_ = std::make_unique( Stub()->ShmPool(), model_state->TritonServer()); @@ -685,44 +683,6 @@ ModelInstanceState::ExecuteBLSRequest( } } -void -ModelInstanceState::MessageQueueMonitor() -{ - while (queue_monitor_thread_) { - bi::managed_external_buffer::handle_t handle = - Stub()->ParentMessageQueue()->Pop(); - if (handle == DUMMY_MESSAGE) { - break; - } - std::unique_ptr message = - IPCMessage::LoadFromSharedMemory(Stub()->ShmPool(), handle); - - // Need to notify the model instance thread that the execute response has - // been received. 
- if (message->Command() == PYTHONSTUB_ExecuteResponse) { - std::lock_guard guard{mu_}; - received_message_ = std::move(message); - cv_.notify_one(); - } else if (message->Command() == PYTHONSTUB_ResponseSend) { - std::shared_ptr response_send_message = std::move(message); - std::packaged_task task([this, response_send_message] { - ResponseSendDecoupled(response_send_message); - }); - boost::asio::post(*thread_pool_, std::move(task)); - } else if ( - message->Command() == PYTHONSTUB_InferExecRequest || - message->Command() == PYTHONSTUB_InferStreamExecRequest) { - std::shared_ptr bls_execute = std::move(message); - std::packaged_task task([this, bls_execute] { - ExecuteBLSRequest( - bls_execute, - (bls_execute->Command() == PYTHONSTUB_InferStreamExecRequest)); - }); - boost::asio::post(*thread_pool_, std::move(task)); - } - } -} - void ModelInstanceState::StubToParentMQMonitor() { @@ -769,6 +729,25 @@ ModelInstanceState::StubToParentMQMonitor() ProcessModelControlRequest(message); break; } + case PYTHONSTUB_ResponseSend: { + std::shared_ptr response_send_message = std::move(message); + std::packaged_task task([this, response_send_message] { + ResponseSendDecoupled(response_send_message); + }); + boost::asio::post(*thread_pool_, std::move(task)); + break; + } + case PYTHONSTUB_InferExecRequest: + case PYTHONSTUB_InferStreamExecRequest: { + std::shared_ptr bls_execute = std::move(message); + std::packaged_task task([this, bls_execute] { + ExecuteBLSRequest( + bls_execute, + (bls_execute->Command() == PYTHONSTUB_InferStreamExecRequest)); + }); + boost::asio::post(*thread_pool_, std::move(task)); + break; + } default: { LOG_MESSAGE( TRITONSERVER_LOG_ERROR, "Unexpected message type received."); @@ -1228,26 +1207,23 @@ ModelInstanceState::ProcessRequests( IPCMessage::Create(Stub()->ShmPool(), false /*inline_response*/)); ipc_message->Command() = PYTHONSTUB_CommandType::PYTHONSTUB_ExecuteRequest; ipc_message->Args() = request_batch.handle_; - received_message_ = nullptr; + ScopedDefer execute_finalize([this] { // Push a dummy message to signal the thread to terminate. Stub()->StubMessageQueue()->Push(DUMMY_MESSAGE); }); + std::unique_ptr response; { - std::unique_lock guard{mu_}; Stub()->StubMessageQueue()->Push(ipc_message->ShmHandle()); - cv_.wait(guard, [this] { return received_message_ != nullptr; }); + bi::managed_external_buffer::handle_t response_message; + Stub()->ReceiveMessageFromStub(response_message); + response = IPCMessage::LoadFromSharedMemory(Stub()->ShmPool(), response_message); } - - - AllocatedSharedMemory response_batch = Stub()->ShmPool()->Load(received_message_->Args()); - + char* ipc_message_shm = reinterpret_cast(response->GetAllocatedSharedMemory().data_.get());; ResponseBatch* response_batch_shm_ptr = - reinterpret_cast(response_batch.data_.get()); + reinterpret_cast(ipc_message_shm + sizeof(IPCMessageShm)); - received_message_.reset(); - uint64_t compute_end_ns = 0; SET_TIMESTAMP(compute_end_ns); reporter.SetComputeEndNs(compute_end_ns); @@ -1282,7 +1258,7 @@ ModelInstanceState::ProcessRequests( } bi::managed_external_buffer::handle_t* response_shm_handle = reinterpret_cast( - response_batch.data_.get() + sizeof(ResponseBatch)); + ipc_message_shm + sizeof(ResponseBatch) + sizeof(IPCMessageShm)); // If the output provided by the model is in GPU, we will pass the list of // buffers provided by Triton to the stub process. @@ -1390,8 +1366,6 @@ ModelInstanceState::ProcessRequests( } } - // Finalize the execute. 
- execute_finalize.Complete(); } // If the output tensor is in GPU, there will be a second round trip @@ -1610,7 +1584,6 @@ ModelInstanceState::~ModelInstanceState() Stub()->TerminateStub(); TerminateMonitor(); Stub()->ClearQueues(); - received_message_.reset(); Stub().reset(); } diff --git a/src/response_sender.cc b/src/response_sender.cc index 0a88fb6b..cef4e3a7 100644 --- a/src/response_sender.cc +++ b/src/response_sender.cc @@ -69,7 +69,7 @@ ResponseSender::ResponseSender( ResponseSender::~ResponseSender() { - DeleteResponseFactory(); + // DeleteResponseFactory(); } void @@ -172,7 +172,7 @@ ResponseSender::Send( { bi::scoped_lock guard{send_message_payload->mu}; - stub->SendIPCMessage(ipc_message); + stub->SendIPCUtilsMessage(ipc_message); while (!send_message_payload->is_stub_turn) { send_message_payload->cv.wait(guard); } @@ -248,7 +248,7 @@ ResponseSender::Send( } if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) { - DeleteResponseFactory(); + // DeleteResponseFactory(); } } @@ -270,10 +270,10 @@ ResponseSender::DeleteResponseFactory() { bool already_deleted = response_factory_deleted_.exchange(true); if (!already_deleted) { - std::unique_ptr& stub = Stub::GetOrCreateInstance(); - stub->EnqueueCleanupId( - reinterpret_cast(response_factory_address_), - PYTHONSTUB_DecoupledResponseFactoryCleanup); + // std::unique_ptr& stub = Stub::GetOrCreateInstance(); + // stub->EnqueueCleanupId( + // reinterpret_cast(response_factory_address_), + // PYTHONSTUB_DecoupledResponseFactoryCleanup); } } From 3e9dcc562b69c8aa7d37672a3c1452a60cb66eb9 Mon Sep 17 00:00:00 2001 From: Iman Tabrizian Date: Mon, 16 Sep 2024 19:56:06 +0000 Subject: [PATCH 03/13] Fix cleanup --- src/python_be.cc | 199 ++++++++++++++++++++++++++--------------- src/python_be.h | 17 +++- src/response_sender.cc | 17 ++-- 3 files changed, 151 insertions(+), 82 deletions(-) diff --git a/src/python_be.cc b/src/python_be.cc index 361e1401..1c6c6505 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -291,7 +291,7 @@ ModelInstanceState::SaveRequestsToSharedMemory( std::unique_ptr infer_request; TRITONBACKEND_ResponseFactory* factory_ptr = nullptr; - // RETURN_IF_ERROR(TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request)); + RETURN_IF_ERROR(TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request)); infer_request = std::make_unique( id, correlation_id, pb_input_tensors, requested_output_names, @@ -1009,6 +1009,62 @@ ModelInstanceState::ProcessModelControlRequest( }); } +void +ModelInstanceState::SendMessageToStub( + bi::managed_external_buffer::handle_t message) +{ + Stub()->StubMessageQueue()->Push(message); +} + +void +ModelInstanceState::SendMessageAndReceiveResponse( + bi::managed_external_buffer::handle_t message, + bi::managed_external_buffer::handle_t& response, + std::shared_ptr>& responses, + TRITONBACKEND_Request** requests, const uint32_t request_count) +{ + SendMessageToStub(message); + + bi::managed_external_buffer::handle_t response_message; + auto error = Stub()->ReceiveMessageFromStub(response_message); + if (error != nullptr) { + RespondErrorToAllRequests( + TRITONSERVER_ErrorMessage(error), responses, requests, request_count); + + return; + } + + response = response_message; +} + +void +ModelInstanceState::RespondErrorToAllRequests( + const char* message, + std::shared_ptr>& responses, + TRITONBACKEND_Request** requests, const uint32_t request_count) +{ + for (uint32_t r = 0; r < request_count; ++r) { + if ((*responses)[r] == nullptr) + continue; + + std::string err_message = + std::string( + "Failed to process the 
request(s) for model instance '" + Name() + + "', message: ") + + message; + + TRITONSERVER_Error* err = + TRITONSERVER_ErrorNew(TRITONSERVER_ERROR_INTERNAL, err_message.c_str()); + LOG_IF_ERROR( + TRITONBACKEND_ResponseSend( + (*responses)[r], TRITONSERVER_RESPONSE_COMPLETE_FINAL, err), + "failed sending response"); + + (*responses)[r] = nullptr; + TRITONSERVER_ErrorDelete(err); + } +} + void ModelInstanceState::StartMonitor() { @@ -1164,6 +1220,12 @@ ModelInstanceState::ResponseSendDecoupled( SetErrorForResponseSendMessage( send_message_payload, WrapTritonErrorInSharedPtr(error), error_message); } + + if (send_message_payload->flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) { + std::unique_ptr< + TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter> + lresponse_factory(reinterpret_cast(response_factory)); + } } TRITONSERVER_Error* @@ -1265,6 +1327,7 @@ ModelInstanceState::ProcessRequests( // bool has_gpu_output = false; std::vector requires_deferred_callback; + bool has_gpu_output = false; std::vector> shm_responses; std::vector, void*>>> gpu_output_buffers(request_count); @@ -1362,78 +1425,75 @@ ModelInstanceState::ProcessRequests( requires_deferred_callback[r] = require_deferred_callback; if (requires_deferred_callback[r]) { - // has_gpu_output = true; + has_gpu_output = true; } } - } - // If the output tensor is in GPU, there will be a second round trip // required for filling the GPU buffers provided by the main process. -// if (has_gpu_output) { -// ipc_message->Command() = -// PYTHONSTUB_CommandType::PYTHONSTUB_LoadGPUBuffers; -// gpu_buffer_helper.Complete(Stub()->ShmPool()); -// ipc_message->Args() = gpu_buffer_helper.ShmHandle(); -// SendMessageAndReceiveResponse( -// ipc_message->ShmHandle(), response_message, restart, responses, -// requests, 0); - -// bool cuda_copy = false; - -// uint32_t response_index = 0; -// for (auto& gpu_output_buffer : gpu_output_buffers) { -// for (auto& buffer_memory_pair : gpu_output_buffer) { -// auto& pb_memory = buffer_memory_pair.first; -// void* pointer = buffer_memory_pair.second; -// bool cuda_used = false; - -// if (pb_memory->MemoryType() == TRITONSERVER_MEMORY_CPU) { -// GUARDED_RESPOND_IF_ERROR( -// responses, response_index, -// CopyBuffer( -// "Failed to copy the output tensor to buffer.", -// TRITONSERVER_MEMORY_CPU, 0, TRITONSERVER_MEMORY_CPU, 0, -// pb_memory->ByteSize(), pb_memory->DataPtr(), pointer, -// CudaStream(), &cuda_used)); -// cuda_copy |= cuda_used; -// } else if ( -// (pb_memory->MemoryType() == TRITONSERVER_MEMORY_GPU) && -// pb_memory->UseCUDASharedPool() && -// (pb_memory->DataPtr() != pointer)) { -// // If the data pointer from pb_memory is not the same as the -// // pointer, it means that the Triton-provided buffer is not used -// // during tensor transfer. Instead, an intermediate buffer that uses -// // CUDA shared memory pool is used. In this case, we need to copy -// // the data from the intermediate buffer back to the Triton-provided -// // buffer. 
-// GUARDED_RESPOND_IF_ERROR( -// responses, response_index, -// CopyBuffer( -// "Failed to copy the output tensor to buffer.", -// TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(), -// TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(), -// pb_memory->ByteSize(), pb_memory->DataPtr(), pointer, -// CudaStream(), &cuda_used)); -// cuda_copy |= cuda_used; -// } -// } -// response_index++; -// #ifdef TRITON_ENABLE_GPU -// if (cuda_copy) { -// cudaStreamSynchronize(stream_); -// } -// #endif // TRITON_ENABLE_GPU -// } -// } - -// bls_defer.Complete(); -// for (uint32_t r = 0; r < request_count; ++r) { -// if (requires_deferred_callback[r]) { -// shm_responses[r]->DeferredSendCallback(); -// } -// } -// } + if (has_gpu_output) { + ipc_message->Command() = + PYTHONSTUB_CommandType::PYTHONSTUB_LoadGPUBuffers; + gpu_buffer_helper.Complete(Stub()->ShmPool()); + ipc_message->Args() = gpu_buffer_helper.ShmHandle(); + bi::managed_external_buffer::handle_t response_message; + SendMessageAndReceiveResponse( + ipc_message->ShmHandle(), response_message, responses, requests, 0); + + bool cuda_copy = false; + + uint32_t response_index = 0; + for (auto& gpu_output_buffer : gpu_output_buffers) { + for (auto& buffer_memory_pair : gpu_output_buffer) { + auto& pb_memory = buffer_memory_pair.first; + void* pointer = buffer_memory_pair.second; + bool cuda_used = false; + + if (pb_memory->MemoryType() == TRITONSERVER_MEMORY_CPU) { + GUARDED_RESPOND_IF_ERROR( + responses, response_index, + CopyBuffer( + "Failed to copy the output tensor to buffer.", + TRITONSERVER_MEMORY_CPU, 0, TRITONSERVER_MEMORY_CPU, 0, + pb_memory->ByteSize(), pb_memory->DataPtr(), pointer, + CudaStream(), &cuda_used)); + cuda_copy |= cuda_used; + } else if ( + (pb_memory->MemoryType() == TRITONSERVER_MEMORY_GPU) && + pb_memory->UseCUDASharedPool() && + (pb_memory->DataPtr() != pointer)) { + // If the data pointer from pb_memory is not the same as the + // pointer, it means that the Triton-provided buffer is not used + // during tensor transfer. Instead, an intermediate buffer that uses + // CUDA shared memory pool is used. In this case, we need to copy + // the data from the intermediate buffer back to the Triton-provided + // buffer. + GUARDED_RESPOND_IF_ERROR( + responses, response_index, + CopyBuffer( + "Failed to copy the output tensor to buffer.", + TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(), + TRITONSERVER_MEMORY_GPU, pb_memory->MemoryTypeId(), + pb_memory->ByteSize(), pb_memory->DataPtr(), pointer, + CudaStream(), &cuda_used)); + cuda_copy |= cuda_used; + } + } + response_index++; +#ifdef TRITON_ENABLE_GPU + if (cuda_copy) { + cudaStreamSynchronize(stream_); + } +#endif // TRITON_ENABLE_GPU + } + } + + for (uint32_t r = 0; r < request_count; ++r) { + if (requires_deferred_callback[r]) { + shm_responses[r]->DeferredSendCallback(); + } + } + } return nullptr; // success } @@ -1575,9 +1635,6 @@ ModelInstanceState::~ModelInstanceState() if (Stub()->IsHealthy()) { // Wait for all the pending tasks to finish. thread_pool_->wait(); - // Push a dummy message to signal the thread to terminate. 
- Stub()->ParentMessageQueue()->Push(DUMMY_MESSAGE); - queue_monitor_.join(); } // Terminate stub first to allow any last messages to be received by the back // end before deallocating the queue memory diff --git a/src/python_be.h b/src/python_be.h index 59660fc4..4608298e 100644 --- a/src/python_be.h +++ b/src/python_be.h @@ -287,9 +287,6 @@ class ModelInstanceState : public BackendModelInstance { std::thread stub_to_parent_queue_monitor_; bool stub_to_parent_thread_; - // Queue monitor thread - std::thread queue_monitor_; - bool queue_monitor_thread_; std::mutex mu_; std::condition_variable cv_; std::unique_ptr received_message_; @@ -361,6 +358,20 @@ class ModelInstanceState : public BackendModelInstance { AllocatedSharedMemory& request_batch, std::shared_ptr>& responses); + void SendMessageAndReceiveResponse( + bi::managed_external_buffer::handle_t message, + bi::managed_external_buffer::handle_t& response, + std::shared_ptr>& responses, + TRITONBACKEND_Request** requests, const uint32_t request_count); + + void RespondErrorToAllRequests( + const char* message, + std::shared_ptr>& responses, + TRITONBACKEND_Request** requests, const uint32_t request_count); + + void SendMessageToStub( + bi::managed_external_buffer::handle_t message); + // Model instance stub std::unique_ptr& Stub() { return model_instance_stub_; } diff --git a/src/response_sender.cc b/src/response_sender.cc index cef4e3a7..043ef41d 100644 --- a/src/response_sender.cc +++ b/src/response_sender.cc @@ -69,7 +69,7 @@ ResponseSender::ResponseSender( ResponseSender::~ResponseSender() { - // DeleteResponseFactory(); + DeleteResponseFactory(); } void @@ -172,6 +172,10 @@ ResponseSender::Send( { bi::scoped_lock guard{send_message_payload->mu}; + // The server will destruct the response factory if the final flag is set. 
+ if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) { + response_factory_deleted_.exchange(true); + } stub->SendIPCUtilsMessage(ipc_message); while (!send_message_payload->is_stub_turn) { send_message_payload->cv.wait(guard); @@ -247,9 +251,6 @@ ResponseSender::Send( } } - if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) { - // DeleteResponseFactory(); - } } bool @@ -270,10 +271,10 @@ ResponseSender::DeleteResponseFactory() { bool already_deleted = response_factory_deleted_.exchange(true); if (!already_deleted) { - // std::unique_ptr& stub = Stub::GetOrCreateInstance(); - // stub->EnqueueCleanupId( - // reinterpret_cast(response_factory_address_), - // PYTHONSTUB_DecoupledResponseFactoryCleanup); + std::unique_ptr& stub = Stub::GetOrCreateInstance(); + stub->EnqueueCleanupId( + reinterpret_cast(response_factory_address_), + PYTHONSTUB_DecoupledResponseFactoryCleanup); } } From dfe90749108f636fd47fdfe6fc788af36896cdac Mon Sep 17 00:00:00 2001 From: Iman Tabrizian Date: Tue, 17 Sep 2024 15:34:31 +0000 Subject: [PATCH 04/13] Review comments --- src/ipc_message.cc | 14 +++++--- src/ipc_message.h | 8 +++-- src/pb_stub.cc | 78 +++++++++++++++++++++++++----------------- src/pb_stub.h | 3 +- src/python_be.cc | 23 ++++++++----- src/python_be.h | 9 +++-- src/response_sender.cc | 1 - 7 files changed, 80 insertions(+), 56 deletions(-) diff --git a/src/ipc_message.cc b/src/ipc_message.cc index 1b813214..2fa13ba3 100644 --- a/src/ipc_message.cc +++ b/src/ipc_message.cc @@ -57,13 +57,15 @@ IPCMessage::Create( } std::unique_ptr -IPCMessage::Create(IPCMessageShm* ipc_message_shm, - bi::managed_external_buffer::handle_t& message_handle) +IPCMessage::Create( + IPCMessageShm* ipc_message_shm, + bi::managed_external_buffer::handle_t& message_handle) { - return std::unique_ptr(new IPCMessage(ipc_message_shm, message_handle)); + return std::unique_ptr( + new IPCMessage(ipc_message_shm, message_handle)); } - AllocatedSharedMemory& +AllocatedSharedMemory& IPCMessage::GetAllocatedSharedMemory() { return ipc_message_shm_; @@ -146,7 +148,9 @@ IPCMessage::IPCMessage( ipc_message_handle_ = ipc_message_shm_.handle_; } -IPCMessage::IPCMessage(IPCMessageShm* ipc_message_shm, bi::managed_external_buffer::handle_t& handle) +IPCMessage::IPCMessage( + IPCMessageShm* ipc_message_shm, + bi::managed_external_buffer::handle_t& handle) { ipc_message_handle_ = handle; ipc_message_shm_ptr_ = ipc_message_shm; diff --git a/src/ipc_message.h b/src/ipc_message.h index c7d0ae9d..c3d1472e 100644 --- a/src/ipc_message.h +++ b/src/ipc_message.h @@ -98,8 +98,8 @@ class IPCMessage { const std::unique_ptr& shm_pool, bool inline_response); - static std::unique_ptr - Create(IPCMessageShm* ipc_message_shm, + static std::unique_ptr Create( + IPCMessageShm* ipc_message_shm, bi::managed_external_buffer::handle_t& message_handle); static std::unique_ptr LoadFromSharedMemory( std::unique_ptr& shm_pool, @@ -135,7 +135,9 @@ class IPCMessage { AllocatedSharedMemory& response_mutex_shm, AllocatedSharedMemory& response_cond_shm); - IPCMessage(IPCMessageShm* ipc_message_shm, bi::managed_external_buffer::handle_t& handle); + IPCMessage( + IPCMessageShm* ipc_message_shm, + bi::managed_external_buffer::handle_t& handle); }; }}}; // namespace triton::backend::python diff --git a/src/pb_stub.cc b/src/pb_stub.cc index e6c93214..4b7bffc1 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -654,7 +654,6 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) py::list py_request_list = LoadRequestsFromSharedMemory(request_batch_shm_ptr); std::unique_ptr 
execute_response; - // IPCMessage::Create(shm_pool_, false /* Inline response */); std::optional> response_batch; bool has_exception = false; @@ -675,8 +674,7 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) { NVTX_RANGE(nvtx_, "PyExecute " + name_); - execute_return = - model_instance_.attr("execute")(py_request_list); + execute_return = model_instance_.attr("execute")(py_request_list); bool is_coroutine = py::module::import("asyncio") .attr("iscoroutine")(execute_return) @@ -688,10 +686,12 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) } else { py::object coroutine_return = RunCoroutine(execute_return, false /* in_background */); - ProcessReturnedResponses(py_request_list, coroutine_return, response_batch); + ProcessReturnedResponses( + py_request_list, coroutine_return, response_batch); } } else { - ProcessReturnedResponses(py_request_list, execute_return, response_batch); + ProcessReturnedResponses( + py_request_list, execute_return, response_batch); } } } @@ -712,11 +712,14 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) error_string; LOG_ERROR << err_message.c_str(); if (!response_batch) { - response_batch = shm_pool_->Construct(sizeof(ResponseBatch) + sizeof(IPCMessageShm)); - } - ResponseBatch* response_batch_shm_ptr = reinterpret_cast(response_batch.value().data_.get() + sizeof(IPCMessageShm)); + response_batch = shm_pool_->Construct( + sizeof(ResponseBatch) + sizeof(IPCMessageShm)); + } + ResponseBatch* response_batch_shm_ptr = reinterpret_cast( + response_batch.value().data_.get() + sizeof(IPCMessageShm)); - response_batch_shm_ptr = reinterpret_cast(response_batch.value().data_.get()); + response_batch_shm_ptr = + reinterpret_cast(response_batch.value().data_.get()); response_batch_shm_ptr->has_error = true; error_string_shm = PbString::Create(shm_pool_, err_message); response_batch_shm_ptr->error = error_string_shm->ShmHandle(); @@ -732,14 +735,19 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) } if (!response_batch) { - response_batch = shm_pool_->Construct(sizeof(ResponseBatch) + sizeof(IPCMessageShm)); - ResponseBatch* response_batch_shm_ptr =reinterpret_cast(response_batch.value().data_.get() + sizeof(IPCMessageShm)); - response_batch_shm_ptr->batch_size = 0; - } - ResponseBatch* response_batch_shm_ptr = reinterpret_cast(response_batch.value().data_.get() + sizeof(IPCMessageShm)); + response_batch = shm_pool_->Construct( + sizeof(ResponseBatch) + sizeof(IPCMessageShm)); + ResponseBatch* response_batch_shm_ptr = reinterpret_cast( + response_batch.value().data_.get() + sizeof(IPCMessageShm)); + response_batch_shm_ptr->batch_size = 0; + } + ResponseBatch* response_batch_shm_ptr = reinterpret_cast( + response_batch.value().data_.get() + sizeof(IPCMessageShm)); response_batch_shm_ptr->has_error = false; response_batch_shm_ptr->is_error_set = false; - execute_response = IPCMessage::Create(reinterpret_cast(response_batch.value().data_.get()), response_batch.value().handle_); + execute_response = IPCMessage::Create( + reinterpret_cast(response_batch.value().data_.get()), + response_batch.value().handle_); execute_response->Args() = response_batch.value().handle_; execute_response->InlineResponse() = false; execute_response->Command() = PYTHONSTUB_ExecuteResponse; @@ -761,7 +769,8 @@ Stub::ProcessResponse(InferResponse* response) void Stub::ProcessReturnedResponses( - py::list py_requests, py::object py_responses_obj, std::optional>& response_batch) + py::list py_requests, py::object py_responses_obj, + std::optional>& 
response_batch) { // Return if there is nothing to process. if (py::isinstance(py_responses_obj)) { @@ -812,29 +821,34 @@ Stub::ProcessReturnedResponses( std::shared_ptr response = py_responses[i].cast>(); - request->GetResponseSender()->UpdateStateAndCounters(response, TRITONSERVER_RESPONSE_COMPLETE_FINAL); + request->GetResponseSender()->UpdateStateAndCounters( + response, TRITONSERVER_RESPONSE_COMPLETE_FINAL); } } - response_batch = std::move(shm_pool_->Construct(sizeof(IPCMessageShm) + + // Return all the created responses using response_batch. The reason + // that both of the paths are available is that sending the responses + // using response_batch is faster than using `response_sender`. + response_batch = std::move(shm_pool_->Construct( + sizeof(IPCMessageShm) + requests_size * sizeof(bi::managed_external_buffer::handle_t) + sizeof(ResponseBatch))); - ResponseBatch* response_batch_shm_ptr = - reinterpret_cast(response_batch.value().data_.get() + sizeof(IPCMessageShm)); + ResponseBatch* response_batch_shm_ptr = reinterpret_cast( + response_batch.value().data_.get() + sizeof(IPCMessageShm)); bi::managed_external_buffer::handle_t* responses_shm_handle = reinterpret_cast( - response_batch.value().data_.get() + sizeof(ResponseBatch) + sizeof(IPCMessageShm)); - - for (size_t i = 0; i < responses_size; i++) { - // Check the return type of execute function. - InferRequest* infer_request = py_requests[i].cast(); - InferResponse* infer_response = py_responses[i].cast(); - infer_response->PruneOutputTensors( - infer_request->RequestedOutputNames()); - ProcessResponse(infer_response); - responses_shm_handle[i] = infer_response->ShmHandle(); - } - response_batch_shm_ptr->batch_size = requests_size; + response_batch.value().data_.get() + sizeof(ResponseBatch) + + sizeof(IPCMessageShm)); + + for (size_t i = 0; i < responses_size; i++) { + // Check the return type of execute function. 
+ InferRequest* infer_request = py_requests[i].cast(); + InferResponse* infer_response = py_responses[i].cast(); + infer_response->PruneOutputTensors(infer_request->RequestedOutputNames()); + ProcessResponse(infer_response); + responses_shm_handle[i] = infer_response->ShmHandle(); + } + response_batch_shm_ptr->batch_size = requests_size; } py::object diff --git a/src/pb_stub.h b/src/pb_stub.h index 85a2783a..7d76ec9a 100644 --- a/src/pb_stub.h +++ b/src/pb_stub.h @@ -254,7 +254,8 @@ class Stub { void ProcessRequests(RequestBatch* request_batch_shm_ptr); void ProcessReturnedResponses( - py::list py_requests, py::object py_responses_obj, std::optional>& response_batch); + py::list py_requests, py::object py_responses_obj, + std::optional>& response_batch); void ProcessResponse(InferResponse* response); diff --git a/src/python_be.cc b/src/python_be.cc index 1c6c6505..8e78ecd7 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -1023,7 +1023,7 @@ ModelInstanceState::SendMessageAndReceiveResponse( std::shared_ptr>& responses, TRITONBACKEND_Request** requests, const uint32_t request_count) { - SendMessageToStub(message); + SendMessageToStub(message); bi::managed_external_buffer::handle_t response_message; auto error = Stub()->ReceiveMessageFromStub(response_message); @@ -1224,7 +1224,8 @@ ModelInstanceState::ResponseSendDecoupled( if (send_message_payload->flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) { std::unique_ptr< TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter> - lresponse_factory(reinterpret_cast(response_factory)); + lresponse_factory( + reinterpret_cast(response_factory)); } } @@ -1280,12 +1281,15 @@ ModelInstanceState::ProcessRequests( Stub()->StubMessageQueue()->Push(ipc_message->ShmHandle()); bi::managed_external_buffer::handle_t response_message; Stub()->ReceiveMessageFromStub(response_message); - response = IPCMessage::LoadFromSharedMemory(Stub()->ShmPool(), response_message); + response = + IPCMessage::LoadFromSharedMemory(Stub()->ShmPool(), response_message); } - char* ipc_message_shm = reinterpret_cast(response->GetAllocatedSharedMemory().data_.get());; + char* ipc_message_shm = + reinterpret_cast(response->GetAllocatedSharedMemory().data_.get()); + ; ResponseBatch* response_batch_shm_ptr = reinterpret_cast(ipc_message_shm + sizeof(IPCMessageShm)); - + uint64_t compute_end_ns = 0; SET_TIMESTAMP(compute_end_ns); reporter.SetComputeEndNs(compute_end_ns); @@ -1304,10 +1308,10 @@ ModelInstanceState::ProcessRequests( } if (response_batch_shm_ptr->batch_size > 0) { - std::shared_ptr> responses( - new std::vector()); + std::shared_ptr> responses( + new std::vector()); responses->reserve(request_count); - for (size_t i = 0; i < request_count; i++) { + for (size_t i = 0; i < request_count; i++) { TRITONBACKEND_Response* response; auto err = TRITONBACKEND_ResponseNew(&response, requests[i]); if (err == nullptr) { @@ -1324,7 +1328,6 @@ ModelInstanceState::ProcessRequests( // If the output provided by the model is in GPU, we will pass the list of // buffers provided by Triton to the stub process. - // bool has_gpu_output = false; std::vector requires_deferred_callback; bool has_gpu_output = false; @@ -1429,6 +1432,8 @@ ModelInstanceState::ProcessRequests( } } + execute_finalize.Complete(); + // If the output tensor is in GPU, there will be a second round trip // required for filling the GPU buffers provided by the main process. 
if (has_gpu_output) { diff --git a/src/python_be.h b/src/python_be.h index 4608298e..34871ea5 100644 --- a/src/python_be.h +++ b/src/python_be.h @@ -365,12 +365,11 @@ class ModelInstanceState : public BackendModelInstance { TRITONBACKEND_Request** requests, const uint32_t request_count); void RespondErrorToAllRequests( - const char* message, - std::shared_ptr>& responses, - TRITONBACKEND_Request** requests, const uint32_t request_count); + const char* message, + std::shared_ptr>& responses, + TRITONBACKEND_Request** requests, const uint32_t request_count); - void SendMessageToStub( - bi::managed_external_buffer::handle_t message); + void SendMessageToStub(bi::managed_external_buffer::handle_t message); // Model instance stub std::unique_ptr& Stub() { return model_instance_stub_; } diff --git a/src/response_sender.cc b/src/response_sender.cc index 043ef41d..7df90ec2 100644 --- a/src/response_sender.cc +++ b/src/response_sender.cc @@ -250,7 +250,6 @@ ResponseSender::Send( "An error occurred while sending a response."); } } - } bool From 7bf6d9f318b0cf84ef4df50c6a4195a972d2760c Mon Sep 17 00:00:00 2001 From: Iman Tabrizian Date: Tue, 17 Sep 2024 19:50:52 +0000 Subject: [PATCH 05/13] Fix up --- src/pb_stub.cc | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/src/pb_stub.cc b/src/pb_stub.cc index 4b7bffc1..d6e50e38 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -659,6 +659,7 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) bool has_exception = false; std::string error_string; std::unique_ptr error_string_shm; + std::string err_message; ScopedDefer execute_finalize([this] { stub_message_queue_->Pop(); }); ScopedDefer _( @@ -705,11 +706,10 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) } if (has_exception) { - std::string err_message = - std::string( - "Failed to process the request(s) for model '" + name_ + - "', message: ") + - error_string; + err_message = std::string( + "Failed to process the request(s) for model '" + name_ + + "', message: ") + + error_string; LOG_ERROR << err_message.c_str(); if (!response_batch) { response_batch = shm_pool_->Construct( @@ -718,12 +718,11 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) ResponseBatch* response_batch_shm_ptr = reinterpret_cast( response_batch.value().data_.get() + sizeof(IPCMessageShm)); - response_batch_shm_ptr = - reinterpret_cast(response_batch.value().data_.get()); response_batch_shm_ptr->has_error = true; error_string_shm = PbString::Create(shm_pool_, err_message); response_batch_shm_ptr->error = error_string_shm->ShmHandle(); response_batch_shm_ptr->is_error_set = true; + response_batch_shm_ptr->batch_size = 0; // Once the error is sent to the backend, the backend is supposed to close // all response factories if not already closed, so closing all response // senders if not already closed to prevent the model from sending more @@ -732,23 +731,25 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) InferRequest* request = py_request.cast(); request->GetResponseSender()->Close(); } - } - - if (!response_batch) { - response_batch = shm_pool_->Construct( - sizeof(ResponseBatch) + sizeof(IPCMessageShm)); + } else { + if (!response_batch) { + response_batch = shm_pool_->Construct( + sizeof(ResponseBatch) + sizeof(IPCMessageShm)); + ResponseBatch* response_batch_shm_ptr = reinterpret_cast( + response_batch.value().data_.get() + sizeof(IPCMessageShm)); + response_batch_shm_ptr->batch_size = 0; + } ResponseBatch* response_batch_shm_ptr 
= reinterpret_cast( response_batch.value().data_.get() + sizeof(IPCMessageShm)); - response_batch_shm_ptr->batch_size = 0; + response_batch_shm_ptr->has_error = false; + response_batch_shm_ptr->is_error_set = false; } - ResponseBatch* response_batch_shm_ptr = reinterpret_cast( - response_batch.value().data_.get() + sizeof(IPCMessageShm)); - response_batch_shm_ptr->has_error = false; - response_batch_shm_ptr->is_error_set = false; + execute_response = IPCMessage::Create( reinterpret_cast(response_batch.value().data_.get()), response_batch.value().handle_); - execute_response->Args() = response_batch.value().handle_; + execute_response->Args() = + response_batch.value().handle_ + sizeof(IPCMessageShm); execute_response->InlineResponse() = false; execute_response->Command() = PYTHONSTUB_ExecuteResponse; _.Complete(); From c42afe19f8a550c30f1657ac764bb780aed8de0d Mon Sep 17 00:00:00 2001 From: Iman Tabrizian Date: Mon, 23 Sep 2024 23:31:32 +0000 Subject: [PATCH 06/13] Fix up --- src/pb_stub.cc | 13 +++++++---- src/python_be.cc | 16 +++++++------ src/response_sender.cc | 5 ++-- src/response_sender.h | 3 +-- src/stub_launcher.cc | 53 +++--------------------------------------- src/stub_launcher.h | 3 +-- 6 files changed, 25 insertions(+), 68 deletions(-) diff --git a/src/pb_stub.cc b/src/pb_stub.cc index d6e50e38..63bedd0c 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -820,8 +820,7 @@ Stub::ProcessReturnedResponses( std::string(py::str(py_responses[i].get_type())) + "'."); } - std::shared_ptr response = - py_responses[i].cast>(); + InferResponse* response = py_responses[i].cast(); request->GetResponseSender()->UpdateStateAndCounters( response, TRITONSERVER_RESPONSE_COMPLETE_FINAL); } @@ -845,9 +844,13 @@ Stub::ProcessReturnedResponses( // Check the return type of execute function. 
InferRequest* infer_request = py_requests[i].cast(); InferResponse* infer_response = py_responses[i].cast(); - infer_response->PruneOutputTensors(infer_request->RequestedOutputNames()); - ProcessResponse(infer_response); - responses_shm_handle[i] = infer_response->ShmHandle(); + if (!py::isinstance(py_responses[i])) { + infer_response->PruneOutputTensors(infer_request->RequestedOutputNames()); + ProcessResponse(infer_response); + responses_shm_handle[i] = infer_response->ShmHandle(); + } else { + responses_shm_handle[i] = 0; + } } response_batch_shm_ptr->batch_size = requests_size; } diff --git a/src/python_be.cc b/src/python_be.cc index 8e78ecd7..b5334aa2 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -1026,13 +1026,7 @@ ModelInstanceState::SendMessageAndReceiveResponse( SendMessageToStub(message); bi::managed_external_buffer::handle_t response_message; - auto error = Stub()->ReceiveMessageFromStub(response_message); - if (error != nullptr) { - RespondErrorToAllRequests( - TRITONSERVER_ErrorMessage(error), responses, requests, request_count); - - return; - } + Stub()->ReceiveMessageFromStub(response_message); response = response_message; } @@ -1355,6 +1349,14 @@ ModelInstanceState::ProcessRequests( (*responses)[r] = nullptr; continue; } + + if (response_shm_handle[r] == 0) { + LOG_IF_ERROR( + TRITONBACKEND_ResponseDelete((*responses)[r]), + "failed to delete response"); + (*responses)[r] = nullptr; + continue; + } infer_response = InferResponse::LoadFromSharedMemory( Stub()->ShmPool(), response_shm_handle[r], false /* open_cuda_handle */); diff --git a/src/response_sender.cc b/src/response_sender.cc index 7df90ec2..6639a3e3 100644 --- a/src/response_sender.cc +++ b/src/response_sender.cc @@ -74,7 +74,7 @@ ResponseSender::~ResponseSender() void ResponseSender::UpdateStateAndCounters( - const std::shared_ptr& response, const uint32_t flags) + InferResponse* response, const uint32_t flags) { if (is_decoupled_ == nullptr) { // TODO: Can a model access the response sender on a BLS infer request? @@ -106,6 +106,7 @@ ResponseSender::UpdateStateAndCounters( } if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) { + response_factory_deleted_.exchange(true); closed_ = true; } number_of_response_sent_++; @@ -123,7 +124,7 @@ ResponseSender::Send( py::gil_scoped_release release; CheckResponseSenderArguments(infer_response, flags); - UpdateStateAndCounters(infer_response, flags); + UpdateStateAndCounters(infer_response.get(), flags); if (infer_response) { infer_response->PruneOutputTensors(requested_output_names_); } diff --git a/src/response_sender.h b/src/response_sender.h index 6b0258fd..6ca7e997 100644 --- a/src/response_sender.h +++ b/src/response_sender.h @@ -46,8 +46,7 @@ class ResponseSender { ~ResponseSender(); void Send(std::shared_ptr response, const uint32_t flags); bool IsCancelled(); - void UpdateStateAndCounters( - const std::shared_ptr& response, const uint32_t flags); + void UpdateStateAndCounters(InferResponse* response, const uint32_t flags); // Can be useful at stopping the model from sending any more responses. 
void Close(); diff --git a/src/stub_launcher.cc b/src/stub_launcher.cc index 828228e6..90834170 100644 --- a/src/stub_launcher.cc +++ b/src/stub_launcher.cc @@ -593,7 +593,7 @@ StubLauncher::ModelInstanceStubProcess() stub_message_queue_->Push(initialize_message->ShmHandle()); bi::managed_external_buffer::handle_t message; - RETURN_IF_ERROR(ReceiveMessageFromStub(message)); + ReceiveMessageFromStub(message); std::unique_ptr initialize_response_message = IPCMessage::LoadFromSharedMemory(shm_pool_, message); @@ -724,58 +724,11 @@ StubLauncher::KillStubProcess() #endif } -TRITONSERVER_Error* +void StubLauncher::ReceiveMessageFromStub( bi::managed_external_buffer::handle_t& message) { - bool success = false; - while (!success) { - uint64_t timeout_miliseconds = 1000; - { - boost::posix_time::ptime timeout = - boost::get_system_time() + - boost::posix_time::milliseconds(timeout_miliseconds); - - bi::scoped_lock lock(*health_mutex_, timeout); - - // Check if lock has been acquired. - if (lock) { - ipc_control_->stub_health = false; - } else { - // If it failed to obtain the lock, it means that the stub has been - // stuck or exited while holding the health mutex lock. - return TRITONSERVER_ErrorNew( - TRITONSERVER_ERROR_INTERNAL, "Failed to obtain the health mutex."); - } - } - - message = parent_message_queue_->Pop( - timeout_miliseconds /* duration ms */, success); - - bool is_stub_alive = false; - { - boost::posix_time::ptime timeout = - boost::get_system_time() + boost::posix_time::seconds(1); - bi::scoped_lock lock(*health_mutex_, timeout); - if (lock) { - is_stub_alive = ipc_control_->stub_health; - } else { - // If It failed to obtain the lock, it means that the stub has been - // stuck or exited while holding the health mutex lock. - is_stub_alive = false; - } - } - - if (!success && !is_stub_alive) { - return TRITONSERVER_ErrorNew( - TRITONSERVER_ERROR_INTERNAL, - (std::string("Stub process '") + model_instance_name_ + - "' is not healthy.") - .c_str()); - } - } - - return nullptr; // success + message = parent_message_queue_->Pop(); } void diff --git a/src/stub_launcher.h b/src/stub_launcher.h index 6c8dd910..714a8773 100644 --- a/src/stub_launcher.h +++ b/src/stub_launcher.h @@ -146,8 +146,7 @@ class StubLauncher { void KillStubProcess(); // Get a message from the stub process - TRITONSERVER_Error* ReceiveMessageFromStub( - bi::managed_external_buffer::handle_t& message); + void ReceiveMessageFromStub(bi::managed_external_buffer::handle_t& message); // Wait for stub process void WaitForStubProcess(); From 47adab9d83b93f7e801a77da3d5fd1e7e5229490 Mon Sep 17 00:00:00 2001 From: Iman Tabrizian Date: Tue, 24 Sep 2024 13:37:57 +0000 Subject: [PATCH 07/13] Fix response factory cleanup --- src/infer_request.h | 1 + src/python_be.cc | 37 ++++++++++++++++++++++++++++++------- src/response_sender.h | 1 + 3 files changed, 32 insertions(+), 7 deletions(-) diff --git a/src/infer_request.h b/src/infer_request.h index c67e2fb0..f368d692 100644 --- a/src/infer_request.h +++ b/src/infer_request.h @@ -96,6 +96,7 @@ class InferRequest { InferenceTrace& GetTrace(); uint32_t ReleaseFlags(); void SetReleaseFlags(const uint32_t& flags); + intptr_t GetResponseFactoryAddress() { return response_factory_address_; } #ifdef TRITON_PB_STUB std::shared_ptr Exec(const bool is_decoupled); diff --git a/src/python_be.cc b/src/python_be.cc index b5334aa2..7cbb4f4f 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -1089,6 +1089,17 @@ ModelInstanceState::ResponseSendDecoupled( ResponseSendMessage* 
send_message_payload = reinterpret_cast(send_message.data_.get()); std::unique_ptr error_message; + ScopedDefer response_factory_deleter([send_message_payload] { + if (send_message_payload->flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) { + TRITONBACKEND_ResponseFactory* response_factory = + reinterpret_cast( + send_message_payload->response_factory_address); + std::unique_ptr< + TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter> + lresponse_factory(reinterpret_cast( + response_factory)); + } + }); ScopedDefer _([send_message_payload] { { bi::scoped_lock guard{send_message_payload->mu}; @@ -1214,13 +1225,6 @@ ModelInstanceState::ResponseSendDecoupled( SetErrorForResponseSendMessage( send_message_payload, WrapTritonErrorInSharedPtr(error), error_message); } - - if (send_message_payload->flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) { - std::unique_ptr< - TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter> - lresponse_factory( - reinterpret_cast(response_factory)); - } } TRITONSERVER_Error* @@ -1291,6 +1295,15 @@ ModelInstanceState::ProcessRequests( if (response_batch_shm_ptr->has_error) { if (response_batch_shm_ptr->is_error_set) { + for (uint32_t r = 0; r < request_count; r++) { + TRITONBACKEND_ResponseFactory* response_factory = + reinterpret_cast( + pb_infer_requests[r]->GetResponseFactoryAddress()); + std::unique_ptr< + TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter> + lresponse_factory(reinterpret_cast( + response_factory)); + } auto error = PbString::LoadFromSharedMemory( Stub()->ShmPool(), response_batch_shm_ptr->error); return TRITONSERVER_ErrorNew( @@ -1357,6 +1370,16 @@ ModelInstanceState::ProcessRequests( (*responses)[r] = nullptr; continue; } + { + TRITONBACKEND_ResponseFactory* response_factory = + reinterpret_cast( + pb_infer_requests[r]->GetResponseFactoryAddress()); + std::unique_ptr< + TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter> + lresponse_factory( + reinterpret_cast( + response_factory)); + } infer_response = InferResponse::LoadFromSharedMemory( Stub()->ShmPool(), response_shm_handle[r], false /* open_cuda_handle */); diff --git a/src/response_sender.h b/src/response_sender.h index 6ca7e997..7fce9dd2 100644 --- a/src/response_sender.h +++ b/src/response_sender.h @@ -43,6 +43,7 @@ class ResponseSender { const std::set& requested_output_names, std::unique_ptr& shm_pool, const std::shared_ptr& pb_cancel); + intptr_t ResponseFactory() { return response_factory_address_; } ~ResponseSender(); void Send(std::shared_ptr response, const uint32_t flags); bool IsCancelled(); From 921916fdb824cf96568f78be222671d0de83105a Mon Sep 17 00:00:00 2001 From: krishung5 Date: Tue, 1 Oct 2024 18:20:02 -0700 Subject: [PATCH 08/13] Fix segfault --- src/pb_stub.cc | 54 +++++++++++++++++++++++++++++++++++++++--- src/pb_utils.h | 3 +++ src/python_be.cc | 24 ++++++++++++++++--- src/response_sender.cc | 13 ++++++++++ src/response_sender.h | 1 + 5 files changed, 89 insertions(+), 6 deletions(-) diff --git a/src/pb_stub.cc b/src/pb_stub.cc index 63bedd0c..3c732cbc 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -665,6 +665,7 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) ScopedDefer _( [this, &execute_response] { SendIPCMessage(execute_response); }); py::object execute_return; + py::object coroutine_return; try { if (!py::hasattr(model_instance_, "execute")) { std::string message = "Python model " + model_context_.PythonModelPath() + @@ -685,7 +686,7 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) // Do not 
wait for async decoupled execute to return. RunCoroutine(execute_return, true /* in_background */); } else { - py::object coroutine_return = + coroutine_return = RunCoroutine(execute_return, false /* in_background */); ProcessReturnedResponses( py_request_list, coroutine_return, response_batch); @@ -733,6 +734,7 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) } } else { if (!response_batch) { + std::cerr << "===== response_batch is not set" << std::endl; response_batch = shm_pool_->Construct( sizeof(ResponseBatch) + sizeof(IPCMessageShm)); ResponseBatch* response_batch_shm_ptr = reinterpret_cast( @@ -743,6 +745,8 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) response_batch.value().data_.get() + sizeof(IPCMessageShm)); response_batch_shm_ptr->has_error = false; response_batch_shm_ptr->is_error_set = false; + std::cerr << "===== response_batch_shm_ptr->batch_size: " + << response_batch_shm_ptr->batch_size << std::endl; } execute_response = IPCMessage::Create( @@ -779,6 +783,27 @@ Stub::ProcessReturnedResponses( } // Only non-decoupled may return responses. if (IsDecoupled()) { + // For decoupled mode, if before returning from this error, there was + // already a response sent from the response sender, along with the complete + // final flag, then use the `is_response_factory_deleted` flag to notify the + // backend to NOT to delete the response factory again during error + // handling. + for (py::handle py_request : py_requests) { + InferRequest* request = py_request.cast(); + if (request->GetResponseSender()->IsClosed()) { + // Notify the backend to NOT to delete the response factory again during + // error handling. + if (!response_batch) { + response_batch = std::move(shm_pool_->Construct( + sizeof(ResponseBatch) + sizeof(IPCMessageShm))); + } + ResponseBatch* response_batch_shm_ptr = + reinterpret_cast( + response_batch.value().data_.get() + sizeof(IPCMessageShm)); + response_batch_shm_ptr->is_response_factory_deleted = true; + } + } + throw PythonBackendException( "Python model '" + name_ + "' is using the decoupled mode and the execute function must return " @@ -821,8 +846,31 @@ Stub::ProcessReturnedResponses( } InferResponse* response = py_responses[i].cast(); - request->GetResponseSender()->UpdateStateAndCounters( - response, TRITONSERVER_RESPONSE_COMPLETE_FINAL); + + try { + request->GetResponseSender()->UpdateStateAndCounters( + response, TRITONSERVER_RESPONSE_COMPLETE_FINAL); + } + catch (const PythonBackendException& pb_exception) { + // Special case for default(non-decoupled) mode, where the response + // factory should already be cleaned up with the previous response sent + // from response sender, and yet the model tries to return another + // response from `execute()` function. Notify the backend to NOT to + // delete the response factory again during error handling. + std::string error_string = pb_exception.what(); + if (error_string.find( + "Non-decoupled model cannot send more than one response") != + std::string::npos) { + response_batch = std::move(shm_pool_->Construct( + sizeof(ResponseBatch) + sizeof(IPCMessageShm))); + ResponseBatch* response_batch_shm_ptr = + reinterpret_cast( + response_batch.value().data_.get() + sizeof(IPCMessageShm)); + response_batch_shm_ptr->is_response_factory_deleted = true; + LOG_ERROR << "=== caught error: " << pb_exception.what(); + } + throw pb_exception; + } } } // Return all the created responses using response_batch. 
The reason diff --git a/src/pb_utils.h b/src/pb_utils.h index e68cfb0f..aacf6b49 100644 --- a/src/pb_utils.h +++ b/src/pb_utils.h @@ -167,6 +167,9 @@ struct ResponseBatch : SendMessageBase { bool is_error_set; uint32_t response_size; + + // Indicates whether the response factory has been deleted or not. + bool is_response_factory_deleted = false; }; enum LogLevel { kInfo = 0, kWarning, kError, kVerbose }; diff --git a/src/python_be.cc b/src/python_be.cc index 7cbb4f4f..b73cfc7d 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -826,6 +826,8 @@ ModelInstanceState::ProcessCleanupRequest( infer_payload_.erase(id); } else if (message->Command() == PYTHONSTUB_DecoupledResponseFactoryCleanup) { // Delete response factory + std::cerr << "=== ResponseFactoryDeleter -> ProcessCleanupRequest ===" + << std::endl; std::unique_ptr< TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter> response_factory(reinterpret_cast(id)); @@ -1094,6 +1096,8 @@ ModelInstanceState::ResponseSendDecoupled( TRITONBACKEND_ResponseFactory* response_factory = reinterpret_cast( send_message_payload->response_factory_address); + std::cerr << "=== ResponseFactoryDeleter -> ResponseSendDecoupled ===" + << std::endl; std::unique_ptr< TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter> lresponse_factory(reinterpret_cast( @@ -1284,7 +1288,6 @@ ModelInstanceState::ProcessRequests( } char* ipc_message_shm = reinterpret_cast(response->GetAllocatedSharedMemory().data_.get()); - ; ResponseBatch* response_batch_shm_ptr = reinterpret_cast(ipc_message_shm + sizeof(IPCMessageShm)); @@ -1294,16 +1297,27 @@ ModelInstanceState::ProcessRequests( reporter.SetBatchStatistics(total_batch_size); if (response_batch_shm_ptr->has_error) { - if (response_batch_shm_ptr->is_error_set) { + // The "is_response_factory_deleted" flag indicates whether the response + // factory has been deleted. The flag is used in a corner case + // where after the response sender sends a response and complete final flag, + // and closes the response factory, the model returns a response from + // `execute()`. For both default and decoupled mode, upon handling that + // error, no need to delete the response factory. 
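// --- Illustrative aside, not part of the patch ----------------------------
// The corner case described above, reduced to the header fields involved.
// The trimmed struct below is an assumption (the real ResponseBatch in
// pb_utils.h carries more members); only the meaning of
// `is_response_factory_deleted` is taken from the diff.
#include <cstdint>

namespace flag_sketch {

struct ResponseBatch {
  bool has_error = false;
  bool is_error_set = false;
  uint32_t batch_size = 0;
  // Set by the stub when the response sender has already sent the FINAL flag
  // and released the response factory, so the backend must not release it a
  // second time while handling the error.
  bool is_response_factory_deleted = false;
};

// Stub-side error path, reduced to its effect on the header: record the error
// and tell the backend whether the factory still needs cleanup.
inline void MarkError(ResponseBatch& batch, bool factory_already_released)
{
  batch.has_error = true;
  batch.is_error_set = true;
  batch.is_response_factory_deleted = factory_already_released;
}

}  // namespace flag_sketch
// ---------------------------------------------------------------------------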
+ if (!response_batch_shm_ptr->is_response_factory_deleted) { for (uint32_t r = 0; r < request_count; r++) { TRITONBACKEND_ResponseFactory* response_factory = reinterpret_cast( pb_infer_requests[r]->GetResponseFactoryAddress()); + std::cerr << "=== ResponseFactoryDeleter -> " + "response_batch_shm_ptr->has_error ===" + << std::endl; std::unique_ptr< TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter> lresponse_factory(reinterpret_cast( response_factory)); } + } + if (response_batch_shm_ptr->is_error_set) { auto error = PbString::LoadFromSharedMemory( Stub()->ShmPool(), response_batch_shm_ptr->error); return TRITONSERVER_ErrorNew( @@ -1343,6 +1357,7 @@ ModelInstanceState::ProcessRequests( gpu_output_buffers(request_count); GPUBuffersHelper gpu_buffer_helper; + std::cerr << "=== PYBE request_count: " << request_count << std::endl; for (uint32_t r = 0; r < request_count; ++r) { NVTX_RANGE(nvtx_, "LoadingResponse " + Name()); TRITONBACKEND_Response* response = (*responses)[r]; @@ -1374,6 +1389,8 @@ ModelInstanceState::ProcessRequests( TRITONBACKEND_ResponseFactory* response_factory = reinterpret_cast( pb_infer_requests[r]->GetResponseFactoryAddress()); + std::cerr << "=== ResponseFactoryDeleter -> regular workflow ===" + << std::endl; std::unique_ptr< TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter> lresponse_factory( @@ -1422,7 +1439,8 @@ ModelInstanceState::ProcessRequests( GUARDED_RESPOND_IF_ERROR( responses, r, TRITONBACKEND_RequestOutputCount(request, &requested_output_count)); - + std::cerr << "=== PYBE requested_output_count: " << requested_output_count + << std::endl; std::set requested_output_names; for (size_t j = 0; j < requested_output_count; ++j) { const char* output_name; diff --git a/src/response_sender.cc b/src/response_sender.cc index 6639a3e3..55730bd3 100644 --- a/src/response_sender.cc +++ b/src/response_sender.cc @@ -106,6 +106,8 @@ ResponseSender::UpdateStateAndCounters( } if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) { + std::cerr << "=== ResponseSender -> UpdateStateAndCounters closing RF ===" + << std::endl; response_factory_deleted_.exchange(true); closed_ = true; } @@ -175,6 +177,7 @@ ResponseSender::Send( bi::scoped_lock guard{send_message_payload->mu}; // The server will destruct the response factory if the final flag is set. if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) { + std::cerr << "====== scoped_defer -> closing RF =====" << std::endl; response_factory_deleted_.exchange(true); } stub->SendIPCUtilsMessage(ipc_message); @@ -259,16 +262,26 @@ ResponseSender::IsCancelled() return pb_cancel_->IsCancelled(); } +bool +ResponseSender::IsClosed() +{ + std::lock_guard lk(mu_); + return closed_; +} + void ResponseSender::Close() { std::lock_guard lk(mu_); closed_ = true; + response_factory_deleted_.exchange(true); } void ResponseSender::DeleteResponseFactory() { + std::cerr << "=== ResponseSender -> DeleteResponseFactory, " + << response_factory_deleted_ << " ===" << std::endl; bool already_deleted = response_factory_deleted_.exchange(true); if (!already_deleted) { std::unique_ptr& stub = Stub::GetOrCreateInstance(); diff --git a/src/response_sender.h b/src/response_sender.h index 7fce9dd2..a696f9eb 100644 --- a/src/response_sender.h +++ b/src/response_sender.h @@ -51,6 +51,7 @@ class ResponseSender { // Can be useful at stopping the model from sending any more responses. 
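// --- Illustrative aside, not part of the patch ----------------------------
// The `response_factory_deleted_.exchange(true)` calls above implement a
// "run the cleanup at most once" guard. A minimal standalone version of that
// pattern, with an arbitrary callable standing in for the factory deletion:
#include <atomic>
#include <functional>
#include <utility>

namespace once_sketch {

class OneShotCleanup {
 public:
  explicit OneShotCleanup(std::function<void()> cleanup)
      : cleanup_(std::move(cleanup)) {}

  // Safe to call from several paths (send with the FINAL flag, Close(), the
  // destructor); exchange() guarantees the body runs at most once even when
  // more than one path reaches it.
  void Run()
  {
    if (!done_.exchange(true)) {
      cleanup_();
    }
  }

  ~OneShotCleanup() { Run(); }

 private:
  std::function<void()> cleanup_;
  std::atomic<bool> done_{false};
};

}  // namespace once_sketch
// ---------------------------------------------------------------------------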
void Close(); + bool IsClosed(); private: void DeleteResponseFactory(); From dbf5c4317c466f0da1f1ca00ab3df5a745ad70b8 Mon Sep 17 00:00:00 2001 From: krishung5 Date: Wed, 2 Oct 2024 16:35:53 -0700 Subject: [PATCH 09/13] Fix error handling --- src/pb_stub.cc | 50 ++++++++-------- src/python_be.cc | 133 +++++++++++++++++++++++++++++++++++-------- src/python_be.h | 7 ++- src/stub_launcher.cc | 54 +++++++++++++++++- src/stub_launcher.h | 3 +- 5 files changed, 192 insertions(+), 55 deletions(-) diff --git a/src/pb_stub.cc b/src/pb_stub.cc index 3c732cbc..5f17886a 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -719,6 +719,24 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) ResponseBatch* response_batch_shm_ptr = reinterpret_cast( response_batch.value().data_.get() + sizeof(IPCMessageShm)); + // Handle two special cases: + // 1. For default(non-decoupled) mode, where the response + // factory should already be cleaned up with the previous response sent + // from response sender, and yet the model tries to return another + // response from `execute()` function. Notify the backend to NOT to + // delete the response factory again during error handling. + // 2.The response sender is already closed, need to notify the backend to + // NOT to delete the response factory again during error handling. + // std::string error_string = pb_exception.what(); + if ((err_message.find( + "Non-decoupled model cannot send more than one response") != + std::string::npos) || + (err_message.find("Response sender has been closed") != + std::string::npos)) { + response_batch_shm_ptr->is_response_factory_deleted = true; + LOG_ERROR << "=== caught error: " << err_message; + } + response_batch_shm_ptr->has_error = true; error_string_shm = PbString::Create(shm_pool_, err_message); response_batch_shm_ptr->error = error_string_shm->ShmHandle(); @@ -734,6 +752,7 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) } } else { if (!response_batch) { + // No response is returned from `execute()`. std::cerr << "===== response_batch is not set" << std::endl; response_batch = shm_pool_->Construct( sizeof(ResponseBatch) + sizeof(IPCMessageShm)); @@ -846,31 +865,8 @@ Stub::ProcessReturnedResponses( } InferResponse* response = py_responses[i].cast(); - - try { - request->GetResponseSender()->UpdateStateAndCounters( - response, TRITONSERVER_RESPONSE_COMPLETE_FINAL); - } - catch (const PythonBackendException& pb_exception) { - // Special case for default(non-decoupled) mode, where the response - // factory should already be cleaned up with the previous response sent - // from response sender, and yet the model tries to return another - // response from `execute()` function. Notify the backend to NOT to - // delete the response factory again during error handling. - std::string error_string = pb_exception.what(); - if (error_string.find( - "Non-decoupled model cannot send more than one response") != - std::string::npos) { - response_batch = std::move(shm_pool_->Construct( - sizeof(ResponseBatch) + sizeof(IPCMessageShm))); - ResponseBatch* response_batch_shm_ptr = - reinterpret_cast( - response_batch.value().data_.get() + sizeof(IPCMessageShm)); - response_batch_shm_ptr->is_response_factory_deleted = true; - LOG_ERROR << "=== caught error: " << pb_exception.what(); - } - throw pb_exception; - } + request->GetResponseSender()->UpdateStateAndCounters( + response, TRITONSERVER_RESPONSE_COMPLETE_FINAL); } } // Return all the created responses using response_batch. 
The reason @@ -887,16 +883,18 @@ Stub::ProcessReturnedResponses( reinterpret_cast( response_batch.value().data_.get() + sizeof(ResponseBatch) + sizeof(IPCMessageShm)); - + std::cerr << "===== response_size: " << responses_size << std::endl; for (size_t i = 0; i < responses_size; i++) { // Check the return type of execute function. InferRequest* infer_request = py_requests[i].cast(); InferResponse* infer_response = py_responses[i].cast(); if (!py::isinstance(py_responses[i])) { + std::cerr << "===== response is NOT None" << std::endl; infer_response->PruneOutputTensors(infer_request->RequestedOutputNames()); ProcessResponse(infer_response); responses_shm_handle[i] = infer_response->ShmHandle(); } else { + std::cerr << "===== response is None" << std::endl; responses_shm_handle[i] = 0; } } diff --git a/src/python_be.cc b/src/python_be.cc index b73cfc7d..8edca5f5 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -153,6 +153,23 @@ ModelInstanceState::SetErrorForResponseSendMessage( } } +bool +ModelInstanceState::IsStubProcessAlive() +{ + boost::posix_time::ptime timeout = + boost::get_system_time() + boost::posix_time::seconds(1); + bi::scoped_lock lock(*Stub()->HealthMutex(), timeout); + + // Check if lock has been acquired. + if (lock) { + return Stub()->IpcControl()->stub_health; + } else { + // If It failed to obtain the lock, it means that the stub has been + // stuck or exited while holding the health mutex lock. + return false; + } +} + TRITONSERVER_Error* ModelInstanceState::SaveRequestsToSharedMemory( TRITONBACKEND_Request** requests, const uint32_t request_count, @@ -1011,11 +1028,43 @@ ModelInstanceState::ProcessModelControlRequest( }); } -void +TRITONSERVER_Error* ModelInstanceState::SendMessageToStub( bi::managed_external_buffer::handle_t message) { - Stub()->StubMessageQueue()->Push(message); + // Stub()->StubMessageQueue()->Push(message); + bool success = false; + while (!success) { + uint64_t timeout_miliseconds = 1000; + { + boost::posix_time::ptime timeout = + boost::get_system_time() + + boost::posix_time::milliseconds(timeout_miliseconds); + + bi::scoped_lock lock( + *(Stub()->HealthMutex()), timeout); + + // Check if lock has been acquired. + if (lock) { + Stub()->IpcControl()->stub_health = false; + } else { + // If it failed to obtain the lock, it means that the stub has been + // stuck or exited while holding the health mutex lock. 
+ return TRITONSERVER_ErrorNew( + TRITONSERVER_ERROR_INTERNAL, "Failed to obtain the health mutex."); + } + } + + Stub()->StubMessageQueue()->Push( + message, timeout_miliseconds /* duration ms */, success); + + if (!success && !IsStubProcessAlive()) { + return TRITONSERVER_ErrorNew( + TRITONSERVER_ERROR_INTERNAL, "Stub process is not healthy."); + } + } + + return nullptr; // success } void @@ -1025,10 +1074,29 @@ ModelInstanceState::SendMessageAndReceiveResponse( std::shared_ptr>& responses, TRITONBACKEND_Request** requests, const uint32_t request_count) { - SendMessageToStub(message); + // SendMessageToStub(message); + + // bi::managed_external_buffer::handle_t response_message; + // Stub()->ReceiveMessageFromStub(response_message); + + // response = response_message; + + auto error = SendMessageToStub(message); + if (error != nullptr) { + RespondErrorToAllRequests( + TRITONSERVER_ErrorMessage(error), responses, requests, request_count); + + return; + } bi::managed_external_buffer::handle_t response_message; - Stub()->ReceiveMessageFromStub(response_message); + error = Stub()->ReceiveMessageFromStub(response_message); + if (error != nullptr) { + RespondErrorToAllRequests( + TRITONSERVER_ErrorMessage(error), responses, requests, request_count); + + return; + } response = response_message; } @@ -1061,6 +1129,7 @@ ModelInstanceState::RespondErrorToAllRequests( } } + void ModelInstanceState::StartMonitor() { @@ -1282,7 +1351,7 @@ ModelInstanceState::ProcessRequests( { Stub()->StubMessageQueue()->Push(ipc_message->ShmHandle()); bi::managed_external_buffer::handle_t response_message; - Stub()->ReceiveMessageFromStub(response_message); + RETURN_IF_ERROR(Stub()->ReceiveMessageFromStub(response_message)); response = IPCMessage::LoadFromSharedMemory(Stub()->ShmPool(), response_message); } @@ -1329,26 +1398,34 @@ ModelInstanceState::ProcessRequests( } if (response_batch_shm_ptr->batch_size > 0) { + bi::managed_external_buffer::handle_t* response_shm_handle = + reinterpret_cast( + ipc_message_shm + sizeof(ResponseBatch) + sizeof(IPCMessageShm)); + std::shared_ptr> responses( new std::vector()); responses->reserve(request_count); for (size_t i = 0; i < request_count; i++) { - TRITONBACKEND_Response* response; - auto err = TRITONBACKEND_ResponseNew(&response, requests[i]); - if (err == nullptr) { - responses->emplace_back(response); - } else { + // It is possible to have multiple responses batched together in a single + // response batch shm, where some of the responses are None due to the + // usage of response sender, so only create a TRITONBACKEND_Response + // object for the valid responses, and skip the None responses later. + if (response_shm_handle[i] == 0) { + std::cerr << "=== PYBE response_shm_handle is 0 ===" << std::endl; responses->emplace_back(nullptr); - LOG_MESSAGE(TRITONSERVER_LOG_ERROR, "Fail to create response"); - TRITONSERVER_ErrorDelete(err); + } else { + TRITONBACKEND_Response* response; + auto err = TRITONBACKEND_ResponseNew(&response, requests[i]); + if (err == nullptr) { + responses->emplace_back(response); + } else { + responses->emplace_back(nullptr); + LOG_MESSAGE(TRITONSERVER_LOG_ERROR, "Fail to create response"); + TRITONSERVER_ErrorDelete(err); + } } } - bi::managed_external_buffer::handle_t* response_shm_handle = - reinterpret_cast( - ipc_message_shm + sizeof(ResponseBatch) + sizeof(IPCMessageShm)); - // If the output provided by the model is in GPU, we will pass the list of - // buffers provided by Triton to the stub process. 
std::vector requires_deferred_callback; bool has_gpu_output = false; @@ -1360,6 +1437,11 @@ ModelInstanceState::ProcessRequests( std::cerr << "=== PYBE request_count: " << request_count << std::endl; for (uint32_t r = 0; r < request_count; ++r) { NVTX_RANGE(nvtx_, "LoadingResponse " + Name()); + if (response_shm_handle[r] == 0) { + std::cerr << "=== PYBE skip the response_shm_handle is 0 ===" + << std::endl; + continue; + } TRITONBACKEND_Response* response = (*responses)[r]; TRITONBACKEND_Request* request = requests[r]; uint32_t requested_output_count = 0; @@ -1378,13 +1460,14 @@ ModelInstanceState::ProcessRequests( continue; } - if (response_shm_handle[r] == 0) { - LOG_IF_ERROR( - TRITONBACKEND_ResponseDelete((*responses)[r]), - "failed to delete response"); - (*responses)[r] = nullptr; - continue; - } + // if (response_shm_handle[r] == 0) { + // std::cerr << "=== PYBE response_shm_handle is 0 ===" << std::endl; + // LOG_IF_ERROR( + // TRITONBACKEND_ResponseDelete((*responses)[r]), + // "failed to delete response"); + // (*responses)[r] = nullptr; + // continue; + // } { TRITONBACKEND_ResponseFactory* response_factory = reinterpret_cast( @@ -1448,6 +1531,8 @@ ModelInstanceState::ProcessRequests( responses, r, TRITONBACKEND_RequestOutputName(request, j, &output_name)); requested_output_names.insert(output_name); + std::cerr << "=== PYBE requested_output_name: " << output_name + << std::endl; } bool require_deferred_callback = false; diff --git a/src/python_be.h b/src/python_be.h index 34871ea5..c98e1284 100644 --- a/src/python_be.h +++ b/src/python_be.h @@ -369,7 +369,12 @@ class ModelInstanceState : public BackendModelInstance { std::shared_ptr>& responses, TRITONBACKEND_Request** requests, const uint32_t request_count); - void SendMessageToStub(bi::managed_external_buffer::handle_t message); + // void SendMessageToStub(bi::managed_external_buffer::handle_t message); + TRITONSERVER_Error* SendMessageToStub( + bi::managed_external_buffer::handle_t message); + + // Checks whether the stub process is live + bool IsStubProcessAlive(); // Model instance stub std::unique_ptr& Stub() { return model_instance_stub_; } diff --git a/src/stub_launcher.cc b/src/stub_launcher.cc index 90834170..e8d2430f 100644 --- a/src/stub_launcher.cc +++ b/src/stub_launcher.cc @@ -593,7 +593,7 @@ StubLauncher::ModelInstanceStubProcess() stub_message_queue_->Push(initialize_message->ShmHandle()); bi::managed_external_buffer::handle_t message; - ReceiveMessageFromStub(message); + RETURN_IF_ERROR(ReceiveMessageFromStub(message)); std::unique_ptr initialize_response_message = IPCMessage::LoadFromSharedMemory(shm_pool_, message); @@ -724,11 +724,59 @@ StubLauncher::KillStubProcess() #endif } -void +TRITONSERVER_Error* StubLauncher::ReceiveMessageFromStub( bi::managed_external_buffer::handle_t& message) { - message = parent_message_queue_->Pop(); + // message = parent_message_queue_->Pop(); + bool success = false; + while (!success) { + uint64_t timeout_miliseconds = 1000; + { + boost::posix_time::ptime timeout = + boost::get_system_time() + + boost::posix_time::milliseconds(timeout_miliseconds); + + bi::scoped_lock lock(*health_mutex_, timeout); + + // Check if lock has been acquired. + if (lock) { + ipc_control_->stub_health = false; + } else { + // If it failed to obtain the lock, it means that the stub has been + // stuck or exited while holding the health mutex lock. 
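// --- Illustrative aside, not part of the patch ----------------------------
// The loop being restored here combines a bounded queue pop with a liveness
// flag that a healthy stub is expected to keep re-asserting. A reduced,
// self-contained version of that handshake; the Queue interface and the names
// below are assumptions, not the real StubLauncher API.
#include <atomic>
#include <chrono>
#include <cstdint>
#include <stdexcept>
#include <string>

namespace health_sketch {

template <typename Queue>
uint64_t PopWithHealthCheck(
    Queue& queue, std::atomic<bool>& stub_health, const std::string& name)
{
  while (true) {
    // Clear the flag, then give the stub a bounded window to answer.
    stub_health.store(false);
    bool success = false;
    uint64_t message = queue.Pop(std::chrono::milliseconds(1000), success);
    if (success) {
      return message;
    }
    // Timed out: if the stub also failed to flip the flag back, treat it as
    // dead instead of blocking forever; otherwise keep waiting.
    if (!stub_health.load()) {
      throw std::runtime_error(
          "Stub process '" + name + "' is not healthy.");
    }
  }
}

}  // namespace health_sketch
// ---------------------------------------------------------------------------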
+ return TRITONSERVER_ErrorNew( + TRITONSERVER_ERROR_INTERNAL, "Failed to obtain the health mutex."); + } + } + + message = parent_message_queue_->Pop( + timeout_miliseconds /* duration ms */, success); + + bool is_stub_alive = false; + { + boost::posix_time::ptime timeout = + boost::get_system_time() + boost::posix_time::seconds(1); + bi::scoped_lock lock(*health_mutex_, timeout); + if (lock) { + is_stub_alive = ipc_control_->stub_health; + } else { + // If It failed to obtain the lock, it means that the stub has been + // stuck or exited while holding the health mutex lock. + is_stub_alive = false; + } + } + + if (!success && !is_stub_alive) { + return TRITONSERVER_ErrorNew( + TRITONSERVER_ERROR_INTERNAL, + (std::string("Stub process '") + model_instance_name_ + + "' is not healthy.") + .c_str()); + } + } + + return nullptr; // success } void diff --git a/src/stub_launcher.h b/src/stub_launcher.h index 714a8773..6c8dd910 100644 --- a/src/stub_launcher.h +++ b/src/stub_launcher.h @@ -146,7 +146,8 @@ class StubLauncher { void KillStubProcess(); // Get a message from the stub process - void ReceiveMessageFromStub(bi::managed_external_buffer::handle_t& message); + TRITONSERVER_Error* ReceiveMessageFromStub( + bi::managed_external_buffer::handle_t& message); // Wait for stub process void WaitForStubProcess(); From 95519a1f0b449e69ec47df5682b1c899e76c91f6 Mon Sep 17 00:00:00 2001 From: krishung5 Date: Thu, 3 Oct 2024 16:25:09 -0700 Subject: [PATCH 10/13] Remove extra logs --- src/pb_stub.cc | 53 ++++++++++++++++++++++++------------------ src/python_be.cc | 34 +-------------------------- src/response_sender.cc | 5 ---- 3 files changed, 31 insertions(+), 61 deletions(-) diff --git a/src/pb_stub.cc b/src/pb_stub.cc index 5f17886a..6af07fb3 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -719,22 +719,12 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) ResponseBatch* response_batch_shm_ptr = reinterpret_cast( response_batch.value().data_.get() + sizeof(IPCMessageShm)); - // Handle two special cases: - // 1. For default(non-decoupled) mode, where the response - // factory should already be cleaned up with the previous response sent - // from response sender, and yet the model tries to return another - // response from `execute()` function. Notify the backend to NOT to + + // If the response sender is already closed, notify the backend NOT to // delete the response factory again during error handling. - // 2.The response sender is already closed, need to notify the backend to - // NOT to delete the response factory again during error handling. - // std::string error_string = pb_exception.what(); - if ((err_message.find( - "Non-decoupled model cannot send more than one response") != - std::string::npos) || - (err_message.find("Response sender has been closed") != - std::string::npos)) { + if (err_message.find("Response sender has been closed") != + std::string::npos) { response_batch_shm_ptr->is_response_factory_deleted = true; - LOG_ERROR << "=== caught error: " << err_message; } response_batch_shm_ptr->has_error = true; @@ -752,8 +742,6 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) } } else { if (!response_batch) { - // No response is returned from `execute()`. 
- std::cerr << "===== response_batch is not set" << std::endl; response_batch = shm_pool_->Construct( sizeof(ResponseBatch) + sizeof(IPCMessageShm)); ResponseBatch* response_batch_shm_ptr = reinterpret_cast( @@ -764,8 +752,6 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) response_batch.value().data_.get() + sizeof(IPCMessageShm)); response_batch_shm_ptr->has_error = false; response_batch_shm_ptr->is_error_set = false; - std::cerr << "===== response_batch_shm_ptr->batch_size: " - << response_batch_shm_ptr->batch_size << std::endl; } execute_response = IPCMessage::Create( @@ -865,8 +851,32 @@ Stub::ProcessReturnedResponses( } InferResponse* response = py_responses[i].cast(); - request->GetResponseSender()->UpdateStateAndCounters( - response, TRITONSERVER_RESPONSE_COMPLETE_FINAL); + try { + request->GetResponseSender()->UpdateStateAndCounters( + response, TRITONSERVER_RESPONSE_COMPLETE_FINAL); + } + catch (const PythonBackendException& pb_exception) { + // Handle the exception here to catch the error when there's a response + // returned from `execute()`, and the below error message is thrown. + // In default (non-decoupled) mode, the response factory should already + // have been cleaned up when the previous response was sent by the + // response sender. However, if the model attempts to return another + // response from the `execute()` function, notify the backend NOT to + // delete the response factory again during error handling. + std::string err_message = pb_exception.what(); + if (err_message.find( + "Non-decoupled model cannot send more than one response") != + std::string::npos) { + response_batch = std::move(shm_pool_->Construct( + sizeof(ResponseBatch) + sizeof(IPCMessageShm))); + ResponseBatch* response_batch_shm_ptr = + reinterpret_cast( + response_batch.value().data_.get() + sizeof(IPCMessageShm)); + response_batch_shm_ptr->batch_size = 0; + response_batch_shm_ptr->is_response_factory_deleted = true; + } + throw pb_exception; + } } } // Return all the created responses using response_batch. The reason @@ -883,18 +893,15 @@ Stub::ProcessReturnedResponses( reinterpret_cast( response_batch.value().data_.get() + sizeof(ResponseBatch) + sizeof(IPCMessageShm)); - std::cerr << "===== response_size: " << responses_size << std::endl; for (size_t i = 0; i < responses_size; i++) { // Check the return type of execute function. 
InferRequest* infer_request = py_requests[i].cast(); InferResponse* infer_response = py_responses[i].cast(); if (!py::isinstance(py_responses[i])) { - std::cerr << "===== response is NOT None" << std::endl; infer_response->PruneOutputTensors(infer_request->RequestedOutputNames()); ProcessResponse(infer_response); responses_shm_handle[i] = infer_response->ShmHandle(); } else { - std::cerr << "===== response is None" << std::endl; responses_shm_handle[i] = 0; } } diff --git a/src/python_be.cc b/src/python_be.cc index 8edca5f5..6b8c2516 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -843,8 +843,6 @@ ModelInstanceState::ProcessCleanupRequest( infer_payload_.erase(id); } else if (message->Command() == PYTHONSTUB_DecoupledResponseFactoryCleanup) { // Delete response factory - std::cerr << "=== ResponseFactoryDeleter -> ProcessCleanupRequest ===" - << std::endl; std::unique_ptr< TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter> response_factory(reinterpret_cast(id)); @@ -1165,8 +1163,6 @@ ModelInstanceState::ResponseSendDecoupled( TRITONBACKEND_ResponseFactory* response_factory = reinterpret_cast( send_message_payload->response_factory_address); - std::cerr << "=== ResponseFactoryDeleter -> ResponseSendDecoupled ===" - << std::endl; std::unique_ptr< TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter> lresponse_factory(reinterpret_cast( @@ -1366,20 +1362,11 @@ ModelInstanceState::ProcessRequests( reporter.SetBatchStatistics(total_batch_size); if (response_batch_shm_ptr->has_error) { - // The "is_response_factory_deleted" flag indicates whether the response - // factory has been deleted. The flag is used in a corner case - // where after the response sender sends a response and complete final flag, - // and closes the response factory, the model returns a response from - // `execute()`. For both default and decoupled mode, upon handling that - // error, no need to delete the response factory. if (!response_batch_shm_ptr->is_response_factory_deleted) { for (uint32_t r = 0; r < request_count; r++) { TRITONBACKEND_ResponseFactory* response_factory = reinterpret_cast( pb_infer_requests[r]->GetResponseFactoryAddress()); - std::cerr << "=== ResponseFactoryDeleter -> " - "response_batch_shm_ptr->has_error ===" - << std::endl; std::unique_ptr< TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter> lresponse_factory(reinterpret_cast( @@ -1411,7 +1398,6 @@ ModelInstanceState::ProcessRequests( // usage of response sender, so only create a TRITONBACKEND_Response // object for the valid responses, and skip the None responses later. 
if (response_shm_handle[i] == 0) { - std::cerr << "=== PYBE response_shm_handle is 0 ===" << std::endl; responses->emplace_back(nullptr); } else { TRITONBACKEND_Response* response; @@ -1434,18 +1420,15 @@ ModelInstanceState::ProcessRequests( gpu_output_buffers(request_count); GPUBuffersHelper gpu_buffer_helper; - std::cerr << "=== PYBE request_count: " << request_count << std::endl; for (uint32_t r = 0; r < request_count; ++r) { NVTX_RANGE(nvtx_, "LoadingResponse " + Name()); + requires_deferred_callback.push_back(false); if (response_shm_handle[r] == 0) { - std::cerr << "=== PYBE skip the response_shm_handle is 0 ===" - << std::endl; continue; } TRITONBACKEND_Response* response = (*responses)[r]; TRITONBACKEND_Request* request = requests[r]; uint32_t requested_output_count = 0; - requires_deferred_callback.push_back(false); shm_responses.emplace_back(nullptr); std::unique_ptr& infer_response = shm_responses.back(); @@ -1459,21 +1442,10 @@ ModelInstanceState::ProcessRequests( (*responses)[r] = nullptr; continue; } - - // if (response_shm_handle[r] == 0) { - // std::cerr << "=== PYBE response_shm_handle is 0 ===" << std::endl; - // LOG_IF_ERROR( - // TRITONBACKEND_ResponseDelete((*responses)[r]), - // "failed to delete response"); - // (*responses)[r] = nullptr; - // continue; - // } { TRITONBACKEND_ResponseFactory* response_factory = reinterpret_cast( pb_infer_requests[r]->GetResponseFactoryAddress()); - std::cerr << "=== ResponseFactoryDeleter -> regular workflow ===" - << std::endl; std::unique_ptr< TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter> lresponse_factory( @@ -1522,8 +1494,6 @@ ModelInstanceState::ProcessRequests( GUARDED_RESPOND_IF_ERROR( responses, r, TRITONBACKEND_RequestOutputCount(request, &requested_output_count)); - std::cerr << "=== PYBE requested_output_count: " << requested_output_count - << std::endl; std::set requested_output_names; for (size_t j = 0; j < requested_output_count; ++j) { const char* output_name; @@ -1531,8 +1501,6 @@ ModelInstanceState::ProcessRequests( responses, r, TRITONBACKEND_RequestOutputName(request, j, &output_name)); requested_output_names.insert(output_name); - std::cerr << "=== PYBE requested_output_name: " << output_name - << std::endl; } bool require_deferred_callback = false; diff --git a/src/response_sender.cc b/src/response_sender.cc index 55730bd3..ef3b09dd 100644 --- a/src/response_sender.cc +++ b/src/response_sender.cc @@ -106,8 +106,6 @@ ResponseSender::UpdateStateAndCounters( } if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) { - std::cerr << "=== ResponseSender -> UpdateStateAndCounters closing RF ===" - << std::endl; response_factory_deleted_.exchange(true); closed_ = true; } @@ -177,7 +175,6 @@ ResponseSender::Send( bi::scoped_lock guard{send_message_payload->mu}; // The server will destruct the response factory if the final flag is set. 
if (flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) { - std::cerr << "====== scoped_defer -> closing RF =====" << std::endl; response_factory_deleted_.exchange(true); } stub->SendIPCUtilsMessage(ipc_message); @@ -280,8 +277,6 @@ ResponseSender::Close() void ResponseSender::DeleteResponseFactory() { - std::cerr << "=== ResponseSender -> DeleteResponseFactory, " - << response_factory_deleted_ << " ===" << std::endl; bool already_deleted = response_factory_deleted_.exchange(true); if (!already_deleted) { std::unique_ptr& stub = Stub::GetOrCreateInstance(); From 1282598aaa9b48c9f5758387070773217ebaa1c2 Mon Sep 17 00:00:00 2001 From: krishung5 Date: Thu, 3 Oct 2024 17:16:17 -0700 Subject: [PATCH 11/13] Fix up, add comments --- src/pb_stub.cc | 35 ++++++++++++----------------------- src/python_be.cc | 5 ++++- src/stub_launcher.cc | 1 - 3 files changed, 16 insertions(+), 25 deletions(-) diff --git a/src/pb_stub.cc b/src/pb_stub.cc index 6af07fb3..5bf2a5c2 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -719,12 +719,22 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) ResponseBatch* response_batch_shm_ptr = reinterpret_cast( response_batch.value().data_.get() + sizeof(IPCMessageShm)); - - // If the response sender is already closed, notify the backend NOT to + // The backend will clean up the response factory if there is an error in + // the response batch. It is necessary to handle cases where the response + // sender should have already cleaned up, ensuring the backend does not // delete the response factory again during error handling. if (err_message.find("Response sender has been closed") != std::string::npos) { response_batch_shm_ptr->is_response_factory_deleted = true; + } else if ( + err_message.find("is using the decoupled mode and the execute function " + "must return None") != std::string::npos) { + for (py::handle py_request : py_request_list) { + InferRequest* request = py_request.cast(); + if (request->GetResponseSender()->IsClosed()) { + response_batch_shm_ptr->is_response_factory_deleted = true; + } + } } response_batch_shm_ptr->has_error = true; @@ -788,27 +798,6 @@ Stub::ProcessReturnedResponses( } // Only non-decoupled may return responses. if (IsDecoupled()) { - // For decoupled mode, if before returning from this error, there was - // already a response sent from the response sender, along with the complete - // final flag, then use the `is_response_factory_deleted` flag to notify the - // backend to NOT to delete the response factory again during error - // handling. - for (py::handle py_request : py_requests) { - InferRequest* request = py_request.cast(); - if (request->GetResponseSender()->IsClosed()) { - // Notify the backend to NOT to delete the response factory again during - // error handling. 
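// --- Illustrative aside, not part of the patch ----------------------------
// The rules the stub relies on when matching these error messages, reduced to
// a small state tracker. Member names and the ordering of the checks are
// illustrative; the two error strings are the substrings searched for in the
// diffs above.
#include <cstdint>
#include <stdexcept>

namespace state_sketch {

class SenderState {
 public:
  explicit SenderState(bool decoupled) : decoupled_(decoupled) {}

  // Called once per Send().
  void Update(bool final_flag)
  {
    if (closed_) {
      throw std::logic_error("Response sender has been closed");
    }
    if (!decoupled_ && sent_ > 0) {
      throw std::logic_error(
          "Non-decoupled model cannot send more than one response");
    }
    ++sent_;
    if (final_flag) {
      // After the FINAL flag no further sends are legal and the response
      // factory is released, which is what the stub detects above.
      closed_ = true;
    }
  }

  bool IsClosed() const { return closed_; }

 private:
  bool decoupled_;
  bool closed_ = false;
  uint32_t sent_ = 0;
};

}  // namespace state_sketch
// ---------------------------------------------------------------------------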
- if (!response_batch) { - response_batch = std::move(shm_pool_->Construct( - sizeof(ResponseBatch) + sizeof(IPCMessageShm))); - } - ResponseBatch* response_batch_shm_ptr = - reinterpret_cast( - response_batch.value().data_.get() + sizeof(IPCMessageShm)); - response_batch_shm_ptr->is_response_factory_deleted = true; - } - } - throw PythonBackendException( "Python model '" + name_ + "' is using the decoupled mode and the execute function must return " diff --git a/src/python_be.cc b/src/python_be.cc index 6b8c2516..40909388 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -1362,6 +1362,9 @@ ModelInstanceState::ProcessRequests( reporter.SetBatchStatistics(total_batch_size); if (response_batch_shm_ptr->has_error) { + // Clean up the response factory if an error occurred. The + // `is_response_factory_deleted` flag indicates whether the response factory + // has been deleted for some corner cases. if (!response_batch_shm_ptr->is_response_factory_deleted) { for (uint32_t r = 0; r < request_count; r++) { TRITONBACKEND_ResponseFactory* response_factory = @@ -1396,7 +1399,7 @@ ModelInstanceState::ProcessRequests( // It is possible to have multiple responses batched together in a single // response batch shm, where some of the responses are None due to the // usage of response sender, so only create a TRITONBACKEND_Response - // object for the valid responses, and skip the None responses later. + // object for the valid responses. if (response_shm_handle[i] == 0) { responses->emplace_back(nullptr); } else { diff --git a/src/stub_launcher.cc b/src/stub_launcher.cc index e8d2430f..828228e6 100644 --- a/src/stub_launcher.cc +++ b/src/stub_launcher.cc @@ -728,7 +728,6 @@ TRITONSERVER_Error* StubLauncher::ReceiveMessageFromStub( bi::managed_external_buffer::handle_t& message) { - // message = parent_message_queue_->Pop(); bool success = false; while (!success) { uint64_t timeout_miliseconds = 1000; From 8528d75fde969b9efa707ff16b4ea2ca59b64088 Mon Sep 17 00:00:00 2001 From: krishung5 Date: Fri, 4 Oct 2024 12:37:25 -0700 Subject: [PATCH 12/13] Address comment --- src/pb_stub.cc | 27 ++++++--------------------- src/python_be.cc | 8 -------- 2 files changed, 6 insertions(+), 29 deletions(-) diff --git a/src/pb_stub.cc b/src/pb_stub.cc index 5bf2a5c2..6c4a9f08 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -723,17 +723,10 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) // the response batch. It is necessary to handle cases where the response // sender should have already cleaned up, ensuring the backend does not // delete the response factory again during error handling. 
- if (err_message.find("Response sender has been closed") != - std::string::npos) { - response_batch_shm_ptr->is_response_factory_deleted = true; - } else if ( - err_message.find("is using the decoupled mode and the execute function " - "must return None") != std::string::npos) { - for (py::handle py_request : py_request_list) { - InferRequest* request = py_request.cast(); - if (request->GetResponseSender()->IsClosed()) { - response_batch_shm_ptr->is_response_factory_deleted = true; - } + for (py::handle py_request : py_request_list) { + InferRequest* request = py_request.cast(); + if (request->GetResponseSender()->IsClosed()) { + response_batch_shm_ptr->is_response_factory_deleted = true; } } @@ -846,16 +839,8 @@ Stub::ProcessReturnedResponses( } catch (const PythonBackendException& pb_exception) { // Handle the exception here to catch the error when there's a response - // returned from `execute()`, and the below error message is thrown. - // In default (non-decoupled) mode, the response factory should already - // have been cleaned up when the previous response was sent by the - // response sender. However, if the model attempts to return another - // response from the `execute()` function, notify the backend NOT to - // delete the response factory again during error handling. - std::string err_message = pb_exception.what(); - if (err_message.find( - "Non-decoupled model cannot send more than one response") != - std::string::npos) { + // returned from `execute()`. + if (request->GetResponseSender()->IsClosed()) { response_batch = std::move(shm_pool_->Construct( sizeof(ResponseBatch) + sizeof(IPCMessageShm))); ResponseBatch* response_batch_shm_ptr = diff --git a/src/python_be.cc b/src/python_be.cc index 40909388..bdf7b95f 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -1030,7 +1030,6 @@ TRITONSERVER_Error* ModelInstanceState::SendMessageToStub( bi::managed_external_buffer::handle_t message) { - // Stub()->StubMessageQueue()->Push(message); bool success = false; while (!success) { uint64_t timeout_miliseconds = 1000; @@ -1072,13 +1071,6 @@ ModelInstanceState::SendMessageAndReceiveResponse( std::shared_ptr>& responses, TRITONBACKEND_Request** requests, const uint32_t request_count) { - // SendMessageToStub(message); - - // bi::managed_external_buffer::handle_t response_message; - // Stub()->ReceiveMessageFromStub(response_message); - - // response = response_message; - auto error = SendMessageToStub(message); if (error != nullptr) { RespondErrorToAllRequests( From 815a5d4cb359ca6fdf313c84add0b643a0e36d1b Mon Sep 17 00:00:00 2001 From: krishung5 Date: Mon, 7 Oct 2024 14:37:57 -0700 Subject: [PATCH 13/13] Fix up --- src/pb_stub.cc | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/pb_stub.cc b/src/pb_stub.cc index 6c4a9f08..a26719d2 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -720,13 +720,15 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr) response_batch.value().data_.get() + sizeof(IPCMessageShm)); // The backend will clean up the response factory if there is an error in - // the response batch. It is necessary to handle cases where the response - // sender should have already cleaned up, ensuring the backend does not - // delete the response factory again during error handling. - for (py::handle py_request : py_request_list) { - InferRequest* request = py_request.cast(); - if (request->GetResponseSender()->IsClosed()) { - response_batch_shm_ptr->is_response_factory_deleted = true; + // the response batch. 
For decoupled mode, it is necessary to handle cases
+  // where the response sender should have already cleaned up, ensuring the
+  // backend does not delete the response factory again during error handling.
+  if (IsDecoupled()) {
+    for (py::handle py_request : py_request_list) {
+      InferRequest* request = py_request.cast<InferRequest*>();
+      if (request->GetResponseSender()->IsClosed()) {
+        response_batch_shm_ptr->is_response_factory_deleted = true;
+      }
     }
   }
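// --- Illustrative aside, not part of the patch ----------------------------
// Where the series ends up: on an error, the backend releases each request's
// response factory exactly once, unless the stub reported (via the flag shown
// above) that a response sender already released it. The types and names
// below are stand-ins, not the Triton backend API.
#include <memory>
#include <vector>

namespace cleanup_sketch {

struct ResponseFactory {};  // stand-in for TRITONBACKEND_ResponseFactory

struct Request {
  ResponseFactory* factory = nullptr;
};

inline void ReleaseFactory(ResponseFactory* factory) { delete factory; }

void CleanupOnError(
    std::vector<Request>& requests, bool factory_already_released)
{
  if (factory_already_released) {
    // A response sender already sent the FINAL flag and released the factory;
    // releasing it again is exactly what the flag exists to prevent.
    return;
  }
  for (auto& request : requests) {
    // unique_ptr with a custom deleter mirrors the shape of the
    // backend::ResponseFactoryDeleter usage in the patches.
    std::unique_ptr<ResponseFactory, void (*)(ResponseFactory*)> holder(
        request.factory, ReleaseFactory);
    request.factory = nullptr;
  }
}

}  // namespace cleanup_sketch
// ---------------------------------------------------------------------------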