[FLINK-39018][checkpoint] Support checkpoint for LocalInputChannel#27861
1996fanrui wants to merge 5 commits into apache:master
cf606db to
4fa25ef
…de toBeConsumedBuffers
…ot for recovered buffers
4fa25ef to
b1a7ca7
.../src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
...ntime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
...ava/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
// Priority events (e.g. unaligned checkpoint barriers) must notify downstream even
// when the subpartition is blocked.
//
// During recovery, once the upstream output channel state is fully restored, a
// RECOVERY_COMPLETION event (EndOfOutputChannelStateEvent) is emitted. This event
// blocks the subpartition to prevent the upstream from sending new data while the
// downstream is still consuming recovered buffers. The subpartition remains blocked
// until the downstream finishes consuming all recovered buffers from every channel
// and calls resumeConsumption() to unblock.
//
// If a checkpoint is triggered while the downstream is still consuming recovered
// buffers, the upstream receives an unaligned checkpoint barrier and adds it to this
// blocked subpartition. The barrier must still be delivered to the downstream
// immediately, otherwise the checkpoint will hang until it times out.
return buffers.getNumPriorityElements() == 1;
This comment explains why priority events (e.g. unaligned checkpoint barriers) must notify the downstream even when the subpartition is blocked.
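The rule described in the code comment above can be illustrated with a minimal sketch. This is not Flink's `PipelinedSubpartition`: the class `SketchSubpartition`, its fields, and the notification rule for regular data are hypothetical simplifications. Only the priority check (`numPriorityElements == 1`) mirrors the line under review.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for a pipelined subpartition (not Flink's real class). */
final class SketchSubpartition {
    private final List<String> buffers = new ArrayList<>();
    private int numPriorityElements; // priority elements sit at the head of the queue
    private boolean blocked;         // set while the downstream drains recovered buffers
    private int notifications;       // how often the reader was told "data available"

    synchronized void block() {
        blocked = true;
    }

    synchronized void resumeConsumption() {
        blocked = false;
    }

    synchronized void add(String element, boolean priority) {
        if (priority) {
            // Priority elements overtake regular data but keep their own order.
            buffers.add(numPriorityElements++, element);
        } else {
            buffers.add(element);
        }
        if (shouldNotifyDataAvailable(priority)) {
            notifications++;
        }
    }

    private boolean shouldNotifyDataAvailable(boolean priority) {
        if (priority) {
            // Mirrors the check under review: the first priority element
            // notifies the reader regardless of the blocked flag.
            return numPriorityElements == 1;
        }
        // Assumption for this sketch only: regular data notifies when the
        // subpartition is unblocked and the queue just became non-empty.
        return !blocked && buffers.size() == 1;
    }

    synchronized String peek() {
        return buffers.isEmpty() ? null : buffers.get(0);
    }

    synchronized int notificationCount() {
        return notifications;
    }
}
```

In this sketch, regular data added to a blocked subpartition stays silent, while a barrier added afterwards moves to the head of the queue and triggers exactly one notification. That is the behavior that keeps a checkpoint from hanging during recovery.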
...test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 * <p><b>Lock ordering note:</b> This method acquires {@code inputChannelsWithData} and then may
 * indirectly acquire {@code receivedBuffers} (via {@code toInputChannel()} and {@code
 * releaseAllResources()}). This is the reverse order of {@link
 * RecoveredInputChannel#onRecoveredStateBuffer}, which acquires {@code receivedBuffers} first
 * and then {@code inputChannelsWithData} (via {@code notifyChannelNonEmpty()}). This is safe
 * because {@code convertRecoveredInputChannels()} is only called from {@link
 * #requestPartitions()}, which happens after all state recovery is complete (buffer filtering
 * future is done), so {@code onRecoveredStateBuffer()} is no longer being called concurrently.
This sounds fishy and fragile 🤔
Good point. I've narrowed the inputChannelsWithData lock scope in convertRecoveredInputChannels() — moved toInputChannel(), releaseAllResources(), and getBuffersInUseCount() outside the synchronized block to eliminate the reverse lock ordering. The lock now only covers the data structure updates.
.../src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
.../src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
…r availability for recovered buffers
…n if it is blocked to ensure the checkpoint barrier can be handled by downstream task

Priority events (e.g. unaligned checkpoint barriers) must notify downstream even when the subpartition is blocked.

During recovery, once the upstream output channel state is fully restored, a RECOVERY_COMPLETION event (EndOfOutputChannelStateEvent) is emitted. This event blocks the subpartition to prevent the upstream from sending new data while the downstream is still consuming recovered buffers. The subpartition remains blocked until the downstream finishes consuming all recovered buffers from every channel and calls resumeConsumption() to unblock.

If a checkpoint is triggered while the downstream is still consuming recovered buffers, the upstream receives an unaligned checkpoint barrier and adds it to this blocked subpartition. The barrier must still be delivered to the downstream immediately, otherwise the checkpoint will hang until it times out.
fef5732 to
f08a818
Thanks @pnowojski for the review. All comments are addressed, and the commits are organized.
… physical channels
f08a818 to
bb071b2
.../src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
int buffersInUseCount = realInputChannel.getBuffersInUseCount();

// Phase 2: Atomically update data structures under the lock.
synchronized (inputChannelsWithData) {
Why didn't the previous code need the lock in the first place? What has changed, or what did I miss?
The previous (master branch) code didn't touch inputChannelsWithData at all; it was a simple channel swap without any lock. The FLINK-39018 changes introduce buffer migration from RecoveredInputChannel to physical channels, where onRecoveredStateBuffer() now enqueues the channel into inputChannelsWithData via notifyChannelNonEmpty(). So during conversion, we now need to dequeue the old channel and conditionally enqueue the new one, which requires the inputChannelsWithData lock. I've structured it so the lock only covers the queue/map updates (phase 2), while toInputChannel() and releaseAllResources() run outside the lock (phase 1) to avoid reverse lock ordering with onRecoveredStateBuffer().
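The two-phase structure described in this reply can be sketched roughly as follows. Everything here is a hypothetical simplification rather than the PR's code: `SketchChannel`, `SketchInputGate`, and `queuedChannelNames()` are invented for illustration, and the condition "enqueue the new channel only if it holds buffers" is an assumption based on the `buffersInUseCount` variable in the snippet under review. The sketch also assumes conversion runs on a single thread after recovery has finished.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical channel stand-in; names and fields are illustrative only. */
final class SketchChannel {
    final String name;
    final int buffersInUse;
    boolean released;

    SketchChannel(String name, int buffersInUse) {
        this.name = name;
        this.buffersInUse = buffersInUse;
    }

    /** Creates the physical channel and hands the recovered buffers over to it. */
    SketchChannel toInputChannel() {
        return new SketchChannel("real-" + name, buffersInUse);
    }

    void releaseAllResources() {
        released = true;
    }
}

/** Hypothetical gate stand-in showing the two-phase conversion. */
final class SketchInputGate {
    private final Deque<SketchChannel> inputChannelsWithData = new ArrayDeque<>();
    private final SketchChannel[] channels;

    SketchInputGate(SketchChannel... channels) {
        this.channels = channels;
    }

    void notifyChannelNonEmpty(int index) {
        synchronized (inputChannelsWithData) {
            SketchChannel channel = channels[index];
            if (!inputChannelsWithData.contains(channel)) {
                inputChannelsWithData.add(channel);
            }
        }
    }

    void convertRecoveredChannel(int index) {
        SketchChannel recovered = channels[index];

        // Phase 1: calls that may take channel-internal locks run outside
        // the inputChannelsWithData lock, so lock order is never reversed.
        SketchChannel real = recovered.toInputChannel();
        recovered.releaseAllResources();
        int buffersInUseCount = real.buffersInUse;

        // Phase 2: only the queue and channel-table updates hold the lock.
        synchronized (inputChannelsWithData) {
            inputChannelsWithData.remove(recovered); // dequeue the old channel
            channels[index] = real;
            if (buffersInUseCount > 0) {             // assumed enqueue condition
                inputChannelsWithData.add(real);
            }
        }
    }

    List<String> queuedChannelNames() {
        synchronized (inputChannelsWithData) {
            List<String> names = new ArrayList<>();
            for (SketchChannel channel : inputChannelsWithData) {
                names.add(channel.name);
            }
            return names;
        }
    }
}
```

Keeping phase 1 outside the synchronized block means the gate-level lock is never held while a channel-level lock is requested, which is the ordering concern raised in the review.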
This PR depends on #27782 and #27783
What is the purpose of the change
[FLINK-39018][checkpoint] Support checkpoint for LocalInputChannel
Brief change log
Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation