Skip to content

Commit 081b331

Browse files
committed
fix: queue size not correct
1 parent 704b384 commit 081b331

File tree

1 file changed

+6
-1
lines changed

1 file changed

+6
-1
lines changed

src/main/java/io/lettuce/core/protocol/DefaultBatchFlushEndpoint.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,8 +245,8 @@ public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
245245
command = processActivationCommand(command);
246246
}
247247

248-
QUEUE_SIZE.incrementAndGet(this);
249248
this.taskQueue.offer(command);
249+
QUEUE_SIZE.incrementAndGet(this);
250250

251251
if (autoFlushCommands) {
252252
flushCommands();
@@ -788,6 +788,7 @@ private void offerFirstAll(Deque<RedisCommand<?, ?, ?>> commands) {
788788
}
789789
});
790790
this.taskQueue.offerFirstAll(commands);
791+
QUEUE_SIZE.addAndGet(this, commands.size());
791792
}
792793

793794
private void cancelCommands(String message) {
@@ -818,6 +819,7 @@ private final void fulfillCommands(String message, Consumer<RedisCommand<?, ?, ?
818819
}
819820
}
820821

822+
int cancelledTaskNumInTaskQueue = 0;
821823
while (true) {
822824
RedisCommand<?, ?, ?> cmd = this.taskQueue.poll();
823825
if (cmd == null) {
@@ -827,9 +829,12 @@ private final void fulfillCommands(String message, Consumer<RedisCommand<?, ?, ?
827829
cmd.getOutput().setError(message);
828830
}
829831
commandConsumer.accept(cmd);
832+
833+
cancelledTaskNumInTaskQueue++;
830834
totalCancelledTaskNum++;
831835
}
832836

837+
QUEUE_SIZE.addAndGet(this, -cancelledTaskNumInTaskQueue);
833838
if (totalCancelledTaskNum > 0) {
834839
logger.error("{} cancel {} pending tasks, reason: '{}'", logPrefix(), totalCancelledTaskNum, message);
835840
}

0 commit comments

Comments
 (0)