Skip to content

Commit 0a093ce

Browse files
authored
Make Watermark checking configurable in distcpNG-replication (#1997)
1 parent 19870eb commit 0a093ce

File tree

2 files changed

+19
-9
lines changed

2 files changed

+19
-9
lines changed

gobblin-data-management/src/main/java/gobblin/data/management/copy/replication/ConfigBasedDataset.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,14 @@ public class ConfigBasedDataset implements CopyableDataset {
7070
private final CopyRoute copyRoute;
7171
private final ReplicationConfiguration rc;
7272
private String datasetURN;
73+
private boolean watermarkEnabled;
7374

7475
public ConfigBasedDataset(ReplicationConfiguration rc, Properties props, CopyRoute copyRoute) {
7576
this.props = props;
7677
this.copyRoute = copyRoute;
7778
this.rc = rc;
79+
this.watermarkEnabled = Boolean.parseBoolean
80+
(this.props.getProperty(ConfigBasedDatasetsFinder.WATERMARK_ENABLE, "true"));
7881
calculateDatasetURN();
7982
}
8083

@@ -110,14 +113,16 @@ public Collection<? extends CopyEntity> getCopyableFiles(FileSystem targetFs, Co
110113
return copyableFiles;
111114
}
112115

113-
if ((!copyFromRaw.getWatermark().isPresent() && copyToRaw.getWatermark().isPresent())
114-
|| (copyFromRaw.getWatermark().isPresent() && copyToRaw.getWatermark().isPresent()
115-
&& copyFromRaw.getWatermark().get().compareTo(copyToRaw.getWatermark().get()) <= 0)) {
116-
log.info(
117-
"No need to copy as destination watermark >= source watermark with source watermark {}, for dataset with metadata {}",
118-
copyFromRaw.getWatermark().isPresent() ? copyFromRaw.getWatermark().get().toJson() : "N/A",
119-
this.rc.getMetaData());
120-
return copyableFiles;
116+
if (this.watermarkEnabled) {
117+
if ((!copyFromRaw.getWatermark().isPresent() && copyToRaw.getWatermark().isPresent()) || (
118+
copyFromRaw.getWatermark().isPresent() && copyToRaw.getWatermark().isPresent()
119+
&& copyFromRaw.getWatermark().get().compareTo(copyToRaw.getWatermark().get()) <= 0)) {
120+
log.info(
121+
"No need to copy as destination watermark >= source watermark with source watermark {}, for dataset with metadata {}",
122+
copyFromRaw.getWatermark().isPresent() ? copyFromRaw.getWatermark().get().toJson() : "N/A",
123+
this.rc.getMetaData());
124+
return copyableFiles;
125+
}
121126
}
122127

123128
HadoopFsEndPoint copyFrom = (HadoopFsEndPoint) copyFromRaw;
@@ -194,7 +199,7 @@ public boolean apply(FileStatus input) {
194199
deleteCommitStep, 0));
195200
}
196201

197-
// generate the watermark file
202+
// generate the watermark file even if watermark checking is disabled. Make sure it can come into functional once disired.
198203
if ((!watermarkMetadataCopied) && copyFrom.getWatermark().isPresent()) {
199204
copyableFiles.add(new PostPublishStep(copyTo.getDatasetPath().toString(), Maps.<String, String> newHashMap(),
200205
new WatermarkMetadataGenerationCommitStep(copyTo.getFsURI().toString(), copyTo.getDatasetPath(),

gobblin-data-management/src/main/java/gobblin/data/management/copy/replication/ConfigBasedDatasetsFinder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@ public abstract class ConfigBasedDatasetsFinder implements DatasetsFinder {
9595
public static final String JOB_LEVEL_BLACKLIST = CopyConfiguration.COPY_PREFIX + ".configBased.blacklist" ;
9696
public static final String JOB_LEVEL_WHITELIST = CopyConfiguration.COPY_PREFIX + ".configBased.whitelist" ;
9797

98+
// There are some cases that WATERMARK checking is desired, like
99+
// Unexpected data loss on target while not changing watermark accordingly.
100+
// This configuration make WATERMARK checking configurable for operation convenience, default true
101+
public static final String WATERMARK_ENABLE = CopyConfiguration.COPY_PREFIX + ".configBased.watermark.enabled" ;
102+
98103

99104
protected final String storeRoot;
100105
protected final Path commonRoot;

0 commit comments

Comments
 (0)