@@ -32,6 +32,7 @@ extern "C" {
32
32
#include " base/logging.h"
33
33
#include " core/bloom.h"
34
34
#include " core/json/json_object.h"
35
+ #include " core/overloaded.h"
35
36
#include " core/sorted_map.h"
36
37
#include " core/string_map.h"
37
38
#include " core/string_set.h"
@@ -48,6 +49,7 @@ extern "C" {
48
49
#include " server/server_state.h"
49
50
#include " server/set_family.h"
50
51
#include " server/tiering/common.h" // for _KB literal
52
+ #include " server/tiering/disk_storage.h"
51
53
#include " server/transaction.h"
52
54
#include " strings/human_readable.h"
53
55
@@ -387,6 +389,7 @@ class RdbLoaderBase::OpaqueObjLoader {
387
389
void operator ()(const LzfString& lzfstr);
388
390
void operator ()(const unique_ptr<LoadTrace>& ptr);
389
391
void operator ()(const RdbSBF& src);
392
+ void operator ()(const RdbTieredSegment& segmnet);
390
393
391
394
std::error_code ec () const {
392
395
return ec_;
@@ -481,6 +484,10 @@ void RdbLoaderBase::OpaqueObjLoader::operator()(const RdbSBF& src) {
481
484
pv_->SetSBF (sbf);
482
485
}
483
486
487
+ void RdbLoaderBase::OpaqueObjLoader::operator ()(const RdbTieredSegment& src) {
488
+ CHECK (false ) << " unreachable" ;
489
+ }
490
+
484
491
void RdbLoaderBase::OpaqueObjLoader::CreateSet (const LoadTrace* ltrace) {
485
492
size_t len = ltrace->blob_count ();
486
493
@@ -1385,6 +1392,9 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) {
1385
1392
case RDB_TYPE_SBF:
1386
1393
iores = ReadSBF ();
1387
1394
break ;
1395
+ case RDB_TYPE_TIERED_SEGMENT:
1396
+ iores = ReadTieredSegment ();
1397
+ break ;
1388
1398
default :
1389
1399
LOG (ERROR) << " Unsupported rdb type " << rdbtype;
1390
1400
@@ -1878,6 +1888,14 @@ auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> {
1878
1888
return OpaqueObj{std::move (res), RDB_TYPE_SBF};
1879
1889
}
1880
1890
1891
+ auto RdbLoaderBase::ReadTieredSegment () -> io::Result<OpaqueObj> {
1892
+ RdbTieredSegment segment;
1893
+ SET_OR_UNEXPECT (LoadLen (nullptr ), segment.offset );
1894
+ SET_OR_UNEXPECT (LoadLen (nullptr ), segment.length );
1895
+ SET_OR_UNEXPECT (LoadLen (nullptr ), segment.enc_mask );
1896
+ return OpaqueObj{segment, RDB_TYPE_TIERED_SEGMENT};
1897
+ };
1898
+
1881
1899
template <typename T> io::Result<T> RdbLoaderBase::FetchInt () {
1882
1900
auto ec = EnsureRead (sizeof (T));
1883
1901
if (ec)
@@ -1924,6 +1942,18 @@ RdbLoader::RdbLoader(Service* service)
1924
1942
}
1925
1943
1926
1944
RdbLoader::~RdbLoader () {
1945
+ for (auto & [_, page] : small_items_pages_) {
1946
+ if (!holds_alternative<tiering::DiskSegment>(page))
1947
+ continue ;
1948
+ auto segment = get<tiering::DiskSegment>(page);
1949
+ EngineShard::tlocal ()->tiered_storage ()->BorrowStorage ().MarkAsFree (segment);
1950
+ }
1951
+
1952
+ for (auto & [_, items] : small_items_) {
1953
+ for (Item* item : items)
1954
+ delete item;
1955
+ }
1956
+
1927
1957
while (true ) {
1928
1958
Item* item = item_queue_.Pop ();
1929
1959
if (item == nullptr )
@@ -2117,6 +2147,11 @@ error_code RdbLoader::Load(io::Source* src) {
2117
2147
continue ;
2118
2148
}
2119
2149
2150
+ if (type == RDB_OPCODE_TIERED_PAGE) {
2151
+ RETURN_ON_ERR (LoadTieredPage ());
2152
+ continue ;
2153
+ }
2154
+
2120
2155
if (!rdbIsObjectTypeDF (type)) {
2121
2156
return RdbError (errc::invalid_rdb_type);
2122
2157
}
@@ -2126,6 +2161,11 @@ error_code RdbLoader::Load(io::Source* src) {
2126
2161
settings.Reset ();
2127
2162
} // main load loop
2128
2163
2164
+ // Flush all small items
2165
+ HandleSmallItems (true );
2166
+
2167
+ FlushAllShards ();
2168
+
2129
2169
DVLOG (1 ) << " RdbLoad loop finished" ;
2130
2170
2131
2171
if (stop_early_) {
@@ -2348,6 +2388,38 @@ error_code RdbLoaderBase::HandleJournalBlob(Service* service) {
2348
2388
return std::error_code{};
2349
2389
}
2350
2390
2391
+ error_code RdbLoader::LoadTieredPage () {
2392
+ size_t offset;
2393
+ SET_OR_RETURN (LoadLen (nullptr ), offset);
2394
+
2395
+ std::string page;
2396
+ SET_OR_RETURN (FetchGenericString (), page);
2397
+
2398
+ // If tiering is enabled, try saving the received page on disk
2399
+ // Fall back to memory in case of errors
2400
+ if (EngineShard::tlocal () && EngineShard::tlocal ()->tiered_storage ()) {
2401
+ auto & storage = EngineShard::tlocal ()->tiered_storage ()->BorrowStorage ();
2402
+
2403
+ util::fb2::Done done;
2404
+ std::error_code ec;
2405
+ auto cb = [this , offset, &ec, &done](io::Result<tiering::DiskSegment> res) {
2406
+ if (res.has_value ())
2407
+ small_items_pages_[offset] = res.value ();
2408
+ else
2409
+ ec = res.error ();
2410
+ done.Notify ();
2411
+ };
2412
+ ec = storage.Stash (io::Buffer (page), {}, cb);
2413
+
2414
+ done.Wait ();
2415
+ if (!ec)
2416
+ return {};
2417
+ }
2418
+
2419
+ small_items_pages_[offset] = page;
2420
+ return {};
2421
+ }
2422
+
2351
2423
error_code RdbLoader::HandleAux () {
2352
2424
/* AUX: generic string-string fields. Use to add state to RDB
2353
2425
* which is backward compatible. Implementations of RDB loading
@@ -2531,20 +2603,37 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {
2531
2603
2532
2604
item->is_sticky = settings->is_sticky ;
2533
2605
2534
- ShardId sid = Shard (item->key , shard_set->size ());
2535
2606
item->expire_ms = settings->expiretime ;
2536
2607
2537
- auto & out_buf = shard_buf_[sid];
2608
+ std::move (cleanup).Cancel ();
2609
+
2610
+ if (item->val .rdb_type == RDB_TYPE_TIERED_SEGMENT) {
2611
+ auto segment = get<RdbTieredSegment>(item->val .obj );
2612
+ {
2613
+ size_t offset = segment.offset / tiering::kPageSize * tiering::kPageSize ;
2614
+ auto & items = small_items_[offset];
2615
+ small_items_sizes_.erase ({items.size (), offset});
2616
+ items.push_back (item);
2617
+ small_items_sizes_.insert ({items.size (), offset});
2618
+ }
2619
+ HandleSmallItems (false ); // don't force flush
2620
+ return kOk ;
2621
+ }
2622
+
2623
+ Add (item);
2624
+ return kOk ;
2625
+ }
2626
+
2627
+ void RdbLoader::Add (Item* item) {
2628
+ ShardId sid = Shard (item->key , shard_set->size ());
2538
2629
2630
+ auto & out_buf = shard_buf_[sid];
2539
2631
out_buf.emplace_back (item);
2540
- std::move (cleanup).Cancel ();
2541
2632
2542
2633
constexpr size_t kBufSize = 128 ;
2543
2634
if (out_buf.size () >= kBufSize ) {
2544
2635
FlushShardAsync (sid);
2545
2636
}
2546
-
2547
- return kOk ;
2548
2637
}
2549
2638
2550
2639
void RdbLoader::LoadScriptFromAux (string&& body) {
@@ -2559,6 +2648,42 @@ void RdbLoader::LoadScriptFromAux(string&& body) {
2559
2648
}
2560
2649
}
2561
2650
2651
+ void RdbLoader::HandleSmallItems (bool flush) {
2652
+ while (!small_items_.empty () && (flush || small_items_.size () > 1000 )) {
2653
+ auto [_, offset] = small_items_sizes_.extract (small_items_sizes_.begin ()).value ();
2654
+ auto node = small_items_.extract (offset);
2655
+
2656
+ auto page_reader = [](tiering::DiskSegment segment) {
2657
+ auto & store = EngineShard::tlocal ()->tiered_storage ()->BorrowStorage ();
2658
+ util::fb2::Future<std::string> f;
2659
+ store.Read (segment, [f](io::Result<std::string_view> result) mutable {
2660
+ CHECK (result.has_value ()); // TODO
2661
+ f.Resolve (string{result.value ()});
2662
+ });
2663
+ return f.Get ();
2664
+ };
2665
+ string page = visit (Overloaded{[](const string& s) { return s; }, page_reader},
2666
+ small_items_pages_[offset]);
2667
+
2668
+ for (Item* item : node.mapped ()) {
2669
+ RdbTieredSegment segment = get<RdbTieredSegment>(item->val .obj );
2670
+
2671
+ CompactObj co;
2672
+ co.SetEncodingMask (segment.enc_mask );
2673
+ co.Materialize ({page.data () + (segment.offset - offset), segment.length }, true );
2674
+
2675
+ VLOG (0 ) << " Loaded " << co.ToString ();
2676
+
2677
+ base::PODArray<char > arr (co.Size (), nullptr );
2678
+ co.GetString (arr.data ());
2679
+
2680
+ item->val .rdb_type = RDB_TYPE_STRING;
2681
+ item->val .obj = std::move (arr);
2682
+ Add (item);
2683
+ }
2684
+ }
2685
+ }
2686
+
2562
2687
void RdbLoader::LoadSearchIndexDefFromAux (string&& def) {
2563
2688
facade::CapturingReplyBuilder crb{};
2564
2689
ConnectionContext cntx{nullptr , nullptr , &crb};
0 commit comments