Skip to content

Commit 983ce6f

Browse files
authored
fix: remove blocking catchup flush (#821)
The original idea for this code was to detect if any flushing futures were in progress when the next continuous flush occurs and then to drive those synchronously. The problem is that this can cause unexpected timeouts if latency is high and the function is quite busy/creating lots of data. In those cases we may have a longer than usual flush for one of the telemetry types (as there are multiple simultaneous requests, divided into the maximum batch size). This then causes us to block and wait to ensure the last flush succeeds and then we immediately create another flush task. This should be fast but in some cases, creates so many futures that we don't adequately poll the `/next` event and can time out. For now, this change will allow those flush tasks to run in the background. On the shutdown event we drive all to completion anyway, so this change should result in eventually consistent data.
1 parent 7bfe4df commit 983ce6f

File tree

1 file changed

+1
-14
lines changed

1 file changed

+1
-14
lines changed

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -598,7 +598,6 @@ async fn extension_loop_active(
598598
let next_lambda_response = next_event(client, &r.extension_id).await;
599599
// first invoke we must call next
600600
let mut pending_flush_handles = PendingFlushHandles::new();
601-
let mut last_continuous_flush_error = false;
602601
handle_next_invocation(next_lambda_response, invocation_processor.clone()).await;
603602
loop {
604603
let maybe_shutdown_event;
@@ -651,18 +650,7 @@ async fn extension_loop_active(
651650
handle_next_invocation(next_response, invocation_processor.clone()).await;
652651
} else {
653652
//Periodic flush scenario, flush at top of invocation
654-
if current_flush_decision == FlushDecision::Continuous && !last_continuous_flush_error {
655-
let tf = trace_flusher.clone();
656-
// Await any previous flush handles. This
657-
last_continuous_flush_error = pending_flush_handles
658-
.await_flush_handles(
659-
&logs_flusher.clone(),
660-
&tf,
661-
&metrics_flushers,
662-
&proxy_flusher,
663-
)
664-
.await;
665-
653+
if current_flush_decision == FlushDecision::Continuous {
666654
let lf = logs_flusher.clone();
667655
pending_flush_handles
668656
.log_flush_handles
@@ -723,7 +711,6 @@ async fn extension_loop_active(
723711
&metrics_aggr_handle,
724712
)
725713
.await;
726-
last_continuous_flush_error = false;
727714
}
728715
// NO FLUSH SCENARIO
729716
// JUST LOOP OVER PIPELINE AND WAIT FOR NEXT EVENT

0 commit comments

Comments
 (0)