@@ -120,24 +120,28 @@ class RestoreArgs {
120
120
: expiration_(expiration), abs_time_(abs_time), replace_(replace) {
121
121
}
122
122
123
- constexpr bool Replace () const {
123
+ bool Replace () const {
124
124
return replace_;
125
125
}
126
126
127
- constexpr bool Sticky () const {
127
+ bool Sticky () const {
128
128
return sticky_;
129
129
}
130
130
131
+ void SetSticky (bool sticky) {
132
+ sticky_ = sticky;
133
+ }
134
+
131
135
uint64_t ExpirationTime () const {
132
136
DCHECK_GE (expiration_, 0 );
133
137
return expiration_;
134
138
}
135
139
136
- [[nodiscard]] constexpr bool Expired () const {
140
+ bool Expired () const {
137
141
return expiration_ < 0 ;
138
142
}
139
143
140
- [[nodiscard]] constexpr bool HasExpiration () const {
144
+ bool HasExpiration () const {
141
145
return expiration_ != NO_EXPIRATION;
142
146
}
143
147
@@ -152,9 +156,12 @@ class RdbRestoreValue : protected RdbLoaderBase {
152
156
rdb_version_ = rdb_version;
153
157
}
154
158
155
- std::optional<DbSlice::ItAndUpdater> Add (std::string_view payload, std::string_view key,
156
- DbSlice& db_slice, const DbContext& cntx,
157
- const RestoreArgs& args, EngineShard* shard);
159
+ // Returns default ItAndUpdater if Add failed.
160
+ // In case a valid ItAndUpdater is returned, then second is true in case a new key is added,
161
+ // false if the existing key is updated (should not happen unless we have a bug).
162
+ pair<DbSlice::ItAndUpdater, bool > Add (string_view key, string_view payload, const DbContext& cntx,
163
+ const RestoreArgs& args, DbSlice* db_slice,
164
+ EngineShard* shard);
158
165
159
166
private:
160
167
std::optional<OpaqueObj> Parse (io::Source* source);
@@ -185,18 +192,17 @@ std::optional<RdbLoaderBase::OpaqueObj> RdbRestoreValue::Parse(io::Source* sourc
185
192
return std::optional<OpaqueObj>(std::move (obj));
186
193
}
187
194
188
- std::optional<DbSlice::ItAndUpdater> RdbRestoreValue::Add (std::string_view data,
189
- std::string_view key, DbSlice& db_slice,
190
- const DbContext& cntx,
191
- const RestoreArgs& args,
192
- EngineShard* shard) {
195
+ pair<DbSlice::ItAndUpdater, bool > RdbRestoreValue::Add (string_view key, string_view data,
196
+ const DbContext& cntx,
197
+ const RestoreArgs& args, DbSlice* db_slice,
198
+ EngineShard* shard) {
193
199
InMemSource data_src (data);
194
200
PrimeValue pv;
195
201
bool first_parse = true ;
196
202
do {
197
203
auto opaque_res = Parse (&data_src);
198
204
if (!opaque_res) {
199
- return std::nullopt ;
205
+ return {} ;
200
206
}
201
207
202
208
LoadConfig config;
@@ -212,16 +218,18 @@ std::optional<DbSlice::ItAndUpdater> RdbRestoreValue::Add(std::string_view data,
212
218
if (auto ec = FromOpaque (*opaque_res, config, &pv); ec) {
213
219
// we failed - report and exit
214
220
LOG (WARNING) << " error while trying to read data: " << ec;
215
- return std::nullopt ;
221
+ return {} ;
216
222
}
217
223
} while (pending_read_.remaining > 0 );
218
224
219
- if (auto res = db_slice. AddNew (cntx, key, std::move (pv), args.ExpirationTime ()); res) {
225
+ if (auto res = db_slice-> AddOrUpdate (cntx, key, std::move (pv), args.ExpirationTime ()); res) {
220
226
res->it ->first .SetSticky (args.Sticky ());
221
227
shard->search_indices ()->AddDoc (key, cntx, res->it ->second );
222
- return std::move (res.value ());
228
+ return {DbSlice::ItAndUpdater{
229
+ .it = res->it , .exp_it = res->exp_it , .post_updater = std::move (res->post_updater )},
230
+ res->is_new };
223
231
}
224
- return std::nullopt ;
232
+ return {} ;
225
233
}
226
234
227
235
[[nodiscard]] bool RestoreArgs::UpdateExpiration (int64_t now_msec) {
@@ -467,14 +475,14 @@ OpStatus Renamer::DeserializeDest(Transaction* t, EngineShard* shard) {
467
475
return OpStatus::OK;
468
476
}
469
477
470
- RdbRestoreValue loader (serialized_value_.version .value ());
471
- auto restored_dest_it = loader.Add (serialized_value_.value , dest_key_, db_slice, op_args.db_cntx ,
472
- restore_args, shard);
478
+ restore_args.SetSticky (serialized_value_.sticky );
473
479
474
- if (restored_dest_it) {
475
- auto & dest_it = restored_dest_it-> it ;
476
- dest_it-> first . SetSticky (serialized_value_. sticky );
480
+ RdbRestoreValue loader (serialized_value_. version . value ());
481
+ auto [restored_dest_it, is_new] = loader. Add (dest_key_, serialized_value_. value , op_args. db_cntx ,
482
+ restore_args, &db_slice, shard );
477
483
484
+ if (restored_dest_it.IsValid ()) {
485
+ LOG_IF (DFATAL, !is_new) << " Unexpected override for key " << dest_key_ << " " << dest_found_;
478
486
auto bc = op_args.db_cntx .ns ->GetBlockingController (op_args.shard ->shard_id ());
479
487
if (bc) {
480
488
bc->AwakeWatched (t->GetDbIndex (), dest_key_);
@@ -527,27 +535,28 @@ OpResult<std::string> OpDump(const OpArgs& op_args, string_view key) {
527
535
return OpStatus::KEY_NOTFOUND;
528
536
}
529
537
530
- OpResult<bool > OnRestore (const OpArgs& op_args, std::string_view key, std::string_view payload,
538
+ OpResult<bool > OpRestore (const OpArgs& op_args, std::string_view key, std::string_view payload,
531
539
RestoreArgs restore_args, RdbVersion rdb_version) {
532
540
if (!restore_args.UpdateExpiration (op_args.db_cntx .time_now_ms )) {
533
541
return OpStatus::OUT_OF_RANGE;
534
542
}
535
543
536
544
auto & db_slice = op_args.GetDbSlice ();
545
+ bool found_prev = false ;
546
+
537
547
// The redis impl (see cluster.c function restoreCommand), remove the old key if
538
548
// the replace option is set, so lets do the same here
539
549
{
540
550
auto res = db_slice.FindMutable (op_args.db_cntx , key);
541
- if (restore_args.Replace ()) {
542
- if (IsValid (res.it )) {
551
+ if (IsValid (res.it )) {
552
+ found_prev = true ;
553
+ if (restore_args.Replace ()) {
543
554
VLOG (1 ) << " restore command is running with replace, found old key '" << key
544
555
<< " ' and removing it" ;
545
556
res.post_updater .Run ();
546
557
CHECK (db_slice.Del (op_args.db_cntx , res.it ));
547
- }
548
- } else {
549
- // we are not allowed to replace it, so make sure it doesn't exist
550
- if (IsValid (res.it )) {
558
+ } else {
559
+ // we are not allowed to replace it.
551
560
return OpStatus::KEY_EXISTS;
552
561
}
553
562
}
@@ -559,8 +568,14 @@ OpResult<bool> OnRestore(const OpArgs& op_args, std::string_view key, std::strin
559
568
}
560
569
561
570
RdbRestoreValue loader (rdb_version);
562
- auto res = loader.Add (payload, key, db_slice, op_args.db_cntx , restore_args, op_args.shard );
563
- return res.has_value ();
571
+ auto [res_it, is_new] =
572
+ loader.Add (key, payload, op_args.db_cntx , restore_args, &db_slice, op_args.shard );
573
+ LOG_IF (DFATAL, res_it.IsValid () && !is_new)
574
+ << " Unexpected override for key " << key << " , found previous " << found_prev
575
+ << " override: " << restore_args.Replace ()
576
+ << " , type: " << ObjTypeToString (res_it.it ->second .ObjType ());
577
+
578
+ return res_it.IsValid ();
564
579
}
565
580
566
581
bool ScanCb (const OpArgs& op_args, PrimeIterator prime_it, const ScanOpts& opts, string* scratch,
@@ -1473,7 +1488,7 @@ void GenericFamily::Restore(CmdArgList args, Transaction* tx, SinkReplyBuilder*
1473
1488
}
1474
1489
1475
1490
auto cb = [&](Transaction* t, EngineShard* shard) {
1476
- return OnRestore (t->GetOpArgs (shard), key, serialized_value, restore_args.value (),
1491
+ return OpRestore (t->GetOpArgs (shard), key, serialized_value, restore_args.value (),
1477
1492
rdb_version.value ());
1478
1493
};
1479
1494
0 commit comments