Skip to content

Commit c91b2ef

Browse files
authored
Merge pull request #1339 from nats-io/fix-1337
[BUG] MessageConsumer.isFinished() not set properly in certain conditions
2 parents 86652c4 + 5a5df2c commit c91b2ef

File tree

5 files changed

+61
-17
lines changed

5 files changed

+61
-17
lines changed

src/main/java/io/nats/client/impl/NatsFetchConsumer.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,7 @@ public void pendingUpdated() {}
6767

6868
@Override
6969
public void heartbeatError() {
70-
stopped.set(true);
71-
finished.set(true);
70+
finishAndClose();
7271
}
7372

7473
@Override
@@ -86,8 +85,7 @@ public Message nextMessage() throws InterruptedException, JetStreamStatusChecked
8685
if (m == null) {
8786
// if there are no messages in the internal cache AND there are no more pending,
8887
// they all have been read and we can go ahead and finish
89-
finished.set(true);
90-
lenientClose();
88+
finishAndClose();
9189
}
9290
return m;
9391
}
@@ -105,17 +103,15 @@ public Message nextMessage() throws InterruptedException, JetStreamStatusChecked
105103
Message m = sub._nextUnmanagedNoWait(pullSubject);
106104
if (m == null) {
107105
// no message and no time left, go ahead and finish
108-
finished.set(true);
109-
lenientClose();
106+
finishAndClose();
110107
}
111108
return m;
112109
}
113110

114111
Message m = sub._nextUnmanaged(timeLeftNanos, pullSubject);
115112
if (m == null && isNoWaitNoExpires) {
116113
// no message and no wait, go ahead and finish
117-
finished.set(true);
118-
lenientClose();
114+
finishAndClose();
119115
}
120116
return m;
121117
}

src/main/java/io/nats/client/impl/NatsMessageConsumer.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,14 @@ class NatsMessageConsumer extends NatsMessageConsumerBase implements PullManager
5353
public void heartbeatError() {
5454
try {
5555
// just close the current sub and make another one.
56-
// this could go on endlessly
57-
lenientClose();
58-
doSub();
56+
// this could go on endlessly - unless the user had called stop
57+
if (stopped.get()) {
58+
finishAndClose();
59+
}
60+
else {
61+
lenientClose();
62+
doSub();
63+
}
5964
}
6065
catch (JetStreamApiException | IOException e) {
6166
pmm.resetTracking();
@@ -67,7 +72,7 @@ void doSub() throws JetStreamApiException, IOException {
6772
MessageHandler mh = userMessageHandler == null ? null : msg -> {
6873
userMessageHandler.onMessage(msg);
6974
if (stopped.get() && pmm.noMorePending()) {
70-
finished.set(true);
75+
finishAndClose();
7176
}
7277
};
7378
super.initSub(subscriptionMaker.subscribe(mh, userDispatcher, pmm, null));
@@ -78,7 +83,12 @@ void doSub() throws JetStreamApiException, IOException {
7883

7984
@Override
8085
public void pendingUpdated() {
81-
if (!stopped.get() && (pmm.pendingMessages <= thresholdMessages || (pmm.trackingBytes && pmm.pendingBytes <= thresholdBytes)))
86+
if (stopped.get()) {
87+
if (pmm.noMorePending()) {
88+
finishAndClose();
89+
}
90+
}
91+
else if (pmm.pendingMessages <= thresholdMessages || (pmm.trackingBytes && pmm.pendingBytes <= thresholdBytes))
8292
{
8393
repull();
8494
}

src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,14 @@ public void close() throws Exception {
104104
lenientClose();
105105
}
106106

107+
protected void finishAndClose() {
108+
if (pmm != null) {
109+
pmm.shutdownHeartbeatTimer();
110+
}
111+
finished.set(true);
112+
lenientClose();
113+
}
114+
107115
protected void lenientClose() {
108116
try {
109117
if (!stopped.get() || sub.isActive()) {

src/main/java/io/nats/client/impl/PullMessageManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ protected Boolean beforeQueueProcessorImpl(NatsMessage msg) {
118118

119119
// heartbeat just needed to be recorded
120120
if (status.isHeartbeat()) {
121-
trackIncoming(Integer.MIN_VALUE, Integer.MIN_VALUE);
121+
updateLastMessageReceived(); // no need to call track incoming, this is all it does
122122
return false;
123123
}
124124

src/test/java/io/nats/client/impl/SimplificationTests.java

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1303,20 +1303,20 @@ public void testOverflowFetch() throws Exception {
13031303

13041304
FetchConsumeOptions fcoNoMin = FetchConsumeOptions.builder()
13051305
.maxMessages(5)
1306-
.expiresIn(2000)
1306+
.expiresIn(3000)
13071307
.group(group)
13081308
.build();
13091309

13101310
FetchConsumeOptions fcoOverA = FetchConsumeOptions.builder()
13111311
.maxMessages(5)
1312-
.expiresIn(2000)
1312+
.expiresIn(3000)
13131313
.group(group)
13141314
.minAckPending(5)
13151315
.build();
13161316

13171317
FetchConsumeOptions fcoOverB = FetchConsumeOptions.builder()
13181318
.maxMessages(5)
1319-
.expiresIn(2000)
1319+
.expiresIn(3000)
13201320
.group(group)
13211321
.minAckPending(6)
13221322
.build();
@@ -1495,4 +1495,34 @@ public void testOverflowConsume() throws Exception {
14951495
assertEquals(0, overCount.get());
14961496
});
14971497
}
1498+
1499+
@Test
1500+
public void testFinishEmptyStream() throws Exception {
1501+
ListenerForTesting l = new ListenerForTesting();
1502+
Options.Builder b = Options.builder().errorListener(l);
1503+
runInJsServer(b, nc -> {
1504+
JetStreamManagement jsm = nc.jetStreamManagement();
1505+
TestingStreamContainer tsc = new TestingStreamContainer(jsm);
1506+
1507+
String name = variant();
1508+
1509+
ConsumerConfiguration cc = ConsumerConfiguration.builder()
1510+
.name(name)
1511+
.filterSubjects(tsc.subject()).build();
1512+
jsm.addOrUpdateConsumer(tsc.stream, cc);
1513+
1514+
ConsumerContext cctx = nc.getConsumerContext(tsc.stream, name);
1515+
1516+
MessageHandler handler = m -> {
1517+
System.out.println(m);
1518+
m.ack();
1519+
};
1520+
1521+
ConsumeOptions co = ConsumeOptions.builder().expiresIn(1000).build();
1522+
MessageConsumer consumer = cctx.consume(co, handler);
1523+
consumer.stop();
1524+
sleep(1200); // more than the expires period for the consume
1525+
assertTrue(consumer.isFinished());
1526+
});
1527+
}
14981528
}

0 commit comments

Comments
 (0)