diff --git a/topic/src/main/java/tech/ydb/topic/read/SyncReader.java b/topic/src/main/java/tech/ydb/topic/read/SyncReader.java index fe02b91e7..28882a06d 100644 --- a/topic/src/main/java/tech/ydb/topic/read/SyncReader.java +++ b/topic/src/main/java/tech/ydb/topic/read/SyncReader.java @@ -22,10 +22,18 @@ public interface SyncReader { void initAndWait(); /** - * Receive a {@link Message}. Blocks until a Message is received. + * Return identifier of read session. + * @return current read session identifier or null if session has not started yet + */ + @Nullable + String getSessionId(); + + /** + * Receive a {@link Message}.Blocks until a Message is received. * * @param settings settings for receiving a Message * @return returns a {@link Message}, or null if the specified timeout time elapses before a message is available + * @throws java.lang.InterruptedException if current thread was interrupted */ Message receive(ReceiveSettings settings) throws InterruptedException; @@ -35,6 +43,7 @@ public interface SyncReader { * @param timeout timeout to wait a Message with * @param unit TimeUnit for timeout * @return returns a {@link Message}, or null if the specified waiting time elapses before a message is available + * @throws java.lang.InterruptedException if current thread was interrupted */ @Nullable default Message receive(long timeout, TimeUnit unit) throws InterruptedException { @@ -47,6 +56,7 @@ default Message receive(long timeout, TimeUnit unit) throws InterruptedException * Receive a {@link Message}. Blocks until a Message is received. * * @return {@link Message} + * @throws java.lang.InterruptedException if current thread was interrupted */ default Message receive() throws InterruptedException { return receive(ReceiveSettings.newBuilder().build()); diff --git a/topic/src/main/java/tech/ydb/topic/read/events/ReadEventHandler.java b/topic/src/main/java/tech/ydb/topic/read/events/ReadEventHandler.java index f97c80608..0397e3f7b 100644 --- a/topic/src/main/java/tech/ydb/topic/read/events/ReadEventHandler.java +++ b/topic/src/main/java/tech/ydb/topic/read/events/ReadEventHandler.java @@ -1,5 +1,7 @@ package tech.ydb.topic.read.events; +import tech.ydb.topic.read.impl.events.SessionStartedEvent; + /** * @author Nikolay Perfilov */ @@ -22,4 +24,6 @@ default void onStopPartitionSession(StopPartitionSessionEvent event) { default void onPartitionSessionClosed(PartitionSessionClosedEvent event) { } default void onReaderClosed(ReaderClosedEvent event) { } + + default void onSessionStarted(SessionStartedEvent event) { } } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java index 2cf21ba35..40762f381 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java @@ -27,6 +27,7 @@ import tech.ydb.topic.read.events.StopPartitionSessionEvent; import tech.ydb.topic.read.impl.events.CommitOffsetAcknowledgementEventImpl; import tech.ydb.topic.read.impl.events.PartitionSessionClosedEventImpl; +import tech.ydb.topic.read.impl.events.SessionStartedEvent; import tech.ydb.topic.read.impl.events.StartPartitionSessionEventImpl; import tech.ydb.topic.read.impl.events.StopPartitionSessionEventImpl; import tech.ydb.topic.settings.ReadEventHandlersSettings; @@ -76,6 +77,18 @@ public CompletableFuture updateOffsetsInTransaction(YdbTransaction trans return sendUpdateOffsetsInTransaction(transaction, offsets, settings); } + @Override + protected void handleSessionStarted(String sessionId) { + handlerExecutor.execute(() -> { + try { + eventHandler.onSessionStarted(new SessionStartedEvent(sessionId)); + } catch (Throwable th) { + logUserThrowableAndStopWorking(th, "onSessionStarted"); + throw th; + } + }); + } + @Override protected CompletableFuture handleDataReceivedEvent(DataReceivedEvent event) { return CompletableFuture.runAsync(() -> { diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java index 2cbe5d51d..33997a938 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java @@ -109,6 +109,7 @@ protected CompletableFuture initImpl() { } protected abstract CompletableFuture handleDataReceivedEvent(DataReceivedEvent event); + protected abstract void handleSessionStarted(String sessionId); protected abstract void handleCommitResponse(long committedOffset, PartitionSession partitionSession); protected abstract void handleStartPartitionSessionRequest( YdbTopic.StreamReadMessage.StartPartitionSessionRequest request, @@ -385,6 +386,7 @@ private void onInitResponse(YdbTopic.StreamReadMessage.InitResponse response) { sizeBytesToRequest.set(settings.getMaxMemoryUsageBytes()); logger.info("[{}] Session {} initialized. Requesting {} bytes...", streamId, sessionId, settings.getMaxMemoryUsageBytes()); + handleSessionStarted(sessionId); sendReadRequest(); } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java index 6c3bd37cb..7c9a43ccf 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java @@ -39,6 +39,7 @@ public class SyncReaderImpl extends ReaderImpl implements SyncReader { private final ReentrantLock queueLock = new ReentrantLock(); private final Condition queueIsNotEmptyCondition = queueLock.newCondition(); private int currentMessageIndex = 0; + private volatile String sessionId = null; public SyncReaderImpl(TopicRpc topicRpc, ReaderSettings settings) { super(topicRpc, settings); @@ -54,6 +55,11 @@ private MessageBatchWrapper(List messages, CompletableFuture futu } } + @Override + public String getSessionId() { + return sessionId; + } + @Override public void init() { initImpl(); @@ -160,6 +166,11 @@ protected CompletableFuture handleDataReceivedEvent(DataReceivedEvent even return resultFuture; } + @Override + protected void handleSessionStarted(String sessionId) { + this.sessionId = sessionId; + } + @Override protected void handleCommitResponse(long committedOffset, PartitionSession partitionSession) { if (logger.isDebugEnabled()) { diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/events/SessionStartedEvent.java b/topic/src/main/java/tech/ydb/topic/read/impl/events/SessionStartedEvent.java new file mode 100644 index 000000000..b4aec30d3 --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/read/impl/events/SessionStartedEvent.java @@ -0,0 +1,17 @@ +package tech.ydb.topic.read.impl.events; + +/** + * + * @author Aleksandr Gorshenin + */ +public class SessionStartedEvent { + private final String sessionId; + + public SessionStartedEvent(String sessionId) { + this.sessionId = sessionId; + } + + public String getSessionId() { + return this.sessionId; + } +}