Rationale
The current implementation does not support reopening a subscription to the block stream. After a temporary network issue, or if the node currently streaming blocks goes down, it is impossible to subscribe again. Attempts to resubscribe fail with the following exception:
jp.co.soramitsu.iroha2.IrohaSdkException: Flow#73dd19b2-d6c6-4c1a-b355-2fa191e0d6ac not found
at jp.co.soramitsu.iroha2.client.blockstream.BlockStreamSubscription.receive(BlockStreamSubscription.kt:87)
at jp.co.soramitsu.iroha2_adapter.service.IrohaService.initBlockStreamSubscription(IrohaService.java:205) ~[main/:na]
at jdk.internal.reflect.GeneratedMethodAccessor39.invoke(Unknown Source) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84) ~[spring-context-6.0.11.jar:6.0.11]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-6.0.11.jar:6.0.11]
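For illustration, the kind of client-side resubscription loop this issue would like to be possible could look roughly like the sketch below. It is not part of the SDK: `ResubscribeSketch`, `openWithRetry`, and `backoffMillis` are hypothetical names, and the sketch assumes that a failed attempt surfaces as an unchecked exception and that each attempt opens a brand-new subscription instead of reusing an old flow id.

```java
import java.util.function.Supplier;

/** Hypothetical helper, not SDK code: retries an action that opens a fresh subscription. */
public final class ResubscribeSketch {

    private ResubscribeSketch() {}

    /** Exponential backoff: baseMillis * 2^attempt, capped at maxMillis. */
    public static long backoffMillis(int attempt, long baseMillis, long maxMillis) {
        // Limit the shift so the intermediate value stays small for typical base delays.
        long delay = baseMillis << Math.min(attempt, 20);
        return Math.min(delay, maxMillis);
    }

    /**
     * Calls openSubscription until it succeeds or maxAttempts is exhausted.
     * Assumption: a failure is reported as a RuntimeException.
     */
    public static <T> T openWithRetry(Supplier<T> openSubscription, int maxAttempts,
                                      long baseMillis, long maxMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return openSubscription.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again after a delay
            }
            if (attempt + 1 < maxAttempts) {
                try {
                    Thread.sleep(backoffMillis(attempt, baseMillis, maxMillis));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    throw new IllegalStateException("Interrupted while waiting to resubscribe", ie);
                }
            }
        }
        throw last; // every attempt failed; rethrow the most recent error
    }
}
```

In an application, the supplier would wrap its own `subscribeToBlockStream(...)` call. Whether the SDK currently allows a new subscription to be opened after the old one has died is exactly what this issue is about, so the loop only helps once that is supported.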
Code snippet to subscribe to the block stream
val idToSubscription = iroha2AsyncClient.subscribeToBlockStream(latestBlockHeight.get(), Integer.MAX_VALUE);
val id = idToSubscription.getFirst().iterator().next().getId();
val subscription = idToSubscription.getSecond();
val publisher = ReactiveFlowKt.asPublisher(subscription.receive(id));
Flux.from(publisher)
    .subscribeOn(Schedulers.boundedElastic())
    .map(block -> (VersionedBlockMessage) block)
    .doOnNext(block -> {...})
    .doOnComplete(() -> {
        //never executed
    })
    .doOnCancel(() -> {
        //never executed
    })
    .doOnError(__ -> {
        //never executed
    })
Optional enhancement
No complete, cancel, or error signal reaches any of the //never executed blocks. This is likely because the coroutines-to-Reactor adapter is used from Java code. It would be great if BlockStreamSubscription had an isRunning() method or something similar.
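Until something like that exists, a client-side approximation could track when the last block arrived and treat a long silence as a dead stream. The sketch below is a workaround idea only, not SDK API: `StreamLivenessTracker`, `onBlock()`, and the silence threshold are all hypothetical.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Hypothetical watchdog, not SDK code: reports whether a block arrived recently. */
public final class StreamLivenessTracker {

    private final LongSupplier nanoClock;   // injectable clock, handy for tests
    private final long maxSilenceNanos;     // longest tolerated gap between blocks
    private final AtomicLong lastBlockNanos;

    public StreamLivenessTracker(Duration maxSilence, LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
        this.maxSilenceNanos = maxSilence.toNanos();
        // Start the timer at construction so a stream that never delivers is also detected.
        this.lastBlockNanos = new AtomicLong(nanoClock.getAsLong());
    }

    public StreamLivenessTracker(Duration maxSilence) {
        this(maxSilence, System::nanoTime);
    }

    /** Call this for every received block, for example from doOnNext. */
    public void onBlock() {
        lastBlockNanos.set(nanoClock.getAsLong());
    }

    /** True while the time since the last block is within the allowed silence. */
    public boolean isRunning() {
        return nanoClock.getAsLong() - lastBlockNanos.get() <= maxSilenceNanos;
    }
}
```

The tracker's `onBlock()` would be called inside `doOnNext`, and the scheduled task that currently tries to resubscribe could check `isRunning()` first. A quiet period with no new blocks is indistinguishable from a dead stream under this scheme, so the threshold needs tuning, which is one reason a real `isRunning()` on the subscription would be preferable.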