@@ -7,6 +7,7 @@ use std::{
7
7
} ;
8
8
9
9
use async_stream:: stream;
10
+ use fusio_dispatch:: FsOptions ;
10
11
use futures_core:: Stream ;
11
12
use futures_util:: StreamExt ;
12
13
use parquet:: data_type:: AsBytes ;
@@ -184,6 +185,97 @@ pub trait BenchReader {
184
185
) -> impl Stream < Item = ProjectionResult > + ' a ;
185
186
}
186
187
188
+ pub struct TonboS3BenchDataBase {
189
+ db : tonbo:: DB < Customer , TokioExecutor > ,
190
+ }
191
+
192
+ impl TonboS3BenchDataBase {
193
+ #[ allow( dead_code) ]
194
+ pub fn new ( db : tonbo:: DB < Customer , TokioExecutor > ) -> Self {
195
+ TonboS3BenchDataBase { db }
196
+ }
197
+ }
198
+
199
+ impl BenchDatabase for TonboS3BenchDataBase {
200
+ type W < ' db >
201
+ = TonboBenchWriteTransaction < ' db >
202
+ where
203
+ Self : ' db ;
204
+ type R < ' db >
205
+ = TonboBenchReadTransaction < ' db >
206
+ where
207
+ Self : ' db ;
208
+
209
+ fn db_type_name ( ) -> & ' static str {
210
+ "tonbo on s3"
211
+ }
212
+
213
+ async fn write_transaction ( & self ) -> Self :: W < ' _ > {
214
+ TonboBenchWriteTransaction {
215
+ txn : self . db . transaction ( ) . await ,
216
+ }
217
+ }
218
+
219
+ async fn read_transaction ( & self ) -> Self :: R < ' _ > {
220
+ TonboBenchReadTransaction {
221
+ txn : self . db . transaction ( ) . await ,
222
+ }
223
+ }
224
+
225
+ async fn build ( path : impl AsRef < Path > ) -> Self {
226
+ create_dir_all ( path. as_ref ( ) ) . await . unwrap ( ) ;
227
+
228
+ let fs_options = FsOptions :: S3 {
229
+ bucket : "data" . to_string ( ) ,
230
+ credential : Some ( fusio:: remotes:: aws:: credential:: AwsCredential {
231
+ key_id : "user" . to_string ( ) ,
232
+ secret_key : "password" . to_string ( ) ,
233
+ token : None ,
234
+ } ) ,
235
+ endpoint : Some ( "http://localhost:9000" . to_string ( ) ) ,
236
+ sign_payload : None ,
237
+ checksum : None ,
238
+ region : None ,
239
+ } ;
240
+
241
+ let path = fusio:: path:: Path :: from_filesystem_path ( path. as_ref ( ) ) . unwrap ( ) ;
242
+ let option = DbOption :: from ( path. clone ( ) )
243
+ . level_path (
244
+ 0 ,
245
+ fusio:: path:: Path :: from_url_path ( "/l0" ) . unwrap ( ) ,
246
+ fs_options. clone ( ) ,
247
+ )
248
+ . unwrap ( )
249
+ . level_path (
250
+ 1 ,
251
+ fusio:: path:: Path :: from_url_path ( "/l1" ) . unwrap ( ) ,
252
+ fs_options. clone ( ) ,
253
+ )
254
+ . unwrap ( )
255
+ . level_path (
256
+ 2 ,
257
+ fusio:: path:: Path :: from_url_path ( "/l2" ) . unwrap ( ) ,
258
+ fs_options. clone ( ) ,
259
+ )
260
+ . unwrap ( )
261
+ . level_path (
262
+ 3 ,
263
+ fusio:: path:: Path :: from_url_path ( "/l3" ) . unwrap ( ) ,
264
+ fs_options. clone ( ) ,
265
+ )
266
+ . unwrap ( )
267
+ . level_path (
268
+ 4 ,
269
+ fusio:: path:: Path :: from_url_path ( "/l4" ) . unwrap ( ) ,
270
+ fs_options. clone ( ) ,
271
+ )
272
+ . unwrap ( )
273
+ . disable_wal ( ) ;
274
+
275
+ TonboS3BenchDataBase :: new ( tonbo:: DB :: new ( option, TokioExecutor :: new ( ) ) . await . unwrap ( ) )
276
+ }
277
+ }
278
+
187
279
pub struct TonboBenchDataBase {
188
280
db : tonbo:: DB < Customer , TokioExecutor > ,
189
281
}
0 commit comments