2
2
*
3
3
* MIT License
4
4
*
5
- * Copyright (c) 2019 Institutional Shareholder Services. All other rights reserved.
5
+ * Copyright (c) 2020 Institutional Shareholder Services. All other rights reserved.
6
6
*
7
7
* Permission is hereby granted, free of charge, to any person obtaining a copy
8
8
* of this software and associated documentation files (the "Software"), to deal
66
66
import org .apache .nifi .processor .io .OutputStreamCallback ;
67
67
import org .apache .nifi .processor .util .StandardValidators ;
68
68
69
+ import edu .stanford .nlp .pipeline .AnnotationPipeline ;
70
+
69
71
@ Tags ({ "Stanford" , "CoreNLP" })
70
72
@ CapabilityDescription ("Stanford CoreNLP Processor" )
71
73
@ SeeAlso ({})
@@ -157,7 +159,8 @@ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
157
159
158
160
@ OnScheduled
159
161
public void onScheduled (final ProcessContext context ) throws Exception {
160
- ensureService (context );
162
+ getLogger ().debug ("OnScheduled called for StanfordCoreNLPProcessor, refreshing StanfordCoreNLPService" );
163
+ service = new StanfordCoreNLPService (createPipeline (context ));
161
164
}
162
165
163
166
@ Override
@@ -169,22 +172,22 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
169
172
flowFile = session .create ();
170
173
}
171
174
172
- String flowFileText = getTextFromSession (session , flowFile );
175
+ final String flowFileText = getTextFromSession (session , flowFile );
173
176
174
177
if (flowFileText == null || flowFileText .isEmpty ()) {
175
178
getLogger ().error ("Empty flow file cannot be analyzed" );
176
179
session .transfer (flowFile , FAILURE_RELATIONSHIP );
177
180
return ;
178
181
}
179
182
180
- String jsonPath = context .getProperty (PATH_ATTR ).evaluateAttributeExpressions (flowFile ).getValue ();
181
- String entityTypes = context .getProperty (ENTITIES_ATTR ).evaluateAttributeExpressions (flowFile ).getValue ();
182
- String text = getTextFromJson (flowFileText , jsonPath );
183
+ final String jsonPath = context .getProperty (PATH_ATTR ).evaluateAttributeExpressions (flowFile ).getValue ();
184
+ final String entityTypes = context .getProperty (ENTITIES_ATTR ).evaluateAttributeExpressions (flowFile ).getValue ();
185
+ final String text = getTextFromJson (flowFileText , jsonPath );
183
186
Map <String , List <String >> entityMap ;
184
187
185
188
try {
186
189
entityMap = service .extractEntities (text , entityTypes );
187
- } catch (Exception e ) {
190
+ } catch (final Exception e ) {
188
191
e .printStackTrace ();
189
192
getLogger ().error ("Failed to analyze flow file text" );
190
193
session .transfer (flowFile , FAILURE_RELATIONSHIP );
@@ -193,136 +196,142 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
193
196
194
197
Map <String , Object > flowFileJsonMap ;
195
198
196
- Gson gson = new Gson ();
199
+ final Gson gson = new Gson ();
197
200
try {
198
201
flowFileJsonMap = gson .fromJson (flowFileText , Map .class );
199
- } catch (JsonSyntaxException e ) {
202
+ } catch (final JsonSyntaxException e ) {
200
203
e .printStackTrace ();
201
204
getLogger ().warn ("Failed to parse flow file text as json, writing new flow file from blank json document" );
202
205
flowFileJsonMap = new HashMap <String , Object >();
203
206
}
204
207
205
208
try {
206
- for (String k : entityMap .keySet ()) {
209
+ for (final String k : entityMap .keySet ()) {
207
210
flowFileJsonMap .put (k , entityMap .get (k ));
208
211
}
209
212
210
- String entityJson = gson .toJson (entityMap );
211
- String finalJson = gson .toJson (flowFileJsonMap );
213
+ final String entityJson = gson .toJson (entityMap );
214
+ final String finalJson = gson .toJson (flowFileJsonMap );
212
215
213
216
flowFile = session .putAttribute (flowFile , OUTPUT_ATTR , entityJson );
214
217
flowFile = session .write (flowFile , new OutputStreamCallback () {
215
218
@ Override
216
- public void process (OutputStream out ) throws IOException {
219
+ public void process (final OutputStream out ) throws IOException {
217
220
out .write (finalJson .getBytes ());
218
221
}
219
222
});
220
223
221
224
session .transfer (flowFile , SUCCESS_RELATIONSHIP );
222
225
return ;
223
- } catch (Exception e ) {
226
+ } catch (final Exception e ) {
224
227
e .printStackTrace ();
225
228
getLogger ().warn ("Failed to generate flow file or attributes" );
226
229
}
227
230
228
231
session .transfer (flowFile , FAILURE_RELATIONSHIP );
229
232
}
230
233
231
- private String getTextFromSession (final ProcessSession session , FlowFile flowFile ) {
234
+ private String getTextFromSession (final ProcessSession session , final FlowFile flowFile ) {
232
235
final AtomicReference <String > atomicText = new AtomicReference <>();
233
236
234
237
session .read (flowFile , new InputStreamCallback () {
235
238
@ Override
236
- public void process (InputStream in ) throws IOException {
239
+ public void process (final InputStream in ) throws IOException {
237
240
try {
238
- String rawText = IOUtils .toString (in );
241
+ final String rawText = IOUtils .toString (in );
239
242
atomicText .set (rawText );
240
- } catch (NullPointerException e ) {
243
+ } catch (final NullPointerException e ) {
241
244
e .printStackTrace ();
242
245
getLogger ().warn ("FlowFile text was null" );
243
- } catch (IOException e ) {
246
+ } catch (final IOException e ) {
244
247
e .printStackTrace ();
245
248
getLogger ().error ("FlowFile text could not be read due to IOException" );
246
249
}
247
250
}
248
251
});
249
252
250
- String text = atomicText .get ();
253
+ final String text = atomicText .get ();
251
254
if (text == null || text .isEmpty ()) {
252
255
return null ;
253
256
}
254
257
255
258
return text ;
256
259
}
257
260
258
- private String getTextFromJson (String flowFileText , String jsonPath ) {
261
+ private String getTextFromJson (final String flowFileText , final String jsonPath ) {
259
262
if (jsonPath == null || jsonPath .isEmpty ()) {
260
263
return flowFileText ;
261
264
}
262
265
263
266
try {
264
- Configuration conf = Configuration .builder ().options (Option .ALWAYS_RETURN_LIST ).build ();
265
- List <String > result = JsonPath .using (conf ).parse (flowFileText ).read (jsonPath );
266
- String combined = String .join (" " , result );
267
- getLogger ().info ("Extracted this text from the flow file with the configured json path: " + combined );
267
+ final Configuration conf = Configuration .builder ().options (Option .ALWAYS_RETURN_LIST ).build ();
268
+ final List <String > result = JsonPath .using (conf ).parse (flowFileText ).read (jsonPath );
269
+ final String combined = String .join (" " , result );
268
270
return combined ;
269
- } catch (ClassCastException e ) {
270
- LinkedHashMap <String , Object > resultMap = JsonPath .read (flowFileText , jsonPath );
271
+ } catch (final ClassCastException e ) {
272
+ final LinkedHashMap <String , Object > resultMap = JsonPath .read (flowFileText , jsonPath );
271
273
String combined = "" ;
272
- for (String k : resultMap .keySet ()) {
274
+ for (final String k : resultMap .keySet ()) {
273
275
combined += " " + resultMap .get (k );
274
276
}
275
- getLogger ().info ("Extracted this text from the flow file with the configured json path: " + combined );
276
277
return combined ;
277
- } catch (Exception e ) {
278
+ } catch (final Exception e ) {
278
279
e .printStackTrace ();
279
280
getLogger ().warn ("Failed to parse json using specified json path, analyzing flow file as text" );
280
281
}
281
282
282
283
return flowFileText ;
283
284
}
284
285
285
- private void ensureService (final ProcessContext context ) {
286
- if (service != null ) {
287
- return ;
288
- }
289
- String jsonProps = context .getProperty (PROPS_ATTR ).getValue ();
290
- Properties props = jsonToProps (jsonProps );
291
-
292
- String host = context .getProperty (HOST_ATTR ).getValue ();
293
-
294
- if (host == null ) {
295
- service = new StanfordCoreNLPService (props );
296
- return ;
297
- }
298
-
286
+ private int getPort (final ProcessContext context ) {
299
287
int port ;
300
288
try {
301
289
port = context .getProperty (PORT_ATTR ).asInteger ();
302
- } catch (NumberFormatException e ) {
290
+ } catch (final NumberFormatException e ) {
303
291
e .printStackTrace ();
304
292
getLogger ().error ("Failed to read port as integer, using default 9000" );
305
293
port = 9000 ;
306
294
}
295
+ return port ;
296
+ }
297
+
298
+ private AnnotationPipeline createPipeline (final ProcessContext context ) {
299
+ final String jsonProps = context .getProperty (PROPS_ATTR ).getValue ();
300
+ final Properties props = jsonToProps (jsonProps );
301
+ final String host = context .getProperty (HOST_ATTR ).getValue ();
302
+
303
+ if (host == null ) {
304
+ return StanfordCoreNLPService .createPipeline (props );
305
+ }
306
+
307
+ final int port = getPort (context );
308
+ final String key = context .getProperty (KEY_ATTR ).getValue ();
309
+ final String secret = context .getProperty (SECRET_ATTR ).getValue ();
307
310
308
- String key = context .getProperty (KEY_ATTR ).getValue ();
309
- String secret = context .getProperty (SECRET_ATTR ).getValue ();
311
+ return StanfordCoreNLPService .createPipeline (props , host , port , key , secret );
312
+ }
313
+
314
+ private void ensureService (final ProcessContext context ) {
315
+ if (service != null ) {
316
+ return ;
317
+ }
310
318
311
- service = new StanfordCoreNLPService (props , host , port , key , secret );
319
+ service = new StanfordCoreNLPService (createPipeline (context ));
320
+ return ;
312
321
}
313
322
314
- private Properties jsonToProps (String jsonProps ) {
315
- Properties props = new Properties ();
323
+ private Properties jsonToProps (final String jsonProps ) {
324
+ final Properties props = new Properties ();
316
325
if (jsonProps == null ) {
317
326
return props ;
318
327
}
319
- Gson gson = new Gson ();
328
+ final Gson gson = new Gson ();
320
329
try {
321
- Map <String , Object > jsonMap = gson .fromJson (jsonProps , Map .class );
322
- for (String k : jsonMap .keySet ()) {
330
+ final Map <String , Object > jsonMap = gson .fromJson (jsonProps , Map .class );
331
+ for (final String k : jsonMap .keySet ()) {
323
332
props .setProperty (k , jsonMap .get (k ).toString ());
324
333
}
325
- } catch (JsonSyntaxException e ) {
334
+ } catch (final JsonSyntaxException e ) {
326
335
e .printStackTrace ();
327
336
getLogger ().error ("Failed to read json string." );
328
337
}
0 commit comments