Skip to content

Commit 722a9dc

Browse files
committed
Working?
1 parent 6cc9e3f commit 722a9dc

File tree

6 files changed

+89
-33
lines changed

6 files changed

+89
-33
lines changed

contrib/storage-druid/.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# Ignore the local logback test configuration for the Druid storage plugin
2+
/src/test/resources/logback-test.xml

contrib/storage-druid/pom.xml

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,6 @@
5353
<version>${project.version}</version>
5454
<scope>test</scope>
5555
</dependency>
56-
<dependency>
57-
<groupId>org.assertj</groupId>
58-
<artifactId>assertj-core</artifactId>
59-
<!-- use 2.9.1 for Java 7 projects -->
60-
<version>3.11.1</version>
61-
<scope>test</scope>
62-
</dependency>
6356
</dependencies>
6457

6558
<build>

contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.apache.drill.exec.store.druid.rest.DruidQueryClient;
3636
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
3737
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder;
38-
import org.apache.drill.exec.vector.BaseValueVector;
3938
import org.slf4j.Logger;
4039
import org.slf4j.LoggerFactory;
4140

@@ -45,6 +44,7 @@
4544

4645
public class DruidBatchRecordReader implements ManagedReader<SchemaNegotiator> {
4746
private static final Logger logger = LoggerFactory.getLogger(DruidBatchRecordReader.class);
47+
private static final int BATCH_SIZE = 4096;
4848
private static final ObjectMapper objectMapper = new ObjectMapper();
4949
private final DruidStoragePlugin plugin;
5050
private final DruidSubScan.DruidSubScanSpec scanSpec;
@@ -55,6 +55,7 @@ public class DruidBatchRecordReader implements ManagedReader<SchemaNegotiator> {
5555
private int maxRecordsToRead = -1;
5656
private JsonLoaderBuilder jsonBuilder;
5757
private JsonLoaderImpl jsonLoader;
58+
private SchemaNegotiator negotiator;
5859
private ResultSetLoader resultSetLoader;
5960
private CustomErrorContext errorContext;
6061

@@ -75,34 +76,50 @@ public DruidBatchRecordReader(DruidSubScan subScan,
7576

7677
@Override
7778
public boolean open(SchemaNegotiator negotiator) {
78-
resultSetLoader = negotiator.build();
79-
errorContext = negotiator.parentErrorContext();
80-
negotiator.setErrorContext(errorContext);
79+
this.negotiator = negotiator;
80+
this.errorContext = this.negotiator.parentErrorContext();
81+
this.negotiator.batchSize(BATCH_SIZE);
82+
this.negotiator.setErrorContext(errorContext);
83+
84+
resultSetLoader = this.negotiator.build();
8185

82-
jsonBuilder = new JsonLoaderBuilder()
83-
.resultSetLoader(resultSetLoader)
84-
.standardOptions(negotiator.queryOptions())
85-
.errorContext(errorContext);
8686

8787
return true;
8888
}
8989

9090
@Override
9191
public boolean next() {
92+
jsonBuilder = new JsonLoaderBuilder()
93+
.resultSetLoader(resultSetLoader)
94+
.standardOptions(negotiator.queryOptions())
95+
.errorContext(errorContext);
96+
int eventCounter = 0;
9297
boolean result = false;
9398
try {
9499
String query = getQuery();
100+
logger.debug("Executing query: {}", query);
95101
DruidScanResponse druidScanResponse = druidQueryClient.executeQuery(query);
96102
setNextOffset(druidScanResponse);
97103

104+
StringBuilder events = new StringBuilder();
98105
for (ObjectNode eventNode : druidScanResponse.getEvents()) {
99-
jsonLoader = (JsonLoaderImpl) jsonBuilder
100-
.fromString(eventNode.toString())
106+
events.append(eventNode);
107+
events.append("\n");
108+
eventCounter++;
109+
}
110+
111+
112+
jsonLoader = (JsonLoaderImpl) jsonBuilder
113+
.fromString(events.toString())
101114
.build();
102115

103-
result = jsonLoader.readBatch();
116+
result = jsonLoader.readBatch();
117+
118+
if (eventCounter < BATCH_SIZE) {
119+
return false;
120+
} else {
121+
return result;
104122
}
105-
return result;
106123
} catch (Exception e) {
107124
throw UserException
108125
.dataReadError(e)
@@ -123,8 +140,8 @@ public void close() {
123140
private String getQuery() throws JsonProcessingException {
124141
int queryThreshold =
125142
maxRecordsToRead >= 0
126-
? Math.min(BaseValueVector.INITIAL_VALUE_ALLOCATION, maxRecordsToRead)
127-
: BaseValueVector.INITIAL_VALUE_ALLOCATION;
143+
? Math.min(BATCH_SIZE, maxRecordsToRead)
144+
: BATCH_SIZE;
128145
ScanQueryBuilder scanQueryBuilder = plugin.getScanQueryBuilder();
129146
ScanQuery scanQuery =
130147
scanQueryBuilder.build(
@@ -140,7 +157,6 @@ private String getQuery() throws JsonProcessingException {
140157
}
141158

142159
private void setNextOffset(DruidScanResponse druidScanResponse) {
143-
//nextOffset = nextOffset.add(BigInteger.valueOf(druidScanResponse.getEvents().size()));
144160
offsetTracker.setNextOffset(BigInteger.valueOf(druidScanResponse.getEvents().size()));
145161
}
146162
}

contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidOffsetTracker.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,13 @@
1717
*/
1818
package org.apache.drill.exec.store.druid;
1919

20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
2023
import java.math.BigInteger;
2124

2225
public class DruidOffsetTracker {
26+
private static final Logger logger = LoggerFactory.getLogger(DruidOffsetTracker.class);
2327
private BigInteger nextOffset;
2428

2529
public DruidOffsetTracker() {
@@ -32,5 +36,6 @@ public BigInteger getOffset() {
3236

3337
public void setNextOffset(BigInteger offset) {
3438
nextOffset = nextOffset.add(offset);
39+
logger.debug("Incrementing offset by {}", offset);
3540
}
3641
}

contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfigTest.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
import java.io.IOException;
2828
import java.net.URISyntaxException;
2929

30-
import static org.assertj.core.api.Assertions.assertThat;
30+
import static org.junit.jupiter.api.Assertions.assertEquals;
31+
import static org.junit.jupiter.api.Assertions.assertFalse;
32+
import static org.junit.jupiter.api.Assertions.assertNotNull;
3133

3234
public class DruidStoragePluginConfigTest {
3335

@@ -39,11 +41,11 @@ public void testDruidStoragePluginConfigSuccessfullyParsed()
3941
Resources.getResource("bootstrap-storage-plugins.json").toURI()));
4042
DruidStoragePluginConfig druidStoragePluginConfig =
4143
mapper.treeToValue(storagePluginJson.get("storage").get("druid"), DruidStoragePluginConfig.class);
42-
assertThat(druidStoragePluginConfig).isNotNull();
43-
assertThat(druidStoragePluginConfig.getBrokerAddress()).isEqualTo("http://localhost:8082");
44-
assertThat(druidStoragePluginConfig.getCoordinatorAddress()).isEqualTo("http://localhost:8081");
45-
assertThat(druidStoragePluginConfig.getAverageRowSizeBytes()).isEqualTo(200);
46-
assertThat(druidStoragePluginConfig.isEnabled()).isFalse();
44+
assertNotNull(druidStoragePluginConfig);
45+
assertEquals("http://localhost:8082", druidStoragePluginConfig.getBrokerAddress());
46+
assertEquals("http://localhost:8081", druidStoragePluginConfig.getCoordinatorAddress());
47+
assertEquals(200, druidStoragePluginConfig.getAverageRowSizeBytes());
48+
assertFalse(druidStoragePluginConfig.isEnabled());
4749
}
4850

4951
@Test
@@ -58,6 +60,6 @@ public void testDefaultRowSizeUsedWhenNotProvidedInConfig()
5860
JsonNode storagePluginJson = mapper.readTree(druidConfigStr);
5961
DruidStoragePluginConfig druidStoragePluginConfig =
6062
mapper.treeToValue(storagePluginJson.get("storage").get("druid"), DruidStoragePluginConfig.class);
61-
assertThat(druidStoragePluginConfig.getAverageRowSizeBytes()).isEqualTo(100);
63+
assertEquals(100, druidStoragePluginConfig.getAverageRowSizeBytes());
6264
}
6365
}

contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,19 @@
2020

2121
import org.apache.drill.categories.DruidStorageTest;
2222
import org.apache.drill.categories.SlowTest;
23+
import org.apache.drill.common.types.TypeProtos.DataMode;
24+
import org.apache.drill.common.types.TypeProtos.MinorType;
25+
import org.apache.drill.exec.physical.rowSet.RowSet;
26+
import org.apache.drill.exec.record.metadata.SchemaBuilder;
27+
import org.apache.drill.exec.record.metadata.TupleMetadata;
28+
import org.apache.drill.test.rowSet.RowSetComparison;
2329
import org.junit.Ignore;
2430
import org.junit.Test;
2531
import org.junit.experimental.categories.Category;
2632

33+
import static org.junit.Assert.assertEquals;
34+
35+
2736
@Ignore("These tests require a running druid instance. You may start druid by using the docker-compose provide in resources/druid and enable these tests")
2837
@Category({SlowTest.class, DruidStorageTest.class})
2938
public class TestDruidQueries extends DruidTestBase {
@@ -33,7 +42,7 @@ public void testStarQuery() throws Exception {
3342
testBuilder()
3443
.sqlQuery(String.format(TEST_STAR_QUERY, TEST_DATASOURCE_WIKIPEDIA))
3544
.unOrdered()
36-
.expectsNumRecords(2)
45+
.expectsNumRecords(876)
3746
.go();
3847
}
3948

@@ -60,7 +69,7 @@ public void testTwoOrdEqualsFilter() throws Exception {
6069
testBuilder()
6170
.sqlQuery(String.format(TEST_STRING_TWO_OR_EQUALS_FILTER_QUERY_TEMPLATE1, TEST_DATASOURCE_WIKIPEDIA))
6271
.unOrdered()
63-
.expectsNumRecords(3)
72+
.expectsNumRecords(1)
6473
.go();
6574
}
6675

@@ -72,7 +81,7 @@ public void testSingleColumnProject() throws Exception {
7281
.sqlQuery(query)
7382
.unOrdered()
7483
.baselineColumns("comment")
75-
.expectsNumRecords(24433)
84+
.expectsNumRecords(876)
7685
.go();
7786
}
7887

@@ -84,7 +93,36 @@ public void testCountAllRowsQuery() throws Exception {
8493
.sqlQuery(query)
8594
.unOrdered()
8695
.baselineColumns("mycount")
87-
.baselineValues(24433L)
96+
.baselineValues(876L)
8897
.go();
8998
}
99+
100+
@Test
101+
public void testGroupByQuery() throws Exception {
102+
String sql = String.format("SELECT `namespace`, COUNT(*) AS user_count FROM druid.`%s` GROUP BY `namespace` ORDER BY user_count DESC LIMIT 5",TEST_DATASOURCE_WIKIPEDIA);
103+
RowSet results = client.queryBuilder().sql(sql).rowSet();
104+
105+
TupleMetadata expectedSchema = new SchemaBuilder()
106+
.add("namespace", MinorType.VARCHAR, DataMode.OPTIONAL)
107+
.add("user_count", MinorType.BIGINT)
108+
.buildSchema();
109+
110+
RowSet expected = client.rowSetBuilder(expectedSchema)
111+
.addRow("Main", 702)
112+
.addRow("User talk", 29)
113+
.addRow("Wikipedia", 26)
114+
.addRow("Talk", 17)
115+
.addRow("User", 12)
116+
.build();
117+
118+
new RowSetComparison(expected).verifyAndClearAll(results);
119+
}
120+
121+
@Test
122+
public void testSerDe() throws Exception {
123+
String sql = String.format("SELECT COUNT(*) FROM druid.`%s`", TEST_DATASOURCE_WIKIPEDIA);
124+
String plan = queryBuilder().sql(sql).explainJson();
125+
long cnt = queryBuilder().physical(plan).singletonLong();
126+
assertEquals("Counts should match", 876L, cnt);
127+
}
90128
}

0 commit comments

Comments
 (0)