Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions common/contextutil/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ type (

var metadataCtxKey = metadataContextKey{}

const (
// MetadataKeyWorkflowType is the context metadata key for workflow type
MetadataKeyWorkflowType = "workflow-type"
// MetadataKeyWorkflowTaskQueue is the context metadata key for workflow task queue
MetadataKeyWorkflowTaskQueue = "workflow-task-queue"
)

// getMetadataContext extracts metadata context from golang context.
func getMetadataContext(ctx context.Context) *metadataContext {
metadataCtx := ctx.Value(metadataCtxKey)
Expand All @@ -38,6 +45,12 @@ func WithMetadataContext(ctx context.Context) context.Context {
return context.WithValue(ctx, metadataCtxKey, metadataCtx)
}

// ContextHasMetadata returns true if the context has metadata support.
// This can be used to debug whether a context has been properly initialized with metadata.
func ContextHasMetadata(ctx context.Context) bool {
return getMetadataContext(ctx) != nil
}

// ContextMetadataSet sets a metadata key-value pair in the context.
func ContextMetadataSet(ctx context.Context, key string, value any) bool {
metadataCtx := getMetadataContext(ctx)
Expand Down
60 changes: 60 additions & 0 deletions common/contextutil/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,3 +301,63 @@ func TestMetadataContextIsolation(t *testing.T) {
assert.Equal(t, "value2", value2)
})
}

func TestContextHasMetadata(t *testing.T) {
t.Run("returns true when context has metadata", func(t *testing.T) {
ctx := WithMetadataContext(context.Background())
assert.True(t, ContextHasMetadata(ctx))
})

t.Run("returns false for context without metadata", func(t *testing.T) {
ctx := context.Background()
assert.False(t, ContextHasMetadata(ctx))
})

t.Run("returns true after setting metadata values", func(t *testing.T) {
ctx := WithMetadataContext(context.Background())
ContextMetadataSet(ctx, "key1", "value1")
ContextMetadataSet(ctx, "key2", "value2")

assert.True(t, ContextHasMetadata(ctx))
})

t.Run("returns true for empty metadata context", func(t *testing.T) {
ctx := WithMetadataContext(context.Background())
// No values set, but metadata context exists
assert.True(t, ContextHasMetadata(ctx))
})

t.Run("child context inherits metadata from parent", func(t *testing.T) {
parentCtx := WithMetadataContext(context.Background())
ContextMetadataSet(parentCtx, "key", "value")

type testContextKey string
childCtx := context.WithValue(parentCtx, testContextKey("other-key"), "other-value")

assert.True(t, ContextHasMetadata(parentCtx))
assert.True(t, ContextHasMetadata(childCtx))
})

t.Run("returns false for wrong type in context", func(t *testing.T) {
ctx := context.WithValue(context.Background(), metadataCtxKey, "wrong type")
assert.False(t, ContextHasMetadata(ctx))
})

t.Run("returns true for cancelled context with metadata", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ctx = WithMetadataContext(ctx)
cancel()

assert.True(t, ContextHasMetadata(ctx))
})

t.Run("multiple contexts with metadata are independent", func(t *testing.T) {
ctx1 := WithMetadataContext(context.Background())
ctx2 := WithMetadataContext(context.Background())
ctx3 := context.Background()

assert.True(t, ContextHasMetadata(ctx1))
assert.True(t, ContextHasMetadata(ctx2))
assert.False(t, ContextHasMetadata(ctx3))
})
}
5 changes: 5 additions & 0 deletions service/history/api/queryworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ func Invoke(
workflowLease.GetReleaseFn()(nil)
}()

// Context metadata is automatically set during mutable state transaction close for operations that mutate state.
// Since QueryWorkflow is readonly and never closes the transaction, we explicitly call SetContextMetadata
// here to ensure successful requests have workflow metadata populated in the context.
workflowLease.GetMutableState().SetContextMetadata(ctx)

req := request.GetRequest()
_, mutableStateStatus := workflowLease.GetMutableState().GetWorkflowStateStatus()
scope = scope.WithTags(metrics.StringTag("workflow_status", mutableStateStatus.String()))
Expand Down
8 changes: 8 additions & 0 deletions service/history/api/respondworkflowtaskcompleted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,14 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
weContext := workflowLease.GetContext()
ms := workflowLease.GetMutableState()
currentWorkflowTask := ms.GetWorkflowTaskByID(token.GetScheduledEventId())

if len(request.Commands) == 0 {
// Context metadata is automatically set during mutable state transaction close. For RespondWorkflowTaskCompleted
// with no commands (e.g., workflow task heartbeat or only readonly messages like `update.Rejection`), the transaction
// is never closed. We explicitly call SetContextMetadata here to ensure workflow metadata is populated in the context.
ms.SetContextMetadata(ctx)
}

defer func() {
var errForRelease error
if releaseLeaseWithError {
Expand Down
138 changes: 130 additions & 8 deletions service/history/api/respondworkflowtaskcompleted/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.temporal.io/server/common"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/contextutil"
"go.temporal.io/server/common/effect"
"go.temporal.io/server/common/locks"
"go.temporal.io/server/common/log"
Expand Down Expand Up @@ -171,7 +172,10 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {
updRequestMsg, upd, serializedTaskToken := s.createSentUpdate(tv, wfContext)
s.NotNil(upd)

_, err = s.workflowTaskCompletedHandler.Invoke(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{
// Use context with metadata to verify it's set via transaction close
ctx := contextutil.WithMetadataContext(context.Background())

_, err = s.workflowTaskCompletedHandler.Invoke(ctx, &historyservice.RespondWorkflowTaskCompletedRequest{
NamespaceId: tv.NamespaceID().String(),
CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{
TaskToken: serializedTaskToken,
Expand All @@ -193,6 +197,15 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {
4 WorkflowTaskCompleted
5 WorkflowExecutionUpdateAccepted
6 WorkflowExecutionUpdateCompleted`, <-writtenHistoryCh)

// VERIFY: Context metadata set via transaction close (normal case WITH commands)
contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType)
s.True(ok, "context workflow type MUST be set in normal case with commands")
s.Equal(tv.WorkflowType().GetName(), contextWorkflowType)

contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue)
s.True(ok, "context task queue MUST be set in normal case with commands")
s.Equal(tv.TaskQueue().GetName(), contextTaskQueue)
})

s.Run("Reject", func() {
Expand All @@ -204,7 +217,10 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {
updRequestMsg, upd, serializedTaskToken := s.createSentUpdate(tv, wfContext)
s.NotNil(upd)

_, err := s.workflowTaskCompletedHandler.Invoke(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{
// Use context with metadata support to verify SetContextMetadata is called
ctx := contextutil.WithMetadataContext(context.Background())

_, err := s.workflowTaskCompletedHandler.Invoke(ctx, &historyservice.RespondWorkflowTaskCompletedRequest{
NamespaceId: tv.NamespaceID().String(),
CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{
TaskToken: serializedTaskToken,
Expand All @@ -218,6 +234,46 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {
s.NoError(err)
s.Equal(enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED.String(), updStatus.Stage.String())
s.Equal("rejection-of-"+tv.UpdateID(), updStatus.Outcome.GetFailure().GetMessage())

// VERIFY: Context metadata set for readonly operation (no commands, only messages)
contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType)
s.True(ok, "context workflow type MUST be set for readonly operation")
s.Equal(tv.WorkflowType().GetName(), contextWorkflowType)

contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue)
s.True(ok, "context task queue MUST be set for readonly operation")
s.Equal(tv.TaskQueue().GetName(), contextTaskQueue)
})

s.Run("Heartbeat (empty completion)", func() {
tv := testvars.New(s.T())
tv = tv.WithRunID(tv.Any().RunID())
s.mockNamespaceCache.EXPECT().GetNamespaceByID(tv.NamespaceID()).Return(tv.Namespace(), nil).AnyTimes()
wfContext := s.createStartedWorkflow(tv)

_, _, serializedTaskToken := s.createSentUpdate(tv, wfContext)

// Use context with metadata support to verify SetContextMetadata is called
ctx := contextutil.WithMetadataContext(context.Background())

_, err := s.workflowTaskCompletedHandler.Invoke(ctx, &historyservice.RespondWorkflowTaskCompletedRequest{
NamespaceId: tv.NamespaceID().String(),
CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{
TaskToken: serializedTaskToken,
// No Commands, no Messages - this is a workflow task heartbeat
Identity: tv.Any().String(),
},
})
s.NoError(err)

// VERIFY: Context metadata set for heartbeat (no commands, no messages)
contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType)
s.True(ok, "context workflow type MUST be set for heartbeat")
s.Equal(tv.WorkflowType().GetName(), contextWorkflowType)

contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue)
s.True(ok, "context task queue MUST be set for heartbeat")
s.Equal(tv.TaskQueue().GetName(), contextTaskQueue)
})

s.Run("Write failed on normal task queue", func() {
Expand All @@ -232,7 +288,9 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {
updRequestMsg, upd, serializedTaskToken := s.createSentUpdate(tv, wfContext)
s.NotNil(upd)

_, err := s.workflowTaskCompletedHandler.Invoke(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{
ctx := contextutil.WithMetadataContext(context.Background())

_, err := s.workflowTaskCompletedHandler.Invoke(ctx, &historyservice.RespondWorkflowTaskCompletedRequest{
NamespaceId: tv.NamespaceID().String(),
CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{
TaskToken: serializedTaskToken,
Expand All @@ -244,6 +302,15 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {
s.ErrorIs(err, writeErr)

s.Nil(wfContext.(*workflow.ContextImpl).MutableState, "mutable state must be cleared")

// VERIFY: Context metadata set even when write fails (transaction close is attempted)
contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType)
s.True(ok, "context workflow type MUST be set even when write fails")
s.Equal(tv.WorkflowType().GetName(), contextWorkflowType)

contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue)
s.True(ok, "context task queue MUST be set even when write fails")
s.Equal(tv.TaskQueue().GetName(), contextTaskQueue)
})

s.Run("Write failed on sticky task queue", func() {
Expand All @@ -261,7 +328,9 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {
updRequestMsg, upd, serializedTaskToken := s.createSentUpdate(tv, wfContext)
s.NotNil(upd)

_, err := s.workflowTaskCompletedHandler.Invoke(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{
ctx := contextutil.WithMetadataContext(context.Background())

_, err := s.workflowTaskCompletedHandler.Invoke(ctx, &historyservice.RespondWorkflowTaskCompletedRequest{
NamespaceId: tv.NamespaceID().String(),
CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{
TaskToken: serializedTaskToken,
Expand All @@ -274,6 +343,15 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {
s.ErrorIs(err, writeErr)

s.Nil(wfContext.(*workflow.ContextImpl).MutableState, "mutable state must be cleared")

// VERIFY: Context metadata set even when sticky write fails
contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType)
s.True(ok, "context workflow type MUST be set even when sticky write fails")
s.Equal(tv.WorkflowType().GetName(), contextWorkflowType)

contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue)
s.True(ok, "context task queue MUST be set even when sticky write fails")
s.Equal(tv.TaskQueue().GetName(), contextTaskQueue)
})

s.Run("GetHistory failed", func() {
Expand All @@ -290,7 +368,9 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {
readHistoryErr := errors.New("get history failed")
s.mockExecutionMgr.EXPECT().ReadHistoryBranch(gomock.Any(), gomock.Any()).Return(nil, readHistoryErr)

_, err := s.workflowTaskCompletedHandler.Invoke(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{
ctx := contextutil.WithMetadataContext(context.Background())

_, err := s.workflowTaskCompletedHandler.Invoke(ctx, &historyservice.RespondWorkflowTaskCompletedRequest{
NamespaceId: tv.NamespaceID().String(),
CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{
TaskToken: serializedTaskToken,
Expand All @@ -316,6 +396,15 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {
6 WorkflowExecutionUpdateCompleted
7 WorkflowTaskScheduled
8 WorkflowTaskStarted`, <-writtenHistoryCh)

// VERIFY: Context metadata set even when ReadHistoryBranch fails
contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType)
s.True(ok, "context workflow type MUST be set even when ReadHistoryBranch fails")
s.Equal(tv.WorkflowType().GetName(), contextWorkflowType)

contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue)
s.True(ok, "context task queue MUST be set even when ReadHistoryBranch fails")
s.Equal(tv.TaskQueue().GetName(), contextTaskQueue)
})

s.Run("Discard speculative WFT with events", func() {
Expand Down Expand Up @@ -346,7 +435,9 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {
updRequestMsg, upd, serializedTaskToken := s.createSentUpdate(tv, wfContext)
s.NotNil(upd)

_, err = s.workflowTaskCompletedHandler.Invoke(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{
ctx := contextutil.WithMetadataContext(context.Background())

_, err = s.workflowTaskCompletedHandler.Invoke(ctx, &historyservice.RespondWorkflowTaskCompletedRequest{
NamespaceId: tv.NamespaceID().String(),
CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{
TaskToken: serializedTaskToken,
Expand Down Expand Up @@ -374,6 +465,15 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {
s.EqualHistoryEvents(`
3 TimerFired // No WFT events in between 2 and 3.
`, <-writtenHistoryCh)

// VERIFY: Context metadata set for discard speculative WFT case
contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType)
s.True(ok, "context workflow type MUST be set when discarding speculative WFT")
s.Equal(tv.WorkflowType().GetName(), contextWorkflowType)

contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue)
s.True(ok, "context task queue MUST be set when discarding speculative WFT")
s.Equal(tv.TaskQueue().GetName(), contextTaskQueue)
})

s.Run("Do not discard speculative WFT with more than 10 events", func() {
Expand Down Expand Up @@ -416,7 +516,9 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {
updRequestMsg, upd, serializedTaskToken := s.createSentUpdate(tv, wfContext)
s.NotNil(upd)

_, err = s.workflowTaskCompletedHandler.Invoke(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{
ctx := contextutil.WithMetadataContext(context.Background())

_, err = s.workflowTaskCompletedHandler.Invoke(ctx, &historyservice.RespondWorkflowTaskCompletedRequest{
NamespaceId: tv.NamespaceID().String(),
CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{
TaskToken: serializedTaskToken,
Expand All @@ -439,6 +541,15 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {
14 WorkflowTaskStarted
15 WorkflowTaskCompleted
`, <-writtenHistoryCh)

// VERIFY: Context metadata set when NOT discarding speculative WFT (>10 events)
contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType)
s.True(ok, "context workflow type MUST be set when keeping speculative WFT")
s.Equal(tv.WorkflowType().GetName(), contextWorkflowType)

contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue)
s.True(ok, "context task queue MUST be set when keeping speculative WFT")
s.Equal(tv.TaskQueue().GetName(), contextTaskQueue)
})
}

Expand All @@ -453,7 +564,9 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestForceCreateNewWorkflowTaskOnPaus
s.NoError(err)
s.True(ms.IsWorkflowExecutionStatusPaused())

_, err = s.workflowTaskCompletedHandler.Invoke(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{
ctx := contextutil.WithMetadataContext(context.Background())

_, err = s.workflowTaskCompletedHandler.Invoke(ctx, &historyservice.RespondWorkflowTaskCompletedRequest{
NamespaceId: tv.NamespaceID().String(),
CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{
TaskToken: serializedTaskToken,
Expand All @@ -465,6 +578,15 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestForceCreateNewWorkflowTaskOnPaus
var failedPrecondition *serviceerror.FailedPrecondition
s.ErrorAs(err, &failedPrecondition)
s.Contains(err.Error(), "Workflow is paused and force create new workflow task is not allowed")

// VERIFY: Context metadata set even when paused workflow returns error
contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType)
s.True(ok, "context workflow type MUST be set even for paused workflow error")
s.Equal(tv.WorkflowType().GetName(), contextWorkflowType)

contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue)
s.True(ok, "context task queue MUST be set even for paused workflow error")
s.Equal(tv.TaskQueue().GetName(), contextTaskQueue)
})
}

Expand Down
Loading
Loading