Skip to content

Commit 97729f6

Browse files
author
Anton Bashirov
authored
Merge branch 'latest' into feature/stream-closing-fix
2 parents 4c33c33 + 63a53d2 commit 97729f6

File tree

5 files changed

+32
-33
lines changed

5 files changed

+32
-33
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ art {
4646
## Bugs and Feedback
4747
For bugs, questions and discussions please use the [Github Issues](https://github.com/art-community/art/issues).
4848

49-
Join us on Telegram: https://t.me/art_github
49+
Join us on Telegram: https://tx.me/art_github
50+
5051
Join us on Discord: https://discord.gg/jaqxB38
5152

5253
## Documentation

application-json/src/main/java/ru/art/json/descriptor/JsonEntityReader.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -274,13 +274,12 @@ private Collection<CollectionValue<Entity>> parseArraysArray(JsonParser parser)
274274
List<CollectionValue<Entity>> array = dynamicArrayOf();
275275
JsonToken currentToken = parser.currentToken();
276276
do {
277-
Collection<Entity> entities = dynamicArrayOf();
278277
if (currentToken != START_ARRAY) {
279-
entities = parseEntityArray(parser);
278+
array.add(entityCollection(parseEntityArray(parser)));
280279
}
281280
currentToken = parser.nextToken();
282-
if (currentToken == END_ARRAY) {
283-
array.add(entityCollection(entities));
281+
if (currentToken == END_ARRAY && isEmpty(array)) {
282+
array.add(entityCollection(dynamicArrayOf()));
284283
}
285284
} while (!parser.isClosed() && currentToken != END_ARRAY);
286285
return array;

application-rsocket/src/main/java/ru/art/rsocket/communicator/RsocketCommunicator.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package ru.art.rsocket.communicator;
2020

2121
import io.rsocket.*;
22+
import io.rsocket.core.*;
2223
import io.rsocket.transport.netty.client.*;
2324
import lombok.*;
2425
import org.apache.logging.log4j.*;
@@ -33,7 +34,7 @@
3334
import ru.art.rsocket.exception.*;
3435
import ru.art.rsocket.model.*;
3536
import ru.art.service.model.*;
36-
import static io.rsocket.RSocketFactory.*;
37+
import static io.rsocket.core.RSocketConnector.create;
3738
import static java.text.MessageFormat.*;
3839
import static java.time.Duration.*;
3940
import static java.util.Optional.*;
@@ -78,30 +79,29 @@ public class RsocketCommunicator {
7879

7980
private RsocketCommunicator(RsocketCommunicationTargetConfiguration configuration) {
8081
dataFormat = configuration.dataFormat();
81-
ClientRSocketFactory factory = connect();
82+
RSocketConnector connector = create();
8283
if (configuration.resumable()) {
83-
factory = factory.resume()
84-
.resumeSessionDuration(ofMillis(configuration.resumeSessionDuration()))
85-
.resumeStreamTimeout(ofMillis(configuration.resumeStreamTimeout()));
84+
connector = connector.resume(new Resume()
85+
.sessionDuration(ofMillis(configuration.resumeSessionDuration()))
86+
.streamTimeout(ofMillis(configuration.resumeStreamTimeout()))
87+
);
8688
}
87-
rsocketModule().getClientInterceptors().forEach(factory::addRequesterPlugin);
88-
configuration.interceptors().forEach(factory::addRequesterPlugin);
89+
connector.interceptors(interceptorRegistry -> rsocketModule().getClientInterceptors().forEach(interceptorRegistry::forRequester));
90+
connector.interceptors(interceptorRegistry -> configuration.interceptors().forEach(interceptorRegistry::forRequester));
8991
switch (configuration.transport()) {
9092
case TCP:
91-
rsocketMono = factory.dataMimeType(toMimeType(configuration.dataFormat()))
93+
rsocketMono = connector.dataMimeType(toMimeType(configuration.dataFormat()))
9294
.metadataMimeType(toMimeType(configuration.dataFormat()))
93-
.transport(TcpClientTransport.create(configuration.host(), configuration.tcpPort()))
94-
.start()
95+
.connect(TcpClientTransport.create(configuration.host(), configuration.tcpPort()))
9596
.doOnNext(rsocketModuleState()::registerRsocket)
9697
.doOnSubscribe(subscription -> getLogger().info(format(RSOCKET_TCP_COMMUNICATOR_STARTED_MESSAGE, configuration.host(), configuration.tcpPort())));
9798
getLogger().info(format(RSOCKET_TCP_COMMUNICATOR_CREATED_MESSAGE, configuration.host(), configuration.tcpPort()));
9899
return;
99100
case WEB_SOCKET:
100-
rsocketMono = factory
101+
rsocketMono = connector
101102
.dataMimeType(toMimeType(configuration.dataFormat()))
102103
.metadataMimeType(toMimeType(configuration.dataFormat()))
103-
.transport(WebsocketClientTransport.create(configuration.host(), configuration.tcpPort()))
104-
.start()
104+
.connect(WebsocketClientTransport.create(configuration.host(), configuration.tcpPort()))
105105
.doOnNext(rsocketModuleState()::registerRsocket)
106106
.doOnSubscribe(subscription -> getLogger().info(format(RSOCKET_WS_COMMUNICATOR_STARTED_MESSAGE, configuration.host(), configuration.webSocketPort())));
107107
getLogger().info(format(RSOCKET_WS_COMMUNICATOR_CREATED_MESSAGE, configuration.host(), configuration.tcpPort()));

application-rsocket/src/main/java/ru/art/rsocket/configuration/RsocketModuleConfiguration.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package ru.art.rsocket.configuration;
2020

2121
import io.rsocket.RSocketFactory.*;
22+
import io.rsocket.core.*;
2223
import io.rsocket.plugins.*;
2324
import io.rsocket.transport.netty.server.*;
2425
import lombok.*;
@@ -101,7 +102,7 @@ default RsocketCommunicationTargetConfiguration getCommunicationTargetConfigurat
101102

102103
Function<? extends TcpServerTransport, ? extends TcpServerTransport> getTcpServerTransportConfigurator();
103104

104-
Function<? extends ServerRSocketFactory, ? extends ServerRSocketFactory> getServerFactoryConfigurator();
105+
Function<? extends RSocketServer, ? extends RSocketServer> getServerConfigurator();
105106

106107
int getFragmentationMtu();
107108

@@ -127,7 +128,7 @@ class RsocketModuleDefaultConfiguration implements RsocketModuleConfiguration {
127128
private final Function<? extends TcpServer, ? extends TcpServer> tcpServerConfigurator = identity();
128129
private final Function<? extends WebsocketServerTransport, ? extends WebsocketServerTransport> webSocketServerTransportConfigurator = identity();
129130
private final Function<? extends TcpServerTransport, ? extends TcpServerTransport> tcpServerTransportConfigurator = identity();
130-
private final Function<? extends ServerRSocketFactory, ? extends ServerRSocketFactory> serverFactoryConfigurator = identity();
131+
private final Function<? extends RSocketServer, ? extends RSocketServer> serverConfigurator = identity();
131132
@Getter(lazy = true, onMethod = @__({@SuppressWarnings("unchecked")}))
132133
private final List<RSocketInterceptor> serverInterceptors = initializeInterceptors();
133134
@Getter(lazy = true, onMethod = @__({@SuppressWarnings("unchecked")}))

application-rsocket/src/main/java/ru/art/rsocket/server/RsocketServer.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package ru.art.rsocket.server;
2020

21+
import io.rsocket.core.*;
2122
import io.rsocket.transport.netty.server.*;
2223
import lombok.*;
2324
import org.apache.logging.log4j.Logger;
@@ -28,7 +29,6 @@
2829
import ru.art.rsocket.exception.*;
2930
import ru.art.rsocket.socket.*;
3031
import ru.art.rsocket.specification.*;
31-
import static io.rsocket.RSocketFactory.*;
3232
import static java.lang.System.*;
3333
import static java.lang.Thread.*;
3434
import static java.text.MessageFormat.*;
@@ -62,39 +62,37 @@ private RsocketServer(RsocketTransport transport) {
6262
}
6363

6464
private Mono<CloseableChannel> createServer() {
65-
ServerRSocketFactory socketFactory = receive().fragment(rsocketModule().getFragmentationMtu());
65+
RSocketServer server = RSocketServer.create().fragment(rsocketModule().getFragmentationMtu());
6666
if (rsocketModule().isResumableServer()) {
67-
socketFactory = socketFactory.resume()
68-
.resumeSessionDuration(ofMillis(rsocketModule().getServerResumeSessionDuration()))
69-
.resumeStreamTimeout(ofMillis(rsocketModule().getServerResumeStreamTimeout()));
67+
server.resume(new Resume()
68+
.sessionDuration(ofMillis(rsocketModule().getServerResumeSessionDuration()))
69+
.streamTimeout(ofMillis(rsocketModule().getServerResumeStreamTimeout())));
7070
}
71-
rsocketModule().getServerInterceptors().forEach(socketFactory::addResponderPlugin);
72-
ServerTransportAcceptor acceptor = rsocketModule()
73-
.getServerFactoryConfigurator()
74-
.apply(cast(socketFactory))
71+
server.interceptors(interceptorRegistry -> rsocketModule().getServerInterceptors().forEach(interceptorRegistry::forResponder));
72+
RSocketServer acceptor = rsocketModule()
73+
.getServerConfigurator()
74+
.apply(cast(server))
7575
.acceptor((setup, sendingSocket) -> just(new RsocketAcceptor(sendingSocket, setup)));
7676
Mono<CloseableChannel> channel;
7777
switch (transport) {
7878
case TCP:
79-
channel = acceptor.transport(rsocketModule().getTcpServerTransportConfigurator()
79+
channel = acceptor.bind(rsocketModule().getTcpServerTransportConfigurator()
8080
.apply(cast(TcpServerTransport.create(rsocketModule()
8181
.getTcpServerConfigurator()
8282
.apply(cast(TcpServer.create()
8383
.host(rsocketModule().getServerHost())
8484
.port(rsocketModule().getServerTcpPort())))))))
85-
.start()
8685
.onTerminateDetach();
8786
rsocketModuleState().setTcpServer(this);
8887
break;
8988
case WEB_SOCKET:
90-
channel = acceptor.transport(rsocketModule()
89+
channel = acceptor.bind(rsocketModule()
9190
.getWebSocketServerTransportConfigurator()
9291
.apply(cast(WebsocketServerTransport.create(rsocketModule()
9392
.getWebSocketServerConfigurator()
9493
.apply(cast(HttpServer.create()
9594
.host(rsocketModule().getServerHost())
9695
.port(rsocketModule().getServerWebSocketPort())))))))
97-
.start()
9896
.onTerminateDetach();
9997
rsocketModuleState().setWebSocketServer(this);
10098
break;

0 commit comments

Comments
 (0)