Skip to content

Commit 516fda6

Browse files
[FIXED] Don't report redeliveries for consumer with MaxDeliver 1 (#6877)
A consumer with `MaxDeliver: 1` reports non-acked messages as `Redelivered Messages`, although no redeliveries have happened. The state is still preserved, otherwise unacked messages in an Interest stream can be lost if used for DLQ (#6538). But we don't report redeliveries anymore for this case. Resolves #6874 Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
2 parents 1da9471 + be8d6ee commit 516fda6

File tree

4 files changed

+86
-2
lines changed

4 files changed

+86
-2
lines changed

server/consumer.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2976,6 +2976,11 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
29762976
TimeStamp: time.Now().UTC(),
29772977
PriorityGroups: priorityGroups,
29782978
}
2979+
// Reset redelivered for MaxDeliver 1. Redeliveries are disabled so must not report it (is confusing otherwise).
2980+
// The state does still keep track of these messages.
2981+
if o.cfg.MaxDeliver == 1 {
2982+
info.NumRedelivered = 0
2983+
}
29792984
if o.cfg.PauseUntil != nil {
29802985
p := *o.cfg.PauseUntil
29812986
if info.Paused = time.Now().Before(p); info.Paused {

server/jetstream_cluster_1_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6599,7 +6599,7 @@ func TestJetStreamClusterMaxDeliveriesOnInterestStreams(t *testing.T) {
65996599
require_Equal(t, ci.AckFloor.Consumer, 0)
66006600
require_Equal(t, ci.AckFloor.Stream, 0)
66016601
require_Equal(t, ci.NumAckPending, 0)
6602-
require_Equal(t, ci.NumRedelivered, 1)
6602+
require_Equal(t, ci.NumRedelivered, 0)
66036603
require_Equal(t, ci.NumPending, 0)
66046604
}
66056605
}

server/jetstream_cluster_3_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5478,7 +5478,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
54785478
require_Equal(t, a.AckFloor.Consumer, 0)
54795479
require_Equal(t, a.AckFloor.Stream, 0)
54805480
require_Equal(t, a.NumPending, 40)
5481-
require_Equal(t, a.NumRedelivered, 10)
5481+
require_Equal(t, a.NumRedelivered, 0)
54825482
a.Cluster, b.Cluster = nil, nil
54835483
a.Delivered.Last, b.Delivered.Last = nil, nil
54845484
if !reflect.DeepEqual(a, b) {

server/jetstream_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19118,6 +19118,85 @@ func TestJetStreamInterestMaxDeliveryReached(t *testing.T) {
1911819118
}
1911919119
}
1912019120

19121+
// https://github.com/nats-io/nats-server/issues/6874
19122+
func TestJetStreamMaxDeliveryRedeliveredReporting(t *testing.T) {
19123+
s := RunBasicJetStreamServer(t)
19124+
defer s.Shutdown()
19125+
19126+
nc, js := jsClientConnect(t, s)
19127+
defer nc.Close()
19128+
19129+
_, err := js.AddStream(&nats.StreamConfig{
19130+
Name: "TEST",
19131+
Storage: nats.FileStorage,
19132+
Subjects: []string{"foo"},
19133+
Replicas: 1,
19134+
Retention: nats.LimitsPolicy,
19135+
})
19136+
require_NoError(t, err)
19137+
19138+
_, err = js.Publish("foo", nil)
19139+
require_NoError(t, err)
19140+
19141+
maxWait := 250 * time.Millisecond
19142+
cfg := &nats.ConsumerConfig{
19143+
Durable: "CONSUMER",
19144+
AckPolicy: nats.AckExplicitPolicy,
19145+
AckWait: maxWait,
19146+
MaxDeliver: 1,
19147+
}
19148+
_, err = js.AddConsumer("TEST", cfg)
19149+
require_NoError(t, err)
19150+
19151+
sub, err := js.PullSubscribe(_EMPTY_, "CONSUMER", nats.BindStream("TEST"))
19152+
require_NoError(t, err)
19153+
19154+
nfo, err := js.StreamInfo("TEST")
19155+
require_NoError(t, err)
19156+
require_Equal(t, nfo.State.Msgs, uint64(1))
19157+
19158+
msgs, err := sub.Fetch(1, nats.MaxWait(maxWait))
19159+
require_NoError(t, err)
19160+
require_Len(t, 1, len(msgs))
19161+
19162+
cnfo, err := js.ConsumerInfo("TEST", "CONSUMER")
19163+
require_NoError(t, err)
19164+
require_Equal(t, cnfo.NumAckPending, 1)
19165+
require_Equal(t, cnfo.NumRedelivered, 0)
19166+
19167+
time.Sleep(2 * maxWait)
19168+
19169+
// Max deliver 1 so this will fail.
19170+
_, err = sub.Fetch(1, nats.MaxWait(maxWait))
19171+
require_Error(t, err)
19172+
19173+
// Redelivered should remain 0, as it doesn't get redelivered with MaxDeliver 1.
19174+
cnfo, err = js.ConsumerInfo("TEST", "CONSUMER")
19175+
require_NoError(t, err)
19176+
require_Equal(t, cnfo.NumAckPending, 0)
19177+
require_Equal(t, cnfo.NumRedelivered, 0)
19178+
19179+
// With a higher MaxDeliver we should report it.
19180+
cfg.MaxDeliver = 2
19181+
_, err = js.UpdateConsumer("TEST", cfg)
19182+
require_NoError(t, err)
19183+
19184+
cnfo, err = js.ConsumerInfo("TEST", "CONSUMER")
19185+
require_NoError(t, err)
19186+
require_Equal(t, cnfo.NumAckPending, 0)
19187+
require_Equal(t, cnfo.NumRedelivered, 1)
19188+
19189+
// Unset should also report.
19190+
cfg.MaxDeliver = -1
19191+
_, err = js.UpdateConsumer("TEST", cfg)
19192+
require_NoError(t, err)
19193+
19194+
cnfo, err = js.ConsumerInfo("TEST", "CONSUMER")
19195+
require_NoError(t, err)
19196+
require_Equal(t, cnfo.NumAckPending, 0)
19197+
require_Equal(t, cnfo.NumRedelivered, 1)
19198+
}
19199+
1912119200
func TestJetStreamRecoversStreamFirstSeqWhenNotEmpty(t *testing.T) {
1912219201
s := RunBasicJetStreamServer(t)
1912319202
defer s.Shutdown()

0 commit comments

Comments
 (0)