Skip to content

Commit c69e015

Browse files
authored
Merge pull request #29830 from andrwng/ct-l1-sg
ct/l1: explicitly use metastore scheduling group
2 parents 1a8fcd2 + cc6ccc6 commit c69e015

File tree

9 files changed

+51
-19
lines changed

9 files changed

+51
-19
lines changed

src/v/cloud_topics/app.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ ss::future<> app::construct(
8383
ss::sharded_parameter([this] { return &l1_io.local(); }),
8484
config::node().l1_staging_path(),
8585
ss::sharded_parameter([&remote] { return &remote->local(); }),
86-
bucket);
86+
bucket,
87+
scheduling_groups::instance().cloud_topics_metastore_sg());
8788

8889
co_await construct_service(
8990
l1_metastore_router,

src/v/cloud_topics/level_one/domain/db_domain_manager.cc

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -197,12 +197,14 @@ db_domain_manager::db_domain_manager(
197197
std::filesystem::path staging_dir,
198198
cloud_io::remote* remote,
199199
cloud_storage_clients::bucket_name bucket,
200-
io* object_io)
200+
io* object_io,
201+
ss::scheduling_group sg)
201202
: expected_term_(expected_term)
202203
, staging_dir_(std::move(staging_dir))
203204
, remote_(remote)
204205
, bucket_(std::move(bucket))
205206
, object_io_(object_io)
207+
, sg_(sg)
206208
, stm_(std::move(stm))
207209
, gc_interval_(
208210
config::shard_local_cfg()
@@ -211,7 +213,9 @@ db_domain_manager::db_domain_manager(
211213
}
212214

213215
void db_domain_manager::start() {
214-
ssx::spawn_with_gate(gate_, [this] { return gc_loop(); });
216+
ssx::spawn_with_gate(gate_, [this] {
217+
return ss::with_scheduling_group(sg_, [this] { return gc_loop(); });
218+
});
215219
}
216220

217221
ss::future<> db_domain_manager::stop_and_wait() {
@@ -1375,7 +1379,7 @@ ss::future<std::expected<void, rpc::errc>> db_domain_manager::maybe_open_db() {
13751379
vlog(
13761380
cd_log.debug, "Opening database with expected term {}", expected_term_);
13771381
auto db_res = co_await replicated_database::open(
1378-
expected_term_, stm_.get(), staging_dir_, remote_, bucket_, as_);
1382+
expected_term_, stm_.get(), staging_dir_, remote_, bucket_, as_, sg_);
13791383
if (!db_res.has_value()) {
13801384
co_return std::unexpected(
13811385
log_and_convert(db_res.error(), "Failed to open database: "));
@@ -1559,7 +1563,7 @@ db_domain_manager::restore_domain(rpc::restore_domain_request req) {
15591563
"Re-opening database with expected term {}",
15601564
expected_term_);
15611565
auto db_res = co_await replicated_database::open(
1562-
expected_term_, stm_.get(), staging_dir_, remote_, bucket_, as_);
1566+
expected_term_, stm_.get(), staging_dir_, remote_, bucket_, as_, sg_);
15631567
if (!db_res.has_value()) {
15641568
co_return rpc::restore_domain_reply{
15651569
.ec = log_and_convert(db_res.error(), "Failed to reopen database: "),

src/v/cloud_topics/level_one/domain/db_domain_manager.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include <seastar/core/abort_source.hh>
1919
#include <seastar/core/rwlock.hh>
20+
#include <seastar/core/scheduling.hh>
2021

2122
namespace cloud_topics::l1 {
2223
class io;
@@ -33,7 +34,8 @@ class db_domain_manager final : public domain_manager {
3334
std::filesystem::path staging_dir,
3435
cloud_io::remote* remote,
3536
cloud_storage_clients::bucket_name bucket,
36-
io* object_io);
37+
io* object_io,
38+
ss::scheduling_group sg);
3739

3840
void start() override;
3941
ss::future<> stop_and_wait() override;
@@ -169,6 +171,7 @@ class db_domain_manager final : public domain_manager {
169171
cloud_io::remote* remote_;
170172
cloud_storage_clients::bucket_name bucket_;
171173
io* object_io_;
174+
ss::scheduling_group sg_;
172175

173176
ss::shared_ptr<stm> stm_;
174177

src/v/cloud_topics/level_one/domain/domain_supervisor.cc

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,14 @@ class domain_supervisor::impl {
3636
io* io,
3737
std::filesystem::path staging_dir,
3838
cloud_io::remote* remote,
39-
cloud_storage_clients::bucket_name bucket)
39+
cloud_storage_clients::bucket_name bucket,
40+
ss::scheduling_group sg)
4041
: _controller(controller)
4142
, _object_io(io)
4243
, _staging_dir(std::move(staging_dir))
4344
, _remote(remote)
4445
, _bucket(std::move(bucket))
46+
, _sg(sg)
4547
, _queue([](const std::exception_ptr& ex) {
4648
vlog(cd_log.error, "Unexpected domain supervisor error: {}", ex);
4749
}) {}
@@ -326,7 +328,8 @@ class domain_supervisor::impl {
326328
_staging_dir,
327329
_remote,
328330
_bucket,
329-
_object_io);
331+
_object_io,
332+
_sg);
330333
} else {
331334
domain_mgr = ss::make_shared<simple_domain_manager>(
332335
stm_manager->get<simple_stm>(), _object_io);
@@ -340,6 +343,7 @@ class domain_supervisor::impl {
340343
std::filesystem::path _staging_dir;
341344
cloud_io::remote* _remote;
342345
cloud_storage_clients::bucket_name _bucket;
346+
ss::scheduling_group _sg;
343347

344348
// Queue to process async work associated with starting and stopping domain
345349
// managers when handling partition notifications.
@@ -360,10 +364,16 @@ domain_supervisor::domain_supervisor(
360364
io* io,
361365
std::filesystem::path staging_dir,
362366
cloud_io::remote* remote,
363-
cloud_storage_clients::bucket_name bucket)
367+
cloud_storage_clients::bucket_name bucket,
368+
ss::scheduling_group sg)
364369
: _impl(
365370
std::make_unique<impl>(
366-
controller, io, std::move(staging_dir), remote, std::move(bucket))) {}
371+
controller,
372+
io,
373+
std::move(staging_dir),
374+
remote,
375+
std::move(bucket),
376+
sg)) {}
367377

368378
domain_supervisor::~domain_supervisor() = default;
369379

src/v/cloud_topics/level_one/domain/domain_supervisor.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include "model/fundamental.h"
1414

1515
#include <seastar/core/future.hh>
16+
#include <seastar/core/scheduling.hh>
1617
#include <seastar/core/shared_ptr.hh>
1718
#include <seastar/util/optimized_optional.hh>
1819

@@ -42,7 +43,8 @@ class domain_supervisor {
4243
io*,
4344
std::filesystem::path staging_dir,
4445
cloud_io::remote*,
45-
cloud_storage_clients::bucket_name bucket);
46+
cloud_storage_clients::bucket_name bucket,
47+
ss::scheduling_group sg);
4648
domain_supervisor(const domain_supervisor&) = delete;
4749
domain_supervisor(domain_supervisor&&) = delete;
4850
domain_supervisor& operator=(const domain_supervisor&) = delete;

src/v/cloud_topics/level_one/domain/tests/db_domain_manager_test.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ struct domain_manager_node {
7676
staging_directory.get_path(),
7777
remote,
7878
bucket,
79-
&object_io);
79+
&object_io,
80+
ss::default_scheduling_group());
8081
if (start_gc) {
8182
mgr->start();
8283
}

src/v/cloud_topics/level_one/metastore/lsm/replicated_db.cc

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ replicated_database::open(
6161
const std::filesystem::path& staging_directory,
6262
cloud_io::remote* remote,
6363
const cloud_storage_clients::bucket_name& bucket,
64-
ss::abort_source& as) {
64+
ss::abort_source& as,
65+
ss::scheduling_group sg) {
6566
auto term_result = co_await s->sync(std::chrono::seconds(30));
6667
if (!term_result.has_value()) {
6768
co_return std::unexpected(
@@ -142,6 +143,7 @@ replicated_database::open(
142143
lsm::database::open(
143144
lsm::options{
144145
.database_epoch = epoch(),
146+
.compaction_scheduling_group = sg,
145147
.file_deletion_delay = absl::FromChrono(
146148
config::shard_local_cfg()
147149
.cloud_topics_long_term_file_deletion_delay()),
@@ -193,13 +195,15 @@ replicated_database::open(
193195
}
194196
}
195197
auto ret = std::unique_ptr<replicated_database>(
196-
new replicated_database(term, domain_uuid, s, std::move(db), as));
198+
new replicated_database(term, domain_uuid, s, std::move(db), as, sg));
197199
ret->start();
198200
co_return std::move(ret);
199201
}
200202

201203
void replicated_database::start() {
202-
ssx::spawn_with_gate(gate_, [this] { return apply_loop(); });
204+
ssx::spawn_with_gate(gate_, [this] {
205+
return ss::with_scheduling_group(sg_, [this] { return apply_loop(); });
206+
});
203207
}
204208

205209
ss::future<std::expected<void, replicated_database::error>>

src/v/cloud_topics/level_one/metastore/lsm/replicated_db.h

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
#include "model/fundamental.h"
2121
#include "utils/detailed_error.h"
2222

23+
#include <seastar/core/scheduling.hh>
24+
2325
#include <expected>
2426
#include <filesystem>
2527

@@ -63,7 +65,8 @@ class replicated_database {
6365
const std::filesystem::path& staging_directory,
6466
cloud_io::remote* remote,
6567
const cloud_storage_clients::bucket_name& bucket,
66-
ss::abort_source& as);
68+
ss::abort_source& as,
69+
ss::scheduling_group sg);
6770

6871
replicated_database(replicated_database&&) = delete;
6972
~replicated_database() = default;
@@ -99,12 +102,14 @@ class replicated_database {
99102
domain_uuid domain_uuid,
100103
stm* s,
101104
lsm::database db,
102-
ss::abort_source& as)
105+
ss::abort_source& as,
106+
ss::scheduling_group sg)
103107
: term_(term)
104108
, expected_domain_uuid_(domain_uuid)
105109
, stm_(s)
106110
, db_(std::move(db))
107-
, as_(as) {}
111+
, as_(as)
112+
, sg_(sg) {}
108113

109114
ss::future<> apply_loop();
110115

@@ -133,6 +138,7 @@ class replicated_database {
133138

134139
ss::gate gate_;
135140
ss::abort_source& as_;
141+
ss::scheduling_group sg_;
136142
};
137143

138144
} // namespace cloud_topics::l1

src/v/cloud_topics/level_one/metastore/lsm/tests/replicated_db_test.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@ struct replicated_db_node {
8282
staging_directory.get_path(),
8383
remote,
8484
bucket,
85-
as);
85+
as,
86+
ss::default_scheduling_group());
8687
if (!ret.has_value()) {
8788
co_return std::unexpected(ret.error());
8889
}

0 commit comments

Comments
 (0)