Skip to content

Commit 2432c12

Browse files
authored
chore(query): add string view dict/freq/onevalue encoding for native format (#17833)
* chore(query): add view encoding * chore(query): fix bench * chore(query): fix bench * chore(query): update * chore(query): update * chore(query): update
1 parent 163d0d8 commit 2432c12

File tree

16 files changed

+230
-53
lines changed

16 files changed

+230
-53
lines changed

src/common/column/src/binview/builder.rs

+9-1
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,6 @@ impl<T: ViewType + ?Sized> BinaryViewColumnBuilder<T> {
132132
let len = v.length;
133133
if len <= 12 {
134134
self.total_bytes_len += len as usize;
135-
debug_assert!(self.views.capacity() > self.views.len());
136135
self.views.push(v)
137136
} else {
138137
let data = buffers.get_unchecked(v.buffer_idx as usize);
@@ -143,6 +142,15 @@ impl<T: ViewType + ?Sized> BinaryViewColumnBuilder<T> {
143142
}
144143
}
145144

145+
/// Appends `v` to the view list unchanged and adds `v.length` to
/// `total_bytes_len`. No payload bytes are copied and no buffer is touched.
///
/// # Safety
146+
/// - The caller must ensure that `v` is a duplicate of a view that has already
///   been pushed into this builder (this is how `gc_with_dict` uses it), so that
///   whatever buffer region `v` refers to is presumably already owned by the
///   builder. NOTE(review): confirm the exact invariant against `freeze()`.
147+
#[inline]
148+
pub(crate) unsafe fn push_duplicated_view_unchecked(&mut self, v: View) {
149+
let len = v.length;
150+
self.total_bytes_len += len as usize;
151+
self.views.push(v);
152+
}
153+
146154
pub fn push_value<V: AsRef<T>>(&mut self, value: V) {
147155
let value = value.as_ref();
148156
let bytes = value.to_bytes();

src/common/column/src/binview/mod.rs

+39-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pub(crate) mod fmt;
1818
mod iterator;
1919
mod view;
2020

21+
use std::collections::HashMap;
2122
use std::fmt::Debug;
2223
use std::marker::PhantomData;
2324
use std::sync::Arc;
@@ -57,7 +58,6 @@ pub trait ViewType: Sealed + 'static + PartialEq + AsRef<Self> {
5758
/// # Safety
5859
/// The caller must ensure `index < self.len()`.
5960
unsafe fn from_bytes_unchecked(slice: &[u8]) -> &Self;
60-
6161
fn to_bytes(&self) -> &[u8];
6262

6363
#[allow(clippy::wrong_self_convention)]
@@ -344,6 +344,44 @@ impl<T: ViewType + ?Sized> BinaryViewColumnGeneric<T> {
344344
mutable.freeze()
345345
}
346346

347+
/// Rebuilds the column so that identical long values share a single view
/// (dictionary-style deduplication) instead of each holding its own bytes.
///
/// - If the column has no data buffers, a clone of `self` is returned.
/// - Rows whose bit in `validity` is unset are replaced by an empty inline view.
/// - Views with `length <= View::MAX_INLINE_SIZE` are copied as-is.
/// - Longer values are keyed by their bytes: the first occurrence is pushed
///   through `push_view_unchecked`, later occurrences reuse the view that the
///   first occurrence produced.
348+
pub fn gc_with_dict(&self, validity: Option<Bitmap>) -> Self {
349+
if self.buffers.is_empty() {
350+
return self.clone();
351+
}
// Maps the bytes of a long value to the view created for its first occurrence.
352+
let mut map = HashMap::new();
353+
let mut mutable = BinaryViewColumnBuilder::with_capacity(self.len());
354+
let buffers = self.buffers.as_ref();
355+
356+
for (idx, (val, view)) in self.iter().zip(self.views.iter()).enumerate() {
357+
// Null row (validity bit unset): store an empty inline view and move on.
358+
if validity.as_ref().map(|v| !v.get_bit(idx)).unwrap_or(false) {
359+
let default_v = View::new_inline(&[]);
360+
mutable.views.push(default_v);
361+
continue;
362+
}
363+
364+
// Short values are not deduplicated: the view is copied unchanged and only
// the byte counter is updated.
365+
if view.length <= View::MAX_INLINE_SIZE {
366+
mutable.total_bytes_len += view.length as usize;
367+
mutable.views.push(*view);
368+
continue;
369+
}
370+
371+
match map.entry(val.to_bytes()) {
372+
std::collections::hash_map::Entry::Occupied(v) => {
373+
// SAFETY (as relied on here): the stored view was taken from
// `mutable.views` when the value was first seen, so it is a duplicate
// of a view already in the builder.
unsafe { mutable.push_duplicated_view_unchecked(*v.get()) };
374+
}
375+
std::collections::hash_map::Entry::Vacant(vacant_entry) => {
376+
// First occurrence: push through the regular path, then remember the
// view the builder just appended so later duplicates can reuse it.
// NOTE(review): assumes `view.buffer_idx` is in range for `buffers` —
// confirm against `push_view_unchecked`'s contract.
unsafe { mutable.push_view_unchecked(*view, buffers) };
377+
let last_view = mutable.views.last().unwrap();
378+
vacant_entry.insert(*last_view);
379+
}
380+
}
381+
}
382+
mutable.freeze()
383+
}
384+
347385
/// Returns `true` when `views.as_ptr()` and `views.data_ptr()` are different
/// addresses.
/// NOTE(review): presumably `data_ptr` is the start of the underlying
/// allocation and `as_ptr` the start of the visible range, so a mismatch means
/// the column was sliced with a non-zero offset — confirm against `Buffer`.
pub fn is_sliced(&self) -> bool {
348386
!std::ptr::eq(self.views.as_ptr(), self.views.data_ptr())
349387
}

src/common/column/src/binview/view.rs

+5
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ impl View {
4949
unsafe { std::mem::transmute(self) }
5050
}
5151

52+
/// Reinterprets the raw bits of this view as an `i128`.
#[inline(always)]
53+
pub fn as_i128(self) -> i128 {
54+
// SAFETY: `transmute` only compiles when `View` and `i128` have the same
// size, and every bit pattern is a valid `i128`.
unsafe { std::mem::transmute(self) }
55+
}
56+
5257
/// Create a new inline view without verifying the length
5358
///
5459
/// # Safety

src/common/column/tests/it/binview/mod.rs

+28
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
mod builder;
1616

1717
use databend_common_column::binview::BinaryViewColumn;
18+
use databend_common_column::binview::StringColumn;
1819
use databend_common_column::binview::Utf8ViewColumn;
1920

2021
#[test]
@@ -97,6 +98,33 @@ fn basics_binary_view() {
9798
);
9899
}
99100

101+
// Checks that `gc_with_dict` keeps the column equal to the original while
// storing each distinct long value only once.
#[test]
102+
fn gc_dict() {
103+
// 100 strings drawn from 3 distinct values (`c % 3`); per the assertions
// below each one is 18 bytes long.
let data = (0..100).map(|c| format!("loooooooooooonstr{}", c % 3));
104+
let array: StringColumn = data.into_iter().collect();
105+
let array2 = array.gc_with_dict(None);
106+
107+
assert_eq!(array, array2);
108+
// Before: every row owns its bytes. After: only the 3 distinct values remain.
assert_eq!(array.total_buffer_len(), 18 * 100);
109+
assert_eq!(array2.total_buffer_len(), 18 * 3);
110+
111+
// Any 10 consecutive rows still contain all 3 distinct values, so each
// slice must also collapse to 3 stored values.
for i in (0..100).step_by(10) {
112+
let s = array.clone().sliced(i, 10);
113+
let s2 = s.gc_with_dict(None);
114+
115+
assert_eq!(s, s2);
116+
assert_eq!(s2.total_buffer_len(), 18 * 3);
117+
}
118+
}
119+
120+
// Checks that `gc_with_dict` leaves a column of short strings ("lo0".."lo2")
// equal to the original.
#[test]
121+
fn gc_dict_small() {
122+
let data = (0..100).map(|c| format!("lo{}", c % 3));
123+
let array: StringColumn = data.into_iter().collect();
124+
let array2 = array.gc_with_dict(None);
125+
assert_eq!(array, array2);
126+
}
127+
100128
#[test]
101129
fn from() {
102130
let array1 = Utf8ViewColumn::from(["hello", " ", ""]);

src/common/native/src/compression/basic.rs

+26
Original file line numberDiff line numberDiff line change
@@ -187,4 +187,30 @@ impl CommonCompression {
187187
}
188188
Ok(())
189189
}
190+
191+
/// Reads `compressed_size` bytes of compressed data from `reader` and
/// decompresses them into `out_slice` via `self.decompress`.
///
/// - `_uncompressed_size` is accepted but never read by this function.
/// - `scratch` is only used when the reader's buffered bytes are shorter than
///   `compressed_size`; it is then resized to `compressed_size` and filled
///   with `read_exact`.
///
/// Errors from `fill_buf`, `read_exact` and `decompress` are propagated.
pub fn decompress_buffer<R: NativeReadBuf>(
192+
&self,
193+
reader: &mut R,
194+
_uncompressed_size: usize,
195+
compressed_size: usize,
196+
out_slice: &mut [u8],
197+
scratch: &mut Vec<u8>,
198+
) -> Result<()> {
199+
// Set when the input is borrowed from the reader's own buffer, in which
// case the bytes still have to be consumed after decompression.
let mut use_inner = false;
200+
reader.fill_buf()?;
201+
let input = if reader.buffer_bytes().len() >= compressed_size {
202+
use_inner = true;
203+
reader.buffer_bytes()
204+
} else {
205+
// Not enough buffered bytes: copy exactly `compressed_size` bytes into
// `scratch`. No `consume` follows on this path; presumably `read_exact`
// already advances the reader — TODO confirm.
scratch.resize(compressed_size, 0);
206+
reader.read_exact(scratch.as_mut_slice())?;
207+
scratch.as_slice()
208+
};
209+
210+
self.decompress(&input[..compressed_size], out_slice)?;
211+
if use_inner {
212+
reader.consume(compressed_size);
213+
}
214+
Ok(())
215+
}
190216
}

src/common/native/src/compression/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pub mod binary;
1818
pub mod boolean;
1919
pub mod double;
2020
pub mod integer;
21+
pub mod view;
2122

2223
pub use basic::CommonCompression;
2324
use databend_common_expression::types::Bitmap;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use databend_common_column::binview::BinaryViewColumn;
16+
use databend_common_column::bitmap::Bitmap;
17+
use databend_common_expression::types::Buffer;
18+
19+
use super::basic::CommonCompression;
20+
use crate::compression::integer::compress_integer;
21+
use crate::error::Result;
22+
use crate::write::WriteOptions;
23+
24+
/// Serializes a binary-view column into `buf`.
///
/// The column is first rebuilt with `gc_with_dict`. Its views are then
/// converted to `i128` values and written with `compress_integer`, followed by
/// the data buffers written with `compress_buffers` using
/// `write_options.default_compression`.
pub fn compress_view(
25+
col: &BinaryViewColumn,
26+
validity: Option<Bitmap>,
27+
buf: &mut Vec<u8>,
28+
write_options: WriteOptions,
29+
) -> Result<()> {
30+
// Deduplicate long values first, then encode the views as 128-bit integers.
31+
let col = col.gc_with_dict(validity.clone());
32+
let view_array: Buffer<i128> = col.views().iter().map(|x| x.as_i128()).collect();
33+
compress_integer(&view_array, validity, write_options.clone(), buf)?;
34+
compress_buffers(&col, buf, write_options.default_compression)
35+
}
36+
37+
/// Appends the column's data buffers to `buf`.
///
/// Layout written: a little-endian `u32` buffer count, then for each buffer
/// one codec byte, a little-endian `u32` compressed size, a little-endian
/// `u32` uncompressed length, and whatever `c.compress` appends to `buf`.
fn compress_buffers(
38+
array: &BinaryViewColumn,
39+
buf: &mut Vec<u8>,
40+
c: CommonCompression,
41+
) -> Result<()> {
42+
let codec = c.to_compression() as u8;
43+
buf.extend_from_slice(&(array.data_buffers().len() as u32).to_le_bytes());
44+
// Encode each data buffer with its own 9-byte header.
45+
for buffer in array.data_buffers().iter() {
46+
buf.extend_from_slice(&[codec]);
47+
// Reserve 8 bytes for the two sizes; they are patched in once the
// compressed size is known.
let pos = buf.len();
48+
buf.extend_from_slice(&[0u8; 8]);
49+
50+
// Presumably `compress` appends to `buf` and returns the number of bytes
// it wrote — TODO confirm.
let compressed_size = c.compress(buffer.as_slice(), buf)?;
51+
// NOTE(review): both `as u32` casts truncate silently for sizes above
// `u32::MAX`.
buf[pos..pos + 4].copy_from_slice(&(compressed_size as u32).to_le_bytes());
52+
buf[pos + 4..pos + 8].copy_from_slice(&(buffer.len() as u32).to_le_bytes());
53+
}
54+
Ok(())
55+
}

src/common/native/src/read/array/view.rs

+9-8
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use databend_common_expression::types::Buffer;
2323
use databend_common_expression::Column;
2424
use databend_common_expression::TableDataType;
2525

26+
use crate::compression::integer::decompress_integer;
2627
use crate::error::Result;
2728
use crate::nested::InitNested;
2829
use crate::nested::NestedState;
@@ -112,22 +113,21 @@ fn read_view_col<R: NativeReadBuf>(
112113
data_type: TableDataType,
113114
validity: Option<Bitmap>,
114115
) -> Result<Column> {
115-
let mut scratch = vec![0; 9];
116-
let (_c, _compressed_size, _uncompressed_size) = read_compress_header(reader, &mut scratch)?;
116+
let mut scratch = Vec::new();
117+
let mut views: Vec<i128> = Vec::with_capacity(length);
118+
decompress_integer(reader, length, &mut views, &mut scratch)?;
117119

118-
let mut buffer = vec![View::default(); length];
119-
let temp_data =
120-
unsafe { std::slice::from_raw_parts_mut(buffer.as_mut_ptr() as *mut u8, length * 16) };
121-
reader.read_exact(temp_data)?;
122-
let views = Buffer::from(buffer);
120+
let views: Buffer<i128> = views.into();
121+
let views = unsafe { std::mem::transmute::<Buffer<i128>, Buffer<View>>(views) };
123122

124123
let buffer_len = reader.read_u32::<LittleEndian>()?;
125-
let mut buffers = Vec::with_capacity(buffer_len as _);
124+
let mut buffers = Vec::with_capacity(buffer_len as usize);
126125

127126
for _ in 0..buffer_len {
128127
scratch.clear();
129128
let (compression, compressed_size, uncompressed_size) =
130129
read_compress_header(reader, &mut scratch)?;
130+
131131
let c = CommonCompression::try_from(&compression)?;
132132
let mut buffer = vec![];
133133
c.decompress_common_binary(
@@ -137,6 +137,7 @@ fn read_view_col<R: NativeReadBuf>(
137137
&mut buffer,
138138
&mut scratch,
139139
)?;
140+
140141
buffers.push(Buffer::from(buffer));
141142
}
142143

src/common/native/src/write/serialize.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ pub fn write<W: Write>(
6060
}
6161
}),
6262
Column::Boolean(column) => write_bitmap(w, &column, validity, write_options, scratch),
63-
Column::String(column) => write_view::<W>(w, &column.to_binview(), write_options, scratch),
63+
Column::String(column) => {
64+
write_view::<W>(w, &column.to_binview(), validity, write_options, scratch)
65+
}
6466
Column::Timestamp(column) => {
6567
write_primitive::<i64, W>(w, &column, validity, write_options, scratch)
6668
}

src/common/native/src/write/view.rs

+7-29
Original file line numberDiff line numberDiff line change
@@ -15,43 +15,21 @@
1515
use std::io::Write;
1616

1717
use databend_common_column::binview::BinaryViewColumn;
18-
use databend_common_column::binview::View;
18+
use databend_common_column::bitmap::Bitmap;
1919

2020
use super::WriteOptions;
21+
use crate::compression::view::compress_view;
2122
use crate::error::Result;
2223

2324
pub(crate) fn write_view<W: Write>(
2425
w: &mut W,
2526
array: &BinaryViewColumn,
27+
validity: Option<Bitmap>,
2628
write_options: WriteOptions,
27-
buf: &mut Vec<u8>,
29+
scratch: &mut Vec<u8>,
2830
) -> Result<()> {
29-
// TODO: adaptive gc and dict by stats
30-
let array = array.clone().gc();
31-
let c = write_options.default_compression;
32-
let codec = c.to_compression();
33-
34-
let total_size = array.len() * std::mem::size_of::<View>()
35-
+ array.data_buffers().iter().map(|x| x.len()).sum::<usize>();
36-
w.write_all(&[codec as u8])?;
37-
w.write_all(&(total_size as u32).to_le_bytes())?;
38-
w.write_all(&(total_size as u32).to_le_bytes())?;
39-
40-
let input_buf: &[u8] = bytemuck::cast_slice(array.views().as_slice());
41-
w.write_all(input_buf)?;
42-
w.write_all(&(array.data_buffers().len() as u32).to_le_bytes())?;
43-
44-
for buffer in array.data_buffers().iter() {
45-
buf.clear();
46-
let pos = buf.len();
47-
w.write_all(&[codec as u8])?;
48-
buf.extend_from_slice(&[0u8; 8]);
49-
50-
let compressed_size = c.compress(buffer.as_slice(), buf)?;
51-
buf[pos..pos + 4].copy_from_slice(&(compressed_size as u32).to_le_bytes());
52-
buf[pos + 4..pos + 8].copy_from_slice(&(buffer.len() as u32).to_le_bytes());
53-
w.write_all(buf.as_slice())?;
54-
buf.clear();
55-
}
31+
scratch.clear();
32+
compress_view(array, validity, scratch, write_options)?;
33+
w.write_all(scratch.as_slice())?;
5634
Ok(())
5735
}

src/common/native/tests/it/native/io.rs

+19
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,25 @@ fn test_freq() {
109109
test_write_read(chunk);
110110
}
111111

112+
// Round-trips a string column with only 4 distinct values (`s % 4`) through
// `test_write_read`.
#[test]
113+
fn test_dict_string() {
114+
let size = WRITE_PAGE * 5;
115+
116+
let values = vec![
117+
(0..size)
118+
.map(|s| format!("stringxxxxxxxx{}", s % 4))
119+
.collect::<Vec<_>>(),
120+
// NOTE(review): the short-string case below is disabled; the reason is
// not visible here. Re-enable it or remove it.
// (0..size)
121+
// .map(|s| format!("abc{}", s % 4))
122+
// .collect::<Vec<_>>(),
123+
];
124+
for v in values {
125+
let col = StringColumn::from_iter(v.iter());
126+
let chunk = vec![Column::String(col)];
127+
test_write_read(chunk);
128+
}
129+
}
130+
112131
#[test]
113132
fn test_bitpacking() {
114133
let size = WRITE_PAGE * 5;

0 commit comments

Comments
 (0)