run a step inside a go routine #109
@af-md thanks for the PR! The approach has the right fundamentals, i.e., I think we can make this simpler by simply having a package level
With respect to what
    chans = make([]chan int, 3)
    for range 10 {
        resChan1, err := dbos.Go(ctx, StepFnClosure)
        // handle err
    }
    // Read from each channel here
The res channel can hold types similar to the workflow outcome chan:
@maxdml does this feature conflict with what @apoliakov said about pre-generating step IDs: https://discord.com/channels/1156433345631232100/1166779411920597002/1413954852618244267 It makes sense to run steps inside goroutines, as they tend to perform better than sequential execution. However, users should be advised to write their code so that it waits for a step to complete (committed to the DB) before moving on to the next step. That's probably what you were thinking of anyway...
Ah... what I said was a comment on how Python works. Here we may have an opportunity to make it act differently. But Max or Peter will need to opine on that.
@maxdml @apoliakov there is a small misunderstanding here. The problem we are solving with this PR is the non deterministic generation of stepIDs, resulting from the execution of steps in goroutines. What this PR will do is to serialize the generation of step IDs from within the workflow. That way, step IDs will be generated deterministically, before the |
maxdml
left a comment
The low level implementation looks good, see my comments for the test.
I am realizing we need to change the API to support mocking. Specifically we should have a mirror Go method on the DBOSContext interface, that would be typeless (returns (stepOutcome[any], error)). The reason we've been doing this for all DBOS methods is to allow the mocking of DBOSContext in users' tests.
The package-level Go would, like the package-level RunAsStep does with its interface counterpart, call the interface Go with a type-erased function and set the stepName in the options.
The interface level Go will do the step introspection, increment the stepID, then call the interface level RunAsStep and return a typeless (stepOutcome[any]) channel which we can pipe to the generic one (see an example in RunWorkflow.)
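To make the piping step concrete, here is a minimal, self-contained sketch of the pattern. It does not use the real DBOS types: `outcome`, `goAny`, and `goTyped` are hypothetical names standing in for the step outcome type, the interface-level Go, and the package-level generic Go, and the step ID bookkeeping is left out entirely.

```go
package main

import "fmt"

// outcome is a hypothetical stand-in for the step outcome type.
type outcome[R any] struct {
	result R
	err    error
}

// goAny plays the role of the typeless, interface-level method: it runs fn in
// a goroutine and delivers an outcome[any] on a buffered channel.
func goAny(fn func() (any, error)) chan outcome[any] {
	ch := make(chan outcome[any], 1)
	go func() {
		defer close(ch)
		res, err := fn()
		ch <- outcome[any]{result: res, err: err}
	}()
	return ch
}

// goTyped plays the role of the generic, package-level function: it
// type-erases fn, calls goAny, and pipes the untyped outcome into a typed
// channel.
func goTyped[R any](fn func() (R, error)) chan outcome[R] {
	erased := func() (any, error) {
		res, err := fn()
		return res, err
	}
	anyCh := goAny(erased)
	typedCh := make(chan outcome[R], 1)
	go func() {
		defer close(typedCh)
		o := <-anyCh
		switch {
		case o.err != nil:
			typedCh <- outcome[R]{err: o.err}
		case o.result == nil:
			// A nil result is valid: deliver R's zero value.
			typedCh <- outcome[R]{}
		default:
			typed, ok := o.result.(R)
			if !ok {
				typedCh <- outcome[R]{err: fmt.Errorf("unexpected result type: expected %T, got %T", *new(R), o.result)}
				return
			}
			typedCh <- outcome[R]{result: typed}
		}
	}()
	return typedCh
}

func main() {
	o := <-goTyped(func() (int, error) { return 42, nil })
	fmt.Println(o.result, o.err)
}
```

Because the typed channel is always returned non-nil and always receives exactly one outcome before being closed, a caller can read from it unconditionally.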
dbos/workflows_test.go
Outdated
    // Test step IDs are deterministic and in the order of execution
    steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID())
    require.NoError(t, err, "failed to get workflow steps")
    require.Len(t, steps, numSteps, "expected %d steps, got %d", numSteps, len(steps))
    for i := 0; i < numSteps; i++ {
        assert.Equal(t, i, steps[i].StepID, "expected step ID to be %d, got %d", i, steps[i].StepID)
    }
This does not test what you think it does: GetWorkflowSteps returns all the workflow steps sorted by ascending step ID, so you're testing the SQL, not the step ID attribution.
The way to test this would be to have each step take its ID as input and return it. Then you can iterate over the channels and check that the loop index equals the step result read from each channel.
- Channels should be ordered by stepID
- If the correct ID was attributed to the step, the step return value will be equal to the channel iterator
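The check described above can be sketched without any DBOS machinery. In the sketch below, `launchSteps` and `checkStepIDs` are hypothetical helpers: the first mimics calling Go in a loop (the ID is assigned in the calling goroutine, and each step echoes the ID it received), the second walks the channels in launch order.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// launchSteps is a hypothetical stand-in for calling Go in a loop: each ID is
// assigned in the calling goroutine, then the step runs concurrently and
// echoes the ID it was given.
func launchSteps(n int) []chan int {
	chans := make([]chan int, n)
	nextID := 0
	for i := range chans {
		id := nextID // assigned sequentially, before the goroutine starts
		nextID++
		ch := make(chan int, 1)
		chans[i] = ch
		go func(id int, ch chan int) {
			defer close(ch)
			// Random delay so completion order differs from launch order.
			time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
			ch <- id
		}(id, ch)
	}
	return chans
}

// checkStepIDs returns an error if any channel yields an ID that differs from
// its position in the slice.
func checkStepIDs(chans []chan int) error {
	for i, ch := range chans {
		if got := <-ch; got != i {
			return fmt.Errorf("channel %d: expected step ID %d, got %d", i, i, got)
		}
	}
	return nil
}

func main() {
	if err := checkStepIDs(launchSteps(10)); err != nil {
		fmt.Println("FAIL:", err)
		return
	}
	fmt.Println("step IDs match channel order")
}
```

Unlike reading the steps back from the database, this comparison does not depend on any sorting done by the query: it only passes if each step actually ran with the ID reserved for it.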
In a second part, we should also exercise recovery, either by running recoverPendingWorkflow or simply by executing the workflow again with the same workflowID. If step ID attribution were non-deterministic, DBOS would throw an error during the second RunAsStep, which it shouldn't.
For this to happen, however, we must ensure the workflow stays PENDING and does not return in the first run, which we can achieve with an event (see this example). If we don't do that, re-running the workflow will just get the workflow outcome rather than going through the steps again.
5ce5275 to f01ca09
    // Go runs a step inside a Go routine and returns a channel to receive the result.
    // Go generates a deterministic step ID for the step before running the step in a routine, since routines are not deterministic.
    // The step ID is used to track the steps within the same workflow and use the step ID to perform recovery.
    // The folliwing examples shows how to use Go:
    - // The folliwing examples shows how to use Go:
    + // The following example shows how to use Go:
    func (c *dbosContext) Go(ctx DBOSContext, fn StepFunc, opts ...StepOption) (chan StepOutcome[any], error) {
        // create a determistic step ID
        stepName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
This is not the step name we want to display: it is the name of the type-erased step. We can retrieve the user-provided function's name by inspecting the options:
    // Process functional options
    stepOpts := &stepOptions{}
    for _, opt := range opts {
        opt(stepOpts)
    }
    name := stepOpts.stepName

    // Step function could return a nil result
    if result == nil {
        return *new(chan StepOutcome[R]), err
    }
We need to close the result chan. Let's do it after reading the outcome from it.
    // Otherwise type-check and cast the result
    typedResult, ok := outcome.result.(R)
    if !ok {
        return *new(chan StepOutcome[R]), fmt.Errorf("unexpected result type: expected %T, got %T", *new(R), result)
    }
    outcomeChan <- StepOutcome[R]{
        result: typedResult,
        err:    nil,
    }
We're going to have to modify this after #175 is merged. Specifically, it is possible that the step output was returned from the database, encoded, in which case we'll need to decode it. We can mimic the code in RunAsStep that does the same.
Also, in case of error, just return outcomeChan rather than a nil channel: reading from a nil channel blocks forever, which is a bit of a footgun for the user.
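One way to avoid handing back a nil channel is to return a channel that already carries the error. The sketch below is only an illustration of that idea: `outcome`, `failedOutcome`, and `errStepFailed` are hypothetical names, not part of the PR.

```go
package main

import (
	"errors"
	"fmt"
)

// outcome is a hypothetical stand-in for the step outcome type.
type outcome[R any] struct {
	result R
	err    error
}

// errStepFailed is a placeholder error used only by this example.
var errStepFailed = errors.New("step failed")

// failedOutcome returns a buffered, already-closed channel carrying err, so a
// caller that reads from it gets the error immediately instead of blocking.
func failedOutcome[R any](err error) chan outcome[R] {
	ch := make(chan outcome[R], 1)
	ch <- outcome[R]{err: err}
	close(ch)
	return ch
}

func main() {
	ch := failedOutcome[int](errStepFailed)
	o := <-ch
	fmt.Println(o.err)

	// By contrast, a receive from a nil channel never completes:
	//   var nilCh chan outcome[int]
	//   <-nilCh // blocks forever
}
```

A closed buffered channel still yields the value it holds, so the caller can treat the failure path exactly like the success path and inspect the err field.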
    for i, resultChan := range resultChans {
        result1 := <-resultChan
        if result1.err != nil {
            errors <- result1.result.Error
Why are we piping result1.result.Error? Is it ever set to anything? If not, let's just remove it from the result struct to clarify the code.
    close(results)
    close(errors)
nit: defer the closing after creation
    stepDeterminismStartEvent.Set()
    fmt.Println("stepThatBlocks: started to block")
    stepDeterminismEvent.Wait()
    fmt.Println("stepThatBlocks: unblocked")
remove print statements pls
    // Run the second workflow
    handle2, err := RunWorkflow(dbosCtx, goWorkflow, "test-input", WithWorkflowID(handle.GetWorkflowID()))

    // If it throws an error, it's because of steps not being deterministically executed when using Go routines in the first workflow
The comment is not exactly accurate: determinism errors would come from handle2.GetResult()
…implify result handling
…uce stepWithSleep function
…step ID generation
…es with result channel
…custom output types
…y removing unnecessary line
…nism checks in Go workflows
…e related function signatures
ebeaa35 to 27e3e9b
closes #90
Summary
Adds support for running steps inside goroutines with deterministic step ID generation.
This is by no means the final solution; it's a PR to get feedback.
Problem
Currently, running steps inside goroutines causes non-deterministic step ID generation due to race conditions:
Solution
Pre-generate step IDs before launching goroutines:
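As a rough, self-contained illustration of the idea (not the code from this PR), the sketch below reserves every step ID in the workflow's own goroutine and only then starts the concurrent work. `workflowState`, `reserveStepID`, and `runSteps` are hypothetical names introduced for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// workflowState is a hypothetical stand-in for per-workflow bookkeeping.
type workflowState struct {
	nextStepID int
}

// reserveStepID hands out the next ID. It is only called from the workflow's
// own goroutine, so it needs no locking and follows program order.
func (w *workflowState) reserveStepID() int {
	id := w.nextStepID
	w.nextStepID++
	return id
}

// runSteps reserves an ID for each step before launching its goroutine, then
// waits for all steps and reports which ID each step name ended up with.
func runSteps(names []string) map[string]int {
	w := &workflowState{}
	assigned := make(map[string]int, len(names))
	var mu sync.Mutex
	var wg sync.WaitGroup
	for _, name := range names {
		id := w.reserveStepID() // reserved before the goroutine is launched
		wg.Add(1)
		go func(name string, id int) {
			defer wg.Done()
			mu.Lock()
			assigned[name] = id
			mu.Unlock()
		}(name, id)
	}
	wg.Wait()
	return assigned
}

func main() {
	names := []string{"fetch", "transform", "store"}
	first := runSteps(names)
	second := runSteps(names)
	// Both runs assign the same ID to the same step, regardless of how the
	// goroutines happen to be scheduled.
	fmt.Println(first["fetch"] == second["fetch"], first["store"] == second["store"])
}
```

If the counter were incremented inside the goroutines instead, the mapping from step to ID would depend on scheduling, which is what makes a re-execution of the same workflow disagree with the recorded steps.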
Open Questions
ToDos