Skip to content

Commit 374bb92

Browse files
authored
fix(query): Fix incorrect memory tracking in the HTTP handler (#17857)
fix(query): fix query memory state limit in http handler
1 parent 18ce7dd commit 374bb92

File tree

3 files changed

+43
-23
lines changed

3 files changed

+43
-23
lines changed

src/query/service/src/servers/http/v1/http_query_handlers.rs

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use databend_common_base::headers::HEADER_QUERY_PAGE_ROWS;
2121
use databend_common_base::headers::HEADER_QUERY_STATE;
2222
use databend_common_base::runtime::drop_guard;
2323
use databend_common_base::runtime::execute_futures_in_parallel;
24+
use databend_common_base::runtime::MemStat;
2425
use databend_common_base::runtime::ThreadTracker;
2526
use databend_common_config::GlobalConfig;
2627
use databend_common_exception::ErrorCode;
@@ -416,13 +417,24 @@ pub(crate) async fn query_handler(
416417
ctx: &HttpQueryContext,
417418
Json(req): Json<HttpQueryRequest>,
418419
) -> PoemResult<impl IntoResponse> {
419-
let root = get_http_tracing_span(func_path!(), ctx, &ctx.query_id);
420-
let _t = SlowRequestLogTracker::new(ctx);
421-
422-
async {
423-
let agent_info = ctx.user_agent.as_ref().map(|s| (format!("(from {s})"))).unwrap_or("".to_string());
424-
let client_session_id_info = ctx.client_session_id.as_ref().map(|s| (format!("(client_session_id={s})"))).unwrap_or("".to_string());
425-
info!("http query new request{}{}: {}", agent_info, client_session_id_info, mask_connection_info(&format!("{:?}", req)));
420+
let query_handle = async {
421+
let agent_info = ctx
422+
.user_agent
423+
.as_ref()
424+
.map(|s| format!("(from {s})"))
425+
.unwrap_or("".to_string());
426+
427+
let client_session_id_info = ctx
428+
.client_session_id
429+
.as_ref()
430+
.map(|s| format!("(client_session_id={s})"))
431+
.unwrap_or("".to_string());
432+
info!(
433+
"http query new request{}{}: {}",
434+
agent_info,
435+
client_session_id_info,
436+
mask_connection_info(&format!("{:?}", req))
437+
);
426438
let sql = req.sql.clone();
427439

428440
match HttpQuery::try_create(ctx, req.clone()).await {
@@ -449,10 +461,14 @@ pub(crate) async fn query_handler(
449461
.get_response_page(0)
450462
.await
451463
.map_err(|err| err.display_with_sql(&sql))
452-
.map_err(|err| poem::Error::from_string(err.message(), StatusCode::NOT_FOUND))?;
464+
.map_err(|err| {
465+
poem::Error::from_string(err.message(), StatusCode::NOT_FOUND)
466+
})?;
467+
453468
if matches!(resp.state.state, ExecuteStateKind::Failed) {
454469
ctx.set_fail();
455470
}
471+
456472
let (rows, next_page) = match &resp.data {
457473
None => (0, None),
458474
Some(p) => (p.page.data.num_rows(), p.next_page_no),
@@ -464,9 +480,24 @@ pub(crate) async fn query_handler(
464480
Ok(QueryResponse::from_internal(query.id.to_string(), resp, false).into_response())
465481
}
466482
}
467-
}
468-
.in_span(root)
469-
.await
483+
};
484+
485+
let query_handle = {
486+
let root = get_http_tracing_span(func_path!(), ctx, &ctx.query_id);
487+
let _t = SlowRequestLogTracker::new(ctx);
488+
query_handle.in_span(root)
489+
};
490+
491+
let query_handle = {
492+
let query_mem_stat = MemStat::create(format!("Query-{}", ctx.query_id));
493+
let mut tracking_payload = ThreadTracker::new_tracking_payload();
494+
tracking_payload.query_id = Some(ctx.query_id.clone());
495+
tracking_payload.mem_stat = Some(query_mem_stat.clone());
496+
let _tracking_guard = ThreadTracker::tracking(tracking_payload);
497+
ThreadTracker::tracking_future(query_handle)
498+
};
499+
500+
query_handle.await
470501
}
471502

472503
#[derive(Deserialize, Serialize, Debug)]

src/query/service/src/servers/http/v1/query/execute_state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ pub struct ExecuteRunning {
122122
// used to kill query
123123
session: Arc<Session>,
124124
// mainly used to get progress for now
125-
ctx: Arc<QueryContext>,
125+
pub(crate) ctx: Arc<QueryContext>,
126126
schema: Vec<QueryResponseField>,
127127
has_result_set: bool,
128128
#[allow(dead_code)]

src/query/service/src/servers/http/v1/query/http_query.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ use databend_common_base::base::tokio::sync::Mutex as TokioMutex;
2626
use databend_common_base::runtime::CatchUnwindFuture;
2727
use databend_common_base::runtime::GlobalQueryRuntime;
2828
use databend_common_base::runtime::MemStat;
29-
use databend_common_base::runtime::ThreadTracker;
3029
use databend_common_base::runtime::TrySpawn;
3130
use databend_common_catalog::table_context::StageAttachment;
3231
use databend_common_exception::ErrorCode;
@@ -473,11 +472,6 @@ impl HttpQuery {
473472
session.set_client_host(ctx.client_host.clone());
474473

475474
let http_ctx = ctx;
476-
let query_mem_stat = MemStat::create(format!("Query-{}", query_id));
477-
let mut tracking_payload = ThreadTracker::new_tracking_payload();
478-
tracking_payload.mem_stat = Some(query_mem_stat.clone());
479-
let _tracking_guard = ThreadTracker::tracking(tracking_payload);
480-
481475
let ctx = session.create_query_context().await?;
482476

483477
// Deduplicate label is used on the DML queries which may be retried by the client.
@@ -742,11 +736,6 @@ impl HttpQuery {
742736
page_manager.format_settings.clone()
743737
};
744738

745-
let mut tracking_payload = ThreadTracker::new_tracking_payload();
746-
tracking_payload.mem_stat = query_context.get_query_memory_tracking();
747-
tracking_payload.query_id = Some(query_context.get_id());
748-
let _tracking_guard = ThreadTracker::tracking(tracking_payload);
749-
750739
GlobalQueryRuntime::instance().runtime().try_spawn(
751740
async move {
752741
if let Err(e) = CatchUnwindFuture::create(ExecuteState::try_start_query(

0 commit comments

Comments
 (0)