Skip to content

Resubscribe to the blocks streaming #401

@bbr88

Description

@bbr88

Rationale

The current implementation does not consider reopening a subscription to the blocks streaming. In the case of a temporal network issue or if the node currently streaming blocks goes down, it's impossible to subscribe again. Attempts to resubscribe fail with the following exception:

jp.co.soramitsu.iroha2.IrohaSdkException: Flow#73dd19b2-d6c6-4c1a-b355-2fa191e0d6ac not found
	at jp.co.soramitsu.iroha2.client.blockstream.BlockStreamSubscription.receive(BlockStreamSubscription.kt:87)
	at jp.co.soramitsu.iroha2_adapter.service.IrohaService.initBlockStreamSubscription(IrohaService.java:205) ~[main/:na]
	at jdk.internal.reflect.GeneratedMethodAccessor39.invoke(Unknown Source) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
	at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84) ~[spring-context-6.0.11.jar:6.0.11]
	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-6.0.11.jar:6.0.11]

Code snippet to subscribe to blocks streaming

val idToSubscription = iroha2AsyncClient.subscribeToBlockStream(latestBlockHeight.get(), Integer.MAX_VALUE);
val id = idToSubscription.getFirst().iterator().next().getId();
val subscription = idToSubscription.getSecond();
val publisher = ReactiveFlowKt.asPublisher(subscription.receive(id));

Flux.from(publisher)
        .subscribeOn(Schedulers.boundedElastic())
        .map(block -> (VersionedBlockMessage) block)
        .doOnNext(block -> {...})
        .doOnComplete(() -> {
            //never executed
        })
        .doOnCancel(() -> {
            //never executed
        })
        .doOnError(__ -> {
            //never executed
        })

Optional enhancement

No complete or cancel signal propagates to either of the //never executed blocks. Likely it happens since the coroutines-to-reactor adapter is used in the Java code. It would be great if the BlockStreamSubscription has isRunning() or a similar method.

Metadata

Metadata

Assignees

Labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions