@@ -107,6 +107,9 @@ pub struct ServerActor {
107
107
disable_persistence : bool ,
108
108
code_chunk_buffers : Vec < DBChunkBuffers > ,
109
109
mask_chunk_buffers : Vec < DBChunkBuffers > ,
110
+ dot_events : Vec < Vec < CUevent > > ,
111
+ exchange_events : Vec < Vec < CUevent > > ,
112
+ phase2_events : Vec < Vec < CUevent > > ,
110
113
}
111
114
112
115
const NON_MATCH_ID : u32 = u32:: MAX ;
@@ -330,6 +333,11 @@ impl ServerActor {
330
333
let code_chunk_buffers = vec ! [ codes_engine. alloc_db_chunk_buffer( DB_CHUNK_SIZE ) ; 2 ] ;
331
334
let mask_chunk_buffers = vec ! [ masks_engine. alloc_db_chunk_buffer( DB_CHUNK_SIZE ) ; 2 ] ;
332
335
336
+ // Create all needed events
337
+ let dot_events = vec ! [ device_manager. create_events( ) ; 2 ] ;
338
+ let exchange_events = vec ! [ device_manager. create_events( ) ; 2 ] ;
339
+ let phase2_events = vec ! [ device_manager. create_events( ) ; 2 ] ;
340
+
333
341
for dev in device_manager. devices ( ) {
334
342
dev. synchronize ( ) . unwrap ( ) ;
335
343
}
@@ -367,6 +375,9 @@ impl ServerActor {
367
375
disable_persistence,
368
376
code_chunk_buffers,
369
377
mask_chunk_buffers,
378
+ dot_events,
379
+ exchange_events,
380
+ phase2_events,
370
381
} )
371
382
}
372
383
@@ -1126,14 +1137,6 @@ impl ServerActor {
1126
1137
tracing:: info!( party_id = self . party_id, "Finished batch deduplication" ) ;
1127
1138
// ---- END BATCH DEDUP ----
1128
1139
1129
- // Create new initial events
1130
- let mut current_dot_event = self . device_manager . create_events ( ) ;
1131
- let mut next_dot_event = self . device_manager . create_events ( ) ;
1132
- let mut current_exchange_event = self . device_manager . create_events ( ) ;
1133
- let mut next_exchange_event = self . device_manager . create_events ( ) ;
1134
- let mut current_phase2_event = self . device_manager . create_events ( ) ;
1135
- let mut next_phase2_event = self . device_manager . create_events ( ) ;
1136
-
1137
1140
let chunk_sizes = |chunk_idx : usize | {
1138
1141
self . current_db_sizes
1139
1142
. iter ( )
@@ -1195,11 +1198,11 @@ impl ServerActor {
1195
1198
// First stream doesn't need to wait
1196
1199
if db_chunk_idx == 0 {
1197
1200
self . device_manager
1198
- . record_event ( request_streams, & current_dot_event ) ;
1201
+ . record_event ( request_streams, & self . dot_events [ db_chunk_idx % 2 ] ) ;
1199
1202
self . device_manager
1200
- . record_event ( request_streams, & current_exchange_event ) ;
1203
+ . record_event ( request_streams, & self . exchange_events [ db_chunk_idx % 2 ] ) ;
1201
1204
self . device_manager
1202
- . record_event ( request_streams, & current_phase2_event ) ;
1205
+ . record_event ( request_streams, & self . phase2_events [ db_chunk_idx % 2 ] ) ;
1203
1206
}
1204
1207
1205
1208
// Prefetch next chunk
@@ -1229,7 +1232,7 @@ impl ServerActor {
1229
1232
) ;
1230
1233
1231
1234
self . device_manager
1232
- . await_event ( request_streams, & current_dot_event ) ;
1235
+ . await_event ( request_streams, & self . dot_events [ db_chunk_idx % 2 ] ) ;
1233
1236
1234
1237
// ---- START PHASE 1 ----
1235
1238
record_stream_time ! ( & self . device_manager, batch_streams, events, "db_dot" , {
@@ -1247,7 +1250,7 @@ impl ServerActor {
1247
1250
1248
1251
// wait for the exchange result buffers to be ready
1249
1252
self . device_manager
1250
- . await_event ( request_streams, & current_exchange_event ) ;
1253
+ . await_event ( request_streams, & self . exchange_events [ db_chunk_idx % 2 ] ) ;
1251
1254
1252
1255
record_stream_time ! (
1253
1256
& self . device_manager,
@@ -1268,7 +1271,7 @@ impl ServerActor {
1268
1271
) ;
1269
1272
1270
1273
self . device_manager
1271
- . record_event ( request_streams, & next_dot_event ) ;
1274
+ . record_event ( request_streams, & self . dot_events [ ( db_chunk_idx + 1 ) % 2 ] ) ;
1272
1275
1273
1276
record_stream_time ! (
1274
1277
& self . device_manager,
@@ -1286,7 +1289,7 @@ impl ServerActor {
1286
1289
// ---- END PHASE 1 ----
1287
1290
1288
1291
self . device_manager
1289
- . await_event ( request_streams, & current_phase2_event ) ;
1292
+ . await_event ( request_streams, & self . phase2_events [ db_chunk_idx % 2 ] ) ;
1290
1293
1291
1294
// ---- START PHASE 2 ----
1292
1295
let max_chunk_size = dot_chunk_size. iter ( ) . max ( ) . copied ( ) . unwrap ( ) ;
@@ -1318,8 +1321,10 @@ impl ServerActor {
1318
1321
// we can now record the exchange event since the phase 2 is no longer using the
1319
1322
// code_dots/mask_dots which are just reinterpretations of the exchange result
1320
1323
// buffers
1321
- self . device_manager
1322
- . record_event ( request_streams, & next_exchange_event) ;
1324
+ self . device_manager . record_event (
1325
+ request_streams,
1326
+ & self . exchange_events [ ( db_chunk_idx + 1 ) % 2 ] ,
1327
+ ) ;
1323
1328
1324
1329
let res = self . phase2 . take_result_buffer ( ) ;
1325
1330
record_stream_time ! ( & self . device_manager, request_streams, events, "db_open" , {
@@ -1340,23 +1345,10 @@ impl ServerActor {
1340
1345
} ) ;
1341
1346
}
1342
1347
self . device_manager
1343
- . record_event ( request_streams, & next_phase2_event ) ;
1348
+ . record_event ( request_streams, & self . phase2_events [ ( db_chunk_idx + 1 ) % 2 ] ) ;
1344
1349
1345
1350
// ---- END PHASE 2 ----
1346
1351
1347
- // Destroy events
1348
- self . device_manager . destroy_events ( current_dot_event) ;
1349
- self . device_manager . destroy_events ( current_exchange_event) ;
1350
- self . device_manager . destroy_events ( current_phase2_event) ;
1351
-
1352
- // Update events for synchronization
1353
- current_dot_event = next_dot_event;
1354
- current_exchange_event = next_exchange_event;
1355
- current_phase2_event = next_phase2_event;
1356
- next_dot_event = self . device_manager . create_events ( ) ;
1357
- next_exchange_event = self . device_manager . create_events ( ) ;
1358
- next_phase2_event = self . device_manager . create_events ( ) ;
1359
-
1360
1352
// Increment chunk index
1361
1353
db_chunk_idx += 1 ;
1362
1354
0 commit comments