Skip to content

Commit e440a14

Browse files
authored
feat(core)!: composite PK plumbing, Key API change, and Arrow filters (#453)
* feat(core)!: composite PK plumbing, Key API change, and Arrow filters - Key API: replace `Key::to_arrow_datum` with `Key::to_arrow_datums` to support multi-component keys; update all builtin `Key` impls (ints/floats/string/date/time/timestamp/dynamic `Value`) to return a single-element Vec for single-PK types. - Composite key showcase: add `Key2`/`Key2Ref` under `#[cfg(test)]` with `Encode`/`Decode`, `Ord`, `Hash`, `Key`/`KeyRef`; round-trip and ordering tests included. - Parquet scan filtering: implement lexicographic composite range predicates in `get_range_filter` using `eq/gt/gt_eq/lt/lt_eq` with `and_kleene`/`or_kleene`; retain `_ts` upper-bound predicate. - Scan plumbing: thread `pk_indices` through `SsTable::{get,scan}`, `Version::{query,streams,table_query}`, and `LevelStream`; update test callers. Build fixed projections as `[0, 1] ∪ pk_indices` in read paths. - Docs: update composite PK RFC with “Current Status (2025-08-11)”; add draft RFC “Replace Schema Trait with Arrow Schema”. - Clippy hygiene: remove needless range loops and redundant `.into_iter()`; add a focused `#[allow(clippy::too_many_arguments)]` on `Version::table_query`. BREAKING CHANGE: - `Key::to_arrow_datum` was replaced by `Key::to_arrow_datums(Vec<Arc<dyn Datum>>)`; external `Key` implementors must migrate to the new method (return one datum per PK component). * refactor: remove unsafe code in src/ondisk/arrows.rs
1 parent c6fdc30 commit e440a14

File tree

14 files changed

+305
-67
lines changed

14 files changed

+305
-67
lines changed

guide/src/contribution/composite_primary_keys.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,15 @@
22

33
This document outlines a practical, incremental plan to add composite (multi-column) primary key support to Tonbo while maintaining backward compatibility. It explains design goals, changes required across the codebase, and a step-by-step implementation and validation plan.
44

5+
## Current Status (2025-08-11)
6+
7+
- Phase 1: Completed. Plural `Schema` APIs, fixed projections, and Parquet writer configuration are implemented for single-PK schemas.
8+
- `Schema` now exposes `primary_key_indices()` and `primary_key_paths_and_sorting()` (src/record/mod.rs). Macro-generated single-PK schemas return one-element slices.
9+
- Read paths build fixed projections as `[0, 1] ∪ PKs` using `primary_key_indices()` (src/lib.rs, src/transaction.rs).
10+
- `DbOption::new` configures sorting columns (`_ts` then PKs) and enables stats + bloom filters for each PK column path (src/option.rs).
11+
- Phase 2: Not implemented. Composite key types under `src/record/key/composite/` are placeholders; derive macro still accepts only a single `#[record(primary_key)]` and generates a single-field key. No multi-PK trybuild/integration tests.
12+
- Phase 3: Not implemented. `DynSchema` remains single-PK (stores one `primary_index_arrow`, one `pk_path`, and sorting with a single PK column).
13+
514
## Goals
615

716
- Support multi-column primary keys with lexicographic ordering of PK components.

src/compaction/leveled.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,7 @@ where
289289
None,
290290
ProjectionMask::all(),
291291
None, // Default order for compaction
292+
instance.primary_key_indices(),
292293
)
293294
.await?,
294295
});
@@ -307,6 +308,7 @@ where
307308
level_fs.clone(),
308309
ctx.parquet_lru.clone(),
309310
None, // Default order for compaction
311+
instance.primary_key_indices(),
310312
)
311313
.ok_or(CompactionError::EmptyLevel)?;
312314

@@ -333,6 +335,7 @@ where
333335
level_l_fs.clone(),
334336
ctx.parquet_lru.clone(),
335337
None, // Default order for compaction
338+
instance.primary_key_indices(),
336339
)
337340
.ok_or(CompactionError::EmptyLevel)?;
338341

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -749,6 +749,7 @@ where
749749
TsRef::new(key, ts),
750750
projection,
751751
ctx.cache().clone(),
752+
self.record_schema.primary_key_indices(),
752753
)
753754
.await?
754755
.map(|entry| Entry::RecordBatch(entry)))
@@ -1009,6 +1010,7 @@ where
10091010
self.limit,
10101011
self.projection,
10111012
self.order,
1013+
self.mem_storage.record_schema.primary_key_indices(),
10121014
)
10131015
.await?;
10141016

@@ -1070,6 +1072,7 @@ where
10701072
self.limit,
10711073
self.projection,
10721074
self.order,
1075+
self.mem_storage.record_schema.primary_key_indices(),
10731076
)
10741077
.await?;
10751078
let merge_stream = MergeStream::from_vec(streams, self.ts, self.order).await?;

src/ondisk/arrows.rs

Lines changed: 100 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ use std::ops::Bound;
22

33
use arrow::{
44
array::{BooleanArray, Datum},
5-
buffer::BooleanBuffer,
6-
compute::kernels::cmp::{gt, gt_eq, lt_eq},
7-
error::ArrowError,
5+
compute::kernels::{
6+
boolean::{and_kleene, or_kleene},
7+
cmp::{eq, gt, gt_eq, lt, lt_eq},
8+
},
89
};
910
use parquet::{
1011
arrow::{
@@ -19,70 +20,130 @@ use crate::{
1920
version::timestamp::Timestamp,
2021
};
2122

22-
unsafe fn get_range_bound_fn<R>(
23-
range: Bound<&<R::Schema as Schema>::Key>,
24-
) -> (
25-
Option<&'static <R::Schema as Schema>::Key>,
26-
&'static (dyn Fn(&dyn Datum, &dyn Datum) -> Result<BooleanArray, ArrowError> + Sync),
27-
)
23+
enum BoundKind {
24+
Lower { inclusive: bool },
25+
Upper { inclusive: bool },
26+
}
27+
28+
fn lower_bound_owned<R>(
29+
b: Bound<&<R::Schema as Schema>::Key>,
30+
) -> (Option<<R::Schema as Schema>::Key>, BoundKind)
31+
where
32+
R: Record,
33+
{
34+
match b {
35+
Bound::Included(k) => (Some(k.clone()), BoundKind::Lower { inclusive: true }),
36+
Bound::Excluded(k) => (Some(k.clone()), BoundKind::Lower { inclusive: false }),
37+
Bound::Unbounded => (None, BoundKind::Lower { inclusive: true }),
38+
}
39+
}
40+
41+
fn upper_bound_owned<R>(
42+
b: Bound<&<R::Schema as Schema>::Key>,
43+
) -> (Option<<R::Schema as Schema>::Key>, BoundKind)
2844
where
2945
R: Record,
3046
{
31-
let cmp: &'static (dyn Fn(&dyn Datum, &dyn Datum) -> Result<BooleanArray, ArrowError> + Sync);
32-
let key = match range {
33-
Bound::Included(key) => {
34-
cmp = &gt_eq;
35-
Some(&*(key as *const _))
36-
}
37-
Bound::Excluded(key) => {
38-
cmp = &gt;
39-
Some(&*(key as *const _))
40-
}
41-
Bound::Unbounded => {
42-
cmp = &|this, _| {
43-
let len = this.get().0.len();
44-
Ok(BooleanArray::new(
45-
BooleanBuffer::collect_bool(len, |_| true),
46-
None,
47-
))
48-
};
49-
None
50-
}
51-
};
52-
(key, cmp)
47+
match b {
48+
Bound::Included(k) => (Some(k.clone()), BoundKind::Upper { inclusive: true }),
49+
Bound::Excluded(k) => (Some(k.clone()), BoundKind::Upper { inclusive: false }),
50+
Bound::Unbounded => (None, BoundKind::Upper { inclusive: true }),
51+
}
5352
}
5453

55-
pub(crate) unsafe fn get_range_filter<R>(
54+
pub(crate) fn get_range_filter<R>(
5655
schema_descriptor: &SchemaDescriptor,
5756
range: (
5857
Bound<&<R::Schema as Schema>::Key>,
5958
Bound<&<R::Schema as Schema>::Key>,
6059
),
6160
ts: Timestamp,
61+
pk_indices: &[usize],
6262
) -> RowFilter
6363
where
6464
R: Record,
6565
{
66-
let (lower_key, lower_cmp) = get_range_bound_fn::<R>(range.0);
67-
let (upper_key, upper_cmp) = get_range_bound_fn::<R>(range.1);
66+
let (lower_key, lower_kind) = lower_bound_owned::<R>(range.0);
67+
let (upper_key, upper_kind) = upper_bound_owned::<R>(range.1);
6868

69+
let ts_scalar = ts.to_arrow_scalar();
6970
let mut predictions: Vec<Box<dyn ArrowPredicate>> = vec![Box::new(ArrowPredicateFn::new(
7071
ProjectionMask::roots(schema_descriptor, [1]),
71-
move |record_batch| lt_eq(record_batch.column(0), &ts.to_arrow_scalar() as &dyn Datum),
72+
move |record_batch| lt_eq(record_batch.column(0), &ts_scalar as &dyn Datum),
7273
))];
74+
7375
if let Some(lower_key) = lower_key {
76+
let pk_len = pk_indices.len();
7477
predictions.push(Box::new(ArrowPredicateFn::new(
75-
ProjectionMask::roots(schema_descriptor, [2]),
78+
ProjectionMask::roots(schema_descriptor, pk_indices.to_vec()),
7679
move |record_batch| {
77-
lower_cmp(record_batch.column(0), lower_key.to_arrow_datum().as_ref())
80+
let datums = lower_key.to_arrow_datums();
81+
debug_assert_eq!(datums.len(), pk_len);
82+
let n = datums.len();
83+
let mut acc: Option<BooleanArray> = None;
84+
for i in 0..n {
85+
let cmp_i = if i == n - 1 {
86+
match lower_kind {
87+
BoundKind::Lower { inclusive: true } => {
88+
gt_eq(record_batch.column(i), datums[i].as_ref())?
89+
}
90+
BoundKind::Lower { inclusive: false } => {
91+
gt(record_batch.column(i), datums[i].as_ref())?
92+
}
93+
_ => unreachable!(),
94+
}
95+
} else {
96+
gt(record_batch.column(i), datums[i].as_ref())?
97+
};
98+
let mut term = cmp_i;
99+
for (j, d) in datums.iter().enumerate().take(i) {
100+
let eq_j = eq(record_batch.column(j), d.as_ref())?;
101+
term = and_kleene(&term, &eq_j)?;
102+
}
103+
acc = Some(match acc {
104+
None => term,
105+
Some(prev) => or_kleene(&prev, &term)?,
106+
});
107+
}
108+
Ok(acc.expect("at least one key component"))
78109
},
79110
)));
80111
}
112+
81113
if let Some(upper_key) = upper_key {
114+
let pk_len = pk_indices.len();
82115
predictions.push(Box::new(ArrowPredicateFn::new(
83-
ProjectionMask::roots(schema_descriptor, [2]),
116+
ProjectionMask::roots(schema_descriptor, pk_indices.to_vec()),
84117
move |record_batch| {
85-
upper_cmp(upper_key.to_arrow_datum().as_ref(), record_batch.column(0))
118+
let datums = upper_key.to_arrow_datums();
119+
debug_assert_eq!(datums.len(), pk_len);
120+
let n = datums.len();
121+
let mut acc: Option<BooleanArray> = None;
122+
for i in 0..n {
123+
let cmp_i = if i == n - 1 {
124+
match upper_kind {
125+
BoundKind::Upper { inclusive: true } => {
126+
lt_eq(record_batch.column(i), datums[i].as_ref())?
127+
}
128+
BoundKind::Upper { inclusive: false } => {
129+
lt(record_batch.column(i), datums[i].as_ref())?
130+
}
131+
_ => unreachable!(),
132+
}
133+
} else {
134+
lt(record_batch.column(i), datums[i].as_ref())?
135+
};
136+
let mut term = cmp_i;
137+
for (j, d) in datums.iter().enumerate().take(i) {
138+
let eq_j = eq(record_batch.column(j), d.as_ref())?;
139+
term = and_kleene(&term, &eq_j)?;
140+
}
141+
acc = Some(match acc {
142+
None => term,
143+
Some(prev) => or_kleene(&prev, &term)?,
144+
});
145+
}
146+
Ok(acc.expect("at least one key component"))
86147
},
87148
)));
88149
}

src/ondisk/sstable.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,13 +96,15 @@ where
9696
self,
9797
key: &TsRef<<R::Schema as Schema>::Key>,
9898
projection_mask: ProjectionMask,
99+
pk_indices: &[usize],
99100
) -> ParquetResult<Option<RecordBatchEntry<R>>> {
100101
self.scan(
101102
(Bound::Included(key.value()), Bound::Included(key.value())),
102103
key.ts(),
103104
Some(1),
104105
projection_mask,
105106
None, // Order doesn't matter for single-key get
107+
pk_indices,
106108
)
107109
.await?
108110
.next()
@@ -120,6 +122,7 @@ where
120122
limit: Option<usize>,
121123
projection_mask: ProjectionMask,
122124
order: Option<Order>,
125+
pk_indices: &[usize],
123126
) -> Result<SsTableScan<'scan, R>, parquet::errors::ParquetError> {
124127
let builder = self
125128
.into_parquet_builder(limit, projection_mask.clone())
@@ -128,9 +131,8 @@ where
128131
let schema_descriptor = builder.metadata().file_metadata().schema_descr();
129132
let full_schema = builder.schema().clone();
130133

131-
// Safety: filter's lifetime relies on range's lifetime, sstable must not live longer than
132-
// it
133-
let filter = unsafe { get_range_filter::<R>(schema_descriptor, range, ts) };
134+
// Build a row filter for ts and primary key range
135+
let filter = get_range_filter::<R>(schema_descriptor, range, ts, pk_indices);
134136

135137
Ok(SsTableScan::new(
136138
builder.with_row_filter(filter).build()?,
@@ -252,6 +254,7 @@ pub(crate) mod tests {
252254
.unwrap(),
253255
[0, 1, 2, 3],
254256
),
257+
TestSchema {}.primary_key_indices(),
255258
)
256259
.await
257260
.unwrap()
@@ -271,6 +274,7 @@ pub(crate) mod tests {
271274
.unwrap(),
272275
[0, 1, 2, 4],
273276
),
277+
TestSchema {}.primary_key_indices(),
274278
)
275279
.await
276280
.unwrap()
@@ -290,6 +294,7 @@ pub(crate) mod tests {
290294
.unwrap(),
291295
[0, 1, 2],
292296
),
297+
TestSchema {}.primary_key_indices(),
293298
)
294299
.await
295300
.unwrap()
@@ -337,6 +342,7 @@ pub(crate) mod tests {
337342
[0, 1, 2, 3],
338343
),
339344
None,
345+
TestSchema {}.primary_key_indices(),
340346
)
341347
.await
342348
.unwrap();
@@ -365,6 +371,7 @@ pub(crate) mod tests {
365371
[0, 1, 2, 4],
366372
),
367373
None,
374+
TestSchema {}.primary_key_indices(),
368375
)
369376
.await
370377
.unwrap();
@@ -393,6 +400,7 @@ pub(crate) mod tests {
393400
[0, 1, 2],
394401
),
395402
None,
403+
TestSchema {}.primary_key_indices(),
396404
)
397405
.await
398406
.unwrap();
@@ -442,6 +450,7 @@ pub(crate) mod tests {
442450
None,
443451
ProjectionMask::all(),
444452
None, // Forward order (default)
453+
TestSchema {}.primary_key_indices(),
445454
)
446455
.await
447456
.unwrap();
@@ -466,6 +475,7 @@ pub(crate) mod tests {
466475
None,
467476
ProjectionMask::all(),
468477
Some(Order::Desc), // Reverse order
478+
TestSchema {}.primary_key_indices(),
469479
)
470480
.await
471481
.unwrap();
@@ -520,6 +530,7 @@ pub(crate) mod tests {
520530
[0, 1, 2, 3], // Include vu32, exclude vbool
521531
),
522532
Some(Order::Desc),
533+
TestSchema {}.primary_key_indices(),
523534
)
524535
.await
525536
.unwrap();
@@ -552,6 +563,7 @@ pub(crate) mod tests {
552563
[0, 1, 2, 4], // Include vbool, exclude vu32
553564
),
554565
Some(Order::Desc),
566+
TestSchema {}.primary_key_indices(),
555567
)
556568
.await
557569
.unwrap();
@@ -608,6 +620,7 @@ pub(crate) mod tests {
608620
None,
609621
ProjectionMask::all(),
610622
Some(Order::Desc),
623+
TestSchema {}.primary_key_indices(),
611624
)
612625
.await
613626
.unwrap();
@@ -631,6 +644,7 @@ pub(crate) mod tests {
631644
None,
632645
ProjectionMask::all(),
633646
Some(Order::Desc),
647+
TestSchema {}.primary_key_indices(),
634648
)
635649
.await
636650
.unwrap();
@@ -678,6 +692,7 @@ pub(crate) mod tests {
678692
None,
679693
ProjectionMask::all(),
680694
None, // Forward order
695+
TestSchema {}.primary_key_indices(),
681696
)
682697
.await
683698
.unwrap();
@@ -698,6 +713,7 @@ pub(crate) mod tests {
698713
None,
699714
ProjectionMask::all(),
700715
Some(Order::Desc), // Reverse order
716+
TestSchema {}.primary_key_indices(),
701717
)
702718
.await
703719
.unwrap();

0 commit comments

Comments
 (0)