@@ -188,46 +188,52 @@ class DataIntegrityTree {
188
188
if ( ! isHexString ( key ) ) {
189
189
throw new Error ( `key must be a valid hex string: ${ key } ` ) ;
190
190
}
191
+
191
192
const uncompressedHash = crypto . createHash ( "sha256" ) ;
192
193
const gzip = zlib . createGzip ( ) ;
193
-
194
+
194
195
let sha256 : string ;
195
196
const tempDir = path . join ( this . storeDir , "tmp" ) ;
196
197
if ( ! fs . existsSync ( tempDir ) ) {
197
198
fs . mkdirSync ( tempDir , { recursive : true } ) ;
198
199
}
199
200
const tempFilePath = path . join ( tempDir , `${ crypto . randomUUID ( ) } .gz` ) ;
200
-
201
+
201
202
return new Promise ( ( resolve , reject ) => {
202
203
const tempWriteStream = fs . createWriteStream ( tempFilePath ) ;
203
-
204
+
205
+ // Update the hash with the original data
204
206
readStream . on ( "data" , ( chunk ) => {
205
207
uncompressedHash . update ( chunk ) ;
206
208
} ) ;
207
-
209
+
210
+ // Pipe the read stream through gzip into the temporary write stream
208
211
readStream . pipe ( gzip ) . pipe ( tempWriteStream ) ;
209
-
212
+
210
213
tempWriteStream . on ( "finish" , async ( ) => {
211
- sha256 = uncompressedHash . digest ( "hex" ) ;
212
-
213
- const finalWriteStream = this . _createWriteStream ( sha256 ) ;
214
- const finalPath = finalWriteStream . path as string ;
215
-
216
- // Ensure the directory exists before copying the file
217
- const finalDir = path . dirname ( finalPath ) ;
218
- if ( ! fs . existsSync ( finalDir ) ) {
219
- fs . mkdirSync ( finalDir , { recursive : true } ) ;
220
- }
221
-
214
+ let finalWriteStream : fs . WriteStream | undefined ;
222
215
try {
216
+ sha256 = uncompressedHash . digest ( "hex" ) ;
217
+
218
+ finalWriteStream = this . _createWriteStream ( sha256 ) ;
219
+ const finalPath = finalWriteStream . path as string ;
220
+
221
+ // Ensure the directory exists
222
+ const finalDir = path . dirname ( finalPath ) ;
223
+ if ( ! fs . existsSync ( finalDir ) ) {
224
+ fs . mkdirSync ( finalDir , { recursive : true } ) ;
225
+ }
226
+
227
+ // Copy the temporary gzipped file to the final destination
223
228
await this . _streamFile ( tempFilePath , finalPath ) ;
224
- await unlink ( tempFilePath ) ;
225
-
229
+ await unlink ( tempFilePath ) ; // Clean up the temporary file
230
+
226
231
const combinedHash = crypto
227
232
. createHash ( "sha256" )
228
233
. update ( `${ key } /${ sha256 } ` )
229
234
. digest ( "hex" ) ;
230
-
235
+
236
+ // Check if the key already exists with the same hash
231
237
if (
232
238
Array . from ( this . files . values ( ) ) . some (
233
239
( file ) => file . hash === combinedHash
@@ -236,28 +242,58 @@ class DataIntegrityTree {
236
242
console . log ( `No changes detected for key: ${ key } ` ) ;
237
243
return resolve ( ) ;
238
244
}
239
-
245
+
246
+ // Delete existing key if present
240
247
if ( this . files . has ( key ) ) {
241
248
this . deleteKey ( key ) ;
242
249
}
243
-
250
+
251
+ // Insert the new key with the hash
244
252
console . log ( `Inserted key: ${ key } ` ) ;
245
253
this . files . set ( key , {
246
254
hash : combinedHash ,
247
255
sha256 : sha256 ,
248
256
} ) ;
257
+
249
258
this . _rebuildTree ( ) ;
250
259
resolve ( ) ;
251
260
} catch ( err ) {
261
+ // On error, cleanup the temporary file and reject
262
+ await unlink ( tempFilePath ) . catch ( ( ) => { } ) ;
252
263
reject ( err ) ;
264
+ } finally {
265
+ // Always close the final write stream if it exists
266
+ if ( finalWriteStream ) {
267
+ finalWriteStream . end ( ) ;
268
+ }
253
269
}
254
270
} ) ;
255
-
256
- tempWriteStream . on ( "error" , ( err ) => {
271
+
272
+ tempWriteStream . on ( "error" , async ( err ) => {
273
+ // Close streams and clean up in case of error
274
+ tempWriteStream . destroy ( ) ;
275
+ gzip . destroy ( ) ;
276
+ readStream . destroy ( ) ;
277
+
278
+ await unlink ( tempFilePath ) . catch ( ( ) => { } ) ; // Clean up the temp file
257
279
reject ( err ) ;
258
280
} ) ;
259
-
260
- readStream . on ( "error" , ( err ) => {
281
+
282
+ readStream . on ( "error" , async ( err ) => {
283
+ // Close streams and clean up in case of error
284
+ tempWriteStream . destroy ( ) ;
285
+ gzip . destroy ( ) ;
286
+ readStream . destroy ( ) ;
287
+
288
+ await unlink ( tempFilePath ) . catch ( ( ) => { } ) ; // Clean up the temp file
289
+ reject ( err ) ;
290
+ } ) ;
291
+
292
+ gzip . on ( "error" , ( err ) => {
293
+ // Handle errors in the gzip stream
294
+ tempWriteStream . destroy ( ) ;
295
+ gzip . destroy ( ) ;
296
+ readStream . destroy ( ) ;
261
297
reject ( err ) ;
262
298
} ) ;
263
299
} ) ;
0 commit comments