
Commit d419358

fix: retry attempts calculation and logs in retry strategy (#2653)

Authored by adarsh0728 and vigith

Signed-off-by: adarsh0728 <gooneriitk@gmail.com>
Signed-off-by: Vigith Maurice <vigith@gmail.com>
Co-authored-by: Vigith Maurice <vigith@gmail.com>

1 parent 0730011 commit d419358

14 files changed: +97 additions, -71 deletions

rust/extns/numaflow-kafka/src/lib.rs

Lines changed: 2 additions & 2 deletions
@@ -546,7 +546,7 @@ impl KafkaActor {
         .fetch_metadata(Some(&topic), timeout)
         .map_err(|e| Error::Kafka(format!("Failed to fetch metadata: {}", e)))?;
     let Some(topic_metadata) = metadata.topics().first() else {
-        warn!(topic = topic, "No topic metadata found");
+        warn!(%topic, "No topic metadata found");
         return Ok(0);
     };
     let mut topic_pending = 0;
@@ -605,7 +605,7 @@ impl KafkaActor {
         .fetch_metadata(Some(&topic), timeout)
         .map_err(|e| Error::Kafka(format!("Failed to fetch metadata: {}", e)))?;
     let Some(topic_metadata) = metadata.topics().first() else {
-        warn!(topic = topic, "No topic metadata found");
+        warn!(%topic, "No topic metadata found");
         return Ok(Vec::new());
     };
     let partitions: Vec<i32> =
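A note on the tracing shorthand this commit standardizes on: in the `tracing` crate, `%field` records a field via its `Display` impl (shorthand for `field = %field`) and `?field` via `Debug`, while the bare `topic = topic` form being replaced requires the value to implement `tracing::Value`. A minimal sketch, assuming the `tracing` and `tracing-subscriber` crates as dependencies:

```rust
use tracing::warn;

fn main() {
    tracing_subscriber::fmt().init();

    let topic = "orders";
    let err = std::io::Error::new(std::io::ErrorKind::Other, "broker unreachable");

    // `%topic` is shorthand for `topic = %topic`: capture via `Display`.
    // `?err` / `error = ?err` capture via `Debug`.
    warn!(%topic, "No topic metadata found");
    warn!(error = ?err, %topic, "Failed to fetch metadata");
}
```

The related `?e` to `error = ?e` changes elsewhere in this commit give the field a stable, descriptive name (`error`) instead of the variable name `e`.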

rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs

Lines changed: 1 addition & 1 deletion
@@ -396,7 +396,7 @@ impl JetStreamReader {
     let result = match msg.ack_with(ack_kind).await {
         Ok(_) => Ok(()),
         Err(e) => {
-            warn!(?e, "Failed to send {:?} to Jetstream for message", ack_kind);
+            warn!(error = ?e, ?ack_kind, "Failed to send ack to Jetstream for message");
             Err(Error::Connection(format!(
                 "Failed to send {:?}: {:?}",
                 ack_kind, e

rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs

Lines changed: 3 additions & 3 deletions
@@ -342,7 +342,7 @@ impl JetstreamWriter {
     Some(true) => {
         // FIXME: add metrics
         if log_counter >= 500 {
-            warn!(stream=?stream, "stream is full (throttled logging)");
+            warn!(?stream, "stream is full (throttled logging)");
             log_counter = 0;
         }
         log_counter += 1;
@@ -439,8 +439,8 @@ impl JetstreamWriter {
     if ack.duplicate {
         warn!(
             message_id = ?message.id,
-            stream = ?stream,
-            ack = ?ack,
+            ?stream,
+            ?ack,
             "Duplicate message detected"
         );
     }
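The first hunk also shows the counter-based throttling used here: the "stream is full" warning fires at most once per 500 polls. A self-contained sketch of that pattern; the `ThrottledWarn` helper is illustrative, not numaflow's API:

```rust
use tracing::warn;

// Illustrative helper mirroring JetstreamWriter's `log_counter`: emit the
// warning at most once every `every` observations.
struct ThrottledWarn {
    counter: u32,
    every: u32,
}

impl ThrottledWarn {
    fn new(every: u32) -> Self {
        // Start saturated so the very first observation logs immediately.
        Self { counter: every, every }
    }

    fn observe(&mut self, stream: &str) {
        if self.counter >= self.every {
            warn!(?stream, "stream is full (throttled logging)");
            self.counter = 0;
        }
        self.counter += 1;
    }
}

fn main() {
    tracing_subscriber::fmt().init();
    let mut throttled = ThrottledWarn::new(500);
    for _ in 0..1200 {
        throttled.observe("default-stream-0"); // logs 3 times, not 1200
    }
}
```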

rust/numaflow-core/src/shared/grpc.rs

Lines changed: 1 addition & 1 deletion
@@ -119,7 +119,7 @@ pub(crate) async fn create_rpc_channel(socket_path: PathBuf) -> error::Result<Ch
     async || match connect_with_uds(socket_path.clone()).await {
         Ok(channel) => Ok(channel),
         Err(e) => {
-            warn!(?e, ?socket_path, "Failed to connect to UDS socket");
+            warn!(error = ?e, ?socket_path, "Failed to connect to UDS socket");
             Err(Error::Connection(format!(
                 "Failed to connect {socket_path:?}: {:?}",
                 e

rust/numaflow-core/src/shared/server_info.rs

Lines changed: 4 additions & 5 deletions
@@ -261,8 +261,8 @@ fn check_sdk_compatibility(
     } else {
         // Language not found in the supported SDK versions
         warn!(
-            "SDK version constraint not found for language: {}, container type: {}",
-            sdk_language, container_type
+            %sdk_language,
+            %container_type, "SDK version constraint not found for language and container type"
         );

         // Return error indicating the language
@@ -407,12 +407,11 @@ async fn read_server_info(
                 // If the file ends with the END marker, trim it and break out of the loop
                 contents = data.trim_end_matches(END).to_string();
                 break;
-            } else {
-                warn!("Server info file is incomplete, EOF is missing...");
             }
+            warn!("Server info file is incomplete, EOF is missing...");
         }
         Err(e) => {
-            warn!("Failed to read file: {}", e);
+            warn!(error = ?e, ?file_path, "Failed to read server info file");
         }
     }

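The second hunk flattens an `if/else`: because the success arm ends in `break`, the warning can simply follow the block. A minimal sketch of a read loop with the same shape; the `END` marker value, file API, and retry cadence are assumptions of this sketch, not numaflow's actual values:

```rust
use tokio::time::{sleep, Duration};
use tracing::warn;

// Illustrative marker value; the real one lives in numaflow's server_info module.
const END: &str = "__END__";

// Minimal sketch: re-read the server info file until the SDK finishes
// writing it, signalled by a trailing END marker.
async fn read_server_info(file_path: &str) -> String {
    loop {
        match tokio::fs::read_to_string(file_path).await {
            Ok(data) => {
                if data.ends_with(END) {
                    // File is complete: trim the marker and return the contents.
                    return data.trim_end_matches(END).to_string();
                }
                // No `else` needed: the arm above returns.
                warn!("Server info file is incomplete, EOF is missing...");
            }
            Err(e) => {
                warn!(error = ?e, ?file_path, "Failed to read server info file");
            }
        }
        sleep(Duration::from_millis(100)).await;
    }
}
```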

rust/numaflow-core/src/sink.rs

Lines changed: 57 additions & 36 deletions
@@ -499,8 +499,8 @@ impl SinkWriter {
 
         let write_start_time = tokio::time::Instant::now();
         let total_msgs = messages.len();
+        let mut retry_attempts = 0;
         let total_msgs_bytes: usize = messages.iter().map(|msg| msg.value.len()).sum();
-        let mut attempts = 1;
         let mut error_map = HashMap::new();
         let mut fallback_msgs = Vec::new();
         let mut serving_msgs = Vec::new();
@@ -516,7 +516,7 @@ impl SinkWriter {
         let mut rng = StdRng::from_entropy();
 
         loop {
-            while attempts <= retry_config.sink_max_retry_attempts {
+            while retry_attempts <= retry_config.sink_max_retry_attempts {
                 let status = self
                     .write_to_sink_once(
                         &mut error_map,
@@ -528,11 +528,25 @@ impl SinkWriter {
                 match status {
                     Ok(true) => break,
                     Ok(false) => {
+                        // Increment retry attempt only if we will retry again
+                        // otherwise we will break out of the loop
+                        // sleep for the calculated delay only if we are retrying
+                        if retry_attempts >= retry_config.sink_max_retry_attempts {
+                            break;
+                        }
+                        retry_attempts += 1;
+                        write_errors_total += error_map.len();
                         warn!(
-                            "Retry attempt {} due to retryable error. Errors: {:?}",
-                            attempts, error_map
+                            retry_attempt = retry_attempts,
+                            ?error_map,
+                            "Retrying due to retryable error."
                         );
-                        write_errors_total += error_map.len();
+                        let delay = Self::calculate_exponential_delay(
+                            retry_config,
+                            retry_attempts,
+                            &mut rng,
+                        );
+                        sleep(Duration::from_millis(delay as u64)).await;
                     }
                     Err(e) => Err(e)?,
                 }
@@ -546,17 +560,11 @@ impl SinkWriter {
                            .to_string(),
                    ));
                }
-
-                // Calculate exponential backoff delay for the next retry attempt
-                let delay = Self::calculate_exponential_delay(retry_config, attempts, &mut rng);
-                // Sleep for the calculated delay
-                sleep(Duration::from_millis(delay as u64)).await;
-                attempts += 1;
            }
 
            // If after the retries we still have messages to process, handle the post retry failures
            let need_retry = Self::handle_sink_post_retry(
-                &mut attempts,
+                &mut retry_attempts,
                &mut error_map,
                &mut fallback_msgs,
                &mut messages_to_send,
@@ -566,9 +574,9 @@ impl SinkWriter {
            match need_retry {
                // if we are done with the messages, break the loop
                Ok(false) => break,
-                // if we need to retry, reset the attempts and error_map
+                // if we need to retry, reset the retry_attempts and error_map
                Ok(true) => {
-                    attempts = 0;
+                    retry_attempts = 0;
                    error_map.clear();
                }
                Err(e) => Err(e)?,
@@ -642,7 +650,7 @@ impl SinkWriter {
    /// Handles the post retry failures based on the configured strategy,
    /// returns true if we need to retry, else false.
    fn handle_sink_post_retry(
-        attempts: &mut u16,
+        retry_attempts: &mut u16,
        error_map: &mut HashMap<String, i32>,
        fallback_msgs: &mut Vec<Message>,
        messages_to_send: &mut Vec<Message>,
@@ -658,17 +666,19 @@ impl SinkWriter {
            // if we need to retry, return true
            OnFailureStrategy::Retry => {
                warn!(
-                    "Using onFailure Retry, Retry attempts {} completed",
-                    attempts
+                    retry_attempts = *retry_attempts,
+                    errors = ?error_map,
+                    "Using onFailure Retry, retry attempts completed"
                );
                return Ok(true);
            }
            // if we need to drop the messages, log and return false
            OnFailureStrategy::Drop => {
                // log that we are dropping the messages as requested
                warn!(
-                    "Dropping messages after {} attempts. Errors: {:?}",
-                    attempts, error_map
+                    retry_attempts = *retry_attempts,
+                    errors = ?error_map,
+                    "Dropping messages."
                );
                if is_mono_vertex() {
                    // update the drop metric count with the messages left for sink
@@ -693,8 +703,9 @@ impl SinkWriter {
            OnFailureStrategy::Fallback => {
                // log that we are moving the messages to the fallback as requested
                warn!(
-                    "Moving messages to fallback after {} attempts. Errors: {:?}",
-                    attempts, error_map
+                    retry_attempts = *retry_attempts,
+                    errors = ?error_map,
+                    "Moving messages to fallback after retry attempts.",
                );
                // move the messages to the fallback messages
                fallback_msgs.append(messages_to_send);
@@ -772,7 +783,7 @@ impl SinkWriter {
            ));
        }
 
-        let mut attempts = 0;
+        let mut retry_attempts = 0;
        let mut fallback_error_map = HashMap::new();
        // start with the original set of message to be sent.
        // we will overwrite this vec with failed messages and will keep retrying.
@@ -783,10 +794,11 @@ impl SinkWriter {
            .clone()
            .backoff
            .unwrap();
-        let max_attempts = default_retry.steps.unwrap();
+        // steps is the maximum number of retry attempts, excluding the initial attempt
+        let max_retry_attempts = default_retry.steps.unwrap();
        let sleep_interval = default_retry.interval.unwrap();
 
-        while attempts < max_attempts {
+        while retry_attempts <= max_retry_attempts {
            match self.fb_sink(messages_to_send.clone()).await {
                Ok(fb_response) => {
                    // create a map of id to result, since there is no strict requirement
@@ -856,26 +868,33 @@ impl SinkWriter {
                        ));
                    }
 
-                    attempts += 1;
-
                    if messages_to_send.is_empty() {
                        break;
                    }
 
-                    warn!(
-                        "Retry attempt {} due to retryable error. Errors: {:?}",
-                        attempts, fallback_error_map
-                    );
-                    sleep(tokio::time::Duration::from(sleep_interval)).await;
+                    // increment retry attempt only if we will retry
+                    // otherwise break out of the loop
+                    // sleep for the calculated delay only if we are retrying
+                    if retry_attempts < max_retry_attempts {
+                        retry_attempts += 1;
+                        warn!(
+                            retry_attempts,
+                            ?fallback_error_map,
+                            "Retrying due to retryable error in Fallback Sink"
+                        );
+                        sleep(tokio::time::Duration::from(sleep_interval)).await;
+                    } else {
+                        break;
+                    }
                }
                Err(e) => return Err(e),
            }
        }
        if !messages_to_send.is_empty() {
            return Err(Error::FbSink(format!(
-                "Failed to write messages to fallback sink after {} attempts. \
+                "Failed to write messages to fallback sink after {} retry attempts. \
                Max Attempts configured: {} Errors: {:?}",
-                attempts, max_attempts, fallback_error_map
+                retry_attempts, max_retry_attempts, fallback_error_map
            )));
        }
        Ok(())
@@ -972,13 +991,15 @@ impl SinkWriter {
 
    fn calculate_exponential_delay(
        retry_config: &RetryConfig,
-        attempts: u16,
+        retry_attempts: u16,
        rng: &mut StdRng,
    ) -> f64 {
        // Calculate the base delay using the initial retry interval and the retry factor
-        // The base delay is calculated as: initial_retry_interval * retry_factor^(attempts-1)
+        // The base delay is calculated as: initial_retry_interval * retry_factor^(retry_attempts-1)
        let base_delay = (retry_config.sink_initial_retry_interval_in_ms as f64)
-            * retry_config.sink_retry_factor.powi((attempts - 1) as i32);
+            * retry_config
+                .sink_retry_factor
+                .powi((retry_attempts - 1) as i32);
 
        let jitter = retry_config.sink_retry_jitter;
        // If jitter is 0, return the base delay
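The substance of the fix: `retry_attempts` now starts at 0 and counts only actual retries, the loop performs one initial attempt plus up to `sink_max_retry_attempts` retries, and the backoff sleep runs only when another retry will actually follow (previously the writer slept once more after its final attempt). A self-contained sketch of the corrected semantics; the constants and the stubbed `try_write` stand in for numaflow's config and `write_to_sink_once`, and applying jitter as a uniform scaling of the base delay is an assumption of this sketch:

```rust
use rand::{rngs::StdRng, Rng, SeedableRng};
use std::time::Duration;
use tokio::time::sleep;

const MAX_RETRY_ATTEMPTS: u16 = 3; // illustrative stand-in for sink_max_retry_attempts
const INITIAL_RETRY_INTERVAL_MS: f64 = 100.0;
const RETRY_FACTOR: f64 = 2.0;
const RETRY_JITTER: f64 = 0.1;

// Base delay per the diff's comment: initial * factor^(retry_attempts - 1),
// so the first retry waits ~100ms, then ~200ms, then ~400ms. How jitter is
// applied to the base delay is an assumption here.
fn calculate_exponential_delay(retry_attempts: u16, rng: &mut StdRng) -> f64 {
    let base = INITIAL_RETRY_INTERVAL_MS * RETRY_FACTOR.powi((retry_attempts - 1) as i32);
    if RETRY_JITTER == 0.0 {
        base
    } else {
        base * rng.gen_range(1.0 - RETRY_JITTER..=1.0 + RETRY_JITTER)
    }
}

// Stub standing in for write_to_sink_once: fail twice, then succeed.
fn try_write(attempt_no: u16) -> bool {
    attempt_no >= 3
}

#[tokio::main]
async fn main() {
    let mut rng = StdRng::from_entropy();
    let mut retry_attempts: u16 = 0;
    let mut total_tries: u16 = 0;

    while retry_attempts <= MAX_RETRY_ATTEMPTS {
        total_tries += 1;
        if try_write(total_tries) {
            break; // Ok(true): all messages written
        }
        // Count a retry (and sleep) only if another attempt will actually
        // happen -- the off-by-one this commit fixes.
        if retry_attempts >= MAX_RETRY_ATTEMPTS {
            break;
        }
        retry_attempts += 1;
        let delay = calculate_exponential_delay(retry_attempts, &mut rng);
        sleep(Duration::from_millis(delay as u64)).await;
    }

    // With MAX_RETRY_ATTEMPTS = 3 this makes at most 4 write calls
    // (1 initial + 3 retries) and sleeps at most 3 times.
    println!("tries: {total_tries}, retries: {retry_attempts}");
}
```

The fallback-sink hunk makes the same change, and its new comment pins down the semantics: `steps` is the retry budget excluding the initial attempt, which is why both loops now compare with `<=`.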

rust/numaflow-core/src/source/generator.rs

Lines changed: 6 additions & 4 deletions
@@ -78,17 +78,19 @@ mod stream_generator {
     let key_count = std::cmp::min(cfg.key_count as usize, cfg.rpu) as u8;
     if key_count != cfg.key_count {
         warn!(
-            "Specified KeyCount({}) is higher than RPU ({}). KeyCount is changed to {}",
-            cfg.key_count, cfg.rpu, key_count
+            key_count = cfg.key_count,
+            rpu = cfg.rpu,
+            new_key_count = key_count,
+            "Specified KeyCount is higher than RPU. KeyCount has changed."
         );
     }
     if key_count > 0 && rpu % key_count as usize != 0 {
         let new_rpu = rpu - (rpu % key_count as usize);
         warn!(
             rpu,
             key_count,
-            "Specified RPU is not a multiple of the KeyCount. This may lead to uneven distribution of messages across keys. RPUs will be adjusted to {}",
-            new_rpu
+            new_rpu,
+            "Specified RPU is not a multiple of the KeyCount. This may lead to uneven distribution of messages across keys. RPUs will be adjusted."
         );
         rpu = new_rpu;
     }
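The RPU adjustment is plain truncation: `rpu - (rpu % key_count)` rounds RPU down to the nearest multiple of KeyCount so every key carries the same rate. A quick check of that rule:

```rust
// Round RPU down to the nearest multiple of KeyCount, as the generator does.
fn adjust_rpu(rpu: usize, key_count: u8) -> usize {
    rpu - (rpu % key_count as usize)
}

fn main() {
    assert_eq!(adjust_rpu(10, 3), 9);  // 10 rps over 3 keys -> 9 rps, 3 per key
    assert_eq!(adjust_rpu(12, 4), 12); // already a multiple, unchanged
    println!("ok");
}
```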

rust/numaflow-core/src/watermark/processor/manager.rs

Lines changed: 3 additions & 3 deletions
@@ -226,7 +226,7 @@ impl ProcessorManager {
         .await
         .unwrap_or_else(|_| Vec::new()),
     Err(e) => {
-        warn!(?e, "Failed to get keys from ot bucket");
+        warn!(error = ?e, "Failed to get keys from ot bucket");
         Vec::new()
     }
 };
@@ -303,7 +303,7 @@ impl ProcessorManager {
     let kv = match val {
         Ok(kv) => kv,
         Err(e) => {
-            warn!(?e, "Failed to get next kv entry, recreating watcher");
+            warn!(error = ?e, "Failed to get next kv entry, recreating watcher");
             ot_watcher = Self::create_watcher(ot_bucket.clone()).await;
             continue;
         }
@@ -353,7 +353,7 @@ impl ProcessorManager {
     let kv = match val {
         Ok(kv) => kv,
         Err(e) => {
-            warn!(?e, "Failed to get next kv entry, recreating watcher");
+            warn!(error = ?e, "Failed to get next kv entry, recreating watcher");
             hb_watcher = Self::create_watcher(hb_bucket.clone()).await;
             continue;
         }

rust/serving/src/app/orchestrator.rs

Lines changed: 6 additions & 5 deletions
@@ -70,13 +70,13 @@ where
         previous_pod_hash,
     } => {
         warn!(
-            ?err,
+            error = %err,
             "Request was cancelled, fetching callbacks and responses again"
         );
         Some(previous_pod_hash)
     }
     status::Error::Duplicate(msg) => {
-        warn!(error = ?msg, "Request already exists in the store");
+        warn!(error = %msg, "Request already exists in the store");
         serving_metrics().request_register_duplicate_count.inc();
         return Err(Error::Duplicate(msg));
     }
@@ -149,7 +149,8 @@ where
     }
     ProcessingStatus::Failed { error, pod_hash } => {
         warn!(
-            "Request was failed because of {error}, processing and fetching the responses again"
+            %error,
+            "Request was failed, processing and fetching the responses again"
         );
         let notify = self.process_request(id, request_type).await?;
         notify.await.expect("sender was dropped")?;
@@ -184,13 +185,13 @@ where
         previous_pod_hash,
     } => {
         warn!(
-            ?err,
+            error = %err,
             "Request was cancelled, fetching callbacks and responses again"
         );
         Some(previous_pod_hash)
     }
     status::Error::Duplicate(msg) => {
-        warn!(error = ?msg, "Request already exists in the tracker");
+        warn!(error = %msg, "Request already exists in the tracker");
         serving_metrics().request_register_duplicate_count.inc();
         return Err(Error::Duplicate(msg));
     }

rust/serving/src/app/store/cbstore/jetstream_store.rs

Lines changed: 4 additions & 1 deletion
@@ -208,7 +208,10 @@ impl JetStreamCallbackStore {
     Ok(callback_obj) => {
         let callback = Arc::new(callback_obj);
         if tx.send(callback).await.is_err() {
-            warn!(request_id = ?id, "Receiver dropped during historical send. Stopping scan stream.");
+            warn!(
+                request_id = %id,
+                "Receiver dropped during historical send. Stopping scan stream."
+            );
             break;
         }
     }
