@@ -12,13 +12,11 @@ use std::path::Path;
12
12
use std:: path:: PathBuf ;
13
13
use std:: pin:: Pin ;
14
14
use std:: sync:: Arc ;
15
- use std:: sync:: RwLock ;
16
15
use std:: task:: Context as TaskContext ;
17
16
use std:: task:: Poll ;
18
17
use std:: time:: Duration ;
19
18
use std:: time:: SystemTime ;
20
19
21
- use anyhow:: Error ;
22
20
use anyhow:: Result ;
23
21
use async_trait:: async_trait;
24
22
use chrono:: DateTime ;
@@ -50,10 +48,7 @@ use hyperactor_telemetry::log_file_path;
50
48
use serde:: Deserialize ;
51
49
use serde:: Serialize ;
52
50
use tokio:: io;
53
- use tokio:: sync:: mpsc;
54
- use tokio:: sync:: watch;
55
51
use tokio:: sync:: watch:: Receiver ;
56
- use tokio:: task:: JoinHandle ;
57
52
58
53
use crate :: bootstrap:: BOOTSTRAP_LOG_CHANNEL ;
59
54
@@ -264,6 +259,9 @@ pub enum LogMessage {
264
259
/// The log payload as bytes
265
260
payload : Serialized ,
266
261
} ,
262
+
263
+ /// Flush the log
264
+ Flush { } ,
267
265
}
268
266
269
267
/// Messages that can be sent to the LogClient locally.
@@ -663,20 +661,15 @@ fn deserialize_message_lines(
663
661
handlers = [ LogMessage , LogClientMessage ] ,
664
662
) ]
665
663
pub struct LogClientActor {
666
- log_tx : mpsc:: Sender < ( OutputTarget , String ) > ,
667
- #[ allow( unused) ]
668
- aggregator_handle : JoinHandle < Result < ( ) , Error > > ,
669
- /// The watch sender for the aggregation window in seconds
670
- aggregate_window_tx : watch:: Sender < u64 > ,
671
- should_aggregate : bool ,
672
- // Store aggregators directly in the actor for access in Drop
673
- aggregators : Arc < RwLock < HashMap < OutputTarget , Aggregator > > > ,
664
+ aggregate_window_sec : Option < u64 > ,
665
+ aggregators : HashMap < OutputTarget , Aggregator > ,
666
+ last_flush_time : SystemTime ,
667
+ next_flush_deadline : Option < SystemTime > ,
674
668
}
675
669
676
670
impl LogClientActor {
677
- fn print_aggregators ( aggregators : & RwLock < HashMap < OutputTarget , Aggregator > > ) {
678
- let mut aggregators_guard = aggregators. write ( ) . unwrap ( ) ;
679
- for ( output_target, aggregator) in aggregators_guard. iter_mut ( ) {
671
+ fn print_aggregators ( & mut self ) {
672
+ for ( output_target, aggregator) in self . aggregators . iter_mut ( ) {
680
673
if aggregator. is_empty ( ) {
681
674
continue ;
682
675
}
@@ -693,6 +686,14 @@ impl LogClientActor {
693
686
aggregator. reset ( ) ;
694
687
}
695
688
}
689
+
690
+ fn print_log_line ( hostname : & str , pid : u32 , output_target : OutputTarget , line : String ) {
691
+ let message = format ! ( "[{} {}] {}" , hostname, pid, line) ;
692
+ match output_target {
693
+ OutputTarget :: Stdout => println ! ( "{}" , message) ,
694
+ OutputTarget :: Stderr => eprintln ! ( "{}" , message) ,
695
+ }
696
+ }
696
697
}
697
698
698
699
#[ async_trait]
@@ -701,114 +702,96 @@ impl Actor for LogClientActor {
701
702
type Params = ( ) ;
702
703
703
704
async fn new ( _: ( ) ) -> Result < Self , anyhow:: Error > {
704
- // Create mpsc channel for log messages
705
- let ( log_tx, log_rx) = mpsc:: channel :: < ( OutputTarget , String ) > ( 1000 ) ;
706
-
707
- // Create a watch channel for the aggregation window
708
- let ( aggregate_window_tx, aggregate_window_rx) =
709
- watch:: channel ( DEFAULT_AGGREGATE_WINDOW_SEC ) ;
710
-
711
705
// Initialize aggregators
712
706
let mut aggregators = HashMap :: new ( ) ;
713
707
aggregators. insert ( OutputTarget :: Stderr , Aggregator :: new ( ) ) ;
714
708
aggregators. insert ( OutputTarget :: Stdout , Aggregator :: new ( ) ) ;
715
- let aggregators = Arc :: new ( RwLock :: new ( aggregators) ) ;
716
-
717
- // Clone aggregators for the aggregator task
718
- let aggregators_for_task = Arc :: clone ( & aggregators) ;
719
-
720
- // Start the loggregator
721
- let aggregator_handle = tokio:: spawn ( async move {
722
- start_aggregator ( log_rx, aggregate_window_rx, aggregators_for_task) . await
723
- } ) ;
724
709
725
710
Ok ( Self {
726
- log_tx,
727
- aggregator_handle,
728
- aggregate_window_tx,
729
- should_aggregate : true ,
711
+ aggregate_window_sec : Some ( DEFAULT_AGGREGATE_WINDOW_SEC ) ,
730
712
aggregators,
713
+ last_flush_time : RealClock . system_time_now ( ) ,
714
+ next_flush_deadline : None ,
731
715
} )
732
716
}
733
717
}
734
718
735
719
impl Drop for LogClientActor {
736
720
fn drop ( & mut self ) {
737
721
// Flush the remaining logs before shutting down
738
- Self :: print_aggregators ( & self . aggregators ) ;
722
+ self . print_aggregators ( ) ;
739
723
}
740
724
}
741
725
742
- async fn start_aggregator (
743
- mut log_rx : mpsc:: Receiver < ( OutputTarget , String ) > ,
744
- mut interval_sec_rx : watch:: Receiver < u64 > ,
745
- aggregators : Arc < RwLock < HashMap < OutputTarget , Aggregator > > > ,
746
- ) -> anyhow:: Result < ( ) > {
747
- let mut interval =
748
- tokio:: time:: interval ( tokio:: time:: Duration :: from_secs ( * interval_sec_rx. borrow ( ) ) ) ;
749
-
750
- // Start the event loop
751
- loop {
752
- tokio:: select! {
753
- // Process incoming log messages
754
- Some ( ( output_target, log_line) ) = log_rx. recv( ) => {
755
- let mut aggregators_guard = aggregators. write( ) . unwrap( ) ;
756
- if let Some ( aggregator) = aggregators_guard. get_mut( & output_target) {
757
- if let Err ( e) = aggregator. add_line( & log_line) {
758
- tracing:: error!( "error adding log line: {}" , e) ;
759
- }
760
- } else {
761
- tracing:: error!( "unknown output target: {:?}" , output_target) ;
762
- }
763
- }
764
- // Watch for changes in the interval
765
- Ok ( _) = interval_sec_rx. changed( ) => {
766
- interval = tokio:: time:: interval( tokio:: time:: Duration :: from_secs( * interval_sec_rx. borrow( ) ) ) ;
767
- }
768
-
769
- // Every interval tick, print and reset the aggregator
770
- _ = interval. tick( ) => {
771
- LogClientActor :: print_aggregators( & aggregators) ;
772
- }
773
-
774
- // Exit if the channel is closed
775
- else => {
776
- tracing:: error!( "log channel closed, exiting aggregator" ) ;
777
- // Print final aggregated logs before shutting down
778
- LogClientActor :: print_aggregators( & aggregators) ;
779
- break ;
780
- }
781
- }
782
- }
783
-
784
- Ok ( ( ) )
785
- }
786
-
787
726
#[ async_trait]
788
727
#[ hyperactor:: forward( LogMessage ) ]
789
728
impl LogMessageHandler for LogClientActor {
790
729
async fn log (
791
730
& mut self ,
792
- _cx : & Context < Self > ,
731
+ cx : & Context < Self > ,
793
732
hostname : String ,
794
733
pid : u32 ,
795
734
output_target : OutputTarget ,
796
735
payload : Serialized ,
797
736
) -> Result < ( ) , anyhow:: Error > {
798
737
// Deserialize the message and process line by line with UTF-8
799
738
let message_lines = deserialize_message_lines ( & payload) ?;
739
+ let hostname = hostname. as_str ( ) ;
800
740
801
- for line in message_lines {
802
- if self . should_aggregate {
803
- self . log_tx . send ( ( output_target, line) ) . await ?;
804
- } else {
805
- let message = format ! ( "[{} {}] {}" , hostname, pid, line) ;
806
- match output_target {
807
- OutputTarget :: Stdout => println ! ( "{}" , message) ,
808
- OutputTarget :: Stderr => eprintln ! ( "{}" , message) ,
741
+ match self . aggregate_window_sec {
742
+ None => {
743
+ for line in message_lines {
744
+ Self :: print_log_line ( hostname, pid, output_target, line) ;
745
+ }
746
+ self . last_flush_time = RealClock . system_time_now ( ) ;
747
+ }
748
+ Some ( window) => {
749
+ for line in message_lines {
750
+ if let Some ( aggregator) = self . aggregators . get_mut ( & output_target) {
751
+ if let Err ( e) = aggregator. add_line ( & line) {
752
+ tracing:: error!( "error adding log line: {}" , e) ;
753
+ // For the sake of completeness, flush the log lines.
754
+ Self :: print_log_line ( hostname, pid, output_target, line) ;
755
+ }
756
+ } else {
757
+ tracing:: error!( "unknown output target: {:?}" , output_target) ;
758
+ // For the sake of completeness, flush the log lines.
759
+ Self :: print_log_line ( hostname, pid, output_target, line) ;
760
+ }
761
+ }
762
+
763
+ let new_deadline = self . last_flush_time + Duration :: from_secs ( window) ;
764
+ let now = RealClock . system_time_now ( ) ;
765
+ if new_deadline <= now {
766
+ self . flush ( cx) . await ?;
767
+ } else {
768
+ let delay = new_deadline. duration_since ( now) ?;
769
+ match self . next_flush_deadline {
770
+ None => {
771
+ self . next_flush_deadline = Some ( new_deadline) ;
772
+ cx. self_message_with_delay ( LogMessage :: Flush { } , delay) ?;
773
+ }
774
+ Some ( deadline) => {
775
+ // Some early log lines have already triggered the flush.
776
+ if new_deadline < deadline {
777
+ // This can happen if the user has adjusted the aggregation window.
778
+ self . next_flush_deadline = Some ( new_deadline) ;
779
+ cx. self_message_with_delay ( LogMessage :: Flush { } , delay) ?;
780
+ }
781
+ }
782
+ }
809
783
}
810
784
}
811
785
}
786
+
787
+ Ok ( ( ) )
788
+ }
789
+
790
+ async fn flush ( & mut self , _cx : & Context < Self > ) -> Result < ( ) , anyhow:: Error > {
791
+ self . print_aggregators ( ) ;
792
+ self . last_flush_time = RealClock . system_time_now ( ) ;
793
+ self . next_flush_deadline = None ;
794
+
812
795
Ok ( ( ) )
813
796
}
814
797
}
@@ -821,11 +804,11 @@ impl LogClientMessageHandler for LogClientActor {
821
804
_cx : & Context < Self > ,
822
805
aggregate_window_sec : Option < u64 > ,
823
806
) -> Result < ( ) , anyhow:: Error > {
824
- if let Some ( window ) = aggregate_window_sec {
825
- // Send the new value through the watch channel
826
- self . aggregate_window_tx . send ( window ) ? ;
807
+ if self . aggregate_window_sec . is_some ( ) && aggregate_window_sec. is_none ( ) {
808
+ // Make sure we flush whatever is in the aggregators before disabling aggregation.
809
+ self . print_aggregators ( ) ;
827
810
}
828
- self . should_aggregate = aggregate_window_sec. is_some ( ) ;
811
+ self . aggregate_window_sec = aggregate_window_sec;
829
812
Ok ( ( ) )
830
813
}
831
814
}
0 commit comments