Skip to content

Commit 5a5df2c

Browse files
committed
finishAndClose();
1 parent 48b41f3 commit 5a5df2c

File tree

3 files changed

+10
-13
lines changed

3 files changed

+10
-13
lines changed

src/main/java/io/nats/client/impl/NatsFetchConsumer.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public void pendingUpdated() {}
6767

6868
@Override
6969
public void heartbeatError() {
70-
finish(true);
70+
finishAndClose();
7171
}
7272

7373
@Override
@@ -85,7 +85,7 @@ public Message nextMessage() throws InterruptedException, JetStreamStatusChecked
8585
if (m == null) {
8686
// if there are no messages in the internal cache AND there are no more pending,
8787
// they all have been read and we can go ahead and finish
88-
finish(true);
88+
finishAndClose();
8989
}
9090
return m;
9191
}
@@ -103,16 +103,15 @@ public Message nextMessage() throws InterruptedException, JetStreamStatusChecked
103103
Message m = sub._nextUnmanagedNoWait(pullSubject);
104104
if (m == null) {
105105
// no message and no time left, go ahead and finish
106-
finish(true);
106+
finishAndClose();
107107
}
108108
return m;
109109
}
110110

111111
Message m = sub._nextUnmanaged(timeLeftNanos, pullSubject);
112112
if (m == null && isNoWaitNoExpires) {
113113
// no message and no wait, go ahead and finish
114-
finished.set(true);
115-
lenientClose();
114+
finishAndClose();
116115
}
117116
return m;
118117
}

src/main/java/io/nats/client/impl/NatsMessageConsumer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public void heartbeatError() {
5555
// just close the current sub and make another one.
5656
// this could go on endlessly - unless the user had called stop
5757
if (stopped.get()) {
58-
finish(true);
58+
finishAndClose();
5959
}
6060
else {
6161
lenientClose();
@@ -72,7 +72,7 @@ void doSub() throws JetStreamApiException, IOException {
7272
MessageHandler mh = userMessageHandler == null ? null : msg -> {
7373
userMessageHandler.onMessage(msg);
7474
if (stopped.get() && pmm.noMorePending()) {
75-
finish(false);
75+
finishAndClose();
7676
}
7777
};
7878
super.initSub(subscriptionMaker.subscribe(mh, userDispatcher, pmm, null));
@@ -85,7 +85,7 @@ void doSub() throws JetStreamApiException, IOException {
8585
public void pendingUpdated() {
8686
if (stopped.get()) {
8787
if (pmm.noMorePending()) {
88-
finish(false);
88+
finishAndClose();
8989
}
9090
}
9191
else if (pmm.pendingMessages <= thresholdMessages || (pmm.trackingBytes && pmm.pendingBytes <= thresholdBytes))

src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,14 +104,12 @@ public void close() throws Exception {
104104
lenientClose();
105105
}
106106

107-
protected void finish(boolean close) {
108-
finished.set(true);
107+
protected void finishAndClose() {
109108
if (pmm != null) {
110109
pmm.shutdownHeartbeatTimer();
111110
}
112-
if (close) {
113-
lenientClose();
114-
}
111+
finished.set(true);
112+
lenientClose();
115113
}
116114

117115
protected void lenientClose() {

0 commit comments

Comments
 (0)