feat: handle Ack and Nack messages in OTAP Exporter. #1994

Merged: jmacd merged 5 commits into open-telemetry:main from alochaus:remove-sleeps-from-otap-exporter on Mar 4, 2026

@alochaus alochaus commented Feb 7, 2026

Change Summary

Part of #1325 (see also #1324).

Problem: flaky test due to sleep-based synchronization.

The OTAP Exporter had two related issues:

1. Dropped Context

When the exporter received pipeline data (OtapPdata), it split the message into context and payload, then threw away the context:

let (_context, payload) = pdata.into_parts();
// TODO(#1098): Note context is dropped.

The context carries the routing information that tells the pipeline which component to notify when a message succeeds or fails. Without it, the exporter was a black hole: data went in, but no confirmation ever came back.

2. Flaky Test

The test test_receiver_not_ready_on_start had no way to know when the exporter finished processing, so it used arbitrary sleeps:

// TODO instead of sleeping here, once we handle ACK/NACK we should wait to get a NACK
// from the control channel
tokio::time::sleep(Duration::from_millis(5)).await;

// wait a bit before starting the server. This will ensure the exporter no longer exits
// when start is called if the endpoint can't be reached
tokio::time::sleep(Duration::from_millis(100)).await;

// TODO instead of sleeping here, once we handle ACK/NACK we should wait to get an ACK
// from the control channel
tokio::time::sleep(Duration::from_millis(50)).await;

On a slow machine or under load, these might not be long enough. On a fast machine, they waste time.

Solution: thread context through for ACK/NACK.

The core idea is to preserve the OtapPdata context by using take_payload() instead of into_parts().

This extracts the payload for gRPC transmission while keeping the context alive in the original OtapPdata. That context is then passed through the entire streaming pipeline so it can be returned with an ACK (success) or NACK (failure) notification.
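To make the difference concrete, here is a minimal sketch using stand-in types. `Pdata`, `Context`, `Payload`, `reply_to`, and `payload_is_empty()` are hypothetical simplifications, not the real `OtapPdata` API; only the method names `into_parts()` and `take_payload()` come from this description. The sketch also assumes that `take_payload()` leaves an empty payload behind.

```rust
/// Hypothetical stand-in for the routing context carried by pipeline data.
#[derive(Debug, Clone, PartialEq)]
pub struct Context {
    pub reply_to: String,
}

/// Hypothetical stand-in for the payload sent over gRPC.
pub type Payload = Vec<u8>;

/// Hypothetical stand-in for OtapPdata: a context plus a payload.
#[derive(Debug)]
pub struct Pdata {
    context: Context,
    payload: Payload,
}

impl Pdata {
    pub fn new(context: Context, payload: Payload) -> Self {
        Self { context, payload }
    }

    /// Consumes the value. If the caller ignores the context half,
    /// the routing information is lost.
    pub fn into_parts(self) -> (Context, Payload) {
        (self.context, self.payload)
    }

    /// Moves the payload out and leaves an empty one behind (assumed
    /// behavior). The context stays inside `self` for a later ACK/NACK.
    pub fn take_payload(&mut self) -> Payload {
        std::mem::take(&mut self.payload)
    }

    pub fn context(&self) -> &Context {
        &self.context
    }

    pub fn payload_is_empty(&self) -> bool {
        self.payload.is_empty()
    }
}
```

After `take_payload()`, the remaining value is a lightweight carrier for the context, which is what gets paired with each batch in the steps that follow.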

Key changes

  1. Preserve context at the entry point
  // Before: context discarded
  let (_context, payload) = pdata.into_parts();

  // After: payload extracted, context preserved
  let payload = pdata.take_payload();

  2. Pair each batch with its context through the pipeline

The internal channels changed from carrying just data to carrying (context, data) tuples:

  // Before
  channel::<OtapArrowRecords>(64)

  // After
  channel::<(OtapPdata, OtapArrowRecords)>(64)

  3. Correlate gRPC responses with their original requests

The exporter uses bidirectional gRPC streaming: requests go out on one stream, and responses come back on another. A FIFO correlation channel pairs them:

  create_req_stream ──sends pdata──→ [correlation channel] ──recv pdata──→ handle_res_stream
    (yielded batch)                                                         (got response)

Because both streams preserve order, the n-th response always corresponds to the n-th request.
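The FIFO pairing can be sketched in a few lines. This is an illustration under stated assumptions, not the exporter's code: it swaps in `std::sync::mpsc` for whatever async channel the exporter uses, and `PendingPdata`, `Outcome`, `correlation_channel`, `on_request_sent`, and `on_response` are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for the context-bearing pdata kept for ACK/NACK.
#[derive(Debug, PartialEq)]
pub struct PendingPdata {
    pub id: u64,
}

/// Hypothetical result handed to the main loop once a response arrives.
#[derive(Debug, PartialEq)]
pub enum Outcome {
    Exported(PendingPdata),
    Failed(PendingPdata),
}

/// Creates the FIFO correlation channel shared by both stream halves.
pub fn correlation_channel() -> (Sender<PendingPdata>, Receiver<PendingPdata>) {
    channel()
}

/// Request side: called when a batch is yielded to the outgoing stream.
pub fn on_request_sent(correlation_tx: &Sender<PendingPdata>, pdata: PendingPdata) {
    // If the response side has gone away there is nothing to correlate with,
    // so the send error is ignored in this sketch.
    let _ = correlation_tx.send(pdata);
}

/// Response side: called once per response. Pops the oldest pending entry,
/// which is the matching request as long as responses arrive in order.
pub fn on_response(correlation_rx: &Receiver<PendingPdata>, ok: bool) -> Option<Outcome> {
    let pdata = correlation_rx.try_recv().ok()?;
    Some(if ok {
        Outcome::Exported(pdata)
    } else {
        Outcome::Failed(pdata)
    })
}
```

The scheme depends on the server answering each request exactly once and in order; a skipped or reordered response would attribute an ACK or NACK to the wrong batch.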

  4. Send ACK/NACK in the main loop
  Some(PDataMetricsUpdate::Exported(signal_type, pdata)) => {
      self.pdata_metrics.inc_exported(signal_type);
      effect_handler.notify_ack(AckMsg::new(pdata)).await?;
  },
  Some(PDataMetricsUpdate::Failed(signal_type, pdata)) => {
      self.pdata_metrics.inc_failed(signal_type);
      effect_handler.notify_nack(NackMsg::new("export failed", pdata)).await?;
  },

The effect_handler uses the context inside pdata to route ACK/NACK back through the pipeline.

  5. Replace sleeps with deterministic waits in the test
  // Before: sleep and hope
  tokio::time::sleep(Duration::from_millis(5)).await;

  // After: wait for the actual event
  timeout(Duration::from_secs(5), async {
      loop {
          match pipeline_ctrl_msg_rx.recv().await {
              Ok(PipelineControlMsg::DeliverNack { .. }) => break,
              Ok(_) => continue,
              Err(_) => panic!("pipeline ctrl channel closed"),
          }
      }
  }).await.expect("Timed out waiting for NACK");

The test is now event-driven: it proceeds as soon as the NACK/ACK arrives, with a 5-second timeout as a safety net.

What issue does this PR close?

How are these changes tested?

Are there any user-facing changes?

@github-actions github-actions Bot added the rust Pull requests that update Rust code label Feb 7, 2026

codecov Bot commented Feb 7, 2026

Codecov Report

❌ Patch coverage is 96.13260% with 7 lines in your changes missing coverage. Please review.
✅ Project coverage is 87.28%. Comparing base (63a23cf) to head (36bc3e0).
⚠️ Report is 8 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1994      +/-   ##
==========================================
+ Coverage   87.27%   87.28%   +0.01%     
==========================================
  Files         553      553              
  Lines      181338   181503     +165     
==========================================
+ Hits       158259   158424     +165     
  Misses      22545    22545              
  Partials      534      534              
Components Coverage Δ
otap-dataflow 89.41% <96.13%> (+0.01%) ⬆️
query_abstraction 80.61% <ø> (ø)
query_engine 90.30% <ø> (ø)
syslog_cef_receivers ∅ <ø> (∅)
otel-arrow-go 53.50% <ø> (ø)
quiver 91.83% <ø> (ø)

lalitb commented Feb 9, 2026

Just to confirm, I believe this is not an issue with the OTLP exporter: it captures the context from into_parts(), threads it through the export pipeline, and sends proper ACK/NACK messages via route_export_result().

alochaus commented Feb 9, 2026

You're right. This problem doesn't occur in the OTLP Exporter, only in the OTAP Exporter.

jmacd commented Feb 9, 2026

@alochaus FYI see #1324 and #1325

@github-actions

This pull request has been marked as stale due to lack of recent activity. It will be closed in 30 days if no further activity occurs. If this PR is still relevant, please comment or push new commits to keep it active.

@github-actions github-actions Bot added stale Not actively pursued and removed stale Not actively pursued labels Feb 24, 2026

@jmacd jmacd left a comment

This looks good, by the way!

@alochaus alochaus marked this pull request as ready for review February 27, 2026 12:50
@alochaus alochaus requested a review from a team as a code owner February 27, 2026 12:50

jmacd commented Mar 4, 2026

Thank you @alochaus!

FYI

  // After: payload extracted, context preserved
  let payload = pdata.take_payload();

In the future we may want to extend this with an option to save the payload in case the retry_processor is subscribed with a RETURN_DATA interest. I can see how we might like a helper on OtapPdata and Context, a sort of clone_or_take() operation that clones the payload when RETURN_DATA is requested and takes it otherwise.
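One possible shape for such a helper, purely as an illustration: the `Pdata` type, the `return_data` flag, and the method name `clone_or_take_payload()` are all hypothetical, and the real `OtapPdata` and `Context` types may track the RETURN_DATA interest quite differently.

```rust
/// Hypothetical stand-in for OtapPdata with a simplified RETURN_DATA flag.
#[derive(Debug, Default)]
pub struct Pdata {
    payload: Vec<u8>,
    /// Assumed meaning: a subscriber (for example a retry processor)
    /// wants the payload returned along with the ACK/NACK.
    return_data: bool,
}

impl Pdata {
    pub fn new(payload: Vec<u8>, return_data: bool) -> Self {
        Self { payload, return_data }
    }

    /// Moves the payload out, leaving an empty one behind.
    pub fn take_payload(&mut self) -> Vec<u8> {
        std::mem::take(&mut self.payload)
    }

    /// Clones the payload when it must be returned later; otherwise
    /// takes it to avoid paying for a copy.
    pub fn clone_or_take_payload(&mut self) -> Vec<u8> {
        if self.return_data {
            self.payload.clone()
        } else {
            self.take_payload()
        }
    }

    pub fn payload_len(&self) -> usize {
        self.payload.len()
    }
}
```

The trade-off is that the clone path costs an extra copy per batch, which is why the sketch only takes it when the data has to be handed back.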

@jmacd jmacd added this pull request to the merge queue Mar 4, 2026
Merged via the queue into open-telemetry:main with commit 281e61f Mar 4, 2026
67 checks passed