Skip to content

Commit a330de2

Browse files
committed
feat: new table option FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING
When this option is set to true, a fuse table with the parquet storage format will enable encoding and dictionary during serialization. The default encoding policy of parquet-rs will be used.
1 parent dd86b4e commit a330de2

File tree

12 files changed

+104
-14
lines changed

12 files changed

+104
-14
lines changed

src/query/formats/src/output_format/parquet.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ impl OutputFormat for ParquetOutputFormat {
5353
return Ok(vec![]);
5454
}
5555
let mut buf = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
56-
let _ = blocks_to_parquet(&self.schema, blocks, &mut buf, TableCompression::Zstd)?;
56+
// Unloading data, enable encoding unconditionally in this case, since ...
57+
let _ = blocks_to_parquet(&self.schema, blocks, &mut buf, TableCompression::Zstd, true)?;
5758
Ok(buf)
5859
}
5960
}

src/query/service/src/interpreters/common/table_option_validation.rs

+19-4
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use databend_common_sql::BloomIndexColumns;
2626
use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD;
2727
use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
2828
use databend_common_storages_fuse::FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS;
29+
use databend_common_storages_fuse::FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING;
2930
use databend_common_storages_fuse::FUSE_OPT_KEY_FILE_SIZE;
3031
use databend_common_storages_fuse::FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD;
3132
use databend_common_storages_fuse::FUSE_OPT_KEY_ROW_PER_BLOCK;
@@ -75,6 +76,7 @@ pub static CREATE_FUSE_OPTIONS: LazyLock<HashSet<&'static str>> = LazyLock::new(
7576
r.insert("transient");
7677
r.insert(OPT_KEY_TEMP_PREFIX);
7778
r.insert(OPT_KEY_ENABLE_COPY_DEDUP_FULL_PATH);
79+
r.insert(FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING);
7880
r
7981
});
8082

@@ -196,10 +198,7 @@ pub fn is_valid_bloom_index_columns(
196198
pub fn is_valid_change_tracking(
197199
options: &BTreeMap<String, String>,
198200
) -> databend_common_exception::Result<()> {
199-
if let Some(value) = options.get(OPT_KEY_CHANGE_TRACKING) {
200-
value.to_lowercase().parse::<bool>()?;
201-
}
202-
Ok(())
201+
is_valid_bool_opt(OPT_KEY_CHANGE_TRACKING, options)
203202
}
204203

205204
pub fn is_valid_random_seed(
@@ -210,3 +209,19 @@ pub fn is_valid_random_seed(
210209
}
211210
Ok(())
212211
}
212+
213+
pub fn is_valid_fuse_parquet_encoding_opt(
214+
options: &BTreeMap<String, String>,
215+
) -> databend_common_exception::Result<()> {
216+
is_valid_bool_opt(FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING, options)
217+
}
218+
219+
fn is_valid_bool_opt(
220+
key: &str,
221+
options: &BTreeMap<String, String>,
222+
) -> databend_common_exception::Result<()> {
223+
if let Some(value) = options.get(key) {
224+
value.parse::<bool>()?;
225+
}
226+
Ok(())
227+
}

src/query/service/src/interpreters/interpreter_table_create.rs

+3
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ use crate::interpreters::common::table_option_validation::is_valid_bloom_index_c
6464
use crate::interpreters::common::table_option_validation::is_valid_change_tracking;
6565
use crate::interpreters::common::table_option_validation::is_valid_create_opt;
6666
use crate::interpreters::common::table_option_validation::is_valid_data_retention_period;
67+
use crate::interpreters::common::table_option_validation::is_valid_fuse_parquet_encoding_opt;
6768
use crate::interpreters::common::table_option_validation::is_valid_random_seed;
6869
use crate::interpreters::common::table_option_validation::is_valid_row_per_block;
6970
use crate::interpreters::hook::vacuum_hook::hook_clear_m_cte_temp_table;
@@ -425,6 +426,8 @@ impl CreateTableInterpreter {
425426
is_valid_random_seed(&table_meta.options)?;
426427
// check table level data_retention_period_in_hours
427428
is_valid_data_retention_period(&table_meta.options)?;
429+
// check enable_parquet_encoding
430+
is_valid_fuse_parquet_encoding_opt(&table_meta.options)?;
428431

429432
for table_option in table_meta.options.iter() {
430433
let key = table_option.0.to_lowercase();

src/query/service/src/interpreters/interpreter_table_set_options.rs

+3
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use crate::interpreters::common::table_option_validation::is_valid_block_per_seg
3535
use crate::interpreters::common::table_option_validation::is_valid_bloom_index_columns;
3636
use crate::interpreters::common::table_option_validation::is_valid_create_opt;
3737
use crate::interpreters::common::table_option_validation::is_valid_data_retention_period;
38+
use crate::interpreters::common::table_option_validation::is_valid_fuse_parquet_encoding_opt;
3839
use crate::interpreters::common::table_option_validation::is_valid_row_per_block;
3940
use crate::interpreters::Interpreter;
4041
use crate::pipelines::PipelineBuildResult;
@@ -71,6 +72,8 @@ impl Interpreter for SetOptionsInterpreter {
7172
is_valid_row_per_block(&self.plan.set_options)?;
7273
// check data_retention_period
7374
is_valid_data_retention_period(&self.plan.set_options)?;
75+
// check enable_parquet_encoding
76+
is_valid_fuse_parquet_encoding_opt(&self.plan.set_options)?;
7477

7578
// check storage_format
7679
let error_str = "invalid opt for fuse table in alter table statement";

src/query/service/src/test_kits/block_writer.rs

+1
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ impl<'a> BlockWriter<'a> {
143143
vec![index_block],
144144
&mut data,
145145
TableCompression::None,
146+
false,
146147
)?;
147148
let size = data.len() as u64;
148149
data_accessor.write(&location.0, data).await?;

src/query/storages/common/blocks/src/parquet_rs.rs

+30-5
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use parquet::arrow::ArrowWriter;
2222
use parquet::basic::Encoding;
2323
use parquet::file::properties::EnabledStatistics;
2424
use parquet::file::properties::WriterProperties;
25+
use parquet::file::properties::WriterVersion;
2526
use parquet::format::FileMetaData;
2627

2728
/// Serialize data blocks to parquet format.
@@ -30,17 +31,41 @@ pub fn blocks_to_parquet(
3031
blocks: Vec<DataBlock>,
3132
write_buffer: &mut Vec<u8>,
3233
compression: TableCompression,
34+
enable_encoding: bool,
3335
) -> Result<FileMetaData> {
3436
assert!(!blocks.is_empty());
35-
let props = WriterProperties::builder()
37+
let builder = WriterProperties::builder()
3638
.set_compression(compression.into())
3739
// use `usize::MAX` to effectively limit the number of row groups to 1
3840
.set_max_row_group_size(usize::MAX)
39-
.set_encoding(Encoding::PLAIN)
40-
.set_dictionary_enabled(false)
4141
.set_statistics_enabled(EnabledStatistics::None)
42-
.set_bloom_filter_enabled(false)
43-
.build();
42+
.set_bloom_filter_enabled(false);
43+
44+
let builder = if enable_encoding {
45+
// Enable dictionary encoding and fallback encodings.
46+
//
47+
// Memo for quick lookup:
48+
// The fallback encoding "strategy" used by parquet-54.2.1 is:
49+
//
50+
// ~~~
51+
// (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
52+
// (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
53+
// (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
54+
// (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
55+
// (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
56+
// _ => Encoding::PLAIN,
57+
// ~~~
58+
//
59+
builder
60+
.set_writer_version(WriterVersion::PARQUET_2_0)
61+
.set_dictionary_enabled(true)
62+
} else {
63+
builder
64+
.set_dictionary_enabled(false)
65+
.set_encoding(Encoding::PLAIN)
66+
};
67+
68+
let props = builder.build();
4469
let batches = blocks
4570
.into_iter()
4671
.map(|block| block.to_record_batch(table_schema))

src/query/storages/fuse/src/constants.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,9 @@ pub const FUSE_OPT_KEY_ROW_PER_BLOCK: &str = "row_per_block";
1818
pub const FUSE_OPT_KEY_ROW_PER_PAGE: &str = "row_per_page";
1919
pub const FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD: &str = "row_avg_depth_threshold";
2020
pub const FUSE_OPT_KEY_FILE_SIZE: &str = "file_size";
21-
2221
pub const FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS: &str = "data_retention_period_in_hours";
23-
2422
pub const FUSE_OPT_KEY_ATTACH_COLUMN_IDS: &str = "attach_column_ids";
23+
pub const FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING: &str = "enable_parquet_encoding";
2524

2625
pub const FUSE_TBL_BLOCK_PREFIX: &str = "_b";
2726
pub const FUSE_TBL_BLOCK_INDEX_PREFIX: &str = "_i";

src/query/storages/fuse/src/fuse_table.rs

+4
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ use crate::FUSE_OPT_KEY_ATTACH_COLUMN_IDS;
124124
use crate::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD;
125125
use crate::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
126126
use crate::FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS;
127+
use crate::FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING;
127128
use crate::FUSE_OPT_KEY_FILE_SIZE;
128129
use crate::FUSE_OPT_KEY_ROW_PER_BLOCK;
129130
use crate::FUSE_OPT_KEY_ROW_PER_PAGE;
@@ -272,11 +273,14 @@ impl FuseTable {
272273
let block_per_seg =
273274
self.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT);
274275

276+
let enable_parquet_encoding = self.get_option(FUSE_OPT_KEY_ENABLE_PARQUET_ENCODING, false);
277+
275278
WriteSettings {
276279
storage_format: self.storage_format,
277280
table_compression: self.table_compression,
278281
max_page_size,
279282
block_per_seg,
283+
enable_parquet_encoding,
280284
}
281285
}
282286

src/query/storages/fuse/src/io/write/block_writer.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,13 @@ pub fn serialize_block(
7171
let schema = Arc::new(schema.remove_virtual_computed_fields());
7272
match write_settings.storage_format {
7373
FuseStorageFormat::Parquet => {
74-
let result =
75-
blocks_to_parquet(&schema, vec![block], buf, write_settings.table_compression)?;
74+
let result = blocks_to_parquet(
75+
&schema,
76+
vec![block],
77+
buf,
78+
write_settings.table_compression,
79+
write_settings.enable_parquet_encoding,
80+
)?;
7681
let meta = column_parquet_metas(&result, &schema)?;
7782
Ok(meta)
7883
}
@@ -210,6 +215,7 @@ impl BloomIndexState {
210215
vec![index_block],
211216
&mut data,
212217
TableCompression::None,
218+
false,
213219
)?;
214220
let data_size = data.len() as u64;
215221
Ok(Self {

src/query/storages/fuse/src/io/write/write_settings.rs

+2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ pub struct WriteSettings {
2626
pub max_page_size: usize,
2727

2828
pub block_per_seg: usize,
29+
pub enable_parquet_encoding: bool,
2930
}
3031

3132
impl Default for WriteSettings {
@@ -35,6 +36,7 @@ impl Default for WriteSettings {
3536
table_compression: TableCompression::default(),
3637
max_page_size: DEFAULT_ROW_PER_PAGE,
3738
block_per_seg: DEFAULT_BLOCK_PER_SEGMENT,
39+
enable_parquet_encoding: false,
3840
}
3941
}
4042
}

src/query/storages/result_cache/src/write/writer.rs

+2
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,13 @@ impl ResultCacheWriter {
7272
#[async_backtrace::framed]
7373
pub async fn write_to_storage(&self) -> Result<String> {
7474
let mut buf = Vec::with_capacity(self.current_bytes);
75+
// TODO doc why encoding is not enabled
7576
let _ = blocks_to_parquet(
7677
&self.schema,
7778
self.blocks.clone(),
7879
&mut buf,
7980
TableCompression::None,
81+
false,
8082
)?;
8183

8284
let file_location = format!("{}/{}.parquet", self.location, Uuid::new_v4().as_simple());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
statement ok
2+
create or replace database test_tbl_opt_parquet_encoding;
3+
4+
statement ok
5+
use test_tbl_opt_parquet_encoding;
6+
7+
# Test create table with parquet encoding option
8+
9+
statement ok
10+
create or replace table t_encoded (c int, s string) enable_parquet_encoding = 'true' compression = 'lz4';
11+
12+
statement ok
13+
create or replace table t(c int, s string) compression = 'lz4';
14+
15+
statement ok
16+
insert into t_encoded(c, s) select number as c, to_string(number) as s from numbers(500000);
17+
18+
statement ok
19+
insert into t(c, s) select number as c, to_string(number) as s from numbers(500000);
20+
21+
query T
22+
select humanize_size(file_size) from fuse_block('test_tbl_opt_parquet_encoding', 't_encoded');
23+
----
24+
2.30 MiB
25+
26+
query T
27+
select humanize_size(file_size) from fuse_block('test_tbl_opt_parquet_encoding', 't');
28+
----
29+
3.91 MiB

0 commit comments

Comments
 (0)