Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,21 @@ private boolean processPriorityBuffer(BufferConsumer bufferConsumer, int partial
@GuardedBy("buffers")
private boolean needNotifyPriorityEvent() {
assert Thread.holdsLock(buffers);
// if subpartition is blocked then downstream doesn't expect any notifications
return buffers.getNumPriorityElements() == 1 && !isBlocked;
// Priority events (e.g. unaligned checkpoint barriers) must notify downstream even
// when the subpartition is blocked.
//
// During recovery, once the upstream output channel state is fully restored, a
// RECOVERY_COMPLETION event (EndOfOutputChannelStateEvent) is emitted. This event
// blocks the subpartition to prevent the upstream from sending new data while the
// downstream is still consuming recovered buffers. The subpartition remains blocked
// until the downstream finishes consuming all recovered buffers from every channel
// and calls resumeConsumption() to unblock.
//
// If a checkpoint is triggered while the downstream is still consuming recovered
// buffers, the upstream receives an unaligned checkpoint barrier and adds it to this
// blocked subpartition. The barrier must still be delivered to the downstream
// immediately, otherwise the checkpoint will hang until it times out.
return buffers.getNumPriorityElements() == 1;
Comment on lines +255 to +269
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explaining why Priority events (e.g. unaligned checkpoint barriers) must notify downstream even when the subpartition is blocked.

}

@GuardedBy("buffers")
Expand Down Expand Up @@ -456,7 +469,10 @@ public void release() {
@Nullable
BufferAndBacklog pollBuffer() {
synchronized (buffers) {
if (isBlocked) {
// When blocked (e.g. by RECOVERY_COMPLETION event), only allow priority buffers
// (e.g. unaligned checkpoint barriers) to be polled. Regular buffers remain blocked
// until resumeConsumption() is called. See needNotifyPriorityEvent() for details.
if (isBlocked && buffers.getNumPriorityElements() == 0) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -80,6 +80,13 @@ public class LocalInputChannel extends InputChannel implements BufferAvailabilit

private final Deque<BufferAndBacklog> toBeConsumedBuffers = new ArrayDeque<>();

/**
* Flag indicating whether there is a pending priority event (e.g., checkpoint barrier) in the
* subpartitionView that should be consumed before toBeConsumedBuffers. This is set by {@link
* #notifyPriorityEvent} and checked in {@link #getNextBuffer()}.
*/
private volatile boolean hasPendingPriorityEvent = false;

public LocalInputChannel(
SingleInputGate inputGate,
int channelIndex,
Expand All @@ -91,7 +98,8 @@ public LocalInputChannel(
int maxBackoff,
Counter numBytesIn,
Counter numBuffersIn,
ChannelStateWriter stateWriter) {
ChannelStateWriter stateWriter,
ArrayDeque<Buffer> initialRecoveredBuffers) {

super(
inputGate,
Expand All @@ -106,14 +114,47 @@ public LocalInputChannel(
this.partitionManager = checkNotNull(partitionManager);
this.taskEventPublisher = checkNotNull(taskEventPublisher);
this.channelStatePersister = new ChannelStatePersister(stateWriter, getChannelInfo());

// Migrate recovered buffers from RecoveredInputChannel if provided.
// These buffers have been filtered but not yet consumed by the Task.
if (!initialRecoveredBuffers.isEmpty()) {
final int expectedCount = initialRecoveredBuffers.size();
// Sequence number starts at Integer.MIN_VALUE, consistent with RecoveredInputChannel.
int seqNum = Integer.MIN_VALUE;
while (!initialRecoveredBuffers.isEmpty()) {
Buffer buffer = initialRecoveredBuffers.poll();
// Determine next data type based on the next buffer in the queue
Buffer.DataType nextDataType =
initialRecoveredBuffers.isEmpty()
? Buffer.DataType.NONE
: initialRecoveredBuffers.peek().getDataType();
// buffersInBacklog is set to 0 as these are recovered buffers
BufferAndBacklog bufferAndBacklog =
new BufferAndBacklog(buffer, 0, nextDataType, seqNum++);
toBeConsumedBuffers.add(bufferAndBacklog);
}
checkState(
toBeConsumedBuffers.size() == expectedCount,
"Buffer migration failed: expected %s buffers but got %s",
expectedCount,
toBeConsumedBuffers.size());
}
}

// ------------------------------------------------------------------------
// Consume
// ------------------------------------------------------------------------

/**
 * Starts persisting the channel state for the given checkpoint barrier.
 *
 * <p>The recovered buffers still queued in {@code toBeConsumedBuffers} have not been handed
 * to the Task yet, so they are in-flight data that must become part of the checkpoint.
 * Events are skipped; each data buffer is retained because the persister holds its own
 * reference.
 *
 * @param barrier the checkpoint barrier that triggered persisting
 * @throws CheckpointException if persisting cannot be started
 */
public void checkpointStarted(CheckpointBarrier barrier) throws CheckpointException {
    final List<Buffer> inflightBuffers = new ArrayList<>();
    for (BufferAndBacklog pending : toBeConsumedBuffers) {
        final Buffer candidate = pending.buffer();
        if (candidate.isBuffer()) {
            inflightBuffers.add(candidate.retainBuffer());
        }
    }
    channelStatePersister.startPersisting(barrier.getId(), inflightBuffers);
}

public void checkpointStopped(long checkpointId) {
Expand All @@ -122,8 +163,6 @@ public void checkpointStopped(long checkpointId) {

@Override
protected void requestSubpartitions() throws IOException {
checkState(toBeConsumedBuffers.isEmpty());

boolean retriggerRequest = false;
boolean notifyDataAvailable = false;

Expand Down Expand Up @@ -234,7 +273,7 @@ public Optional<BufferAndAvailability> getNextBuffer() throws IOException {
checkError();

if (!toBeConsumedBuffers.isEmpty()) {
return getBufferAndAvailability(toBeConsumedBuffers.removeFirst());
return getNextRecoveredBuffer();
}

ResultSubpartitionView subpartitionView = this.subpartitionView;
Expand Down Expand Up @@ -296,6 +335,68 @@ public Optional<BufferAndAvailability> getNextBuffer() throws IOException {
return getBufferAndAvailability(next);
}

/**
 * Consumes the next buffer from toBeConsumedBuffers (recovered buffers), handling pending
 * priority events and dynamic availability detection for the last recovered buffer.
 *
 * @return the next buffer together with its availability information
 * @throws IOException if fetching from the subpartition view fails
 */
private Optional<BufferAndAvailability> getNextRecoveredBuffer() throws IOException {
    // If there is a pending priority event (e.g., unaligned checkpoint barrier), fetch it
    // from subpartitionView first, skipping toBeConsumedBuffers. This ensures priority
    // events are processed immediately even when there are pending recovered buffers.
    if (hasPendingPriorityEvent) {
        checkState(subpartitionView != null, "No subpartition view available");
        BufferAndBacklog next = subpartitionView.getNextBuffer();
        // The flag guarantees the view's head element has priority; anything else is a bug.
        checkState(
                next != null && next.buffer().getDataType().hasPriority(),
                "Expected priority event, but got %s",
                next == null ? "null" : next.buffer().getDataType());

        // Check for barrier to update channel state persister.
        // Note: maybePersist is not needed for barriers as they are not regular data buffers.
        channelStatePersister.checkForBarrier(next.buffer());

        Buffer.DataType expectedNextDataType = next.getNextDataType();
        if (!expectedNextDataType.hasPriority()) {
            // Reset hasPendingPriorityEvent to false if no more priority event
            hasPendingPriorityEvent = false;
            if (!toBeConsumedBuffers.isEmpty()) {
                // Correct nextDataType: if toBeConsumedBuffers is not empty, the actual next
                // element to consume is from toBeConsumedBuffers, not from subpartitionView
                expectedNextDataType = toBeConsumedBuffers.peek().buffer().getDataType();
            }
        }

        return getBufferAndAvailability(
                new BufferAndBacklog(
                        next.buffer(),
                        next.buffersInBacklog(),
                        expectedNextDataType,
                        next.getSequenceNumber()));
    }

    BufferAndBacklog next = toBeConsumedBuffers.removeFirst();

    // If this is the last recovered buffer and nextDataType is NONE,
    // dynamically check if subpartitionView has data available.
    // The last buffer's nextDataType was preset to NONE during construction,
    // but subpartitionView may already have data available.
    // NOTE(review): the rewrapped element reports DATA_BUFFER as the next data type even if
    // the view's next element is an event — confirm callers use this only as an
    // availability hint, not as the exact type of the next element.
    if (toBeConsumedBuffers.isEmpty()
            && next.getNextDataType() == Buffer.DataType.NONE
            && subpartitionView != null) {
        ResultSubpartitionView.AvailabilityWithBacklog availability =
                subpartitionView.getAvailabilityAndBacklog(true);
        if (availability.isAvailable()) {
            next =
                    new BufferAndBacklog(
                            next.buffer(),
                            availability.getBacklog(),
                            Buffer.DataType.DATA_BUFFER,
                            next.getSequenceNumber());
        }
    }
    return getBufferAndAvailability(next);
}

private Optional<BufferAndAvailability> getBufferAndAvailability(BufferAndBacklog next)
throws IOException {
Buffer buffer = next.buffer();
Expand Down Expand Up @@ -331,6 +432,14 @@ public void notifyDataAvailable(ResultSubpartitionView view) {
notifyChannelNonEmpty();
}

@Override
public void notifyPriorityEvent(int prioritySequenceNumber) {
    // Set flag so that getNextBuffer() knows to fetch priority event from subpartitionView
    // before consuming toBeConsumedBuffers. The flag is raised BEFORE delegating to the
    // superclass so that any consumption triggered by the notification already observes it.
    // NOTE(review): hasPendingPriorityEvent is a volatile flag set from the producer side
    // and cleared in getNextRecoveredBuffer() — confirm no interleaving exists where the
    // priority element is drained elsewhere while the flag stays set.
    hasPendingPriorityEvent = true;
    super.notifyPriorityEvent(prioritySequenceNumber);
}

private ResultSubpartitionView checkAndWaitForSubpartitionView() {
// synchronizing on the request lock means this blocks until the asynchronous request
// for the partition view has been completed
Expand Down Expand Up @@ -402,6 +511,13 @@ void releaseAllResources() throws IOException {
view.releaseAllResources();
subpartitionView = null;
}

// Release any remaining buffers in toBeConsumedBuffers to avoid memory leak.
// These may be recovered buffers or partial buffers from FullyFilledBuffer.
for (BufferAndBacklog bufferAndBacklog : toBeConsumedBuffers) {
bufferAndBacklog.buffer().recycleBuffer();
}
toBeConsumedBuffers.clear();
}
}

Expand All @@ -418,18 +534,19 @@ void announceBufferSize(int newBufferSize) {
@Override
int getBuffersInUseCount() {
    // Buffers in use = recovered buffers still queued locally plus whatever the
    // subpartition view (if already requested) has queued upstream.
    final ResultSubpartitionView view = this.subpartitionView;
    final int queuedInView = (view == null) ? 0 : view.getNumberOfQueuedBuffers();
    return toBeConsumedBuffers.size() + queuedInView;
}

@Override
public int unsynchronizedGetNumberOfQueuedBuffers() {
    // Best-effort (unsynchronized) count: locally queued recovered buffers plus the
    // buffers queued in the subpartition view, when one is available.
    int queuedBuffers = toBeConsumedBuffers.size();
    final ResultSubpartitionView view = subpartitionView;
    if (view != null) {
        queuedBuffers += view.unsynchronizedGetNumberOfQueuedBuffers();
    }
    return queuedBuffers;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
package org.apache.flink.runtime.io.network.partition.consumer;

import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;

import java.util.ArrayDeque;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand Down Expand Up @@ -61,7 +64,7 @@ public class LocalRecoveredInputChannel extends RecoveredInputChannel {
}

@Override
protected InputChannel toInputChannelInternal() {
protected InputChannel toInputChannelInternal(ArrayDeque<Buffer> remainingBuffers) {
return new LocalInputChannel(
inputGate,
getChannelIndex(),
Expand All @@ -73,6 +76,7 @@ protected InputChannel toInputChannelInternal() {
maxBackoff,
numBytesIn,
numBuffersIn,
channelStateWriter);
channelStateWriter,
remainingBuffers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,16 @@ public void setChannelStateWriter(ChannelStateWriter channelStateWriter) {
/**
 * Converts this recovered channel into its physical counterpart once the recovered state
 * has been fully consumed.
 *
 * <p>Buffers that were restored but not yet handed to the Task are drained under the
 * {@code receivedBuffers} lock and migrated into the new physical channel, which takes
 * over the buffer references.
 *
 * @return the physical input channel
 * @throws IOException if creating the physical channel fails
 */
public final InputChannel toInputChannel() throws IOException {
    Preconditions.checkState(
            stateConsumedFuture.isDone(), "recovered state is not fully consumed");

    final ArrayDeque<Buffer> remainingBuffers = new ArrayDeque<>();
    synchronized (receivedBuffers) {
        Buffer queued;
        while ((queued = receivedBuffers.poll()) != null) {
            remainingBuffers.add(queued);
        }
    }

    final InputChannel inputChannel = toInputChannelInternal(remainingBuffers);
    inputChannel.checkpointStopped(lastStoppedCheckpointId);
    return inputChannel;
}
Expand All @@ -121,7 +130,15 @@ public void checkpointStopped(long checkpointId) {
this.lastStoppedCheckpointId = checkpointId;
}

/**
 * Creates the physical InputChannel from this recovered channel.
 *
 * @param remainingBuffers buffers that have been filtered but not yet consumed by the Task;
 *     they will be migrated to the new physical channel
 * @return the physical InputChannel (LocalInputChannel or RemoteInputChannel)
 * @throws IOException if creating the physical channel fails
 */
protected abstract InputChannel toInputChannelInternal(ArrayDeque<Buffer> remainingBuffers)
        throws IOException;

CompletableFuture<?> getStateConsumedFuture() {
return stateConsumedFuture;
Expand Down
Loading