Skip to content

Commit 1d9c97b

Browse files
authored
chore(query): support local semaphore (#17827)
* chore(query): support local semaphore * chore(query): support local semaphore
1 parent 74a844b commit 1d9c97b

File tree

6 files changed

+257
-199
lines changed

6 files changed

+257
-199
lines changed

src/query/config/src/config.rs

+5
Original file line numberDiff line numberDiff line change
@@ -1458,6 +1458,9 @@ pub struct QueryConfig {
14581458
#[clap(long, value_name = "VALUE", default_value = "8")]
14591459
pub max_running_queries: u64,
14601460

1461+
#[clap(long, value_name = "VALUE", default_value = "true")]
1462+
pub global_statement_queue: bool,
1463+
14611464
/// The max total memory in bytes that can be used by this process.
14621465
#[clap(long, value_name = "VALUE", default_value = "0")]
14631466
pub max_server_memory_usage: u64,
@@ -1788,6 +1791,7 @@ impl TryInto<InnerQueryConfig> for QueryConfig {
17881791
mysql_tls_server_key: self.mysql_tls_server_key,
17891792
max_active_sessions: self.max_active_sessions,
17901793
max_running_queries: self.max_running_queries,
1794+
global_statement_queue: self.global_statement_queue,
17911795
max_server_memory_usage: self.max_server_memory_usage,
17921796
max_memory_limit_enabled: self.max_memory_limit_enabled,
17931797
clickhouse_http_handler_host: self.clickhouse_http_handler_host,
@@ -1880,6 +1884,7 @@ impl From<InnerQueryConfig> for QueryConfig {
18801884
mysql_tls_server_key: inner.mysql_tls_server_key,
18811885
max_active_sessions: inner.max_active_sessions,
18821886
max_running_queries: inner.max_running_queries,
1887+
global_statement_queue: inner.global_statement_queue,
18831888
max_server_memory_usage: inner.max_server_memory_usage,
18841889
max_memory_limit_enabled: inner.max_memory_limit_enabled,
18851890

src/query/config/src/inner.rs

+2
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ pub struct QueryConfig {
169169
pub mysql_tls_server_key: String,
170170
pub max_active_sessions: u64,
171171
pub max_running_queries: u64,
172+
pub global_statement_queue: bool,
172173
pub max_server_memory_usage: u64,
173174
pub max_memory_limit_enabled: bool,
174175
pub clickhouse_http_handler_host: String,
@@ -271,6 +272,7 @@ impl Default for QueryConfig {
271272
mysql_tls_server_key: "".to_string(),
272273
max_active_sessions: 256,
273274
max_running_queries: 8,
275+
global_statement_queue: true,
274276
max_server_memory_usage: 0,
275277
max_memory_limit_enabled: false,
276278
clickhouse_http_handler_host: "127.0.0.1".to_string(),

src/query/service/src/interpreters/interpreter.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ async fn plan_sql(
257257
if !acquire_queue {
258258
// If queue guard is not required, plan the statement directly.
259259
let plan = planner.plan_stmt(&extras.statement).await?;
260-
return Ok((plan, extras, AcquireQueueGuard::create(None)));
260+
return Ok((plan, extras, AcquireQueueGuard::create_global(None)));
261261
}
262262

263263
let need_acquire_lock = need_acquire_lock(ctx.clone(), &extras.statement);

src/query/service/src/sessions/queue_mgr.rs

+105-62
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ use databend_common_sql::PlanExtras;
5353
use log::info;
5454
use parking_lot::Mutex;
5555
use pin_project_lite::pin_project;
56+
use tokio::sync::AcquireError as TokioAcquireError;
57+
use tokio::sync::OwnedSemaphorePermit;
58+
use tokio::sync::Semaphore;
5659
use tokio::time::error::Elapsed;
5760

5861
use crate::sessions::QueryContext;
@@ -87,6 +90,8 @@ pub(crate) struct Inner<Data: QueueData> {
8790
pub struct QueueManager<Data: QueueData> {
8891
permits: usize,
8992
meta_store: MetaStore,
93+
semaphore: Arc<Semaphore>,
94+
global_statement_queue: bool,
9095
queue: Mutex<HashMap<Data::Key, Inner<Data>>>,
9196
}
9297

@@ -101,23 +106,33 @@ impl<Data: QueueData> QueueManager<Data> {
101106
};
102107

103108
info!("queue manager permits: {:?}", permits);
104-
GlobalInstance::set(Self::create(permits, metastore));
109+
GlobalInstance::set(Self::create(
110+
permits,
111+
metastore,
112+
conf.query.global_statement_queue,
113+
));
105114
Ok(())
106115
}
107116

108117
pub fn instance() -> Arc<Self> {
109118
GlobalInstance::get::<Arc<Self>>()
110119
}
111120

112-
pub fn create(mut permits: usize, meta_store: MetaStore) -> Arc<QueueManager<Data>> {
121+
pub fn create(
122+
mut permits: usize,
123+
meta_store: MetaStore,
124+
global_statement_queue: bool,
125+
) -> Arc<QueueManager<Data>> {
113126
if permits == 0 {
114127
permits = usize::MAX >> 4;
115128
}
116129

117130
Arc::new(QueueManager {
118131
permits,
119132
meta_store,
133+
global_statement_queue,
120134
queue: Mutex::new(HashMap::new()),
135+
semaphore: Arc::new(Semaphore::new(permits)),
121136
})
122137
}
123138

@@ -156,21 +171,35 @@ impl<Data: QueueData> QueueManager<Data> {
156171
);
157172

158173
let timeout = data.timeout();
159-
let semaphore_acquire = self.meta_store.new_acquired(
160-
data.get_lock_key(),
161-
self.permits as u64,
162-
data.get_key(), // ID of this acquirer
163-
data.lock_ttl(),
164-
);
165174

166-
let future = AcquireQueueFuture::create(
167-
Arc::new(data),
168-
tokio::time::timeout(timeout, semaphore_acquire),
169-
self.clone(),
170-
);
171175
let start_time = SystemTime::now();
176+
let acquire_res = match self.global_statement_queue {
177+
true => {
178+
let semaphore_acquire = self.meta_store.new_acquired(
179+
data.get_lock_key(),
180+
self.permits as u64,
181+
data.get_key(), // ID of this acquirer
182+
data.lock_ttl(),
183+
);
172184

173-
return match future.await {
185+
AcquireQueueFuture::create(
186+
Arc::new(data),
187+
tokio::time::timeout(timeout, semaphore_acquire),
188+
self.clone(),
189+
)
190+
.await
191+
}
192+
false => {
193+
AcquireQueueFuture::create(
194+
Arc::new(data),
195+
tokio::time::timeout(timeout, self.semaphore.clone().acquire_owned()),
196+
self.clone(),
197+
)
198+
.await
199+
}
200+
};
201+
202+
return match acquire_res {
174203
Ok(v) => {
175204
info!("finished acquiring from queue, length: {}", self.length());
176205

@@ -197,7 +226,7 @@ impl<Data: QueueData> QueueManager<Data> {
197226
};
198227
}
199228

200-
Ok(AcquireQueueGuard::create(None))
229+
Ok(AcquireQueueGuard::create_global(None))
201230
}
202231

203232
pub(crate) fn add_entity(&self, inner: Inner<Data>) -> Data::Key {
@@ -231,28 +260,35 @@ impl<Data: QueueData> QueueManager<Data> {
231260
}
232261
}
233262

234-
pub struct AcquireQueueGuard {
235-
#[allow(dead_code)]
236-
permit: Option<Permit>,
263+
pub enum AcquireQueueGuard {
264+
Global(Option<Permit>),
265+
Local(Option<OwnedSemaphorePermit>),
237266
}
238267

239268
impl Drop for AcquireQueueGuard {
240269
fn drop(&mut self) {
241-
if self.permit.is_some() {
242-
dec_session_running_acquired_queries();
270+
match self {
271+
AcquireQueueGuard::Local(Some(_)) | AcquireQueueGuard::Global(Some(_)) => {
272+
dec_session_running_acquired_queries();
273+
}
274+
_ => {}
243275
}
244276
}
245277
}
246278

247279
impl AcquireQueueGuard {
248-
pub fn create(permit: Option<Permit>) -> Self {
249-
AcquireQueueGuard { permit }
280+
pub fn create_global(permit: Option<Permit>) -> Self {
281+
AcquireQueueGuard::Global(permit)
282+
}
283+
284+
pub fn create_local(permit: Option<OwnedSemaphorePermit>) -> Self {
285+
AcquireQueueGuard::Local(permit)
250286
}
251287
}
252288

253289
pin_project! {
254-
pub struct AcquireQueueFuture<Data: QueueData, T>
255-
where T: Future<Output = std::result::Result<std::result::Result<Permit, AcquireError>, Elapsed>>
290+
pub struct AcquireQueueFuture<Data: QueueData, T, Permit, E>
291+
where T: Future<Output = std::result::Result<std::result::Result<Permit, E>, Elapsed>>
256292
{
257293
#[pin]
258294
inner: T,
@@ -266,8 +302,8 @@ where T: Future<Output = std::result::Result<std::result::Result<Permit, Acquir
266302
}
267303
}
268304

269-
impl<Data: QueueData, T> AcquireQueueFuture<Data, T>
270-
where T: Future<Output = std::result::Result<std::result::Result<Permit, AcquireError>, Elapsed>>
305+
impl<Data: QueueData, T, Permit, E> AcquireQueueFuture<Data, T, Permit, E>
306+
where T: Future<Output = std::result::Result<std::result::Result<Permit, E>, Elapsed>>
271307
{
272308
pub fn create(data: Arc<Data>, inner: T, mgr: Arc<QueueManager<Data>>) -> Self {
273309
AcquireQueueFuture {
@@ -281,53 +317,60 @@ where T: Future<Output = std::result::Result<std::result::Result<Permit, Acquire
281317
}
282318
}
283319

284-
impl<Data: QueueData, T> Future for AcquireQueueFuture<Data, T>
285-
where T: Future<Output = std::result::Result<std::result::Result<Permit, AcquireError>, Elapsed>>
286-
{
287-
type Output = Result<AcquireQueueGuard>;
320+
macro_rules! impl_acquire_queue_future {
321+
($Permit:ty, $fn_name:ident, $Error:ty) => {
322+
impl<Data: QueueData, T> Future for AcquireQueueFuture<Data, T, $Permit, $Error>
323+
where T: Future<Output = std::result::Result<std::result::Result<$Permit, $Error>, Elapsed>>
324+
{
325+
type Output = Result<AcquireQueueGuard>;
288326

289-
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
290-
let this = self.project();
327+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
328+
let this = self.project();
291329

292-
if this.is_abort.load(Ordering::SeqCst) {
293-
return Poll::Ready(Err(Data::remove_error_message(this.key.take())));
294-
}
330+
if this.is_abort.load(Ordering::SeqCst) {
331+
return Poll::Ready(Err(Data::remove_error_message(this.key.take())));
332+
}
333+
334+
match this.inner.poll(cx) {
335+
Poll::Ready(res) => {
336+
if let Some(key) = this.key.take() {
337+
if this.manager.remove_entity(&key).is_none() {
338+
return Poll::Ready(Err(Data::remove_error_message(Some(key))));
339+
}
340+
}
295341

296-
match this.inner.poll(cx) {
297-
Poll::Ready(res) => {
298-
if let Some(key) = this.key.take() {
299-
if this.manager.remove_entity(&key).is_none() {
300-
return Poll::Ready(Err(Data::remove_error_message(Some(key))));
342+
Poll::Ready(match res {
343+
Ok(Ok(v)) => Ok(AcquireQueueGuard::$fn_name(Some(v))),
344+
Ok(Err(_)) => Err(ErrorCode::TokioError("acquire queue failure.")),
345+
Err(_elapsed) => Err(ErrorCode::Timeout("query queuing timeout")),
346+
})
301347
}
302-
}
348+
Poll::Pending => {
349+
if !*this.has_pending {
350+
*this.has_pending = true;
351+
}
303352

304-
Poll::Ready(match res {
305-
Ok(Ok(v)) => Ok(AcquireQueueGuard::create(Some(v))),
306-
Ok(Err(_)) => Err(ErrorCode::TokioError("acquire queue failure.")),
307-
Err(_elapsed) => Err(ErrorCode::Timeout("query queuing timeout")),
308-
})
309-
}
310-
Poll::Pending => {
311-
if !*this.has_pending {
312-
*this.has_pending = true;
313-
}
353+
if let Some(data) = this.data.take() {
354+
let waker = cx.waker().clone();
355+
*this.key = Some(this.manager.add_entity(Inner {
356+
data,
357+
waker,
358+
instant: Instant::now(),
359+
is_abort: this.is_abort.clone(),
360+
}));
361+
}
314362

315-
if let Some(data) = this.data.take() {
316-
let waker = cx.waker().clone();
317-
*this.key = Some(this.manager.add_entity(Inner {
318-
data,
319-
waker,
320-
instant: Instant::now(),
321-
is_abort: this.is_abort.clone(),
322-
}));
363+
Poll::Pending
364+
}
323365
}
324-
325-
Poll::Pending
326366
}
327367
}
328-
}
368+
};
329369
}
330370

371+
impl_acquire_queue_future!(Permit, create_global, AcquireError);
372+
impl_acquire_queue_future!(OwnedSemaphorePermit, create_local, TokioAcquireError);
373+
331374
pub struct QueryEntry {
332375
ctx: Arc<QueryContext>,
333376
pub query_id: String,

0 commit comments

Comments
 (0)