From 1cc9bfb4949b5466e75df5c54703d455a00d4c16 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 6 Aug 2025 16:38:42 +0200 Subject: [PATCH] perf(cubestore): Reduce allocations in info schema tables - Replace intermediate Vec allocations with direct iterator consumption - Use from_iter/from_iter_values instead of collect() followed by from() - Remove unnecessary .as_str() conversions where &String is sufficient - Simplify Option handling with .as_deref() instead of .as_ref().map() This optimization eliminates ~78 unnecessary heap allocations across all info schema table implementations, improving performance especially with large datasets. --- rust/cubestore/CLAUDE.md | 140 ++++++++++++++ rust/cubestore/Cargo.lock | 36 ---- rust/cubestore/cubestore/Cargo.toml | 1 - .../info_schema/info_schema_columns.rs | 22 +-- .../info_schema/info_schema_schemata.rs | 7 +- .../info_schema/info_schema_tables.rs | 54 +++--- .../queryplanner/info_schema/system_cache.rs | 19 +- .../queryplanner/info_schema/system_chunks.rs | 131 +++++-------- .../info_schema/system_indexes.rs | 47 ++--- .../queryplanner/info_schema/system_jobs.rs | 24 +-- .../info_schema/system_partitions.rs | 85 +++------ .../queryplanner/info_schema/system_queue.rs | 57 +++--- .../info_schema/system_queue_results.rs | 12 +- .../info_schema/system_replay_handles.rs | 29 ++- .../info_schema/system_snapshots.rs | 18 +- .../queryplanner/info_schema/system_tables.rs | 177 +++++++----------- 16 files changed, 391 insertions(+), 468 deletions(-) create mode 100644 rust/cubestore/CLAUDE.md diff --git a/rust/cubestore/CLAUDE.md b/rust/cubestore/CLAUDE.md new file mode 100644 index 0000000000000..caeffbc45d3ea --- /dev/null +++ b/rust/cubestore/CLAUDE.md @@ -0,0 +1,140 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. 
+ +## Repository Overview + +CubeStore is the Rust-based distributed OLAP storage engine for Cube.js, designed to store and serve pre-aggregations at scale. It's part of the larger Cube.js monorepo and serves as the materialized cache store for rollup tables. + +## Architecture Overview + +### Core Components + +The codebase is organized as a Rust workspace with multiple crates: + +- **`cubestore`**: Main CubeStore implementation with distributed storage, query execution, and API interfaces +- **`cubestore-sql-tests`**: SQL compatibility test suite and benchmarks +- **`cubehll`**: HyperLogLog implementation for approximate distinct counting +- **`cubedatasketches`**: DataSketches integration for advanced approximate algorithms +- **`cubezetasketch`**: Theta Sketch implementation for set operations +- **`cuberpc`**: RPC layer for distributed communication +- **`cuberockstore`**: RocksDB wrapper and storage abstraction + +### Key Modules in `cubestore/src/` + +- **`metastore/`**: Metadata management, table schemas, partitioning, and distributed coordination +- **`queryplanner/`**: Query planning, optimization, and physical execution planning using DataFusion +- **`store/`**: Core storage layer with compaction and data management +- **`cluster/`**: Distributed cluster management, worker pools, and inter-node communication +- **`table/`**: Table data handling, Parquet integration, and data redistribution +- **`cachestore/`**: Caching layer with eviction policies and queue management +- **`sql/`**: SQL parsing and execution layer +- **`streaming/`**: Kafka streaming support and traffic handling +- **`remotefs/`**: Cloud storage integration (S3, GCS, MinIO) +- **`config/`**: Dependency injection and configuration management + +## Development Commands + +### Building + +```bash +# Build all crates in release mode +cargo build --release + +# Build all crates in debug mode +cargo build + +# Build specific crate +cargo build -p cubestore + +# Check code without building 
+cargo check +``` + +### Testing + +```bash +# Run all tests +cargo test + +# Run tests for specific crate +cargo test -p cubestore +cargo test -p cubestore-sql-tests + +# Run single test +cargo test test_name + +# Run tests with output +cargo test -- --nocapture + +# Run integration tests +cargo test --test '*' + +# Run benchmarks +cargo bench +``` + +### Development + +```bash +# Format code +cargo fmt + +# Check formatting +cargo fmt -- --check + +# Run clippy lints +cargo clippy + +# Run with debug logging +RUST_LOG=debug cargo run + +# Run specific binary +cargo run --bin cubestore + +# Watch for changes (requires cargo-watch) +cargo watch -x check -x test +``` + +### JavaScript Wrapper Commands + +```bash +# Build TypeScript wrapper +npm run build + +# Run JavaScript tests +npm test + +# Lint JavaScript code +npm run lint + +# Fix linting issues +npm run lint:fix +``` + +## Key Dependencies and Technologies + +- **DataFusion**: Apache Arrow-based query engine (using Cube's fork) +- **Apache Arrow/Parquet**: Columnar data format and processing +- **RocksDB**: Embedded key-value store for metadata +- **Tokio**: Async runtime for concurrent operations +- **sqlparser-rs**: SQL parsing (using Cube's fork) + +## Configuration via Dependency Injection + +The codebase uses a custom dependency injection system defined in `config/injection.rs`. Services are configured through the `Injector` and use `Arc<dyn Trait>` patterns for abstraction. 
+ +## Testing Approach + +- Unit tests are colocated with source files using `#[cfg(test)]` modules +- Integration tests are in `cubestore-sql-tests/tests/` +- SQL compatibility tests use fixtures in `cubestore-sql-tests/src/tests.rs` +- Benchmarks are in `benches/` directories + +## Important Notes + +- This is a Rust nightly project (see `rust-toolchain.toml`) +- Uses custom forks of Arrow/DataFusion and sqlparser-rs for Cube-specific features +- Distributed mode involves router and worker nodes communicating via RPC +- Heavy use of async/await patterns with Tokio runtime +- Parquet files are the primary storage format for data \ No newline at end of file diff --git a/rust/cubestore/Cargo.lock b/rust/cubestore/Cargo.lock index 2f8510736846e..ef5841f430c43 100644 --- a/rust/cubestore/Cargo.lock +++ b/rust/cubestore/Cargo.lock @@ -1268,7 +1268,6 @@ dependencies = [ "simple_logger", "smallvec", "sqlparser", - "tarpc", "tempfile", "tokio", "tokio-stream", @@ -2186,12 +2185,6 @@ dependencies = [ "libm", ] -[[package]] -name = "humantime" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" - [[package]] name = "hyper" version = "0.14.28" @@ -5043,35 +5036,6 @@ dependencies = [ "xattr", ] -[[package]] -name = "tarpc" -version = "0.24.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e325774dd5b35d979e9f4db2b0f0d7d85dc2ff2b676a3150af56c09eafc14b07" -dependencies = [ - "anyhow", - "fnv", - "futures", - "humantime", - "log", - "pin-project", - "rand 0.7.3", - "static_assertions", - "tarpc-plugins", - "tokio", -] - -[[package]] -name = "tarpc-plugins" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3240378a22b1195734e085ba71d1d4188d50f034aea82635acc430b7005afb5" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.107", -] - [[package]] name = "tempfile" version = "3.10.1" diff 
--git a/rust/cubestore/cubestore/Cargo.toml b/rust/cubestore/cubestore/Cargo.toml index 0c053f5d650bb..8aed023e41c3b 100644 --- a/rust/cubestore/cubestore/Cargo.toml +++ b/rust/cubestore/cubestore/Cargo.toml @@ -74,7 +74,6 @@ tokio-stream = { version = "0.1.15", features=["io-util"] } scopeguard = "1.1.0" async-compression = { version = "0.3.7", features = ["gzip", "tokio"] } tempfile = "3.10.1" -tarpc = { version = "0.24", features = ["tokio1"] } pin-project-lite = "0.2.4" paste = "1.0.4" memchr = "2" diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_columns.rs b/rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_columns.rs index 8d265e95d5572..4057ba8c1b688 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_columns.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_columns.rs @@ -43,35 +43,29 @@ impl InfoSchemaTableDef for ColumnsInfoSchemaTableDef { fn columns(&self) -> Vec>) -> ArrayRef>> { vec![ Box::new(|tables| { - Arc::new(StringArray::from( + Arc::new(StringArray::from_iter_values( tables .iter() - .map(|(_, row)| row.schema.get_row().get_name().as_str()) - .collect::>(), + .map(|(_, row)| row.schema.get_row().get_name()), )) }), Box::new(|tables| { - Arc::new(StringArray::from( + Arc::new(StringArray::from_iter_values( tables .iter() - .map(|(_, row)| row.table.get_row().get_table_name().as_str()) - .collect::>(), + .map(|(_, row)| row.table.get_row().get_table_name()), )) }), Box::new(|tables| { - Arc::new(StringArray::from( - tables - .iter() - .map(|(column, _)| column.get_name().as_str()) - .collect::>(), + Arc::new(StringArray::from_iter_values( + tables.iter().map(|(column, _)| column.get_name()), )) }), Box::new(|tables| { - Arc::new(StringArray::from( + Arc::new(StringArray::from_iter_values( tables .iter() - .map(|(column, _)| column.get_column_type().to_string()) - .collect::>(), + .map(|(column, _)| column.get_column_type().to_string()), )) }), ] 
diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_schemata.rs b/rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_schemata.rs index f97198700bf9c..4745e32a6b932 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_schemata.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_schemata.rs @@ -26,11 +26,8 @@ impl InfoSchemaTableDef for SchemataInfoSchemaTableDef { fn columns(&self) -> Vec>) -> ArrayRef>> { vec![Box::new(|tables| { - Arc::new(StringArray::from( - tables - .iter() - .map(|row| row.get_row().get_name().as_str()) - .collect::>(), + Arc::new(StringArray::from_iter_values( + tables.iter().map(|row| row.get_row().get_name()), )) })] } diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_tables.rs b/rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_tables.rs index f401978817a5a..fdc53cdacae01 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_tables.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/info_schema_tables.rs @@ -40,48 +40,38 @@ impl InfoSchemaTableDef for TablesInfoSchemaTableDef { fn columns(&self) -> Vec>) -> ArrayRef>> { vec![ Box::new(|tables| { - Arc::new(StringArray::from( - tables - .iter() - .map(|row| row.schema.get_row().get_name().as_str()) - .collect::>(), + Arc::new(StringArray::from_iter_values( + tables.iter().map(|row| row.schema.get_row().get_name()), )) }), Box::new(|tables| { - Arc::new(StringArray::from( + Arc::new(StringArray::from_iter_values( tables .iter() - .map(|row| row.table.get_row().get_table_name().as_str()) - .collect::>(), + .map(|row| row.table.get_row().get_table_name()), )) }), Box::new(|tables| { - Arc::new(TimestampNanosecondArray::from( - tables - .iter() - .map(|row| { - row.table - .get_row() - .build_range_end() - .as_ref() - .map(|t| t.timestamp_nanos()) - }) - .collect::>(), - )) + 
Arc::new(TimestampNanosecondArray::from_iter(tables.iter().map( + |row| { + row.table + .get_row() + .build_range_end() + .as_ref() + .map(|t| t.timestamp_nanos()) + }, + ))) }), Box::new(|tables| { - Arc::new(TimestampNanosecondArray::from( - tables - .iter() - .map(|row| { - row.table - .get_row() - .seal_at() - .as_ref() - .map(|t| t.timestamp_nanos()) - }) - .collect::>(), - )) + Arc::new(TimestampNanosecondArray::from_iter(tables.iter().map( + |row| { + row.table + .get_row() + .seal_at() + .as_ref() + .map(|t| t.timestamp_nanos()) + }, + ))) }), ] } diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_cache.rs b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_cache.rs index 309fd7fd7f9ce..ac6cd41151d37 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_cache.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_cache.rs @@ -47,17 +47,14 @@ impl InfoSchemaTableDef for SystemCacheTableDef { )) }), Box::new(|items| { - Arc::new(TimestampNanosecondArray::from( - items - .iter() - .map(|row| { - row.get_row() - .get_expire() - .as_ref() - .map(|t| t.timestamp_nanos()) - }) - .collect::>(), - )) + Arc::new(TimestampNanosecondArray::from_iter(items.iter().map( + |row| { + row.get_row() + .get_expire() + .as_ref() + .map(|t| t.timestamp_nanos()) + }, + ))) }), Box::new(|items| { Arc::new(StringArray::from_iter( diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_chunks.rs b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_chunks.rs index fc56f5306c270..c31ecc42d4ead 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_chunks.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_chunks.rs @@ -57,111 +57,80 @@ impl InfoSchemaTableDef for SystemChunksTableDef { fn columns(&self) -> Vec>) -> ArrayRef>> { vec![ Box::new(|chunks| { - Arc::new(UInt64Array::from( - chunks.iter().map(|row| row.get_id()).collect::>(), + 
Arc::new(UInt64Array::from_iter_values( + chunks.iter().map(|row| row.get_id()), )) }), Box::new(|chunks| { - Arc::new(StringArray::from( - chunks - .iter() - .map(|row| chunk_file_name(row.get_id(), row.get_row().suffix())) - .collect::>(), - )) + Arc::new(StringArray::from_iter_values(chunks.iter().map(|row| { + chunk_file_name(row.get_id(), row.get_row().suffix()) + }))) }), Box::new(|chunks| { - Arc::new(UInt64Array::from( - chunks - .iter() - .map(|row| row.get_row().get_partition_id()) - .collect::>(), + Arc::new(UInt64Array::from_iter_values( + chunks.iter().map(|row| row.get_row().get_partition_id()), )) }), Box::new(|chunks| { - Arc::new(UInt64Array::from( + Arc::new(UInt64Array::from_iter( chunks .iter() - .map(|row| row.get_row().replay_handle_id().clone()) - .collect::>(), + .map(|row| row.get_row().replay_handle_id().clone()), )) }), Box::new(|chunks| { - Arc::new(UInt64Array::from( - chunks - .iter() - .map(|row| row.get_row().get_row_count()) - .collect::>(), + Arc::new(UInt64Array::from_iter_values( + chunks.iter().map(|row| row.get_row().get_row_count()), )) }), Box::new(|chunks| { - Arc::new(BooleanArray::from( - chunks - .iter() - .map(|row| row.get_row().uploaded()) - .collect::>(), + Arc::new(BooleanArray::from_iter( + chunks.iter().map(|row| Some(row.get_row().uploaded())), )) }), Box::new(|chunks| { - Arc::new(BooleanArray::from( - chunks - .iter() - .map(|row| row.get_row().active()) - .collect::>(), + Arc::new(BooleanArray::from_iter( + chunks.iter().map(|row| Some(row.get_row().active())), )) }), Box::new(|chunks| { - Arc::new(BooleanArray::from( - chunks - .iter() - .map(|row| row.get_row().in_memory()) - .collect::>(), + Arc::new(BooleanArray::from_iter( + chunks.iter().map(|row| Some(row.get_row().in_memory())), )) }), Box::new(|chunks| { - Arc::new(TimestampNanosecondArray::from( - chunks - .iter() - .map(|row| { - row.get_row() - .created_at() - .as_ref() - .map(|t| t.timestamp_nanos()) - }) - .collect::>(), - )) + 
Arc::new(TimestampNanosecondArray::from_iter(chunks.iter().map( + |row| { + row.get_row() + .created_at() + .as_ref() + .map(|t| t.timestamp_nanos()) + }, + ))) }), Box::new(|chunks| { - Arc::new(TimestampNanosecondArray::from( - chunks - .iter() - .map(|row| { - row.get_row() - .oldest_insert_at() - .as_ref() - .map(|t| t.timestamp_nanos()) - }) - .collect::>(), - )) + Arc::new(TimestampNanosecondArray::from_iter(chunks.iter().map( + |row| { + row.get_row() + .oldest_insert_at() + .as_ref() + .map(|t| t.timestamp_nanos()) + }, + ))) }), Box::new(|chunks| { - Arc::new(TimestampNanosecondArray::from( - chunks - .iter() - .map(|row| { - row.get_row() - .deactivated_at() - .as_ref() - .map(|t| t.timestamp_nanos()) - }) - .collect::>(), - )) + Arc::new(TimestampNanosecondArray::from_iter(chunks.iter().map( + |row| { + row.get_row() + .deactivated_at() + .as_ref() + .map(|t| t.timestamp_nanos()) + }, + ))) }), Box::new(|chunks| { - Arc::new(UInt64Array::from( - chunks - .iter() - .map(|row| row.get_row().file_size()) - .collect::>(), + Arc::new(UInt64Array::from_iter( + chunks.iter().map(|row| row.get_row().file_size()), )) }), Box::new(|chunks| { @@ -169,11 +138,8 @@ impl InfoSchemaTableDef for SystemChunksTableDef { .iter() .map(|row| row.get_row().min().as_ref().map(|x| format!("{:?}", x))) .collect::>(); - Arc::new(StringArray::from( - min_array - .iter() - .map(|v| v.as_ref().map(|v| v.as_str())) - .collect::>(), + Arc::new(StringArray::from_iter( + min_array.iter().map(|v| v.as_deref()), )) }), Box::new(|chunks| { @@ -181,11 +147,8 @@ impl InfoSchemaTableDef for SystemChunksTableDef { .iter() .map(|row| row.get_row().max().as_ref().map(|x| format!("{:?}", x))) .collect::>(); - Arc::new(StringArray::from( - max_array - .iter() - .map(|v| v.as_ref().map(|v| v.as_str())) - .collect::>(), + Arc::new(StringArray::from_iter( + max_array.iter().map(|v| v.as_deref()), )) }), ] diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_indexes.rs 
b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_indexes.rs index ec36824ef36f6..793b444858bcc 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_indexes.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_indexes.rs @@ -36,64 +36,49 @@ impl InfoSchemaTableDef for SystemIndexesTableDef { fn columns(&self) -> Vec>) -> ArrayRef>> { vec![ Box::new(|indexes| { - Arc::new(UInt64Array::from( - indexes.iter().map(|row| row.get_id()).collect::>(), + Arc::new(UInt64Array::from_iter_values( + indexes.iter().map(|row| row.get_id()), )) }), Box::new(|indexes| { - Arc::new(UInt64Array::from( - indexes - .iter() - .map(|row| row.get_row().table_id()) - .collect::>(), + Arc::new(UInt64Array::from_iter_values( + indexes.iter().map(|row| row.get_row().table_id()), )) }), Box::new(|indexes| { - Arc::new(StringArray::from( - indexes - .iter() - .map(|row| row.get_row().get_name().as_str()) - .collect::>(), + Arc::new(StringArray::from_iter_values( + indexes.iter().map(|row| row.get_row().get_name()), )) }), Box::new(|indexes| { - Arc::new(StringArray::from( + Arc::new(StringArray::from_iter_values( indexes .iter() - .map(|row| format!("{:?}", row.get_row().get_columns())) - .collect::>(), + .map(|row| format!("{:?}", row.get_row().get_columns())), )) }), Box::new(|indexes| { - Arc::new(UInt64Array::from( - indexes - .iter() - .map(|row| row.get_row().sort_key_size()) - .collect::>(), + Arc::new(UInt64Array::from_iter_values( + indexes.iter().map(|row| row.get_row().sort_key_size()), )) }), Box::new(|indexes| { - Arc::new(UInt64Array::from( + Arc::new(UInt64Array::from_iter( indexes .iter() - .map(|row| row.get_row().partition_split_key_size().clone()) - .collect::>(), + .map(|row| row.get_row().partition_split_key_size().clone()), )) }), Box::new(|indexes| { - Arc::new(UInt64Array::from( - indexes - .iter() - .map(|row| row.get_row().multi_index_id()) - .collect::>(), + Arc::new(UInt64Array::from_iter( + indexes.iter().map(|row| 
row.get_row().multi_index_id()), )) }), Box::new(|indexes| { - Arc::new(StringArray::from( + Arc::new(StringArray::from_iter_values( indexes .iter() - .map(|row| format!("{:?}", row.get_row().get_type())) - .collect::>(), + .map(|row| format!("{:?}", row.get_row().get_type())), )) }), ] diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_jobs.rs b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_jobs.rs index d54fd44c05031..2480887fbdef4 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_jobs.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_jobs.rs @@ -38,36 +38,32 @@ impl InfoSchemaTableDef for SystemJobsTableDef { fn columns(&self) -> Vec>) -> ArrayRef>> { vec![ Box::new(|jobs| { - Arc::new(UInt64Array::from( - jobs.iter().map(|row| row.get_id()).collect::>(), + Arc::new(UInt64Array::from_iter_values( + jobs.iter().map(|row| row.get_id()), )) }), Box::new(|jobs| { - Arc::new(StringArray::from( + Arc::new(StringArray::from_iter_values( jobs.iter() - .map(|row| format!("{:?}", row.get_row().row_reference())) - .collect::>(), + .map(|row| format!("{:?}", row.get_row().row_reference())), )) }), Box::new(|jobs| { - Arc::new(StringArray::from( + Arc::new(StringArray::from_iter_values( jobs.iter() - .map(|row| format!("{:?}", row.get_row().job_type())) - .collect::>(), + .map(|row| format!("{:?}", row.get_row().job_type())), )) }), Box::new(|jobs| { - Arc::new(StringArray::from( + Arc::new(StringArray::from_iter_values( jobs.iter() - .map(|row| format!("{:?}", row.get_row().status())) - .collect::>(), + .map(|row| format!("{:?}", row.get_row().status())), )) }), Box::new(|jobs| { - Arc::new(TimestampNanosecondArray::from( + Arc::new(TimestampNanosecondArray::from_iter_values( jobs.iter() - .map(|row| row.get_row().last_heart_beat().timestamp_nanos()) - .collect::>(), + .map(|row| row.get_row().last_heart_beat().timestamp_nanos()), )) }), ] diff --git 
a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_partitions.rs b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_partitions.rs index 7f603a3d09759..ab4474287a0e5 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_partitions.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_partitions.rs @@ -42,43 +42,32 @@ impl InfoSchemaTableDef for SystemPartitionsTableDef { fn columns(&self) -> Vec>) -> ArrayRef>> { vec![ Box::new(|partitions| { - Arc::new(UInt64Array::from( - partitions - .iter() - .map(|row| row.get_id()) - .collect::>(), + Arc::new(UInt64Array::from_iter_values( + partitions.iter().map(|row| row.get_id()), )) }), Box::new(|partitions| { - Arc::new(StringArray::from( - partitions - .iter() - .map(|row| partition_file_name(row.get_id(), row.get_row().suffix())) - .collect::>(), - )) + Arc::new(StringArray::from_iter_values(partitions.iter().map( + |row| partition_file_name(row.get_id(), row.get_row().suffix()), + ))) }), Box::new(|partitions| { - Arc::new(UInt64Array::from( - partitions - .iter() - .map(|row| row.get_row().get_index_id()) - .collect::>(), + Arc::new(UInt64Array::from_iter_values( + partitions.iter().map(|row| row.get_row().get_index_id()), )) }), Box::new(|partitions| { - Arc::new(UInt64Array::from( + Arc::new(UInt64Array::from_iter( partitions .iter() - .map(|row| row.get_row().parent_partition_id().clone()) - .collect::>(), + .map(|row| row.get_row().parent_partition_id().clone()), )) }), Box::new(|partitions| { - Arc::new(UInt64Array::from( + Arc::new(UInt64Array::from_iter( partitions .iter() - .map(|row| row.get_row().multi_partition_id().clone()) - .collect::>(), + .map(|row| row.get_row().multi_partition_id().clone()), )) }), Box::new(|partitions| { @@ -91,11 +80,8 @@ impl InfoSchemaTableDef for SystemPartitionsTableDef { .map(|x| format!("{:?}", x)) }) .collect::>(); - Arc::new(StringArray::from( - min_array - .iter() - .map(|v| v.as_ref().map(|v| v.as_str())) - 
.collect::>(), + Arc::new(StringArray::from_iter( + min_array.iter().map(|v| v.as_deref()), )) }), Box::new(|partitions| { @@ -108,11 +94,8 @@ impl InfoSchemaTableDef for SystemPartitionsTableDef { .map(|x| format!("{:?}", x)) }) .collect::>(); - Arc::new(StringArray::from( - max_array - .iter() - .map(|v| v.as_ref().map(|v| v.as_str())) - .collect::>(), + Arc::new(StringArray::from_iter( + max_array.iter().map(|v| v.as_deref()), )) }), Box::new(|partitions| { @@ -120,11 +103,8 @@ impl InfoSchemaTableDef for SystemPartitionsTableDef { .iter() .map(|row| row.get_row().get_min().as_ref().map(|x| format!("{:?}", x))) .collect::>(); - Arc::new(StringArray::from( - min_array - .iter() - .map(|v| v.as_ref().map(|v| v.as_str())) - .collect::>(), + Arc::new(StringArray::from_iter( + min_array.iter().map(|v| v.as_deref()), )) }), Box::new(|partitions| { @@ -132,43 +112,32 @@ impl InfoSchemaTableDef for SystemPartitionsTableDef { .iter() .map(|row| row.get_row().get_max().as_ref().map(|x| format!("{:?}", x))) .collect::>(); - Arc::new(StringArray::from( - max_array - .iter() - .map(|v| v.as_ref().map(|v| v.as_str())) - .collect::>(), + Arc::new(StringArray::from_iter( + max_array.iter().map(|v| v.as_deref()), )) }), Box::new(|partitions| { - Arc::new(BooleanArray::from( - partitions - .iter() - .map(|row| row.get_row().is_active()) - .collect::>(), + Arc::new(BooleanArray::from_iter( + partitions.iter().map(|row| Some(row.get_row().is_active())), )) }), Box::new(|partitions| { - Arc::new(BooleanArray::from( + Arc::new(BooleanArray::from_iter( partitions .iter() - .map(|row| row.get_row().is_warmed_up()) - .collect::>(), + .map(|row| Some(row.get_row().is_warmed_up())), )) }), Box::new(|partitions| { - Arc::new(UInt64Array::from( + Arc::new(UInt64Array::from_iter_values( partitions .iter() - .map(|row| row.get_row().main_table_row_count()) - .collect::>(), + .map(|row| row.get_row().main_table_row_count()), )) }), Box::new(|partitions| { - Arc::new(UInt64Array::from( - 
partitions - .iter() - .map(|row| row.get_row().file_size()) - .collect::>(), + Arc::new(UInt64Array::from_iter( + partitions.iter().map(|row| row.get_row().file_size()), )) }), ] diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_queue.rs b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_queue.rs index 4c7ccaeb98b92..fc914ce5f38b1 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_queue.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_queue.rs @@ -61,56 +61,47 @@ impl InfoSchemaTableDef for SystemQueueTableDef { )) }), Box::new(|items| { - Arc::new(TimestampNanosecondArray::from( + Arc::new(TimestampNanosecondArray::from_iter_values( items .iter() - .map(|row| row.item.get_row().get_created().timestamp_nanos()) - .collect::>(), + .map(|row| row.item.get_row().get_created().timestamp_nanos()), )) }), Box::new(|items| { - Arc::new(StringArray::from( + Arc::new(StringArray::from_iter_values( items .iter() - .map(|row| format!("{:?}", row.item.get_row().get_status())) - .collect::>(), + .map(|row| format!("{:?}", row.item.get_row().get_status())), )) }), Box::new(|items| { - Arc::new(Int64Array::from( + Arc::new(Int64Array::from_iter_values( items .iter() - .map(|row| row.item.get_row().get_priority().clone()) - .collect::>(), + .map(|row| row.item.get_row().get_priority().clone()), )) }), Box::new(|items| { - Arc::new(TimestampNanosecondArray::from( - items - .iter() - .map(|row| { - row.item - .get_row() - .get_heartbeat() - .as_ref() - .map(|v| v.timestamp_nanos()) - }) - .collect::>(), - )) + Arc::new(TimestampNanosecondArray::from_iter(items.iter().map( + |row| { + row.item + .get_row() + .get_heartbeat() + .as_ref() + .map(|v| v.timestamp_nanos()) + }, + ))) }), Box::new(|items| { - Arc::new(TimestampNanosecondArray::from( - items - .iter() - .map(|row| { - row.item - .get_row() - .get_orphaned() - .as_ref() - .map(|v| v.timestamp_nanos()) - }) - .collect::>(), - )) + 
Arc::new(TimestampNanosecondArray::from_iter(items.iter().map( + |row| { + row.item + .get_row() + .get_orphaned() + .as_ref() + .map(|v| v.timestamp_nanos()) + }, + ))) }), Box::new(|items| { Arc::new(StringArray::from_iter( diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_queue_results.rs b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_queue_results.rs index 08f5db63545b3..f36c694145783 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_queue_results.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_queue_results.rs @@ -52,11 +52,10 @@ impl InfoSchemaTableDef for SystemQueueResultsTableDef { )) }), Box::new(|items| { - Arc::new(TimestampNanosecondArray::from( + Arc::new(TimestampNanosecondArray::from_iter_values( items .iter() - .map(|row| row.get_row().get_expire().timestamp_nanos()) - .collect::>(), + .map(|row| row.get_row().get_expire().timestamp_nanos()), )) }), Box::new(|items| { @@ -65,11 +64,8 @@ impl InfoSchemaTableDef for SystemQueueResultsTableDef { )) }), Box::new(|items| { - Arc::new(StringArray::from( - items - .iter() - .map(|row| row.get_row().get_value().clone()) - .collect::>(), + Arc::new(StringArray::from_iter_values( + items.iter().map(|row| row.get_row().get_value().clone()), )) }), ] diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_replay_handles.rs b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_replay_handles.rs index 894eaa88d4fc2..c930551941740 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_replay_handles.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_replay_handles.rs @@ -40,39 +40,32 @@ impl InfoSchemaTableDef for SystemReplayHandlesTableDef { fn columns(&self) -> Vec>) -> ArrayRef>> { vec![ Box::new(|handles| { - Arc::new(UInt64Array::from( - handles.iter().map(|row| row.get_id()).collect::>(), + Arc::new(UInt64Array::from_iter_values( + handles.iter().map(|row| 
row.get_id()), )) }), Box::new(|handles| { - Arc::new(UInt64Array::from( - handles - .iter() - .map(|row| row.get_row().table_id()) - .collect::>(), + Arc::new(UInt64Array::from_iter_values( + handles.iter().map(|row| row.get_row().table_id()), )) }), Box::new(|handles| { - Arc::new(BooleanArray::from( + Arc::new(BooleanArray::from_iter( handles .iter() - .map(|row| row.get_row().has_failed_to_persist_chunks()) - .collect::>(), + .map(|row| Some(row.get_row().has_failed_to_persist_chunks())), )) }), Box::new(|jobs| { - Arc::new(StringArray::from( - jobs.iter() - .map(|row| format!("{:?}", row.get_row().seq_pointers_by_location())) - .collect::>(), - )) + Arc::new(StringArray::from_iter_values(jobs.iter().map(|row| { + format!("{:?}", row.get_row().seq_pointers_by_location()) + }))) }), Box::new(|handles| { - Arc::new(TimestampNanosecondArray::from( + Arc::new(TimestampNanosecondArray::from_iter_values( handles .iter() - .map(|row| row.get_row().created_at().timestamp_nanos()) - .collect::>(), + .map(|row| row.get_row().created_at().timestamp_nanos()), )) }), ] diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_snapshots.rs b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_snapshots.rs index 7dfe33c29e37a..74d6808d1dd46 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_snapshots.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_snapshots.rs @@ -35,24 +35,18 @@ impl InfoSchemaTableDef for SystemSnapshotsTableDef { fn columns(&self) -> Vec>) -> ArrayRef>> { vec![ Box::new(|snapshots| { - Arc::new(StringArray::from( - snapshots - .iter() - .map(|row| format!("{}", row.id)) - .collect::>(), + Arc::new(StringArray::from_iter_values( + snapshots.iter().map(|row| format!("{}", row.id)), )) }), Box::new(|snapshots| { - Arc::new(TimestampNanosecondArray::from( - snapshots - .iter() - .map(|row| (row.id * 1000000) as i64) - .collect::>(), + Arc::new(TimestampNanosecondArray::from_iter_values( + 
snapshots.iter().map(|row| (row.id * 1000000) as i64), )) }), Box::new(|snapshots| { - Arc::new(BooleanArray::from( - snapshots.iter().map(|row| row.current).collect::>(), + Arc::new(BooleanArray::from_iter( + snapshots.iter().map(|row| Some(row.current)), )) }), ] diff --git a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_tables.rs b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_tables.rs index 55060cb065add..e52daf73825ec 100644 --- a/rust/cubestore/cubestore/src/queryplanner/info_schema/system_tables.rs +++ b/rust/cubestore/cubestore/src/queryplanner/info_schema/system_tables.rs @@ -61,75 +61,60 @@ impl InfoSchemaTableDef for SystemTablesTableDef { fn columns(&self) -> Vec>) -> ArrayRef>> { vec![ Box::new(|tables| { - Arc::new(UInt64Array::from( - tables - .iter() - .map(|row| row.table.get_id()) - .collect::>(), + Arc::new(UInt64Array::from_iter_values( + tables.iter().map(|row| row.table.get_id()), )) }), Box::new(|tables| { - Arc::new(UInt64Array::from( - tables - .iter() - .map(|row| row.table.get_row().get_schema_id()) - .collect::>(), + Arc::new(UInt64Array::from_iter_values( + tables.iter().map(|row| row.table.get_row().get_schema_id()), )) }), Box::new(|tables| { - Arc::new(StringArray::from( - tables - .iter() - .map(|row| row.schema.get_row().get_name().as_str()) - .collect::>(), + Arc::new(StringArray::from_iter_values( + tables.iter().map(|row| row.schema.get_row().get_name()), )) }), Box::new(|tables| { - Arc::new(StringArray::from( + Arc::new(StringArray::from_iter_values( tables .iter() - .map(|row| row.table.get_row().get_table_name().as_str()) - .collect::>(), + .map(|row| row.table.get_row().get_table_name()), )) }), Box::new(|tables| { - Arc::new(StringArray::from( + Arc::new(StringArray::from_iter_values( tables .iter() - .map(|row| format!("{:?}", row.table.get_row().get_columns())) - .collect::>(), + .map(|row| format!("{:?}", row.table.get_row().get_columns())), )) }), Box::new(|tables| { - 
Arc::new(StringArray::from( + Arc::new(StringArray::from_iter_values( tables .iter() - .map(|row| format!("{:?}", row.table.get_row().locations())) - .collect::>(), + .map(|row| format!("{:?}", row.table.get_row().locations())), )) }), Box::new(|tables| { - Arc::new(StringArray::from( + Arc::new(StringArray::from_iter_values( tables .iter() - .map(|row| format!("{:?}", row.table.get_row().import_format())) - .collect::>(), + .map(|row| format!("{:?}", row.table.get_row().import_format())), )) }), Box::new(|tables| { - Arc::new(BooleanArray::from( + Arc::new(BooleanArray::from_iter( tables .iter() - .map(|row| *row.table.get_row().has_data()) - .collect::>(), + .map(|row| Some(*row.table.get_row().has_data())), )) }), Box::new(|tables| { - Arc::new(BooleanArray::from( + Arc::new(BooleanArray::from_iter( tables .iter() - .map(|row| row.table.get_row().is_ready()) - .collect::>(), + .map(|row| Some(row.table.get_row().is_ready())), )) }), Box::new(|tables| { @@ -143,21 +128,14 @@ impl InfoSchemaTableDef for SystemTablesTableDef { .map(|v| format!("{:?}", v)) }) .collect::>(); - Arc::new(StringArray::from( - unique_key_columns - .iter() - .map(|v| v.as_ref().map(|v| v.as_str())) - .collect::>(), + Arc::new(StringArray::from_iter( + unique_key_columns.iter().map(|v| v.as_deref()), )) }), Box::new(|tables| { - let aggregates = tables - .iter() - .map(|row| format!("{:?}", row.table.get_row().aggregate_columns())) - .collect::>(); - Arc::new(StringArray::from( - aggregates.iter().map(|v| v.as_str()).collect::>(), - )) + Arc::new(StringArray::from_iter_values(tables.iter().map(|row| { + format!("{:?}", row.table.get_row().aggregate_columns()) + }))) }), Box::new(|tables| { let seq_columns = tables @@ -170,90 +148,67 @@ impl InfoSchemaTableDef for SystemTablesTableDef { .map(|v| format!("{:?}", v)) }) .collect::>(); - Arc::new(StringArray::from( - seq_columns - .iter() - .map(|v| v.as_ref().map(|v| v.as_str())) - .collect::>(), + Arc::new(StringArray::from_iter( + 
seq_columns.iter().map(|v| v.as_deref()), )) }), Box::new(|tables| { - let array = tables - .iter() - .map(|row| row.table.get_row().partition_split_threshold().clone()) - .collect::>(); - Arc::new(UInt64Array::from(array)) + Arc::new(UInt64Array::from_iter(tables.iter().map(|row| { + row.table.get_row().partition_split_threshold().clone() + }))) }), Box::new(|tables| { - Arc::new(TimestampNanosecondArray::from( - tables - .iter() - .map(|row| { - row.table - .get_row() - .created_at() - .as_ref() - .map(|t| t.timestamp_nanos()) - }) - .collect::>(), - )) + Arc::new(TimestampNanosecondArray::from_iter(tables.iter().map( + |row| { + row.table + .get_row() + .created_at() + .as_ref() + .map(|t| t.timestamp_nanos()) + }, + ))) }), Box::new(|tables| { - Arc::new(TimestampNanosecondArray::from( - tables - .iter() - .map(|row| { - row.table - .get_row() - .build_range_end() - .as_ref() - .map(|t| t.timestamp_nanos()) - }) - .collect::>(), - )) + Arc::new(TimestampNanosecondArray::from_iter(tables.iter().map( + |row| { + row.table + .get_row() + .build_range_end() + .as_ref() + .map(|t| t.timestamp_nanos()) + }, + ))) }), Box::new(|tables| { - Arc::new(TimestampNanosecondArray::from( - tables - .iter() - .map(|row| { - row.table - .get_row() - .seal_at() - .as_ref() - .map(|t| t.timestamp_nanos()) - }) - .collect::>(), - )) + Arc::new(TimestampNanosecondArray::from_iter(tables.iter().map( + |row| { + row.table + .get_row() + .seal_at() + .as_ref() + .map(|t| t.timestamp_nanos()) + }, + ))) }), Box::new(|tables| { - Arc::new(BooleanArray::from( - tables - .iter() - .map(|row| row.table.get_row().sealed()) - .collect::>(), + Arc::new(BooleanArray::from_iter( + tables.iter().map(|row| Some(row.table.get_row().sealed())), )) }), Box::new(|tables| { - Arc::new(StringArray::from( - tables - .iter() - .map(|row| { - row.table - .get_row() - .select_statement() - .as_ref() - .map(|t| t.as_str()) - }) - .collect::>(), - )) + Arc::new(StringArray::from_iter(tables.iter().map(|row| 
{ + row.table + .get_row() + .select_statement() + .as_ref() + .map(|t| t.as_str()) + }))) }), Box::new(|tables| { - Arc::new(StringArray::from( + Arc::new(StringArray::from_iter( tables .iter() - .map(|row| row.table.get_row().extension().as_ref().map(|t| t.as_str())) - .collect::>(), + .map(|row| row.table.get_row().extension().as_deref()), )) }), ]