|
19 | 19 | package ru.art.rsocket.communicator; |
20 | 20 |
|
21 | 21 | import io.rsocket.*; |
| 22 | +import io.rsocket.core.*; |
22 | 23 | import io.rsocket.transport.netty.client.*; |
23 | 24 | import lombok.*; |
24 | 25 | import org.apache.logging.log4j.*; |
|
33 | 34 | import ru.art.rsocket.exception.*; |
34 | 35 | import ru.art.rsocket.model.*; |
35 | 36 | import ru.art.service.model.*; |
36 | | -import static io.rsocket.RSocketFactory.*; |
| 37 | +import static io.rsocket.core.RSocketConnector.create; |
37 | 38 | import static java.text.MessageFormat.*; |
38 | 39 | import static java.time.Duration.*; |
39 | 40 | import static java.util.Optional.*; |
@@ -78,30 +79,29 @@ public class RsocketCommunicator { |
78 | 79 |
|
79 | 80 | private RsocketCommunicator(RsocketCommunicationTargetConfiguration configuration) { |
80 | 81 | dataFormat = configuration.dataFormat(); |
81 | | - ClientRSocketFactory factory = connect(); |
| 82 | + RSocketConnector connector = create(); |
82 | 83 | if (configuration.resumable()) { |
83 | | - factory = factory.resume() |
84 | | - .resumeSessionDuration(ofMillis(configuration.resumeSessionDuration())) |
85 | | - .resumeStreamTimeout(ofMillis(configuration.resumeStreamTimeout())); |
| 84 | + connector = connector.resume(new Resume() |
| 85 | + .sessionDuration(ofMillis(configuration.resumeSessionDuration())) |
| 86 | + .streamTimeout(ofMillis(configuration.resumeStreamTimeout())) |
| 87 | + ); |
86 | 88 | } |
87 | | - rsocketModule().getClientInterceptors().forEach(factory::addRequesterPlugin); |
88 | | - configuration.interceptors().forEach(factory::addRequesterPlugin); |
| 89 | + connector.interceptors(interceptorRegistry -> rsocketModule().getClientInterceptors().forEach(interceptorRegistry::forRequester)); |
| 90 | + connector.interceptors(interceptorRegistry -> configuration.interceptors().forEach(interceptorRegistry::forRequester)); |
89 | 91 | switch (configuration.transport()) { |
90 | 92 | case TCP: |
91 | | - rsocketMono = factory.dataMimeType(toMimeType(configuration.dataFormat())) |
| 93 | + rsocketMono = connector.dataMimeType(toMimeType(configuration.dataFormat())) |
92 | 94 | .metadataMimeType(toMimeType(configuration.dataFormat())) |
93 | | - .transport(TcpClientTransport.create(configuration.host(), configuration.tcpPort())) |
94 | | - .start() |
| 95 | + .connect(TcpClientTransport.create(configuration.host(), configuration.tcpPort())) |
95 | 96 | .doOnNext(rsocketModuleState()::registerRsocket) |
96 | 97 | .doOnSubscribe(subscription -> getLogger().info(format(RSOCKET_TCP_COMMUNICATOR_STARTED_MESSAGE, configuration.host(), configuration.tcpPort()))); |
97 | 98 | getLogger().info(format(RSOCKET_TCP_COMMUNICATOR_CREATED_MESSAGE, configuration.host(), configuration.tcpPort())); |
98 | 99 | return; |
99 | 100 | case WEB_SOCKET: |
100 | | - rsocketMono = factory |
| 101 | + rsocketMono = connector |
101 | 102 | .dataMimeType(toMimeType(configuration.dataFormat())) |
102 | 103 | .metadataMimeType(toMimeType(configuration.dataFormat())) |
103 | | - .transport(WebsocketClientTransport.create(configuration.host(), configuration.tcpPort())) |
104 | | - .start() |
| 104 | + .connect(WebsocketClientTransport.create(configuration.host(), configuration.tcpPort())) |
105 | 105 | .doOnNext(rsocketModuleState()::registerRsocket) |
106 | 106 | .doOnSubscribe(subscription -> getLogger().info(format(RSOCKET_WS_COMMUNICATOR_STARTED_MESSAGE, configuration.host(), configuration.webSocketPort()))); |
107 | 107 | getLogger().info(format(RSOCKET_WS_COMMUNICATOR_CREATED_MESSAGE, configuration.host(), configuration.tcpPort())); |
|
0 commit comments