@@ -5,6 +5,7 @@ use governor::{clock, state};
5
5
use governor:: { Quota , RateLimiter } ;
6
6
use metrics:: { counter, gauge} ;
7
7
use rand:: Rng ;
8
+ use std:: convert:: TryInto ;
8
9
use std:: mem;
9
10
use std:: num:: NonZeroU32 ;
10
11
use std:: path:: PathBuf ;
@@ -33,24 +34,17 @@ impl From<::std::io::Error> for Error {
33
34
/// The [`Log`] defines a task that emits variant lines to a file, managing
34
35
/// rotation and controlling rate limits.
35
36
#[ derive( Debug ) ]
36
- pub struct Log < R >
37
- where
38
- R : Rng + Sized ,
39
- {
37
+ pub struct Log {
40
38
path : PathBuf ,
41
39
name : String , // this is the stringy version of `path`
42
40
fp : BufWriter < fs:: File > ,
43
- variant : Variant ,
44
41
maximum_bytes_per_file : NonZeroU32 ,
45
42
maximum_line_size_bytes : NonZeroU32 ,
46
43
rate_limiter : RateLimiter < direct:: NotKeyed , state:: InMemoryState , clock:: QuantaClock > ,
47
- rng : R ,
44
+ line_cache : Vec < ( NonZeroU32 , Vec < u8 > ) > ,
48
45
}
49
46
50
- impl < R > Log < R >
51
- where
52
- R : Rng + Sized ,
53
- {
47
+ impl Log {
54
48
/// Create a new [`Log`]
55
49
///
56
50
/// A new instance of this type requires a random generator, its name and
60
54
/// # Errors
61
55
///
62
56
/// Creation will fail if the target file cannot be opened for writing.
63
- pub async fn new ( rng : R , name : String , target : LogTarget ) -> Result < Self , Error > {
57
+ ///
58
+ /// # Panics
59
+ ///
60
+ /// This function will panic if the value of `maximum_line_size_bytes`
61
+ /// cannot be coerced into a machine word size.
62
+ pub async fn new < R > ( mut rng : R , name : String , target : LogTarget ) -> Result < Self , Error >
63
+ where
64
+ R : Rng + Sized ,
65
+ {
64
66
let rate_limiter: RateLimiter < direct:: NotKeyed , state:: InMemoryState , clock:: QuantaClock > =
65
67
RateLimiter :: direct (
66
68
Quota :: per_second ( target. bytes_per_second )
@@ -79,15 +81,30 @@ where
79
81
. await ?,
80
82
) ;
81
83
84
+ let mut line_cache = Vec :: with_capacity ( 1024 ) ;
85
+ let mut bytes_remaining = target. maximum_prebuild_cache_size_bytes . get ( ) as usize ;
86
+ while bytes_remaining > 0 {
87
+ let bytes = rng. gen_range ( 1 ..maximum_line_size_bytes. get ( ) ) as usize ;
88
+ let mut buffer: Vec < u8 > = vec ! [ 0 ; bytes] ;
89
+ let res = match target. variant {
90
+ Variant :: Ascii => buffer:: fill_ascii ( & mut rng, & mut buffer) ,
91
+ Variant :: Constant => buffer:: fill_constant ( & mut rng, & mut buffer) ,
92
+ Variant :: Json => buffer:: fill_json ( & mut rng, & mut buffer) ,
93
+ } ;
94
+ res. expect ( "could not pre-fill line" ) ;
95
+ let nz_bytes = NonZeroU32 :: new ( bytes. try_into ( ) . unwrap ( ) ) . unwrap ( ) ;
96
+ line_cache. push ( ( nz_bytes, buffer) ) ;
97
+ bytes_remaining = bytes_remaining. saturating_sub ( bytes) ;
98
+ }
99
+
82
100
Ok ( Self {
83
101
fp,
84
102
maximum_bytes_per_file,
85
103
name,
86
104
path : target. path ,
87
- variant : target. variant ,
88
105
maximum_line_size_bytes,
89
106
rate_limiter,
90
- rng ,
107
+ line_cache ,
91
108
} )
92
109
}
93
110
@@ -120,32 +137,16 @@ where
120
137
& labels
121
138
) ;
122
139
123
- let mut buffer: Vec < u8 > = vec ! [ 0 ; self . maximum_line_size_bytes. get( ) as usize ] ;
140
+ for ( nz_bytes, line) in self . line_cache . iter ( ) . cycle ( ) {
141
+ self . rate_limiter . until_n_ready ( * nz_bytes) . await ?;
124
142
125
- loop {
126
143
{
127
- let bytes = self . rng . gen_range ( 1 ..maximum_line_size_bytes) ;
128
- let nz_bytes =
129
- NonZeroU32 :: new ( bytes) . expect ( "invalid condition, should never trigger" ) ;
130
- self . rate_limiter . until_n_ready ( nz_bytes) . await ?;
131
-
132
- let slice = & mut buffer[ 0 ..bytes as usize ] ;
133
- let res = match self . variant {
134
- Variant :: Ascii => buffer:: fill_ascii ( & mut self . rng , slice) ,
135
- Variant :: Constant => buffer:: fill_constant ( & mut self . rng , slice) ,
136
- Variant :: Json => buffer:: fill_json ( & mut self . rng , slice) ,
137
- } ;
138
- if let Ok ( filled_bytes) = res {
139
- self . fp . write ( & slice[ 0 ..filled_bytes] ) . await ?;
140
- counter ! ( "bytes_written" , filled_bytes as u64 , & labels) ;
141
- counter ! ( "lines_written" , 1 , & labels) ;
144
+ self . fp . write ( & line) . await ?;
145
+ counter ! ( "bytes_written" , line. len( ) as u64 , & labels) ;
146
+ counter ! ( "lines_written" , 1 , & labels) ;
142
147
143
- bytes_written += filled_bytes as u64 ;
144
- gauge ! ( "current_target_size_bytes" , bytes_written as f64 , & labels) ;
145
- } else {
146
- counter ! ( "unable_to_write_to_target" , 1 , & labels) ;
147
- continue ;
148
- }
148
+ bytes_written += line. len ( ) as u64 ;
149
+ gauge ! ( "current_target_size_bytes" , bytes_written as f64 , & labels) ;
149
150
}
150
151
151
152
if bytes_written > maximum_bytes_per_file {
@@ -173,5 +174,6 @@ where
173
174
counter ! ( "file_rotated" , 1 , & labels) ;
174
175
}
175
176
}
177
+ Ok ( ( ) )
176
178
}
177
179
}
0 commit comments