@@ -9,7 +9,7 @@ use rand::RngCore;
9
9
use rand:: { thread_rng, Rng } ;
10
10
use rayon:: prelude:: * ;
11
11
use std:: convert:: TryInto ;
12
- use std:: num:: { NonZeroU32 , NonZeroU64 } ;
12
+ use std:: num:: NonZeroU32 ;
13
13
use std:: path:: PathBuf ;
14
14
use tokio:: fs;
15
15
use tokio:: io:: { AsyncWriteExt , BufWriter } ;
@@ -57,8 +57,8 @@ const BLOCK_BYTE_SIZES: [usize; 6] = [
57
57
32_000_000 ,
58
58
] ;
59
59
60
- fn total_newlines ( input : & [ u8 ] ) -> NonZeroU64 {
61
- NonZeroU64 :: new ( bytecount:: count ( input, b'\n' ) as u64 ) . unwrap ( )
60
+ fn total_newlines ( input : & [ u8 ] ) -> u64 {
61
+ bytecount:: count ( input, b'\n' ) as u64
62
62
}
63
63
64
64
fn chunk_bytes < R > ( rng : & mut R , input : usize , bytes_per_second : usize ) -> Vec < usize >
@@ -79,11 +79,18 @@ where
79
79
chunks
80
80
}
81
81
82
+ #[ derive( Debug ) ]
83
+ struct Block {
84
+ total_bytes : NonZeroU32 ,
85
+ lines : u64 ,
86
+ bytes : Vec < u8 > ,
87
+ }
88
+
82
89
fn construct_block (
83
90
block_bytes : usize ,
84
91
variant : Variant ,
85
92
static_path : Option < & PathBuf > ,
86
- ) -> Result < ( NonZeroU32 , NonZeroU64 , Vec < u8 > ) , Error > {
93
+ ) -> Result < Block , Error > {
87
94
let mut rng = thread_rng ( ) ;
88
95
let mut bytes: Vec < u8 > = vec ! [ 0 ; block_bytes] ;
89
96
rng. fill_bytes ( & mut bytes) ;
@@ -100,15 +107,22 @@ fn construct_block(
100
107
Variant :: Json => {
101
108
payload:: Json :: arbitrary_take_rest ( unstructured) ?. to_bytes ( & mut block) ?;
102
109
}
110
+ Variant :: FoundationDb => {
111
+ payload:: FoundationDb :: arbitrary_take_rest ( unstructured) ?. to_bytes ( & mut block) ?;
112
+ }
103
113
}
104
114
block. shrink_to_fit ( ) ;
105
115
if block. is_empty ( ) {
106
116
return Err ( Error :: BlockEmpty ) ;
107
117
}
108
118
119
+ let total_bytes = NonZeroU32 :: new ( block. len ( ) . try_into ( ) . unwrap ( ) ) . unwrap ( ) ;
109
120
let newlines = total_newlines ( & block) ;
110
- let nz_bytes = NonZeroU32 :: new ( block. len ( ) . try_into ( ) . unwrap ( ) ) . unwrap ( ) ;
111
- Ok ( ( nz_bytes, newlines, block) )
121
+ Ok ( Block {
122
+ total_bytes,
123
+ lines : newlines,
124
+ bytes : block,
125
+ } )
112
126
}
113
127
114
128
#[ allow( clippy:: ptr_arg) ]
@@ -117,7 +131,7 @@ fn construct_block_cache<R>(
117
131
mut rng : R ,
118
132
target : & LogTarget ,
119
133
labels : & Vec < ( String , String ) > ,
120
- ) -> Vec < ( NonZeroU32 , u64 , Vec < u8 > ) >
134
+ ) -> Vec < Block >
121
135
where
122
136
R : Rng + Sized ,
123
137
{
@@ -128,11 +142,10 @@ where
128
142
bytes_per_second,
129
143
) ;
130
144
131
- let block_cache: Vec < ( NonZeroU32 , u64 , Vec < u8 > ) > = block_chunks
145
+ let block_cache: Vec < Block > = block_chunks
132
146
. into_par_iter ( )
133
147
. map ( |block_size| construct_block ( block_size, target. variant , target. static_path . as_ref ( ) ) )
134
148
. map ( std:: result:: Result :: unwrap)
135
- . map ( |( nzu32, nzu64, bytes) | ( nzu32, nzu64. get ( ) , bytes) )
136
149
. collect ( ) ;
137
150
gauge ! ( "block_construction_complete" , 1.0 , labels) ;
138
151
block_cache
@@ -147,7 +160,7 @@ pub struct Log {
147
160
maximum_bytes_per_file : NonZeroU32 ,
148
161
bytes_per_second : NonZeroU32 ,
149
162
rate_limiter : RateLimiter < direct:: NotKeyed , state:: InMemoryState , clock:: QuantaClock > ,
150
- block_cache : Vec < ( NonZeroU32 , u64 , Vec < u8 > ) > ,
163
+ block_cache : Vec < Block > ,
151
164
}
152
165
153
166
impl Log {
@@ -217,16 +230,20 @@ impl Log {
217
230
. await ?,
218
231
) ;
219
232
220
- for ( total_bytes, total_newlines, block) in self . block_cache . iter ( ) . cycle ( ) {
221
- self . rate_limiter . until_n_ready ( * total_bytes) . await ?;
233
+ for blk in self . block_cache . iter ( ) . cycle ( ) {
234
+ let total_bytes = blk. total_bytes ;
235
+ let total_newlines = blk. lines ;
236
+ let block = & blk. bytes ;
237
+
238
+ self . rate_limiter . until_n_ready ( total_bytes) . await ?;
222
239
223
240
{
224
241
fp. write_all ( block) . await ?;
225
242
// block.len() and total_bytes are the same numeric value but we
226
243
// avoid needing to get a plain value from a non-zero by calling
227
244
// len here.
228
245
counter ! ( "bytes_written" , block. len( ) as u64 , & labels) ;
229
- counter ! ( "lines_written" , * total_newlines, & labels) ;
246
+ counter ! ( "lines_written" , total_newlines, & labels) ;
230
247
231
248
bytes_written += block. len ( ) as u64 ;
232
249
gauge ! ( "current_target_size_bytes" , bytes_written as f64 , & labels) ;
0 commit comments