Skip to content
This repository was archived by the owner on Jun 21, 2023. It is now read-only.

Commit 99f7621

Browse files
authored
Reduced object allocations in websocket gateway client (#193)
* Reduced object allocations in websocket gateway client * Removed Optional
1 parent 54bd46e commit 99f7621

File tree

5 files changed

+88
-84
lines changed

5 files changed

+88
-84
lines changed

services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClient.java

Lines changed: 23 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -108,36 +108,32 @@ private WebsocketGatewayClient(
108108

109109
@Override
110110
public Mono<ServiceMessage> requestResponse(ServiceMessage request) {
111-
return Mono.defer(
112-
() -> {
113-
long sid = sidCounter.incrementAndGet();
114-
return getOrConnect()
115-
.flatMap(
116-
session ->
117-
session
118-
.send(encodeRequest(request, sid))
119-
.doOnSubscribe(s -> LOGGER.debug("Sending request {}", request))
120-
.then(session.<ServiceMessage>newMonoProcessor(sid).asMono())
121-
.doOnCancel(() -> session.cancel(sid, request.qualifier()))
122-
.doFinally(s -> session.removeProcessor(sid)));
123-
});
111+
return getOrConnect()
112+
.flatMap(
113+
session -> {
114+
long sid = sidCounter.incrementAndGet();
115+
return session
116+
.send(encodeRequest(request, sid))
117+
.doOnSubscribe(s -> LOGGER.debug("Sending request {}", request))
118+
.then(session.<ServiceMessage>newMonoProcessor(sid).asMono())
119+
.doOnCancel(() -> session.cancel(sid, request.qualifier()))
120+
.doFinally(s -> session.removeProcessor(sid));
121+
});
124122
}
125123

126124
@Override
127125
public Flux<ServiceMessage> requestStream(ServiceMessage request) {
128-
return Flux.defer(
129-
() -> {
130-
long sid = sidCounter.incrementAndGet();
131-
return getOrConnect()
132-
.flatMapMany(
133-
session ->
134-
session
135-
.send(encodeRequest(request, sid))
136-
.doOnSubscribe(s -> LOGGER.debug("Sending request {}", request))
137-
.thenMany(session.<ServiceMessage>newUnicastProcessor(sid).asFlux())
138-
.doOnCancel(() -> session.cancel(sid, request.qualifier()))
139-
.doFinally(s -> session.removeProcessor(sid)));
140-
});
126+
return getOrConnect()
127+
.flatMapMany(
128+
session -> {
129+
long sid = sidCounter.incrementAndGet();
130+
return session
131+
.send(encodeRequest(request, sid))
132+
.doOnSubscribe(s -> LOGGER.debug("Sending request {}", request))
133+
.thenMany(session.<ServiceMessage>newUnicastProcessor(sid).asFlux())
134+
.doOnCancel(() -> session.cancel(sid, request.qualifier()))
135+
.doFinally(s -> session.removeProcessor(sid));
136+
});
141137
}
142138

143139
@Override
@@ -161,7 +157,7 @@ private Mono<Void> doClose() {
161157

162158
private Mono<WebsocketGatewayClientSession> getOrConnect() {
163159
// noinspection unchecked
164-
return Mono.defer(() -> websocketMonoUpdater.updateAndGet(this, this::getOrConnect0));
160+
return websocketMonoUpdater.updateAndGet(this, this::getOrConnect0);
165161
}
166162

167163
private Mono<WebsocketGatewayClientSession> getOrConnect0(

services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClientSession.java

Lines changed: 33 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,14 @@
1010
import io.scalecube.services.transport.api.ReferenceCountUtil;
1111
import java.nio.channels.ClosedChannelException;
1212
import java.util.Map;
13-
import java.util.Optional;
1413
import java.util.StringJoiner;
1514
import org.jctools.maps.NonBlockingHashMapLong;
1615
import org.slf4j.Logger;
1716
import org.slf4j.LoggerFactory;
1817
import reactor.core.publisher.Mono;
1918
import reactor.core.publisher.Sinks;
19+
import reactor.core.publisher.Sinks.Many;
20+
import reactor.core.publisher.Sinks.One;
2021
import reactor.netty.Connection;
2122
import reactor.netty.http.websocket.WebsocketInbound;
2223
import reactor.netty.http.websocket.WebsocketOutbound;
@@ -59,22 +60,26 @@ public final class WebsocketGatewayClientSession {
5960
try {
6061
message = codec.decode(byteBuf);
6162
} catch (Exception ex) {
62-
LOGGER.error("Response decoder failed: " + ex);
63+
LOGGER.error("Response decoder failed:", ex);
6364
return;
6465
}
6566

6667
// ignore messages w/o sid
6768
if (!message.headers().containsKey(STREAM_ID)) {
6869
LOGGER.error("Ignore response: {} with null sid, session={}", message, id);
69-
Optional.ofNullable(message.data()).ifPresent(ReferenceCountUtil::safestRelease);
70+
if (message.data() != null) {
71+
ReferenceCountUtil.safestRelease(message.data());
72+
}
7073
return;
7174
}
7275

7376
// processor?
7477
long sid = Long.parseLong(message.header(STREAM_ID));
7578
Object processor = inboundProcessors.get(sid);
7679
if (processor == null) {
77-
Optional.ofNullable(message.data()).ifPresent(ReferenceCountUtil::safestRelease);
80+
if (message.data() != null) {
81+
ReferenceCountUtil.safestRelease(message.data());
82+
}
7883
return;
7984
}
8085

@@ -88,24 +93,22 @@ public final class WebsocketGatewayClientSession {
8893

8994
@SuppressWarnings({"rawtypes", "unchecked"})
9095
<T> Sinks.One<T> newMonoProcessor(long sid) {
91-
return (Sinks.One)
92-
inboundProcessors.computeIfAbsent(
93-
sid,
94-
key -> {
95-
LOGGER.debug("Put sid={}, session={}", sid, id);
96-
return Sinks.one();
97-
});
96+
return (Sinks.One) inboundProcessors.computeIfAbsent(sid, this::newMonoProcessor0);
9897
}
9998

10099
@SuppressWarnings({"rawtypes", "unchecked"})
101100
<T> Sinks.Many<T> newUnicastProcessor(long sid) {
102-
return (Sinks.Many)
103-
inboundProcessors.computeIfAbsent(
104-
sid,
105-
key -> {
106-
LOGGER.debug("Put sid={}, session={}", sid, id);
107-
return Sinks.many().unicast().onBackpressureBuffer();
108-
});
101+
return (Sinks.Many) inboundProcessors.computeIfAbsent(sid, this::newUnicastProcessor0);
102+
}
103+
104+
private One<Object> newMonoProcessor0(long sid) {
105+
LOGGER.debug("Put sid={}, session={}", sid, id);
106+
return Sinks.one();
107+
}
108+
109+
private Many<Object> newUnicastProcessor0(long sid) {
110+
LOGGER.debug("Put sid={}, session={}", sid, id);
111+
return Sinks.many().unicast().onBackpressureBuffer();
109112
}
110113

111114
void removeProcessor(long sid) {
@@ -115,14 +118,7 @@ void removeProcessor(long sid) {
115118
}
116119

117120
Mono<Void> send(ByteBuf byteBuf) {
118-
return Mono.defer(
119-
() -> {
120-
// send with publisher (defer buffer cleanup to netty)
121-
return connection
122-
.outbound()
123-
.sendObject(Mono.just(byteBuf).map(TextWebSocketFrame::new), f -> true)
124-
.then();
125-
});
121+
return connection.outbound().sendObject(new TextWebSocketFrame(byteBuf)).then();
126122
}
127123

128124
void cancel(long sid, String qualifier) {
@@ -158,25 +154,29 @@ public Mono<Void> onClose() {
158154
}
159155

160156
private void handleResponse(ServiceMessage response, Object processor) {
161-
LOGGER.debug("Handle response: {}, session={}", response, id);
157+
if (LOGGER.isDebugEnabled()) {
158+
LOGGER.debug("Handle response: {}, session={}", response, id);
159+
}
162160

163161
try {
164-
Optional<Signal> signalOptional =
165-
Optional.ofNullable(response.header(SIGNAL)).map(Signal::from);
162+
Signal signal = null;
163+
final String header = response.header(SIGNAL);
164+
165+
if (header != null) {
166+
signal = Signal.from(header);
167+
}
166168

167-
if (!signalOptional.isPresent()) {
169+
if (signal == null) {
168170
// handle normal response
169171
emitNext(processor, response);
170172
} else {
171173
// handle completion signal
172-
Signal signal = signalOptional.get();
173174
if (signal == Signal.COMPLETE) {
174175
emitComplete(processor);
175176
}
176177
if (signal == Signal.ERROR) {
177178
// decode error data to retrieve real error cause
178-
ServiceMessage errorMessage = codec.decodeData(response, ErrorData.class);
179-
emitNext(processor, errorMessage);
179+
emitNext(processor, codec.decodeData(response, ErrorData.class));
180180
}
181181
}
182182
} catch (Exception e) {

services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewayAcceptor.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.Map;
2727
import java.util.Map.Entry;
2828
import java.util.Objects;
29-
import java.util.Optional;
3029
import java.util.concurrent.atomic.AtomicBoolean;
3130
import java.util.concurrent.atomic.AtomicLong;
3231
import java.util.function.BiFunction;
@@ -185,23 +184,22 @@ private void onRequest(WebsocketGatewaySession session, ServiceMessage request,
185184
final long sid = getSid(request);
186185
final AtomicBoolean receivedError = new AtomicBoolean(false);
187186

188-
final Flux<ServiceMessage> serviceStream = serviceCall.requestMany(request);
187+
Flux<ServiceMessage> serviceStream = serviceCall.requestMany(request);
188+
final String limitRate = request.header(RATE_LIMIT_FIELD);
189+
serviceStream =
190+
limitRate != null ? serviceStream.limitRate(Integer.parseInt(limitRate)) : serviceStream;
189191

190192
Disposable disposable =
191193
session
192194
.send(
193-
Optional.ofNullable(request.header(RATE_LIMIT_FIELD))
194-
.map(Integer::valueOf)
195-
.map(serviceStream::limitRate)
196-
.orElse(serviceStream)
197-
.map(
198-
response -> {
199-
boolean isErrorResponse = response.isError();
200-
if (isErrorResponse) {
201-
receivedError.set(true);
202-
}
203-
return newResponseMessage(sid, response, isErrorResponse);
204-
}))
195+
serviceStream.map(
196+
response -> {
197+
boolean isErrorResponse = response.isError();
198+
if (isErrorResponse) {
199+
receivedError.set(true);
200+
}
201+
return newResponseMessage(sid, response, isErrorResponse);
202+
}))
205203
.doOnError(
206204
th -> {
207205
ReferenceCountUtil.safestRelease(request.data());

services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewaySession.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.util.Collections;
99
import java.util.HashMap;
1010
import java.util.Map;
11+
import java.util.function.Predicate;
1112
import org.jctools.maps.NonBlockingHashMapLong;
1213
import org.slf4j.Logger;
1314
import org.slf4j.LoggerFactory;
@@ -22,6 +23,8 @@ public final class WebsocketGatewaySession implements GatewaySession {
2223

2324
private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketGatewaySession.class);
2425

26+
private static final Predicate<Object> SEND_PREDICATE = f -> true;
27+
2528
private final Map<Long, Disposable> subscriptions = new NonBlockingHashMapLong<>(1024);
2629

2730
private final GatewaySessionHandler gatewayHandler;
@@ -118,7 +121,7 @@ public Mono<Void> send(Flux<ServiceMessage> messages) {
118121
this, frame.content(), response, (Context) context);
119122
return frame;
120123
}),
121-
f -> true)
124+
SEND_PREDICATE)
122125
.then()
123126
.doOnError(th -> gatewayHandler.onError(this, th, (Context) context));
124127
});
@@ -171,7 +174,9 @@ public boolean dispose(Long streamId) {
171174
Disposable disposable = subscriptions.remove(streamId);
172175
result = disposable != null;
173176
if (result) {
174-
LOGGER.debug("Dispose subscription by sid={}, session={}", streamId, sessionId);
177+
if (LOGGER.isDebugEnabled()) {
178+
LOGGER.debug("Dispose subscription by sid={}, session={}", streamId, sessionId);
179+
}
175180
disposable.dispose();
176181
}
177182
}
@@ -188,24 +193,28 @@ public boolean containsSid(Long streamId) {
188193
*
189194
* @param streamId stream id
190195
* @param disposable service subscription
191-
* @return true if disposable subscription was stored
192196
*/
193-
public boolean register(Long streamId, Disposable disposable) {
197+
public void register(Long streamId, Disposable disposable) {
194198
boolean result = false;
195199
if (!disposable.isDisposed()) {
196200
result = subscriptions.putIfAbsent(streamId, disposable) == null;
197201
}
198202
if (result) {
199-
LOGGER.debug("Registered subscription with sid={}, session={}", streamId, sessionId);
203+
if (LOGGER.isDebugEnabled()) {
204+
LOGGER.debug("Registered subscription with sid={}, session={}", streamId, sessionId);
205+
}
200206
}
201-
return result;
202207
}
203208

204209
private void clearSubscriptions() {
205210
if (subscriptions.size() > 1) {
206-
LOGGER.debug("Clear all {} subscriptions on session={}", subscriptions.size(), sessionId);
211+
if (LOGGER.isDebugEnabled()) {
212+
LOGGER.debug("Clear all {} subscriptions on session={}", subscriptions.size(), sessionId);
213+
}
207214
} else if (subscriptions.size() == 1) {
208-
LOGGER.debug("Clear 1 subscription on session={}", sessionId);
215+
if (LOGGER.isDebugEnabled()) {
216+
LOGGER.debug("Clear 1 subscription on session={}", sessionId);
217+
}
209218
}
210219
subscriptions.forEach((sid, disposable) -> disposable.dispose());
211220
subscriptions.clear();

services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketServiceMessageCodec.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.io.InputStream;
2525
import java.io.OutputStream;
2626
import java.util.Map.Entry;
27-
import java.util.Optional;
2827
import org.slf4j.Logger;
2928
import org.slf4j.LoggerFactory;
3029

@@ -100,7 +99,9 @@ public ByteBuf encode(ServiceMessage message) throws MessageCodecException {
10099
generator.writeEndObject();
101100
} catch (Throwable ex) {
102101
ReferenceCountUtil.safestRelease(byteBuf);
103-
Optional.ofNullable(message.data()).ifPresent(ReferenceCountUtil::safestRelease);
102+
if (message.data() != null) {
103+
ReferenceCountUtil.safestRelease(message.data());
104+
}
104105
LOGGER.error("Failed to encode gateway service message: {}", message, ex);
105106
throw new MessageCodecException("Failed to encode gateway service message", ex);
106107
}

0 commit comments

Comments
 (0)