Skip to content

Commit 9f54deb

Browse files
author
Chris Li
committed
Merge latest code
1 parent 72beea1 commit 9f54deb

File tree

66 files changed

+2189
-476
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+2189
-476
lines changed

README.md

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,32 @@ LinkedIn Data Integration Library (DIL) is a collection of generic data integrat
1717
- Ingest data from one Rest API and egress to another (Rest API) on cloud
1818

1919
# Requirements
20-
* Java >= 1.8
20+
* JDK 1.8
2121

2222
If building the distribution with tests turned on:
2323
* Maven version 3.5.3
2424

2525
# Instructions to build the distribution
2626
1. Extract the archive file to your local directory.
27-
2. Set JAVA_HOME
27+
2. Set JAVA_HOME to use JDK 1.8 (JDK 11+ not supported)
2828
3. Build
29-
* Skip tests and build the distribution
30-
> `./gradlew build -x findbugsMain -x test -x rat -x checkstyleMain`
31-
32-
* Tests and build the distribution (requires Maven):
3329
> `./gradlew build`
30+
31+
# Instructions to contribute
32+
To contribute, please use submit Pull Request (PR) for committers to merge.
33+
- Create your own fork on GitHub off the main repository
34+
- Clone your fork to your local computer
35+
>- `git clone https://github.com/<<your-github-login>>/data-integration-library.git`
36+
- Add upstream and verify
37+
>- `git remote add upstream https://github.com/linkedin/data-integration-library.git`
38+
>- `git remote -v`
39+
- Change, test, commit, and push to your fork
40+
>- `git status`
41+
>- `git add .`
42+
>- `git commit -m "comments"`
43+
>- `git push origin master`
44+
- Create Pull Request on GitHub with the following details
45+
>- Title
46+
>- Detailed description
47+
>- Document the tests done
48+
>- Links to the updated documents

buildSrc/src/main/groovy/org/apache/gobblin/gradle/BuildProperties.groovy

Lines changed: 0 additions & 50 deletions
This file was deleted.

buildSrc/src/main/groovy/org/apache/gobblin/gradle/BuildProperty.groovy

Lines changed: 0 additions & 23 deletions
This file was deleted.

cdi-core/src/main/java/com/linkedin/cdi/configuration/MultistageProperties.java

Lines changed: 79 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@
77
import com.google.gson.Gson;
88
import com.google.gson.JsonArray;
99
import com.google.gson.JsonObject;
10+
import com.linkedin.cdi.factory.DefaultConnectionClientFactory;
1011
import lombok.Getter;
1112
import lombok.extern.slf4j.Slf4j;
1213
import org.apache.commons.lang.StringUtils;
1314
import org.apache.gobblin.configuration.State;
14-
import com.linkedin.cdi.factory.DefaultS3ClientFactory;
1515

1616

1717
/**
@@ -117,7 +117,20 @@ public <T> T getDefaultValue() {
117117
* this value is in milliseconds.
118118
*/
119119
MSTAGE_CALL_INTERVAL("ms.call.interval.millis", Long.class),
120+
MSTAGE_CONVERTER_CSV_MAX_FAILURES("ms.converter.csv.max.failures", Long.class),
121+
MSTAGE_CONVERTER_KEEP_NULL_STRINGS("ms.converter.keep.null.strings", Boolean.class) {
122+
@Override
123+
public <T> T getDefaultValue() {
124+
return (T) Boolean.FALSE;
125+
}
126+
},
120127
MSTAGE_CSV_COLUMN_HEADER("ms.csv.column.header", Boolean.class),
128+
MSTAGE_CSV_COLUMN_HEADER_INDEX("ms.csv.column.header.index", Integer.class) {
129+
@Override
130+
public <T> T getDefaultValue() {
131+
return (T) Integer.valueOf(0);
132+
}
133+
},
121134
/**
122135
* a comma-separated string, where each value is either an integer or a range
123136
* representing the index of the field to include
@@ -155,6 +168,18 @@ public <T> T getDefaultValue() {
155168
return (T) ",";
156169
}
157170
},
171+
/**
172+
* By default, CsvExtractor tries to infer the true type of fields when inferring schema
173+
* However, in some cases, the inference is not accurate, and users may prefer to keep all fields as strings.
174+
* In this case ms.csv.default.field.type = string
175+
* Supported types: string | int | long | double | boolean | float
176+
*/
177+
MSTAGE_CSV_DEFAULT_FIELD_TYPE("ms.csv.default.field.type", String.class) {
178+
@Override
179+
public <T> T getDefaultValue() {
180+
return (T) StringUtils.EMPTY;
181+
}
182+
},
158183
/**
159184
* if csv.column.header is true, csv.skip.lines will be 1 by default, if more than 1
160185
* row to be skipped, then set this parameter explicitly.
@@ -332,13 +357,13 @@ public Long getMillis(State state) {
332357
}
333358
},
334359
/**
335-
* http.client.factory define an indirect way to specify the type of HttpClient to use.
336-
* default = {@link com.linkedin.cdi.factory.ApacheHttpClientFactory}
360+
* Define an indirect way to specify the type of connection clients
361+
* default = {@link DefaultConnectionClientFactory}
337362
*/
338-
MSTAGE_HTTP_CLIENT_FACTORY("ms.http.client.factory", String.class) {
363+
MSTAGE_CONNECTION_CLIENT_FACTORY("ms.connection.client.factory", String.class) {
339364
@Override
340365
public <T> T getDefaultValue() {
341-
return (T) "com.linkedin.cdi.factory.ApacheHttpClientFactory";
366+
return (T) "com.linkedin.cdi.factory.DefaultConnectionClientFactory";
342367
}
343368
},
344369
/**
@@ -387,17 +412,6 @@ public <T> T getDefaultValue() {
387412
* Currently, we don't allow exceptions being made to revert errors by using reason code.
388413
*/
389414
MSTAGE_HTTP_STATUS_REASONS("ms.http.status.reasons", JsonObject.class),
390-
/**
391-
* jdbc.client.factory define an indirect way to specify the type of JDBC Client to use.
392-
* default = {@link com.linkedin.cdi.factory.DefaultJdbcClientFactory}
393-
*/
394-
MSTAGE_JDBC_CLIENT_FACTORY("ms.jdbc.client.factory", String.class) {
395-
@Override
396-
public <T> T getDefaultValue() {
397-
return (T) "com.linkedin.cdi.factory.DefaultJdbcClientFactory";
398-
}
399-
},
400-
401415
MSTAGE_JDBC_SCHEMA_REFACTOR("ms.jdbc.schema.refactor", String.class) {
402416
@Override
403417
public <T> T getDefaultValue() {
@@ -497,6 +511,24 @@ public boolean validateNonblank(State state) {
497511
* path and fields, etc.
498512
*/
499513
MSTAGE_PAYLOAD_PROPERTY("ms.payload.property", JsonArray.class),
514+
/**
515+
* This property is required for inflowValidation with simple count comparison rule
516+
* The rule is accepted as a JsonObject with following Keys
517+
* 1. "threshold" - represents the percentage of accepted failure records to mark job as passed
518+
* 2. "criteria" - this value can be "fail" or "success" , fail represents that input record only has failed records
519+
* 3. "errorColumn" - this value is optional and is required when we require to filter the failure records based on a specific column
520+
* if the input record has only success records then set this as "success"
521+
* Ex: ms.validation.attributes={"threshold": "10", "criteria" : "fail"}
522+
*/
523+
MSTAGE_VALIDATION_ATTRIBUTES("ms.validation.attributes", JsonObject.class) {
524+
@Override
525+
public <T> T getDefaultValue() {
526+
JsonObject attributesJson = new JsonObject();
527+
attributesJson.addProperty(StaticConstants.KEY_WORD_THRESHOLD, 0);
528+
attributesJson.addProperty(StaticConstants.KEY_WORD_CRITERIA, StaticConstants.KEY_WORD_FAIL);
529+
return (T) attributesJson;
530+
}
531+
},
500532
MSTAGE_RETENTION("ms.retention", JsonObject.class) {
501533
@Override
502534
public <T> T getDefaultValue() {
@@ -507,16 +539,6 @@ public <T> T getDefaultValue() {
507539
return (T) retention;
508540
}
509541
},
510-
/**
511-
* s3.client.factory define an indirect way to specify the type of S3 Client to use.
512-
* default = {@link DefaultS3ClientFactory}
513-
*/
514-
MSTAGE_S3_CLIENT_FACTORY("ms.s3.client.factory", String.class) {
515-
@Override
516-
public <T> T getDefaultValue() {
517-
return (T) "com.linkedin.cdi.factory.DefaultS3ClientFactory";
518-
}
519-
},
520542
/**
521543
* Schema cleansing will replace special characters in the schema element names based
522544
* on a pattern. By default it will replace all blank spaces, $, and @ to underscores.
@@ -733,6 +755,27 @@ public <T> T getDefaultValue() {
733755
*/
734756
MSTAGE_WATERMARK("ms.watermark", JsonArray.class),
735757
MSTAGE_WATERMARK_GROUPS("ms.watermark.groups", JsonArray.class),
758+
/**
759+
* Minimum records to be present in order for the work unit to be successful,
760+
* below the minimum value, the work unit will be failed.
761+
*/
762+
MSTAGE_WORK_UNIT_MIN_RECORDS("ms.work.unit.min.records", Long.class) {
763+
@Override
764+
public <T> T getDefaultValue() {
765+
return (T) Long.valueOf(0);
766+
}
767+
},
768+
/**
769+
* Minimum number of work units to be present in order for the job to proceed,
770+
* below the minimum value, the job will be failed. This parameter shold be used
771+
* only when there is a unit watermark.
772+
*/
773+
MSTAGE_WORK_UNIT_MIN_UNITS("ms.work.unit.min.units", Long.class) {
774+
@Override
775+
public <T> T getDefaultValue() {
776+
return (T) Long.valueOf(0);
777+
}
778+
},
736779
MSTAGE_WORK_UNIT_PARALLELISM_MAX("ms.work.unit.parallelism.max", Integer.class) {
737780
@Override
738781
public boolean validateNonblank(State state) {
@@ -799,6 +842,16 @@ public <T> T getDefaultValue() {
799842
return (T) Long.valueOf(500L);
800843
}
801844
},
845+
MSTAGE_AUDIT_ENABLED("ms.audit.enabled", Boolean.class) {
846+
@Override
847+
public <T> T getDefaultValue() {
848+
return (T) Boolean.FALSE;
849+
}
850+
},
851+
MSTAGE_KAFKA_BROKERS("ms.kafka.brokers", String.class),
852+
MSTAGE_KAFKA_SCHEMA_REGISTRY_URL("ms.kafka.schema.registry.url", String.class),
853+
MSTAGE_KAFKA_CLIENT_ID("ms.kafka.clientId", String.class),
854+
MSTAGE_KAFKA_TOPIC_NAME("ms.kafka.audit.topic.name", String.class),
802855
// Properties defined in Gobblin, redefine here to leverage the new features like validation
803856
CONVERTER_CLASSES("converter.classes", String.class),
804857
DATASET_URN_KEY("dataset.urn", String.class),

cdi-core/src/main/java/com/linkedin/cdi/configuration/StaticConstants.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@ public interface StaticConstants {
1919
String KEY_WORD_DATA_TYPE = "dataType";
2020
String KEY_WORD_DATA_TYPE_TYPE = "dataType.type";
2121
String KEY_WORD_EOF = "EOF";
22+
String KEY_WORD_EPOC = "epoc";
2223
String KEY_WORD_FIELDS = "fields";
2324
String KEY_WORD_RANGE_FROM = "from";
2425
String KEY_WORD_HTTP_OK = "ok";
2526
String KEY_WORD_INTEGER = "integer";
27+
String KEY_WORD_IS_NULLABLE = "isNullable";
2628
String KEY_WORD_ITEMS = "items";
2729
String KEY_WORD_MAP = "map";
2830
String KEY_WORD_NAME = "name";
@@ -37,6 +39,7 @@ public interface StaticConstants {
3739
String KEY_WORD_PROPERTIES = "properties";
3840
String KEY_WORD_RANGE = "range";
3941
String KEY_WORD_RECORD = "record";
42+
String KEY_WORD_REGEXP = "regexp";
4043
String KEY_WORD_RETRY = "retry";
4144
String KEY_WORD_RETRY_COUNT = "retryCount";
4245
String KEY_WORD_RETRY_DELAY_IN_SEC = "delayInSec";
@@ -52,7 +55,25 @@ public interface StaticConstants {
5255
String KEY_WORD_UNITS = "units";
5356
String KEY_WORD_UNKNOWN = "unknown";
5457
String KEY_WORD_VALUES = "values";
58+
String KEY_WORD_THRESHOLD = "threshold";
59+
String KEY_WORD_CRITERIA = "criteria";
60+
String KEY_WORD_FAIL = "fail";
61+
String KEY_WORD_SUCCESS = "success";
62+
String KEY_WORD_ERROR_COLUMN = "errorColumn";
63+
String KEY_WORD_INT = "int";
64+
String KEY_WORD_LONG = "long";
65+
String KEY_WORD_DOUBLE = "double";
66+
String KEY_WORD_FLOAT = "float";
67+
String KEY_WORD_JSON = "json";
68+
String KEY_WORD_CSV = "csv";
69+
String KEY_WORD_AVRO = "avro";
5570

56-
Gson GSON = new Gson();
71+
String EXCEPTION_WORK_UNIT_MINIMUM = "Job requires a minimum of %s work unit(s) to proceed because ms.work.unit.min.units = %s.";
72+
String EXCEPTION_RECORD_MINIMUM = "Work unit requires a minimum of %s record(s) to succeed because ms.work.unit.min.records = %s.";
5773

74+
String MSG_ROWS_PROCESSED = "Processed %s records, work unit: %s";
75+
String MSG_WORK_UNIT_ALWAYS = "There should be a work unit.";
76+
String MSG_LOW_WATER_MARK_ALWAYS = "There should be a low watermark.";
77+
String MSG_WORK_UNIT_INFO = "Generating Work Unit: %s, watermark: %s";
78+
Gson GSON = new Gson();
5879
}

cdi-core/src/main/java/com/linkedin/cdi/connection/HdfsConnection.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package com.linkedin.cdi.connection;
66

77
import com.google.common.annotations.VisibleForTesting;
8+
import com.google.common.base.Preconditions;
89
import com.google.common.collect.Lists;
910
import java.io.InputStream;
1011
import java.net.URI;
@@ -66,7 +67,7 @@ public HdfsConnection(State state, JobKeys jobKeys, ExtractorKeys extractorKeys)
6667
*/
6768
@Override
6869
public WorkUnitStatus execute(final WorkUnitStatus status) {
69-
assert hdfsKeys.getSourceUri() != null;
70+
Preconditions.checkNotNull(hdfsKeys.getSourceUri(), "ms.source.uri is missing or of wrong format");
7071
URI uri = URI.create(getWorkUnitSpecificString(hdfsKeys.getSourceUri(),
7172
getExtractorKeys().getDynamicParameters()));
7273

@@ -142,6 +143,7 @@ private List<String> readFileList(final String path, final String pattern) {
142143
* @return the file content in an InputStream
143144
*/
144145
private InputStream readSingleFile(final String path) {
146+
log.info("Processing file: {}", path);
145147
try {
146148
return fsHelper.getFileStream(path);
147149
} catch (FileBasedHelperException e) {

0 commit comments

Comments
 (0)