28
28
import org .apache .drill .exec .expr .annotations .Param ;
29
29
import org .apache .drill .exec .expr .annotations .Workspace ;
30
30
import org .apache .drill .exec .expr .holders .NullableVarBinaryHolder ;
31
- import org .apache .drill .exec .expr .holders .VarCharHolder ;
31
+ import org .apache .drill .exec .expr .holders .NullableVarCharHolder ;
32
32
import org .apache .drill .exec .physical .resultSet .ResultSetLoader ;
33
33
import org .apache .drill .exec .server .options .OptionManager ;
34
34
import org .apache .drill .exec .vector .complex .writer .BaseWriter ;
@@ -45,23 +45,24 @@ public static class ConvertFromJsonNullableInput implements DrillSimpleFunc {
45
45
@ Param
46
46
NullableVarBinaryHolder in ;
47
47
48
- @ Workspace
49
- org . apache . drill . exec . store . easy . json . loader . JsonLoaderImpl . JsonLoaderBuilder jsonLoaderBuilder ;
48
+ @ Output // TODO Remove in future work
49
+ BaseWriter . ComplexWriter writer ;
50
50
51
51
@ Inject
52
52
OptionManager options ;
53
53
54
54
@ Inject
55
- ResultSetLoader loader ;
55
+ ResultSetLoader rsLoader ;
56
56
57
- @ Output // TODO Remove in future work
58
- BaseWriter .ComplexWriter writer ;
57
+ @ Workspace
58
+ org .apache .drill .exec .store .easy .json .loader .SingleElementIterator <java .io .InputStream > stream ;
59
+
60
+ @ Workspace
61
+ org .apache .drill .exec .store .easy .json .loader .JsonLoaderImpl jsonLoader ;
59
62
60
63
@ Override
61
64
public void setup () {
62
- jsonLoaderBuilder = new org .apache .drill .exec .store .easy .json .loader .JsonLoaderImpl .JsonLoaderBuilder ()
63
- .resultSetLoader (loader )
64
- .standardOptions (options );
65
+ rsLoader .startBatch ();
65
66
}
66
67
67
68
@ Override
@@ -71,41 +72,57 @@ public void eval() {
71
72
return ;
72
73
}
73
74
75
+ java .io .InputStream inputStream = org .apache .drill .exec .vector .complex .fn .DrillBufInputStream .getStream (in .start , in .end , in .buffer );
76
+
74
77
try {
75
- jsonLoaderBuilder .fromStream (in .start , in .end , in .buffer );
76
- org .apache .drill .exec .store .easy .json .loader .JsonLoader jsonLoader = jsonLoaderBuilder .build ();
77
- loader .startBatch ();
78
- jsonLoader .readBatch ();
78
+ stream .setValue (inputStream );
79
+
80
+ if (jsonLoader == null ) {
81
+ jsonLoader = org .apache .drill .exec .expr .fn .impl .conv .JsonConverterUtils .createJsonLoader (rsLoader , options , stream );
82
+ }
83
+
84
+ org .apache .drill .exec .physical .resultSet .RowSetLoader rowWriter = rsLoader .writer ();
85
+ rowWriter .start ();
86
+ if (jsonLoader .parser ().next ()) {
87
+ rowWriter .save ();
88
+ }
89
+ inputStream .close ();
90
+
79
91
} catch (Exception e ) {
80
- throw new org .apache .drill .common .exceptions .DrillRuntimeException ("Error while converting from JSON. " , e );
92
+ throw org .apache .drill .common .exceptions .UserException .dataReadError (e )
93
+ .message ("Error while reading JSON. " )
94
+ .addContext (e .getMessage ())
95
+ .build ();
81
96
}
82
97
}
83
98
}
84
99
85
100
@ FunctionTemplate (names = {"convert_fromJSON" , "convertFromJson" , "convert_from_json" },
86
- scope = FunctionScope .SIMPLE )
101
+ scope = FunctionScope .SIMPLE , nulls = NullHandling . INTERNAL )
87
102
public static class ConvertFromJsonVarcharInput implements DrillSimpleFunc {
88
103
89
104
@ Param
90
- VarCharHolder in ;
105
+ NullableVarCharHolder in ;
91
106
92
107
@ Output // TODO Remove in future work
93
108
ComplexWriter writer ;
94
109
95
110
@ Workspace
96
- org .apache .drill .exec .store .easy .json .loader .JsonLoaderImpl . JsonLoaderBuilder jsonLoaderBuilder ;
111
+ org .apache .drill .exec .store .easy .json .loader .SingleElementIterator < java . io . InputStream > stream ;
97
112
98
113
@ Inject
99
114
OptionManager options ;
100
115
101
116
@ Inject
102
- ResultSetLoader loader ;
117
+ ResultSetLoader rsLoader ;
118
+
119
+ @ Workspace
120
+ org .apache .drill .exec .store .easy .json .loader .JsonLoaderImpl jsonLoader ;
121
+
103
122
104
123
@ Override
105
124
public void setup () {
106
- jsonLoaderBuilder = new org .apache .drill .exec .store .easy .json .loader .JsonLoaderImpl .JsonLoaderBuilder ()
107
- .resultSetLoader (loader )
108
- .standardOptions (options );
125
+ rsLoader .startBatch ();
109
126
}
110
127
111
128
@ Override
@@ -118,12 +135,20 @@ public void eval() {
118
135
}
119
136
120
137
try {
121
- jsonLoaderBuilder .fromString (jsonString );
122
- org .apache .drill .exec .store .easy .json .loader .JsonLoader jsonLoader = jsonLoaderBuilder .build ();
123
- loader .startBatch ();
124
- jsonLoader .readBatch ();
138
+ stream .setValue (org .apache .drill .exec .expr .fn .impl .conv .JsonConverterUtils .convertStringToInputStream (jsonString ));
139
+ if (jsonLoader == null ) {
140
+ jsonLoader = org .apache .drill .exec .expr .fn .impl .conv .JsonConverterUtils .createJsonLoader (rsLoader , options , stream );
141
+ }
142
+ org .apache .drill .exec .physical .resultSet .RowSetLoader rowWriter = rsLoader .writer ();
143
+ rowWriter .start ();
144
+ if (jsonLoader .parser ().next ()) {
145
+ rowWriter .save ();
146
+ }
125
147
} catch (Exception e ) {
126
- throw new org .apache .drill .common .exceptions .DrillRuntimeException ("Error while converting from JSON. " , e );
148
+ throw org .apache .drill .common .exceptions .UserException .dataReadError (e )
149
+ .message ("Error while reading JSON. " )
150
+ .addContext (e .getMessage ())
151
+ .build ();
127
152
}
128
153
}
129
154
}
0 commit comments