Skip to content

Commit ded8e9c

Browse files
committed
Implement initial crud component
1 parent 76f8311 commit ded8e9c

File tree

7 files changed

+344
-327
lines changed

7 files changed

+344
-327
lines changed
Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
#![feature(async_closure)]
2+
3+
use std::future::Future;
4+
use std::sync::Arc;
5+
6+
use futures::{FutureExt, TryFutureExt};
7+
8+
use crate::crudoptions::{GetOptions, UpsertOptions};
9+
use crate::crudresults::{GetResult, UpsertResult};
10+
use crate::error::Result;
11+
use crate::kvclient_ops::KvClientOps;
12+
use crate::kvclientmanager::{KvClientManager, KvClientManagerClientType, orchestrate_memd_client};
13+
use crate::memdx::request::{GetRequest, SetRequest};
14+
use crate::memdx::response::TryFromClientResponse;
15+
use crate::mutationtoken::MutationToken;
16+
use crate::vbucketrouter::{NotMyVbucketConfigHandler, orchestrate_memd_routing, VbucketRouter};
17+
18+
pub(crate) struct CrudComponent<
19+
M: KvClientManager,
20+
V: VbucketRouter,
21+
Nmvb: NotMyVbucketConfigHandler,
22+
> {
23+
conn_manager: Arc<M>,
24+
router: Arc<V>,
25+
nmvb_handler: Arc<Nmvb>,
26+
}
27+
28+
pub(crate) async fn orchestrate_simple_crud<M, V, Fut, Resp>(
29+
nmvb_handler: Arc<impl NotMyVbucketConfigHandler>,
30+
vb: Arc<V>,
31+
mgr: Arc<M>,
32+
key: &[u8],
33+
operation: impl Fn(String, u16, Arc<KvClientManagerClientType<M>>) -> Fut + Send + Sync,
34+
) -> Result<Resp>
35+
where
36+
M: KvClientManager,
37+
V: VbucketRouter,
38+
Fut: Future<Output = Result<Resp>> + Send,
39+
Resp: Send,
40+
{
41+
orchestrate_memd_routing(
42+
vb.clone(),
43+
nmvb_handler,
44+
key,
45+
0,
46+
async |endpoint: String, vb_id: u16| {
47+
orchestrate_memd_client(
48+
mgr.clone(),
49+
endpoint.clone(),
50+
async |client: Arc<KvClientManagerClientType<M>>| {
51+
operation(endpoint.clone(), vb_id, client).await
52+
},
53+
)
54+
.await
55+
},
56+
)
57+
.await
58+
}
59+
60+
// TODO: So much clone.
61+
impl<M: KvClientManager, V: VbucketRouter, Nmvb: NotMyVbucketConfigHandler>
62+
CrudComponent<M, V, Nmvb>
63+
{
64+
pub(crate) fn new(nmvb_handler: Arc<Nmvb>, router: Arc<V>, conn_manager: Arc<M>) -> Self {
65+
CrudComponent {
66+
conn_manager,
67+
router,
68+
nmvb_handler,
69+
}
70+
}
71+
72+
pub(crate) async fn upsert(&self, opts: UpsertOptions) -> Result<UpsertResult> {
73+
orchestrate_simple_crud(
74+
self.nmvb_handler.clone(),
75+
self.router.clone(),
76+
self.conn_manager.clone(),
77+
opts.key.clone().as_slice(),
78+
async |endpoint, vb_id, client| {
79+
client
80+
.set(SetRequest {
81+
collection_id: 0,
82+
key: opts.key.clone(),
83+
vbucket_id: vb_id,
84+
flags: opts.flags,
85+
value: opts.value.clone(),
86+
datatype: 0,
87+
expiry: opts.expiry,
88+
preserve_expiry: opts.preserve_expiry,
89+
cas: opts.cas,
90+
on_behalf_of: None,
91+
durability_level: opts.durability_level,
92+
durability_level_timeout: None,
93+
})
94+
.map_ok(|resp| {
95+
let mutation_token = resp.mutation_token.map(|t| MutationToken {
96+
vbid: vb_id,
97+
vbuuid: t.vbuuid,
98+
seqno: t.seqno,
99+
});
100+
101+
UpsertResult {
102+
cas: resp.cas,
103+
mutation_token,
104+
}
105+
})
106+
.await
107+
},
108+
)
109+
.await
110+
}
111+
112+
pub(crate) async fn get(&self, opts: GetOptions) -> Result<GetResult> {
113+
orchestrate_simple_crud(
114+
self.nmvb_handler.clone(),
115+
self.router.clone(),
116+
self.conn_manager.clone(),
117+
opts.key.clone().as_slice(),
118+
async |endpoint, vb_id, client| {
119+
client
120+
.get(GetRequest {
121+
collection_id: 0,
122+
key: opts.key.clone(),
123+
vbucket_id: vb_id,
124+
on_behalf_of: None,
125+
})
126+
.map_ok(|resp| resp.into())
127+
.await
128+
},
129+
)
130+
.await
131+
}
132+
}
133+
134+
#[cfg(test)]
135+
mod tests {
136+
use std::collections::HashMap;
137+
use std::ops::Add;
138+
use std::sync::Arc;
139+
use std::time::Duration;
140+
141+
use tokio::sync::mpsc::unbounded_channel;
142+
use tokio::time::Instant;
143+
144+
use crate::authenticator::PasswordAuthenticator;
145+
use crate::cbconfig::TerseConfig;
146+
use crate::crudcomponent::CrudComponent;
147+
use crate::crudoptions::{GetOptions, UpsertOptions};
148+
use crate::kvclient::{KvClientConfig, StdKvClient};
149+
use crate::kvclientmanager::{
150+
KvClientManager, KvClientManagerConfig, KvClientManagerOptions, StdKvClientManager,
151+
};
152+
use crate::kvclientpool::NaiveKvClientPool;
153+
use crate::memdx::client::Client;
154+
use crate::memdx::packet::ResponsePacket;
155+
use crate::vbucketmap::VbucketMap;
156+
use crate::vbucketrouter::{
157+
NotMyVbucketConfigHandler, StdVbucketRouter, VbucketRouterOptions, VbucketRoutingInfo,
158+
};
159+
160+
struct NVMBHandler {}
161+
162+
impl NotMyVbucketConfigHandler for NVMBHandler {
163+
fn not_my_vbucket_config(&self, config: TerseConfig, source_hostname: &str) {}
164+
}
165+
166+
struct Resp {}
167+
168+
#[tokio::test]
169+
async fn can_orchestrate_memd_routing() {
170+
let _ = env_logger::try_init();
171+
172+
let instant = Instant::now().add(Duration::new(7, 0));
173+
174+
let (orphan_tx, mut orphan_rx) = unbounded_channel::<ResponsePacket>();
175+
176+
tokio::spawn(async move {
177+
loop {
178+
match orphan_rx.recv().await {
179+
Some(resp) => {
180+
dbg!("unexpected orphan", resp);
181+
}
182+
None => {
183+
return;
184+
}
185+
}
186+
}
187+
});
188+
189+
let client_config = KvClientConfig {
190+
address: "192.168.107.128:11210"
191+
.parse()
192+
.expect("Failed to parse address"),
193+
root_certs: None,
194+
accept_all_certs: None,
195+
client_name: "myclient".to_string(),
196+
authenticator: Some(Arc::new(PasswordAuthenticator {
197+
username: "Administrator".to_string(),
198+
password: "password".to_string(),
199+
})),
200+
selected_bucket: Some("default".to_string()),
201+
disable_default_features: false,
202+
disable_error_map: false,
203+
disable_bootstrap: false,
204+
};
205+
206+
let mut client_configs = HashMap::new();
207+
client_configs.insert("192.168.107.128:11210".to_string(), client_config);
208+
209+
let manger_config = KvClientManagerConfig {
210+
num_pool_connections: 1,
211+
clients: client_configs,
212+
};
213+
214+
let manager: StdKvClientManager<NaiveKvClientPool<StdKvClient<Client>>> =
215+
StdKvClientManager::new(
216+
manger_config,
217+
KvClientManagerOptions {
218+
connect_timeout: Default::default(),
219+
connect_throttle_period: Default::default(),
220+
orphan_handler: Arc::new(orphan_tx),
221+
},
222+
)
223+
.await
224+
.unwrap();
225+
226+
let routing_info = VbucketRoutingInfo {
227+
vbucket_info: VbucketMap::new(
228+
vec![vec![0, 1], vec![1, 0], vec![0, 1], vec![0, 1], vec![1, 0]],
229+
1,
230+
)
231+
.unwrap(),
232+
server_list: vec!["192.168.107.128:11210".to_string()],
233+
};
234+
235+
let dispatcher = StdVbucketRouter::new(routing_info, VbucketRouterOptions {});
236+
237+
let dispatcher = Arc::new(dispatcher);
238+
let manager = Arc::new(manager);
239+
let nmvb_handler = Arc::new(NVMBHandler {});
240+
241+
let crud_comp = CrudComponent::new(nmvb_handler, dispatcher, manager);
242+
let set_result = crud_comp
243+
.upsert(UpsertOptions {
244+
key: "test".into(),
245+
scope_name: None,
246+
collection_name: None,
247+
value: "value".into(),
248+
flags: 0,
249+
expiry: None,
250+
preserve_expiry: None,
251+
cas: None,
252+
durability_level: None,
253+
})
254+
.await
255+
.unwrap();
256+
257+
dbg!(set_result);
258+
259+
let get_result = crud_comp
260+
.get(GetOptions {
261+
key: "test".into(),
262+
scope_name: None,
263+
collection_name: None,
264+
})
265+
.await
266+
.unwrap();
267+
268+
dbg!(get_result);
269+
}
270+
}

sdk/couchbase-core/src/crudoptions.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use crate::memdx::durability_level::DurabilityLevel;
2+
3+
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
4+
pub struct GetOptions {
5+
pub key: Vec<u8>,
6+
pub scope_name: Option<String>,
7+
pub collection_name: Option<String>,
8+
}
9+
10+
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
11+
pub struct UpsertOptions {
12+
pub key: Vec<u8>,
13+
pub scope_name: Option<String>,
14+
pub collection_name: Option<String>,
15+
pub value: Vec<u8>,
16+
pub flags: u32,
17+
// pub datatype:
18+
pub expiry: Option<u32>,
19+
pub preserve_expiry: Option<bool>,
20+
pub cas: Option<u64>,
21+
pub durability_level: Option<DurabilityLevel>,
22+
}

sdk/couchbase-core/src/crudresults.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
use crate::memdx::response::GetResponse;
2+
use crate::mutationtoken::MutationToken;
3+
4+
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
5+
pub struct GetResult {
6+
pub value: Vec<u8>,
7+
pub flags: u32,
8+
// datatype: Dataty
9+
pub cas: u64,
10+
}
11+
12+
impl From<GetResponse> for GetResult {
13+
fn from(resp: GetResponse) -> Self {
14+
Self {
15+
value: resp.value,
16+
flags: resp.flags,
17+
cas: resp.cas,
18+
}
19+
}
20+
}
21+
22+
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
23+
pub struct UpsertResult {
24+
pub cas: u64,
25+
pub mutation_token: Option<MutationToken>,
26+
}

0 commit comments

Comments
 (0)