-
Notifications
You must be signed in to change notification settings - Fork 175
[BUG] MessageConsumer.isFinished() not set properly in certain conditions #1339
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@@ -67,8 +67,7 @@ public void pendingUpdated() {} | |||
|
|||
@Override | |||
public void heartbeatError() { | |||
stopped.set(true); | |||
finished.set(true); | |||
finishAndClose(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a heartbeat error on a fetch is terminal
@@ -86,8 +85,7 @@ public Message nextMessage() throws InterruptedException, JetStreamStatusChecked | |||
if (m == null) { | |||
// if there are no messages in the internal cache AND there are no more pending, | |||
// they all have been read and we can go ahead and finish | |||
finished.set(true); | |||
lenientClose(); | |||
finishAndClose(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no more messages on a fetch is terminal
@@ -105,17 +103,15 @@ public Message nextMessage() throws InterruptedException, JetStreamStatusChecked | |||
Message m = sub._nextUnmanagedNoWait(pullSubject); | |||
if (m == null) { | |||
// no message and no time left, go ahead and finish | |||
finished.set(true); | |||
lenientClose(); | |||
finishAndClose(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no more messages on a fetch is terminal
} | ||
return m; | ||
} | ||
|
||
Message m = sub._nextUnmanaged(timeLeftNanos, pullSubject); | ||
if (m == null && isNoWaitNoExpires) { | ||
// no message and no wait, go ahead and finish | ||
finished.set(true); | ||
lenientClose(); | ||
finishAndClose(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no more messages on a fetch is terminal
@@ -67,7 +72,7 @@ void doSub() throws JetStreamApiException, IOException { | |||
MessageHandler mh = userMessageHandler == null ? null : msg -> { | |||
userMessageHandler.onMessage(msg); | |||
if (stopped.get() && pmm.noMorePending()) { | |||
finished.set(true); | |||
finishAndClose(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have no more pending and the user has called stop, so we are finished with this consume
@@ -78,7 +83,12 @@ void doSub() throws JetStreamApiException, IOException { | |||
|
|||
@Override | |||
public void pendingUpdated() { | |||
if (!stopped.get() && (pmm.pendingMessages <= thresholdMessages || (pmm.trackingBytes && pmm.pendingBytes <= thresholdBytes))) | |||
if (stopped.get()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have no more pending and the user has called stop, so we are finished with this consume
.group(group) | ||
.minAckPending(5) | ||
.build(); | ||
|
||
FetchConsumeOptions fcoOverB = FetchConsumeOptions.builder() | ||
.maxMessages(5) | ||
.expiresIn(2000) | ||
.expiresIn(3000) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this test was flapping, so added a little extra time
if (stopped.get()) { | ||
finishAndClose(); | ||
} | ||
else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If a heartbeat error occurs, we would usually just try again. But if the user had called stop, they've told us that we are finished.
Fixes #1337
The new
finishAndClose()
method set's the finish flag, stops the heartbeat timer and does a lenient close.