Skip to content

Commit 359099e

Browse files
authored
refactor: use DatabendRuntime::spawn in meta binaries (databendlabs#19324)
Replace `databend_common_base::runtime::spawn` with `DatabendRuntime::spawn` from the `SpawnApi` trait to decouple meta binaries from `databend-common-base` runtime implementation. Changes: - Replace `runtime::spawn()` with `DatabendRuntime::spawn()` in `metaverifier` - Replace `runtime::spawn()` with `DatabendRuntime::spawn()` in `metabench` - Remove `databend_common_base::runtime` imports
1 parent 0eb6787 commit 359099e

File tree

2 files changed

+90
-76
lines changed

2 files changed

+90
-76
lines changed

src/meta/binaries/metabench/main.rs

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ use std::time::Instant;
2929

3030
use chrono::Utc;
3131
use clap::Parser;
32-
use databend_common_base::runtime;
3332
use databend_common_meta_api::DatabaseApi;
3433
use databend_common_meta_api::TableApi;
3534
use databend_common_meta_api::serialize_struct;
@@ -49,6 +48,7 @@ use databend_common_meta_client::ClientHandle;
4948
use databend_common_meta_client::MetaGrpcClient;
5049
use databend_common_meta_client::required;
5150
use databend_common_meta_kvapi::kvapi::KVApi;
51+
use databend_common_meta_runtime_api::SpawnApi;
5252
use databend_common_meta_semaphore::Semaphore;
5353
use databend_common_meta_types::MatchSeq;
5454
use databend_common_meta_types::Operation;
@@ -60,6 +60,7 @@ use databend_common_tracing::StderrConfig;
6060
use databend_common_tracing::init_logging;
6161
use databend_common_version::BUILD_INFO;
6262
use databend_common_version::METASRV_COMMIT_VERSION;
63+
use databend_meta_runtime::DatabendRuntime;
6364
use futures::TryStreamExt;
6465
use serde::Deserialize;
6566
use serde::Serialize;
@@ -155,25 +156,28 @@ async fn main() {
155156

156157
let client = client.clone();
157158

158-
let handle = runtime::spawn(async move {
159-
for i in 0..config.number {
160-
if cmd == "upsert_kv" {
161-
benchmark_upsert(&client, prefix, client_num, i).await;
162-
} else if cmd == "table" {
163-
benchmark_table(&client, prefix, client_num, i).await;
164-
} else if cmd == "get_table" {
165-
benchmark_get_table(&client, prefix, client_num, i).await;
166-
} else if cmd == "table_copy_file" {
167-
benchmark_table_copy_file(&client, prefix, client_num, i, &param).await;
168-
} else if cmd == "semaphore" {
169-
benchmark_semaphore(&client, prefix, client_num, i, &param).await;
170-
} else if cmd == "list" {
171-
benchmark_list(&client, prefix, client_num, i, &param).await;
172-
} else {
173-
unreachable!("Invalid config.rpc: {}", rpc);
159+
let handle = DatabendRuntime::spawn(
160+
async move {
161+
for i in 0..config.number {
162+
if cmd == "upsert_kv" {
163+
benchmark_upsert(&client, prefix, client_num, i).await;
164+
} else if cmd == "table" {
165+
benchmark_table(&client, prefix, client_num, i).await;
166+
} else if cmd == "get_table" {
167+
benchmark_get_table(&client, prefix, client_num, i).await;
168+
} else if cmd == "table_copy_file" {
169+
benchmark_table_copy_file(&client, prefix, client_num, i, &param).await;
170+
} else if cmd == "semaphore" {
171+
benchmark_semaphore(&client, prefix, client_num, i, &param).await;
172+
} else if cmd == "list" {
173+
benchmark_list(&client, prefix, client_num, i, &param).await;
174+
} else {
175+
unreachable!("Invalid config.rpc: {}", rpc);
176+
}
174177
}
175-
}
176-
});
178+
},
179+
None,
180+
);
177181
handles.push(handle)
178182
}
179183

src/meta/binaries/metaverifier/main.rs

Lines changed: 67 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ use std::time::Instant;
2626
use anyhow::Result;
2727
use anyhow::bail;
2828
use clap::Parser;
29-
use databend_common_base::runtime;
3029
use databend_common_meta_client::ClientHandle;
3130
use databend_common_meta_client::MetaGrpcClient;
3231
use databend_common_meta_kvapi::kvapi::KVApi;
3332
use databend_common_meta_kvapi::kvapi::KvApiExt;
33+
use databend_common_meta_runtime_api::SpawnApi;
3434
use databend_common_meta_types::MatchSeq;
3535
use databend_common_meta_types::Operation;
3636
use databend_common_meta_types::UpsertKV;
@@ -40,6 +40,7 @@ use databend_common_tracing::StderrConfig;
4040
use databend_common_tracing::init_logging;
4141
use databend_common_version::BUILD_INFO;
4242
use databend_common_version::METASRV_COMMIT_VERSION;
43+
use databend_meta_runtime::DatabendRuntime;
4344
use rand::Rng;
4445
use rand::SeedableRng;
4546
use rand::rngs::StdRng;
@@ -127,70 +128,79 @@ async fn main() -> Result<()> {
127128
.map(|addr| addr.to_string())
128129
.collect();
129130

130-
let handle = runtime::spawn(async move {
131-
let client = MetaGrpcClient::try_create(
132-
addrs.clone(),
133-
BUILD_INFO.semver(),
134-
"root",
135-
"xxx",
136-
None,
137-
None,
138-
None,
139-
);
140-
141-
let client = match client {
142-
Ok(client) => client,
143-
Err(e) => {
144-
fs::write(VERIFIER_RESULT_FILE, "ERROR")?;
145-
eprintln!("Failed to create client: {}", e);
146-
bail!("Failed to create client: {}", e);
147-
}
148-
};
149-
150-
verifier(
151-
&client,
152-
prefix,
153-
config.number,
154-
client_num,
155-
config.remove_percent,
156-
)
157-
.await
158-
});
131+
let handle = DatabendRuntime::spawn(
132+
async move {
133+
let client = MetaGrpcClient::try_create(
134+
addrs.clone(),
135+
BUILD_INFO.semver(),
136+
"root",
137+
"xxx",
138+
None,
139+
None,
140+
None,
141+
);
142+
143+
let client = match client {
144+
Ok(client) => client,
145+
Err(e) => {
146+
fs::write(VERIFIER_RESULT_FILE, "ERROR")?;
147+
eprintln!("Failed to create client: {}", e);
148+
bail!("Failed to create client: {}", e);
149+
}
150+
};
151+
152+
verifier(
153+
&client,
154+
prefix,
155+
config.number,
156+
client_num,
157+
config.remove_percent,
158+
)
159+
.await
160+
},
161+
None,
162+
);
159163
println!("verifier worker {} started..", client_num);
160164
handles.push(handle)
161165
}
162166

163-
let waiter_handle = runtime::spawn(async move {
164-
let result = rx.recv_timeout(Duration::from_secs(config.time));
167+
let waiter_handle = DatabendRuntime::spawn(
168+
async move {
169+
let result = rx.recv_timeout(Duration::from_secs(config.time));
165170

166-
match result {
167-
Ok(_) => {
168-
println!("verifier completed within the timeout.");
169-
}
170-
Err(e) => {
171-
println!(
172-
"verifier did not complete within the timeout: {:?}s, error: {:?}",
173-
config.time, e
174-
);
175-
let _ = fs::write(VERIFIER_RESULT_FILE, "ERROR");
176-
panic!("verifier did not complete within the timeout.")
171+
match result {
172+
Ok(_) => {
173+
println!("verifier completed within the timeout.");
174+
}
175+
Err(e) => {
176+
println!(
177+
"verifier did not complete within the timeout: {:?}s, error: {:?}",
178+
config.time, e
179+
);
180+
let _ = fs::write(VERIFIER_RESULT_FILE, "ERROR");
181+
panic!("verifier did not complete within the timeout.")
182+
}
177183
}
178-
}
179-
});
180-
181-
let wait_verifier_handle = runtime::spawn(async move {
182-
for handle in handles {
183-
let ret = handle.await.unwrap();
184-
if let Err(e) = ret {
185-
fs::write(VERIFIER_RESULT_FILE, "ERROR")?;
186-
println!("verifier return error: {:?}", e);
187-
return Err(e);
184+
},
185+
None,
186+
);
187+
188+
let wait_verifier_handle = DatabendRuntime::spawn(
189+
async move {
190+
for handle in handles {
191+
let ret = handle.await.unwrap();
192+
if let Err(e) = ret {
193+
fs::write(VERIFIER_RESULT_FILE, "ERROR")?;
194+
println!("verifier return error: {:?}", e);
195+
return Err(e);
196+
}
188197
}
189-
}
190-
tx.send(()).unwrap();
198+
tx.send(()).unwrap();
191199

192-
Ok(())
193-
});
200+
Ok(())
201+
},
202+
None,
203+
);
194204

195205
let _ret = wait_verifier_handle.await.unwrap();
196206
if let Err(e) = waiter_handle.await {

0 commit comments

Comments
 (0)