From 7ced5fba366cd73d0ae4278df047cc069b0009e6 Mon Sep 17 00:00:00 2001 From: Rui Fan <1996fanrui@gmail.com> Date: Wed, 18 Feb 2026 21:26:24 +0100 Subject: [PATCH 1/5] [hotfix][network] Fix LocalInputChannel.getBuffersInUseCount to include toBeConsumedBuffers --- .../partition/consumer/LocalInputChannel.java | 14 ++++-- .../consumer/LocalInputChannelTest.java | 49 +++++++++++++++++++ 2 files changed, 60 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index 5e90ed4625af9..a7628d8c52f62 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -402,6 +402,13 @@ void releaseAllResources() throws IOException { view.releaseAllResources(); subpartitionView = null; } + + // Release any remaining buffers in toBeConsumedBuffers to avoid memory leak. + // These may be recovered buffers or partial buffers from FullyFilledBuffer. + for (BufferAndBacklog bufferAndBacklog : toBeConsumedBuffers) { + bufferAndBacklog.buffer().recycleBuffer(); + } + toBeConsumedBuffers.clear(); } } @@ -418,18 +425,19 @@ void announceBufferSize(int newBufferSize) { @Override int getBuffersInUseCount() { ResultSubpartitionView view = this.subpartitionView; - return view == null ? 0 : view.getNumberOfQueuedBuffers(); + return toBeConsumedBuffers.size() + (view == null ? 0 : view.getNumberOfQueuedBuffers()); } @Override public int unsynchronizedGetNumberOfQueuedBuffers() { ResultSubpartitionView view = subpartitionView; + int count = toBeConsumedBuffers.size(); if (view != null) { - return view.unsynchronizedGetNumberOfQueuedBuffers(); + count += view.unsynchronizedGetNumberOfQueuedBuffers(); } - return 0; + return count; } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 298987c83f1ba..2e0be6ae7b829 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -18,8 +18,10 @@ package org.apache.flink.runtime.io.network.partition.consumer; +import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.io.disk.NoOpFileChannelManager; @@ -59,6 +61,7 @@ import org.mockito.stubbing.Answer; import java.io.IOException; +import java.util.ArrayDeque; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -664,6 +667,52 @@ void testReceivingBuffersInUseBeforeSubpartitionViewInitialization() throws Exce assertThat(localChannel.getBuffersInUseCount()).isEqualTo(3); } + @Test + void testGetBuffersInUseCountIncludesToBeConsumedBuffers() throws Exception { + // given: Local input channel with recovered buffers in toBeConsumedBuffers + ResultSubpartitionView subpartitionView = + InputChannelTestUtils.createResultSubpartitionView( + createFilledFinishedBufferConsumer(4096), + createFilledFinishedBufferConsumer(4096)); + TestingResultPartitionManager partitionManager = + new TestingResultPartitionManager(subpartitionView); + final SingleInputGate inputGate = createSingleInputGate(1); + + // Create 3 recovered buffers + ArrayDeque recoveredBuffers = new ArrayDeque<>(); + recoveredBuffers.add(TestBufferFactory.createBuffer(32)); + recoveredBuffers.add(TestBufferFactory.createBuffer(32)); + recoveredBuffers.add(TestBufferFactory.createBuffer(32)); + + final LocalInputChannel localChannel = + new LocalInputChannel( + inputGate, + 0, + new ResultPartitionID(), + new ResultSubpartitionIndexSet(0), + partitionManager, + new TaskEventDispatcher(), + 0, + 0, + new SimpleCounter(), + new SimpleCounter(), + ChannelStateWriter.NO_OP, + recoveredBuffers); + + inputGate.setInputChannels(localChannel); + + // then: Before requesting subpartitions, buffers in use should include recovered buffers + assertThat(localChannel.getBuffersInUseCount()).isEqualTo(3); + assertThat(localChannel.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(3); + + // when: The subpartition view is initialized (adds 2 more buffers from the view) + localChannel.requestSubpartitions(); + + // then: Buffers in use should include both recovered and subpartition view buffers + assertThat(localChannel.getBuffersInUseCount()).isEqualTo(5); + assertThat(localChannel.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(5); + } + // --------------------------------------------------------------------------------------------- /** Returns the configured number of buffers for each channel in a random order. */ From 7c9a38d6bfccc6d11dfb4da54d151436b5445d82 Mon Sep 17 00:00:00 2001 From: Rui Fan <1996fanrui@gmail.com> Date: Wed, 18 Feb 2026 21:26:32 +0100 Subject: [PATCH 2/5] [FLINK-39018][checkpoint] Support LocalInputChannel checkpoint snapshot for recovered buffers --- .../partition/consumer/LocalInputChannel.java | 12 +++++- .../consumer/LocalInputChannelTest.java | 43 +++++++++++++++++++ 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index a7628d8c52f62..616071c450b85 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -46,7 +46,7 @@ import java.io.IOException; import java.util.ArrayDeque; -import java.util.Collections; +import java.util.ArrayList; import java.util.Deque; import java.util.List; import java.util.Optional; @@ -113,7 +113,15 @@ public LocalInputChannel( // ------------------------------------------------------------------------ public void checkpointStarted(CheckpointBarrier barrier) throws CheckpointException { - channelStatePersister.startPersisting(barrier.getId(), Collections.emptyList()); + // Collect inflight buffers from toBeConsumedBuffers to be persisted. + // These are buffers that have not been consumed yet when the checkpoint barrier arrives. + List inflightBuffers = new ArrayList<>(); + for (BufferAndBacklog bufferAndBacklog : toBeConsumedBuffers) { + if (bufferAndBacklog.buffer().isBuffer()) { + inflightBuffers.add(bufferAndBacklog.buffer().retainBuffer()); + } + } + channelStatePersister.startPersisting(barrier.getId(), inflightBuffers); } public void checkpointStopped(long checkpointId) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 2e0be6ae7b829..83a94e79cd143 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -713,6 +713,49 @@ void testGetBuffersInUseCountIncludesToBeConsumedBuffers() throws Exception { assertThat(localChannel.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(5); } + @Test + void testCheckpointStartedPersistsRecoveredBuffers() throws Exception { + // given: Local input channel with recovered buffers + SingleInputGate inputGate = new SingleInputGateBuilder().build(); + + ArrayDeque recoveredBuffers = new ArrayDeque<>(); + recoveredBuffers.add(TestBufferFactory.createBuffer(10)); + recoveredBuffers.add(TestBufferFactory.createBuffer(20)); + recoveredBuffers.add(TestBufferFactory.createBuffer(30)); + + RecordingChannelStateWriter stateWriter = new RecordingChannelStateWriter(); + + LocalInputChannel channel = + new LocalInputChannel( + inputGate, + 0, + new ResultPartitionID(), + new ResultSubpartitionIndexSet(0), + new ResultPartitionManager(), + new TaskEventDispatcher(), + 0, + 0, + new SimpleCounter(), + new SimpleCounter(), + stateWriter, + recoveredBuffers); + + inputGate.setInputChannels(channel); + + // when: Checkpoint is started + CheckpointOptions options = + CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, getDefault()); + stateWriter.start(1L, options); + CheckpointBarrier barrier = new CheckpointBarrier(1L, 0L, options); + channel.checkpointStarted(barrier); + + // then: All 3 recovered buffers should be persisted as inflight data + List persistedBuffers = stateWriter.getAddedInput().get(channel.getChannelInfo()); + assertThat(persistedBuffers).isNotNull().hasSize(3); + assertThat(persistedBuffers.stream().mapToInt(Buffer::getSize).toArray()) + .containsExactly(10, 20, 30); + } + // --------------------------------------------------------------------------------------------- /** Returns the configured number of buffers for each channel in a random order. */ From 16fbdbfed023d5128803508e27b6690fa9477d27 Mon Sep 17 00:00:00 2001 From: Rui Fan <1996fanrui@gmail.com> Date: Wed, 18 Feb 2026 21:26:32 +0100 Subject: [PATCH 3/5] [FLINK-39018][network] Fix LocalInputChannel priority event and buffer availability for recovered buffers --- .../partition/consumer/LocalInputChannel.java | 81 ++++++- .../consumer/LocalInputChannelTest.java | 205 ++++++++++++++++++ 2 files changed, 283 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index 616071c450b85..2833adecb5813 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -80,6 +80,13 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit private final Deque toBeConsumedBuffers = new ArrayDeque<>(); + /** + * Flag indicating whether there is a pending priority event (e.g., checkpoint barrier) in the + * subpartitionView that should be consumed before toBeConsumedBuffers. This is set by {@link + * #notifyPriorityEvent} and checked in {@link #getNextBuffer()}. + */ + private volatile boolean hasPendingPriorityEvent = false; + public LocalInputChannel( SingleInputGate inputGate, int channelIndex, @@ -130,8 +137,6 @@ public void checkpointStopped(long checkpointId) { @Override protected void requestSubpartitions() throws IOException { - checkState(toBeConsumedBuffers.isEmpty()); - boolean retriggerRequest = false; boolean notifyDataAvailable = false; @@ -242,7 +247,7 @@ public Optional getNextBuffer() throws IOException { checkError(); if (!toBeConsumedBuffers.isEmpty()) { - return getBufferAndAvailability(toBeConsumedBuffers.removeFirst()); + return getNextRecoveredBuffer(); } ResultSubpartitionView subpartitionView = this.subpartitionView; @@ -304,6 +309,68 @@ public Optional getNextBuffer() throws IOException { return getBufferAndAvailability(next); } + /** + * Consumes the next buffer from toBeConsumedBuffers (recovered buffers), handling pending + * priority events and dynamic availability detection for the last recovered buffer. + */ + private Optional getNextRecoveredBuffer() throws IOException { + // If there is a pending priority event (e.g., unaligned checkpoint barrier), fetch it + // from subpartitionView first, skipping toBeConsumedBuffers. This ensures priority + // events are processed immediately even when there are pending recovered buffers. + if (hasPendingPriorityEvent) { + checkState(subpartitionView != null, "No subpartition view available"); + BufferAndBacklog next = subpartitionView.getNextBuffer(); + checkState( + next != null && next.buffer().getDataType().hasPriority(), + "Expected priority event, but got %s", + next == null ? "null" : next.buffer().getDataType()); + + // Check for barrier to update channel state persister. + // Note: maybePersist is not needed for barriers as they are not regular data buffers. + channelStatePersister.checkForBarrier(next.buffer()); + + Buffer.DataType expectedNextDataType = next.getNextDataType(); + if (!expectedNextDataType.hasPriority()) { + // Reset hasPendingPriorityEvent to false if no more priority event + hasPendingPriorityEvent = false; + if (!toBeConsumedBuffers.isEmpty()) { + // Correct nextDataType: if toBeConsumedBuffers is not empty, the actual next + // element to consume is from toBeConsumedBuffers, not from subpartitionView + expectedNextDataType = toBeConsumedBuffers.peek().buffer().getDataType(); + } + } + + return getBufferAndAvailability( + new BufferAndBacklog( + next.buffer(), + next.buffersInBacklog(), + expectedNextDataType, + next.getSequenceNumber())); + } + + BufferAndBacklog next = toBeConsumedBuffers.removeFirst(); + + // If this is the last recovered buffer and nextDataType is NONE, + // dynamically check if subpartitionView has data available. + // The last buffer's nextDataType was preset to NONE during construction, + // but subpartitionView may already have data available. + if (toBeConsumedBuffers.isEmpty() + && next.getNextDataType() == Buffer.DataType.NONE + && subpartitionView != null) { + ResultSubpartitionView.AvailabilityWithBacklog availability = + subpartitionView.getAvailabilityAndBacklog(true); + if (availability.isAvailable()) { + next = + new BufferAndBacklog( + next.buffer(), + availability.getBacklog(), + Buffer.DataType.DATA_BUFFER, + next.getSequenceNumber()); + } + } + return getBufferAndAvailability(next); + } + private Optional getBufferAndAvailability(BufferAndBacklog next) throws IOException { Buffer buffer = next.buffer(); @@ -339,6 +406,14 @@ public void notifyDataAvailable(ResultSubpartitionView view) { notifyChannelNonEmpty(); } + @Override + public void notifyPriorityEvent(int prioritySequenceNumber) { + // Set flag so that getNextBuffer() knows to fetch priority event from subpartitionView + // before consuming toBeConsumedBuffers. + hasPendingPriorityEvent = true; + super.notifyPriorityEvent(prioritySequenceNumber); + } + private ResultSubpartitionView checkAndWaitForSubpartitionView() { // synchronizing on the request lock means this blocks until the asynchronous request // for the partition view has been completed diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 83a94e79cd143..aeb765f79b96b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -756,6 +756,211 @@ void testCheckpointStartedPersistsRecoveredBuffers() throws Exception { .containsExactly(10, 20, 30); } + @Test + void testPriorityEventConsumedBeforeRecoveredBuffers() throws Exception { + RecordingChannelStateWriter stateWriter = new RecordingChannelStateWriter(); + ChannelAndSubpartition ctx = createChannelWithRecoveredBuffers(stateWriter, 10, 20); + + // when: A priority event (barrier) arrives while recovered buffers are still pending + CheckpointOptions options = + CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, getDefault()); + CheckpointBarrier barrier = new CheckpointBarrier(1L, 0L, options); + ctx.subpartition.add(EventSerializer.toBufferConsumer(barrier, true)); + + ctx.channel.notifyPriorityEvent(0); + + // then: The first buffer returned should be the priority event (barrier), not recovered + // data + Optional firstResult = ctx.channel.getNextBuffer(); + assertThat(firstResult).isPresent(); + assertThat(firstResult.get().buffer().getDataType().hasPriority()).isTrue(); + + // And the next buffers should be the recovered data + Optional secondResult = ctx.channel.getNextBuffer(); + assertThat(secondResult).isPresent(); + assertThat(secondResult.get().buffer().isBuffer()).isTrue(); + assertThat(secondResult.get().buffer().getSize()).isEqualTo(10); + } + + @Test + void testPriorityEventFailsFastWhenSubpartitionViewIsNull() throws Exception { + // given: Local input channel with recovered buffers but NO subpartition view initialized + SingleInputGate inputGate = new SingleInputGateBuilder().build(); + + ArrayDeque recoveredBuffers = new ArrayDeque<>(); + recoveredBuffers.add(TestBufferFactory.createBuffer(10)); + + LocalInputChannel channel = + new LocalInputChannel( + inputGate, + 0, + new ResultPartitionID(), + new ResultSubpartitionIndexSet(0), + new ResultPartitionManager(), + new TaskEventDispatcher(), + 0, + 0, + new SimpleCounter(), + new SimpleCounter(), + ChannelStateWriter.NO_OP, + recoveredBuffers); + + inputGate.setInputChannels(channel); + // Do NOT call channel.requestSubpartitions() — subpartitionView stays null + + channel.notifyPriorityEvent(0); + + assertThatThrownBy(channel::getNextBuffer) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("No subpartition view available"); + } + + @Test + void testPriorityEventFailsFastWhenNonPriorityBufferReturned() throws Exception { + ChannelAndSubpartition ctx = + createChannelWithRecoveredBuffers(ChannelStateWriter.NO_OP, 10); + + // Add a non-priority data buffer to the subpartition + ctx.subpartition.add(createFilledFinishedBufferConsumer(32)); + ctx.channel.notifyPriorityEvent(0); + + assertThatThrownBy(ctx.channel::getNextBuffer) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Expected priority event"); + } + + @Test + void testPriorityEventFailsFastWhenSubpartitionViewReturnsNull() throws Exception { + ChannelAndSubpartition ctx = + createChannelWithRecoveredBuffers(ChannelStateWriter.NO_OP, 10); + + // Do NOT add any buffer to the subpartition — getNextBuffer() returns null + ctx.channel.notifyPriorityEvent(0); + + assertThatThrownBy(ctx.channel::getNextBuffer) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Expected priority event, but got null"); + } + + @Test + void testMultipleConsecutivePriorityEvents() throws Exception { + RecordingChannelStateWriter stateWriter = new RecordingChannelStateWriter(); + ChannelAndSubpartition ctx = createChannelWithRecoveredBuffers(stateWriter, 10); + + // Add two priority events (barriers) to the subpartition + CheckpointOptions options = + CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, getDefault()); + ctx.subpartition.add( + EventSerializer.toBufferConsumer(new CheckpointBarrier(1L, 0L, options), true)); + ctx.subpartition.add( + EventSerializer.toBufferConsumer(new CheckpointBarrier(2L, 0L, options), true)); + + ctx.channel.notifyPriorityEvent(0); + + // First getNextBuffer() should return the first barrier + Optional first = ctx.channel.getNextBuffer(); + assertThat(first).isPresent(); + assertThat(first.get().buffer().getDataType().hasPriority()).isTrue(); + assertThat(first.get().morePriorityEvents()).isTrue(); + + // Second getNextBuffer() should return the second barrier + Optional second = ctx.channel.getNextBuffer(); + assertThat(second).isPresent(); + assertThat(second.get().buffer().getDataType().hasPriority()).isTrue(); + + // Third getNextBuffer() should return the recovered data buffer + Optional third = ctx.channel.getNextBuffer(); + assertThat(third).isPresent(); + assertThat(third.get().buffer().isBuffer()).isTrue(); + assertThat(third.get().buffer().getSize()).isEqualTo(10); + } + + @Test + void testNextDataTypeCorrectedToRecoveredBufferType() throws Exception { + RecordingChannelStateWriter stateWriter = new RecordingChannelStateWriter(); + ChannelAndSubpartition ctx = createChannelWithRecoveredBuffers(stateWriter, 10); + + // Add a priority event followed by a data buffer in the subpartition + CheckpointOptions options = + CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, getDefault()); + ctx.subpartition.add( + EventSerializer.toBufferConsumer(new CheckpointBarrier(1L, 0L, options), true)); + ctx.subpartition.add(createFilledFinishedBufferConsumer(32)); + + ctx.channel.notifyPriorityEvent(0); + + // getNextBuffer() returns the barrier + Optional result = ctx.channel.getNextBuffer(); + assertThat(result).isPresent(); + assertThat(result.get().buffer().getDataType().hasPriority()).isTrue(); + + // The nextDataType should be corrected to DATA_BUFFER (from toBeConsumedBuffers), + // not whatever the subpartitionView reports. + assertThat(result.get().morePriorityEvents()).isFalse(); + assertThat(result.get().moreAvailable()).isTrue(); + + // The next buffer should be the recovered data (not the subpartition data) + Optional next = ctx.channel.getNextBuffer(); + assertThat(next).isPresent(); + assertThat(next.get().buffer().isBuffer()).isTrue(); + assertThat(next.get().buffer().getSize()).isEqualTo(10); + } + + /** + * Creates a LocalInputChannel with recovered buffers and a live subpartition, ready for + * priority event tests. The channel has already called requestSubpartitions(). + */ + private static ChannelAndSubpartition createChannelWithRecoveredBuffers( + ChannelStateWriter stateWriter, int... recoveredBufferSizes) throws Exception { + SingleInputGate inputGate = new SingleInputGateBuilder().build(); + + PipelinedResultPartition parent = + (PipelinedResultPartition) + PartitionTestUtils.createPartition( + ResultPartitionType.PIPELINED, NoOpFileChannelManager.INSTANCE); + ResultSubpartition subpartition = parent.getAllPartitions()[0]; + ResultSubpartitionView subpartitionView = + subpartition.createReadView((ResultSubpartitionView view) -> {}); + + TestingResultPartitionManager partitionManager = + new TestingResultPartitionManager(subpartitionView); + + ArrayDeque recoveredBuffers = new ArrayDeque<>(); + for (int size : recoveredBufferSizes) { + recoveredBuffers.add(TestBufferFactory.createBuffer(size)); + } + + LocalInputChannel channel = + new LocalInputChannel( + inputGate, + 0, + parent.getPartitionId(), + new ResultSubpartitionIndexSet(0), + partitionManager, + new TaskEventDispatcher(), + 0, + 0, + new SimpleCounter(), + new SimpleCounter(), + stateWriter, + recoveredBuffers); + + inputGate.setInputChannels(channel); + channel.requestSubpartitions(); + + return new ChannelAndSubpartition(channel, subpartition); + } + + private static class ChannelAndSubpartition { + final LocalInputChannel channel; + final ResultSubpartition subpartition; + + ChannelAndSubpartition(LocalInputChannel channel, ResultSubpartition subpartition) { + this.channel = channel; + this.subpartition = subpartition; + } + } + // --------------------------------------------------------------------------------------------- /** Returns the configured number of buffers for each channel in a random order. */ From 8e8dc32a2a14ea15d66aeb038a58381d27ddcf61 Mon Sep 17 00:00:00 2001 From: Rui Fan <1996fanrui@gmail.com> Date: Thu, 12 Mar 2026 23:08:13 +0100 Subject: [PATCH 4/5] [FLINK-39018][checkpoint] Notify PriorityEvent to downstream task even if it is blocked to ensure the checkpoint barrier can be handled by downstream task Priority events (e.g. unaligned checkpoint barriers) must notify downstream even when the subpartition is blocked. During recovery, once the upstream output channel state is fully restored, a RECOVERY_COMPLETION event (EndOfOutputChannelStateEvent) is emitted. This event blocks the subpartition to prevent the upstream from sending new data while the downstream is still consuming recovered buffers. The subpartition remains blocked until the downstream finishes consuming all recovered buffers from every channel and calls resumeConsumption() to unblock. If a checkpoint is triggered while the downstream is still consuming recovered buffers, the upstream receives an unaligned checkpoint barrier and adds it to this blocked subpartition. The barrier must still be delivered to the downstream immediately, otherwise the checkpoint will hang until it times out. --- .../partition/PipelinedSubpartition.java | 22 +++++- ...PipelinedSubpartitionWithReadViewTest.java | 74 +++++++++++++++++++ 2 files changed, 93 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index 2381720106aaa..2e4b674a3a3ab 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -252,8 +252,21 @@ private boolean processPriorityBuffer(BufferConsumer bufferConsumer, int partial @GuardedBy("buffers") private boolean needNotifyPriorityEvent() { assert Thread.holdsLock(buffers); - // if subpartition is blocked then downstream doesn't expect any notifications - return buffers.getNumPriorityElements() == 1 && !isBlocked; + // Priority events (e.g. unaligned checkpoint barriers) must notify downstream even + // when the subpartition is blocked. + // + // During recovery, once the upstream output channel state is fully restored, a + // RECOVERY_COMPLETION event (EndOfOutputChannelStateEvent) is emitted. This event + // blocks the subpartition to prevent the upstream from sending new data while the + // downstream is still consuming recovered buffers. The subpartition remains blocked + // until the downstream finishes consuming all recovered buffers from every channel + // and calls resumeConsumption() to unblock. + // + // If a checkpoint is triggered while the downstream is still consuming recovered + // buffers, the upstream receives an unaligned checkpoint barrier and adds it to this + // blocked subpartition. The barrier must still be delivered to the downstream + // immediately, otherwise the checkpoint will hang until it times out. + return buffers.getNumPriorityElements() == 1; } @GuardedBy("buffers") @@ -456,7 +469,10 @@ public void release() { @Nullable BufferAndBacklog pollBuffer() { synchronized (buffers) { - if (isBlocked) { + // When blocked (e.g. by RECOVERY_COMPLETION event), only allow priority buffers + // (e.g. unaligned checkpoint barriers) to be polled. Regular buffers remain blocked + // until resumeConsumption() is called. See needNotifyPriorityEvent() for details. + if (isBlocked && buffers.getNumPriorityElements() == 0) { return null; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java index 1eebe25273a18..1705e7f520db6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java @@ -545,6 +545,80 @@ void testResumeBlockedEmptySubpartition() throws IOException, InterruptedExcepti assertNoNextBuffer(readView); } + @TestTemplate + void testPriorityEventBypassesBlockedSubpartition() throws Exception { + subpartition.setChannelStateWriter(ChannelStateWriter.NO_OP); + + // Block the subpartition by consuming an aligned checkpoint barrier + blockSubpartitionByCheckpoint(1); + assertThat(availablityListener.getNumPriorityEvents()).isZero(); + + // While blocked, add an unaligned checkpoint barrier (priority event). + // Even though isBlocked=true, the priority event notification should NOT + // be suppressed — priority events must bypass blocking. + CheckpointOptions options = + CheckpointOptions.unaligned( + CheckpointType.CHECKPOINT, + new CheckpointStorageLocationReference(new byte[] {0, 1, 2})); + BufferConsumer barrierBuffer = + EventSerializer.toBufferConsumer(new CheckpointBarrier(0, 0, options), true); + subpartition.add(barrierBuffer); + // Priority notification should fire immediately despite isBlocked=true + assertThat(availablityListener.getNumPriorityEvents()).isOne(); + + assertNextEvent( + readView, + barrierBuffer.getWrittenBytes(), + CheckpointBarrier.class, + false, + 0, + false, + true); + assertNoNextBuffer(readView); + } + + @TestTemplate + void testDataStillBlockedAfterPriorityEventBypasses() throws Exception { + final RecordingChannelStateWriter channelStateWriter = new RecordingChannelStateWriter(); + subpartition.setChannelStateWriter(channelStateWriter); + + // Block the subpartition + blockSubpartitionByCheckpoint(1); + subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE)); + assertNoNextBuffer(readView); + + // Add priority event while blocked — should notify and be pollable + CheckpointOptions options = + CheckpointOptions.unaligned( + CheckpointType.CHECKPOINT, + new CheckpointStorageLocationReference(new byte[] {0, 1, 2})); + channelStateWriter.start(0, options); + BufferConsumer barrierBuffer = + EventSerializer.toBufferConsumer(new CheckpointBarrier(0, 0, options), true); + subpartition.add(barrierBuffer); + assertThat(availablityListener.getNumPriorityEvents()).isOne(); + + // Recycle inflight buffer copies held by channel state writer + final List inflight = + channelStateWriter.getAddedOutput().get(subpartition.getSubpartitionInfo()); + assertThat(inflight).hasSize(1); + inflight.forEach(Buffer::recycleBuffer); + + assertNextEvent( + readView, + barrierBuffer.getWrittenBytes(), + CheckpointBarrier.class, + false, + 0, + false, + true); + assertNoNextBuffer(readView); + + // After resumeConsumption, data becomes available + readView.resumeConsumption(); + assertNextBuffer(readView, BUFFER_SIZE, false, 0, false, true); + } + // ------------------------------------------------------------------------ private void blockSubpartitionByCheckpoint(int numNotifications) From bb071b272d0ed7c7c7ff3537080a683d5a127d91 Mon Sep 17 00:00:00 2001 From: Rui Fan <1996fanrui@gmail.com> Date: Wed, 18 Feb 2026 21:26:32 +0100 Subject: [PATCH 5/5] [FLINK-39018][network] Buffer migration from RecoveredInputChannel to physical channels --- .../partition/consumer/LocalInputChannel.java | 28 ++++++++++- .../consumer/LocalRecoveredInputChannel.java | 8 +++- .../consumer/RecoveredInputChannel.java | 21 +++++++- .../consumer/RemoteInputChannel.java | 48 +++++++++++++++++-- .../consumer/RemoteRecoveredInputChannel.java | 8 +++- .../partition/consumer/SingleInputGate.java | 41 ++++++++++++---- .../consumer/UnknownInputChannel.java | 7 ++- ...asedPartitionRequestClientHandlerTest.java | 4 +- .../PartitionRequestRegistrationTest.java | 4 +- .../consumer/InputChannelBuilder.java | 7 ++- .../consumer/LocalInputChannelTest.java | 37 ++++++++++++++ .../consumer/RecoveredInputChannelTest.java | 5 +- .../consumer/RemoteInputChannelTest.java | 46 ++++++++++++++++++ .../SingleInputGateBenchmarkFactory.java | 7 ++- 14 files changed, 242 insertions(+), 29 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index 2833adecb5813..661e4b063c75f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -98,7 +98,8 @@ public LocalInputChannel( int maxBackoff, Counter numBytesIn, Counter numBuffersIn, - ChannelStateWriter stateWriter) { + ChannelStateWriter stateWriter, + ArrayDeque initialRecoveredBuffers) { super( inputGate, @@ -113,6 +114,31 @@ public LocalInputChannel( this.partitionManager = checkNotNull(partitionManager); this.taskEventPublisher = checkNotNull(taskEventPublisher); this.channelStatePersister = new ChannelStatePersister(stateWriter, getChannelInfo()); + + // Migrate recovered buffers from RecoveredInputChannel if provided. + // These buffers have been filtered but not yet consumed by the Task. + if (!initialRecoveredBuffers.isEmpty()) { + final int expectedCount = initialRecoveredBuffers.size(); + // Sequence number starts at Integer.MIN_VALUE, consistent with RecoveredInputChannel. + int seqNum = Integer.MIN_VALUE; + while (!initialRecoveredBuffers.isEmpty()) { + Buffer buffer = initialRecoveredBuffers.poll(); + // Determine next data type based on the next buffer in the queue + Buffer.DataType nextDataType = + initialRecoveredBuffers.isEmpty() + ? Buffer.DataType.NONE + : initialRecoveredBuffers.peek().getDataType(); + // buffersInBacklog is set to 0 as these are recovered buffers + BufferAndBacklog bufferAndBacklog = + new BufferAndBacklog(buffer, 0, nextDataType, seqNum++); + toBeConsumedBuffers.add(bufferAndBacklog); + } + checkState( + toBeConsumedBuffers.size() == expectedCount, + "Buffer migration failed: expected %s buffers but got %s", + expectedCount, + toBeConsumedBuffers.size()); + } } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalRecoveredInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalRecoveredInputChannel.java index 784444d63e8bd..bdde2244f38ef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalRecoveredInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalRecoveredInputChannel.java @@ -19,11 +19,14 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.runtime.io.network.TaskEventPublisher; +import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet; +import java.util.ArrayDeque; + import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -61,7 +64,7 @@ public class LocalRecoveredInputChannel extends RecoveredInputChannel { } @Override - protected InputChannel toInputChannelInternal() { + protected InputChannel toInputChannelInternal(ArrayDeque remainingBuffers) { return new LocalInputChannel( inputGate, getChannelIndex(), @@ -73,6 +76,7 @@ protected InputChannel toInputChannelInternal() { maxBackoff, numBytesIn, numBuffersIn, - channelStateWriter); + channelStateWriter, + remainingBuffers); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java index e809e952a2825..d2a7a07137df5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java @@ -111,7 +111,16 @@ public void setChannelStateWriter(ChannelStateWriter channelStateWriter) { public final InputChannel toInputChannel() throws IOException { Preconditions.checkState( stateConsumedFuture.isDone(), "recovered state is not fully consumed"); - final InputChannel inputChannel = toInputChannelInternal(); + + // Extract remaining buffers before conversion. + // These buffers have been filtered but not yet consumed by the Task. + final ArrayDeque remainingBuffers; + synchronized (receivedBuffers) { + remainingBuffers = new ArrayDeque<>(receivedBuffers); + receivedBuffers.clear(); + } + + final InputChannel inputChannel = toInputChannelInternal(remainingBuffers); inputChannel.checkpointStopped(lastStoppedCheckpointId); return inputChannel; } @@ -121,7 +130,15 @@ public void checkpointStopped(long checkpointId) { this.lastStoppedCheckpointId = checkpointId; } - protected abstract InputChannel toInputChannelInternal() throws IOException; + /** + * Creates the physical InputChannel from this recovered channel. + * + * @param remainingBuffers buffers that have been filtered but not yet consumed by the Task. + * These buffers will be migrated to the new physical channel. + * @return the physical InputChannel (LocalInputChannel or RemoteInputChannel) + */ + protected abstract InputChannel toInputChannelInternal(ArrayDeque remainingBuffers) + throws IOException; CompletableFuture getStateConsumedFuture() { return stateConsumedFuture; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index 3430196775c03..66a7d50014067 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -138,7 +138,8 @@ public RemoteInputChannel( int networkBuffersPerChannel, Counter numBytesIn, Counter numBuffersIn, - ChannelStateWriter stateWriter) { + ChannelStateWriter stateWriter, + ArrayDeque initialRecoveredBuffers) { super( inputGate, @@ -157,6 +158,29 @@ public RemoteInputChannel( this.connectionManager = checkNotNull(connectionManager); this.bufferManager = new BufferManager(inputGate.getMemorySegmentProvider(), this, 0); this.channelStatePersister = new ChannelStatePersister(stateWriter, getChannelInfo()); + + // Migrate recovered buffers from RecoveredInputChannel if provided. + // These buffers have been filtered but not yet consumed by the Task. + if (!initialRecoveredBuffers.isEmpty()) { + final int expectedCount = initialRecoveredBuffers.size(); + // Sequence number starts at Integer.MIN_VALUE, consistent with RecoveredInputChannel. + int seqNum = Integer.MIN_VALUE; + for (Buffer buffer : initialRecoveredBuffers) { + // subpartitionId is set to 0 for recovered buffers. This is correct because: + // 1) For single-subpartition channels, the only valid subpartition is 0. + // 2) For multi-subpartition channels (consumedSubpartitionIndexSet.size() > 1), + // RecoveryMetadata events embedded in the recovered buffer sequence track + // the actual subpartition context for proper routing. + SequenceBuffer sequenceBuffer = new SequenceBuffer(buffer, seqNum++, 0); + receivedBuffers.add(sequenceBuffer); + totalQueueSizeInBytes += buffer.getSize(); + } + checkState( + receivedBuffers.size() == expectedCount, + "Buffer migration failed: expected %s buffers but got %s", + expectedCount, + receivedBuffers.size()); + } } @VisibleForTesting @@ -239,9 +263,9 @@ protected boolean increaseBackoff() { @Override protected int peekNextBufferSubpartitionIdInternal() throws IOException { - checkPartitionRequestQueueInitialized(); - synchronized (receivedBuffers) { + checkReadability(); + final SequenceBuffer next = receivedBuffers.peek(); if (next != null) { @@ -254,12 +278,12 @@ protected int peekNextBufferSubpartitionIdInternal() throws IOException { @Override public Optional getNextBuffer() throws IOException { - checkPartitionRequestQueueInitialized(); - final SequenceBuffer next; final DataType nextDataType; synchronized (receivedBuffers) { + checkReadability(); + next = receivedBuffers.poll(); if (next != null) { @@ -879,6 +903,20 @@ public void onError(Throwable cause) { setError(cause); } + /** + * When receivedBuffers contains migrated buffers from RecoveredInputChannel, they can be read + * before requestSubpartitions(). In that case only check for errors. Once migrated buffers are + * drained, require full client initialization check. + */ + private void checkReadability() throws IOException { + assert Thread.holdsLock(receivedBuffers); + if (receivedBuffers.isEmpty()) { + checkPartitionRequestQueueInitialized(); + } else { + checkError(); + } + } + private void checkPartitionRequestQueueInitialized() throws IOException { checkError(); checkState( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteRecoveredInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteRecoveredInputChannel.java index cbaddbcbfa0b9..2cfff6f5e7972 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteRecoveredInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteRecoveredInputChannel.java @@ -20,11 +20,13 @@ import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; +import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet; import java.io.IOException; +import java.util.ArrayDeque; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -66,7 +68,8 @@ public class RemoteRecoveredInputChannel extends RecoveredInputChannel { } @Override - protected InputChannel toInputChannelInternal() throws IOException { + protected InputChannel toInputChannelInternal(ArrayDeque remainingBuffers) + throws IOException { RemoteInputChannel remoteInputChannel = new RemoteInputChannel( inputGate, @@ -81,7 +84,8 @@ protected InputChannel toInputChannelInternal() throws IOException { networkBuffersPerChannel, numBytesIn, numBuffersIn, - channelStateWriter); + channelStateWriter, + remainingBuffers); remoteInputChannel.setup(); return remoteInputChannel; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index d845eef629404..2847e36fcc2b8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -375,6 +375,10 @@ public void requestPartitions() { } } + /** + * Converts all {@link RecoveredInputChannel}s to their real channel types ({@link + * LocalInputChannel} or {@link RemoteInputChannel}). + */ @VisibleForTesting public void convertRecoveredInputChannels() { LOG.debug("Converting recovered input channels ({} channels)", getNumberOfInputChannels()); @@ -384,19 +388,40 @@ public void convertRecoveredInputChannels() { new HashSet<>(inputChannelsForCurrentPartition.keySet()); for (InputChannelInfo inputChannelInfo : oldInputChannelInfos) { InputChannel inputChannel = inputChannelsForCurrentPartition.get(inputChannelInfo); - if (inputChannel instanceof RecoveredInputChannel) { - try { - InputChannel realInputChannel = - ((RecoveredInputChannel) inputChannel).toInputChannel(); - inputChannel.releaseAllResources(); + if (!(inputChannel instanceof RecoveredInputChannel)) { + continue; + } + try { + // Phase 1: Convert channel and release resources outside the lock. + // These calls may acquire the receivedBuffers lock internally, so they + // run outside inputChannelsWithData lock to maintain a consistent lock + // order with onRecoveredStateBuffer() which acquires receivedBuffers + // first and then inputChannelsWithData. + InputChannel realInputChannel = + ((RecoveredInputChannel) inputChannel).toInputChannel(); + inputChannel.releaseAllResources(); + int buffersInUseCount = realInputChannel.getBuffersInUseCount(); + + // Phase 2: Atomically update data structures under the lock. + synchronized (inputChannelsWithData) { + if (inputChannelsWithData.contains(inputChannel)) { + inputChannelsWithData.getAndRemove(ch -> ch == inputChannel); + } + enqueuedInputChannelsWithData.clear(inputChannel.getChannelIndex()); + inputChannelsForCurrentPartition.remove(inputChannelInfo); inputChannelsForCurrentPartition.put( realInputChannel.getChannelInfo(), realInputChannel); channels[inputChannel.getChannelIndex()] = realInputChannel; - } catch (Throwable t) { - inputChannel.setError(t); - return; + + if (buffersInUseCount > 0) { + inputChannelsWithData.add(realInputChannel); + enqueuedInputChannelsWithData.set(realInputChannel.getChannelIndex()); + } } + } catch (Throwable t) { + inputChannel.setError(t); + return; } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java index 2ff8aa73bcdb7..15182cedadb9f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java @@ -35,6 +35,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.ArrayDeque; import java.util.Optional; import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY; @@ -183,7 +184,8 @@ public RemoteInputChannel toRemoteInputChannel( networkBuffersPerChannel, metrics.getNumBytesInRemoteCounter(), metrics.getNumBuffersInRemoteCounter(), - channelStateWriter == null ? ChannelStateWriter.NO_OP : channelStateWriter); + channelStateWriter == null ? ChannelStateWriter.NO_OP : channelStateWriter, + new ArrayDeque<>()); } public LocalInputChannel toLocalInputChannel(ResultPartitionID resultPartitionID) { @@ -198,7 +200,8 @@ public LocalInputChannel toLocalInputChannel(ResultPartitionID resultPartitionID maxBackoff, metrics.getNumBytesInLocalCounter(), metrics.getNumBuffersInLocalCounter(), - channelStateWriter == null ? ChannelStateWriter.NO_OP : channelStateWriter); + channelStateWriter == null ? ChannelStateWriter.NO_OP : channelStateWriter, + new ArrayDeque<>()); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java index 48b29aeaeaa86..d96ed78b6a031 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java @@ -68,6 +68,7 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayDeque; import java.util.stream.Stream; import static org.apache.flink.runtime.io.network.netty.PartitionRequestQueueTest.blockChannel; @@ -951,7 +952,8 @@ private static class TestRemoteInputChannelForError extends RemoteInputChannel { 2, new SimpleCounter(), new SimpleCounter(), - ChannelStateWriter.NO_OP); + ChannelStateWriter.NO_OP, + new ArrayDeque<>()); this.expectedMessage = expectedMessage; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestRegistrationTest.java index 3b590d3a25659..e3cfb55e3400f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestRegistrationTest.java @@ -44,6 +44,7 @@ import org.junit.jupiter.api.Test; +import java.util.ArrayDeque; import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -248,7 +249,8 @@ private static class TestRemoteInputChannelForPartitionNotFound extends RemoteIn 2, new SimpleCounter(), new SimpleCounter(), - ChannelStateWriter.NO_OP); + ChannelStateWriter.NO_OP, + new ArrayDeque<>()); this.latch = latch; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java index f5e810bb60591..08f65d9fe7265 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet; import java.net.InetSocketAddress; +import java.util.ArrayDeque; import static org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateTest.TestingResultPartitionManager; @@ -164,7 +165,8 @@ public LocalInputChannel buildLocalChannel(SingleInputGate inputGate) { maxBackoff, metrics.getNumBytesInLocalCounter(), metrics.getNumBuffersInLocalCounter(), - stateWriter); + stateWriter, + new ArrayDeque<>()); } public RemoteInputChannel buildRemoteChannel(SingleInputGate inputGate) { @@ -181,7 +183,8 @@ public RemoteInputChannel buildRemoteChannel(SingleInputGate inputGate) { networkBuffersPerChannel, metrics.getNumBytesInRemoteCounter(), metrics.getNumBuffersInRemoteCounter(), - stateWriter); + stateWriter, + new ArrayDeque<>()); } public LocalRecoveredInputChannel buildLocalRecoveredChannel(SingleInputGate inputGate) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index aeb765f79b96b..86bda9866d204 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -713,6 +713,43 @@ void testGetBuffersInUseCountIncludesToBeConsumedBuffers() throws Exception { assertThat(localChannel.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(5); } + @Test + void testGetNextBufferWithMigratedRecoveredBuffers() throws Exception { + // given: LocalInputChannel with recovered buffers migrated from RecoveredInputChannel + SingleInputGate inputGate = createSingleInputGate(1); + + ArrayDeque recoveredBuffers = new ArrayDeque<>(); + recoveredBuffers.add(TestBufferFactory.createBuffer(10)); + recoveredBuffers.add(TestBufferFactory.createBuffer(20)); + + LocalInputChannel channel = + new LocalInputChannel( + inputGate, + 0, + new ResultPartitionID(), + new ResultSubpartitionIndexSet(0), + new ResultPartitionManager(), + new TaskEventDispatcher(), + 0, + 0, + new SimpleCounter(), + new SimpleCounter(), + ChannelStateWriter.NO_OP, + recoveredBuffers); + + inputGate.setInputChannels(channel); + + // then: Can read recovered buffers even before requestSubpartitions() + Optional first = channel.getNextBuffer(); + assertThat(first).isPresent(); + assertThat(first.get().buffer().getSize()).isEqualTo(10); + assertThat(first.get().moreAvailable()).isTrue(); + + Optional second = channel.getNextBuffer(); + assertThat(second).isPresent(); + assertThat(second.get().buffer().getSize()).isEqualTo(20); + } + @Test void testCheckpointStartedPersistsRecoveredBuffers() throws Exception { // given: Local input channel with recovered buffers diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java index ab7dd142c1eb2..5985a81e8ca86 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannelTest.java @@ -22,11 +22,14 @@ import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointType; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet; import org.junit.jupiter.api.Test; +import java.util.ArrayDeque; + import static org.apache.flink.runtime.checkpoint.CheckpointOptions.unaligned; import static org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -74,7 +77,7 @@ private RecoveredInputChannel buildChannel() { new SimpleCounter(), 10) { @Override - protected InputChannel toInputChannelInternal() { + protected InputChannel toInputChannelInternal(ArrayDeque remainingBuffers) { throw new AssertionError("channel conversion succeeded"); } }; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java index 4f5abdd4271e1..e47de93c9e8bd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java @@ -21,6 +21,7 @@ import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.CheckpointType; @@ -2073,6 +2074,51 @@ void verifyResult( } } + @Test + void testGetNextBufferWithMigratedRecoveredBuffers() throws Exception { + // given: RemoteInputChannel with recovered buffers migrated from RecoveredInputChannel + SingleInputGate inputGate = createSingleInputGate(1); + + ArrayDeque recoveredBuffers = new ArrayDeque<>(); + recoveredBuffers.add(TestBufferFactory.createBuffer(10)); + recoveredBuffers.add(TestBufferFactory.createBuffer(20)); + + ConnectionID connectionId = + new ConnectionID( + org.apache.flink.runtime.clusterframework.types.ResourceID.generate(), + new java.net.InetSocketAddress("localhost", 0), + 0); + RemoteInputChannel channel = + new RemoteInputChannel( + inputGate, + 0, + new ResultPartitionID(), + new ResultSubpartitionIndexSet(0), + connectionId, + InputChannelTestUtils.mockConnectionManagerWithPartitionRequestClient( + mock(PartitionRequestClient.class)), + 0, + 0, + 0, + 2, + new SimpleCounter(), + new SimpleCounter(), + ChannelStateWriter.NO_OP, + recoveredBuffers); + + inputGate.setInputChannels(channel); + + // then: Can read recovered buffers even before requestSubpartitions() + Optional first = channel.getNextBuffer(); + assertThat(first).isPresent(); + assertThat(first.get().buffer().getSize()).isEqualTo(10); + assertThat(first.get().moreAvailable()).isTrue(); + + Optional second = channel.getNextBuffer(); + assertThat(second).isPresent(); + assertThat(second.get().buffer().getSize()).isEqualTo(20); + } + private static final class TestBufferPool extends NoOpBufferPool { @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SingleInputGateBenchmarkFactory.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SingleInputGateBenchmarkFactory.java index 49ecb6c6645dc..b850a7cc55370 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SingleInputGateBenchmarkFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/SingleInputGateBenchmarkFactory.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration; import java.io.IOException; +import java.util.ArrayDeque; /** * A benchmark-specific input gate factory which overrides the respective methods of creating {@link @@ -128,7 +129,8 @@ public TestLocalInputChannel( maxBackoff, metrics.getNumBytesInLocalCounter(), metrics.getNumBuffersInLocalCounter(), - ChannelStateWriter.NO_OP); + ChannelStateWriter.NO_OP, + new ArrayDeque<>()); } @Override @@ -183,7 +185,8 @@ public TestRemoteInputChannel( networkBuffersPerChannel, metrics.getNumBytesInRemoteCounter(), metrics.getNumBuffersInRemoteCounter(), - ChannelStateWriter.NO_OP); + ChannelStateWriter.NO_OP, + new ArrayDeque<>()); } @Override