From d4b37f9ceb641215816e8b5c9ba9c2438f2074b8 Mon Sep 17 00:00:00 2001 From: Mikhail Nazarov Date: Mon, 11 Nov 2024 18:42:21 +0300 Subject: [PATCH 1/2] wip --- Cargo.lock | 4 +- .../grpc_wrapper/raw_table_service/client.rs | 23 +--- .../raw_table_service/commit_transaction.rs | 4 +- .../raw_table_service/execute_data_query.rs | 104 ++++++++++++++++- .../raw_table_service/query_stats.rs | 27 +++-- ydb/src/query.rs | 25 +++- ydb/src/result.rs | 108 +++++++++++++++++- ydb/src/session.rs | 2 +- ydb/src/transaction.rs | 5 +- 9 files changed, 259 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 992b87bb..e38897ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2550,7 +2550,7 @@ dependencies = [ [[package]] name = "ydb" -version = "0.9.3" +version = "0.9.4" dependencies = [ "async-trait", "async_once", @@ -2606,7 +2606,7 @@ dependencies = [ [[package]] name = "ydb-grpc" -version = "0.0.14" +version = "0.1.0" dependencies = [ "pbjson", "pbjson-build", diff --git a/ydb/src/grpc_wrapper/raw_table_service/client.rs b/ydb/src/grpc_wrapper/raw_table_service/client.rs index 8be1c808..a716d80f 100644 --- a/ydb/src/grpc_wrapper/raw_table_service/client.rs +++ b/ydb/src/grpc_wrapper/raw_table_service/client.rs @@ -11,11 +11,13 @@ use crate::grpc_wrapper::raw_table_service::execute_scheme_query::RawExecuteSche use crate::grpc_wrapper::raw_table_service::keepalive::{RawKeepAliveRequest, RawKeepAliveResult}; use crate::grpc_wrapper::raw_table_service::rollback_transaction::RawRollbackTransactionRequest; use crate::grpc_wrapper::runtime_interceptors::InterceptedChannel; + use tracing::trace; use ydb_grpc::ydb_proto::table::v1::table_service_client::TableServiceClient; use crate::grpc_wrapper::raw_table_service::copy_table::{RawCopyTableRequest, RawCopyTablesRequest}; use crate::grpc_wrapper::raw_table_service::execute_data_query::{RawExecuteDataQueryRequest, RawExecuteDataQueryResult}; + pub(crate) struct RawTableClient { timeouts: TimeoutSettings, service: TableServiceClient, @@ -167,24 +169,3 @@ impl From for i32 { grpc_val as i32 } } - -#[derive(Debug)] -pub(crate) struct RawQueryStats { - process_cpu_time: std::time::Duration, - query_plan: String, - query_ast: String, - total_duration: std::time::Duration, - total_cpu_time: std::time::Duration, -} - -impl From for RawQueryStats { - fn from(value: ydb_grpc::ydb_proto::table_stats::QueryStats) -> Self { - Self { - process_cpu_time: std::time::Duration::from_micros(value.process_cpu_time_us), - query_plan: value.query_plan, - query_ast: value.query_ast, - total_duration: std::time::Duration::from_micros(value.total_duration_us), - total_cpu_time: std::time::Duration::from_micros(value.total_cpu_time_us), - } - } -} diff --git a/ydb/src/grpc_wrapper/raw_table_service/commit_transaction.rs b/ydb/src/grpc_wrapper/raw_table_service/commit_transaction.rs index 78da3db2..1c5a963e 100644 --- a/ydb/src/grpc_wrapper/raw_table_service/commit_transaction.rs +++ b/ydb/src/grpc_wrapper/raw_table_service/commit_transaction.rs @@ -1,7 +1,9 @@ use crate::grpc_wrapper::raw_errors::RawError; -use crate::grpc_wrapper::raw_table_service::client::{CollectStatsMode, RawQueryStats}; +use crate::grpc_wrapper::raw_table_service::client::CollectStatsMode; use crate::grpc_wrapper::raw_ydb_operation::RawOperationParams; +use super::execute_data_query::RawQueryStats; + pub(crate) struct RawCommitTransactionRequest { pub session_id: String, pub tx_id: String, diff --git a/ydb/src/grpc_wrapper/raw_table_service/execute_data_query.rs b/ydb/src/grpc_wrapper/raw_table_service/execute_data_query.rs index 401f46fa..636e5db9 100644 --- a/ydb/src/grpc_wrapper/raw_table_service/execute_data_query.rs +++ b/ydb/src/grpc_wrapper/raw_table_service/execute_data_query.rs @@ -1,10 +1,10 @@ -use std::collections::HashMap; use crate::grpc_wrapper::raw_errors::RawError; -use crate::grpc_wrapper::raw_table_service::query_stats::RawQueryStatMode; +use crate::grpc_wrapper::raw_table_service::query_stats::RawQueryStatsMode; use crate::grpc_wrapper::raw_table_service::transaction_control::RawTransactionControl; -use crate::grpc_wrapper::raw_table_service::value::{RawResultSet, RawTypedValue}; use crate::grpc_wrapper::raw_table_service::value::r#type::RawType; +use crate::grpc_wrapper::raw_table_service::value::{RawResultSet, RawTypedValue}; use crate::grpc_wrapper::raw_ydb_operation::RawOperationParams; +use std::collections::HashMap; #[derive(serde::Serialize)] pub(crate) struct RawExecuteDataQueryRequest { @@ -14,7 +14,7 @@ pub(crate) struct RawExecuteDataQueryRequest { pub operation_params: RawOperationParams, pub params: HashMap, pub keep_in_cache: bool, - pub collect_stats: RawQueryStatMode, + pub collect_stats: RawQueryStatsMode, } impl From for ydb_grpc::ydb_proto::table::ExecuteDataQueryRequest { @@ -44,7 +44,7 @@ pub(crate) struct RawExecuteDataQueryResult { pub result_sets: Vec, pub tx_meta: RawTransactionMeta, pub query_meta: Option, - // query_stats: Option, // todo + pub query_stats: Option, } impl TryFrom for RawExecuteDataQueryResult { @@ -59,11 +59,14 @@ impl TryFrom for RawExecuteDataQ .map(|item| item.try_into()) .collect(); - let query_meta = if let Some(proto_meta) = value.query_meta{ + let query_meta = if let Some(proto_meta) = value.query_meta { Some(RawQueryMeta::try_from(proto_meta)?) } else { None }; + + let query_stats = value.query_stats.map(RawQueryStats::from); + Ok(Self { result_sets: result_sets_res?, tx_meta: value @@ -71,6 +74,7 @@ impl TryFrom for RawExecuteDataQ .ok_or_else(|| RawError::custom("no tx_meta at ExecuteQueryResult"))? .into(), query_meta, + query_stats, }) } } @@ -108,3 +112,91 @@ impl TryFrom for RawQueryMeta { }) } } + +#[derive(serde::Serialize)] +pub(crate) struct RawQueryStats { + pub query_phases: Vec, + pub process_cpu_time: std::time::Duration, + pub query_plan: String, + pub query_ast: String, + pub total_duration: std::time::Duration, + pub total_cpu_time: std::time::Duration, +} + +impl From for RawQueryStats { + fn from(value: ydb_grpc::ydb_proto::table_stats::QueryStats) -> Self { + Self { + query_phases: value.query_phases.into_iter().map(Into::into).collect(), + process_cpu_time: std::time::Duration::from_micros(value.process_cpu_time_us), + query_plan: value.query_plan, + query_ast: value.query_ast, + total_duration: std::time::Duration::from_micros(value.total_duration_us), + total_cpu_time: std::time::Duration::from_micros(value.total_cpu_time_us), + } + } +} + +#[derive(Debug, serde::Serialize)] +pub(crate) struct RawQueryPhaseStats { + pub duration: std::time::Duration, + + pub table_access: Vec, + + pub cpu_time: std::time::Duration, + + pub affected_shards: u64, + + pub literal_phase: bool, +} + +#[derive(Debug, serde::Serialize)] +pub(crate) struct RawTableAccessStats { + pub name: String, + pub reads: Option, + pub updates: Option, + pub deletes: Option, + pub partitions_count: u64, +} + +#[derive(Debug, serde::Serialize)] +pub(crate) struct RawOperationStats { + pub rows: u64, + pub bytes: u64, +} + +impl From for RawOperationStats { + fn from(value: ydb_grpc::ydb_proto::table_stats::OperationStats) -> Self { + Self { + rows: value.rows, + bytes: value.bytes, + } + } +} + +impl From for RawTableAccessStats { + fn from(value: ydb_grpc::ydb_proto::table_stats::TableAccessStats) -> Self { + let reads = value.reads.map(Into::into); + let updates = value.updates.map(Into::into); + let deletes = value.deletes.map(Into::into); + + Self { + name: value.name, + reads, + updates, + deletes, + partitions_count: value.partitions_count, + } + } +} + +impl From for RawQueryPhaseStats { + fn from(value: ydb_grpc::ydb_proto::table_stats::QueryPhaseStats) -> Self { + Self { + duration: std::time::Duration::from_micros(value.duration_us), + table_access: value.table_access.into_iter().map(Into::into).collect(), + cpu_time: std::time::Duration::from_micros(value.cpu_time_us), + affected_shards: value.affected_shards, + literal_phase: value.literal_phase, + } + } +} diff --git a/ydb/src/grpc_wrapper/raw_table_service/query_stats.rs b/ydb/src/grpc_wrapper/raw_table_service/query_stats.rs index 461ed4d4..287831b9 100644 --- a/ydb/src/grpc_wrapper/raw_table_service/query_stats.rs +++ b/ydb/src/grpc_wrapper/raw_table_service/query_stats.rs @@ -1,19 +1,32 @@ +use crate::query::QueryStatsMode; + #[derive(serde::Serialize)] -pub(crate) enum RawQueryStatMode{ +pub(crate) enum RawQueryStatsMode { None, Basic, Full, Profile, } -impl From for ydb_grpc::ydb_proto::table::query_stats_collection::Mode { - fn from(v: RawQueryStatMode) -> Self { +impl From for ydb_grpc::ydb_proto::table::query_stats_collection::Mode { + fn from(v: RawQueryStatsMode) -> Self { use ydb_grpc::ydb_proto::table::query_stats_collection::Mode as grpcMode; match v { - RawQueryStatMode::None => grpcMode::StatsCollectionNone, - RawQueryStatMode::Basic => grpcMode::StatsCollectionBasic, - RawQueryStatMode::Full => grpcMode::StatsCollectionFull, - RawQueryStatMode::Profile => grpcMode::StatsCollectionProfile, + RawQueryStatsMode::None => grpcMode::StatsCollectionNone, + RawQueryStatsMode::Basic => grpcMode::StatsCollectionBasic, + RawQueryStatsMode::Full => grpcMode::StatsCollectionFull, + RawQueryStatsMode::Profile => grpcMode::StatsCollectionProfile, + } + } +} + +impl From for RawQueryStatsMode { + fn from(value: QueryStatsMode) -> Self { + match value { + QueryStatsMode::Basic => RawQueryStatsMode::Basic, + QueryStatsMode::None => RawQueryStatsMode::None, + QueryStatsMode::Full => RawQueryStatsMode::Full, + QueryStatsMode::Profile => RawQueryStatsMode::Profile, } } } diff --git a/ydb/src/query.rs b/ydb/src/query.rs index 744beb29..93ae16a0 100644 --- a/ydb/src/query.rs +++ b/ydb/src/query.rs @@ -13,6 +13,23 @@ pub struct Query { pub(crate) parameters: HashMap, pub(crate) keep_in_cache: bool, force_keep_in_cache: bool, + pub(crate) collect_stats: QueryStatsMode, +} + +/// Specifies which statistics should be collected during request processing +#[derive(Clone)] +pub enum QueryStatsMode { + /// Stats collection is disabled + None, + + /// Aggregated stats of reads, updates and deletes per table + Basic, + + /// Add execution stats and plan on top of STATS_COLLECTION_BASIC + Full, + + /// Detailed execution stats including stats for individual tasks and channels + Profile, } impl Query { @@ -23,6 +40,7 @@ impl Query { parameters: HashMap::new(), keep_in_cache: false, force_keep_in_cache: false, + collect_stats: QueryStatsMode::None, } } @@ -77,12 +95,17 @@ impl Query { /// .with_params(ydb_params!("$val" => 123 as i64)) /// .with_keep_in_cache(false); /// ``` - pub fn with_keep_in_cache(mut self, val: bool)->Self { + pub fn with_keep_in_cache(mut self, val: bool) -> Self { self.force_keep_in_cache = true; self.keep_in_cache = val; self } + pub fn with_stats(mut self, mode: QueryStatsMode) -> Self { + self.collect_stats = mode; + self + } + pub(crate) fn query_to_proto(&self) -> ydb_grpc::ydb_proto::table::Query { ydb_grpc::ydb_proto::table::Query { query: Some(ydb_grpc::ydb_proto::table::query::Query::YqlText( diff --git a/ydb/src/result.rs b/ydb/src/result.rs index 57d41265..35d5b4bf 100644 --- a/ydb/src/result.rs +++ b/ydb/src/result.rs @@ -1,7 +1,7 @@ use crate::errors; use crate::errors::{YdbError, YdbResult, YdbStatusError}; use crate::grpc::proto_issues_to_ydb_issues; -use crate::grpc_wrapper::raw_table_service::execute_data_query::RawExecuteDataQueryResult; +use crate::grpc_wrapper::raw_table_service::execute_data_query::{RawQueryStats,RawExecuteDataQueryResult, RawOperationStats, RawQueryPhaseStats, RawTableAccessStats}; use crate::grpc_wrapper::raw_table_service::value::{RawResultSet, RawTypedValue, RawValue}; use crate::trace_helpers::ensure_len_string; use crate::types::Value; @@ -17,6 +17,19 @@ use ydb_grpc::ydb_proto::table::ExecuteScanQueryPartialResponse; pub struct QueryResult { pub(crate) results: Vec, pub(crate) tx_id: String, + + pub stats: Option, +} + +#[derive(Debug)] +pub struct QueryStats { + pub process_cpu_time: std::time::Duration, + pub total_duration: std::time::Duration, + pub total_cpu_time: std::time::Duration, + pub query_plan: String, + pub query_ast: String, + + pub query_phases: Vec, } impl QueryResult { @@ -45,6 +58,7 @@ impl QueryResult { Ok(QueryResult { results, tx_id: raw_res.tx_meta.id, + stats: raw_res.query_stats.map(QueryStats::from), }) } @@ -215,3 +229,95 @@ impl StreamResult { Ok(Some(result_set)) } } + +impl From for QueryStats{ + + fn from(value: RawQueryStats) -> QueryStats { + + Self { + process_cpu_time: value.process_cpu_time, + total_duration: value.total_duration, + total_cpu_time: value.total_cpu_time, + query_ast: value.query_ast, + query_plan: value.query_plan, + + query_phases: value.query_phases.into_iter().map(Into::into).collect(), + } + } +} + +#[derive(Debug)] +pub struct QueryPhaseStats { + pub duration: std::time::Duration, + pub table_access: Vec, + pub cpu_time: std::time::Duration, + pub affected_shards: u64, + pub literal_phase: bool, +} + +impl From for QueryPhaseStats { + fn from(value: RawQueryPhaseStats) -> Self { + Self { + + duration: value.duration, + table_access: value.table_access.into_iter().map(Into::into).collect(), + cpu_time: value.cpu_time, + affected_shards: value.affected_shards, + literal_phase: value.literal_phase, + } + } +} + +#[derive(Debug)] +pub struct TableAccessStats { + pub name: String, + pub reads: Option, + pub updates: Option, + pub deletes: Option, + pub partitions_count: u64, + pub affected_rows: u64 +} + + + + +impl From for TableAccessStats { + fn from(value: RawTableAccessStats) -> Self { + fn affected_rows(stats: &Option) -> u64 { + stats.as_ref().map(|x|x.rows).unwrap_or(0) + } + + let reads= value.reads.map(Into::into); + let updates= value.updates.map(Into::into); + let deletes= value.deletes.map(Into::into); + + let affected_rows = + affected_rows(&reads) + affected_rows(&updates) + affected_rows(&deletes); + + Self { + name: value.name, + reads, + updates, + deletes, + partitions_count: value.partitions_count, + affected_rows + } + } +} + + + +impl From for OperationStats { + fn from(value: RawOperationStats) -> Self { + Self { + rows: value.rows, + bytes: value.bytes, + } + } +} + +#[derive(Debug)] +pub struct OperationStats { + pub rows: u64, + pub bytes: u64, +} \ No newline at end of file diff --git a/ydb/src/session.rs b/ydb/src/session.rs index 4edd00dc..e4923342 100644 --- a/ydb/src/session.rs +++ b/ydb/src/session.rs @@ -25,7 +25,7 @@ use ydb_grpc::ydb_proto::table::{ execute_scan_query_request, ExecuteScanQueryRequest, }; -use crate::grpc_wrapper::raw_table_service::execute_data_query::{RawExecuteDataQueryRequest}; +use crate::grpc_wrapper::raw_table_service::execute_data_query::RawExecuteDataQueryRequest; use crate::grpc_wrapper::raw_table_service::copy_table::{ RawCopyTableRequest, RawCopyTablesRequest diff --git a/ydb/src/transaction.rs b/ydb/src/transaction.rs index 15a3da1c..c146f248 100644 --- a/ydb/src/transaction.rs +++ b/ydb/src/transaction.rs @@ -1,7 +1,6 @@ use crate::client::TimeoutSettings; use crate::errors::{YdbError, YdbResult}; use crate::grpc_wrapper::raw_table_service::execute_data_query::RawExecuteDataQueryRequest; -use crate::grpc_wrapper::raw_table_service::query_stats::RawQueryStatMode; use crate::grpc_wrapper::raw_table_service::transaction_control::{ RawOnlineReadonlySettings, RawTransactionControl, RawTxMode, RawTxSelector, RawTxSettings, }; @@ -101,7 +100,7 @@ impl Transaction for AutoCommit { }) .try_collect()?, keep_in_cache: query.keep_in_cache, - collect_stats: RawQueryStatMode::None, + collect_stats: query.collect_stats.into(), }; let mut session = self.session_pool.session().await?; @@ -207,7 +206,7 @@ impl Transaction for SerializableReadWriteTx { }) .try_collect()?, keep_in_cache: false, - collect_stats: RawQueryStatMode::None, + collect_stats: query.collect_stats.into(), }; let query_result = session .execute_data_query(req, self.error_on_truncate_response) From 0e42b85d6206bd5db86a0ee6b6978c6692d78b5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B8=D1=85=D0=B0=D0=B8=D0=BB=20=D0=9D=D0=B0=D0=B7?= =?UTF-8?q?=D0=B0=D1=80=D0=BE=D0=B2?= Date: Mon, 11 Nov 2024 23:38:13 +0300 Subject: [PATCH 2/2] update --- ydb/src/client_table_test_integration.rs | 88 ++++++++ .../raw_table_service/query_stats.rs | 2 +- ydb/src/lib.rs | 2 + ydb/src/query.rs | 17 +- ydb/src/result.rs | 109 +--------- ydb/src/stats.rs | 195 ++++++++++++++++++ 6 files changed, 291 insertions(+), 122 deletions(-) create mode 100644 ydb/src/stats.rs diff --git a/ydb/src/client_table_test_integration.rs b/ydb/src/client_table_test_integration.rs index 1395768c..3d9d3872 100644 --- a/ydb/src/client_table_test_integration.rs +++ b/ydb/src/client_table_test_integration.rs @@ -877,3 +877,91 @@ FROM assert!(result_set_count > 1); // ensure get multiply results Ok(()) } +#[tokio::test] +#[traced_test] +#[ignore] // need YDB access +async fn test_stats_updates() -> YdbResult<()> { + let client = create_client() + .await?; + + client + .table_client() + .create_session() + .await? + .execute_schema_query( + "CREATE TABLE test_values (id Int64, vInt64 Int64, PRIMARY KEY (id))".to_string(), + ) + .await?; + + + let mut tx = client.table_client().create_interactive_transaction(); + let res = tx.query(Query::new( + "UPSERT INTO test_values (id, vInt64) VALUES (1, 2),(3,4)", + ).with_stats(crate::QueryStatsMode::Basic)) + .await?; + + tx.commit().await?; + + println!("{:?}", res.stats); + + assert_eq!(2, res.stats.map(|x|x.rows_affected()).unwrap_or(0)); + + + + client + .table_client() + .create_session() + .await? + .execute_schema_query("DROP TABLE test_values".to_string()) + .await?; + + Ok(()) +} + +#[tokio::test] +#[traced_test] +#[ignore] // need YDB access +async fn test_stats_reads() -> YdbResult<()> { + let client = create_client() + .await?; + + client + .table_client() + .create_session() + .await? + .execute_schema_query( + "CREATE TABLE test_values (id Int64, vInt64 Int64, PRIMARY KEY (id))".to_string(), + ) + .await?; + + let mut tx = client.table_client().create_interactive_transaction(); + let _res = tx.query(Query::new( + "UPSERT INTO test_values (id, vInt64) VALUES (1, 2),(3,4)", + )) + .await? + ; + + + + + let res = tx.query(Query::new("SELECT * FROM test_values") + .with_stats(crate::QueryStatsMode::Basic)) + .await?; + + tx.commit().await?; + + println!("{:?}", res.stats); + + assert_eq!(2, res.stats.map(|x|x.rows_affected()).unwrap_or(0)); + + + + client + .table_client() + .create_session() + .await? + .execute_schema_query("DROP TABLE test_values".to_string()) + .await?; + + Ok(()) +} diff --git a/ydb/src/grpc_wrapper/raw_table_service/query_stats.rs b/ydb/src/grpc_wrapper/raw_table_service/query_stats.rs index 287831b9..4d84137d 100644 --- a/ydb/src/grpc_wrapper/raw_table_service/query_stats.rs +++ b/ydb/src/grpc_wrapper/raw_table_service/query_stats.rs @@ -1,4 +1,4 @@ -use crate::query::QueryStatsMode; +use crate::QueryStatsMode; #[derive(serde::Serialize)] pub(crate) enum RawQueryStatsMode { diff --git a/ydb/src/lib.rs b/ydb/src/lib.rs index bc5d5c61..f41213d4 100644 --- a/ydb/src/lib.rs +++ b/ydb/src/lib.rs @@ -98,7 +98,9 @@ pub(crate) mod waiter; #[cfg(test)] mod types_test; +mod stats; +pub use stats::*; pub use client_coordination::client::CoordinationClient; pub use client_coordination::list_types::{ ConsistencyMode, NodeConfig, NodeConfigBuilder, NodeDescription, RateLimiterCountersMode, diff --git a/ydb/src/query.rs b/ydb/src/query.rs index 93ae16a0..3ca8aacb 100644 --- a/ydb/src/query.rs +++ b/ydb/src/query.rs @@ -3,7 +3,7 @@ use crate::types::Value; use std::collections::HashMap; use std::str::FromStr; -use crate::YdbError; +use crate::{QueryStatsMode, YdbError}; use ydb_grpc::ydb_proto::TypedValue; /// Query object @@ -16,21 +16,6 @@ pub struct Query { pub(crate) collect_stats: QueryStatsMode, } -/// Specifies which statistics should be collected during request processing -#[derive(Clone)] -pub enum QueryStatsMode { - /// Stats collection is disabled - None, - - /// Aggregated stats of reads, updates and deletes per table - Basic, - - /// Add execution stats and plan on top of STATS_COLLECTION_BASIC - Full, - - /// Detailed execution stats including stats for individual tasks and channels - Profile, -} impl Query { /// Create query with query text diff --git a/ydb/src/result.rs b/ydb/src/result.rs index 35d5b4bf..fb731c63 100644 --- a/ydb/src/result.rs +++ b/ydb/src/result.rs @@ -1,7 +1,7 @@ -use crate::errors; +use crate::{errors, QueryStats}; use crate::errors::{YdbError, YdbResult, YdbStatusError}; use crate::grpc::proto_issues_to_ydb_issues; -use crate::grpc_wrapper::raw_table_service::execute_data_query::{RawQueryStats,RawExecuteDataQueryResult, RawOperationStats, RawQueryPhaseStats, RawTableAccessStats}; +use crate::grpc_wrapper::raw_table_service::execute_data_query::RawExecuteDataQueryResult; use crate::grpc_wrapper::raw_table_service::value::{RawResultSet, RawTypedValue, RawValue}; use crate::trace_helpers::ensure_len_string; use crate::types::Value; @@ -21,17 +21,6 @@ pub struct QueryResult { pub stats: Option, } -#[derive(Debug)] -pub struct QueryStats { - pub process_cpu_time: std::time::Duration, - pub total_duration: std::time::Duration, - pub total_cpu_time: std::time::Duration, - pub query_plan: String, - pub query_ast: String, - - pub query_phases: Vec, -} - impl QueryResult { pub(crate) fn from_raw_result( error_on_truncate: bool, @@ -90,6 +79,8 @@ impl QueryResult { None => Err(YdbError::NoRows), } } + + } #[derive(Debug)] @@ -229,95 +220,3 @@ impl StreamResult { Ok(Some(result_set)) } } - -impl From for QueryStats{ - - fn from(value: RawQueryStats) -> QueryStats { - - Self { - process_cpu_time: value.process_cpu_time, - total_duration: value.total_duration, - total_cpu_time: value.total_cpu_time, - query_ast: value.query_ast, - query_plan: value.query_plan, - - query_phases: value.query_phases.into_iter().map(Into::into).collect(), - } - } -} - -#[derive(Debug)] -pub struct QueryPhaseStats { - pub duration: std::time::Duration, - pub table_access: Vec, - pub cpu_time: std::time::Duration, - pub affected_shards: u64, - pub literal_phase: bool, -} - -impl From for QueryPhaseStats { - fn from(value: RawQueryPhaseStats) -> Self { - Self { - - duration: value.duration, - table_access: value.table_access.into_iter().map(Into::into).collect(), - cpu_time: value.cpu_time, - affected_shards: value.affected_shards, - literal_phase: value.literal_phase, - } - } -} - -#[derive(Debug)] -pub struct TableAccessStats { - pub name: String, - pub reads: Option, - pub updates: Option, - pub deletes: Option, - pub partitions_count: u64, - pub affected_rows: u64 -} - - - - -impl From for TableAccessStats { - fn from(value: RawTableAccessStats) -> Self { - fn affected_rows(stats: &Option) -> u64 { - stats.as_ref().map(|x|x.rows).unwrap_or(0) - } - - let reads= value.reads.map(Into::into); - let updates= value.updates.map(Into::into); - let deletes= value.deletes.map(Into::into); - - let affected_rows = - affected_rows(&reads) + affected_rows(&updates) + affected_rows(&deletes); - - Self { - name: value.name, - reads, - updates, - deletes, - partitions_count: value.partitions_count, - affected_rows - } - } -} - - - -impl From for OperationStats { - fn from(value: RawOperationStats) -> Self { - Self { - rows: value.rows, - bytes: value.bytes, - } - } -} - -#[derive(Debug)] -pub struct OperationStats { - pub rows: u64, - pub bytes: u64, -} \ No newline at end of file diff --git a/ydb/src/stats.rs b/ydb/src/stats.rs new file mode 100644 index 00000000..33880e81 --- /dev/null +++ b/ydb/src/stats.rs @@ -0,0 +1,195 @@ +use crate::grpc_wrapper::raw_table_service::execute_data_query::{RawOperationStats, RawQueryPhaseStats, RawQueryStats, RawTableAccessStats}; +use std::fmt::Debug; + +pub struct QueryStats { + pub process_cpu_time: std::time::Duration, + pub total_duration: std::time::Duration, + pub total_cpu_time: std::time::Duration, + pub query_plan: String, + pub query_ast: String, + + pub query_phases: Vec, +} + + +/// Specifies which statistics should be collected during request processing +#[derive(Clone)] +pub enum QueryStatsMode { + /// Stats collection is disabled + None, + + /// Aggregated stats of reads, updates and deletes per table + Basic, + + /// Add execution stats and plan on top of STATS_COLLECTION_BASIC + Full, + + /// Detailed execution stats including stats for individual tasks and channels + Profile, +} + + +impl From for QueryStats{ + + fn from(value: RawQueryStats) -> QueryStats { + + let query_phases = value.query_phases.into_iter().map(Into::into).collect(); + + Self { + process_cpu_time: value.process_cpu_time, + total_duration: value.total_duration, + total_cpu_time: value.total_cpu_time, + query_ast: value.query_ast, + query_plan: value.query_plan, + + query_phases, + } + } +} + + +pub struct QueryPhaseStats { + pub duration: std::time::Duration, + pub table_access: Vec, + pub cpu_time: std::time::Duration, + pub affected_shards: u64, + pub literal_phase: bool, +} + +impl From for QueryPhaseStats { + + fn from(value: RawQueryPhaseStats) -> Self { + let table_access: Vec<_> = value.table_access.into_iter().map(Into::into).collect(); + + Self { + + duration: value.duration, + table_access, + cpu_time: value.cpu_time, + affected_shards: value.affected_shards, + literal_phase: value.literal_phase, + } + } +} + + +pub struct TableAccessStats { + pub name: String, + pub reads: Option, + pub updates: Option, + pub deletes: Option, + pub partitions_count: u64 +} + + + + +impl From for TableAccessStats { + fn from(value: RawTableAccessStats) -> Self { + + + let reads= value.reads.map(Into::into); + let updates= value.updates.map(Into::into); + let deletes= value.deletes.map(Into::into); + + + + Self { + name: value.name, + reads, + updates, + deletes, + partitions_count: value.partitions_count, + } + } +} + + + +impl From for OperationStats { + fn from(value: RawOperationStats) -> Self { + Self { + rows: value.rows, + bytes: value.bytes, + } + } +} + + +pub struct OperationStats { + pub rows: u64, + pub bytes: u64, +} + + +impl QueryStats{ + pub fn rows_affected(&self) -> u64 { + self.query_phases.iter().fold(0, |acc, r| acc + r.rows_affected()) + } +} + +impl QueryPhaseStats{ + pub fn rows_affected(&self) -> u64 { + self.table_access.iter().fold(0, |acc, r| acc + r.rows_affected()) + } +} + +impl TableAccessStats{ + pub fn rows_affected(&self) -> u64 { + Self::_rows_affected(&self.reads) + Self::_rows_affected(&self.updates) + Self::_rows_affected(&self.deletes) + } + fn _rows_affected(stats: &Option) -> u64 { + stats.as_ref().map(|x|x.rows).unwrap_or(0) + } +} + +impl Debug for QueryStats{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!(f, "\n=== Query Stats: ===")?; + writeln!(f,"process_cpu_time: {:?}",self.process_cpu_time)?; + writeln!(f,"total_duration: {:?}",self.total_duration)?; + writeln!(f,"total_cpu_time: {:?}",self.total_cpu_time)?; + writeln!(f,"AST: {}",self.query_ast)?; + writeln!(f,"Plan: {}",self.query_plan)?; + writeln!(f,"Phases ({}):",self.query_phases.len())?; + + for phase in &self.query_phases { + writeln!(f, "--------------------")?; + writeln!(f,"{:?}",phase)?; + } + + Ok(()) + } +} + +impl Debug for QueryPhaseStats{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!(f,"duration: {:?}",self.duration)?; + writeln!(f,"cpu_time: {:?}",self.cpu_time)?; + writeln!(f,"affected_shards: {}",self.affected_shards)?; + writeln!(f,"literal_phase: {}",self.literal_phase)?; + + writeln!(f,"Tables ({}):",self.table_access.len())?; + for table in &self.table_access { + writeln!(f,"{:?}",table)?; + } + Ok(()) + } +} + +impl Debug for TableAccessStats{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!(f,"name: {}",self.name)?; + writeln!(f,"\t[ROWS] reads: {}, updates: {}, deletes: {}", + self.reads.as_ref().map(|x|x.rows).unwrap_or(0), + self.updates.as_ref().map(|x|x.rows).unwrap_or(0), + self.deletes.as_ref().map(|x|x.rows).unwrap_or(0))?; + + writeln!(f,"\t[BYTES] reads: {}, updates: {}, deletes: {}", + self.reads.as_ref().map(|x|x.bytes).unwrap_or(0), + self.updates.as_ref().map(|x|x.bytes).unwrap_or(0), + self.deletes.as_ref().map(|x|x.bytes).unwrap_or(0))?; + + Ok(()) + } +} \ No newline at end of file