1010import com .google .gson .JsonObject ;
1111import com .linkedin .cdi .configuration .MultistageProperties ;
1212import com .linkedin .cdi .configuration .StaticConstants ;
13+ import com .linkedin .cdi .util .JsonUtils ;
1314import java .text .DecimalFormat ;
1415import java .util .ArrayList ;
1516import java .util .List ;
17+ import java .util .concurrent .atomic .AtomicInteger ;
1618import lombok .extern .slf4j .Slf4j ;
1719import org .apache .avro .Schema ;
1820import org .apache .avro .generic .GenericData ;
2931
/**
 * This converter does basic count validation based on the Failure Records or Success Records criteria.
 *
 * To use this converter for validation, the main source should be the dataset to be validated,
 * and the secondary input should be the base dataset to validate against.
 *
 * The base dataset can be in a nested column of the secondary input, i.e. a field, which can be
 * retrieved through a JSON path, contains the actual base records.
 *
 * Currently, the following rules are defined:
 *
 *   fail (upper bound rule): the source should be failed records
 *     Job succeeds when the row count in validation set / row count in base set < threshold
 *     Job fails when the row count in validation set / row count in base set >= threshold
 *
 *   success (lower bound rule): the source should be succeeded records
 *     Job succeeds when the row count in validation set / row count in base set >= threshold
 *     Job fails when the row count in validation set / row count in base set < threshold
 */
3351@ Slf4j
3452public class InFlowValidationConverter extends Converter <Schema , Schema , GenericRecord , GenericRecord > {
3553 int expectedRecordsCount ;
3654 int actualRecordsCount ;
3755 private String field ;
38- private int failurePercentage ;
56+ private int threshold ;
3957 private String criteria ;
4058 private String errorColumn ;
4159
4260 @ Override
4361 public Converter <Schema , Schema , GenericRecord , GenericRecord > init (WorkUnitState workUnitState ) {
4462 //Load the input to memory
45- getPayloads (workUnitState );
63+ expectedRecordsCount = getBaseRowCount (workUnitState );
4664 fillValidationAttributes (workUnitState );
4765 return super .init (workUnitState );
4866 }
@@ -74,20 +92,15 @@ private void verifyAndUpdateCount(GenericRecord inputRecord) {
7492 actualRecordsCount += ((GenericData .Array <?>) inputRecord .get (fieldList .get (0 ).name ())).size ();
7593 }
7694 } else {
77- if (errorColumn != null ) {
78- updateFailureCount (inputRecord );
79- } else {
80- throw new RuntimeException ("Invalid ms.data.field/ms.validation.attributes configuration. "
81- + "InputRecord should be of type Array or should have errorColumn" );
82- }
95+ actualRecordsCount += (errorColumn == null || inputRecord .get (errorColumn ) != null ? 1 : 0 );
8396 }
8497 }
8598
8699 private void fillValidationAttributes (WorkUnitState workUnitState ) {
87100 JsonObject validationAttributes =
88101 MultistageProperties .MSTAGE_VALIDATION_ATTRIBUTES .getValidNonblankWithDefault (workUnitState );
89102 if (validationAttributes .has (KEY_WORD_THRESHOLD )) {
90- failurePercentage = validationAttributes .get (KEY_WORD_THRESHOLD ).getAsInt ();
103+ threshold = validationAttributes .get (KEY_WORD_THRESHOLD ).getAsInt ();
91104 }
92105 if (validationAttributes .has (KEY_WORD_CRITERIA )) {
93106 criteria = validationAttributes .get (KEY_WORD_CRITERIA ).getAsString ();
@@ -102,34 +115,41 @@ private void fillValidationAttributes(WorkUnitState workUnitState) {
102115 * If field is configured in the secondary input and field column
103116 * is of type array expected record count with array size
104117 * else use all the input records as expected size
118+ * @param workUnitState the work unit state object containing secondary input parameter
119+ * @return the expected row count
105120 */
106- private void getPayloads (WorkUnitState workUnitState ) {
107- JsonArray payloads = MultistageProperties .MSTAGE_SECONDARY_INPUT .getValidNonblankWithDefault (workUnitState );
108- JsonArray records = new JsonArray ();
109- List <String > fields = new ArrayList <>();
121+ private int getBaseRowCount (WorkUnitState workUnitState ) {
122+ JsonArray payloads = JsonUtils .filter (KEY_WORD_CATEGORY , KEY_WORD_PAYLOAD ,
123+ MultistageProperties .MSTAGE_SECONDARY_INPUT .getValidNonblankWithDefault (workUnitState ));
124+
125+ // by default, we expect 1 record
126+ if (payloads .size () == 0 ) {
127+ return 1 ;
128+ }
129+
130+ // secondary input can have multiple payload entries, and each can configure a "fields" element
131+ // but for validation purpose, only the first payload entry, and the first field is used.
132+ JsonElement fields = JsonUtils .get (KEY_WORD_FIELDS , payloads .get (0 ).getAsJsonObject ());
133+ field = StringUtils .EMPTY ;
134+ if (fields .isJsonArray () && fields .getAsJsonArray ().size () > 0 ) {
135+ field = fields .getAsJsonArray ().get (0 ).getAsString ();
136+ }
137+
138+ AtomicInteger rowCount = new AtomicInteger ();
110139 for (JsonElement entry : payloads ) {
111- if (!entry .isJsonObject ()) {
112- log .error ("Elements within secondary input should be valid JsonObjects, provided: {}" , entry .toString ());
113- }
114140 JsonObject entryJson = entry .getAsJsonObject ();
141+ JsonArray records = new JsonArray ();
115142 records .addAll (new HdfsReader (workUnitState ).readSecondary (entryJson ));
116- if (entryJson .has (StaticConstants .KEY_WORD_FIELDS )) {
117- if (entryJson .get (StaticConstants .KEY_WORD_FIELDS ).isJsonArray ()) {
118- entryJson .get (StaticConstants .KEY_WORD_FIELDS )
119- .getAsJsonArray ()
120- .forEach (arrayItem -> fields .add (arrayItem .getAsString ()));
121- }
122- field = fields .size () >= 1 ? fields .get (0 ) : StringUtils .EMPTY ;
123- }
143+
124144 // No of expected records
125- if (records .size () > 0 && StringUtils . isNotBlank ( field ) && ( records . get ( 0 )
126- . getAsJsonObject ( )
127- .get (field ) instanceof JsonArray )) {
128- records .forEach (record -> expectedRecordsCount += record .getAsJsonObject ().get (field ).getAsJsonArray ().size ());
129- } else if ( records . size () > 0 ) {
130- expectedRecordsCount = records .size ();
145+ if (records .size () > 0
146+ && StringUtils . isNotBlank ( field )
147+ && ( records . get ( 0 ). getAsJsonObject () .get (field ) instanceof JsonArray )) {
148+ records .forEach (record -> rowCount . addAndGet ( record .getAsJsonObject ().get (field ).getAsJsonArray ().size () ));
149+ } else {
150+ rowCount . addAndGet ( records .size () );
131151 }
132- }
152+ } return rowCount . get ();
133153 }
134154
135155 private void updateFailureCount (GenericRecord record ) {
@@ -144,21 +164,20 @@ private void updateFailureCount(GenericRecord record) {
144164 private void validateRule () {
145165 // check the threshold and throw new Runtime Exception
146166 float actualPercentage = ((float ) actualRecordsCount / expectedRecordsCount ) * 100 ;
147- boolean failJob = false ;
148- // validate rules based on type of records
149- if (criteria .equalsIgnoreCase (KEY_WORD_FAIL )) {
150- failJob = actualPercentage > failurePercentage ;
151- } else if (criteria .equalsIgnoreCase (KEY_WORD_SUCCESS )) {
152- failJob = (100 - actualPercentage ) > failurePercentage ;
153- }
154- log .info ("Total expectedRecords: {} , failedRecords: {}" , expectedRecordsCount , actualRecordsCount );
167+ log .info ("base row count: {}, actual row count: {}" , expectedRecordsCount , actualRecordsCount );
168+
169+ boolean failJob = criteria .equalsIgnoreCase (KEY_WORD_FAIL ) && actualPercentage >= threshold
170+ || criteria .equalsIgnoreCase (KEY_WORD_SUCCESS ) && actualPercentage < threshold ;
155171
156172 if (failJob ) {
157173 // Fail the validation by throwing runtime exception
158- throw new RuntimeException ("Failure Threshold exceeds more than " + failurePercentage + "%" );
174+ throw new RuntimeException ("Failure Threshold exceeds more than " + threshold + "%" );
159175 } else {
160- log .info ("Validation passed with failure rate {}% less than {}%" ,
161- new DecimalFormat ("##.##" ).format (actualPercentage ), failurePercentage );
176+ log .info ("Validation passed with {} rate {}% {} {}%" ,
177+ criteria .equalsIgnoreCase (KEY_WORD_FAIL ) ? "failure" : "success" ,
178+ new DecimalFormat ("##.##" ).format (actualPercentage ),
179+ criteria .equalsIgnoreCase (KEY_WORD_FAIL ) ? "less than" : "greater than or equal" ,
180+ threshold );
162181 }
163182 }
164183}
0 commit comments