[V2][Feat] Add events service #6970

Merged: machichima merged 18 commits into v2 from add-event-proxy-service on Mar 10, 2026

machichima (Member) commented on Mar 6, 2026

Tracking issue

Why are the changes needed?

Previously, we called the internal run service from the action client (a k8s watcher) to record action events (persisting them to the DB). However, the CR does not carry all the info we want (e.g. LogInfo, ErrorInfo, ...). This PR adds a new EventsService that runs independently, receives requests from the executor, and forwards them to the internal run service through background workers.

Alternatives

We considered the following alternatives:

1. Store map of ActionEvents under TaskAction CR status

We can store a map of action events keyed by ID under the TaskAction CR status, and add a persisted_id slice to record which action events have been persisted. The executor puts each new action event into the CR. The action service takes the action event out, checks persisted_id to see whether it has already been persisted, and if not sends a request to the internal run service.

This uses the TaskAction CR as a queue, which requires us to:

  1. take the action event info out
  2. determine which action events are persisted
  3. remove the persisted action events

This method has the following disadvantages:

  1. It is more complex, since we have to handle the queueing logic on our own
  2. It may cause performance issues at large scale
  3. Using a CR object as a queue is not good practice
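
To make the bookkeeping cost of this alternative concrete, here is a minimal sketch of the queueing logic it would require. All type, field, and function names (ActionEvent, TaskActionStatus, pending, prune) are hypothetical and only illustrate the three steps listed above; they are not taken from the PR.

```go
package main

import (
	"fmt"
	"sort"
)

// ActionEvent is a hypothetical stand-in for the real event payload.
type ActionEvent struct {
	ID    string
	Phase string
}

// TaskActionStatus mimics the status layout described in alternative 1:
// a map of events keyed by ID plus a slice of already-persisted IDs.
// The field names are assumptions made for this sketch.
type TaskActionStatus struct {
	Events       map[string]ActionEvent
	PersistedIDs []string
}

// pending returns the events that are not yet persisted, sorted by ID
// so the result is deterministic (Go map iteration order is random).
func pending(s TaskActionStatus) []ActionEvent {
	done := make(map[string]bool, len(s.PersistedIDs))
	for _, id := range s.PersistedIDs {
		done[id] = true
	}
	var out []ActionEvent
	for id, ev := range s.Events {
		if !done[id] {
			out = append(out, ev)
		}
	}
	sort.Slice(out, func(i, j int) bool { return out[i].ID < out[j].ID })
	return out
}

// prune removes persisted events from the map and clears the ID list.
func prune(s *TaskActionStatus) {
	for _, id := range s.PersistedIDs {
		delete(s.Events, id)
	}
	s.PersistedIDs = nil
}

func main() {
	status := TaskActionStatus{
		Events: map[string]ActionEvent{
			"e1": {ID: "e1", Phase: "Queued"},
			"e2": {ID: "e2", Phase: "Running"},
			"e3": {ID: "e3", Phase: "Succeeded"},
		},
		PersistedIDs: []string{"e1"},
	}
	fmt.Println("pending:", pending(status))
	prune(&status)
	fmt.Println("events left after prune:", len(status.Events))
}
```

Even in this simplified form, every step has to be written back to the CR status, which is where the complexity and scale concerns listed above come from.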

2. Store ActionEvent in a new CR

We can add a new ActionEvent CR to store the action events. The executor creates a new ActionEvent CR whenever there is a status update, and the action service watches this CR. Whenever a new ActionEvent CR is created, the action service parses the info from the CR and sends a request to the internal run service to persist the action event.

This method has the following disadvantage:

  1. If the run service is down, we may create an overwhelming number of CRs, putting pressure on etcd

What changes were proposed in this pull request?

  • Add a new EventsService proto and its implementation
  • Start a background goroutine, EventsWorker, to queue requests and send them to the internal run service (through unary calls)
  • Clean up executor/cmd/main.go and remove unused code

TODO next: use a streaming endpoint, or implement batched requests, from EventsService to the internal run service

How was this patch tested?

  1. Start Flyte locally following manager/DEVELOPMENT.md
  2. Use this config in flyte-sdk:

       admin:
         endpoint: dns:///localhost:8090
         insecure: True
       image:
         builder: local
       task:
         domain: development
         project: testproject
         org: testorg

  3. Run an example (e.g. python examples/basics/hello_v2.py)
  4. Look into the SQLite DB (flyte.db), check if the data exists
[image]

DB:

[image]

Signed-off-by: machichima <nary12321@gmail.com>
machichima changed the title from "[V2][Feat] Add event proxy service" to "[V2][Feat] Add events service" on Mar 6, 2026
@github-actions github-actions bot mentioned this pull request Mar 6, 2026
3 tasks
machichima force-pushed the add-event-proxy-service branch from 81e5562 to a17628b on March 9, 2026 01:13
    Events: []*workflow.ActionEvent{actionEvent},
})); err != nil {
    // Best effort: status update succeeded, do not fail reconciliation on event publish errors.
    logger.Error(err, "failed to publish action event", "action", actionEvent.GetId().GetName())

Member commented:

What if the events proxy is temporarily down? Events would be silently lost.

Member Author replied:

I now return an error here (see e1a9f18), so we can rely on controller-runtime's reconcile error backoff logic to retry.

  logger.Info("updateTaskActionStatus", "name", oldTaskAction.Name, "old status", oldTaskAction.Status, "new status", newTaskAction.Status)
- err := r.Status().Update(ctx, newTaskAction)
- if err != nil {
+ if err := r.Status().Update(ctx, newTaskAction); err != nil {

Member commented:

We should only update the status after the event has been successfully sent to the event service. Otherwise, the event won’t be resent if it gets lost, since the old and new action statuses would be the same
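
A minimal sketch of the ordering suggested in this comment: publish the event first, and persist the new status only once publishing has succeeded, so that a failed publish leaves the old status in place and the next attempt sees the same status change again. The publisher and statusWriter interfaces, syncStatus, and the fake implementations are hypothetical names for illustration; this is not the reconciler code from the PR.

```go
package main

import (
	"errors"
	"fmt"
)

// publisher and statusWriter are hypothetical interfaces standing in for
// the events client and the status update call.
type publisher interface{ Publish(event string) error }
type statusWriter interface{ UpdateStatus(status string) error }

// syncStatus publishes the event first and persists the new status only
// after publishing succeeded. On a publish error the stored status is left
// untouched, so a retry still sees oldStatus != newStatus and republishes.
func syncStatus(p publisher, s statusWriter, oldStatus, newStatus string) error {
	if oldStatus == newStatus {
		return nil // nothing changed, nothing to publish
	}
	if err := p.Publish(newStatus); err != nil {
		return fmt.Errorf("publish action event: %w", err)
	}
	if err := s.UpdateStatus(newStatus); err != nil {
		return fmt.Errorf("update status: %w", err)
	}
	return nil
}

// flakyPublisher fails a fixed number of times before succeeding.
type flakyPublisher struct {
	failures int
	sent     []string
}

func (f *flakyPublisher) Publish(event string) error {
	if f.failures > 0 {
		f.failures--
		return errors.New("events service unavailable")
	}
	f.sent = append(f.sent, event)
	return nil
}

// memoryStatus keeps the status in memory instead of in a CR.
type memoryStatus struct{ status string }

func (m *memoryStatus) UpdateStatus(status string) error {
	m.status = status
	return nil
}

func main() {
	p := &flakyPublisher{failures: 1}
	s := &memoryStatus{status: "Queued"}

	// First attempt: publishing fails, so the status must stay "Queued".
	err := syncStatus(p, s, s.status, "Running")
	fmt.Println("attempt 1:", err, "status:", s.status)

	// Retry: the status still differs, so the event is published again.
	err = syncStatus(p, s, s.status, "Running")
	fmt.Println("attempt 2:", err, "status:", s.status)
}
```

One consequence of this ordering is that if the status update fails after a successful publish, the retry publishes the same event again, so the receiving side needs to tolerate duplicates.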

popojk (Contributor) left a comment:

LGTM. Left a few small ones

    return timestamppb.Now()
}

func outputRefs(runOutputBase, actionName string) *task.OutputReferences {

Contributor commented:

Is actionName always different if we execute the same task multiple times? I just want to make sure we will not end up with conflicting bucket names.

Member Author replied:

The root action is created in:

case *workflow.CreateRunRequest_RunId:
    org = id.RunId.Org
    project = id.RunId.Project
    domain = id.RunId.Domain
    name = id.RunId.Name
case *workflow.CreateRunRequest_ProjectId:
    org = id.ProjectId.Organization
    project = id.ProjectId.Name
    domain = id.ProjectId.Domain
    name = fmt.Sprintf("run-%d", time.Now().Unix())

And for sub-actions, they are created in:
https://github.com/flyteorg/flyte-sdk/blob/a87584a9bc7a2601a6754b4cc3dd2d6481221904/src/flyte/_internal/controllers/remote/_controller.py#L186-L188

I think sub-actions will always have a unique name. For the root action, a unique name is guaranteed when the request uses ProjectId. If the request uses RunId, CreateRun will raise an error.

Therefore, I think it's safe here.

        continue
    }
    // Return the first explicit cache status signal.
    if resource.CacheStatus != core.CatalogCacheStatus_CACHE_DISABLED || len(resources) == 1 {

Contributor commented:

nit: In len(resources) == 1, the number "1" looks like a magic number to me. Could we add a comment explaining why we return resource.CacheStatus when len(resources) == 1?

machichima (Member Author) replied on Mar 9, 2026:

I was thinking of returning the cache status directly if there's only one resource. However, I think this is redundant.

If the only resource is not CatalogCacheStatus_CACHE_DISABLED, it enters the if resource.CacheStatus != core.CatalogCacheStatus_CACHE_DISABLED block; otherwise, execution continues to the end of the function and returns CatalogCacheStatus_CACHE_DISABLED.

I'll remove it here.
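
The redundancy argument can be checked with a small sketch that compares the two variants. The CacheStatus and Resource types below are simplified stand-ins for the real ones, and the nil check before continue is an assumption about the part of the loop not shown in the snippet.

```go
package main

import "fmt"

// CacheStatus is a simplified stand-in for core.CatalogCacheStatus.
type CacheStatus int

const (
	CacheDisabled CacheStatus = iota
	CacheMiss
	CacheHit
)

// Resource is a hypothetical stand-in carrying only the cache status.
type Resource struct{ CacheStatus CacheStatus }

// withLenCheck mirrors the reviewed condition, including len(resources) == 1.
func withLenCheck(resources []*Resource) CacheStatus {
	for _, r := range resources {
		if r == nil { // assumed reason for the "continue" in the snippet
			continue
		}
		if r.CacheStatus != CacheDisabled || len(resources) == 1 {
			return r.CacheStatus
		}
	}
	return CacheDisabled
}

// withoutLenCheck drops the extra condition.
func withoutLenCheck(resources []*Resource) CacheStatus {
	for _, r := range resources {
		if r == nil {
			continue
		}
		if r.CacheStatus != CacheDisabled {
			return r.CacheStatus
		}
	}
	return CacheDisabled
}

func main() {
	cases := [][]*Resource{
		nil,
		{nil},
		{{CacheStatus: CacheDisabled}},
		{{CacheStatus: CacheHit}},
		{{CacheStatus: CacheDisabled}, {CacheStatus: CacheMiss}},
		{{CacheStatus: CacheDisabled}, {CacheStatus: CacheDisabled}},
	}
	for i, c := range cases {
		fmt.Printf("case %d: same result = %v\n", i, withLenCheck(c) == withoutLenCheck(c))
	}
}
```

With a single resource, the extra condition only changes the path taken when that resource is disabled, and in that case both variants return the disabled status anyway.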

Member Author replied:

Done in 98ad3cc

…ervice

machichima (Member Author) commented:

Got an error when merging v2, fixing it.

machichima force-pushed the add-event-proxy-service branch from ea15830 to 13b38d0 on March 10, 2026 01:29
machichima merged commit af8de97 into v2 on Mar 10, 2026
5 checks passed
machichima deleted the add-event-proxy-service branch on March 10, 2026 01:37