Skip to content

Commit 6a6d3c3

Browse files
XuanwoBohuTANG
andauthored
feat: Use query id as trace id (#17947)
* feat: Use query id as trace id Signed-off-by: Xuanwo <github@xuanwo.io> * Make sure query id are compatible Signed-off-by: Xuanwo <github@xuanwo.io> * fix query id Signed-off-by: Xuanwo <github@xuanwo.io> * Fix tests Signed-off-by: Xuanwo <github@xuanwo.io> * Fix tests Signed-off-by: Xuanwo <github@xuanwo.io> * Fix RM_UUID Signed-off-by: Xuanwo <github@xuanwo.io> * fix query id Signed-off-by: Xuanwo <github@xuanwo.io> * Fix tests Signed-off-by: Xuanwo <github@xuanwo.io> * Fix uuid Signed-off-by: Xuanwo <github@xuanwo.io> --------- Signed-off-by: Xuanwo <github@xuanwo.io> Co-authored-by: Bohu <overred.shuttler@gmail.com>
1 parent 2b1ef37 commit 6a6d3c3

File tree

16 files changed

+53
-35
lines changed

16 files changed

+53
-35
lines changed

Cargo.lock

Lines changed: 0 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,6 @@ hashlink = "0.8"
328328
headers = "0.4.0"
329329
hex = "0.4.3"
330330
hickory-resolver = "0.25"
331-
highway = "1.1"
332331
hive_metastore = "0.1.0"
333332
hostname = "0.3.1"
334333
http = "1"

src/query/service/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,6 @@ futures-util = { workspace = true }
126126
geozero = { workspace = true }
127127
headers = { workspace = true }
128128
hex = { workspace = true }
129-
highway = { workspace = true }
130129
http = { workspace = true }
131130
humantime = { workspace = true }
132131
indicatif = { workspace = true }

src/query/service/src/servers/flight/v1/actions/init_query_env.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@ use crate::servers::flight::v1::packets::QueryEnv;
2828

2929
pub static INIT_QUERY_ENV: &str = "/actions/init_query_env";
3030

31-
pub async fn init_query_env(env: QueryEnv) -> Result<()> {
31+
pub async fn init_query_env(mut env: QueryEnv) -> Result<()> {
32+
// Update query id to make sure they are compatible.
33+
env.query_id = env.query_id.replace('-', "");
34+
3235
let mut tracking_workload_group = None;
3336
let mut parent_mem_stat = ParentMemStat::StaticRef(&GLOBAL_MEM_STAT);
3437

src/query/service/src/servers/flight/v1/actions/start_prepared_query.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use crate::servers::flight::v1::exchange::DataExchangeManager;
2121
pub static START_PREPARED_QUERY: &str = "/actions/start_prepared_query";
2222

2323
pub async fn start_prepared_query(id: String) -> Result<()> {
24+
let id = id.replace('-', "");
2425
let ctx = DataExchangeManager::instance().get_query_ctx(&id)?;
2526

2627
let mut tracking_payload = ThreadTracker::new_tracking_payload();

src/query/service/src/servers/http/middleware/session.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -681,8 +681,8 @@ impl<E: Endpoint> Endpoint for HTTPSessionEndpoint<E> {
681681
let query_id = req
682682
.headers()
683683
.get(HEADER_QUERY_ID)
684-
.map(|id| id.to_str().unwrap().to_string())
685-
.unwrap_or_else(|| Uuid::new_v4().to_string());
684+
.map(|id| id.to_str().unwrap().replace('-', ""))
685+
.unwrap_or_else(|| Uuid::now_v7().simple().to_string());
686686

687687
let mut login_history = LoginHistory::new();
688688
login_history.handler = LoginHandler::HTTP;

src/query/service/src/servers/http/v1/http_query_handlers.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ use databend_common_metrics::http::metrics_incr_http_response_errors_count;
3434
use databend_common_version::DATABEND_SEMVER;
3535
use fastrace::func_path;
3636
use fastrace::prelude::*;
37-
use highway::HighwayHash;
3837
use http::HeaderMap;
3938
use http::HeaderValue;
4039
use http::StatusCode;
@@ -56,6 +55,7 @@ use poem::Request;
5655
use poem::Route;
5756
use serde::Deserialize;
5857
use serde::Serialize;
58+
use uuid::Uuid;
5959

6060
use super::query::ExecuteStateKind;
6161
use super::query::HttpQuery;
@@ -788,8 +788,8 @@ fn query_id_not_found(query_id: &str, node_id: &str) -> PoemError {
788788
}
789789

790790
fn query_id_to_trace_id(query_id: &str) -> TraceId {
791-
let [hash_high, hash_low] = highway::PortableHash::default().hash128(query_id.as_bytes());
792-
TraceId(((hash_high as u128) << 64) + (hash_low as u128))
791+
let uuid = Uuid::parse_str(query_id).unwrap_or_else(|_| Uuid::now_v7());
792+
TraceId(uuid.as_u128())
793793
}
794794

795795
/// The HTTP query endpoints are expected to be responses within 60 seconds.

src/query/service/src/servers/mysql/mysql_interactive_worker.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -214,15 +214,20 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for InteractiveWorke
214214
query: &'a str,
215215
writer: QueryResultWriter<'a, W>,
216216
) -> Result<()> {
217-
let query_id = Uuid::new_v4().to_string();
217+
let query_id = Uuid::now_v7();
218+
// Ensure the query id shares the same representation as trace_id.
219+
let query_id_str = query_id.simple().to_string();
220+
218221
let sampled =
219222
thread_rng().gen_range(0..100) <= self.base.session.get_trace_sample_rate()?;
220-
let root = Span::root(func_path!(), SpanContext::random().sampled(sampled))
223+
let span_context =
224+
SpanContext::new(TraceId(query_id.as_u128()), SpanId::default()).sampled(sampled);
225+
let root = Span::root(func_path!(), span_context)
221226
.with_properties(|| self.base.session.to_fastrace_properties());
222227

223228
let mut tracking_payload = ThreadTracker::new_tracking_payload();
224-
tracking_payload.query_id = Some(query_id.clone());
225-
tracking_payload.mem_stat = Some(MemStat::create(query_id.clone()));
229+
tracking_payload.query_id = Some(query_id_str.clone());
230+
tracking_payload.mem_stat = Some(MemStat::create(query_id_str.to_string()));
226231
let _guard = ThreadTracker::tracking(tracking_payload);
227232

228233
ThreadTracker::tracking_future(async {
@@ -247,7 +252,7 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for InteractiveWorke
247252
let instant = Instant::now();
248253
let query_result = self
249254
.base
250-
.do_query(query_id, query)
255+
.do_query(query_id_str, query)
251256
.await
252257
.map_err(|err| err.display_with_sql(query));
253258

src/query/service/src/sessions/query_ctx.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -847,7 +847,7 @@ impl TableContext for QueryContext {
847847
}
848848

849849
fn get_id(&self) -> String {
850-
self.shared.init_query_id.as_ref().read().clone()
850+
self.shared.init_query_id.as_ref().read().replace('-', "")
851851
}
852852

853853
fn get_current_catalog(&self) -> String {

src/query/service/tests/it/servers/http/http_query_handlers.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,24 @@ async fn test_return_when_finish() -> Result<()> {
447447
async fn test_client_query_id() -> Result<()> {
448448
let _fixture = TestFixture::setup().await?;
449449

450+
let wait_time_secs = 5;
451+
let sql = "select * from numbers(1)";
452+
let ep = create_endpoint()?;
453+
let mut headers = HeaderMap::new();
454+
headers.insert("x-databend-query-id", "testqueryid".parse().unwrap());
455+
let (status, result) =
456+
post_sql_to_endpoint_new_session(&ep, sql, wait_time_secs, headers).await?;
457+
assert_eq!(status, StatusCode::OK);
458+
assert_eq!(result.id, "testqueryid");
459+
460+
Ok(())
461+
}
462+
463+
// `-` in query id will be trimmed.
464+
#[tokio::test(flavor = "current_thread")]
465+
async fn test_client_compatible_query_id() -> Result<()> {
466+
let _fixture = TestFixture::setup().await?;
467+
450468
let wait_time_secs = 5;
451469
let sql = "select * from numbers(1)";
452470
let ep = create_endpoint()?;
@@ -455,7 +473,7 @@ async fn test_client_query_id() -> Result<()> {
455473
let (status, result) =
456474
post_sql_to_endpoint_new_session(&ep, sql, wait_time_secs, headers).await?;
457475
assert_eq!(status, StatusCode::OK);
458-
assert_eq!(result.id, "test-query-id");
476+
assert_eq!(result.id, "testqueryid");
459477

460478
Ok(())
461479
}

0 commit comments

Comments
 (0)