Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ public class InternalTableBuildRequest {
private String yarnQueue;

@JsonProperty("partitions")
private String[] partitions;
private String[] partitions = new String[] {};

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public static class InternalTableJobBuildParam extends JobFactory.JobBuildParams
private String endDate;
private String deletePartitionValues;
private String deletePartition; // true or false
private String refreshPartitionValues;

public InternalTableJobBuildParam(JobParam jobParam) {
super(null, null, jobParam.getOwner(), jobParam.getJobTypeEnum(), jobParam.getJobId(), null, null, null,
Expand All @@ -51,6 +52,7 @@ public InternalTableJobBuildParam(JobParam jobParam) {
this.endDate = jobParam.getExtParams().get(NBatchConstants.P_END_DATE);
this.deletePartitionValues = jobParam.getExtParams().get(NBatchConstants.P_DELETE_PARTITION_VALUES);
this.deletePartition = jobParam.getExtParams().get(NBatchConstants.P_DELETE_PARTITION);
this.refreshPartitionValues = jobParam.getExtParams().get(NBatchConstants.P_REFRESH_PARTITION_VALUES);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public interface NBatchConstants {
String P_DELETE_PARTITION = "deletePartition";
String P_SORT_BY_PARTITION_BEFORE_SAVE = "sortByPartition";
String P_PRELOADED_CACHE = "preloadedCache";

String P_REFRESH_PARTITION_VALUES = "refreshPartitionValues";

/** index planner job parameters */
String P_PLANNER_INITIALIZE_CUBOID_COUNT = "kylin.planner.initializeCuboidCount";
Expand All @@ -92,10 +92,8 @@ public interface NBatchConstants {

@Getter
enum TblPropertyKey {
PRIMARY_KEY(P_PRIMARY_KEY),
ORDER_BY_KEY(P_ORDER_BY_KEY),
BUCKET_COLUMN(P_BUCKET_COLUMN),
BUCKET_NUM(P_BUCKET_NUM);
PRIMARY_KEY(P_PRIMARY_KEY), ORDER_BY_KEY(P_ORDER_BY_KEY), BUCKET_COLUMN(P_BUCKET_COLUMN), BUCKET_NUM(
P_BUCKET_NUM);

private final String value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.guava30.shaded.common.base.Preconditions;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.metadata.model.PartitionDesc;

public final class DataRangeUtils {
Expand Down Expand Up @@ -169,13 +170,18 @@ public static List<String[]> mergeTimeRange(List<String> values, String dateForm
if (values == null || values.isEmpty()) {
return mergedRanges;
}

SimpleDateFormat sdf = new SimpleDateFormat(dateFormat, Locale.ROOT);
try {
List<Date> dates = new ArrayList<>();
for (String value : values) {
if (StringUtils.isEmpty(value)) {
continue;
}
dates.add(sdf.parse(value));
}
if (dates.isEmpty()) {
return Lists.newArrayList();
}
// Sort the dates
dates.sort(Date::compareTo);
Date start = dates.get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public class ExecutableResponse implements Comparable<ExecutableResponse> {
private ExecutableState schedulerState;
@JsonProperty("job_name")
private String jobName;
@JsonProperty("data_range_partitions")
private String dataRangePartitions;
@JsonProperty("data_range_start")
private long dataRangeStart;
@JsonProperty("data_range_end")
Expand Down Expand Up @@ -202,9 +204,12 @@ public static ExecutableResponse create(AbstractExecutable abstractExecutable, E
}
} else if (abstractExecutable instanceof InternalTableLoadingJob) {
InternalTableLoadingJob internalTableJob = (InternalTableLoadingJob) abstractExecutable;
if ("false".equals(internalTableJob.getParam("incrementalBuild"))
|| "true".equals(internalTableJob.getParam("deletePartition"))) {
String partitionValues = internalTableJob.getParam(NBatchConstants.P_REFRESH_PARTITION_VALUES);
if ("false".equals(internalTableJob.getParam("incrementalBuild"))) {
executableResponse.setDataRangeEnd(Long.MAX_VALUE);
} else if (StringUtils.isNotEmpty(partitionValues)) {
partitionValues = partitionValues.replace("[", "").replace("]", "");
executableResponse.setDataRangePartitions(partitionValues);
} else {
executableResponse.setDataRangeStart(Long.parseLong(internalTableJob.getParam("startTime")));
executableResponse.setDataRangeEnd(Long.parseLong(internalTableJob.getParam("endTime")));
Expand Down Expand Up @@ -288,8 +293,7 @@ public static float calculateStepRatio(AbstractExecutable abstractExecutable, Ex

/** calculate stage count from segment */
public static double calculateSuccessStageInTaskMap(AbstractExecutable task,
Map<String, List<StageExecutable>> stageMap,
ExecutablePO executablePO) {
Map<String, List<StageExecutable>> stageMap, ExecutablePO executablePO) {
var successStages = 0D;
boolean calculateIndexExecRadio = stageMap.size() == 1;
for (Map.Entry<String, List<StageExecutable>> entry : stageMap.entrySet()) {
Expand All @@ -301,8 +305,7 @@ public static double calculateSuccessStageInTaskMap(AbstractExecutable task,
}

public static double calculateSuccessStage(AbstractExecutable task, String segmentId,
List<StageExecutable> stageExecutables,
boolean calculateIndexExecRadio, ExecutablePO executablePO) {
List<StageExecutable> stageExecutables, boolean calculateIndexExecRadio, ExecutablePO executablePO) {
var successStages = 0D;
for (StageExecutable stage : stageExecutables) {
if (ExecutableState.SUCCEED == stage.getStatusInMem(segmentId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,13 @@ void testSubmitMultiInternalJobs() throws Exception {
ExecutableState state = executableManager.getJob(finalJobId3).getStatus();
return state.isFinalState() || state == ExecutableState.ERROR;
});

// test cancel a full load job
internalTableService.truncateInternalTable(PROJECT, TABLE_INDENTITY);
jobId = internalTableService
.loadIntoInternalTable(PROJECT, table.getName(), table.getDatabase(), false, false, "", "", null, null)
.getJobs().get(0).getJobId();
executableManager.discardJob(jobId);
}

@Test
Expand Down Expand Up @@ -578,6 +585,7 @@ void testDropNonPartitionTablePartition() throws Exception {
void testRefreshPartitions() throws Exception {
KylinConfig config = KylinConfig.getInstanceFromEnv();
NTableMetadataManager tManager = NTableMetadataManager.getInstance(config, PROJECT);
ExecutableManager executableManager = ExecutableManager.getInstance(config, PROJECT);
TableDesc table = tManager.getTableDesc(TABLE_INDENTITY);
internalTableService.createInternalTable(PROJECT, TABLE_INDENTITY, new String[] { PARTITION_COL }, null,
new HashMap<>(), InternalTableDesc.StorageType.PARQUET.name());
Expand All @@ -593,8 +601,9 @@ void testRefreshPartitions() throws Exception {
when(tableService.getPartitionColumnFormat(any(), any(), any(), any())).thenReturn("yyyyMM");
internalTableService.updateInternalTable(PROJECT, table.getName(), table.getDatabase(),
new String[] { DATE_COL }, "yyyyMM", new HashMap<>(), InternalTableDesc.StorageType.PARQUET.name());
internalTableService.loadIntoInternalTable(PROJECT, table.getName(), table.getDatabase(), false, true, "", "",
new String[] { "199201", "199203" }, null);
String jobId = internalTableService.loadIntoInternalTable(PROJECT, table.getName(), table.getDatabase(), false,
true, "", "", new String[] { "199201", "199203" }, null).getJobs().get(0).getJobId();
executableManager.discardJob(jobId);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1009,11 +1009,6 @@ public void testInternalTableLoadingJobResponse() {
response = ExecutableResponse.create(job, null);
Assert.assertEquals(Long.MAX_VALUE, response.getDataRangeEnd());

job.setParam("incrementalBuild", "true");
job.setParam("deletePartition", "true");
response = ExecutableResponse.create(job, null);
Assert.assertEquals(Long.MAX_VALUE, response.getDataRangeEnd());

TableDesc originTable = manager.getTableDesc("DEFAULT.TEST_KYLIN_FACT");
internalManager.createInternalTable(new InternalTableDesc(originTable));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
Expand All @@ -39,6 +41,7 @@
import org.apache.kylin.engine.spark.builder.InternalTableLoader;
import org.apache.kylin.engine.spark.job.InternalTableLoadJob;
import org.apache.kylin.engine.spark.job.InternalTableUpdateMetadataStep;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.guava30.shaded.common.collect.Sets;
import org.apache.kylin.job.dao.JobStatisticsManager;
import org.apache.kylin.job.execution.JobTypeEnum;
Expand Down Expand Up @@ -75,28 +78,22 @@ public InternalTableLoadingJobResponse loadIntoInternalTable(String project, Str
jobStatisticsManager.updateStatistics(TimeUtil.getDayStart(System.currentTimeMillis()), 0, 0, 1);
InternalTableDesc internalTable = checkAndGetInternalTables(project, table, database);
InternalTableManager internalTableManager = InternalTableManager.getInstance(getConfig(), project);
// refresh partitions
if (isRefresh && null != partitions && partitions.length > 0) {
SparkSession ss = SparkSession.getDefaultSession().get();
InternalTableLoader internalTableLoader = new InternalTableLoader();
internalTableLoader.loadInternalTable(ss, internalTable, new String[] { startDate, endDate },
partitions, internalTable.getStorageType().getFormat(), isIncremental);
} else {
checkBeforeSubmit(internalTable, jobType, isIncremental, isRefresh, startDate, endDate);
logger.info(
"create internal table loading job for table: {}, isIncrementBuild: {}, startTime: {}, endTime: {}",
internalTable.getIdentity(), isIncremental, startDate, endDate);
JobParam jobParam = new JobParam().withProject(project).withTable(internalTable.getIdentity())
.withYarnQueue(yarnQueue).withJobTypeEnum(jobType).withOwner(BasicService.getUsername())
.addExtParams(NBatchConstants.P_INCREMENTAL_BUILD, String.valueOf(isIncremental))
.addExtParams(NBatchConstants.P_OUTPUT_MODE, String.valueOf(isRefresh))
.addExtParams(NBatchConstants.P_START_DATE, startDate)
.addExtParams(NBatchConstants.P_END_DATE, endDate);
String jobId = getManager(SourceUsageManager.class).licenseCheckWrap(project,
() -> getManager(JobManager.class, project).addJob(jobParam));
jobIds.add(jobId);
internalTableManager.saveOrUpdateInternalTable(internalTable);
}
checkBeforeSubmit(internalTable, isIncremental, isRefresh, startDate, endDate, partitions);
logger.info(
"create internal table loading job for table: {}, isIncrementBuild: {}, startTime: {}, endTime: {}",
internalTable.getIdentity(), isIncremental, startDate, endDate);
String partitionValues = null == partitions ? "" : Arrays.toString(partitions);
JobParam jobParam = new JobParam().withProject(project).withTable(internalTable.getIdentity())
.withYarnQueue(yarnQueue).withJobTypeEnum(jobType).withOwner(BasicService.getUsername())
.addExtParams(NBatchConstants.P_INCREMENTAL_BUILD, String.valueOf(isIncremental))
.addExtParams(NBatchConstants.P_OUTPUT_MODE, String.valueOf(isRefresh))
.addExtParams(NBatchConstants.P_START_DATE, startDate)
.addExtParams(NBatchConstants.P_END_DATE, endDate)
.addExtParams(NBatchConstants.P_REFRESH_PARTITION_VALUES, partitionValues);
String jobId = getManager(SourceUsageManager.class).licenseCheckWrap(project,
() -> getManager(JobManager.class, project).addJob(jobParam));
jobIds.add(jobId);
internalTableManager.saveOrUpdateInternalTable(internalTable);
return true;
}, project);
String jobName = isRefresh ? INTERNAL_TABLE_REFRESH.toString() : INTERNAL_TABLE_BUILD.toString();
Expand All @@ -108,42 +105,53 @@ public InternalTableLoadingJobResponse loadIntoInternalTable(String project, Str
* @param startDate
* @param endDate
*/
private void checkBeforeSubmit(InternalTableDesc internalTable, JobTypeEnum jobType, boolean isIncremental,
boolean isRefresh, String startDate, String endDate) throws KylinException {
private void checkBeforeSubmit(InternalTableDesc internalTable, boolean isIncremental, boolean isRefresh,
String startDate, String endDate, String[] partitions) throws Exception {
if (isIncremental && (Objects.isNull(internalTable.getTablePartition())
|| Objects.isNull(internalTable.getTablePartition().getPartitionColumns())
|| internalTable.getTablePartition().getPartitionColumns().length == 0)) {
String errorMsg = String.format(Locale.ROOT, MsgPicker.getMsg().getInternalTableUnpartitioned());
throw new KylinException(INTERNAL_TABLE_ERROR, errorMsg);
}
partitions = null == partitions ? new String[] {} : partitions;
// check job_range overlap?
InternalTablePartition tablePartition = internalTable.getTablePartition();
List<String[]> jobRange = internalTable.getJobRange();
String[] curJobRange = new String[] { "0", "0" };
List<String[]> curRange = Lists.newArrayList();
if (!isIncremental) {
curRange.add(new String[] { "0", "0" });
}
String timeFmt = Objects.isNull(tablePartition) ? "" : tablePartition.getDatePartitionFormat();
if (isRefresh && partitions.length > 0 && StringUtils.isNotEmpty(timeFmt)) {
curRange.addAll(DataRangeUtils.mergeTimeRange(Arrays.asList(partitions), timeFmt));
}
if (StringUtils.isNotEmpty(startDate) && StringUtils.isNotEmpty(timeFmt)) {
SimpleDateFormat fmt = new SimpleDateFormat(timeFmt, Locale.ROOT);
String start = StringUtils.isEmpty(startDate) ? "0" : fmt.format(Long.parseLong(startDate));
String end = StringUtils.isEmpty(endDate) ? "0" : fmt.format(Long.parseLong(endDate));
curJobRange = new String[] { start, end };
String start = fmt.format(Long.parseLong(startDate));
String end = fmt.format(Long.parseLong(endDate));
curRange.add(new String[] { start, end });
}
// non-time partition table can not submit incremental job
if (isIncremental && !Objects.isNull(tablePartition)
if (isIncremental && !Objects.isNull(tablePartition) && StringUtils.isNotEmpty(startDate)
&& StringUtils.isEmpty(tablePartition.getDatePartitionFormat())) {
String errorMsg = String.format(Locale.ROOT, MsgPicker.getMsg().getNonTimeInternalTableIncrementalBuild());
throw new KylinException(INTERNAL_TABLE_ERROR, errorMsg);
}
if (DataRangeUtils.timeOverlap(internalTable.getJobRange(), curJobRange, timeFmt)) {
String errorMsg = String.format(Locale.ROOT, MsgPicker.getMsg().getTimeRangeOverlap());
throw new KylinException(INTERNAL_TABLE_ERROR, errorMsg);
}
// check refresh out of data range
if (isRefresh && !DataRangeUtils.timeInRange(curJobRange, internalTable.getPartitionRange(), timeFmt)) {
String errorMsg = String.format(Locale.ROOT, MsgPicker.getMsg().getTimeOutOfRange());
throw new KylinException(INTERNAL_TABLE_ERROR, errorMsg);
}
logger.info(jobType.getCategory());
jobRange.add(curJobRange);
String[] finalPartitions = partitions;
curRange.forEach(range -> {
if (DataRangeUtils.timeOverlap(internalTable.getJobRange(), range, timeFmt)) {
String errorMsg = String.format(Locale.ROOT, MsgPicker.getMsg().getTimeRangeOverlap());
throw new KylinException(INTERNAL_TABLE_ERROR, errorMsg);
}
// check refresh out of data range(exclude specify partitions)
if (isRefresh && finalPartitions.length == 0
&& !DataRangeUtils.timeInRange(range, internalTable.getPartitionRange(), timeFmt)) {
String errorMsg = String.format(Locale.ROOT, MsgPicker.getMsg().getInternalTableUnpartitioned());
throw new KylinException(INTERNAL_TABLE_ERROR, errorMsg);
}
});
jobRange.addAll(curRange);
jobRange.sort(Comparator.comparing(valueA -> valueA[0]));
internalTable.setJobRange(jobRange);
}

Expand All @@ -168,6 +176,11 @@ public void dropPartitions(String project, String[] partitionValues, String tabl
tablePartition.setPartitionValues(info.getPartitionValues());
tablePartition.setPartitionDetails(info.getPartitionDetails());
oldTable.setRowCount(info.getFinalCount());
if (StringUtils.isNotEmpty(tablePartition.getDatePartitionFormat())) {
List<String[]> partitionRange = DataRangeUtils.mergeTimeRange(tablePartition.getPartitionValues(),
tablePartition.getDatePartitionFormat());
oldTable.setPartitionRange(partitionRange);
}
internalTableManager.saveOrUpdateInternalTable(oldTable);
return true;
}, project);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,9 @@ public InternalTableLoadingJobResponse loadIntoInternalTable(String project, Str
if (isIncremental) {
DataRangeUtils.validateRange(startDate, endDate);
}
if (null != partitions && partitions.length > 0) {
isIncremental = true;
}
// treat full refresh as full load processing.
if (!isIncremental && isRefresh) {
isRefresh = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,15 @@ public void loadIntoInternalTable() throws IOException {
boolean incrementalBuild = "true".equals(getParam(NBatchConstants.P_INCREMENTAL_BUILD));
String startDate = getParam(NBatchConstants.P_START_DATE);
String endDate = getParam(NBatchConstants.P_END_DATE);
String refreshPartitions = getParam(NBatchConstants.P_REFRESH_PARTITION_VALUES).replace("[", "").replace("]",
"");
String[] partitions = new String[] {};
if (StringUtils.isNotEmpty(refreshPartitions)) {
partitions = StringUtils.isEmpty(refreshPartitions) ? new String[] {} : refreshPartitions.split(", ");
}
String storagePolicy = config.getGlutenStoragePolicy();
InternalTableLoader loader = new InternalTableLoader();
loader.loadInternalTable(ss, internalTable, new String[] { startDate, endDate }, null, storagePolicy,
loader.loadInternalTable(ss, internalTable, new String[] { startDate, endDate }, partitions, storagePolicy,
incrementalBuild);
}

Expand Down
Loading