Skip to content

Commit 0db0f5b

Browse files
committed
Almost working
1 parent a5bc6f6 commit 0db0f5b

File tree

10 files changed

+50
-266
lines changed

10 files changed

+50
-266
lines changed

contrib/storage-druid/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,7 @@
2929
<artifactId>drill-druid-storage</artifactId>
3030
<name>Drill : Contrib : Storage : Druid</name>
3131
<properties>
32-
<druid.TestSuite>**/DruidTestSuit.class</druid.TestSuite>
32+
<druid.TestSuite>**/DruidTestSuite.class</druid.TestSuite>
3333
</properties>
3434
<dependencies>
3535
<dependency>

contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java

Lines changed: 6 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,6 @@
2424
import org.apache.drill.common.exceptions.CustomErrorContext;
2525
import org.apache.drill.common.exceptions.UserException;
2626
import org.apache.drill.common.expression.SchemaPath;
27-
import org.apache.drill.exec.ops.FragmentContext;
2827
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
2928
import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
3029
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
@@ -47,43 +46,30 @@
4746

4847
public class DruidBatchRecordReader implements ManagedReader<SchemaNegotiator> {
4948
private static final Logger logger = LoggerFactory.getLogger(DruidBatchRecordReader.class);
50-
5149
private static final ObjectMapper objectMapper = new ObjectMapper();
52-
5350
private final DruidStoragePlugin plugin;
5451
private final DruidSubScan.DruidSubScanSpec scanSpec;
5552
private final List<String> columns;
5653
private final DruidFilter filter;
5754
private final DruidQueryClient druidQueryClient;
58-
private final FragmentContext fragmentContext;
59-
60-
private final DruidSubScan subScan;
6155

62-
private final TupleMetadata schema;
6356
private BigInteger nextOffset = BigInteger.ZERO;
6457
private int maxRecordsToRead = -1;
65-
6658
private JsonLoaderBuilder jsonBuilder;
67-
6859
private JsonLoader jsonLoader;
6960
private ResultSetLoader resultSetLoader;
70-
7161
private CustomErrorContext errorContext;
7262

7363

7464
public DruidBatchRecordReader(DruidSubScan subScan,
7565
DruidSubScanSpec subScanSpec,
7666
List<SchemaPath> projectedColumns,
7767
int maxRecordsToRead,
78-
FragmentContext context,
7968
DruidStoragePlugin plugin) {
80-
this.subScan = subScan;
81-
columns = new ArrayList<>();
69+
this.columns = new ArrayList<>();
8270
this.maxRecordsToRead = maxRecordsToRead;
8371
this.plugin = plugin;
84-
scanSpec = subScanSpec;
85-
this.schema = subScan.getSchema();
86-
fragmentContext = context;
72+
this.scanSpec = subScanSpec;
8773
this.filter = subScanSpec.getFilter();
8874
this.druidQueryClient = plugin.getDruidQueryClient();
8975
}
@@ -96,10 +82,9 @@ public boolean open(SchemaNegotiator negotiator) {
9682

9783
jsonBuilder = new JsonLoaderBuilder()
9884
.resultSetLoader(resultSetLoader)
85+
.standardOptions(negotiator.queryOptions())
9986
.errorContext(errorContext);
10087

101-
102-
10388
return true;
10489
}
10590

@@ -112,9 +97,10 @@ public boolean next() {
11297
setNextOffset(druidScanResponse);
11398

11499
for (ObjectNode eventNode : druidScanResponse.getEvents()) {
115-
jsonLoader = jsonBuilder
116-
.fromString(eventNode.asText())
100+
JsonLoader jsonLoader = jsonBuilder
101+
.fromString(eventNode.toString())
117102
.build();
103+
118104
result = jsonLoader.readBatch();
119105
}
120106
return result;

contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,6 @@
2323
import com.fasterxml.jackson.annotation.JsonTypeName;
2424
import com.fasterxml.jackson.annotation.JsonIgnore;
2525

26-
import org.apache.calcite.avatica.Meta;
2726
import org.apache.drill.common.PlanStringBuilder;
2827
import org.apache.drill.common.expression.SchemaPath;
2928
import org.apache.drill.exec.metastore.MetadataProviderManager;
@@ -58,9 +57,7 @@ public class DruidGroupScan extends AbstractGroupScan {
5857
private static final long DEFAULT_TABLET_SIZE = 1000;
5958
private final DruidScanSpec scanSpec;
6059
private final DruidStoragePlugin storagePlugin;
61-
62-
private MetadataProviderManager metadataProviderManager;
63-
60+
private final MetadataProviderManager metadataProviderManager;
6461
private List<SchemaPath> columns;
6562
private boolean filterPushedDown = false;
6663
private int maxRecordsToRead;
@@ -293,6 +290,11 @@ public int getMaxRecordsToRead() {
293290
return maxRecordsToRead;
294291
}
295292

293+
@JsonIgnore
294+
public MetadataProviderManager getMetadataProviderManager() {
295+
return metadataProviderManager;
296+
}
297+
296298
public TupleMetadata getSchema() {
297299
if (metadataProviderManager == null) {
298300
return null;

contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -70,7 +70,8 @@ public void onMatch(RelOptRuleCall relOptRuleCall) {
7070
groupScan.getStoragePlugin(),
7171
newScanSpec,
7272
groupScan.getColumns(),
73-
groupScan.getMaxRecordsToRead());
73+
groupScan.getMaxRecordsToRead(),
74+
groupScan.getMetadataProviderManager());
7475
newGroupsScan.setFilterPushedDown(true);
7576

7677
ScanPrel newScanPrel = scan.copy(filter.getTraitSet(), newGroupsScan, filter.getRowType());

contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidRecordReader.java

Lines changed: 0 additions & 182 deletions
This file was deleted.

0 commit comments

Comments (0)