Skip to content

[AMORO-3485] Introduce scheduler module and external resource container #3486

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
May 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@
import org.apache.amoro.server.persistence.SqlSessionFactoryProvider;
import org.apache.amoro.server.resource.ContainerMetadata;
import org.apache.amoro.server.resource.DefaultOptimizerManager;
import org.apache.amoro.server.resource.InternalContainers;
import org.apache.amoro.server.resource.OptimizerManager;
import org.apache.amoro.server.resource.ResourceContainers;
import org.apache.amoro.server.scheduler.inline.InlineTableExecutors;
import org.apache.amoro.server.table.DefaultTableManager;
import org.apache.amoro.server.table.DefaultTableService;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.server.table.executor.AsyncTableExecutors;
import org.apache.amoro.server.terminal.TerminalManager;
import org.apache.amoro.server.utils.ThriftServiceProxy;
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -165,18 +165,18 @@ public void startService() throws Exception {
new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService);

LOG.info("Setting up AMS table executors...");
AsyncTableExecutors.getInstance().setup(tableService, serviceConfig);
InlineTableExecutors.getInstance().setup(tableService, serviceConfig);
addHandlerChain(optimizingService.getTableRuntimeHandler());
addHandlerChain(AsyncTableExecutors.getInstance().getDataExpiringExecutor());
addHandlerChain(AsyncTableExecutors.getInstance().getSnapshotsExpiringExecutor());
addHandlerChain(AsyncTableExecutors.getInstance().getOrphanFilesCleaningExecutor());
addHandlerChain(AsyncTableExecutors.getInstance().getDanglingDeleteFilesCleaningExecutor());
addHandlerChain(AsyncTableExecutors.getInstance().getOptimizingCommitExecutor());
addHandlerChain(AsyncTableExecutors.getInstance().getOptimizingExpiringExecutor());
addHandlerChain(AsyncTableExecutors.getInstance().getBlockerExpiringExecutor());
addHandlerChain(AsyncTableExecutors.getInstance().getHiveCommitSyncExecutor());
addHandlerChain(AsyncTableExecutors.getInstance().getTableRefreshingExecutor());
addHandlerChain(AsyncTableExecutors.getInstance().getTagsAutoCreatingExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getDataExpiringExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getSnapshotsExpiringExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getOrphanFilesCleaningExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getDanglingDeleteFilesCleaningExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getOptimizingCommitExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getOptimizingExpiringExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getBlockerExpiringExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getHiveCommitSyncExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getTableRefreshingExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getTagsAutoCreatingExecutor());
tableService.initialize();
LOG.info("AMS table service have been initialized");
tableManager.setTableService(tableService);
Expand Down Expand Up @@ -528,7 +528,7 @@ private void initContainerConfig() {
containerList.add(container);
}
}
ResourceContainers.init(containerList);
InternalContainers.init(containerList);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@
import org.apache.amoro.server.resource.OptimizerManager;
import org.apache.amoro.server.resource.OptimizerThread;
import org.apache.amoro.server.resource.QuotaProvider;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
Expand Down Expand Up @@ -136,17 +136,19 @@ public RuntimeHandlerChain getTableRuntimeHandler() {
return tableHandlerChain;
}

private void loadOptimizingQueues(List<TableRuntime> tableRuntimeMetaList) {
private void loadOptimizingQueues(List<DefaultTableRuntime> tableRuntimeList) {
List<ResourceGroup> optimizerGroups =
getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups);
List<OptimizerInstance> optimizers = getAs(OptimizerMapper.class, OptimizerMapper::selectAll);
Map<String, List<TableRuntime>> groupToTableRuntimes =
tableRuntimeMetaList.stream()
.collect(Collectors.groupingBy(TableRuntime::getOptimizerGroup));
Map<String, List<DefaultTableRuntime>> groupToTableRuntimes =
tableRuntimeList.stream()
.collect(
Collectors.groupingBy(
tableRuntime -> tableRuntime.getOptimizingState().getOptimizerGroup()));
optimizerGroups.forEach(
group -> {
String groupName = group.getName();
List<TableRuntime> tableRuntimes = groupToTableRuntimes.remove(groupName);
List<DefaultTableRuntime> tableRuntimes = groupToTableRuntimes.remove(groupName);
OptimizingQueue optimizingQueue =
new OptimizingQueue(
catalogManager,
Expand Down Expand Up @@ -283,11 +285,11 @@ public boolean cancelProcess(long processId) throws TException {
return false;
}
long tableId = processMeta.getTableId();
TableRuntime tableRuntime = tableService.getRuntime(tableId);
DefaultTableRuntime tableRuntime = tableService.getRuntime(tableId);
if (tableRuntime == null) {
return false;
}
OptimizingProcess process = tableRuntime.getOptimizingProcess();
OptimizingProcess process = tableRuntime.getOptimizingState().getOptimizingProcess();
if (process == null || process.getProcessId() != processId) {
return false;
}
Expand Down Expand Up @@ -372,37 +374,39 @@ public int getTotalQuota(String resourceGroup) {
private class TableRuntimeHandlerImpl extends RuntimeHandlerChain {

@Override
public void handleStatusChanged(TableRuntime tableRuntime, OptimizingStatus originalStatus) {
if (!tableRuntime.getOptimizingStatus().isProcessing()) {
getOptionalQueueByGroup(tableRuntime.getOptimizerGroup())
public void handleStatusChanged(
DefaultTableRuntime tableRuntime, OptimizingStatus originalStatus) {
if (!tableRuntime.getOptimizingState().getOptimizingStatus().isProcessing()) {
getOptionalQueueByGroup(tableRuntime.getOptimizingState().getOptimizerGroup())
.ifPresent(q -> q.refreshTable(tableRuntime));
}
}

@Override
public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration originalConfig) {
public void handleConfigChanged(
DefaultTableRuntime tableRuntime, TableConfiguration originalConfig) {
String originalGroup = originalConfig.getOptimizingConfig().getOptimizerGroup();
if (!tableRuntime.getOptimizerGroup().equals(originalGroup)) {
if (!tableRuntime.getOptimizingState().getOptimizerGroup().equals(originalGroup)) {
getOptionalQueueByGroup(originalGroup).ifPresent(q -> q.releaseTable(tableRuntime));
}
getOptionalQueueByGroup(tableRuntime.getOptimizerGroup())
getOptionalQueueByGroup(tableRuntime.getOptimizingState().getOptimizerGroup())
.ifPresent(q -> q.refreshTable(tableRuntime));
}

@Override
public void handleTableAdded(AmoroTable<?> table, TableRuntime tableRuntime) {
getOptionalQueueByGroup(tableRuntime.getOptimizerGroup())
public void handleTableAdded(AmoroTable<?> table, DefaultTableRuntime tableRuntime) {
getOptionalQueueByGroup(tableRuntime.getOptimizingState().getOptimizerGroup())
.ifPresent(q -> q.refreshTable(tableRuntime));
}

@Override
public void handleTableRemoved(TableRuntime tableRuntime) {
getOptionalQueueByGroup(tableRuntime.getOptimizerGroup())
public void handleTableRemoved(DefaultTableRuntime tableRuntime) {
getOptionalQueueByGroup(tableRuntime.getOptimizingState().getOptimizerGroup())
.ifPresent(queue -> queue.releaseTable(tableRuntime));
}

@Override
protected void initHandler(List<TableRuntime> tableRuntimeList) {
protected void initHandler(List<DefaultTableRuntime> tableRuntimeList) {
LOG.info("OptimizerManagementService begin initializing");
loadOptimizingQueues(tableRuntimeList);
optimizerKeeper.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import org.apache.amoro.server.DefaultOptimizingService;
import org.apache.amoro.server.dashboard.response.OkResponse;
import org.apache.amoro.server.resource.ContainerMetadata;
import org.apache.amoro.server.resource.InternalContainers;
import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.resource.OptimizerManager;
import org.apache.amoro.server.resource.ResourceContainers;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;

import java.util.List;
Expand Down Expand Up @@ -67,7 +67,7 @@ public void releaseOptimizer(Context ctx) {
"The resource ID %s has not been indexed" + " to any optimizer.", resourceId));
Resource resource = optimizerManager.getResource(resourceId);
resource.getProperties().putAll(optimizerInstances.get(0).getProperties());
ResourceContainers.get(resource.getContainerName()).releaseOptimizer(resource);
InternalContainers.get(resource.getContainerName()).releaseResource(resource);
optimizerManager.deleteResource(resourceId);
optimizerManager.deleteOptimizer(resource.getGroupName(), resourceId);
ctx.json(OkResponse.of("Success to release optimizer"));
Expand All @@ -85,7 +85,7 @@ public void createOptimizer(Context ctx) {
.setProperties(resourceGroup.getProperties())
.setThreadCount(parallelism)
.build();
ResourceContainers.get(resource.getContainerName()).requestResource(resource);
InternalContainers.get(resource.getContainerName()).requestResource(resource);
optimizerManager.createResource(resource);
ctx.json(OkResponse.of("success to create optimizer"));
}
Expand All @@ -94,7 +94,7 @@ public void createOptimizer(Context ctx) {
public void getContainers(Context ctx) {
ctx.json(
OkResponse.of(
ResourceContainers.getMetadataList().stream()
InternalContainers.getMetadataList().stream()
.map(ContainerMetadata::getName)
.collect(Collectors.toList())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import org.apache.amoro.server.dashboard.response.PageResult;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.resource.ContainerMetadata;
import org.apache.amoro.server.resource.InternalContainers;
import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.resource.OptimizerManager;
import org.apache.amoro.server.resource.ResourceContainers;
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -159,7 +159,7 @@ public void getOptimizerGroups(Context ctx) {
optimizerManager.listResourceGroups().stream()
.filter(
resourceGroup ->
!ResourceContainers.EXTERNAL_CONTAINER_NAME.equals(
!InternalContainers.UNMANAGED_CONTAINER_NAME.equals(
resourceGroup.getContainer()))
.map(
e -> {
Expand Down Expand Up @@ -209,7 +209,7 @@ public void releaseOptimizer(Context ctx) {
"The resource ID %s has not been indexed" + " to any optimizer.", resourceId));
Resource resource = optimizerManager.getResource(resourceId);
resource.getProperties().putAll(optimizerInstances.get(0).getProperties());
ResourceContainers.get(resource.getContainerName()).releaseOptimizer(resource);
InternalContainers.get(resource.getContainerName()).releaseResource(resource);
optimizerManager.deleteResource(resourceId);
optimizerManager.deleteOptimizer(resource.getGroupName(), resourceId);
ctx.json(OkResponse.of("Success to release optimizer"));
Expand All @@ -228,7 +228,7 @@ public void scaleOutOptimizer(Context ctx) {
.setProperties(resourceGroup.getProperties())
.setThreadCount(parallelism)
.build();
ResourceContainers.get(resource.getContainerName()).requestResource(resource);
InternalContainers.get(resource.getContainerName()).requestResource(resource);
optimizerManager.createResource(resource);
ctx.json(OkResponse.of("success to scaleOut optimizer"));
}
Expand Down Expand Up @@ -303,7 +303,7 @@ public void deleteCheckResourceGroup(Context ctx) {
public void getContainers(Context ctx) {
ctx.json(
OkResponse.of(
ResourceContainers.getMetadataList().stream()
InternalContainers.getMetadataList().stream()
.map(ContainerMetadata::getName)
.collect(Collectors.toList())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.apache.amoro.server.AmoroManagementConf;
import org.apache.amoro.server.dashboard.response.OkResponse;
import org.apache.amoro.server.resource.ContainerMetadata;
import org.apache.amoro.server.resource.InternalContainers;
import org.apache.amoro.server.resource.OptimizerManager;
import org.apache.amoro.server.resource.ResourceContainers;
import org.glassfish.jersey.internal.guava.Sets;

import java.util.ArrayList;
Expand Down Expand Up @@ -72,7 +72,7 @@ private void putSetting(Map<String, String> settingMap, String key, Object value

/** Get container settings. */
public void getContainerSetting(Context ctx) {
List<ContainerMetadata> containerMetas = ResourceContainers.getMetadataList();
List<ContainerMetadata> containerMetas = InternalContainers.getMetadataList();
List<Map<String, Object>> result = new ArrayList<>();
Objects.requireNonNull(containerMetas)
.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package org.apache.amoro.server.manager;

import org.apache.amoro.OptimizerProperties;
import org.apache.amoro.resource.InternalResourceContainer;
import org.apache.amoro.resource.Resource;
import org.apache.amoro.resource.ResourceContainer;
import org.apache.amoro.resource.ResourceStatus;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -30,7 +30,7 @@
import java.util.List;
import java.util.Map;

public abstract class AbstractResourceContainer implements ResourceContainer {
public abstract class AbstractOptimizerContainer implements InternalResourceContainer {
private String containerName;
private Map<String, String> containerProperties;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class FlinkOptimizerContainer extends AbstractResourceContainer {
public class FlinkOptimizerContainer extends AbstractOptimizerContainer {
private static final Logger LOG = LoggerFactory.getLogger(FlinkOptimizerContainer.class);

public static final String FLINK_HOME_PROPERTY = "flink-home";
Expand Down Expand Up @@ -385,7 +385,7 @@ private <T> T fetchCommandOutput(Process exec, Function<String, T> commandReader
}

@Override
public void releaseOptimizer(Resource resource) {
public void releaseResource(Resource resource) {
if (target.runByFlinkRestClient()) {
Preconditions.checkArgument(
resource.getProperties().containsKey(SESSION_CLUSTER_JOB_ID),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import java.util.stream.Collectors;

/** Kubernetes Optimizer Container with Standalone Optimizer */
public class KubernetesOptimizerContainer extends AbstractResourceContainer {
public class KubernetesOptimizerContainer extends AbstractOptimizerContainer {

private static final Logger LOG = LoggerFactory.getLogger(KubernetesOptimizerContainer.class);

Expand Down Expand Up @@ -348,7 +348,7 @@ public Deployment initPodTemplateFromFrontEnd(
}

@Override
public void releaseOptimizer(Resource resource) {
public void releaseResource(Resource resource) {
String resourceId = resource.getResourceId();
LOG.info("release Kubernetes Optimizer Container {}", resourceId);
String namespace = resource.getProperties().get(NAMESPACE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.Collections;
import java.util.Map;

public class LocalOptimizerContainer extends AbstractResourceContainer {
public class LocalOptimizerContainer extends AbstractOptimizerContainer {

private static final Logger LOG = LoggerFactory.getLogger(LocalOptimizerContainer.class);

Expand Down Expand Up @@ -65,7 +65,7 @@ protected String buildOptimizerStartupArgsString(Resource resource) {
}

@Override
public void releaseOptimizer(Resource resource) {
public void releaseResource(Resource resource) {
long jobId = Long.parseLong(resource.getRequiredProperty(Resource.PROPERTY_JOB_ID));

String os = System.getProperty("os.name").toLowerCase();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class SparkOptimizerContainer extends AbstractResourceContainer {
public class SparkOptimizerContainer extends AbstractOptimizerContainer {
private static final Logger LOG = LoggerFactory.getLogger(SparkOptimizerContainer.class);

public static final String SPARK_HOME_PROPERTY = "spark-home";
Expand Down Expand Up @@ -247,7 +247,7 @@ private <T> T fetchCommandOutput(Process exec, Function<String, T> commandReader
}

@Override
public void releaseOptimizer(Resource resource) {
public void releaseResource(Resource resource) {
String releaseCommand;
if (deployedOnKubernetes()) {
releaseCommand = buildReleaseKubernetesCommand(resource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,39 +145,39 @@ public void register() {
(Gauge<Long>)
() ->
optimizingQueue.getSchedulingPolicy().getTableRuntimeMap().values().stream()
.filter(t -> t.getOptimizingStatus().equals(PLANNING))
.filter(t -> t.getOptimizingState().getOptimizingStatus().equals(PLANNING))
.count());
registerMetric(
registry,
OPTIMIZER_GROUP_PENDING_TABLES,
(Gauge<Long>)
() ->
optimizingQueue.getSchedulingPolicy().getTableRuntimeMap().values().stream()
.filter(t -> t.getOptimizingStatus().equals(PENDING))
.filter(t -> t.getOptimizingState().getOptimizingStatus().equals(PENDING))
.count());
registerMetric(
registry,
OPTIMIZER_GROUP_EXECUTING_TABLES,
(Gauge<Long>)
() ->
optimizingQueue.getSchedulingPolicy().getTableRuntimeMap().values().stream()
.filter(t -> t.getOptimizingStatus().isProcessing())
.filter(t -> t.getOptimizingState().getOptimizingStatus().isProcessing())
.count());
registerMetric(
registry,
OPTIMIZER_GROUP_IDLE_TABLES,
(Gauge<Long>)
() ->
optimizingQueue.getSchedulingPolicy().getTableRuntimeMap().values().stream()
.filter(t -> t.getOptimizingStatus().equals(IDLE))
.filter(t -> t.getOptimizingState().getOptimizingStatus().equals(IDLE))
.count());
registerMetric(
registry,
OPTIMIZER_GROUP_COMMITTING_TABLES,
(Gauge<Long>)
() ->
optimizingQueue.getSchedulingPolicy().getTableRuntimeMap().values().stream()
.filter(t -> t.getOptimizingStatus().equals(COMMITTING))
.filter(t -> t.getOptimizingState().getOptimizingStatus().equals(COMMITTING))
.count());

registerMetric(
Expand Down
Loading