Skip to content

Commit 337391a

Browse files
committed
Added offset tracker
1 parent af0ea44 commit 337391a

File tree

3 files changed

+45
-11
lines changed

3 files changed

+45
-11
lines changed

contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,7 @@ public class DruidBatchRecordReader implements ManagedReader<SchemaNegotiator> {
5151
private final List<String> columns;
5252
private final DruidFilter filter;
5353
private final DruidQueryClient druidQueryClient;
54-
55-
private BigInteger nextOffset = BigInteger.ZERO;
54+
private final DruidOffsetTracker offsetTracker;
5655
private int maxRecordsToRead = -1;
5756
private JsonLoaderBuilder jsonBuilder;
5857
private JsonLoaderImpl jsonLoader;
@@ -64,13 +63,14 @@ public DruidBatchRecordReader(DruidSubScan subScan,
6463
DruidSubScanSpec subScanSpec,
6564
List<SchemaPath> projectedColumns,
6665
int maxRecordsToRead,
67-
DruidStoragePlugin plugin) {
66+
DruidStoragePlugin plugin, DruidOffsetTracker offsetTracker) {
6867
this.columns = new ArrayList<>();
6968
this.maxRecordsToRead = maxRecordsToRead;
7069
this.plugin = plugin;
7170
this.scanSpec = subScanSpec;
7271
this.filter = subScanSpec.getFilter();
7372
this.druidQueryClient = plugin.getDruidQueryClient();
73+
this.offsetTracker = offsetTracker;
7474
}
7575

7676
@Override
@@ -118,10 +118,6 @@ public void close() {
118118
jsonLoader.close();
119119
jsonLoader = null;
120120
}
121-
122-
if (! nextOffset.equals(BigInteger.ZERO)) {
123-
nextOffset = BigInteger.ZERO;
124-
}
125121
}
126122

127123
private String getQuery() throws JsonProcessingException {
@@ -135,7 +131,7 @@ private String getQuery() throws JsonProcessingException {
135131
scanSpec.dataSourceName,
136132
columns,
137133
filter,
138-
nextOffset,
134+
offsetTracker.getOffset(),
139135
queryThreshold,
140136
scanSpec.getMinTime(),
141137
scanSpec.getMaxTime()
@@ -144,6 +140,7 @@ private String getQuery() throws JsonProcessingException {
144140
}
145141

146142
private void setNextOffset(DruidScanResponse druidScanResponse) {
147-
nextOffset = nextOffset.add(BigInteger.valueOf(druidScanResponse.getEvents().size()));
143+
//nextOffset = nextOffset.add(BigInteger.valueOf(druidScanResponse.getEvents().size()));
144+
offsetTracker.setNextOffset(BigInteger.valueOf(druidScanResponse.getEvents().size()));
148145
}
149146
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.drill.exec.store.druid;
19+
20+
import java.math.BigInteger;
21+
22+
public class DruidOffsetTracker {
23+
private BigInteger nextOffset;
24+
25+
public DruidOffsetTracker() {
26+
this.nextOffset = BigInteger.ZERO;
27+
}
28+
29+
public BigInteger getOffset() {
30+
return nextOffset;
31+
}
32+
33+
public void setNextOffset(BigInteger offset) {
34+
nextOffset = nextOffset.add(offset);
35+
}
36+
}

contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,13 @@ private ScanFrameworkBuilder createBuilder(OptionManager options, DruidSubScan s
7272

7373
private static class DruidReaderFactory implements ReaderFactory {
7474
private final DruidSubScan subScan;
75-
75+
private final DruidOffsetTracker offsetTracker;
7676
private final Iterator<DruidSubScanSpec> scanSpecIterator;
7777

7878
public DruidReaderFactory(DruidSubScan subScan) {
7979
this.subScan = subScan;
8080
this.scanSpecIterator = subScan.getScanSpec().listIterator();
81+
this.offsetTracker = new DruidOffsetTracker();
8182
}
8283

8384
@Override
@@ -89,7 +90,7 @@ public void bind(ManagedScanFramework framework) {
8990
public ManagedReader<? extends SchemaNegotiator> next() {
9091
if (scanSpecIterator.hasNext()) {
9192
DruidSubScanSpec scanSpec = scanSpecIterator.next();
92-
return new DruidBatchRecordReader(subScan, scanSpec, subScan.getColumns(), subScan.getMaxRecordsToRead(), subScan.getStorageEngine());
93+
return new DruidBatchRecordReader(subScan, scanSpec, subScan.getColumns(), subScan.getMaxRecordsToRead(), subScan.getStorageEngine(), offsetTracker);
9394
}
9495
return null;
9596
}

0 commit comments

Comments
 (0)