From 3a64f2589808adb7eed8c6671563f9a2f70b73ea Mon Sep 17 00:00:00 2001 From: Xingcan Cui Date: Sat, 19 Apr 2025 01:13:34 -0400 Subject: [PATCH] [FLINK-37308] Support pauseOrResumeSplits in HybridSource --- .../source/hybrid/HybridSourceReader.java | 7 +++++ .../source/hybrid/HybridSourceReaderTest.java | 27 +++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java index 2f113078fe330..18c049f7c053c 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java @@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -247,4 +248,10 @@ private void setCurrentReader(int index) { addSplits(splits); } } + + @Override + public void pauseOrResumeSplits( + Collection splitsToPause, Collection splitsToResume) { + currentReader.pauseOrResumeSplits(splitsToPause, splitsToResume); + } } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java index d1acc86e636de..7cf1a63fa9b60 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java @@ -310,6 +310,33 @@ public SourceReader createReader( reader.close(); } + @Test + void testPauseResumeCurrentSourceSplits() throws Exception { + TestingReaderContext readerContext = new TestingReaderContext(); + MockBaseSource source = + new MockBaseSource(1, 1, Boundedness.BOUNDED) { + @Override + public SourceReader createReader( + SourceReaderContext readerContext) { + return Mockito.spy(super.createReader(readerContext)); + } + }; + + HybridSourceReader reader = new HybridSourceReader<>(readerContext); + + reader.start(); + assertAndClearSourceReaderFinishedEvent(readerContext, -1); + reader.handleSourceEvents(new SwitchSourceEvent(0, source, false)); + SourceReader underlyingReader = currentReader(reader); + + reader.pauseOrResumeSplits( + Collections.singletonList("foo"), Collections.singletonList("bar")); + Mockito.verify(underlyingReader) + .pauseOrResumeSplits( + Collections.singletonList("foo"), Collections.singletonList("bar")); + reader.close(); + } + private static SourceReader currentReader( HybridSourceReader reader) { return Whitebox.getInternalState(reader, "currentReader");