12
12
// See the License for the specific language governing permissions and
13
13
// limitations under the License.
14
14
15
- use std:: collections:: hash_map:: Entry ;
15
+ pub ( crate ) mod local;
16
+
17
+ use std:: ops:: Deref ;
16
18
use std:: pin:: Pin ;
17
19
use std:: sync:: Arc ;
18
20
use std:: task:: Context ;
19
21
use std:: task:: Poll ;
20
22
use std:: time:: Duration ;
21
23
22
- use databend_common_base:: base:: tokio:: sync:: Semaphore as TokioSemaphore ;
23
24
use databend_common_grpc:: RpcClientConf ;
24
25
use databend_common_meta_client:: errors:: CreationError ;
25
26
use databend_common_meta_client:: ClientHandle ;
26
27
use databend_common_meta_client:: MetaGrpcClient ;
27
- use databend_common_meta_embedded:: MemMeta ;
28
28
use databend_common_meta_kvapi:: kvapi;
29
29
use databend_common_meta_kvapi:: kvapi:: KVStream ;
30
30
use databend_common_meta_kvapi:: kvapi:: UpsertKVReply ;
31
31
use databend_common_meta_semaphore:: acquirer:: Permit ;
32
32
use databend_common_meta_semaphore:: errors:: AcquireError ;
33
- use databend_common_meta_semaphore:: errors:: ConnectionClosed ;
34
33
use databend_common_meta_semaphore:: Semaphore ;
35
34
use databend_common_meta_types:: protobuf:: WatchRequest ;
36
35
use databend_common_meta_types:: protobuf:: WatchResponse ;
37
36
use databend_common_meta_types:: MetaError ;
38
37
use databend_common_meta_types:: TxnReply ;
39
38
use databend_common_meta_types:: TxnRequest ;
40
39
use databend_common_meta_types:: UpsertKV ;
40
+ pub use local:: LocalMetaService ;
41
41
use log:: info;
42
42
use tokio_stream:: Stream ;
43
43
@@ -52,7 +52,7 @@ pub struct MetaStoreProvider {
52
52
/// MetaStore is impl with either a local embedded meta store, or a grpc-client of metasrv
53
53
#[ derive( Clone ) ]
54
54
pub enum MetaStore {
55
- L ( Arc < MemMeta > ) ,
55
+ L ( Arc < LocalMetaService > ) ,
56
56
R ( Arc < ClientHandle > ) ,
57
57
}
58
58
@@ -69,23 +69,23 @@ impl MetaStore {
69
69
}
70
70
71
71
pub async fn get_local_addr ( & self ) -> std:: result:: Result < Option < String > , MetaError > {
72
- match self {
73
- MetaStore :: L ( _ ) => Ok ( None ) ,
74
- MetaStore :: R ( grpc_client) => {
75
- let client_info = grpc_client . get_client_info ( ) . await ? ;
76
- Ok ( Some ( client_info . client_addr ) )
77
- }
78
- }
72
+ let client = match self {
73
+ MetaStore :: L ( l ) => l . deref ( ) . deref ( ) ,
74
+ MetaStore :: R ( grpc_client) => grpc_client ,
75
+ } ;
76
+
77
+ let client_info = client . get_client_info ( ) . await ? ;
78
+ Ok ( Some ( client_info . client_addr ) )
79
79
}
80
80
81
81
pub async fn watch ( & self , request : WatchRequest ) -> Result < WatchStream , MetaError > {
82
- match self {
83
- MetaStore :: L ( _ ) => unreachable ! ( ) ,
84
- MetaStore :: R ( grpc_client) => {
85
- let streaming = grpc_client . request ( request ) . await ? ;
86
- Ok ( Box :: pin ( WatchResponseStream :: create ( streaming ) ) )
87
- }
88
- }
82
+ let client = match self {
83
+ MetaStore :: L ( l ) => l . deref ( ) ,
84
+ MetaStore :: R ( grpc_client) => grpc_client ,
85
+ } ;
86
+
87
+ let streaming = client . request ( request ) . await ? ;
88
+ Ok ( Box :: pin ( WatchResponseStream :: create ( streaming ) ) )
89
89
}
90
90
91
91
pub async fn new_acquired (
@@ -95,33 +95,12 @@ impl MetaStore {
95
95
id : impl ToString ,
96
96
lease : Duration ,
97
97
) -> Result < Permit , AcquireError > {
98
- match self {
99
- MetaStore :: L ( v) => {
100
- let mut local_lock_map = v. locks . lock ( ) . await ;
101
-
102
- let acquire_res = match local_lock_map. entry ( prefix. to_string ( ) ) {
103
- Entry :: Occupied ( v) => v. get ( ) . clone ( ) ,
104
- Entry :: Vacant ( v) => v
105
- . insert ( Arc :: new ( TokioSemaphore :: new ( capacity as usize ) ) )
106
- . clone ( ) ,
107
- } ;
108
-
109
- match acquire_res. acquire_owned ( ) . await {
110
- Ok ( guard) => Ok ( Permit {
111
- fu : Box :: pin ( async move {
112
- let _guard = guard;
113
- Ok ( ( ) )
114
- } ) ,
115
- } ) ,
116
- Err ( _e) => Err ( AcquireError :: ConnectionClosed ( ConnectionClosed :: new_str (
117
- "" ,
118
- ) ) ) ,
119
- }
120
- }
121
- MetaStore :: R ( grpc_client) => {
122
- Semaphore :: new_acquired ( grpc_client. clone ( ) , prefix, capacity, id, lease) . await
123
- }
124
- }
98
+ let client = match self {
99
+ MetaStore :: L ( l) => l. deref ( ) ,
100
+ MetaStore :: R ( grpc_client) => grpc_client,
101
+ } ;
102
+
103
+ Semaphore :: new_acquired ( client. clone ( ) , prefix, capacity, id, lease) . await
125
104
}
126
105
}
127
106
@@ -171,8 +150,11 @@ impl MetaStoreProvider {
171
150
) ;
172
151
173
152
// NOTE: This can only be used for test: data will be removed when program quit.
174
- let meta_store = MemMeta :: default ( ) ;
175
- Ok ( MetaStore :: L ( Arc :: new ( meta_store) ) )
153
+ Ok ( MetaStore :: L ( Arc :: new (
154
+ LocalMetaService :: new ( "MetaStoreProvider-created" )
155
+ . await
156
+ . unwrap ( ) ,
157
+ ) ) )
176
158
} else {
177
159
info ! ( conf : ? =( & self . rpc_conf) ; "use remote meta" ) ;
178
160
let client = MetaGrpcClient :: try_new ( & self . rpc_conf ) ?;
0 commit comments