[AMORO-3485] Introduce scheduler module and external resource container #3486

Merged
merged 13 commits into from
May 28, 2025

Conversation

majin1102
Contributor

@majin1102 majin1102 commented Mar 25, 2025

Why are the changes needed?

Close #3485.

Brief change log

Document: https://docs.google.com/document/d/1pEKneQXI_OIY7EU_XX0xwHeRFG0HHi37bb3SaNucI00/edit?tab=t.0

How was this patch tested?

  • Add some test cases that check the changes thoroughly including negative and positive cases if possible

  • Add screenshots for manual tests if appropriate

  • Run test locally before making a pull request

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@amoro.apache.org list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Apr 25, 2025
@majin1102 majin1102 changed the title [AMORO-3485][WIP] Introduce scheduler module and external resource container [AMORO-3485] Introduce scheduler module and external resource container Apr 27, 2025
@github-actions github-actions bot removed the stale label Apr 28, 2025
private final Lock processLock = new ReentrantLock();
private final ServerTableIdentifier tableIdentifier;
private final DefaultOptimizingState optimizingState;
private final Map<Action, ProcessFactory<? extends TableProcessState>> processFactoryMap =
Contributor

Is it thread safe? I believe processFactoryMap is used by multiple threads.
We can remove the reentrant lock and use ConcurrentHashMap directly to have better performance.

Contributor Author

@majin1102 majin1102 May 12, 2025

It is thread-safe because access is guarded by processLock.

We could switch to ConcurrentHashMap, but processLock is still necessary to keep the read-and-create step atomic.

The code has been committed, PTAL.
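
The read-and-create point can be illustrated with a minimal sketch. `ProcessRegistry`, its `String` keys and values, and the `demo` helper are hypothetical stand-ins and not Amoro classes; only the idea of a `processLock` guarding a map comes from the thread above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a runtime that lazily creates one process per action.
final class ProcessRegistry {
  private final Lock processLock = new ReentrantLock();
  private final Map<String, String> processes = new ConcurrentHashMap<>();
  final AtomicInteger created = new AtomicInteger();

  // ConcurrentHashMap makes each single call safe, but "get, then put" is two calls.
  // Holding the lock across both keeps the compound step atomic.
  String getOrCreate(String action) {
    processLock.lock();
    try {
      String existing = processes.get(action);
      if (existing == null) {
        existing = "process-" + action + "-" + created.incrementAndGet();
        processes.put(action, existing);
      }
      return existing;
    } finally {
      processLock.unlock();
    }
  }

  // Calls getOrCreate from several threads and returns how many processes were created.
  static int demo(int threads) {
    ProcessRegistry registry = new ProcessRegistry();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    CountDownLatch start = new CountDownLatch(1);
    for (int i = 0; i < threads; i++) {
      pool.execute(() -> {
        try {
          start.await(); // release all workers at once to provoke contention
          registry.getOrCreate("optimizing");
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }
    start.countDown();
    pool.shutdown();
    try {
      pool.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return registry.created.get();
  }
}
```

When creation is cheap and has no side effects, `ConcurrentHashMap.computeIfAbsent` is an alternative that gives the same atomicity for a single key without an explicit lock.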

private final DefaultOptimizingState optimizingState;
private final Map<Action, ProcessFactory<? extends TableProcessState>> processFactoryMap =
new HashMap<>();
private final Map<Long, AmoroProcess<? extends TableProcessState>> processMap = new HashMap<>();
Contributor

same as processFactoryMap

Contributor Author

as above

import org.apache.amoro.process.ProcessFactory;
import org.apache.amoro.process.TableProcessState;

public interface ManagedTableRuntime extends TableRuntime {
Contributor

The functions in this interface are all about actions and processes.

I think we can split it into an interface called SupportsAmoroProcess. So, DefaultTableRuntime implements TableRuntime and SupportsAmoroProcess

Contributor Author

Yep.

I think SupportsProcessPlugins is better; it conveys support for pluggable factories.

return optimizingState;
}

public void recover(OptimizingProcess optimizingProcess) {
Contributor

This exposes additional methods: recover, registerMetric, dispose.
How about invoking those methods via getOptimizingState().xxx?

Contributor Author

Agreed.
This could be introduced in another PR. I didn't include these changes, to keep this PR relatively small.

@@ -71,7 +71,7 @@ public class DefaultTableService extends PersistentBase implements TableService
public static final Logger LOG = LoggerFactory.getLogger(DefaultTableService.class);
private final long externalCatalogRefreshingInterval;

- private final Map<Long, TableRuntime> tableRuntimeMap = new ConcurrentHashMap<>();
+ private final Map<Long, DefaultTableRuntime> tableRuntimeMap = new ConcurrentHashMap<>();
Contributor

Why use DefaultTableRuntime instead of TableRuntime? I don't see any methods that belong only to DefaultTableRuntime.

Contributor Author

Like getOptimizingState()?
Maybe we can use ManagedTableRuntime in place of DefaultTableRuntime, but that would take quite a lot of work. The previous POJO here was the old TableRuntime, which is now replaced by DefaultTableRuntime.getOptimizingState(), and that has not been abstracted into TableRuntime.

* AMS. Resources are decoupled from processes. For ExternalResourceContainer, resources are managed
* outside, and each single resource belonged to a process.
*/
public interface InternalResourceContainer extends ResourceContainer {
Contributor

Do you mean the life cycle is managed by AMS?
Maybe remove the description about "external resource container"

Contributor Author

I've improved the description. PTAL

* ExternalResourceContainer is the key interface for the AMS framework to interact with the
* external resource. Typically, it is used to submit and release resource on YARN/K8S.
*/
public interface ExternalResourceContainer extends ResourceContainer {
Contributor

Do you mean the life cycle of ExternalResourceContainer is not managed by AMS?

Typically, it is used to submit and release resource on YARN/K8S.

I don't quite understand here. If AMS itself runs on K8S, then InternalResourceContainer will also run on K8S? What is the difference between InternalResourceContainer and ExternalResourceContainer?

Contributor Author

For InternalContainer executors, tasks are scheduled by AMS, and the executors are released by AMS when necessary. For ExternalContainer executors, tasks are received from some JobManager or application master. The real difference is whether the ResourceManager (responsible for task scheduling and managing the lifetime of executors) is inside AMS or outside it.

Contributor

I think we can make that clear in the comments.

Contributor Author

PTAL
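
The distinction drawn in this thread (whether task scheduling and executor lifetime sit inside AMS or outside it) might be sketched roughly as follows. Every type and method name here is a hypothetical stand-in chosen for illustration; none of these signatures come from the Amoro code base.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical common parent, standing in for ResourceContainer.
interface SketchContainer {
  String name();
}

// "Internal": the scheduler lives inside the service, which hands out tasks itself.
final class SketchInternalContainer implements SketchContainer {
  private final Queue<String> pendingTasks = new ArrayDeque<>();

  @Override
  public String name() {
    return "internal";
  }

  // The service queues a task that it has planned itself.
  void enqueue(String task) {
    pendingTasks.add(task);
  }

  // An executor asks the service for its next task; returns null when idle.
  String pollTask() {
    return pendingTasks.poll();
  }
}

// "External": the service only submits and releases a whole process. An outside
// resource manager (for example a YARN or Kubernetes job) schedules the tasks.
final class SketchExternalContainer implements SketchContainer {
  private final List<String> submitted = new ArrayList<>();

  @Override
  public String name() {
    return "external";
  }

  // Returns an opaque handle that the service can later use to track status.
  String submit(String process) {
    submitted.add(process);
    return "handle-" + process;
  }

  // Returns true if the process was known and has now been released.
  boolean release(String process) {
    return submitted.remove(process);
  }
}
```

In this sketch the internal container exposes individual tasks, while the external one exposes only whole processes, which is one way to read the "inside AMS or outside" explanation above.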

private static final TableFormat[] DEFAULT_FORMATS =
new TableFormat[] {TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE};

public static final Action SYSTEM = new Action(DEFAULT_FORMATS, 0, "system");
Contributor

What does SYSTEM mean?

Contributor Author

It's the legacy process type, such as expiring process records; it can also stand in for any action that doesn't have an action name.

Contributor

Should add an action name for the constructor, like above.

void complete();

/** Mark this process as failed, trigger callbacks */
void complete(String failedReason);
Contributor

Suggested change
- void complete(String failedReason);
+ void failed(String failedReason);

Contributor Author

This is quite different.
complete(String failedReason) is called in the task completion phase, while failing could happen at any phase (like submitting). So I think complete(String failedReason) is more reasonable here: it means only the completion callback would use this method.

Contributor

How about defining one method with a flag indicating whether the process succeeded or not?

Contributor Author

@majin1102 majin1102 May 12, 2025

What's the point or difference?

Contributor

These two methods seem to perform different execution logic but have the same name, so I recommend differentiating the names.

Contributor Author

@majin1102 majin1102 May 15, 2025

I don't think a flag is any better, and 'failed' carries the semantics of a lifecycle state.
Could you suggest a better name for a "complete with an error" method?
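
To make the two options in this thread concrete, here is a minimal sketch that puts them side by side. `SketchProcess`, `SketchStatus` and `finish` are hypothetical names for illustration; only the `complete()` and `complete(String failedReason)` signatures come from the diff under review.

```java
// Hypothetical terminal states; the real ProcessStatus enum has more values.
enum SketchStatus {
  RUNNING,
  SUCCESS,
  FAILED
}

final class SketchProcess {
  private SketchStatus status = SketchStatus.RUNNING;
  private String failedReason;

  // Author's option: both overloads are completion callbacks.
  void complete() {
    finish(true, null);
  }

  void complete(String failedReason) {
    finish(false, failedReason);
  }

  // Reviewer's option: one method with an explicit success flag.
  void finish(boolean success, String reason) {
    if (status != SketchStatus.RUNNING) {
      throw new IllegalStateException("process already completed");
    }
    status = success ? SketchStatus.SUCCESS : SketchStatus.FAILED;
    failedReason = reason;
  }

  SketchStatus status() {
    return status;
  }

  String failedReason() {
    return failedReason;
  }
}
```

Either shape ends in the same terminal state; the disagreement above is only about which call site reads more clearly.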

@@ -30,6 +30,7 @@ public class TableProcessState implements ProcessState {
@StateField private volatile long id;
private final Action action;
Contributor

I see the action field is materialized in the DB?

Contributor Author

Yes
What's the problem?

Contributor

Do we need to add @StateField to the action field?

Contributor Author

@majin1102 majin1102 May 12, 2025

The field should never change, so @StateField is unnecessary for both action and identifier.
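
The reasoning here (annotate only fields that can change, leave immutable ones alone) can be sketched as below. This assumes the annotation is used to mark mutable state, which is an inference from the reply above rather than something the thread confirms. `TrackedField` and `SketchState` are hypothetical; they are not Amoro's `@StateField` or `TableProcessState`.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Hypothetical marker for fields whose value may change over a process's lifetime.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface TrackedField {}

final class SketchState {
  @TrackedField private volatile long id;
  @TrackedField private volatile String status = "RUNNING";

  // Set once in the constructor and never reassigned, so it is not marked.
  private final String action;

  SketchState(String action) {
    this.action = action;
  }

  String action() {
    return action;
  }

  // Collects the names of all fields carrying the marker annotation.
  static List<String> trackedFieldNames() {
    List<String> names = new ArrayList<>();
    for (Field field : SketchState.class.getDeclaredFields()) {
      if (field.isAnnotationPresent(TrackedField.class)) {
        names.add(field.getName());
      }
    }
    return names;
  }
}
```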

Contributor

I see

@majin1102
Contributor Author

majin1102 commented May 12, 2025

@xxubai Thanks for reviewing. I've made some improvements. PTAL

import org.apache.amoro.process.ProcessFactory;
import org.apache.amoro.process.TableProcessState;

public interface SupportsProcessPlugins extends TableRuntime {
Contributor

I think the interface can be separate, rather than inheriting from TableRuntime

Contributor Author

yep, it's better
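
A rough sketch of what a standalone capability interface could look like is given here. All types are simplified stand-ins prefixed with `Sketch`, and the `install` and `trigger` methods are hypothetical; the thread only settles that the interface should not inherit from TableRuntime.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for TableRuntime.
interface SketchTableRuntime {
  String tableName();
}

// Simplified stand-in for ProcessFactory.
interface SketchProcessFactory {
  String create(String tableName);
}

// Capability interface that does not extend the runtime interface.
interface SketchSupportsProcessPlugins {
  void install(String action, SketchProcessFactory factory);

  String trigger(String action);
}

// The concrete runtime opts into the capability by implementing both interfaces.
final class SketchDefaultTableRuntime
    implements SketchTableRuntime, SketchSupportsProcessPlugins {
  private final String tableName;
  private final Map<String, SketchProcessFactory> factories = new HashMap<>();

  SketchDefaultTableRuntime(String tableName) {
    this.tableName = tableName;
  }

  @Override
  public String tableName() {
    return tableName;
  }

  @Override
  public void install(String action, SketchProcessFactory factory) {
    factories.put(action, factory);
  }

  @Override
  public String trigger(String action) {
    SketchProcessFactory factory = factories.get(action);
    if (factory == null) {
      throw new IllegalArgumentException("no factory installed for " + action);
    }
    return factory.create(tableName);
  }
}

final class PluginDemo {
  // Depends only on the capability, not on the runtime type.
  static String installAndTrigger(SketchSupportsProcessPlugins target) {
    target.install("expire", table -> "expire:" + table);
    return target.trigger("expire");
  }
}
```

Keeping the capability separate lets callers such as `PluginDemo` accept any object that supports plugins, whether or not it is also a table runtime.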

Comment on lines 64 to 67
@Override
protected long getNextExecutingTime(DefaultTableRuntime tableRuntime) {
return 0;
}
Contributor

This essentially schedules the task in a while(true) loop. Intentional?

Contributor Author

@majin1102 majin1102 May 15, 2025

This method should be implemented by child classes, so it should be deleted here.
PTAL
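
The concern about a zero delay can be sketched as follows: if the scheduler re-submits a task after each run using the returned delay, a default of 0 makes it run back to back, so leaving the method abstract forces each subclass to choose an interval. `SketchPeriodicScheduler`, its `String` table argument and `SchedulerDemo` are hypothetical; only the method name `getNextExecutingTime` comes from the diff.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

abstract class SketchPeriodicScheduler {
  private final ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor();

  // No default implementation: a subclass must pick the delay in milliseconds.
  protected abstract long getNextExecutingTime(String table);

  protected abstract void execute(String table);

  // Schedules one run; each run re-schedules itself with the subclass's delay.
  void schedule(String table) {
    executor.schedule(() -> runOnce(table), getNextExecutingTime(table), TimeUnit.MILLISECONDS);
  }

  private void runOnce(String table) {
    try {
      execute(table);
    } finally {
      if (!executor.isShutdown()) {
        schedule(table);
      }
    }
  }

  void stop() {
    executor.shutdownNow();
  }
}

final class SchedulerDemo {
  // Returns true if the task ran at least three times within five seconds.
  static boolean runsThreeTimes() {
    CountDownLatch runs = new CountDownLatch(3);
    SketchPeriodicScheduler scheduler =
        new SketchPeriodicScheduler() {
          @Override
          protected long getNextExecutingTime(String table) {
            return 10L; // a non-zero interval avoids a busy loop
          }

          @Override
          protected void execute(String table) {
            runs.countDown();
          }
        };
    scheduler.schedule("db.t");
    try {
      return runs.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      scheduler.stop();
    }
  }
}
```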

Contributor

@xxubai xxubai left a comment

Overall, it looks pretty good, but there are still some minor issues, such as upgrading SQL script and adding more test cases.

However, these can be addressed in other PRs.

Can you double check this PR @zhoujinsong

Contributor

@Aireed Aireed left a comment

I left some comments, PTAL

"UPDATE table_process_state "
+ "SET status = #{status}, end_time = #{endTime}, fail_reason = #{failedReason} "
+ "WHERE process_id = #{id} and retry_num = #{retryNumber}")
void updateProcessFailed(TableProcessState state);
Contributor

like above

@@ -26,6 +26,7 @@ public enum ProcessStatus {
RUNNING,
SUCCESS,
CLOSED,
Contributor

CLOSED should be removed, since I don't see CLOSED being used anymore.

Contributor Author

Check OptimizingQueue.java

Contributor

Check OptimizingQueue.java

I still can't find CLOSED being used in OptimizingQueue.java. Please share the code with me.

@majin1102
Contributor Author

majin1102 commented May 16, 2025

Overall, it looks pretty good, but there are still some minor issues, such as upgrading SQL script and adding more test cases.

However, these can be addressed in other PRs.

Can you double check this PR @zhoujinsong

There's much work left to do, like how to implement ExternalScheduler, where to put it, and how to initialize it.
For some reasons I won't have much time to follow up. This PR is mainly to complete the API and to point out the path for constructing processes and the way to implement schedulers (extensions need to manage the lifetimes of external processes, including tracking status).

I look forward to seeing more documents and extensions based on this

majin1102 and others added 3 commits May 16, 2025 17:47
…ptimizingState.java

Co-authored-by: Wang Tao <watters.tao@gmail.com>
…HiveClientPool.java

Co-authored-by: Wang Tao <watters.tao@gmail.com>
@codecov-commenter

Codecov Report

Attention: Patch coverage is 42.41645% with 224 lines in your changes missing coverage. Please review.

Project coverage is 28.25%. Comparing base (d7d6534) to head (a0bb902).
Report is 10 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...ro/server/scheduler/PeriodicExternalScheduler.java | 0.00% | 79 Missing ⚠️ |
| ...apache/amoro/server/table/DefaultTableRuntime.java | 53.70% | 25 Missing ⚠️ |
| ...va/org/apache/amoro/process/TableProcessState.java | 0.00% | 19 Missing ⚠️ |
| ...pache/amoro/server/optimizing/OptimizingQueue.java | 67.30% | 5 Missing and 12 partials ⚠️ |
| ...amoro/server/scheduler/PeriodicTableScheduler.java | 26.66% | 11 Missing ⚠️ |
| ...che/amoro/server/table/DefaultOptimizingState.java | 56.00% | 11 Missing ⚠️ |
| .../scheduler/inline/TableRuntimeRefreshExecutor.java | 41.17% | 7 Missing and 3 partials ⚠️ |
| ...-common/src/main/java/org/apache/amoro/Action.java | 0.00% | 9 Missing ⚠️ |
| ...amoro/server/optimizing/sorter/BalancedSorter.java | 0.00% | 8 Missing ⚠️ |
| ...src/main/java/org/apache/amoro/IcebergActions.java | 0.00% | 7 Missing ⚠️ |
| ... and 14 more | | |
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #3486      +/-   ##
============================================
+ Coverage     21.76%   28.25%   +6.48%     
- Complexity     2391     3702    +1311     
============================================
  Files           431      614     +183     
  Lines         40498    49765    +9267     
  Branches       5744     6433     +689     
============================================
+ Hits           8816    14063    +5247     
- Misses        30935    34693    +3758     
- Partials        747     1009     +262     
Flag Coverage Δ
core 28.25% <42.41%> (?)
trino ?

@majin1102
Contributor Author

majin1102 commented May 27, 2025

I left some comments, PTAL

Thanks for reviewing, I have updated the PR, PTAL

Contributor

@Aireed Aireed left a comment

LGTM

@xxubai xxubai merged commit 064f400 into apache:master May 28, 2025
4 checks passed

Successfully merging this pull request may close these issues.

[Feature]: Support external process and scheduler
5 participants