Skip to content

Commit 49f8747

Browse files
authored
Merge pull request #13 from mkumar1984/text_extractor_1025
Adding text extractor for extracting unstructured output
2 parents e62ab2f + 1134502 commit 49f8747

File tree

4 files changed

+383
-44
lines changed

4 files changed

+383
-44
lines changed

cdi-core/src/main/java/com/linkedin/cdi/configuration/PropertyCollection.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public boolean isValid(State state) {
5858
StringProperties MSTAGE_CONNECTION_CLIENT_FACTORY = new StringProperties("ms.connection.client.factory",
5959
"com.linkedin.cdi.factory.DefaultConnectionClientFactory");
6060

61+
6162
CsvProperties MSTAGE_CSV = new CsvProperties("ms.csv");
6263

6364
BooleanProperties MSTAGE_DATA_EXPLICIT_EOF = new BooleanProperties("ms.data.explicit.eof", Boolean.FALSE);
@@ -244,8 +245,8 @@ public Long getMillis(State state) {
244245
}
245246
};
246247

247-
WatermarkProperties MSTAGE_WATERMARK = new WatermarkProperties("ms.watermark");
248248

249+
WatermarkProperties MSTAGE_WATERMARK = new WatermarkProperties("ms.watermark");
249250
JsonArrayProperties MSTAGE_WATERMARK_GROUPS = new JsonArrayProperties("ms.watermark.groups");
250251

251252
// default: 0, minimum: 0, maximum: -
@@ -403,7 +404,6 @@ protected String getValidNonblankWithDefault(State state) {
403404
TASK_MAXRETRIES,
404405
TASKEXECUTOR_THREADPOOL_SIZE
405406
);
406-
407407
Map<String, MultistageProperties<?>> deprecatedProperties =
408408
new ImmutableMap.Builder<String, MultistageProperties<?>>()
409409
.put("ms.csv.column.header", MSTAGE_CSV)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
// Copyright 2021 LinkedIn Corporation. All rights reserved.
2+
// Licensed under the BSD-2 Clause license.
3+
// See LICENSE in the project root for license information.
4+
5+
package com.linkedin.cdi.extractor;
6+
7+
import com.google.common.annotations.VisibleForTesting;
8+
import com.google.common.base.Preconditions;
9+
import com.google.gson.JsonArray;
10+
import com.google.gson.JsonElement;
11+
import com.google.gson.JsonNull;
12+
import com.google.gson.JsonObject;
13+
import com.google.gson.JsonParser;
14+
import com.linkedin.cdi.configuration.StaticConstants;
15+
import com.linkedin.cdi.keys.ExtractorKeys;
16+
import com.linkedin.cdi.keys.JobKeys;
17+
import com.linkedin.cdi.keys.JsonExtractorKeys;
18+
import com.linkedin.cdi.preprocessor.InputStreamProcessor;
19+
import com.linkedin.cdi.preprocessor.StreamProcessor;
20+
import com.linkedin.cdi.util.JsonUtils;
21+
import java.io.IOException;
22+
import java.io.InputStream;
23+
import java.io.InputStreamReader;
24+
import java.io.Reader;
25+
import java.nio.charset.StandardCharsets;
26+
import java.util.Iterator;
27+
import java.util.Map;
28+
import javax.annotation.Nullable;
29+
import lombok.Getter;
30+
import lombok.extern.slf4j.Slf4j;
31+
import org.apache.gobblin.configuration.WorkUnitState;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
import org.testng.Assert;
35+
36+
37+
/**
38+
* TextExtractor takes an InputStream, applies the configured preprocessors, and returns the decoded
* text as a single JsonObject record whose "output" field holds the text
39+
*/
40+
@Slf4j
41+
public class TextExtractor extends MultistageExtractor<JsonArray, JsonObject> {
42+
// SLF4J logger used for the diagnostics emitted by this class.
private static final Logger logger = LoggerFactory.getLogger(TextExtractor.class);
43+
44+
// Upper bound on the amount of text read from the input stream. writeToStringBuffer compares it
// against the running count returned by Reader.read(char[]), i.e. decoded characters.
private final static int TEXT_EXTRACTOR_BYTE_LIMIT = 1048576;
45+
// Size of the char[] chunk used when reading the decoded stream.
private final static int BUFFER_SIZE = 8192;
46+
// JSON text that getSchema parses into the base schema: one nullable string column named "output".
private final static String TEXT_EXTRACTOR_SCHEMA =
47+
"[{\"columnName\":\"output\",\"isNullable\":true,\"dataType\":{\"type\":\"string\"}}]";
48+
// Extractor keys of this instance; the constructor hands them to the parent's initialize, and
// readRecord uses their total count to decide whether the single record was already emitted.
@Getter
49+
private JsonExtractorKeys jsonExtractorKeys = new JsonExtractorKeys();
50+
51+
/**
 * Creates a text extractor for one work unit.
 *
 * @param state the work unit state handed to the parent extractor
 * @param jobKeys the job keys handed to the parent extractor
 */
public TextExtractor(WorkUnitState state, JobKeys jobKeys) {
  super(state, jobKeys);
  // Parent-level initialization runs first, followed by this class's own initialize override.
  super.initialize(this.jsonExtractorKeys);
  this.initialize(this.jsonExtractorKeys);
}
56+
57+
@Override
58+
protected void initialize(ExtractorKeys keys) {
59+
jsonExtractorKeys.logUsage(state);
60+
jsonExtractorKeys.logDebugAll(state.getWorkunit());
61+
}
62+
63+
/**
64+
* Utility function to do a double assignment
65+
* @param jsonExtractorKeys the extractor key
66+
*/
67+
@VisibleForTesting
68+
protected void setFileDumpExtractorKeys(JsonExtractorKeys jsonExtractorKeys) {
69+
this.extractorKeys = jsonExtractorKeys;
70+
this.jsonExtractorKeys = jsonExtractorKeys;
71+
}
72+
73+
/**
74+
* This method rely on the parent class to get a JsonArray formatted schema, and pass it out as
75+
* a string. Typically we expect the downstream is a CsvToJsonConverter.
76+
*
77+
* @return schema that is structured as a JsonArray but formatted as a String
78+
*/
79+
@Override
80+
public JsonArray getSchema() {
81+
JsonParser parser = new JsonParser();
82+
JsonElement jsonelement = parser.parse(TEXT_EXTRACTOR_SCHEMA);
83+
JsonArray schemaArray = jsonelement.getAsJsonArray();
84+
Assert.assertNotNull(schemaArray);
85+
if (jobKeys.getDerivedFields().size() > 0 &&
86+
JsonUtils.get(StaticConstants.KEY_WORD_COLUMN_NAME, jobKeys.getDerivedFields().keySet().iterator().next(),
87+
StaticConstants.KEY_WORD_COLUMN_NAME, schemaArray) == JsonNull.INSTANCE) {
88+
schemaArray.addAll(addDerivedFieldsToAltSchema());
89+
}
90+
return schemaArray;
91+
}
92+
93+
@Nullable
94+
@Override
95+
public JsonObject readRecord(JsonObject reuse) {
96+
if (this.jsonExtractorKeys.getTotalCount() == 1) {
97+
return null;
98+
}
99+
if (processInputStream(this.jsonExtractorKeys.getTotalCount())) {
100+
this.jsonExtractorKeys.setTotalCount(1);
101+
StringBuffer output = new StringBuffer();
102+
if (workUnitStatus.getBuffer() == null) {
103+
logger.warn("Received a NULL InputStream, end the work unit");
104+
return null;
105+
} else {
106+
try {
107+
// apply preprocessors
108+
InputStream input = workUnitStatus.getBuffer();
109+
for (StreamProcessor<?> transformer : extractorKeys.getPreprocessors()) {
110+
if (transformer instanceof InputStreamProcessor) {
111+
input = ((InputStreamProcessor) transformer).process(input);
112+
}
113+
}
114+
writeToStringBuffer(input, output);
115+
input.close();
116+
JsonObject jsonObject = new JsonObject();
117+
jsonObject.addProperty("output", output.toString());
118+
JsonObject outputJson = addDerivedFields(jsonObject);
119+
return outputJson;
120+
} catch (Exception e) {
121+
logger.error("Error while extracting from source or writing to target", e);
122+
this.state.setWorkingState(WorkUnitState.WorkingState.FAILED);
123+
return null;
124+
}
125+
}
126+
} else {
127+
return this.readRecord(reuse);
128+
}
129+
}
130+
131+
/**
132+
* write an input stream at the dump location.
133+
*/
134+
private void writeToStringBuffer(InputStream is, StringBuffer output) {
135+
Preconditions.checkNotNull(is, "InputStream");
136+
try {
137+
char[] buffer = new char[BUFFER_SIZE];
138+
long totalBytes = 0;
139+
int len = 0;
140+
Reader in = new InputStreamReader(is, StandardCharsets.UTF_8);
141+
while ((len = in.read(buffer)) != -1) {
142+
output.append(String.valueOf(buffer, 0, len));
143+
totalBytes += len;
144+
if (totalBytes > TEXT_EXTRACTOR_BYTE_LIMIT) {
145+
logger.warn("Download limit of {} bytes reached for text extractor ", TEXT_EXTRACTOR_BYTE_LIMIT);
146+
break;
147+
}
148+
}
149+
is.close();
150+
logger.info("TextExtractor: written {} bytes ", totalBytes);
151+
} catch (IOException e) {
152+
throw new RuntimeException("Unable to extract text in TextExtractor", e);
153+
}
154+
}
155+
private String processDerivedFieldSource(JsonObject row, String name, Map<String, String> derivedFieldDef) {
156+
String source = (String)derivedFieldDef.getOrDefault("source", "");
157+
String inputValue = (String)derivedFieldDef.getOrDefault("value", "");
158+
boolean isInputValueFromSource = false;
159+
if (this.jsonExtractorKeys.getPushDowns().entrySet().size() > 0 && this.jsonExtractorKeys.getPushDowns().has(name)) {
160+
inputValue = this.jsonExtractorKeys.getPushDowns().get(name).getAsString();
161+
isInputValueFromSource = true;
162+
} else if (this.isInputValueFromSource(source)) {
163+
JsonElement ele = JsonUtils.get(row, source);
164+
if (ele != null && !ele.isJsonNull()) {
165+
inputValue = ele.getAsString();
166+
isInputValueFromSource = true;
167+
}
168+
}
169+
170+
return this.generateDerivedFieldValue(name, derivedFieldDef, inputValue, isInputValueFromSource);
171+
}
172+
173+
private JsonObject addDerivedFields(JsonObject row) {
174+
Iterator var2 = this.jobKeys.getDerivedFields().entrySet().iterator();
175+
176+
while(var2.hasNext()) {
177+
Map.Entry<String, Map<String, String>> derivedField = (Map.Entry)var2.next();
178+
String name = (String)derivedField.getKey();
179+
Map<String, String> derivedFieldDef = (Map)derivedField.getValue();
180+
String strValue = this.processDerivedFieldSource(row, name, derivedFieldDef);
181+
String type = (String)((Map)derivedField.getValue()).get("type");
182+
byte var9 = -1;
183+
switch(type.hashCode()) {
184+
case -1034364087:
185+
if (type.equals("number")) {
186+
var9 = 5;
187+
}
188+
break;
189+
case -934799095:
190+
if (type.equals("regexp")) {
191+
var9 = 2;
192+
}
193+
break;
194+
case -891985903:
195+
if (type.equals("string")) {
196+
var9 = 1;
197+
}
198+
break;
199+
case 3120063:
200+
if (type.equals("epoc")) {
201+
var9 = 0;
202+
}
203+
break;
204+
case 64711720:
205+
if (type.equals("boolean")) {
206+
var9 = 3;
207+
}
208+
break;
209+
case 1958052158:
210+
if (type.equals("integer")) {
211+
var9 = 4;
212+
}
213+
}
214+
215+
switch(var9) {
216+
case 0:
217+
if (strValue.length() > 0) {
218+
row.addProperty(name, Long.parseLong(strValue));
219+
}
220+
break;
221+
case 1:
222+
case 2:
223+
row.addProperty(name, strValue);
224+
break;
225+
case 3:
226+
row.addProperty(name, Boolean.parseBoolean(strValue));
227+
break;
228+
case 4:
229+
row.addProperty(name, Integer.parseInt(strValue));
230+
break;
231+
case 5:
232+
row.addProperty(name, Double.parseDouble(strValue));
233+
break;
234+
default:
235+
this.failWorkUnit("Unsupported type for derived fields: " + type);
236+
}
237+
}
238+
239+
return row;
240+
}
241+
242+
/**
243+
* Utility function to do a double assignment
244+
* @param jsonExtractorKeys the extractor key
245+
*/
246+
@VisibleForTesting
247+
protected void setJsonExtractorKeys(JsonExtractorKeys jsonExtractorKeys) {
248+
this.extractorKeys = jsonExtractorKeys;
249+
this.jsonExtractorKeys = jsonExtractorKeys;
250+
}
251+
}

cdi-core/src/test/java/com/linkedin/cdi/configuration/MultistagePropertiesIndividualTest.java

-42
Original file line numberDiff line numberDiff line change
@@ -181,48 +181,6 @@ public void testSSL() {
181181
}
182182

183183
@Test
184-
public void testWatermark() {
185-
SourceState state = new SourceState();
186-
Assert.assertTrue(MSTAGE_WATERMARK.isValid(state));
187-
188-
// not a JsonArray
189-
state.setProp("ms.watermark", "string");
190-
Assert.assertFalse(MSTAGE_WATERMARK.isValid(state));
191-
192-
// array item is not a JsonObject
193-
state.setProp("ms.watermark", "[\"string\"]");
194-
Assert.assertFalse(MSTAGE_WATERMARK.isValid(state));
195-
196-
// no "name"
197-
state.setProp("ms.watermark", "[{\"type\": \"datetime\",\"range\": {\"from\": \"2019-01-01\", \"to\": \"-\"}}]");
198-
Assert.assertFalse(MSTAGE_WATERMARK.isValid(state));
199-
200-
// unknown type
201-
state.setProp("ms.watermark", "[{\"name\": \"system\",\"type\": \"unknown\"}]");
202-
Assert.assertFalse(MSTAGE_WATERMARK.isValid(state));
203-
204-
// no "range"
205-
state.setProp("ms.watermark", "[{\"name\": \"system\",\"type\": \"datetime\"}]");
206-
Assert.assertFalse(MSTAGE_WATERMARK.isValid(state));
207-
208-
// no "units"
209-
state.setProp("ms.watermark", "[{\"name\": \"system\",\"type\": \"unit\"}]");
210-
Assert.assertFalse(MSTAGE_WATERMARK.isValid(state));
211-
212-
// normal datetime watermark
213-
state.setProp("ms.watermark", "[{\"name\": \"system\",\"type\": \"datetime\",\"range\": {\"from\": \"2019-01-01\", \"to\": \"-\"}}]");
214-
Assert.assertTrue(MSTAGE_WATERMARK.isValid(state));
215-
Assert.assertEquals(MSTAGE_WATERMARK.getRanges(state).getRight(), "-");
216-
217-
// normal datetime watermark and normal unit watermark
218-
state.setProp("ms.watermark", "[{\"name\": \"system\",\"type\": \"datetime\", \"range\": {\"from\": \"2021-08-21\", \"to\": \"-\"}}, {\"name\": \"bucketId\", \"type\": \"unit\", \"units\": \"null,0,1,2,3,4,5,6,7,8,9\"}]");
219-
Assert.assertTrue(MSTAGE_WATERMARK.isValid(state));
220-
Assert.assertEquals(MSTAGE_WATERMARK.getRanges(state).getLeft(), "2021-08-21");
221-
Assert.assertEquals(MSTAGE_WATERMARK.getUnits(state), Lists.newArrayList("null,0,1,2,3,4,5,6,7,8,9".split(",")));
222-
}
223-
224-
225-
@Test
226184
public void testWorkUnitParallelismMax() {
227185
SourceState state = new SourceState();
228186
Assert.assertTrue(MSTAGE_WORK_UNIT_PARALLELISM_MAX.isValid(state));

0 commit comments

Comments
 (0)