@@ -33,29 +33,6 @@ type consumer struct {
33
33
mutex sync.Mutex
34
34
}
35
35
36
- func notifyRecover (consumer * consumer , message * sarama.ConsumerMessage , session sarama.ConsumerGroupSession , b backoff.BackOff ) error {
37
- for {
38
- if err := retry .NotifyRecover (func () error {
39
- return consumer .doCallback (session , message )
40
- }, b , func (err error , d time.Duration ) {
41
- consumer .k .logger .Warnf ("Error processing Kafka message: %s/%d/%d [key=%s]. Error: %v. Retrying..." , message .Topic , message .Partition , message .Offset , asBase64String (message .Key ), err )
42
- }, func () {
43
- consumer .k .logger .Infof ("Successfully processed Kafka message after it previously failed: %s/%d/%d [key=%s]" , message .Topic , message .Partition , message .Offset , asBase64String (message .Key ))
44
- }); err != nil {
45
- // If the retry policy got interrupted, it could mean that either
46
- // the policy has reached its maximum number of attempts or the context has been cancelled.
47
- // There is a weird edge case where the error returned is a 'context canceled' error but the session.Context is not done.
48
- // This is a workaround to handle that edge case and reprocess the current message.
49
- if err == context .Canceled && session .Context ().Err () == nil {
50
- consumer .k .logger .Warnf ("Error processing Kafka message: %s/%d/%d [key=%s]. The error returned is 'context canceled' but the session context is not done. Retrying..." )
51
- continue
52
- }
53
- return err
54
- }
55
- return nil
56
- }
57
- }
58
-
59
36
func (consumer * consumer ) ConsumeClaim (session sarama.ConsumerGroupSession , claim sarama.ConsumerGroupClaim ) error {
60
37
b := consumer .k .backOffConfig .NewBackOffWithContext (session .Context ())
61
38
isBulkSubscribe := consumer .k .checkBulkSubscribe (claim .Topic ())
@@ -107,8 +84,21 @@ func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai
107
84
}
108
85
109
86
if consumer .k .consumeRetryEnabled {
110
- if err := notifyRecover (consumer , message , session , b ); err != nil {
87
+ if err := retry .NotifyRecover (func () error {
88
+ return consumer .doCallback (session , message )
89
+ }, b , func (err error , d time.Duration ) {
90
+ consumer .k .logger .Warnf ("Error processing Kafka message: %s/%d/%d [key=%s]. Error: %v. Retrying..." , message .Topic , message .Partition , message .Offset , asBase64String (message .Key ), err )
91
+ }, func () {
92
+ consumer .k .logger .Infof ("Successfully processed Kafka message after it previously failed: %s/%d/%d [key=%s]" , message .Topic , message .Partition , message .Offset , asBase64String (message .Key ))
93
+ }); err != nil {
111
94
consumer .k .logger .Errorf ("Too many failed attempts at processing Kafka message: %s/%d/%d [key=%s]. Error: %v." , message .Topic , message .Partition , message .Offset , asBase64String (message .Key ), err )
95
+ if errors .Is (session .Context ().Err (), context .Canceled ) {
96
+ // If the context is canceled, we should not attempt to consume any more messages. Exiting the loop.
97
+ // Otherwise, there is a race condition when this loop keeps processing messages from the claim.Messages() channel
98
+ // before the session.Context().Done() is closed. If there are other messages that can successfully be processed,
99
+ // they will be marked as processed and this failing message will be lost.
100
+ return nil
101
+ }
112
102
}
113
103
} else {
114
104
err := consumer .doCallback (session , message )
0 commit comments