diff --git a/src/main/java/io/nats/client/impl/NatsFetchConsumer.java b/src/main/java/io/nats/client/impl/NatsFetchConsumer.java index 0d90329f0..11e7f6a21 100644 --- a/src/main/java/io/nats/client/impl/NatsFetchConsumer.java +++ b/src/main/java/io/nats/client/impl/NatsFetchConsumer.java @@ -67,8 +67,7 @@ public void pendingUpdated() {} @Override public void heartbeatError() { - stopped.set(true); - finished.set(true); + finishAndClose(); } @Override @@ -86,8 +85,7 @@ public Message nextMessage() throws InterruptedException, JetStreamStatusChecked if (m == null) { // if there are no messages in the internal cache AND there are no more pending, // they all have been read and we can go ahead and finish - finished.set(true); - lenientClose(); + finishAndClose(); } return m; } @@ -105,8 +103,7 @@ public Message nextMessage() throws InterruptedException, JetStreamStatusChecked Message m = sub._nextUnmanagedNoWait(pullSubject); if (m == null) { // no message and no time left, go ahead and finish - finished.set(true); - lenientClose(); + finishAndClose(); } return m; } @@ -114,8 +111,7 @@ public Message nextMessage() throws InterruptedException, JetStreamStatusChecked Message m = sub._nextUnmanaged(timeLeftNanos, pullSubject); if (m == null && isNoWaitNoExpires) { // no message and no wait, go ahead and finish - finished.set(true); - lenientClose(); + finishAndClose(); } return m; } diff --git a/src/main/java/io/nats/client/impl/NatsMessageConsumer.java b/src/main/java/io/nats/client/impl/NatsMessageConsumer.java index 7ce13d606..61802e35a 100644 --- a/src/main/java/io/nats/client/impl/NatsMessageConsumer.java +++ b/src/main/java/io/nats/client/impl/NatsMessageConsumer.java @@ -53,9 +53,14 @@ class NatsMessageConsumer extends NatsMessageConsumerBase implements PullManager public void heartbeatError() { try { // just close the current sub and make another one. - // this could go on endlessly - lenientClose(); - doSub(); + // this could go on endlessly - unless the user had called stop + if (stopped.get()) { + finishAndClose(); + } + else { + lenientClose(); + doSub(); + } } catch (JetStreamApiException | IOException e) { pmm.resetTracking(); @@ -67,7 +72,7 @@ void doSub() throws JetStreamApiException, IOException { MessageHandler mh = userMessageHandler == null ? null : msg -> { userMessageHandler.onMessage(msg); if (stopped.get() && pmm.noMorePending()) { - finished.set(true); + finishAndClose(); } }; super.initSub(subscriptionMaker.subscribe(mh, userDispatcher, pmm, null)); @@ -78,7 +83,12 @@ void doSub() throws JetStreamApiException, IOException { @Override public void pendingUpdated() { - if (!stopped.get() && (pmm.pendingMessages <= thresholdMessages || (pmm.trackingBytes && pmm.pendingBytes <= thresholdBytes))) + if (stopped.get()) { + if (pmm.noMorePending()) { + finishAndClose(); + } + } + else if (pmm.pendingMessages <= thresholdMessages || (pmm.trackingBytes && pmm.pendingBytes <= thresholdBytes)) { repull(); } diff --git a/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java b/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java index a40e44766..d43eb15c6 100644 --- a/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java +++ b/src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java @@ -104,6 +104,14 @@ public void close() throws Exception { lenientClose(); } + protected void finishAndClose() { + if (pmm != null) { + pmm.shutdownHeartbeatTimer(); + } + finished.set(true); + lenientClose(); + } + protected void lenientClose() { try { if (!stopped.get() || sub.isActive()) { diff --git a/src/main/java/io/nats/client/impl/PullMessageManager.java b/src/main/java/io/nats/client/impl/PullMessageManager.java index 03e608a19..7aa0c290f 100644 --- a/src/main/java/io/nats/client/impl/PullMessageManager.java +++ b/src/main/java/io/nats/client/impl/PullMessageManager.java @@ -118,7 +118,7 @@ protected Boolean beforeQueueProcessorImpl(NatsMessage msg) { // heartbeat just needed to be recorded if (status.isHeartbeat()) { - trackIncoming(Integer.MIN_VALUE, Integer.MIN_VALUE); + updateLastMessageReceived(); // no need to call track incoming, this is all it does return false; } diff --git a/src/test/java/io/nats/client/impl/SimplificationTests.java b/src/test/java/io/nats/client/impl/SimplificationTests.java index 87848fec2..b63067126 100644 --- a/src/test/java/io/nats/client/impl/SimplificationTests.java +++ b/src/test/java/io/nats/client/impl/SimplificationTests.java @@ -1303,20 +1303,20 @@ public void testOverflowFetch() throws Exception { FetchConsumeOptions fcoNoMin = FetchConsumeOptions.builder() .maxMessages(5) - .expiresIn(2000) + .expiresIn(3000) .group(group) .build(); FetchConsumeOptions fcoOverA = FetchConsumeOptions.builder() .maxMessages(5) - .expiresIn(2000) + .expiresIn(3000) .group(group) .minAckPending(5) .build(); FetchConsumeOptions fcoOverB = FetchConsumeOptions.builder() .maxMessages(5) - .expiresIn(2000) + .expiresIn(3000) .group(group) .minAckPending(6) .build(); @@ -1495,4 +1495,34 @@ public void testOverflowConsume() throws Exception { assertEquals(0, overCount.get()); }); } + + @Test + public void testFinishEmptyStream() throws Exception { + ListenerForTesting l = new ListenerForTesting(); + Options.Builder b = Options.builder().errorListener(l); + runInJsServer(b, nc -> { + JetStreamManagement jsm = nc.jetStreamManagement(); + TestingStreamContainer tsc = new TestingStreamContainer(jsm); + + String name = variant(); + + ConsumerConfiguration cc = ConsumerConfiguration.builder() + .name(name) + .filterSubjects(tsc.subject()).build(); + jsm.addOrUpdateConsumer(tsc.stream, cc); + + ConsumerContext cctx = nc.getConsumerContext(tsc.stream, name); + + MessageHandler handler = m -> { + System.out.println(m); + m.ack(); + }; + + ConsumeOptions co = ConsumeOptions.builder().expiresIn(1000).build(); + MessageConsumer consumer = cctx.consume(co, handler); + consumer.stop(); + sleep(1200); // more than the expires period for the consume + assertTrue(consumer.isFinished()); + }); + } }