Skip to content

Commit 902c1a5

Browse files
yupeng9Harsh Kothari
authored and
Harsh Kothari
committed
pass the maxPoll, timeout from config to poller (opensearch-project#17995)
Signed-off-by: Harsh Kothari <techarsh@amazon.com>
1 parent 89cb714 commit 902c1a5

File tree

2 files changed

+40
-16
lines changed

2 files changed

+40
-16
lines changed

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,6 @@
3131
public class DefaultStreamPoller implements StreamPoller {
3232
private static final Logger logger = LogManager.getLogger(DefaultStreamPoller.class);
3333

34-
public static final long MAX_POLL_SIZE = 1000;
35-
public static final int POLL_TIMEOUT = 1000;
36-
3734
private volatile State state = State.NONE;
3835

3936
// goal state
@@ -53,6 +50,9 @@ public class DefaultStreamPoller implements StreamPoller {
5350
private ResetState resetState;
5451
private final String resetValue;
5552

53+
private long maxPollSize;
54+
private int pollTimeout;
55+
5656
private Set<IngestionShardPointer> persistedPointers;
5757

5858
private final CounterMetric totalPolledCount = new CounterMetric();
@@ -84,7 +84,9 @@ public DefaultStreamPoller(
8484
resetState,
8585
resetValue,
8686
errorStrategy,
87-
initialState
87+
initialState,
88+
maxPollSize,
89+
pollTimeout
8890
);
8991
}
9092

@@ -96,14 +98,18 @@ public DefaultStreamPoller(
9698
ResetState resetState,
9799
String resetValue,
98100
IngestionErrorStrategy errorStrategy,
99-
State initialState
101+
State initialState,
102+
long maxPollSize,
103+
int pollTimeout
100104
) {
101105
this.consumer = Objects.requireNonNull(consumer);
102106
this.resetState = resetState;
103107
this.resetValue = resetValue;
104108
this.initialBatchStartPointer = startPointer;
105109
this.state = initialState;
106110
this.persistedPointers = persistedPointers;
111+
this.maxPollSize = maxPollSize;
112+
this.pollTimeout = pollTimeout;
107113
if (!this.persistedPointers.isEmpty()) {
108114
maxPersistedPointer = this.persistedPointers.stream().max(IngestionShardPointer::compareTo).get();
109115
}
@@ -197,13 +203,13 @@ protected void startPoll() {
197203
List<IngestionShardConsumer.ReadResult<? extends IngestionShardPointer, ? extends Message>> results;
198204

199205
if (includeBatchStartPointer) {
200-
results = consumer.readNext(initialBatchStartPointer, true, MAX_POLL_SIZE, POLL_TIMEOUT);
206+
results = consumer.readNext(initialBatchStartPointer, true, maxPollSize, pollTimeout);
201207
includeBatchStartPointer = false;
202208
} else if (failedShardPointer != null) {
203209
results = consumer.readNext(failedShardPointer, true, MAX_POLL_SIZE, POLL_TIMEOUT);
204210
failedShardPointer = null;
205211
} else {
206-
results = consumer.readNext(MAX_POLL_SIZE, POLL_TIMEOUT);
212+
results = consumer.readNext(maxPollSize, pollTimeout);
207213
}
208214

209215
if (results.isEmpty()) {

server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,9 @@ public void setUp() throws Exception {
7272
StreamPoller.ResetState.NONE,
7373
"",
7474
errorStrategy,
75-
StreamPoller.State.NONE
75+
StreamPoller.State.NONE,
76+
1000,
77+
1000
7678
);
7779
partitionedBlockingQueueContainer.startProcessorThreads();
7880
}
@@ -131,7 +133,9 @@ public void testSkipProcessed() throws InterruptedException {
131133
StreamPoller.ResetState.NONE,
132134
"",
133135
errorStrategy,
134-
StreamPoller.State.NONE
136+
StreamPoller.State.NONE,
137+
1000,
138+
1000
135139
);
136140

137141
CountDownLatch latch = new CountDownLatch(2);
@@ -169,7 +173,9 @@ public void testResetStateEarliest() throws InterruptedException {
169173
StreamPoller.ResetState.EARLIEST,
170174
"",
171175
errorStrategy,
172-
StreamPoller.State.NONE
176+
StreamPoller.State.NONE,
177+
1000,
178+
1000
173179
);
174180
CountDownLatch latch = new CountDownLatch(2);
175181
doAnswer(invocation -> {
@@ -193,7 +199,9 @@ public void testResetStateLatest() throws InterruptedException {
193199
StreamPoller.ResetState.LATEST,
194200
"",
195201
errorStrategy,
196-
StreamPoller.State.NONE
202+
StreamPoller.State.NONE,
203+
1000,
204+
1000
197205
);
198206

199207
poller.start();
@@ -213,7 +221,9 @@ public void testResetStateRewindByOffset() throws InterruptedException {
213221
StreamPoller.ResetState.REWIND_BY_OFFSET,
214222
"1",
215223
errorStrategy,
216-
StreamPoller.State.NONE
224+
StreamPoller.State.NONE,
225+
1000,
226+
1000
217227
);
218228
CountDownLatch latch = new CountDownLatch(1);
219229
doAnswer(invocation -> {
@@ -289,7 +299,9 @@ public void testDropErrorIngestionStrategy() throws TimeoutException, Interrupte
289299
StreamPoller.ResetState.NONE,
290300
"",
291301
errorStrategy,
292-
StreamPoller.State.NONE
302+
StreamPoller.State.NONE,
303+
1000,
304+
1000
293305
);
294306
poller.start();
295307
Thread.sleep(sleepTime);
@@ -340,7 +352,9 @@ public void testBlockErrorIngestionStrategy() throws TimeoutException, Interrupt
340352
StreamPoller.ResetState.NONE,
341353
"",
342354
errorStrategy,
343-
StreamPoller.State.NONE
355+
StreamPoller.State.NONE,
356+
1000,
357+
1000
344358
);
345359
poller.start();
346360
Thread.sleep(sleepTime);
@@ -369,7 +383,9 @@ public void testProcessingErrorWithBlockErrorIngestionStrategy() throws TimeoutE
369383
StreamPoller.ResetState.NONE,
370384
"",
371385
mockErrorStrategy,
372-
StreamPoller.State.NONE
386+
StreamPoller.State.NONE,
387+
1000,
388+
1000
373389
);
374390
poller.start();
375391
Thread.sleep(sleepTime);
@@ -433,7 +449,9 @@ public void testPersistedBatchStartPointer() throws TimeoutException, Interrupte
433449
StreamPoller.ResetState.NONE,
434450
"",
435451
errorStrategy,
436-
StreamPoller.State.NONE
452+
StreamPoller.State.NONE,
453+
1000,
454+
1000
437455
);
438456
poller.start();
439457
Thread.sleep(sleepTime);

0 commit comments

Comments
 (0)