Skip to content

[FLINK-39018][checkpoint] Support checkpoint for LocalInputChannel#27861

Open
1996fanrui wants to merge 5 commits intoapache:masterfrom
1996fanrui:39018/support-checkpoint-for-localinputchannel
Open

[FLINK-39018][checkpoint] Support checkpoint for LocalInputChannel#27861
1996fanrui wants to merge 5 commits intoapache:masterfrom
1996fanrui:39018/support-checkpoint-for-localinputchannel

Conversation

@1996fanrui
Copy link
Copy Markdown
Member

@1996fanrui 1996fanrui commented Mar 31, 2026

This PR depends on #27782 and #27783

What is the purpose of the change

[FLINK-39018][checkpoint] Support checkpoint for LocalInputChannel

Brief change log

  • [hotfix][network] Fix LocalInputChannel.getBuffersInUseCount to include toBeConsumedBuffers
  • [FLINK-39018][checkpoint] Support LocalInputChannel checkpoint snapshot for recovered buffers
  • [FLINK-39018][network] Fix LocalInputChannel priority event and buffer availability for recovered buffers
  • [FLINK-39018][checkpoint] Notify PriorityEvent to downstream task even if it is blocked to ensure the checkpoint barrier can be handled by downstream task
  • [FLINK-39018][network] Buffer migration from RecoveredInputChannel to physical channels

Verifying this change

  • Tons of unit tests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive):no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector:no

Documentation

  • Does this pull request introduce a new feature? no

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Mar 31, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@1996fanrui 1996fanrui force-pushed the 39018/support-checkpoint-for-localinputchannel branch from cf606db to 4fa25ef Compare March 31, 2026 16:36
@1996fanrui 1996fanrui force-pushed the 39018/support-checkpoint-for-localinputchannel branch from 4fa25ef to b1a7ca7 Compare March 31, 2026 18:37
Comment on lines +255 to +269
// Priority events (e.g. unaligned checkpoint barriers) must notify downstream even
// when the subpartition is blocked.
//
// During recovery, once the upstream output channel state is fully restored, a
// RECOVERY_COMPLETION event (EndOfOutputChannelStateEvent) is emitted. This event
// blocks the subpartition to prevent the upstream from sending new data while the
// downstream is still consuming recovered buffers. The subpartition remains blocked
// until the downstream finishes consuming all recovered buffers from every channel
// and calls resumeConsumption() to unblock.
//
// If a checkpoint is triggered while the downstream is still consuming recovered
// buffers, the upstream receives an unaligned checkpoint barrier and adds it to this
// blocked subpartition. The barrier must still be delivered to the downstream
// immediately, otherwise the checkpoint will hang until it times out.
return buffers.getNumPriorityElements() == 1;
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explaining why Priority events (e.g. unaligned checkpoint barriers) must notify downstream even when the subpartition is blocked.

Comment on lines +382 to +389
* <p><b>Lock ordering note:</b> This method acquires {@code inputChannelsWithData} and then may
* indirectly acquire {@code receivedBuffers} (via {@code toInputChannel()} and {@code
* releaseAllResources()}). This is the reverse order of {@link
* RecoveredInputChannel#onRecoveredStateBuffer}, which acquires {@code receivedBuffers} first
* and then {@code inputChannelsWithData} (via {@code notifyChannelNonEmpty()}). This is safe
* because {@code convertRecoveredInputChannels()} is only called from {@link
* #requestPartitions()}, which happens after all state recovery is complete (buffer filtering
* future is done), so {@code onRecoveredStateBuffer()} is no longer being called concurrently.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds fishy and fragile 🤔

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I've narrowed the inputChannelsWithData lock scope in convertRecoveredInputChannels() — moved toInputChannel(), releaseAllResources(), and getBuffersInUseCount() outside the synchronized block to eliminate the reverse lock ordering. The lock now only covers the data structure updates.

…n if it is blocked to ensure the checkpoint barrier can be handled by downstream task

Priority events (e.g. unaligned checkpoint barriers) must notify downstream even
when the subpartition is blocked.

During recovery, once the upstream output channel state is fully restored, a
RECOVERY_COMPLETION event (EndOfOutputChannelStateEvent) is emitted. This event
blocks the subpartition to prevent the upstream from sending new data while the
downstream is still consuming recovered buffers. The subpartition remains blocked
until the downstream finishes consuming all recovered buffers from every channel
and calls resumeConsumption() to unblock.

If a checkpoint is triggered while the downstream is still consuming recovered
buffers, the upstream receives an unaligned checkpoint barrier and adds it to this
blocked subpartition. The barrier must still be delivered to the downstream
immediately, otherwise the checkpoint will hang until it times out.
@1996fanrui 1996fanrui force-pushed the 39018/support-checkpoint-for-localinputchannel branch 4 times, most recently from fef5732 to f08a818 Compare April 2, 2026 20:33
Copy link
Copy Markdown
Member Author

@1996fanrui 1996fanrui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @pnowojski for the review, all comments are addressed, and commits are organized.

@1996fanrui 1996fanrui force-pushed the 39018/support-checkpoint-for-localinputchannel branch from f08a818 to bb071b2 Compare April 3, 2026 12:45
int buffersInUseCount = realInputChannel.getBuffersInUseCount();

// Phase 2: Atomically update data structures under the lock.
synchronized (inputChannelsWithData) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the previous code didn't need the lock in the first place? What has changed or what did I miss?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous(master branch) code didn't touch inputChannelsWithData at all — it was a simple channel swap without any lock. FLINK-39018 changes introduced buffer migration from RecoveredInputChannel to physical channels, where onRecoveredStateBuffer() now enqueues the channel into inputChannelsWithData via notifyChannelNonEmpty(). So during conversion, we now need to dequeue the old channel and conditionally enqueue the new one, which requires the inputChannelsWithData lock. I've structured it so the lock only covers the queue/map updates (phase 2), while toInputChannel() and releaseAllResources() run outside the lock (phase 1) to avoid reverse lock ordering with onRecoveredStateBuffer().

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants