Skip to content

Commit 265f02c

Browse files
authored
Merge pull request #225 from abbit/more-topic-methods
Thanks for the pr
2 parents 368a586 + 7c8fd43 commit 265f02c

22 files changed

+1163
-101
lines changed

ydb/src/client_topic/client.rs

Lines changed: 89 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,97 @@
1+
use super::list_types::{Codec, TopicDescription};
12
use crate::client::TimeoutSettings;
2-
use crate::client_topic::list_types::{Consumer, MeteringMode, SupportedCodecs};
3+
use crate::client_topic::list_types::{AlterConsumer, Consumer, MeteringMode};
34
use crate::client_topic::topicwriter::writer::TopicWriter;
45
use crate::client_topic::topicwriter::writer_options::{
56
TopicWriterOptions, TopicWriterOptionsBuilder,
67
};
78
use crate::errors;
89
use crate::grpc_connection_manager::GrpcConnectionManager;
10+
use crate::grpc_wrapper::raw_topic_service::alter_topic::RawAlterTopicRequest;
911
use crate::grpc_wrapper::raw_topic_service::create_topic::RawCreateTopicRequest;
10-
use crate::grpc_wrapper::raw_topic_service::delete_topic::RawDropTopicRequest;
12+
use crate::grpc_wrapper::raw_topic_service::describe_topic::RawDescribeTopicRequest;
13+
use crate::grpc_wrapper::raw_topic_service::drop_topic::RawDropTopicRequest;
1114
use crate::YdbError::InternalError;
1215
use crate::{grpc_wrapper, YdbResult};
1316
use derive_builder::{Builder, UninitializedFieldError};
1417
use std::collections::HashMap;
18+
use std::time::Duration;
1519

1620
#[derive(Builder)]
1721
#[builder(build_fn(error = "errors::YdbError"))]
18-
pub struct TopicOptions {
19-
// Use TopicOptionsBuilder
20-
#[builder(setter(strip_option), default)]
21-
pub metering_mode: Option<MeteringMode>,
22+
pub struct CreateTopicOptions {
23+
// Use CreateTopicOptionsBuilder
2224
#[builder(default)]
2325
pub min_active_partitions: i64,
2426
#[builder(default)]
2527
pub partition_count_limit: i64,
2628
#[builder(setter(strip_option), default)]
27-
pub retention_period: Option<core::time::Duration>,
29+
pub retention_period: Option<Duration>,
2830
#[builder(default)]
2931
pub retention_storage_mb: i64,
3032
#[builder(default)]
31-
pub supported_codecs: SupportedCodecs,
33+
pub supported_codecs: Vec<Codec>,
3234
#[builder(default)]
3335
pub partition_write_speed_bytes_per_second: i64,
3436
#[builder(default)]
3537
pub partition_write_burst_bytes: i64,
3638
#[builder(default)]
39+
pub consumers: Vec<Consumer>,
40+
#[builder(default)]
3741
pub attributes: HashMap<String, String>,
42+
#[builder(setter(strip_option), default)]
43+
pub metering_mode: Option<MeteringMode>,
44+
}
45+
46+
#[derive(Builder)]
47+
#[builder(build_fn(error = "errors::YdbError"))]
48+
pub struct AlterTopicOptions {
49+
// Use AlterTopicOptionsBuilder
50+
#[builder(setter(strip_option), default)]
51+
pub set_min_active_partitions: Option<i64>,
52+
53+
#[builder(setter(strip_option), default)]
54+
pub set_partition_count_limit: Option<i64>,
55+
56+
#[builder(setter(strip_option), default)]
57+
pub set_retention_period: Option<Duration>,
58+
59+
#[builder(setter(strip_option), default)]
60+
pub set_retention_storage_mb: Option<i64>,
61+
62+
#[builder(setter(strip_option), default)]
63+
pub set_supported_codecs: Option<Vec<Codec>>,
64+
65+
#[builder(setter(strip_option), default)]
66+
pub set_partition_write_speed_bytes_per_second: Option<i64>,
67+
68+
#[builder(setter(strip_option), default)]
69+
pub set_partition_write_burst_bytes: Option<i64>,
70+
3871
#[builder(default)]
39-
pub consumers: Vec<Consumer>,
72+
pub alter_attributes: HashMap<String, String>,
73+
74+
#[builder(default)]
75+
pub add_consumers: Vec<Consumer>,
76+
77+
#[builder(default)]
78+
pub drop_consumers: Vec<String>,
79+
80+
#[builder(default)]
81+
pub alter_consumers: Vec<AlterConsumer>,
82+
83+
#[builder(setter(strip_option), default)]
84+
pub set_metering_mode: Option<MeteringMode>,
85+
}
86+
87+
#[derive(Builder)]
88+
#[builder(build_fn(error = "errors::YdbError"))]
89+
pub struct DescribeTopicOptions {
90+
// Use DescribeTopicOptionsBuilder
91+
#[builder(default)]
92+
pub include_stats: bool,
93+
#[builder(default)]
94+
pub include_location: bool,
4095
}
4196

4297
impl From<UninitializedFieldError> for errors::YdbError {
@@ -64,16 +119,39 @@ impl TopicClient {
64119
pub async fn create_topic(
65120
&mut self,
66121
path: String,
67-
topic_options: TopicOptions,
122+
options: CreateTopicOptions,
68123
) -> YdbResult<()> {
69-
let req = RawCreateTopicRequest::new(path, self.timeouts.operation_params(), topic_options);
124+
let req = RawCreateTopicRequest::new(path, self.timeouts.operation_params(), options);
70125

71126
let mut service = self.raw_client_connection().await?;
72127
service.create_topic(req).await?;
73128

74129
Ok(())
75130
}
76131

132+
pub async fn alter_topic(&mut self, path: String, options: AlterTopicOptions) -> YdbResult<()> {
133+
let req = RawAlterTopicRequest::new(path, self.timeouts.operation_params(), options);
134+
135+
let mut service = self.raw_client_connection().await?;
136+
service.alter_topic(req).await?;
137+
138+
Ok(())
139+
}
140+
141+
pub async fn describe_topic(
142+
&mut self,
143+
path: String,
144+
options: DescribeTopicOptions,
145+
) -> YdbResult<TopicDescription> {
146+
let req = RawDescribeTopicRequest::new(path, self.timeouts.operation_params(), options);
147+
148+
let mut service = self.raw_client_connection().await?;
149+
let result = service.describe_topic(req).await?;
150+
let description = TopicDescription::from(result);
151+
152+
Ok(description)
153+
}
154+
77155
pub async fn drop_topic(&mut self, path: String) -> YdbResult<()> {
78156
let req = RawDropTopicRequest {
79157
operation_params: self.timeouts.operation_params(),

0 commit comments

Comments
 (0)