@@ -226,27 +226,27 @@ impl AlignedReduceActor {
226
226
/// Runs the actor, listening for messages and multiplexing them to the reduce tasks.
227
227
async fn run ( mut self ) {
228
228
while let Some ( msg) = self . receiver . recv ( ) . await {
229
- self . handle_window_message ( msg. window , msg . operation ) . await ;
229
+ self . handle_window_message ( msg) . await ;
230
230
}
231
231
self . wait_for_all_tasks ( ) . await ;
232
232
}
233
233
234
234
/// Handle a window message based on its operation type
235
- async fn handle_window_message ( & mut self , window : Window , operation : WindowOperation ) {
236
- let window_id = window. pnf_slot ( ) ;
237
- match operation {
238
- WindowOperation :: Open ( msg) => self . window_open ( window, window_id, msg) . await ,
239
- WindowOperation :: Append ( msg) => self . window_append ( window, window_id, msg) . await ,
240
- WindowOperation :: Close => self . window_close ( window_id) . await ,
235
+ async fn handle_window_message ( & mut self , window_msg : AlignedWindowMessage ) {
236
+ match & window_msg. operation {
237
+ WindowOperation :: Open ( _) => self . window_open ( window_msg) . await ,
238
+ WindowOperation :: Append ( _) => self . window_append ( window_msg) . await ,
239
+ WindowOperation :: Close => self . window_close ( window_msg) . await ,
241
240
}
242
241
}
243
242
244
243
/// Creates a new reduce task for the window and sends the initial Open command with the
245
244
/// first message.
246
- async fn window_open ( & mut self , window : Window , window_id : Bytes , msg : Message ) {
245
+ async fn window_open ( & mut self , window_msg : AlignedWindowMessage ) {
247
246
// Create a new channel for this window's messages
248
247
let ( message_tx, message_rx) = mpsc:: channel ( 100 ) ;
249
248
let message_stream = ReceiverStream :: new ( message_rx) ;
249
+ let window = window_msg. window . clone ( ) ;
250
250
251
251
// Create a ReduceTask
252
252
let reduce_task = ReduceTask :: new (
@@ -258,20 +258,14 @@ impl AlignedReduceActor {
258
258
self . window_manager . clone ( ) ,
259
259
) ;
260
260
261
- // Create the initial window message
262
- let window_msg = AlignedWindowMessage {
263
- operation : WindowOperation :: Open ( msg) ,
264
- window : window. clone ( ) ,
265
- } ;
266
-
267
261
// start the reduce task and store the handle and the sender so that we can send messages
268
262
// and wait for it to complete.
269
263
let task_handle = reduce_task
270
264
. start ( message_stream, self . cln_token . clone ( ) )
271
265
. await ;
272
266
273
267
self . active_streams . insert (
274
- window_id ,
268
+ window . pnf_slot ( ) ,
275
269
ActiveStream {
276
270
message_tx : message_tx. clone ( ) ,
277
271
task_handle,
@@ -283,24 +277,21 @@ impl AlignedReduceActor {
283
277
}
284
278
285
279
/// sends the message to the reduce task for the window.
286
- async fn window_append ( & mut self , window : Window , window_id : Bytes , msg : Message ) {
280
+ async fn window_append ( & mut self , window_msg : AlignedWindowMessage ) {
281
+ let window = window_msg. window . clone ( ) ;
282
+ let window_id = window. pnf_slot ( ) ;
283
+
287
284
// Get the existing stream or log error if not found create a new one.
288
285
let Some ( active_stream) = self . active_streams . get ( & window_id) else {
289
286
// windows may not be found during replay, because the windower doesn't send the open
290
287
// message for the active windows that got replayed, hence we create a new one.
291
288
// this happens because of out-of-order messages and we have to ensure that the (t+1)th
292
289
// message is sent to the window that could be created by (t)th message iff (t+1)th message
293
290
// belongs to that window created by (t)th message.
294
- self . window_open ( window , window_id , msg ) . await ;
291
+ self . window_open ( window_msg ) . await ;
295
292
return ;
296
293
} ;
297
294
298
- // Create the append window message
299
- let window_msg = AlignedWindowMessage {
300
- operation : WindowOperation :: Append ( msg) ,
301
- window,
302
- } ;
303
-
304
295
// Send the append message
305
296
active_stream
306
297
. message_tx
@@ -310,7 +301,9 @@ impl AlignedReduceActor {
310
301
}
311
302
312
303
/// Closes the reduce task for the window.
313
- async fn window_close ( & mut self , window_id : Bytes ) {
304
+ async fn window_close ( & mut self , window_msg : AlignedWindowMessage ) {
305
+ let window_id = window_msg. window . pnf_slot ( ) ;
306
+
314
307
// Get the existing stream or log error if not found
315
308
let Some ( active_stream) = self . active_streams . remove ( & window_id) else {
316
309
error ! ( "No active stream found for window {:?}" , window_id) ;
0 commit comments