Skip to content

Commit d3b295b

Browse files
author
majin.nathan
committed
Introduce scheduler module and external resource container
1 parent 654718b commit d3b295b

File tree

74 files changed

+1374
-655
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+1374
-655
lines changed

.idea/icon.svg

Lines changed: 0 additions & 1 deletion
This file was deleted.

amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,14 @@
4343
import org.apache.amoro.server.persistence.SqlSessionFactoryProvider;
4444
import org.apache.amoro.server.resource.ContainerMetadata;
4545
import org.apache.amoro.server.resource.DefaultOptimizerManager;
46-
import org.apache.amoro.server.resource.OptimizerManager;
4746
import org.apache.amoro.server.resource.InternalContainers;
47+
import org.apache.amoro.server.resource.OptimizerManager;
48+
import org.apache.amoro.server.scheduler.inline.InlineTableExecutors;
4849
import org.apache.amoro.server.table.DefaultTableManager;
4950
import org.apache.amoro.server.table.DefaultTableService;
5051
import org.apache.amoro.server.table.RuntimeHandlerChain;
5152
import org.apache.amoro.server.table.TableManager;
5253
import org.apache.amoro.server.table.TableService;
53-
import org.apache.amoro.server.table.executor.AsyncTableExecutors;
5454
import org.apache.amoro.server.terminal.TerminalManager;
5555
import org.apache.amoro.server.utils.ThriftServiceProxy;
5656
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
@@ -165,18 +165,18 @@ public void startService() throws Exception {
165165
new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService);
166166

167167
LOG.info("Setting up AMS table executors...");
168-
AsyncTableExecutors.getInstance().setup(tableService, serviceConfig);
168+
InlineTableExecutors.getInstance().setup(tableService, serviceConfig);
169169
addHandlerChain(optimizingService.getTableRuntimeHandler());
170-
addHandlerChain(AsyncTableExecutors.getInstance().getDataExpiringExecutor());
171-
addHandlerChain(AsyncTableExecutors.getInstance().getSnapshotsExpiringExecutor());
172-
addHandlerChain(AsyncTableExecutors.getInstance().getOrphanFilesCleaningExecutor());
173-
addHandlerChain(AsyncTableExecutors.getInstance().getDanglingDeleteFilesCleaningExecutor());
174-
addHandlerChain(AsyncTableExecutors.getInstance().getOptimizingCommitExecutor());
175-
addHandlerChain(AsyncTableExecutors.getInstance().getOptimizingExpiringExecutor());
176-
addHandlerChain(AsyncTableExecutors.getInstance().getBlockerExpiringExecutor());
177-
addHandlerChain(AsyncTableExecutors.getInstance().getHiveCommitSyncExecutor());
178-
addHandlerChain(AsyncTableExecutors.getInstance().getTableRefreshingExecutor());
179-
addHandlerChain(AsyncTableExecutors.getInstance().getTagsAutoCreatingExecutor());
170+
addHandlerChain(InlineTableExecutors.getInstance().getDataExpiringExecutor());
171+
addHandlerChain(InlineTableExecutors.getInstance().getSnapshotsExpiringExecutor());
172+
addHandlerChain(InlineTableExecutors.getInstance().getOrphanFilesCleaningExecutor());
173+
addHandlerChain(InlineTableExecutors.getInstance().getDanglingDeleteFilesCleaningExecutor());
174+
addHandlerChain(InlineTableExecutors.getInstance().getOptimizingCommitExecutor());
175+
addHandlerChain(InlineTableExecutors.getInstance().getOptimizingExpiringExecutor());
176+
addHandlerChain(InlineTableExecutors.getInstance().getBlockerExpiringExecutor());
177+
addHandlerChain(InlineTableExecutors.getInstance().getHiveCommitSyncExecutor());
178+
addHandlerChain(InlineTableExecutors.getInstance().getTableRefreshingExecutor());
179+
addHandlerChain(InlineTableExecutors.getInstance().getTagsAutoCreatingExecutor());
180180
tableService.initialize();
181181
LOG.info("AMS table service have been initialized");
182182
tableManager.setTableService(tableService);

amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@
4747
import org.apache.amoro.server.resource.OptimizerManager;
4848
import org.apache.amoro.server.resource.OptimizerThread;
4949
import org.apache.amoro.server.resource.QuotaProvider;
50+
import org.apache.amoro.server.table.DefaultTableRuntime;
5051
import org.apache.amoro.server.table.RuntimeHandlerChain;
51-
import org.apache.amoro.server.table.TableRuntime;
5252
import org.apache.amoro.server.table.TableService;
5353
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
5454
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
@@ -136,17 +136,19 @@ public RuntimeHandlerChain getTableRuntimeHandler() {
136136
return tableHandlerChain;
137137
}
138138

139-
private void loadOptimizingQueues(List<TableRuntime> tableRuntimeMetaList) {
139+
private void loadOptimizingQueues(List<DefaultTableRuntime> tableRuntimeList) {
140140
List<ResourceGroup> optimizerGroups =
141141
getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups);
142142
List<OptimizerInstance> optimizers = getAs(OptimizerMapper.class, OptimizerMapper::selectAll);
143-
Map<String, List<TableRuntime>> groupToTableRuntimes =
144-
tableRuntimeMetaList.stream()
145-
.collect(Collectors.groupingBy(TableRuntime::getOptimizerGroup));
143+
Map<String, List<DefaultTableRuntime>> groupToTableRuntimes =
144+
tableRuntimeList.stream()
145+
.collect(
146+
Collectors.groupingBy(
147+
tableRuntime -> tableRuntime.getOptimizingState().getOptimizerGroup()));
146148
optimizerGroups.forEach(
147149
group -> {
148150
String groupName = group.getName();
149-
List<TableRuntime> tableRuntimes = groupToTableRuntimes.remove(groupName);
151+
List<DefaultTableRuntime> tableRuntimes = groupToTableRuntimes.remove(groupName);
150152
OptimizingQueue optimizingQueue =
151153
new OptimizingQueue(
152154
catalogManager,
@@ -283,11 +285,11 @@ public boolean cancelProcess(long processId) throws TException {
283285
return false;
284286
}
285287
long tableId = processMeta.getTableId();
286-
TableRuntime tableRuntime = tableService.getRuntime(tableId);
288+
DefaultTableRuntime tableRuntime = tableService.getRuntime(tableId);
287289
if (tableRuntime == null) {
288290
return false;
289291
}
290-
OptimizingProcess process = tableRuntime.getOptimizingProcess();
292+
OptimizingProcess process = tableRuntime.getOptimizingState().getOptimizingProcess();
291293
if (process == null || process.getProcessId() != processId) {
292294
return false;
293295
}
@@ -372,37 +374,39 @@ public int getTotalQuota(String resourceGroup) {
372374
private class TableRuntimeHandlerImpl extends RuntimeHandlerChain {
373375

374376
@Override
375-
public void handleStatusChanged(TableRuntime tableRuntime, OptimizingStatus originalStatus) {
376-
if (!tableRuntime.getOptimizingStatus().isProcessing()) {
377-
getOptionalQueueByGroup(tableRuntime.getOptimizerGroup())
377+
public void handleStatusChanged(
378+
DefaultTableRuntime tableRuntime, OptimizingStatus originalStatus) {
379+
if (!tableRuntime.getOptimizingState().getOptimizingStatus().isProcessing()) {
380+
getOptionalQueueByGroup(tableRuntime.getOptimizingState().getOptimizerGroup())
378381
.ifPresent(q -> q.refreshTable(tableRuntime));
379382
}
380383
}
381384

382385
@Override
383-
public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration originalConfig) {
386+
public void handleConfigChanged(
387+
DefaultTableRuntime tableRuntime, TableConfiguration originalConfig) {
384388
String originalGroup = originalConfig.getOptimizingConfig().getOptimizerGroup();
385-
if (!tableRuntime.getOptimizerGroup().equals(originalGroup)) {
389+
if (!tableRuntime.getOptimizingState().getOptimizerGroup().equals(originalGroup)) {
386390
getOptionalQueueByGroup(originalGroup).ifPresent(q -> q.releaseTable(tableRuntime));
387391
}
388-
getOptionalQueueByGroup(tableRuntime.getOptimizerGroup())
392+
getOptionalQueueByGroup(tableRuntime.getOptimizingState().getOptimizerGroup())
389393
.ifPresent(q -> q.refreshTable(tableRuntime));
390394
}
391395

392396
@Override
393-
public void handleTableAdded(AmoroTable<?> table, TableRuntime tableRuntime) {
394-
getOptionalQueueByGroup(tableRuntime.getOptimizerGroup())
397+
public void handleTableAdded(AmoroTable<?> table, DefaultTableRuntime tableRuntime) {
398+
getOptionalQueueByGroup(tableRuntime.getOptimizingState().getOptimizerGroup())
395399
.ifPresent(q -> q.refreshTable(tableRuntime));
396400
}
397401

398402
@Override
399-
public void handleTableRemoved(TableRuntime tableRuntime) {
400-
getOptionalQueueByGroup(tableRuntime.getOptimizerGroup())
403+
public void handleTableRemoved(DefaultTableRuntime tableRuntime) {
404+
getOptionalQueueByGroup(tableRuntime.getOptimizingState().getOptimizerGroup())
401405
.ifPresent(queue -> queue.releaseTable(tableRuntime));
402406
}
403407

404408
@Override
405-
protected void initHandler(List<TableRuntime> tableRuntimeList) {
409+
protected void initHandler(List<DefaultTableRuntime> tableRuntimeList) {
406410
LOG.info("OptimizerManagementService begin initializing");
407411
loadOptimizingQueues(tableRuntimeList);
408412
optimizerKeeper.start();

amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525
import org.apache.amoro.server.DefaultOptimizingService;
2626
import org.apache.amoro.server.dashboard.response.OkResponse;
2727
import org.apache.amoro.server.resource.ContainerMetadata;
28+
import org.apache.amoro.server.resource.InternalContainers;
2829
import org.apache.amoro.server.resource.OptimizerInstance;
2930
import org.apache.amoro.server.resource.OptimizerManager;
30-
import org.apache.amoro.server.resource.InternalContainers;
3131
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
3232

3333
import java.util.List;

amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/OptimizerGroupController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
import org.apache.amoro.server.dashboard.response.PageResult;
3131
import org.apache.amoro.server.optimizing.OptimizingStatus;
3232
import org.apache.amoro.server.resource.ContainerMetadata;
33+
import org.apache.amoro.server.resource.InternalContainers;
3334
import org.apache.amoro.server.resource.OptimizerInstance;
3435
import org.apache.amoro.server.resource.OptimizerManager;
35-
import org.apache.amoro.server.resource.InternalContainers;
3636
import org.apache.amoro.server.table.TableManager;
3737
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
3838
import org.apache.commons.lang3.StringUtils;

amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/SettingController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
import org.apache.amoro.server.AmoroManagementConf;
2525
import org.apache.amoro.server.dashboard.response.OkResponse;
2626
import org.apache.amoro.server.resource.ContainerMetadata;
27-
import org.apache.amoro.server.resource.OptimizerManager;
2827
import org.apache.amoro.server.resource.InternalContainers;
28+
import org.apache.amoro.server.resource.OptimizerManager;
2929
import org.glassfish.jersey.internal.guava.Sets;
3030

3131
import java.util.ArrayList;

amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizerGroupMetrics.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -145,39 +145,39 @@ public void register() {
145145
(Gauge<Long>)
146146
() ->
147147
optimizingQueue.getSchedulingPolicy().getTableRuntimeMap().values().stream()
148-
.filter(t -> t.getOptimizingStatus().equals(PLANNING))
148+
.filter(t -> t.getOptimizingState().getOptimizingStatus().equals(PLANNING))
149149
.count());
150150
registerMetric(
151151
registry,
152152
OPTIMIZER_GROUP_PENDING_TABLES,
153153
(Gauge<Long>)
154154
() ->
155155
optimizingQueue.getSchedulingPolicy().getTableRuntimeMap().values().stream()
156-
.filter(t -> t.getOptimizingStatus().equals(PENDING))
156+
.filter(t -> t.getOptimizingState().getOptimizingStatus().equals(PENDING))
157157
.count());
158158
registerMetric(
159159
registry,
160160
OPTIMIZER_GROUP_EXECUTING_TABLES,
161161
(Gauge<Long>)
162162
() ->
163163
optimizingQueue.getSchedulingPolicy().getTableRuntimeMap().values().stream()
164-
.filter(t -> t.getOptimizingStatus().isProcessing())
164+
.filter(t -> t.getOptimizingState().getOptimizingStatus().isProcessing())
165165
.count());
166166
registerMetric(
167167
registry,
168168
OPTIMIZER_GROUP_IDLE_TABLES,
169169
(Gauge<Long>)
170170
() ->
171171
optimizingQueue.getSchedulingPolicy().getTableRuntimeMap().values().stream()
172-
.filter(t -> t.getOptimizingStatus().equals(IDLE))
172+
.filter(t -> t.getOptimizingState().getOptimizingStatus().equals(IDLE))
173173
.count());
174174
registerMetric(
175175
registry,
176176
OPTIMIZER_GROUP_COMMITTING_TABLES,
177177
(Gauge<Long>)
178178
() ->
179179
optimizingQueue.getSchedulingPolicy().getTableRuntimeMap().values().stream()
180-
.filter(t -> t.getOptimizingStatus().equals(COMMITTING))
180+
.filter(t -> t.getOptimizingState().getOptimizingStatus().equals(COMMITTING))
181181
.count());
182182

183183
registerMetric(

0 commit comments

Comments
 (0)