Signed-off-by: machichima <nary12321@gmail.com>
81e5562 to a17628b
    Events: []*workflow.ActionEvent{actionEvent},
    })); err != nil {
    // Best effort: status update succeeded, do not fail reconciliation on event publish errors.
    logger.Error(err, "failed to publish action event", "action", actionEvent.GetId().GetName())
What if the events proxy is temporarily down? Events would be silently lost.
I now return an error here, in e1a9f18.
That way we can rely on controller-runtime's reconcile error backoff logic to retry.
    logger.Info("updateTaskActionStatus", "name", oldTaskAction.Name, "old status", oldTaskAction.Status, "new status", newTaskAction.Status)
    err := r.Status().Update(ctx, newTaskAction)
    if err != nil {
    if err := r.Status().Update(ctx, newTaskAction); err != nil {
We should only update the status after the event has been successfully sent to the event service. Otherwise, a lost event will never be resent, because the old and new action statuses would already be the same.
popojk left a comment
LGTM. Left a few small comments.
    return timestamppb.Now()
    }

    func outputRefs(runOutputBase, actionName string) *task.OutputReferences {
Is the actionName always different if we execute the same task multiple times? I just want to make sure we will not end up with conflicting bucket names.
The root action is created in:
flyte/runs/service/run_service.go
Lines 112 to 121 in e1a9f18
And for sub-actions, they are created in:
https://github.com/flyteorg/flyte-sdk/blob/a87584a9bc7a2601a6754b4cc3dd2d6481221904/src/flyte/_internal/controllers/remote/_controller.py#L186-L188
I think sub-actions will always have a unique name. For the root action, a unique name is guaranteed when the request is made with ProjectId. If the request is made with RunId, CreateRun will raise an error.
Therefore, I think it's safe here.
    continue
    }
    // Return the first explicit cache status signal.
    if resource.CacheStatus != core.CatalogCacheStatus_CACHE_DISABLED || len(resources) == 1 {
nit: In len(resources) == 1, the number "1" looks like a magic number to me. Could we add a comment explaining why we return resource.CacheStatus when len(resources) == 1?
My intent was to return the cache status directly when there is only one resource. However, I think this is redundant.
If the only resource is not CatalogCacheStatus_CACHE_DISABLED, it enters the if resource.CacheStatus != core.CatalogCacheStatus_CACHE_DISABLED block; otherwise, execution continues to the end of the function and returns CatalogCacheStatus_CACHE_DISABLED.
I'll remove it here.
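The redundancy argument above can be checked with a small sketch. The types and function names below (`cacheStatus`, `resource`, `withLenCheck`, `withoutLenCheck`) are hypothetical stand-ins for the real ones, and the nil check before `continue` is an assumption, since the condition guarding `continue` is not visible in the quoted diff.

```go
package main

import "fmt"

// cacheStatus is a hypothetical stand-in for core.CatalogCacheStatus.
type cacheStatus int

const (
	cacheDisabled cacheStatus = iota
	cacheMiss
	cacheHit
)

type resource struct{ CacheStatus cacheStatus }

// withLenCheck mirrors the reviewed condition, including len(resources) == 1.
func withLenCheck(resources []*resource) cacheStatus {
	for _, r := range resources {
		if r == nil { // assumed skip condition
			continue
		}
		// Return the first explicit cache status signal.
		if r.CacheStatus != cacheDisabled || len(resources) == 1 {
			return r.CacheStatus
		}
	}
	return cacheDisabled
}

// withoutLenCheck drops the len(resources) == 1 clause.
func withoutLenCheck(resources []*resource) cacheStatus {
	for _, r := range resources {
		if r == nil { // assumed skip condition
			continue
		}
		if r.CacheStatus != cacheDisabled {
			return r.CacheStatus
		}
	}
	return cacheDisabled
}

func main() {
	// For a single resource, both variants agree for every status:
	// a disabled status falls through to the final return either way.
	for _, s := range []cacheStatus{cacheDisabled, cacheMiss, cacheHit} {
		rs := []*resource{{CacheStatus: s}}
		fmt.Println(withLenCheck(rs) == withoutLenCheck(rs))
	}
}
```

For slices with more than one element the extra clause is always false, so the two variants are identical there as well.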
Got an error when merging v2; fixing.
ea15830 to 13b38d0
Tracking issue
Why are the changes needed?
Previously, we called the internal run service to record action events (persist them to the DB) from the action client (k8s watcher). However, we cannot get all the information we want from the CR (e.g. LogInfo, ErrorInfo, ...). In this PR, we add a new EventService that runs independently, receives requests from the executor, and forwards them to the internal run service through background workers.
Alternatives
We've considered the following alternatives:
1. Store a map of ActionEvents under the TaskAction CR status
We can store a map of action events keyed by ID under the TaskAction CR status, and add a persisted_id slice to record which action events have been persisted. The executor puts each new action event into the CR; the action service takes the action event out, checks whether it has already been persisted by looking it up in persisted_id, and sends a request to the internal run service.
This uses the TaskAction CR as a queue, which we need to:
This method has the following disadvantages:
2. Store ActionEvent in a new CR
We can add a new ActionEvent CR to store the action events. The executor creates a new ActionEvent CR whenever there is a status update, and the action service watches this CR. Whenever a new ActionEvent CR is created, the action service parses the information from the CR and sends a request to the internal run service to persist the action event.
This method has the following disadvantages:
What changes were proposed in this pull request?
- EventsService proto and implementations
- EventsWorker to queue and send requests to internal run service (through unary)
- executor/cmd/main.go and remove unused code
TODO next: Use streaming endpoint / implement batch request from EventsService to internal run service
How was this patch tested?
manager/DEVELOPMENT.md
python examples/basics/hello_v2.py)
flyte.db), check if the data exists
DB: