Skip to content

Commit d70f234

Browse files
author
mizantrop2397
authored
Merge pull request #283 from art-community/feature/update-rsocket-version
update rsocket version
2 parents 8174d1a + 44ebbbe commit d70f234

File tree

6 files changed

+32
-31
lines changed

6 files changed

+32
-31
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ art {
4747
For bugs, questions and discussions please use the [Github Issues](https://github.com/art-community/art/issues).
4848

4949
Join us on Telegram: https://t.me/art_github
50+
5051
Join us on Discord: https://discord.gg/jaqxB38
5152

5253
## Documentation

application-rsocket/build.gradle.kts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ art {
3232

3333
dependencies {
3434
with(art.externalDependencyVersionsConfiguration) {
35-
embedded("io.rsocket", "rsocket-core", rsocketVersion)
35+
embedded("io.rsocket", "rsocket-core", "1.+")
3636
.exclude("io.netty")
3737
.exclude("io.projectreactor", "reactor-core")
3838
.exclude("org.slf4j")
39-
embedded("io.rsocket", "rsocket-transport-netty", rsocketVersion)
39+
embedded("io.rsocket", "rsocket-transport-netty", "1.+")
4040
.exclude("io.netty")
4141
.exclude("io.projectreactor", "reactor-core")
4242
.exclude("org.slf4j")

application-rsocket/src/main/java/ru/art/rsocket/communicator/RsocketCommunicator.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package ru.art.rsocket.communicator;
2020

2121
import io.rsocket.*;
22+
import io.rsocket.core.*;
2223
import io.rsocket.transport.netty.client.*;
2324
import lombok.*;
2425
import org.apache.logging.log4j.*;
@@ -33,7 +34,7 @@
3334
import ru.art.rsocket.exception.*;
3435
import ru.art.rsocket.model.*;
3536
import ru.art.service.model.*;
36-
import static io.rsocket.RSocketFactory.*;
37+
import static io.rsocket.core.RSocketConnector.create;
3738
import static java.text.MessageFormat.*;
3839
import static java.time.Duration.*;
3940
import static java.util.Optional.*;
@@ -78,30 +79,29 @@ public class RsocketCommunicator {
7879

7980
private RsocketCommunicator(RsocketCommunicationTargetConfiguration configuration) {
8081
dataFormat = configuration.dataFormat();
81-
ClientRSocketFactory factory = connect();
82+
RSocketConnector connector = create();
8283
if (configuration.resumable()) {
83-
factory = factory.resume()
84-
.resumeSessionDuration(ofMillis(configuration.resumeSessionDuration()))
85-
.resumeStreamTimeout(ofMillis(configuration.resumeStreamTimeout()));
84+
connector = connector.resume(new Resume()
85+
.sessionDuration(ofMillis(configuration.resumeSessionDuration()))
86+
.streamTimeout(ofMillis(configuration.resumeStreamTimeout()))
87+
);
8688
}
87-
rsocketModule().getClientInterceptors().forEach(factory::addRequesterPlugin);
88-
configuration.interceptors().forEach(factory::addRequesterPlugin);
89+
connector.interceptors(interceptorRegistry -> rsocketModule().getClientInterceptors().forEach(interceptorRegistry::forRequester));
90+
connector.interceptors(interceptorRegistry -> configuration.interceptors().forEach(interceptorRegistry::forRequester));
8991
switch (configuration.transport()) {
9092
case TCP:
91-
rsocketMono = factory.dataMimeType(toMimeType(configuration.dataFormat()))
93+
rsocketMono = connector.dataMimeType(toMimeType(configuration.dataFormat()))
9294
.metadataMimeType(toMimeType(configuration.dataFormat()))
93-
.transport(TcpClientTransport.create(configuration.host(), configuration.tcpPort()))
94-
.start()
95+
.connect(TcpClientTransport.create(configuration.host(), configuration.tcpPort()))
9596
.doOnNext(rsocketModuleState()::registerRsocket)
9697
.doOnSubscribe(subscription -> getLogger().info(format(RSOCKET_TCP_COMMUNICATOR_STARTED_MESSAGE, configuration.host(), configuration.tcpPort())));
9798
getLogger().info(format(RSOCKET_TCP_COMMUNICATOR_CREATED_MESSAGE, configuration.host(), configuration.tcpPort()));
9899
return;
99100
case WEB_SOCKET:
100-
rsocketMono = factory
101+
rsocketMono = connector
101102
.dataMimeType(toMimeType(configuration.dataFormat()))
102103
.metadataMimeType(toMimeType(configuration.dataFormat()))
103-
.transport(WebsocketClientTransport.create(configuration.host(), configuration.tcpPort()))
104-
.start()
104+
.connect(WebsocketClientTransport.create(configuration.host(), configuration.tcpPort()))
105105
.doOnNext(rsocketModuleState()::registerRsocket)
106106
.doOnSubscribe(subscription -> getLogger().info(format(RSOCKET_WS_COMMUNICATOR_STARTED_MESSAGE, configuration.host(), configuration.webSocketPort())));
107107
getLogger().info(format(RSOCKET_WS_COMMUNICATOR_CREATED_MESSAGE, configuration.host(), configuration.tcpPort()));

application-rsocket/src/main/java/ru/art/rsocket/configuration/RsocketModuleConfiguration.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package ru.art.rsocket.configuration;
2020

2121
import io.rsocket.RSocketFactory.*;
22+
import io.rsocket.core.*;
2223
import io.rsocket.plugins.*;
2324
import io.rsocket.transport.netty.server.*;
2425
import lombok.*;
@@ -101,7 +102,7 @@ default RsocketCommunicationTargetConfiguration getCommunicationTargetConfigurat
101102

102103
Function<? extends TcpServerTransport, ? extends TcpServerTransport> getTcpServerTransportConfigurator();
103104

104-
Function<? extends ServerRSocketFactory, ? extends ServerRSocketFactory> getServerFactoryConfigurator();
105+
Function<? extends RSocketServer, ? extends RSocketServer> getServerConfigurator();
105106

106107
int getFragmentationMtu();
107108

@@ -127,7 +128,7 @@ class RsocketModuleDefaultConfiguration implements RsocketModuleConfiguration {
127128
private final Function<? extends TcpServer, ? extends TcpServer> tcpServerConfigurator = identity();
128129
private final Function<? extends WebsocketServerTransport, ? extends WebsocketServerTransport> webSocketServerTransportConfigurator = identity();
129130
private final Function<? extends TcpServerTransport, ? extends TcpServerTransport> tcpServerTransportConfigurator = identity();
130-
private final Function<? extends ServerRSocketFactory, ? extends ServerRSocketFactory> serverFactoryConfigurator = identity();
131+
private final Function<? extends RSocketServer, ? extends RSocketServer> serverConfigurator = identity();
131132
@Getter(lazy = true, onMethod = @__({@SuppressWarnings("unchecked")}))
132133
private final List<RSocketInterceptor> serverInterceptors = initializeInterceptors();
133134
@Getter(lazy = true, onMethod = @__({@SuppressWarnings("unchecked")}))

application-rsocket/src/main/java/ru/art/rsocket/server/RsocketServer.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package ru.art.rsocket.server;
2020

21+
import io.rsocket.core.*;
2122
import io.rsocket.transport.netty.server.*;
2223
import lombok.*;
2324
import org.apache.logging.log4j.Logger;
@@ -28,7 +29,6 @@
2829
import ru.art.rsocket.exception.*;
2930
import ru.art.rsocket.socket.*;
3031
import ru.art.rsocket.specification.*;
31-
import static io.rsocket.RSocketFactory.*;
3232
import static java.lang.System.*;
3333
import static java.lang.Thread.*;
3434
import static java.text.MessageFormat.*;
@@ -62,39 +62,37 @@ private RsocketServer(RsocketTransport transport) {
6262
}
6363

6464
private Mono<CloseableChannel> createServer() {
65-
ServerRSocketFactory socketFactory = receive().fragment(rsocketModule().getFragmentationMtu());
65+
RSocketServer server = RSocketServer.create().fragment(rsocketModule().getFragmentationMtu());
6666
if (rsocketModule().isResumableServer()) {
67-
socketFactory = socketFactory.resume()
68-
.resumeSessionDuration(ofMillis(rsocketModule().getServerResumeSessionDuration()))
69-
.resumeStreamTimeout(ofMillis(rsocketModule().getServerResumeStreamTimeout()));
67+
server.resume(new Resume()
68+
.sessionDuration(ofMillis(rsocketModule().getServerResumeSessionDuration()))
69+
.streamTimeout(ofMillis(rsocketModule().getServerResumeStreamTimeout())));
7070
}
71-
rsocketModule().getServerInterceptors().forEach(socketFactory::addResponderPlugin);
72-
ServerTransportAcceptor acceptor = rsocketModule()
73-
.getServerFactoryConfigurator()
74-
.apply(cast(socketFactory))
71+
server.interceptors(interceptorRegistry -> rsocketModule().getServerInterceptors().forEach(interceptorRegistry::forResponder));
72+
RSocketServer acceptor = rsocketModule()
73+
.getServerConfigurator()
74+
.apply(cast(server))
7575
.acceptor((setup, sendingSocket) -> just(new RsocketAcceptor(sendingSocket, setup)));
7676
Mono<CloseableChannel> channel;
7777
switch (transport) {
7878
case TCP:
79-
channel = acceptor.transport(rsocketModule().getTcpServerTransportConfigurator()
79+
channel = acceptor.bind(rsocketModule().getTcpServerTransportConfigurator()
8080
.apply(cast(TcpServerTransport.create(rsocketModule()
8181
.getTcpServerConfigurator()
8282
.apply(cast(TcpServer.create()
8383
.host(rsocketModule().getServerHost())
8484
.port(rsocketModule().getServerTcpPort())))))))
85-
.start()
8685
.onTerminateDetach();
8786
rsocketModuleState().setTcpServer(this);
8887
break;
8988
case WEB_SOCKET:
90-
channel = acceptor.transport(rsocketModule()
89+
channel = acceptor.bind(rsocketModule()
9190
.getWebSocketServerTransportConfigurator()
9291
.apply(cast(WebsocketServerTransport.create(rsocketModule()
9392
.getWebSocketServerConfigurator()
9493
.apply(cast(HttpServer.create()
9594
.host(rsocketModule().getServerHost())
9695
.port(rsocketModule().getServerWebSocketPort())))))))
97-
.start()
9896
.onTerminateDetach();
9997
rsocketModuleState().setWebSocketServer(this);
10098
break;

application-rsocket/src/main/java/ru/art/rsocket/socket/RsocketAcceptor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.rsocket.*;
2222
import org.reactivestreams.*;
2323
import reactor.core.publisher.*;
24+
import reactor.util.annotation.*;
2425
import ru.art.rsocket.flux.*;
2526
import ru.art.rsocket.model.*;
2627
import ru.art.rsocket.service.*;
@@ -39,7 +40,7 @@
3940
import static ru.art.rsocket.writer.ServiceResponsePayloadWriter.*;
4041
import static ru.art.service.ServiceController.*;
4142

42-
public class RsocketAcceptor extends AbstractRSocket {
43+
public class RsocketAcceptor implements RSocket {
4344
private final CurrentRsocketState state;
4445

4546
public RsocketAcceptor(RSocket socket, ConnectionSetupPayload setupPayload) {

0 commit comments

Comments
 (0)