|
46 | 46 | import io.netty.buffer.ByteBufAllocator;
|
47 | 47 | import io.netty.buffer.ByteBufAllocatorMetric;
|
48 | 48 | import io.netty.buffer.ByteBufAllocatorMetricProvider;
|
| 49 | +import io.netty.channel.ChannelOption; |
49 | 50 | import io.netty.channel.EventLoopGroup;
|
50 | 51 | import io.netty.channel.epoll.EpollEventLoopGroup;
|
51 | 52 | import io.netty.channel.epoll.EpollSocketChannel;
|
@@ -513,6 +514,27 @@ static class InstanceSyncOptions {
|
513 | 514 | converter = Utils.BackOffDelayPolicyTypeConverter.class)
|
514 | 515 | private BackOffDelayPolicy topologyBackOffDelayPolicy;
|
515 | 516 |
|
| 517 | + @CommandLine.Option( |
| 518 | + names = {"--tcp-send-buffer-size", "-tsbs"}, |
| 519 | + description = "TCP SO_SNDBUF option, default is platform value.", |
| 520 | + defaultValue = "0", |
| 521 | + converter = Utils.ByteCapacityTypeConverter.class) |
| 522 | + private ByteCapacity tcpSndBuff; |
| 523 | + |
| 524 | + @CommandLine.Option( |
| 525 | + names = {"--tcp-receive-buffer-size", "-trbs"}, |
| 526 | + description = "TCP SO_RCVBUF option, default is platform value.", |
| 527 | + defaultValue = "0", |
| 528 | + converter = Utils.ByteCapacityTypeConverter.class) |
| 529 | + private ByteCapacity tcpRcvBuff; |
| 530 | + |
| 531 | + @CommandLine.Option( |
| 532 | + names = {"--tcp-no-delay", "-tnd"}, |
| 533 | + description = "TCP NODELAY", |
| 534 | + arity = "1", |
| 535 | + defaultValue = "true") |
| 536 | + private boolean tcpNoDelay; |
| 537 | + |
516 | 538 | private MetricsCollector metricsCollector;
|
517 | 539 | private PerformanceMetrics performanceMetrics;
|
518 | 540 | private List<Monitoring> monitorings;
|
@@ -764,6 +786,18 @@ public Integer call() throws Exception {
|
764 | 786 | bootstrapCustomizer = b -> {};
|
765 | 787 | }
|
766 | 788 |
|
| 789 | + bootstrapCustomizer = |
| 790 | + bootstrapCustomizer.andThen( |
| 791 | + b -> { |
| 792 | + if (this.tcpSndBuff.toBytes() > 0) { |
| 793 | + b.option(ChannelOption.SO_SNDBUF, (int) this.tcpSndBuff.toBytes()); |
| 794 | + } |
| 795 | + if (this.tcpRcvBuff.toBytes() > 0) { |
| 796 | + b.option(ChannelOption.SO_RCVBUF, (int) this.tcpRcvBuff.toBytes()); |
| 797 | + } |
| 798 | + b.option(ChannelOption.TCP_NODELAY, this.tcpNoDelay); |
| 799 | + }); |
| 800 | + |
767 | 801 | EnvironmentBuilder environmentBuilder =
|
768 | 802 | Environment.builder()
|
769 | 803 | .id("stream-perf-test")
|
|
0 commit comments