Skip to content

Commit 2b325e1

Browse files
belveryinDaniel Martinez Maqueda
andauthored
Add descending order to scan (#401)
* Add desc shorting to scan * Add tests to the in-memory and on disk options * Fix errors after merge * Add reverse order to TransactionScan * Address PR comments * Fix LevelStream reverse order * Fix fmt --------- Co-authored-by: Daniel Martinez Maqueda <daniel@super.ai>
1 parent 7a2ad3d commit 2b325e1

File tree

21 files changed

+1250
-92
lines changed

21 files changed

+1250
-92
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ async fn main() {
144144
- [x] (Optimistic) Transactions.
145145
- [x] Leveled compaction strategy.
146146
- [x] Push down filter, limit and projection.
147+
- [x] Reverse scan (descending order) for efficient newest-first queries.
147148
- [x] Runtime schema definition.
148149
- [ ] SQL (via [Apache DataFusion](https://datafusion.apache.org/)).
149150
- [ ] Fusion storage across RAM, flash, SSD, and remote Object Storage Service (OSS) for each column-family, balancing performance and cost efficiency per data block:

examples/declare.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,26 @@ async fn main() {
9393
}
9494
}
9595

96+
{
97+
let upper = "Blob".into();
98+
// reverse scan of user (descending order)
99+
let mut reverse_scan = txn
100+
.scan((Bound::Included(&name), Bound::Excluded(&upper)))
101+
.reverse() // scan in descending order
102+
.projection(&["name", "grade"]) // tonbo supports combining reverse with projection
103+
.limit(10) // and with limits
104+
.take()
105+
.await
106+
.unwrap();
107+
108+
println!("Users in reverse order:");
109+
while let Some(entry) = reverse_scan.next().await.transpose().unwrap() {
110+
if let Some(user) = entry.value() {
111+
println!("- {}: {:?}", user.name, user.grade.unwrap_or(0.0.into()));
112+
}
113+
}
114+
}
115+
96116
// commit transaction
97117
txn.commit().await.unwrap();
98118
}

guide/src/lsm_structure.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,12 @@ The query process of immutable memtable is different from memtable:
109109

110110
```rust
111111
let range = self.index.range::<TsRef<<<A::Record as Record>::Schema as Schema>::Key>, _>((lower, upper));
112-
ImmutableScan::<A::Record>::new(range, self.data.as_record_batch(), projection_mask)
112+
let boxed_range = if order == Some(Order::Desc) {
113+
Box::new(range.rev())
114+
} else {
115+
Box::new(range)
116+
};
117+
ImmutableScan::<A::Record>::new(boxed_range, self.data.as_record_batch(), projection_mask)
113118
```
114119

115120
For `get(key)`, it can be transformed into `scan(key, key)`. The final result returned by `scan` is then converted into `RecordBatchEntry` using Arrow's `.as_record_batch()` method.

guide/src/start.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,20 @@ while let Some(entry) = scan.next().await.transpose().unwrap() {
172172
let data = entry.value(); // type of UserRef
173173
// ......
174174
}
175+
176+
// reverse scan of user (descending order)
177+
let mut reverse_scan = txn
178+
.scan((Bound::Included(&name), Bound::Excluded(&upper)))
179+
.reverse() // scan in descending order
180+
.limit(10) // optionally limit results
181+
.take()
182+
.await
183+
.unwrap();
184+
185+
while let Some(entry) = reverse_scan.next().await.transpose().unwrap() {
186+
let data = entry.value(); // records in reverse order
187+
// ......
188+
}
175189
```
176190

177191
### Persistence

guide/src/usage/tonbo.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,18 @@ while let Some(entry) = scan.next().await.transpose().unwrap() {
7777
let data = entry.value(); // type of UserRef
7878
// ......
7979
}
80+
81+
// Reverse scan (descending order)
82+
let mut reverse_scan = db
83+
.scan((Bound::Included(&name), Bound::Excluded(&upper)))
84+
.reverse() // scan in descending order
85+
.limit(100) // limit results
86+
.await
87+
.unwrap();
88+
while let Some(entry) = reverse_scan.next().await.transpose().unwrap() {
89+
let data = entry.value(); // newest records first
90+
// ......
91+
}
8092
```
8193
### Insert/Remove
8294

src/compaction/leveled.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,7 @@ where
285285
u32::MAX.into(),
286286
None,
287287
ProjectionMask::all(),
288+
None,
288289
)
289290
.await?,
290291
});
@@ -342,6 +343,7 @@ where
342343
u32::MAX.into(),
343344
None,
344345
ProjectionMask::all(),
346+
None, // Default order for compaction
345347
)
346348
.await?,
347349
});
@@ -359,6 +361,7 @@ where
359361
ProjectionMask::all(),
360362
level_fs.clone(),
361363
ctx.parquet_lru.clone(),
364+
None, // Default order for compaction
362365
)
363366
.ok_or(CompactionError::EmptyLevel)?;
364367

@@ -384,6 +387,7 @@ where
384387
ProjectionMask::all(),
385388
level_l_fs.clone(),
386389
ctx.parquet_lru.clone(),
390+
None, // Default order for compaction
387391
)
388392
.ok_or(CompactionError::EmptyLevel)?;
389393

src/compaction/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ where
5353
schema: &R::Schema,
5454
fs: &Arc<dyn DynFs>,
5555
) -> Result<(), CompactionError<R>> {
56-
let mut stream = MergeStream::<R>::from_vec(streams, u32::MAX.into()).await?;
56+
let mut stream = MergeStream::<R>::from_vec(streams, u32::MAX.into(), None).await?;
5757

5858
// Kould: is the capacity parameter necessary?
5959
let mut builder =

0 commit comments

Comments
 (0)