1
1
use std:: {
2
+ io,
2
3
path:: {
3
4
Path ,
4
5
PathBuf ,
@@ -51,18 +52,24 @@ use super::{
51
52
} ;
52
53
use crate :: SearchFileType ;
53
54
54
- struct IndexMeta {
55
- size : u64 ,
56
- path : PathBuf ,
55
+ struct IndexTempDir {
56
+ dir : PathBuf ,
57
57
cleaner : CacheCleaner ,
58
58
}
59
59
60
- impl Drop for IndexMeta {
60
+ impl Drop for IndexTempDir {
61
61
fn drop ( & mut self ) {
62
- let _ = self . cleaner . attempt_cleanup ( self . path . clone ( ) ) ;
62
+ let _ = self . cleaner . attempt_cleanup ( self . dir . clone ( ) ) ;
63
63
}
64
64
}
65
65
66
+ struct IndexMeta {
67
+ size : u64 ,
68
+ /// A path under `tempdir.dir`; may not be the directory itself
69
+ path : PathBuf ,
70
+ _tempdir : IndexTempDir ,
71
+ }
72
+
66
73
impl SizedValue for IndexMeta {
67
74
fn size ( & self ) -> u64 {
68
75
self . size
@@ -135,7 +142,7 @@ impl<RT: Runtime> ArchiveFetcher<RT> {
135
142
search_storage : Arc < dyn Storage > ,
136
143
key : ObjectKey ,
137
144
search_file_type : SearchFileType ,
138
- destination : PathBuf ,
145
+ destination : IndexTempDir ,
139
146
) -> anyhow:: Result < IndexMeta > {
140
147
let timer = metrics:: archive_fetch_timer ( ) ;
141
148
let archive = search_storage
@@ -145,27 +152,20 @@ impl<RT: Runtime> ArchiveFetcher<RT> {
145
152
. into_tokio_reader ( ) ;
146
153
let extract_archive_timer = metrics:: extract_archive_timer ( ) ;
147
154
let extract_archive_result = self
148
- . extract_archive ( search_file_type, destination. clone ( ) , archive)
155
+ . extract_archive ( search_file_type, destination. dir . clone ( ) , archive)
149
156
. await ;
150
157
extract_archive_timer. finish ( ) ;
151
158
152
- match extract_archive_result {
153
- Ok ( ( bytes_used, path) ) => {
154
- if is_immutable ( search_file_type) {
155
- set_readonly ( & path, true ) . await ?;
156
- }
157
- metrics:: finish_archive_fetch ( timer, bytes_used, search_file_type) ;
158
- Ok ( IndexMeta {
159
- path,
160
- size : bytes_used,
161
- cleaner : self . cleaner . clone ( ) ,
162
- } )
163
- } ,
164
- Err ( e) => {
165
- self . cleaner . attempt_cleanup ( destination) ?;
166
- Err ( e)
167
- } ,
159
+ let ( bytes_used, path) = extract_archive_result?;
160
+ if is_immutable ( search_file_type) {
161
+ set_readonly ( & path, true ) . await ?;
168
162
}
163
+ metrics:: finish_archive_fetch ( timer, bytes_used, search_file_type) ;
164
+ Ok ( IndexMeta {
165
+ _tempdir : destination,
166
+ size : bytes_used,
167
+ path,
168
+ } )
169
169
}
170
170
171
171
async fn extract_archive (
@@ -226,33 +226,32 @@ impl<RT: Runtime> ArchiveFetcher<RT> {
226
226
let mut timeout_fut = self . rt . wait ( * ARCHIVE_FETCH_TIMEOUT_SECONDS ) . fuse ( ) ;
227
227
let destination = self . cache_path . join ( Uuid :: new_v4 ( ) . simple ( ) . to_string ( ) ) ;
228
228
229
- let new_destination = destination. clone ( ) ;
229
+ // Create this right away so its Drop impl (which deletes the path) runs
230
+ // even on failure
231
+ let tempdir = IndexTempDir {
232
+ cleaner : self . cleaner . clone ( ) ,
233
+ dir : destination. clone ( ) ,
234
+ } ;
230
235
let new_self = self . clone ( ) ;
231
236
let new_key = key. clone ( ) ;
232
237
// Many parts of the fetch perform blocking operations. To avoid blocking the
233
238
// calling thread's scheduling, punt all fetches to a separate OS thread.
234
239
let fetch_fut = self
235
240
. blocking_thread_pool
236
241
. execute_async ( move || {
237
- new_self. fetch ( search_storage, new_key, search_file_type, new_destination )
242
+ new_self. fetch ( search_storage, new_key, search_file_type, tempdir )
238
243
} )
239
244
. fuse ( ) ;
240
245
pin_mut ! ( fetch_fut) ;
241
- let res = select_biased ! {
246
+ select_biased ! {
242
247
meta = fetch_fut => {
243
- meta
248
+ meta?
244
249
} ,
245
250
_ = timeout_fut => {
246
251
metrics:: log_cache_fetch_timeout( ) ;
247
252
tracing:: error!( "Timed out fetching archive for key {key:?}" ) ;
248
- Err ( anyhow:: anyhow!( "Timed out" ) ) }
249
- } ;
250
-
251
- if let Ok ( Ok ( index_meta) ) = res {
252
- Ok ( index_meta)
253
- } else {
254
- self . cleaner . attempt_cleanup ( destination) ?;
255
- res?
253
+ Err ( anyhow:: anyhow!( "Timed out" ) )
254
+ }
256
255
}
257
256
}
258
257
}
@@ -402,7 +401,7 @@ fn is_immutable(search_file_type: SearchFileType) -> bool {
402
401
}
403
402
}
404
403
405
- async fn set_readonly ( path : & PathBuf , readonly : bool ) -> anyhow :: Result < ( ) > {
404
+ async fn set_readonly ( path : & PathBuf , readonly : bool ) -> io :: Result < ( ) > {
406
405
let metadata = fs:: metadata ( path) . await ?;
407
406
let mut permissions = metadata. permissions ( ) ;
408
407
permissions. set_readonly ( readonly) ;
@@ -444,11 +443,16 @@ async fn cleanup_thread(mut rx: mpsc::UnboundedReceiver<PathBuf>) {
444
443
// production here, we should investigate further but for now, it's simpler
445
444
// to disallow inconsistent filesystem state.
446
445
tracing:: debug!( "Removing path {} from disk" , path. display( ) ) ;
447
- let result: anyhow :: Result < ( ) > = try {
446
+ let result: io :: Result < ( ) > = try {
448
447
set_readonly ( & path, false ) . await ?;
449
448
fs:: remove_dir_all ( path) . await ?;
450
449
} ;
451
- result. expect ( "ArchiveCacheManager failed to clean up archive directory" ) ;
450
+ match result {
451
+ Ok ( ( ) ) => ( ) ,
452
+ // Can happen if the path to clean up was never created
453
+ Err ( e) if e. kind ( ) == io:: ErrorKind :: NotFound => ( ) ,
454
+ Err ( e) => panic ! ( "ArchiveCacheManager failed to clean up archive directory: {e:?}" ) ,
455
+ }
452
456
}
453
457
}
454
458
0 commit comments