Skip to content

Add query statistics support #215

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

88 changes: 88 additions & 0 deletions ydb/src/client_table_test_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -877,3 +877,91 @@ FROM
assert!(result_set_count > 1); // ensure get multiply results
Ok(())
}
#[tokio::test]
#[traced_test]
#[ignore] // need YDB access
async fn test_stats_updates() -> YdbResult<()> {
let client = create_client()
.await?;

client
.table_client()
.create_session()
.await?
.execute_schema_query(
"CREATE TABLE test_values (id Int64, vInt64 Int64, PRIMARY KEY (id))".to_string(),
)
.await?;


let mut tx = client.table_client().create_interactive_transaction();
let res = tx.query(Query::new(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

YDB may delay some operations, for example all changes delay up to end of transation if no need result of the update (for example read the table). Try check statistic on commit or try read same table - for force apply upsert

"UPSERT INTO test_values (id, vInt64) VALUES (1, 2),(3,4)",
).with_stats(crate::QueryStatsMode::Basic))
.await?;

tx.commit().await?;

println!("{:?}", res.stats);

assert_eq!(2, res.stats.map(|x|x.rows_affected()).unwrap_or(0));



client
.table_client()
.create_session()
.await?
.execute_schema_query("DROP TABLE test_values".to_string())
.await?;

Ok(())
}

#[tokio::test]
#[traced_test]
#[ignore] // need YDB access
async fn test_stats_reads() -> YdbResult<()> {
let client = create_client()
.await?;

client
.table_client()
.create_session()
.await?
.execute_schema_query(
"CREATE TABLE test_values (id Int64, vInt64 Int64, PRIMARY KEY (id))".to_string(),
)
.await?;

let mut tx = client.table_client().create_interactive_transaction();
let _res = tx.query(Query::new(
"UPSERT INTO test_values (id, vInt64) VALUES (1, 2),(3,4)",
))
.await?
;




let res = tx.query(Query::new("SELECT * FROM test_values")
.with_stats(crate::QueryStatsMode::Basic))
.await?;

tx.commit().await?;

println!("{:?}", res.stats);

assert_eq!(2, res.stats.map(|x|x.rows_affected()).unwrap_or(0));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason same as above: upsert delay apply until ydb need it. In the case - up to the select. And upsert really apply only on when you send select.

You can select twice: upsert will be applied for first select and you will see only reads for second select.




client
.table_client()
.create_session()
.await?
.execute_schema_query("DROP TABLE test_values".to_string())
.await?;

Ok(())
}
23 changes: 2 additions & 21 deletions ydb/src/grpc_wrapper/raw_table_service/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ use crate::grpc_wrapper::raw_table_service::execute_scheme_query::RawExecuteSche
use crate::grpc_wrapper::raw_table_service::keepalive::{RawKeepAliveRequest, RawKeepAliveResult};
use crate::grpc_wrapper::raw_table_service::rollback_transaction::RawRollbackTransactionRequest;
use crate::grpc_wrapper::runtime_interceptors::InterceptedChannel;

use tracing::trace;
use ydb_grpc::ydb_proto::table::v1::table_service_client::TableServiceClient;
use crate::grpc_wrapper::raw_table_service::copy_table::{RawCopyTableRequest, RawCopyTablesRequest};
use crate::grpc_wrapper::raw_table_service::execute_data_query::{RawExecuteDataQueryRequest, RawExecuteDataQueryResult};


pub(crate) struct RawTableClient {
timeouts: TimeoutSettings,
service: TableServiceClient<InterceptedChannel>,
Expand Down Expand Up @@ -167,24 +169,3 @@ impl From<CollectStatsMode> for i32 {
grpc_val as i32
}
}

#[derive(Debug)]
pub(crate) struct RawQueryStats {
process_cpu_time: std::time::Duration,
query_plan: String,
query_ast: String,
total_duration: std::time::Duration,
total_cpu_time: std::time::Duration,
}

impl From<ydb_grpc::ydb_proto::table_stats::QueryStats> for RawQueryStats {
fn from(value: ydb_grpc::ydb_proto::table_stats::QueryStats) -> Self {
Self {
process_cpu_time: std::time::Duration::from_micros(value.process_cpu_time_us),
query_plan: value.query_plan,
query_ast: value.query_ast,
total_duration: std::time::Duration::from_micros(value.total_duration_us),
total_cpu_time: std::time::Duration::from_micros(value.total_cpu_time_us),
}
}
}
4 changes: 3 additions & 1 deletion ydb/src/grpc_wrapper/raw_table_service/commit_transaction.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::grpc_wrapper::raw_errors::RawError;
use crate::grpc_wrapper::raw_table_service::client::{CollectStatsMode, RawQueryStats};
use crate::grpc_wrapper::raw_table_service::client::CollectStatsMode;
use crate::grpc_wrapper::raw_ydb_operation::RawOperationParams;

use super::execute_data_query::RawQueryStats;

pub(crate) struct RawCommitTransactionRequest {
pub session_id: String,
pub tx_id: String,
Expand Down
104 changes: 98 additions & 6 deletions ydb/src/grpc_wrapper/raw_table_service/execute_data_query.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::collections::HashMap;
use crate::grpc_wrapper::raw_errors::RawError;
use crate::grpc_wrapper::raw_table_service::query_stats::RawQueryStatMode;
use crate::grpc_wrapper::raw_table_service::query_stats::RawQueryStatsMode;
use crate::grpc_wrapper::raw_table_service::transaction_control::RawTransactionControl;
use crate::grpc_wrapper::raw_table_service::value::{RawResultSet, RawTypedValue};
use crate::grpc_wrapper::raw_table_service::value::r#type::RawType;
use crate::grpc_wrapper::raw_table_service::value::{RawResultSet, RawTypedValue};
use crate::grpc_wrapper::raw_ydb_operation::RawOperationParams;
use std::collections::HashMap;

#[derive(serde::Serialize)]
pub(crate) struct RawExecuteDataQueryRequest {
Expand All @@ -14,7 +14,7 @@ pub(crate) struct RawExecuteDataQueryRequest {
pub operation_params: RawOperationParams,
pub params: HashMap<String, RawTypedValue>,
pub keep_in_cache: bool,
pub collect_stats: RawQueryStatMode,
pub collect_stats: RawQueryStatsMode,
}

impl From<RawExecuteDataQueryRequest> for ydb_grpc::ydb_proto::table::ExecuteDataQueryRequest {
Expand Down Expand Up @@ -44,7 +44,7 @@ pub(crate) struct RawExecuteDataQueryResult {
pub result_sets: Vec<RawResultSet>,
pub tx_meta: RawTransactionMeta,
pub query_meta: Option<RawQueryMeta>,
// query_stats: Option<RawQueryStats>, // todo
pub query_stats: Option<RawQueryStats>,
}

impl TryFrom<ydb_grpc::ydb_proto::table::ExecuteQueryResult> for RawExecuteDataQueryResult {
Expand All @@ -59,18 +59,22 @@ impl TryFrom<ydb_grpc::ydb_proto::table::ExecuteQueryResult> for RawExecuteDataQ
.map(|item| item.try_into())
.collect();

let query_meta = if let Some(proto_meta) = value.query_meta{
let query_meta = if let Some(proto_meta) = value.query_meta {
Some(RawQueryMeta::try_from(proto_meta)?)
} else {
None
};

let query_stats = value.query_stats.map(RawQueryStats::from);

Ok(Self {
result_sets: result_sets_res?,
tx_meta: value
.tx_meta
.ok_or_else(|| RawError::custom("no tx_meta at ExecuteQueryResult"))?
.into(),
query_meta,
query_stats,
})
}
}
Expand Down Expand Up @@ -108,3 +112,91 @@ impl TryFrom<ydb_grpc::ydb_proto::table::QueryMeta> for RawQueryMeta {
})
}
}

#[derive(serde::Serialize)]
pub(crate) struct RawQueryStats {
pub query_phases: Vec<RawQueryPhaseStats>,
pub process_cpu_time: std::time::Duration,
pub query_plan: String,
pub query_ast: String,
pub total_duration: std::time::Duration,
pub total_cpu_time: std::time::Duration,
}

impl From<ydb_grpc::ydb_proto::table_stats::QueryStats> for RawQueryStats {
fn from(value: ydb_grpc::ydb_proto::table_stats::QueryStats) -> Self {
Self {
query_phases: value.query_phases.into_iter().map(Into::into).collect(),
process_cpu_time: std::time::Duration::from_micros(value.process_cpu_time_us),
query_plan: value.query_plan,
query_ast: value.query_ast,
total_duration: std::time::Duration::from_micros(value.total_duration_us),
total_cpu_time: std::time::Duration::from_micros(value.total_cpu_time_us),
}
}
}

#[derive(Debug, serde::Serialize)]
pub(crate) struct RawQueryPhaseStats {
pub duration: std::time::Duration,

pub table_access: Vec<RawTableAccessStats>,

pub cpu_time: std::time::Duration,

pub affected_shards: u64,

pub literal_phase: bool,
}

#[derive(Debug, serde::Serialize)]
pub(crate) struct RawTableAccessStats {
pub name: String,
pub reads: Option<RawOperationStats>,
pub updates: Option<RawOperationStats>,
pub deletes: Option<RawOperationStats>,
pub partitions_count: u64,
}

#[derive(Debug, serde::Serialize)]
pub(crate) struct RawOperationStats {
pub rows: u64,
pub bytes: u64,
}

impl From<ydb_grpc::ydb_proto::table_stats::OperationStats> for RawOperationStats {
fn from(value: ydb_grpc::ydb_proto::table_stats::OperationStats) -> Self {
Self {
rows: value.rows,
bytes: value.bytes,
}
}
}

impl From<ydb_grpc::ydb_proto::table_stats::TableAccessStats> for RawTableAccessStats {
fn from(value: ydb_grpc::ydb_proto::table_stats::TableAccessStats) -> Self {
let reads = value.reads.map(Into::into);
let updates = value.updates.map(Into::into);
let deletes = value.deletes.map(Into::into);

Self {
name: value.name,
reads,
updates,
deletes,
partitions_count: value.partitions_count,
}
}
}

impl From<ydb_grpc::ydb_proto::table_stats::QueryPhaseStats> for RawQueryPhaseStats {
fn from(value: ydb_grpc::ydb_proto::table_stats::QueryPhaseStats) -> Self {
Self {
duration: std::time::Duration::from_micros(value.duration_us),
table_access: value.table_access.into_iter().map(Into::into).collect(),
cpu_time: std::time::Duration::from_micros(value.cpu_time_us),
affected_shards: value.affected_shards,
literal_phase: value.literal_phase,
}
}
}
27 changes: 20 additions & 7 deletions ydb/src/grpc_wrapper/raw_table_service/query_stats.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,32 @@
use crate::QueryStatsMode;

#[derive(serde::Serialize)]
pub(crate) enum RawQueryStatMode{
pub(crate) enum RawQueryStatsMode {
None,
Basic,
Full,
Profile,
}

impl From<RawQueryStatMode> for ydb_grpc::ydb_proto::table::query_stats_collection::Mode {
fn from(v: RawQueryStatMode) -> Self {
impl From<RawQueryStatsMode> for ydb_grpc::ydb_proto::table::query_stats_collection::Mode {
fn from(v: RawQueryStatsMode) -> Self {
use ydb_grpc::ydb_proto::table::query_stats_collection::Mode as grpcMode;
match v {
RawQueryStatMode::None => grpcMode::StatsCollectionNone,
RawQueryStatMode::Basic => grpcMode::StatsCollectionBasic,
RawQueryStatMode::Full => grpcMode::StatsCollectionFull,
RawQueryStatMode::Profile => grpcMode::StatsCollectionProfile,
RawQueryStatsMode::None => grpcMode::StatsCollectionNone,
RawQueryStatsMode::Basic => grpcMode::StatsCollectionBasic,
RawQueryStatsMode::Full => grpcMode::StatsCollectionFull,
RawQueryStatsMode::Profile => grpcMode::StatsCollectionProfile,
}
}
}

impl From<QueryStatsMode> for RawQueryStatsMode {
fn from(value: QueryStatsMode) -> Self {
match value {
QueryStatsMode::Basic => RawQueryStatsMode::Basic,
QueryStatsMode::None => RawQueryStatsMode::None,
QueryStatsMode::Full => RawQueryStatsMode::Full,
QueryStatsMode::Profile => RawQueryStatsMode::Profile,
}
}
}
2 changes: 2 additions & 0 deletions ydb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ pub(crate) mod waiter;

#[cfg(test)]
mod types_test;
mod stats;

pub use stats::*;
pub use client_coordination::client::CoordinationClient;
pub use client_coordination::list_types::{
ConsistencyMode, NodeConfig, NodeConfigBuilder, NodeDescription, RateLimiterCountersMode,
Expand Down
Loading
Loading