Skip to content

Commit f9b5036

Browse files
committed
DRILL-8239: Convert JSON UDF to EVF
1 parent b2e2653 commit f9b5036

File tree

2 files changed

+77
-36
lines changed

2 files changed

+77
-36
lines changed

exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java

Lines changed: 69 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
package org.apache.drill.exec.expr.fn.impl.conv;
1919

2020

21-
import io.netty.buffer.DrillBuf;
22-
2321
import javax.inject.Inject;
2422

2523
import org.apache.drill.exec.expr.DrillSimpleFunc;
@@ -32,37 +30,45 @@
3230
import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
3331
import org.apache.drill.exec.expr.holders.VarBinaryHolder;
3432
import org.apache.drill.exec.expr.holders.VarCharHolder;
33+
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
34+
import org.apache.drill.exec.server.options.OptionManager;
3535
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
3636

3737
public class JsonConvertFrom {
3838

39-
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonConvertFrom.class);
40-
4139
private JsonConvertFrom() {
4240
}
4341

4442
@FunctionTemplate(name = "convert_fromJSON", scope = FunctionScope.SIMPLE, isRandom = true)
4543
public static class ConvertFromJson implements DrillSimpleFunc {
4644

4745
@Param VarBinaryHolder in;
48-
@Inject DrillBuf buffer;
49-
@Workspace org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader;
46+
@Inject
47+
ResultSetLoader loader;
48+
@Workspace
49+
org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder;
50+
51+
@Inject
52+
OptionManager options;
5053

5154
@Output ComplexWriter writer;
5255

5356
@Override
5457
public void setup() {
55-
jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer)
56-
.defaultSchemaPathColumns()
57-
.build();
58+
jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder()
59+
.resultSetLoader(loader)
60+
.standardOptions(options);
5861
}
5962

6063
@Override
6164
public void eval() {
6265
try {
63-
jsonReader.setSource(in.start, in.end, in.buffer);
64-
jsonReader.write(writer);
65-
buffer = jsonReader.getWorkBuf();
66+
jsonLoaderBuilder.fromStream(in.start, in.end, in.buffer);
67+
org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build();
68+
loader.startBatch();
69+
jsonLoader.readBatch();
70+
loader.close();
71+
6672
} catch (Exception e) {
6773
throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
6874
}
@@ -73,24 +79,33 @@ public void eval() {
7379
public static class ConvertFromJsonVarchar implements DrillSimpleFunc {
7480

7581
@Param VarCharHolder in;
76-
@Inject DrillBuf buffer;
77-
@Workspace org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader;
82+
@Workspace
83+
org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder;
84+
85+
@Inject
86+
OptionManager options;
87+
88+
@Inject
89+
ResultSetLoader loader;
7890

7991
@Output ComplexWriter writer;
8092

8193
@Override
8294
public void setup() {
83-
jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer)
84-
.defaultSchemaPathColumns()
85-
.build();
95+
jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder()
96+
.resultSetLoader(loader)
97+
.standardOptions(options);
8698
}
8799

88100
@Override
89101
public void eval() {
90102
try {
91-
jsonReader.setSource(in.start, in.end, in.buffer);
92-
jsonReader.write(writer);
93-
buffer = jsonReader.getWorkBuf();
103+
jsonLoaderBuilder.fromStream(in.start, in.end, in.buffer);
104+
org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build();
105+
loader.startBatch();
106+
jsonLoader.readBatch();
107+
loader.close();
108+
94109
} catch (Exception e) {
95110
throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
96111
}
@@ -101,16 +116,23 @@ public void eval() {
101116
public static class ConvertFromJsonNullableInput implements DrillSimpleFunc {
102117

103118
@Param NullableVarBinaryHolder in;
104-
@Inject DrillBuf buffer;
105-
@Workspace org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader;
119+
120+
@Workspace
121+
org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder;
122+
123+
@Inject
124+
OptionManager options;
125+
126+
@Inject
127+
ResultSetLoader loader;
106128

107129
@Output ComplexWriter writer;
108130

109131
@Override
110132
public void setup() {
111-
jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer)
112-
.defaultSchemaPathColumns()
113-
.build();
133+
jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder()
134+
.resultSetLoader(loader)
135+
.standardOptions(options);
114136
}
115137

116138
@Override
@@ -124,9 +146,11 @@ public void eval() {
124146
}
125147

126148
try {
127-
jsonReader.setSource(in.start, in.end, in.buffer);
128-
jsonReader.write(writer);
129-
buffer = jsonReader.getWorkBuf();
149+
jsonLoaderBuilder.fromStream(in.start, in.end, in.buffer);
150+
org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build();
151+
loader.startBatch();
152+
jsonLoader.readBatch();
153+
loader.close();
130154
} catch (Exception e) {
131155
throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
132156
}
@@ -137,16 +161,23 @@ public void eval() {
137161
public static class ConvertFromJsonVarcharNullableInput implements DrillSimpleFunc {
138162

139163
@Param NullableVarCharHolder in;
140-
@Inject DrillBuf buffer;
141-
@Workspace org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader;
164+
165+
@Workspace
166+
org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder;
167+
168+
@Inject
169+
OptionManager options;
170+
171+
@Inject
172+
ResultSetLoader loader;
142173

143174
@Output ComplexWriter writer;
144175

145176
@Override
146177
public void setup() {
147-
jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer)
148-
.defaultSchemaPathColumns()
149-
.build();
178+
jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder()
179+
.resultSetLoader(loader)
180+
.standardOptions(options);
150181
}
151182

152183
@Override
@@ -160,9 +191,11 @@ public void eval() {
160191
}
161192

162193
try {
163-
jsonReader.setSource(in.start, in.end, in.buffer);
164-
jsonReader.write(writer);
165-
buffer = jsonReader.getWorkBuf();
194+
jsonLoaderBuilder.fromStream(in.start, in.end, in.buffer);
195+
org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build();
196+
loader.startBatch();
197+
jsonLoader.readBatch();
198+
loader.close();
166199
} catch (Exception e) {
167200
throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
168201
}

exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
import java.nio.charset.Charset;
2424
import java.util.ArrayList;
2525
import java.util.Arrays;
26+
import java.util.Collections;
2627
import java.util.List;
2728

29+
import io.netty.buffer.DrillBuf;
2830
import org.apache.commons.io.IOUtils;
2931
import org.apache.drill.common.exceptions.CustomErrorContext;
3032
import org.apache.drill.common.exceptions.EmptyErrorContext;
@@ -45,6 +47,7 @@
4547
import org.apache.drill.exec.store.easy.json.parser.ValueDef;
4648
import org.apache.drill.exec.store.easy.json.parser.ValueDef.JsonType;
4749
import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
50+
import org.apache.drill.exec.vector.complex.fn.DrillBufInputStream;
4851
import org.slf4j.Logger;
4952
import org.slf4j.LoggerFactory;
5053

@@ -186,6 +189,11 @@ public JsonLoaderBuilder fromStream(InputStream... stream) {
186189
return this;
187190
}
188191

192+
public JsonLoaderBuilder fromStream(int start, int end, DrillBuf buf) {
193+
this.streams = Collections.singletonList(DrillBufInputStream.getStream(start, end, buf));
194+
return this;
195+
}
196+
189197
public JsonLoaderBuilder fromStream(Iterable<InputStream> streams) {
190198
this.streams = streams;
191199
return this;

0 commit comments

Comments
 (0)