Skip to content

Commit 9f2edeb

Browse files
committed
perf: add busyLoop mode
1 parent 081b331 commit 9f2edeb

File tree

4 files changed

+70
-6
lines changed

4 files changed

+70
-6
lines changed

src/main/java/io/lettuce/core/AutoBatchFlushOptions.java

+40
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import java.io.Serializable;
44

5+
import io.lettuce.core.internal.LettuceAssert;
6+
57
/**
68
* Options for command timeouts. These options configure how and whether commands time out once they were dispatched. Command
79
* timeout begins:
@@ -24,16 +26,26 @@ public class AutoBatchFlushOptions implements Serializable {
2426

2527
public static final int DEFAULT_BATCH_SIZE = 8;
2628

29+
public static final boolean DEFAULT_USE_BUSY_LOOP = false;
30+
31+
public static final long DEFAULT_BUSY_LOOP_DELAY_IN_NANOS = 400;
32+
2733
private final boolean enableAutoBatchFlush;
2834

2935
private final int writeSpinCount;
3036

3137
private final int batchSize;
3238

39+
private final boolean busyLoop;
40+
41+
private final long busyLoopDelayInNanos;
42+
3343
public AutoBatchFlushOptions(AutoBatchFlushOptions.Builder builder) {
3444
this.enableAutoBatchFlush = builder.enableAutoBatchFlush;
3545
this.writeSpinCount = builder.writeSpinCount;
3646
this.batchSize = builder.batchSize;
47+
this.busyLoop = builder.busyLoop;
48+
this.busyLoopDelayInNanos = builder.busyLoopDelayInNanos;
3749
}
3850

3951
/**
@@ -61,6 +73,10 @@ public static class Builder {
6173

6274
private int batchSize = DEFAULT_BATCH_SIZE;
6375

76+
private boolean busyLoop = DEFAULT_USE_BUSY_LOOP;
77+
78+
private long busyLoopDelayInNanos = DEFAULT_BUSY_LOOP_DELAY_IN_NANOS;
79+
6480
/**
6581
* Enable auto batch flush.
6682
*
@@ -79,6 +95,8 @@ public Builder enableAutoBatchFlush(boolean enableAutoBatchFlush) {
7995
* @return {@code this}
8096
*/
8197
public Builder writeSpinCount(int writeSpinCount) {
98+
LettuceAssert.isPositive(writeSpinCount, "Write spin count must be greater 0");
99+
82100
this.writeSpinCount = writeSpinCount;
83101
return this;
84102
}
@@ -90,10 +108,24 @@ public Builder writeSpinCount(int writeSpinCount) {
90108
* @return {@code this}
91109
*/
92110
public Builder batchSize(int batchSize) {
111+
LettuceAssert.isPositive(batchSize, "Batch size must be greater 0");
112+
93113
this.batchSize = batchSize;
94114
return this;
95115
}
96116

117+
public Builder busyLoop(boolean busyLoop) {
118+
this.busyLoop = busyLoop;
119+
return this;
120+
}
121+
122+
public Builder busyLoopDelayInNanos(long busyLoopDelayInNanos) {
123+
LettuceAssert.isNonNegative(busyLoopDelayInNanos, "Busy loop delay must be greater 0");
124+
125+
this.busyLoopDelayInNanos = busyLoopDelayInNanos;
126+
return this;
127+
}
128+
97129
/**
98130
* Create a new instance of {@link AutoBatchFlushOptions}.
99131
*
@@ -126,4 +158,12 @@ public int getBatchSize() {
126158
return batchSize;
127159
}
128160

161+
public boolean isBusyLoop() {
162+
return busyLoop;
163+
}
164+
165+
public long getBusyLoopDelayInNanos() {
166+
return busyLoopDelayInNanos;
167+
}
168+
129169
}

src/main/java/io/lettuce/core/ContextualChannel.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import java.net.SocketAddress;
44

5+
import javax.annotation.Nonnull;
6+
57
import io.lettuce.core.context.ConnectionContext;
68
import io.netty.buffer.ByteBufAllocator;
79
import io.netty.channel.Channel;
@@ -15,7 +17,6 @@
1517
import io.netty.channel.EventLoop;
1618
import io.netty.util.Attribute;
1719
import io.netty.util.AttributeKey;
18-
import org.jetbrains.annotations.NotNull;
1920

2021
/**
2122
* @author chenxiaofan
@@ -250,7 +251,7 @@ public <T> boolean hasAttr(AttributeKey<T> attributeKey) {
250251
}
251252

252253
@Override
253-
public int compareTo(@NotNull Channel o) {
254+
public int compareTo(@Nonnull Channel o) {
254255
return this == o ? 0 : this.id().compareTo(o.id());
255256
}
256257

src/main/java/io/lettuce/core/internal/LettuceAssert.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -237,9 +237,15 @@ public static void assertState(boolean condition, Supplier<String> messageSuppli
237237
}
238238
}
239239

240-
public static void isPositive(int writeSpinCount, String writeSpinCountIsNotPositive) {
241-
if (writeSpinCount <= 0) {
242-
throw new IllegalArgumentException(writeSpinCountIsNotPositive);
240+
public static void isPositive(int digit, String message) {
241+
if (digit <= 0) {
242+
throw new IllegalArgumentException(message);
243+
}
244+
}
245+
246+
public static void isNonNegative(long digit, String message) {
247+
if (digit < 0) {
248+
throw new IllegalArgumentException(message);
243249
}
244250
}
245251

src/main/java/io/lettuce/core/protocol/DefaultBatchFlushEndpoint.java

+18-1
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,10 @@ protected static void cancelCommandOnEndpointClose(RedisCommand<?, ?, ?> cmd) {
169169

170170
private final int batchSize;
171171

172+
private final boolean busyLoop;
173+
174+
private final long busyLoopDelayInNanos;
175+
172176
/**
173177
* Create a new {@link BatchFlushEndpoint}.
174178
*
@@ -197,6 +201,8 @@ protected DefaultBatchFlushEndpoint(ClientOptions clientOptions, ClientResources
197201
this.callbackOnClose = callbackOnClose;
198202
this.writeSpinCount = clientOptions.getAutoBatchFlushOptions().getWriteSpinCount();
199203
this.batchSize = clientOptions.getAutoBatchFlushOptions().getBatchSize();
204+
this.busyLoop = clientOptions.getAutoBatchFlushOptions().isBusyLoop();
205+
this.busyLoopDelayInNanos = clientOptions.getAutoBatchFlushOptions().getBusyLoopDelayInNanos();
200206
}
201207

202208
@Override
@@ -607,6 +613,10 @@ private void scheduleSendJobOnConnected(final ContextualChannel chan) {
607613
}
608614

609615
private void scheduleSendJobIfNeeded(final ContextualChannel chan) {
616+
if (busyLoop) {
617+
return;
618+
}
619+
610620
final EventLoop eventLoop = chan.eventLoop();
611621
if (eventLoop.inEventLoop()) {
612622
scheduleSendJobInEventLoopIfNeeded(chan);
@@ -669,6 +679,13 @@ private void loopSend0(final BatchFlushEndPointContext batchFlushEndPointContext
669679
return;
670680
}
671681

682+
if (busyLoop) {
683+
// Don't use chan.eventLoop().execute(), otherwise performance will drop, since the event loop
684+
// thread will trap within a certain time period.
685+
chan.eventLoop().schedule(() -> loopSend(chan), busyLoopDelayInNanos, TimeUnit.NANOSECONDS);
686+
return;
687+
}
688+
672689
if (firstCall) {
673690
// // Don't setUnsafe here because loopSend0() may result in a delayed loopSend() call.
674691
batchFlushEndPointContext.hasOngoingSendLoop.exitSafe();
@@ -683,7 +700,7 @@ private void loopSend0(final BatchFlushEndPointContext batchFlushEndPointContext
683700
private int pollBatch(final BatchFlushEndPointContext batchFlushEndPointContext, ContextualChannel chan) {
684701
int count = 0;
685702
for (; count < batchSize; count++) {
686-
final RedisCommand<?, ?, ?> cmd = this.taskQueue.poll(); // relaxed poll is faster and we wil retry later anyway.
703+
final RedisCommand<?, ?, ?> cmd = this.taskQueue.poll();
687704
if (cmd == null) {
688705
break;
689706
}

0 commit comments

Comments
 (0)