6
6
// accordance with one or both of these licenses.
7
7
8
8
use crate :: io:: utils:: check_namespace_key_validity;
9
+ use crate :: runtime:: Runtime ;
10
+
9
11
use bitcoin:: hashes:: { sha256, Hash , HashEngine , Hmac , HmacEngine } ;
10
12
use lightning:: io:: { self , Error , ErrorKind } ;
11
13
use lightning:: util:: persist:: KVStore ;
@@ -15,7 +17,6 @@ use rand::RngCore;
15
17
use std:: panic:: RefUnwindSafe ;
16
18
use std:: sync:: Arc ;
17
19
use std:: time:: Duration ;
18
- use tokio:: runtime:: Runtime ;
19
20
use vss_client:: client:: VssClient ;
20
21
use vss_client:: error:: VssError ;
21
22
use vss_client:: headers:: VssHeaderProvider ;
@@ -41,17 +42,16 @@ type CustomRetryPolicy = FilteredRetryPolicy<
41
42
pub struct VssStore {
42
43
client : VssClient < CustomRetryPolicy > ,
43
44
store_id : String ,
44
- runtime : Runtime ,
45
+ runtime : Arc < Runtime > ,
45
46
storable_builder : StorableBuilder < RandEntropySource > ,
46
47
key_obfuscator : KeyObfuscator ,
47
48
}
48
49
49
50
impl VssStore {
50
51
pub ( crate ) fn new (
51
52
base_url : String , store_id : String , vss_seed : [ u8 ; 32 ] ,
52
- header_provider : Arc < dyn VssHeaderProvider > ,
53
- ) -> io:: Result < Self > {
54
- let runtime = tokio:: runtime:: Builder :: new_multi_thread ( ) . enable_all ( ) . build ( ) ?;
53
+ header_provider : Arc < dyn VssHeaderProvider > , runtime : Arc < Runtime > ,
54
+ ) -> Self {
55
55
let ( data_encryption_key, obfuscation_master_key) =
56
56
derive_data_encryption_and_obfuscation_keys ( & vss_seed) ;
57
57
let key_obfuscator = KeyObfuscator :: new ( obfuscation_master_key) ;
@@ -70,7 +70,7 @@ impl VssStore {
70
70
} ) as _ ) ;
71
71
72
72
let client = VssClient :: new_with_headers ( base_url, retry_policy, header_provider) ;
73
- Ok ( Self { client, store_id, runtime, storable_builder, key_obfuscator } )
73
+ Self { client, store_id, runtime, storable_builder, key_obfuscator }
74
74
}
75
75
76
76
fn build_key (
@@ -136,19 +136,16 @@ impl KVStore for VssStore {
136
136
store_id : self . store_id . clone ( ) ,
137
137
key : self . build_key ( primary_namespace, secondary_namespace, key) ?,
138
138
} ;
139
-
140
- let resp =
141
- tokio:: task:: block_in_place ( || self . runtime . block_on ( self . client . get_object ( & request) ) )
142
- . map_err ( |e| {
143
- let msg = format ! (
144
- "Failed to read from key {}/{}/{}: {}" ,
145
- primary_namespace, secondary_namespace, key, e
146
- ) ;
147
- match e {
148
- VssError :: NoSuchKeyError ( ..) => Error :: new ( ErrorKind :: NotFound , msg) ,
149
- _ => Error :: new ( ErrorKind :: Other , msg) ,
150
- }
151
- } ) ?;
139
+ let resp = self . runtime . block_on ( self . client . get_object ( & request) ) . map_err ( |e| {
140
+ let msg = format ! (
141
+ "Failed to read from key {}/{}/{}: {}" ,
142
+ primary_namespace, secondary_namespace, key, e
143
+ ) ;
144
+ match e {
145
+ VssError :: NoSuchKeyError ( ..) => Error :: new ( ErrorKind :: NotFound , msg) ,
146
+ _ => Error :: new ( ErrorKind :: Other , msg) ,
147
+ }
148
+ } ) ?;
152
149
// unwrap safety: resp.value must be always present for a non-erroneous VSS response, otherwise
153
150
// it is an API-violation which is converted to [`VssError::InternalServerError`] in [`VssClient`]
154
151
let storable = Storable :: decode ( & resp. value . unwrap ( ) . value [ ..] ) . map_err ( |e| {
@@ -179,14 +176,13 @@ impl KVStore for VssStore {
179
176
delete_items : vec ! [ ] ,
180
177
} ;
181
178
182
- tokio:: task:: block_in_place ( || self . runtime . block_on ( self . client . put_object ( & request) ) )
183
- . map_err ( |e| {
184
- let msg = format ! (
185
- "Failed to write to key {}/{}/{}: {}" ,
186
- primary_namespace, secondary_namespace, key, e
187
- ) ;
188
- Error :: new ( ErrorKind :: Other , msg)
189
- } ) ?;
179
+ self . runtime . block_on ( self . client . put_object ( & request) ) . map_err ( |e| {
180
+ let msg = format ! (
181
+ "Failed to write to key {}/{}/{}: {}" ,
182
+ primary_namespace, secondary_namespace, key, e
183
+ ) ;
184
+ Error :: new ( ErrorKind :: Other , msg)
185
+ } ) ?;
190
186
191
187
Ok ( ( ) )
192
188
}
@@ -204,30 +200,29 @@ impl KVStore for VssStore {
204
200
} ) ,
205
201
} ;
206
202
207
- tokio:: task:: block_in_place ( || self . runtime . block_on ( self . client . delete_object ( & request) ) )
208
- . map_err ( |e| {
209
- let msg = format ! (
210
- "Failed to delete key {}/{}/{}: {}" ,
211
- primary_namespace, secondary_namespace, key, e
212
- ) ;
213
- Error :: new ( ErrorKind :: Other , msg)
214
- } ) ?;
203
+ self . runtime . block_on ( self . client . delete_object ( & request) ) . map_err ( |e| {
204
+ let msg = format ! (
205
+ "Failed to delete key {}/{}/{}: {}" ,
206
+ primary_namespace, secondary_namespace, key, e
207
+ ) ;
208
+ Error :: new ( ErrorKind :: Other , msg)
209
+ } ) ?;
215
210
Ok ( ( ) )
216
211
}
217
212
218
213
fn list ( & self , primary_namespace : & str , secondary_namespace : & str ) -> io:: Result < Vec < String > > {
219
214
check_namespace_key_validity ( primary_namespace, secondary_namespace, None , "list" ) ?;
220
215
221
- let keys = tokio :: task :: block_in_place ( || {
222
- self . runtime . block_on ( self . list_all_keys ( primary_namespace , secondary_namespace ) )
223
- } )
224
- . map_err ( |e| {
225
- let msg = format ! (
226
- "Failed to retrieve keys in namespace: {}/{} : {}" ,
227
- primary_namespace, secondary_namespace, e
228
- ) ;
229
- Error :: new ( ErrorKind :: Other , msg)
230
- } ) ?;
216
+ let keys = self
217
+ . runtime
218
+ . block_on ( self . list_all_keys ( primary_namespace , secondary_namespace ) )
219
+ . map_err ( |e| {
220
+ let msg = format ! (
221
+ "Failed to retrieve keys in namespace: {}/{} : {}" ,
222
+ primary_namespace, secondary_namespace, e
223
+ ) ;
224
+ Error :: new ( ErrorKind :: Other , msg)
225
+ } ) ?;
231
226
232
227
Ok ( keys)
233
228
}
@@ -266,19 +261,37 @@ mod tests {
266
261
use rand:: distributions:: Alphanumeric ;
267
262
use rand:: { thread_rng, Rng , RngCore } ;
268
263
use std:: collections:: HashMap ;
264
+ use tokio:: runtime;
269
265
use vss_client:: headers:: FixedHeaders ;
270
266
271
267
#[ test]
272
- fn read_write_remove_list_persist ( ) {
268
+ fn vss_read_write_remove_list_persist ( ) {
269
+ let runtime = Arc :: new ( Runtime :: new ( ) . unwrap ( ) ) ;
270
+ let vss_base_url = std:: env:: var ( "TEST_VSS_BASE_URL" ) . unwrap ( ) ;
271
+ let mut rng = thread_rng ( ) ;
272
+ let rand_store_id: String = ( 0 ..7 ) . map ( |_| rng. sample ( Alphanumeric ) as char ) . collect ( ) ;
273
+ let mut vss_seed = [ 0u8 ; 32 ] ;
274
+ rng. fill_bytes ( & mut vss_seed) ;
275
+ let header_provider = Arc :: new ( FixedHeaders :: new ( HashMap :: new ( ) ) ) ;
276
+ let vss_store =
277
+ VssStore :: new ( vss_base_url, rand_store_id, vss_seed, header_provider, runtime) . unwrap ( ) ;
278
+
279
+ do_read_write_remove_list_persist ( & vss_store) ;
280
+ }
281
+
282
+ #[ tokio:: test( flavor = "multi_thread" , worker_threads = 1 ) ]
283
+ async fn vss_read_write_remove_list_persist_in_runtime_context ( ) {
284
+ let runtime = Arc :: new ( Runtime :: new ( ) . unwrap ( ) ) ;
273
285
let vss_base_url = std:: env:: var ( "TEST_VSS_BASE_URL" ) . unwrap ( ) ;
274
286
let mut rng = thread_rng ( ) ;
275
287
let rand_store_id: String = ( 0 ..7 ) . map ( |_| rng. sample ( Alphanumeric ) as char ) . collect ( ) ;
276
288
let mut vss_seed = [ 0u8 ; 32 ] ;
277
289
rng. fill_bytes ( & mut vss_seed) ;
278
290
let header_provider = Arc :: new ( FixedHeaders :: new ( HashMap :: new ( ) ) ) ;
279
291
let vss_store =
280
- VssStore :: new ( vss_base_url, rand_store_id, vss_seed, header_provider) . unwrap ( ) ;
292
+ VssStore :: new ( vss_base_url, rand_store_id, vss_seed, header_provider, runtime ) . unwrap ( ) ;
281
293
282
294
do_read_write_remove_list_persist ( & vss_store) ;
295
+ drop ( vss_store)
283
296
}
284
297
}
0 commit comments