[AMORO-3485] Introduce scheduler module and external resource container #3486
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@amoro.apache.org list. Thank you for your contributions.
private final Lock processLock = new ReentrantLock();
private final ServerTableIdentifier tableIdentifier;
private final DefaultOptimizingState optimizingState;
private final Map<Action, ProcessFactory<? extends TableProcessState>> processFactoryMap =
Is it thread safe? I believe processFactoryMap is used by multiple threads.
We can remove the reentrant lock and use ConcurrentHashMap directly for better performance.
It is made thread-safe by processLock.
We could improve this with ConcurrentHashMap, but processLock is still necessary to keep read-and-create atomic.
The code has been committed, PTAL.
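A minimal sketch of the read-and-create point, under stated assumptions: `ProcessRegistrySketch`, `install`, and `trigger` are hypothetical names, and processes are plain strings rather than the PR's `AmoroProcess`. `ConcurrentHashMap` makes each single map operation safe, but looking up a factory, allocating an id, and registering the new process span several operations, so the sketch keeps `processLock` around that compound step.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Hypothetical stand-in for the runtime discussed above; not the PR's actual class.
class ProcessRegistrySketch {
  private final Lock processLock = new ReentrantLock();
  // Individual get/put calls on these maps are thread-safe on their own.
  private final Map<String, Function<Long, String>> factories = new ConcurrentHashMap<>();
  private final Map<Long, String> processes = new ConcurrentHashMap<>();
  private long nextId = 0; // guarded by processLock

  void install(String action, Function<Long, String> factory) {
    factories.put(action, factory);
  }

  /** Reads the factory, allocates an id, and registers the process as one atomic step. */
  long trigger(String action) {
    processLock.lock();
    try {
      Function<Long, String> factory = factories.get(action);
      if (factory == null) {
        throw new IllegalStateException("No factory installed for " + action);
      }
      long id = ++nextId;
      processes.put(id, factory.apply(id));
      return id;
    } finally {
      processLock.unlock();
    }
  }

  String get(long id) {
    return processes.get(id);
  }

  int size() {
    return processes.size();
  }
}
```

If the compound step ever shrinks to a single keyed lookup-or-create, `ConcurrentHashMap.computeIfAbsent` alone would cover it and the lock could go.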
private final DefaultOptimizingState optimizingState;
private final Map<Action, ProcessFactory<? extends TableProcessState>> processFactoryMap =
    new HashMap<>();
private final Map<Long, AmoroProcess<? extends TableProcessState>> processMap = new HashMap<>();
same as processFactoryMap
as above
import org.apache.amoro.process.ProcessFactory;
import org.apache.amoro.process.TableProcessState;

public interface ManagedTableRuntime extends TableRuntime {
The functions in this interface are all about actions and processes.
I think we can split it into an interface called SupportsAmoroProcess, so that DefaultTableRuntime implements both TableRuntime and SupportsAmoroProcess.
Yep, I think SupportsProcessPlugins is a better name; it means the runtime supports pluggable factories.
  return optimizingState;
}

public void recover(OptimizingProcess optimizingProcess) {
This exposes additional methods: recover, registerMetric, dispose.
How about invoking those methods via getOptimizingState().xxx?
Agree.
This could be done in another PR. I didn't include these changes, to keep this PR relatively small.
@@ -71,7 +71,7 @@ public class DefaultTableService extends PersistentBase implements TableService
   public static final Logger LOG = LoggerFactory.getLogger(DefaultTableService.class);
   private final long externalCatalogRefreshingInterval;

-  private final Map<Long, TableRuntime> tableRuntimeMap = new ConcurrentHashMap<>();
+  private final Map<Long, DefaultTableRuntime> tableRuntimeMap = new ConcurrentHashMap<>();
Why use DefaultTableRuntime instead of TableRuntime? I don't see any methods that belong only to DefaultTableRuntime.
Like getOptimizingState()?
Maybe we could use ManagedTableRuntime in place of DefaultTableRuntime, but that would be quite a lot of work. The previous POJO here was the old TableRuntime, which has been replaced by DefaultTableRuntime.getOptimizingState(), and that has not been abstracted into TableRuntime.
 * AMS. Resources are decoupled from processes. For ExternalResourceContainer, resources are managed
 * outside, and each single resource belonged to a process.
 */
public interface InternalResourceContainer extends ResourceContainer {
Do you mean the life cycle is managed by AMS?
Maybe remove the description about "external resource container"
I've improved the description. PTAL
 * ExternalResourceContainer is the key interface for the AMS framework to interact with the
 * external resource. Typically, it is used to submit and release resource on YARN/K8S.
 */
public interface ExternalResourceContainer extends ResourceContainer {
Do you mean the life cycle of ExternalResourceContainer is not managed by AMS?
Typically, it is used to submit and release resource on YARN/K8S.
I don't quite understand this. If AMS itself runs on K8S, will InternalResourceContainer also run on K8S? What is the difference between InternalResourceContainer and ExternalResourceContainer?
InternalContainer executors receive tasks scheduled by AMS and are released by AMS when necessary. ExternalContainer executors receive tasks from some JobManager or application master. The real difference is whether the ResourceManager (responsible for task scheduling and managing the lifetime of executors) is inside AMS or outside it.
I think we can make that clear in the comments.
PTAL
private static final TableFormat[] DEFAULT_FORMATS =
    new TableFormat[] {TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE};

public static final Action SYSTEM = new Action(DEFAULT_FORMATS, 0, "system");
What does SYSTEM mean?
It's the legacy process type, for things like expiring process records, or it can simply stand in for any type of action that doesn't have an action name.
An action name should be added to the constructor, like above.
void complete();

/** Mark this process as failed, trigger callbacks */
void complete(String failedReason);
- void complete(String failedReason);
+ void failed(String failedReason);
This is quite different.
complete(String failedReason) is called in the task completion phase, whereas failing could happen at any phase (like submitting). So I think complete(String failedReason) is more reasonable here: it means only the completion callback would use this method.
How about defining one method with a flag indicating whether the process succeeded or not?
What's the point or difference?
These two methods seem to perform different execution logic but share the same name, so I recommend differentiating the names.
I don't think a flag is any better, and 'failed' carries lifecycle semantics (a failure can happen at any phase).
Could you suggest a better name for a "complete with an error" method?
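To put the three shapes raised in this thread side by side, here is a rough sketch. Everything in it is illustrative: the interface names, `StatusSketch`, `RecordingProcessSketch`, and the flag signature are assumptions made for the example, not the PR's API, and the thread did not settle on any of them.

```java
// Hypothetical status values, used only by this sketch.
enum StatusSketch { RUNNING, SUCCESS, FAILED }

// Shape under review: one name, two behaviours selected by overload.
interface CompletionByOverload {
  void complete();
  void complete(String failedReason);
}

// Reviewer's suggested rename; the author objected that "failed" reads as a lifecycle state.
interface CompletionByName {
  void complete();
  void failed(String failedReason);
}

// Single method with a flag; the exact signature is an assumption.
interface CompletionByFlag {
  void complete(boolean success, String failedReason);
}

// Minimal implementation of the overload shape, to show that the two calls diverge.
class RecordingProcessSketch implements CompletionByOverload {
  private StatusSketch status = StatusSketch.RUNNING;
  private String failedReason;

  @Override
  public void complete() {
    status = StatusSketch.SUCCESS;
  }

  @Override
  public void complete(String failedReason) {
    status = StatusSketch.FAILED;
    this.failedReason = failedReason;
  }

  StatusSketch status() {
    return status;
  }

  String failedReason() {
    return failedReason;
  }
}
```

With the overload, a call site such as `process.complete(reason)` ends in a failed state even though the method name reads as success, which is the readability concern raised above.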
@@ -30,6 +30,7 @@ public class TableProcessState implements ProcessState {
   @StateField private volatile long id;
   private final Action action;
I see the action field is materialized in the DB?
Yes
What's the problem?
Do we need to add @StateField for the action field?
The field should never change, so neither action nor identifier needs @StateField.
I see
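A small sketch of the distinction drawn in this thread, assuming (as the reply implies) that `@StateField` marks fields whose values change during a process's lifetime. `StateFieldSketch` is a local stand-in annotation and `ProcessStateSketch` a hypothetical class; neither is Amoro's real implementation.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.Set;
import java.util.TreeSet;

// Local stand-in for @StateField; the real annotation's semantics are assumed, not confirmed.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface StateFieldSketch {}

class ProcessStateSketch {
  @StateFieldSketch private volatile long id;                     // assigned after creation
  private final String action;                                    // fixed at construction, not annotated
  @StateFieldSketch private volatile String status = "RUNNING";   // changes over time

  ProcessStateSketch(String action) {
    this.action = action;
  }

  String action() {
    return action;
  }

  /** Collects the names of fields marked as mutable state. */
  static Set<String> mutableFieldNames(Class<?> type) {
    Set<String> names = new TreeSet<>();
    for (Field field : type.getDeclaredFields()) {
      if (field.isAnnotationPresent(StateFieldSketch.class)) {
        names.add(field.getName());
      }
    }
    return names;
  }
}
```

Being persisted and being mutable are separate questions: in this sketch `action` could still be written to the database once at creation, it just never takes part in state updates.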
@xxubai Thanks for reviewing. I've made some improvements. PTAL
import org.apache.amoro.process.ProcessFactory;
import org.apache.amoro.process.TableProcessState;

public interface SupportsProcessPlugins extends TableRuntime {
I think the interface can be separate, rather than inheriting from TableRuntime
yep, it's better
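A rough sketch of what keeping the capability interface separate could look like. The `*Sketch` types and the `install`/`enabled` methods are hypothetical and only illustrate the shape; the real `SupportsProcessPlugins` methods may differ.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, trimmed-down stand-in for TableRuntime.
interface TableRuntimeSketch {
  String tableName();
}

// Capability interface that does not extend TableRuntimeSketch.
interface SupportsProcessPluginsSketch {
  void install(String action);

  boolean enabled(String action);
}

// A runtime opts in by implementing both interfaces.
class DefaultTableRuntimeSketch implements TableRuntimeSketch, SupportsProcessPluginsSketch {
  private final String tableName;
  private final Set<String> actions = new HashSet<>();

  DefaultTableRuntimeSketch(String tableName) {
    this.tableName = tableName;
  }

  @Override
  public String tableName() {
    return tableName;
  }

  @Override
  public void install(String action) {
    actions.add(action);
  }

  @Override
  public boolean enabled(String action) {
    return actions.contains(action);
  }
}

class PluginInstallerSketch {
  /** Installs the action only when the runtime opts in to the capability. */
  static boolean tryInstall(TableRuntimeSketch runtime, String action) {
    if (runtime instanceof SupportsProcessPluginsSketch) {
      ((SupportsProcessPluginsSketch) runtime).install(action);
      return true;
    }
    return false;
  }
}
```

The trade-off is that callers holding only a `TableRuntimeSketch` must check for the capability, while runtimes that do not support plugins carry no unused methods.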
@Override
protected long getNextExecutingTime(DefaultTableRuntime tableRuntime) {
  return 0;
}
This essentially schedules the task in a while(true) loop. Intentional?
This method should be implemented by child classes, so it should be deleted here.
PTAL
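A small simulation of the concern, assuming the scheduler re-arms each table after the delay returned by `getNextExecutingTime`. `PeriodicSchedulerSketch` and `simulateRuns` are hypothetical names; the simulated clock replaces real scheduling so the effect is easy to see.

```java
// Hypothetical scheduler base class; leaving the delay abstract forces each subclass to choose one.
abstract class PeriodicSchedulerSketch {
  /** Delay in milliseconds before the next run of the given table. */
  protected abstract long getNextExecutingTime(String table);

  /**
   * Counts runs inside a simulated time window. The cap exists only so that a
   * zero delay, which never advances the clock, cannot spin forever.
   */
  int simulateRuns(String table, long windowMillis, int cap) {
    int runs = 0;
    long now = 0;
    while (now < windowMillis && runs < cap) {
      runs++;                             // execute once
      now += getNextExecutingTime(table); // then wait for the reported delay
    }
    return runs;
  }
}
```

With a delay of 0 the simulated clock never moves and the loop stops only at the cap, which is the while(true) behaviour pointed out above; with a delay of 1000 ms over a 5000 ms window it runs five times.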
Overall, it looks pretty good, but there are still some minor issues, such as upgrading SQL script and adding more test cases.
However, these can be addressed in other PRs.
Can you double-check this PR, @zhoujinsong?
I left some comments, PTAL
"UPDATE table_process_state "
    + "SET status = #{status}, end_time = #{endTime}, fail_reason = #{failedReason} "
    + "WHERE process_id = #{id} and retry_num = #{retryNumber}")
void updateProcessFailed(TableProcessState state);
like above
@@ -26,6 +26,7 @@ public enum ProcessStatus {
   RUNNING,
   SUCCESS,
   CLOSED,
CLOSED should be removed, since I don't see CLOSED being used anymore.
Check OptimizingQueue.java
Check OptimizingQueue.java
I still can't find CLOSED being used in OptimizingQueue.java. Please share the code.
There's much work to do, like how to implement ExternalScheduler, where to put it, and how to initialize it. I look forward to seeing more documents and extensions based on this.
…ptimizingState.java Co-authored-by: Wang Tao <watters.tao@gmail.com>
…HiveClientPool.java Co-authored-by: Wang Tao <watters.tao@gmail.com>
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #3486 +/- ##
============================================
+ Coverage 21.76% 28.25% +6.48%
- Complexity 2391 3702 +1311
============================================
Files 431 614 +183
Lines 40498 49765 +9267
Branches 5744 6433 +689
============================================
+ Hits 8816 14063 +5247
- Misses 30935 34693 +3758
- Partials 747 1009 +262
…rsistentBase.java Co-authored-by: Wang Tao <watters.tao@gmail.com>
Thanks for reviewing, I have updated the PR, PTAL
LGTM
Why are the changes needed?
Close #3485.
Brief change log
Document: https://docs.google.com/document/d/1pEKneQXI_OIY7EU_XX0xwHeRFG0HHi37bb3SaNucI00/edit?tab=t.0
How was this patch tested?
Add some test cases that check the changes thoroughly including negative and positive cases if possible
Add screenshots for manual tests if appropriate
Run test locally before making a pull request
Documentation