Skip to content

Commit 71adc81

Browse files
author
majin.nathan
committed
[AMORO-3485] Introduce scheduler module and external resource container
1 parent fe49db5 commit 71adc81

File tree

79 files changed

+1465
-692
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

79 files changed

+1465
-692
lines changed

.idea/icon.svg

Lines changed: 0 additions & 1 deletion
This file was deleted.

amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,14 @@
4343
import org.apache.amoro.server.persistence.SqlSessionFactoryProvider;
4444
import org.apache.amoro.server.resource.ContainerMetadata;
4545
import org.apache.amoro.server.resource.DefaultOptimizerManager;
46+
import org.apache.amoro.server.resource.InternalContainers;
4647
import org.apache.amoro.server.resource.OptimizerManager;
47-
import org.apache.amoro.server.resource.ResourceContainers;
48+
import org.apache.amoro.server.scheduler.inline.InlineTableExecutors;
4849
import org.apache.amoro.server.table.DefaultTableManager;
4950
import org.apache.amoro.server.table.DefaultTableService;
5051
import org.apache.amoro.server.table.RuntimeHandlerChain;
5152
import org.apache.amoro.server.table.TableManager;
5253
import org.apache.amoro.server.table.TableService;
53-
import org.apache.amoro.server.table.executor.AsyncTableExecutors;
5454
import org.apache.amoro.server.terminal.TerminalManager;
5555
import org.apache.amoro.server.utils.ThriftServiceProxy;
5656
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
@@ -165,18 +165,18 @@ public void startService() throws Exception {
165165
new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService);
166166

167167
LOG.info("Setting up AMS table executors...");
168-
AsyncTableExecutors.getInstance().setup(tableService, serviceConfig);
168+
InlineTableExecutors.getInstance().setup(tableService, serviceConfig);
169169
addHandlerChain(optimizingService.getTableRuntimeHandler());
170-
addHandlerChain(AsyncTableExecutors.getInstance().getDataExpiringExecutor());
171-
addHandlerChain(AsyncTableExecutors.getInstance().getSnapshotsExpiringExecutor());
172-
addHandlerChain(AsyncTableExecutors.getInstance().getOrphanFilesCleaningExecutor());
173-
addHandlerChain(AsyncTableExecutors.getInstance().getDanglingDeleteFilesCleaningExecutor());
174-
addHandlerChain(AsyncTableExecutors.getInstance().getOptimizingCommitExecutor());
175-
addHandlerChain(AsyncTableExecutors.getInstance().getOptimizingExpiringExecutor());
176-
addHandlerChain(AsyncTableExecutors.getInstance().getBlockerExpiringExecutor());
177-
addHandlerChain(AsyncTableExecutors.getInstance().getHiveCommitSyncExecutor());
178-
addHandlerChain(AsyncTableExecutors.getInstance().getTableRefreshingExecutor());
179-
addHandlerChain(AsyncTableExecutors.getInstance().getTagsAutoCreatingExecutor());
170+
addHandlerChain(InlineTableExecutors.getInstance().getDataExpiringExecutor());
171+
addHandlerChain(InlineTableExecutors.getInstance().getSnapshotsExpiringExecutor());
172+
addHandlerChain(InlineTableExecutors.getInstance().getOrphanFilesCleaningExecutor());
173+
addHandlerChain(InlineTableExecutors.getInstance().getDanglingDeleteFilesCleaningExecutor());
174+
addHandlerChain(InlineTableExecutors.getInstance().getOptimizingCommitExecutor());
175+
addHandlerChain(InlineTableExecutors.getInstance().getOptimizingExpiringExecutor());
176+
addHandlerChain(InlineTableExecutors.getInstance().getBlockerExpiringExecutor());
177+
addHandlerChain(InlineTableExecutors.getInstance().getHiveCommitSyncExecutor());
178+
addHandlerChain(InlineTableExecutors.getInstance().getTableRefreshingExecutor());
179+
addHandlerChain(InlineTableExecutors.getInstance().getTagsAutoCreatingExecutor());
180180
tableService.initialize();
181181
LOG.info("AMS table service have been initialized");
182182
tableManager.setTableService(tableService);
@@ -528,7 +528,7 @@ private void initContainerConfig() {
528528
containerList.add(container);
529529
}
530530
}
531-
ResourceContainers.init(containerList);
531+
InternalContainers.init(containerList);
532532
}
533533
}
534534

amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@
4747
import org.apache.amoro.server.resource.OptimizerManager;
4848
import org.apache.amoro.server.resource.OptimizerThread;
4949
import org.apache.amoro.server.resource.QuotaProvider;
50+
import org.apache.amoro.server.table.DefaultTableRuntime;
5051
import org.apache.amoro.server.table.RuntimeHandlerChain;
51-
import org.apache.amoro.server.table.TableRuntime;
5252
import org.apache.amoro.server.table.TableService;
5353
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
5454
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
@@ -136,17 +136,19 @@ public RuntimeHandlerChain getTableRuntimeHandler() {
136136
return tableHandlerChain;
137137
}
138138

139-
private void loadOptimizingQueues(List<TableRuntime> tableRuntimeMetaList) {
139+
private void loadOptimizingQueues(List<DefaultTableRuntime> tableRuntimeList) {
140140
List<ResourceGroup> optimizerGroups =
141141
getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups);
142142
List<OptimizerInstance> optimizers = getAs(OptimizerMapper.class, OptimizerMapper::selectAll);
143-
Map<String, List<TableRuntime>> groupToTableRuntimes =
144-
tableRuntimeMetaList.stream()
145-
.collect(Collectors.groupingBy(TableRuntime::getOptimizerGroup));
143+
Map<String, List<DefaultTableRuntime>> groupToTableRuntimes =
144+
tableRuntimeList.stream()
145+
.collect(
146+
Collectors.groupingBy(
147+
tableRuntime -> tableRuntime.getOptimizingState().getOptimizerGroup()));
146148
optimizerGroups.forEach(
147149
group -> {
148150
String groupName = group.getName();
149-
List<TableRuntime> tableRuntimes = groupToTableRuntimes.remove(groupName);
151+
List<DefaultTableRuntime> tableRuntimes = groupToTableRuntimes.remove(groupName);
150152
OptimizingQueue optimizingQueue =
151153
new OptimizingQueue(
152154
catalogManager,
@@ -283,11 +285,11 @@ public boolean cancelProcess(long processId) throws TException {
283285
return false;
284286
}
285287
long tableId = processMeta.getTableId();
286-
TableRuntime tableRuntime = tableService.getRuntime(tableId);
288+
DefaultTableRuntime tableRuntime = tableService.getRuntime(tableId);
287289
if (tableRuntime == null) {
288290
return false;
289291
}
290-
OptimizingProcess process = tableRuntime.getOptimizingProcess();
292+
OptimizingProcess process = tableRuntime.getOptimizingState().getOptimizingProcess();
291293
if (process == null || process.getProcessId() != processId) {
292294
return false;
293295
}
@@ -372,37 +374,39 @@ public int getTotalQuota(String resourceGroup) {
372374
private class TableRuntimeHandlerImpl extends RuntimeHandlerChain {
373375

374376
@Override
375-
public void handleStatusChanged(TableRuntime tableRuntime, OptimizingStatus originalStatus) {
376-
if (!tableRuntime.getOptimizingStatus().isProcessing()) {
377-
getOptionalQueueByGroup(tableRuntime.getOptimizerGroup())
377+
public void handleStatusChanged(
378+
DefaultTableRuntime tableRuntime, OptimizingStatus originalStatus) {
379+
if (!tableRuntime.getOptimizingState().getOptimizingStatus().isProcessing()) {
380+
getOptionalQueueByGroup(tableRuntime.getOptimizingState().getOptimizerGroup())
378381
.ifPresent(q -> q.refreshTable(tableRuntime));
379382
}
380383
}
381384

382385
@Override
383-
public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration originalConfig) {
386+
public void handleConfigChanged(
387+
DefaultTableRuntime tableRuntime, TableConfiguration originalConfig) {
384388
String originalGroup = originalConfig.getOptimizingConfig().getOptimizerGroup();
385-
if (!tableRuntime.getOptimizerGroup().equals(originalGroup)) {
389+
if (!tableRuntime.getOptimizingState().getOptimizerGroup().equals(originalGroup)) {
386390
getOptionalQueueByGroup(originalGroup).ifPresent(q -> q.releaseTable(tableRuntime));
387391
}
388-
getOptionalQueueByGroup(tableRuntime.getOptimizerGroup())
392+
getOptionalQueueByGroup(tableRuntime.getOptimizingState().getOptimizerGroup())
389393
.ifPresent(q -> q.refreshTable(tableRuntime));
390394
}
391395

392396
@Override
393-
public void handleTableAdded(AmoroTable<?> table, TableRuntime tableRuntime) {
394-
getOptionalQueueByGroup(tableRuntime.getOptimizerGroup())
397+
public void handleTableAdded(AmoroTable<?> table, DefaultTableRuntime tableRuntime) {
398+
getOptionalQueueByGroup(tableRuntime.getOptimizingState().getOptimizerGroup())
395399
.ifPresent(q -> q.refreshTable(tableRuntime));
396400
}
397401

398402
@Override
399-
public void handleTableRemoved(TableRuntime tableRuntime) {
400-
getOptionalQueueByGroup(tableRuntime.getOptimizerGroup())
403+
public void handleTableRemoved(DefaultTableRuntime tableRuntime) {
404+
getOptionalQueueByGroup(tableRuntime.getOptimizingState().getOptimizerGroup())
401405
.ifPresent(queue -> queue.releaseTable(tableRuntime));
402406
}
403407

404408
@Override
405-
protected void initHandler(List<TableRuntime> tableRuntimeList) {
409+
protected void initHandler(List<DefaultTableRuntime> tableRuntimeList) {
406410
LOG.info("OptimizerManagementService begin initializing");
407411
loadOptimizingQueues(tableRuntimeList);
408412
optimizerKeeper.start();

amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525
import org.apache.amoro.server.DefaultOptimizingService;
2626
import org.apache.amoro.server.dashboard.response.OkResponse;
2727
import org.apache.amoro.server.resource.ContainerMetadata;
28+
import org.apache.amoro.server.resource.InternalContainers;
2829
import org.apache.amoro.server.resource.OptimizerInstance;
2930
import org.apache.amoro.server.resource.OptimizerManager;
30-
import org.apache.amoro.server.resource.ResourceContainers;
3131
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
3232

3333
import java.util.List;
@@ -67,7 +67,7 @@ public void releaseOptimizer(Context ctx) {
6767
"The resource ID %s has not been indexed" + " to any optimizer.", resourceId));
6868
Resource resource = optimizerManager.getResource(resourceId);
6969
resource.getProperties().putAll(optimizerInstances.get(0).getProperties());
70-
ResourceContainers.get(resource.getContainerName()).releaseOptimizer(resource);
70+
InternalContainers.get(resource.getContainerName()).releaseResource(resource);
7171
optimizerManager.deleteResource(resourceId);
7272
optimizerManager.deleteOptimizer(resource.getGroupName(), resourceId);
7373
ctx.json(OkResponse.of("Success to release optimizer"));
@@ -85,7 +85,7 @@ public void createOptimizer(Context ctx) {
8585
.setProperties(resourceGroup.getProperties())
8686
.setThreadCount(parallelism)
8787
.build();
88-
ResourceContainers.get(resource.getContainerName()).requestResource(resource);
88+
InternalContainers.get(resource.getContainerName()).requestResource(resource);
8989
optimizerManager.createResource(resource);
9090
ctx.json(OkResponse.of("success to create optimizer"));
9191
}
@@ -94,7 +94,7 @@ public void createOptimizer(Context ctx) {
9494
public void getContainers(Context ctx) {
9595
ctx.json(
9696
OkResponse.of(
97-
ResourceContainers.getMetadataList().stream()
97+
InternalContainers.getMetadataList().stream()
9898
.map(ContainerMetadata::getName)
9999
.collect(Collectors.toList())));
100100
}

amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerGroupController.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
import org.apache.amoro.server.dashboard.response.PageResult;
3131
import org.apache.amoro.server.optimizing.OptimizingStatus;
3232
import org.apache.amoro.server.resource.ContainerMetadata;
33+
import org.apache.amoro.server.resource.InternalContainers;
3334
import org.apache.amoro.server.resource.OptimizerInstance;
3435
import org.apache.amoro.server.resource.OptimizerManager;
35-
import org.apache.amoro.server.resource.ResourceContainers;
3636
import org.apache.amoro.server.table.TableManager;
3737
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
3838
import org.apache.commons.lang3.StringUtils;
@@ -159,7 +159,7 @@ public void getOptimizerGroups(Context ctx) {
159159
optimizerManager.listResourceGroups().stream()
160160
.filter(
161161
resourceGroup ->
162-
!ResourceContainers.EXTERNAL_CONTAINER_NAME.equals(
162+
!InternalContainers.UNMANAGED_CONTAINER_NAME.equals(
163163
resourceGroup.getContainer()))
164164
.map(
165165
e -> {
@@ -209,7 +209,7 @@ public void releaseOptimizer(Context ctx) {
209209
"The resource ID %s has not been indexed" + " to any optimizer.", resourceId));
210210
Resource resource = optimizerManager.getResource(resourceId);
211211
resource.getProperties().putAll(optimizerInstances.get(0).getProperties());
212-
ResourceContainers.get(resource.getContainerName()).releaseOptimizer(resource);
212+
InternalContainers.get(resource.getContainerName()).releaseResource(resource);
213213
optimizerManager.deleteResource(resourceId);
214214
optimizerManager.deleteOptimizer(resource.getGroupName(), resourceId);
215215
ctx.json(OkResponse.of("Success to release optimizer"));
@@ -228,7 +228,7 @@ public void scaleOutOptimizer(Context ctx) {
228228
.setProperties(resourceGroup.getProperties())
229229
.setThreadCount(parallelism)
230230
.build();
231-
ResourceContainers.get(resource.getContainerName()).requestResource(resource);
231+
InternalContainers.get(resource.getContainerName()).requestResource(resource);
232232
optimizerManager.createResource(resource);
233233
ctx.json(OkResponse.of("success to scaleOut optimizer"));
234234
}
@@ -303,7 +303,7 @@ public void deleteCheckResourceGroup(Context ctx) {
303303
public void getContainers(Context ctx) {
304304
ctx.json(
305305
OkResponse.of(
306-
ResourceContainers.getMetadataList().stream()
306+
InternalContainers.getMetadataList().stream()
307307
.map(ContainerMetadata::getName)
308308
.collect(Collectors.toList())));
309309
}

amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/SettingController.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
import org.apache.amoro.server.AmoroManagementConf;
2525
import org.apache.amoro.server.dashboard.response.OkResponse;
2626
import org.apache.amoro.server.resource.ContainerMetadata;
27+
import org.apache.amoro.server.resource.InternalContainers;
2728
import org.apache.amoro.server.resource.OptimizerManager;
28-
import org.apache.amoro.server.resource.ResourceContainers;
2929
import org.glassfish.jersey.internal.guava.Sets;
3030

3131
import java.util.ArrayList;
@@ -72,7 +72,7 @@ private void putSetting(Map<String, String> settingMap, String key, Object value
7272

7373
/** Get container settings. */
7474
public void getContainerSetting(Context ctx) {
75-
List<ContainerMetadata> containerMetas = ResourceContainers.getMetadataList();
75+
List<ContainerMetadata> containerMetas = InternalContainers.getMetadataList();
7676
List<Map<String, Object>> result = new ArrayList<>();
7777
Objects.requireNonNull(containerMetas)
7878
.forEach(

amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractResourceContainer.java renamed to amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractOptimizerContainer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
package org.apache.amoro.server.manager;
2020

2121
import org.apache.amoro.OptimizerProperties;
22+
import org.apache.amoro.resource.InternalResourceContainer;
2223
import org.apache.amoro.resource.Resource;
23-
import org.apache.amoro.resource.ResourceContainer;
2424
import org.apache.amoro.resource.ResourceStatus;
2525
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
2626
import org.apache.commons.lang3.StringUtils;
@@ -30,7 +30,7 @@
3030
import java.util.List;
3131
import java.util.Map;
3232

33-
public abstract class AbstractResourceContainer implements ResourceContainer {
33+
public abstract class AbstractOptimizerContainer implements InternalResourceContainer {
3434
private String containerName;
3535
private Map<String, String> containerProperties;
3636

amoro-ams/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@
7979
import java.util.regex.Pattern;
8080
import java.util.stream.Collectors;
8181

82-
public class FlinkOptimizerContainer extends AbstractResourceContainer {
82+
public class FlinkOptimizerContainer extends AbstractOptimizerContainer {
8383
private static final Logger LOG = LoggerFactory.getLogger(FlinkOptimizerContainer.class);
8484

8585
public static final String FLINK_HOME_PROPERTY = "flink-home";
@@ -385,7 +385,7 @@ private <T> T fetchCommandOutput(Process exec, Function<String, T> commandReader
385385
}
386386

387387
@Override
388-
public void releaseOptimizer(Resource resource) {
388+
public void releaseResource(Resource resource) {
389389
if (target.runByFlinkRestClient()) {
390390
Preconditions.checkArgument(
391391
resource.getProperties().containsKey(SESSION_CLUSTER_JOB_ID),

amoro-ams/src/main/java/org/apache/amoro/server/manager/KubernetesOptimizerContainer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
import java.util.stream.Collectors;
5757

5858
/** Kubernetes Optimizer Container with Standalone Optimizer */
59-
public class KubernetesOptimizerContainer extends AbstractResourceContainer {
59+
public class KubernetesOptimizerContainer extends AbstractOptimizerContainer {
6060

6161
private static final Logger LOG = LoggerFactory.getLogger(KubernetesOptimizerContainer.class);
6262

@@ -348,7 +348,7 @@ public Deployment initPodTemplateFromFrontEnd(
348348
}
349349

350350
@Override
351-
public void releaseOptimizer(Resource resource) {
351+
public void releaseResource(Resource resource) {
352352
String resourceId = resource.getResourceId();
353353
LOG.info("release Kubernetes Optimizer Container {}", resourceId);
354354
String namespace = resource.getProperties().get(NAMESPACE);

amoro-ams/src/main/java/org/apache/amoro/server/manager/LocalOptimizerContainer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import java.util.Collections;
2828
import java.util.Map;
2929

30-
public class LocalOptimizerContainer extends AbstractResourceContainer {
30+
public class LocalOptimizerContainer extends AbstractOptimizerContainer {
3131

3232
private static final Logger LOG = LoggerFactory.getLogger(LocalOptimizerContainer.class);
3333

@@ -65,7 +65,7 @@ protected String buildOptimizerStartupArgsString(Resource resource) {
6565
}
6666

6767
@Override
68-
public void releaseOptimizer(Resource resource) {
68+
public void releaseResource(Resource resource) {
6969
long jobId = Long.parseLong(resource.getRequiredProperty(Resource.PROPERTY_JOB_ID));
7070

7171
String os = System.getProperty("os.name").toLowerCase();

0 commit comments

Comments
 (0)