Skip to content

Commit 2da6443

Browse files
committed
fix: ensure hasOngoingSendLoop.exitSafe()
1 parent 9f2edeb commit 2da6443

File tree

1 file changed

+9
-9
lines changed

1 file changed

+9
-9
lines changed

src/main/java/io/lettuce/core/protocol/DefaultBatchFlushEndpoint.java

+9-9
Original file line numberDiff line numberDiff line change
@@ -609,7 +609,7 @@ private void scheduleSendJobOnConnected(final ContextualChannel chan) {
609609
LettuceAssert.assertState(chan.eventLoop().inEventLoop(), "must be called in event loop thread");
610610

611611
// Schedule directly
612-
scheduleSendJobInEventLoopIfNeeded(chan);
612+
scheduleSendJobInEventLoopIfNeeded(chan, false);
613613
}
614614

615615
private void scheduleSendJobIfNeeded(final ContextualChannel chan) {
@@ -618,11 +618,6 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) {
618618
}
619619

620620
final EventLoop eventLoop = chan.eventLoop();
621-
if (eventLoop.inEventLoop()) {
622-
scheduleSendJobInEventLoopIfNeeded(chan);
623-
return;
624-
}
625-
626621
if (chan.context.batchFlushEndPointContext.hasOngoingSendLoop.tryEnterSafeGetVolatile()) {
627622
// Benchmark result of using tryEnterSafeGetVolatile() or not (1 thread, async get):
628623
// 1. uses tryEnterSafeGetVolatile() to avoid unnecessary eventLoop.execute() calls
@@ -631,7 +626,7 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) {
631626
// 2. uses eventLoop.execute() directly
632627
// Avg latency: 3.2677197021496998s
633628
// Avg QPS: 476925.0751855796/s
634-
eventLoop.execute(() -> scheduleSendJobInEventLoopIfNeeded(chan));
629+
eventLoop.execute(() -> scheduleSendJobInEventLoopIfNeeded(chan, true));
635630
}
636631

637632
// Otherwise:
@@ -642,10 +637,13 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) {
642637
// second loopSend0(), which will call poll()
643638
}
644639

645-
private void scheduleSendJobInEventLoopIfNeeded(final ContextualChannel chan) {
640+
private void scheduleSendJobInEventLoopIfNeeded(final ContextualChannel chan, boolean enteredSafe) {
646641
// Guarantee only 1 send loop.
647-
if (chan.context.batchFlushEndPointContext.hasOngoingSendLoop.tryEnterUnsafe()) {
642+
BatchFlushEndPointContext.HasOngoingSendLoop hasOngoingSendLoop = chan.context.batchFlushEndPointContext.hasOngoingSendLoop;
643+
if (hasOngoingSendLoop.tryEnterUnsafe()) {
648644
loopSend(chan);
645+
} else if (enteredSafe) {
646+
hasOngoingSendLoop.exitSafe();
649647
}
650648
}
651649

@@ -691,6 +689,8 @@ private void loopSend0(final BatchFlushEndPointContext batchFlushEndPointContext
691689
batchFlushEndPointContext.hasOngoingSendLoop.exitSafe();
692690
// // Guarantee thread-safety: no dangling tasks in the queue.
693691
loopSend0(batchFlushEndPointContext, chan, remainingSpinnCount, false);
692+
// chan.eventLoop().schedule(() -> loopSend0(batchFlushEndPointContext, chan, writeSpinCount, false), 100,
693+
// TimeUnit.NANOSECONDS);
694694
} else {
695695
// The send loop will be triggered later when a new task is added,
696696
batchFlushEndPointContext.hasOngoingSendLoop.exitUnsafe();

0 commit comments

Comments
 (0)