From e6f91ffaa9741a1e92c09f6656e9e5ad3d737a43 Mon Sep 17 00:00:00 2001 From: Nikki Dag Date: Thu, 5 Feb 2026 22:25:21 -0600 Subject: [PATCH 1/5] History: Expose and call SetContextMetadata in readonly operations --- common/contextutil/metadata.go | 13 ++++ common/contextutil/metadata_test.go | 60 +++++++++++++++++++ service/history/api/queryworkflow/api.go | 5 ++ .../api/respondworkflowtaskcompleted/api.go | 7 +++ service/history/interfaces/mutable_state.go | 1 + .../history/interfaces/mutable_state_mock.go | 12 ++++ .../history/workflow/mutable_state_impl.go | 8 +-- .../workflow/mutable_state_impl_test.go | 38 ++++++++++++ 8 files changed, 140 insertions(+), 4 deletions(-) diff --git a/common/contextutil/metadata.go b/common/contextutil/metadata.go index b804dcc00f..5900c3f294 100644 --- a/common/contextutil/metadata.go +++ b/common/contextutil/metadata.go @@ -17,6 +17,13 @@ type ( var metadataCtxKey = metadataContextKey{} +const ( + // MetadataKeyWorkflowType is the context metadata key for workflow type + MetadataKeyWorkflowType = "workflow-type" + // MetadataKeyWorkflowTaskQueue is the context metadata key for workflow task queue + MetadataKeyWorkflowTaskQueue = "workflow-task-queue" +) + // getMetadataContext extracts metadata context from golang context. func getMetadataContext(ctx context.Context) *metadataContext { metadataCtx := ctx.Value(metadataCtxKey) @@ -38,6 +45,12 @@ func WithMetadataContext(ctx context.Context) context.Context { return context.WithValue(ctx, metadataCtxKey, metadataCtx) } +// ContextHasMetadata returns true if the context has metadata support. +// This can be used to debug whether a context has been properly initialized with metadata. +func ContextHasMetadata(ctx context.Context) bool { + return getMetadataContext(ctx) != nil +} + // ContextMetadataSet sets a metadata key-value pair in the context. 
func ContextMetadataSet(ctx context.Context, key string, value any) bool { metadataCtx := getMetadataContext(ctx) diff --git a/common/contextutil/metadata_test.go b/common/contextutil/metadata_test.go index eb43617e00..6c57c49296 100644 --- a/common/contextutil/metadata_test.go +++ b/common/contextutil/metadata_test.go @@ -301,3 +301,63 @@ func TestMetadataContextIsolation(t *testing.T) { assert.Equal(t, "value2", value2) }) } + +func TestContextHasMetadata(t *testing.T) { + t.Run("returns true when context has metadata", func(t *testing.T) { + ctx := WithMetadataContext(context.Background()) + assert.True(t, ContextHasMetadata(ctx)) + }) + + t.Run("returns false for context without metadata", func(t *testing.T) { + ctx := context.Background() + assert.False(t, ContextHasMetadata(ctx)) + }) + + t.Run("returns true after setting metadata values", func(t *testing.T) { + ctx := WithMetadataContext(context.Background()) + ContextMetadataSet(ctx, "key1", "value1") + ContextMetadataSet(ctx, "key2", "value2") + + assert.True(t, ContextHasMetadata(ctx)) + }) + + t.Run("returns true for empty metadata context", func(t *testing.T) { + ctx := WithMetadataContext(context.Background()) + // No values set, but metadata context exists + assert.True(t, ContextHasMetadata(ctx)) + }) + + t.Run("child context inherits metadata from parent", func(t *testing.T) { + parentCtx := WithMetadataContext(context.Background()) + ContextMetadataSet(parentCtx, "key", "value") + + type testContextKey string + childCtx := context.WithValue(parentCtx, testContextKey("other-key"), "other-value") + + assert.True(t, ContextHasMetadata(parentCtx)) + assert.True(t, ContextHasMetadata(childCtx)) + }) + + t.Run("returns false for wrong type in context", func(t *testing.T) { + ctx := context.WithValue(context.Background(), metadataCtxKey, "wrong type") + assert.False(t, ContextHasMetadata(ctx)) + }) + + t.Run("returns true for cancelled context with metadata", func(t *testing.T) { + ctx, cancel := 
context.WithCancel(context.Background()) + ctx = WithMetadataContext(ctx) + cancel() + + assert.True(t, ContextHasMetadata(ctx)) + }) + + t.Run("multiple contexts with metadata are independent", func(t *testing.T) { + ctx1 := WithMetadataContext(context.Background()) + ctx2 := WithMetadataContext(context.Background()) + ctx3 := context.Background() + + assert.True(t, ContextHasMetadata(ctx1)) + assert.True(t, ContextHasMetadata(ctx2)) + assert.False(t, ContextHasMetadata(ctx3)) + }) +} diff --git a/service/history/api/queryworkflow/api.go b/service/history/api/queryworkflow/api.go index e798ef3b2f..ff9957b01c 100644 --- a/service/history/api/queryworkflow/api.go +++ b/service/history/api/queryworkflow/api.go @@ -80,6 +80,11 @@ func Invoke( workflowLease.GetReleaseFn()(nil) }() + // Context metadata is automatically set during mutable state transaction close for operations that mutate state. + // Since QueryWorkflow is readonly and never closes the transaction, we explicitly call SetContextMetadata + // here to ensure successful requests have workflow metadata populated in the context. + workflowLease.GetMutableState().SetContextMetadata(ctx) + req := request.GetRequest() _, mutableStateStatus := workflowLease.GetMutableState().GetWorkflowStateStatus() scope = scope.WithTags(metrics.StringTag("workflow_status", mutableStateStatus.String())) diff --git a/service/history/api/respondworkflowtaskcompleted/api.go b/service/history/api/respondworkflowtaskcompleted/api.go index f6996a07a0..6961e1ee75 100644 --- a/service/history/api/respondworkflowtaskcompleted/api.go +++ b/service/history/api/respondworkflowtaskcompleted/api.go @@ -180,6 +180,13 @@ func (handler *WorkflowTaskCompletedHandler) Invoke( } } + if len(request.Commands) == 0 && len(request.Messages) > 0 { + // Context metadata is automatically set during mutable state transaction close. For RespondWorkflowTaskCompleted + // with only `update.Rejection` messages, the transaction is never closed. 
We explicitly call SetContextMetadata + // here to ensure readonly messages have workflow metadata populated in the context. + workflowLease.GetMutableState().SetContextMetadata(ctx) + } + workflowLease.GetReleaseFn()(errForRelease) }() diff --git a/service/history/interfaces/mutable_state.go b/service/history/interfaces/mutable_state.go index daec80d476..be77e7e16b 100644 --- a/service/history/interfaces/mutable_state.go +++ b/service/history/interfaces/mutable_state.go @@ -289,6 +289,7 @@ type ( UpdateBuildIdAssignment(buildId string) error ApplyBuildIdRedirect(startingTaskScheduledEventId int64, buildId string, redirectCounter int64) error RefreshExpirationTimeoutTask(ctx context.Context) error + SetContextMetadata(context.Context) GetHistorySize() int64 AddHistorySize(size int64) diff --git a/service/history/interfaces/mutable_state_mock.go b/service/history/interfaces/mutable_state_mock.go index c6ce89b78d..3150e5873c 100644 --- a/service/history/interfaces/mutable_state_mock.go +++ b/service/history/interfaces/mutable_state_mock.go @@ -3382,6 +3382,18 @@ func (mr *MockMutableStateMockRecorder) SetChildrenInitializedPostResetPoint(chi return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetChildrenInitializedPostResetPoint", reflect.TypeOf((*MockMutableState)(nil).SetChildrenInitializedPostResetPoint), children) } +// SetContextMetadata mocks base method. +func (m *MockMutableState) SetContextMetadata(arg0 context.Context) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetContextMetadata", arg0) +} + +// SetContextMetadata indicates an expected call of SetContextMetadata. +func (mr *MockMutableStateMockRecorder) SetContextMetadata(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetContextMetadata", reflect.TypeOf((*MockMutableState)(nil).SetContextMetadata), arg0) +} + // SetCurrentBranchToken mocks base method. 
func (m *MockMutableState) SetCurrentBranchToken(branchToken []byte) error { m.ctrl.T.Helper() diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index af50fccc29..dd59c02102 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -6993,19 +6993,19 @@ type closeTransactionResult struct { chasmNodesMutation chasm.NodesMutation } -func (ms *MutableStateImpl) setMetaDataMap( +func (ms *MutableStateImpl) SetContextMetadata( ctx context.Context, ) { switch ms.chasmTree.ArchetypeID() { case chasm.WorkflowArchetypeID, chasm.UnspecifiedArchetypeID: // Set workflow type if wfType := ms.GetWorkflowType(); wfType != nil && wfType.GetName() != "" { - contextutil.ContextMetadataSet(ctx, "workflow-type", wfType.GetName()) + contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowType, wfType.GetName()) } // Set workflow task queue if ms.executionInfo != nil && ms.executionInfo.TaskQueue != "" { - contextutil.ContextMetadataSet(ctx, "workflow-task-queue", ms.executionInfo.TaskQueue) + contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowTaskQueue, ms.executionInfo.TaskQueue) } // TODO: To set activity_type/activity_task_queue metadata, the history gRPC handler should @@ -7019,7 +7019,7 @@ func (ms *MutableStateImpl) closeTransaction( ctx context.Context, transactionPolicy historyi.TransactionPolicy, ) (closeTransactionResult, error) { - ms.setMetaDataMap(ctx) + ms.SetContextMetadata(ctx) if err := ms.closeTransactionWithPolicyCheck( transactionPolicy, diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go index 3af55e0066..b167425196 100644 --- a/service/history/workflow/mutable_state_impl_test.go +++ b/service/history/workflow/mutable_state_impl_test.go @@ -34,6 +34,7 @@ import ( "go.temporal.io/server/chasm" "go.temporal.io/server/common" 
"go.temporal.io/server/common/cluster" + "go.temporal.io/server/common/contextutil" "go.temporal.io/server/common/definition" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/failure" @@ -6110,3 +6111,40 @@ func (s *mutableStateSuite) TestCHASMNodeSize() { expectedTotalSize += len(newNodeKey) + newNode.Size() s.Equal(expectedTotalSize, mutableState.GetApproximatePersistedSize()) } + +func (s *mutableStateSuite) TestSetContextMetadata() { + s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).Times(1) + + ctx := contextutil.WithMetadataContext(context.Background()) + workflowType := "test-workflow-type" + taskQueue := "test-task-queue" + + execution := &commonpb.WorkflowExecution{ + WorkflowId: tests.WorkflowID, + RunId: tests.RunID, + } + + _, err := s.mutableState.AddWorkflowExecutionStartedEvent( + execution, + &historyservice.StartWorkflowExecutionRequest{ + NamespaceId: tests.NamespaceID.String(), + StartRequest: &workflowservice.StartWorkflowExecutionRequest{ + WorkflowId: tests.WorkflowID, + WorkflowType: &commonpb.WorkflowType{Name: workflowType}, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + }, + }, + ) + s.NoError(err) + + s.mutableState.SetContextMetadata(ctx) + + // Verify metadata was set correctly + wfType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.True(ok) + s.Equal(workflowType, wfType) + + tq, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok) + s.Equal(taskQueue, tq) +} From a522a1e093af14b15ec7747a818da7dec8ed7123 Mon Sep 17 00:00:00 2001 From: Nikki Dag Date: Tue, 10 Feb 2026 17:06:34 -0600 Subject: [PATCH 2/5] add unit test coverage --- service/history/api/queryworkflow/api_test.go | 1093 +++++++++++++++++ .../api/respondworkflowtaskcompleted/api.go | 6 +- .../respondworkflowtaskcompleted/api_test.go | 138 ++- 3 files changed, 1226 insertions(+), 11 deletions(-) create mode 100644 
service/history/api/queryworkflow/api_test.go diff --git a/service/history/api/queryworkflow/api_test.go b/service/history/api/queryworkflow/api_test.go new file mode 100644 index 0000000000..0842f30a20 --- /dev/null +++ b/service/history/api/queryworkflow/api_test.go @@ -0,0 +1,1093 @@ +package queryworkflow + +import ( + "context" + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" + querypb "go.temporal.io/api/query/v1" + "go.temporal.io/api/serviceerror" + "go.temporal.io/api/workflowservice/v1" + enumsspb "go.temporal.io/server/api/enums/v1" + historyspb "go.temporal.io/server/api/history/v1" + "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/api/matchingservice/v1" + "go.temporal.io/server/api/matchingservicemock/v1" + persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common/cluster" + "go.temporal.io/server/common/contextutil" + "go.temporal.io/server/common/definition" + "go.temporal.io/server/common/locks" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/payloads" + "go.temporal.io/server/service/history/api" + historyi "go.temporal.io/server/service/history/interfaces" + "go.uber.org/mock/gomock" +) + +type ( + apiSuite struct { + suite.Suite + *require.Assertions + + controller *gomock.Controller + workflowConsistencyChecker *api.MockWorkflowConsistencyChecker + workflowLease api.WorkflowLease + workflowContext *historyi.MockWorkflowContext + mutableState *historyi.MockMutableState + shardContext *historyi.MockShardContext + namespaceRegistry *namespace.MockRegistry + clusterMetadata *cluster.MockMetadata + } +) + +func TestAPISuite(t *testing.T) { + s := new(apiSuite) + suite.Run(t, s) +} + +func (s *apiSuite) SetupTest() { + s.Assertions = 
require.New(s.T()) +} + +func (s *apiSuite) SetupSubTest() { + s.controller = gomock.NewController(s.T()) + s.workflowContext = historyi.NewMockWorkflowContext(s.controller) + s.mutableState = historyi.NewMockMutableState(s.controller) + s.shardContext = historyi.NewMockShardContext(s.controller) + s.namespaceRegistry = namespace.NewMockRegistry(s.controller) + s.clusterMetadata = cluster.NewMockMetadata(s.controller) + s.workflowConsistencyChecker = api.NewMockWorkflowConsistencyChecker(s.controller) + + s.workflowLease = api.NewWorkflowLease( + s.workflowContext, + func(err error) {}, + s.mutableState, + ) +} + +func (s *apiSuite) TearDownSubTest() { + s.controller.Finish() +} + +func (s *apiSuite) TestContextMetadataPopulated() { + // This test suite verifies that SetContextMetadata is called for every QueryWorkflow request that acquires a + // workflow lease, whatever happens afterwards (success, rejected, paused, query error). Requests that fail earlier + // (namespace validation, namespace lookup, lease acquisition) exit before the call and leave the metadata unset. 
+ + s.Run("SUCCESS CASE - Query executed successfully", func() { + // This is the MAIN case - a query that actually runs and returns results + namespaceID := namespace.ID(uuid.NewString()) + workflowID := "test-workflow-id-success" + runID := "test-run-id-success" + workflowType := &commonpb.WorkflowType{Name: "test-workflow-type-success"} + taskQueue := "test-task-queue-success" + + nsEntry := namespace.NewLocalNamespaceForTest( + &persistencespb.NamespaceInfo{Name: "test-namespace"}, + &persistencespb.NamespaceConfig{}, + "test-cluster", + ) + + // Mock matching client to return successful query response + mockMatchingClient := matchingservicemock.NewMockMatchingServiceClient(s.controller) + mockMatchingClient.EXPECT().QueryWorkflow(gomock.Any(), gomock.Any()).Return( + &matchingservice.QueryWorkflowResponse{ + QueryResult: payloads.EncodeString("query-success"), + }, nil) + + s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes() + s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry) + s.namespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(nsEntry, nil) + s.shardContext.EXPECT().GetClusterMetadata().Return(s.clusterMetadata).AnyTimes() + s.clusterMetadata.EXPECT().GetCurrentClusterName().Return("test-cluster").AnyTimes() + + workflowKey := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) + s.workflowConsistencyChecker.EXPECT().GetWorkflowLease( + gomock.Any(), + nil, + workflowKey, + locks.PriorityHigh, + ).Return(s.workflowLease, nil) + + // THIS IS THE KEY: SetContextMetadata is called + s.mutableState.EXPECT().SetContextMetadata(gomock.Any()).Do(func(ctx context.Context) { + contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowType, workflowType.GetName()) + contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowTaskQueue, taskQueue) + }) + + // Setup for successful query dispatch + s.mutableState.EXPECT().GetWorkflowStateStatus().Return( + 
enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED, + enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, + ).AnyTimes() + s.mutableState.EXPECT().GetWorkflowType().Return(workflowType).AnyTimes() + s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ + TaskQueue: taskQueue, + WorkflowTaskAttempt: 0, + VersionHistories: &historyspb.VersionHistories{ + Histories: []*historyspb.VersionHistory{ + { + BranchToken: []byte("branch-token"), + Items: []*historyspb.VersionHistoryItem{{EventId: 10, Version: 1}}, + }, + }, + }, + }).AnyTimes() + s.mutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{ + RunId: runID, + Status: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, + }).AnyTimes() + s.mutableState.EXPECT().GetCurrentBranchToken().Return([]byte("branch-token"), nil).AnyTimes() + s.mutableState.EXPECT().GetNextEventID().Return(int64(10)).AnyTimes() + s.mutableState.EXPECT().GetLastFirstEventIDTxnID().Return(int64(1), int64(1)).AnyTimes() + s.mutableState.EXPECT().GetLastCompletedWorkflowTaskStartedEventId().Return(int64(5)).AnyTimes() + s.mutableState.EXPECT().IsStickyTaskQueueSet().Return(false).AnyTimes() + s.mutableState.EXPECT().GetAssignedBuildId().Return("").AnyTimes() + s.mutableState.EXPECT().GetInheritedBuildId().Return("").AnyTimes() + s.mutableState.EXPECT().GetMostRecentWorkerVersionStamp().Return(nil).AnyTimes() + s.mutableState.EXPECT().HadOrHasWorkflowTask().Return(true).AnyTimes() + s.mutableState.EXPECT().HasCompletedAnyWorkflowTask().Return(true).AnyTimes() + s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(false).AnyTimes() + + ctx := contextutil.WithMetadataContext(context.Background()) + + request := &historyservice.QueryWorkflowRequest{ + NamespaceId: namespaceID.String(), + Request: &workflowservice.QueryWorkflowRequest{ + Namespace: "test-namespace", + Execution: &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + RunId: runID, + }, + Query: &querypb.WorkflowQuery{ + QueryType: 
"test-query", + }, + QueryRejectCondition: enumspb.QUERY_REJECT_CONDITION_NONE, + }, + } + + resp, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, mockMatchingClient, mockMatchingClient) + s.NoError(err, "MAIN success case should not error") + s.NotNil(resp) + s.NotNil(resp.Response) + s.NotNil(resp.Response.QueryResult) + + // VERIFY: Context metadata was populated in the SUCCESS case + contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.True(ok, "context workflow type MUST be set in success case") + s.Equal(workflowType.GetName(), contextWorkflowType) + + contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok, "context task queue MUST be set in success case") + s.Equal(taskQueue, contextTaskQueue) + }) + + s.Run("Completed workflow (rejected)", func() { + namespaceID := namespace.ID(uuid.NewString()) + workflowID := "test-workflow-id" + runID := "test-run-id" + workflowType := &commonpb.WorkflowType{Name: "test-workflow-type"} + taskQueue := "test-task-queue" + + nsEntry := namespace.NewLocalNamespaceForTest( + &persistencespb.NamespaceInfo{Name: "test-namespace"}, + &persistencespb.NamespaceConfig{}, + "test-cluster", + ) + + s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes() + s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry) + s.namespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(nsEntry, nil) + + workflowKey := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) + s.workflowConsistencyChecker.EXPECT().GetWorkflowLease( + gomock.Any(), + nil, + workflowKey, + locks.PriorityHigh, + ).Return(s.workflowLease, nil) + + s.mutableState.EXPECT().SetContextMetadata(gomock.Any()).Do(func(ctx context.Context) { + contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowType, workflowType.GetName()) + contextutil.ContextMetadataSet(ctx, 
contextutil.MetadataKeyWorkflowTaskQueue, taskQueue) + }) + s.mutableState.EXPECT().GetWorkflowStateStatus().Return(enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED, enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED) + s.mutableState.EXPECT().GetWorkflowType().Return(workflowType).AnyTimes() + s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ + TaskQueue: taskQueue, + }).AnyTimes() + + ctx := contextutil.WithMetadataContext(context.Background()) + + request := &historyservice.QueryWorkflowRequest{ + NamespaceId: namespaceID.String(), + Request: &workflowservice.QueryWorkflowRequest{ + Namespace: "test-namespace", + Execution: &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + RunId: runID, + }, + Query: &querypb.WorkflowQuery{ + QueryType: "test-query", + }, + QueryRejectCondition: enumspb.QUERY_REJECT_CONDITION_NOT_OPEN, + }, + } + + resp, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, nil, nil) + s.NoError(err) + s.NotNil(resp) + s.NotNil(resp.Response.QueryRejected) + + // Verify context metadata was populated + contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.True(ok, "context workflow type MUST be set") + s.Equal(workflowType.GetName(), contextWorkflowType) + + contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok, "context task queue MUST be set") + s.Equal(taskQueue, contextTaskQueue) + }) + + s.Run("Paused workflow", func() { + namespaceID := namespace.ID(uuid.NewString()) + workflowID := "test-workflow-id-2" + runID := "test-run-id-2" + workflowType := &commonpb.WorkflowType{Name: "test-workflow-type-2"} + taskQueue := "test-task-queue-2" + + nsEntry := namespace.NewLocalNamespaceForTest( + &persistencespb.NamespaceInfo{Name: "test-namespace"}, + &persistencespb.NamespaceConfig{}, + "test-cluster", + ) + + 
s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes() + s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry) + s.namespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(nsEntry, nil) + + workflowKey := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) + s.workflowConsistencyChecker.EXPECT().GetWorkflowLease( + gomock.Any(), + nil, + workflowKey, + locks.PriorityHigh, + ).Return(s.workflowLease, nil) + + s.mutableState.EXPECT().SetContextMetadata(gomock.Any()).Do(func(ctx context.Context) { + contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowType, workflowType.GetName()) + contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowTaskQueue, taskQueue) + }) + // Return PAUSED status to trigger the paused workflow path + s.mutableState.EXPECT().GetWorkflowStateStatus().Return(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED) + s.mutableState.EXPECT().GetWorkflowType().Return(workflowType).AnyTimes() + s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ + TaskQueue: taskQueue, + }).AnyTimes() + + ctx := contextutil.WithMetadataContext(context.Background()) + + request := &historyservice.QueryWorkflowRequest{ + NamespaceId: namespaceID.String(), + Request: &workflowservice.QueryWorkflowRequest{ + Namespace: "test-namespace", + Execution: &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + RunId: runID, + }, + Query: &querypb.WorkflowQuery{ + QueryType: "test-query", + }, + }, + } + + resp, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, nil, nil) + s.NoError(err) + s.NotNil(resp) + s.NotNil(resp.Response.QueryRejected) + s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED, resp.Response.QueryRejected.Status) + + // Verify context metadata was populated even for paused workflow + contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, 
contextutil.MetadataKeyWorkflowType) + s.True(ok, "context workflow type MUST be set for paused workflow") + s.Equal(workflowType.GetName(), contextWorkflowType) + + contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok, "context task queue MUST be set for paused workflow") + s.Equal(taskQueue, contextTaskQueue) + }) + + s.Run("Namespace validation error - metadata NOT set", func() { + // Test early exit BEFORE SetContextMetadata + s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler) + + ctx := contextutil.WithMetadataContext(context.Background()) + + request := &historyservice.QueryWorkflowRequest{ + NamespaceId: "invalid-uuid", + Request: &workflowservice.QueryWorkflowRequest{ + Namespace: "test-namespace", + Execution: &commonpb.WorkflowExecution{ + WorkflowId: "test", + RunId: "test", + }, + Query: &querypb.WorkflowQuery{ + QueryType: "test-query", + }, + }, + } + + _, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, nil, nil) + s.Error(err) // Should error on namespace validation + + // Verify context metadata was NOT set (error before SetContextMetadata) + _, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.False(ok, "workflow type should NOT be set - exited before SetContextMetadata") + }) + + s.Run("Namespace registry error - metadata NOT set", func() { + // Test error when GetNamespaceByID fails + namespaceID := namespace.ID(uuid.NewString()) + + s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler) + s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry) + s.namespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(nil, serviceerror.NewNotFound("namespace not found")) + + ctx := contextutil.WithMetadataContext(context.Background()) + + request := &historyservice.QueryWorkflowRequest{ + NamespaceId: namespaceID.String(), + Request: 
&workflowservice.QueryWorkflowRequest{ + Namespace: "test-namespace", + Execution: &commonpb.WorkflowExecution{ + WorkflowId: "test-wf", + RunId: "test-run", + }, + Query: &querypb.WorkflowQuery{ + QueryType: "test-query", + }, + }, + } + + _, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, nil, nil) + s.Error(err, "should error when GetNamespaceByID fails") + + // Verify metadata was NOT set (error before SetContextMetadata) + _, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.False(ok, "metadata should NOT be set - error before SetContextMetadata") + }) + + s.Run("GetCurrentWorkflowRunID path", func() { + // Test when RunID is empty and needs to be fetched (lines 52-62) + namespaceID := namespace.ID(uuid.NewString()) + workflowID := "test-workflow-runid" + runID := "fetched-run-id" + workflowType := &commonpb.WorkflowType{Name: "test-wf-type"} + taskQueue := "test-queue" + + nsEntry := namespace.NewLocalNamespaceForTest( + &persistencespb.NamespaceInfo{Name: "test-namespace"}, + &persistencespb.NamespaceConfig{}, + "test-cluster", + ) + + s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes() + s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry) + s.namespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(nsEntry, nil) + + // Mock GetCurrentWorkflowRunID + s.workflowConsistencyChecker.EXPECT().GetCurrentWorkflowRunID( + gomock.Any(), + namespaceID.String(), + workflowID, + locks.PriorityHigh, + ).Return(runID, nil) + + workflowKey := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) + s.workflowConsistencyChecker.EXPECT().GetWorkflowLease( + gomock.Any(), + nil, + workflowKey, + locks.PriorityHigh, + ).Return(s.workflowLease, nil) + + s.mutableState.EXPECT().SetContextMetadata(gomock.Any()).Do(func(ctx context.Context) { + contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowType, workflowType.GetName()) + 
contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowTaskQueue, taskQueue) + }) + s.mutableState.EXPECT().GetWorkflowStateStatus().Return( + enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED, + enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, + ) + s.mutableState.EXPECT().GetWorkflowType().Return(workflowType).AnyTimes() + s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ + TaskQueue: taskQueue, + }).AnyTimes() + + ctx := contextutil.WithMetadataContext(context.Background()) + + request := &historyservice.QueryWorkflowRequest{ + NamespaceId: namespaceID.String(), + Request: &workflowservice.QueryWorkflowRequest{ + Namespace: "test-namespace", + Execution: &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + // RunId is EMPTY - will be fetched + }, + Query: &querypb.WorkflowQuery{ + QueryType: "test-query", + }, + QueryRejectCondition: enumspb.QUERY_REJECT_CONDITION_NOT_OPEN, + }, + } + + resp, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, nil, nil) + s.NoError(err) + s.NotNil(resp) + + // VERIFY: Context metadata set even when RunID was fetched + contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.True(ok, "context workflow type MUST be set after RunID fetch") + s.Equal(workflowType.GetName(), contextWorkflowType) + + contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok, "context task queue MUST be set after RunID fetch") + s.Equal(taskQueue, contextTaskQueue) + }) + + s.Run("Running workflow with no reject condition", func() { + namespaceID := namespace.ID(uuid.NewString()) + workflowID := "test-workflow-id-3" + runID := "test-run-id-3" + workflowType := &commonpb.WorkflowType{Name: "test-workflow-type-3"} + taskQueue := "test-task-queue-3" + + nsEntry := namespace.NewLocalNamespaceForTest( + &persistencespb.NamespaceInfo{Name: "test-namespace"}, + 
&persistencespb.NamespaceConfig{}, + "test-cluster", + ) + + s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes() + s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry) + s.namespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(nsEntry, nil) + + workflowKey := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) + s.workflowConsistencyChecker.EXPECT().GetWorkflowLease( + gomock.Any(), + nil, + workflowKey, + locks.PriorityHigh, + ).Return(s.workflowLease, nil) + + s.mutableState.EXPECT().SetContextMetadata(gomock.Any()).Do(func(ctx context.Context) { + contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowType, workflowType.GetName()) + contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowTaskQueue, taskQueue) + }) + s.mutableState.EXPECT().GetWorkflowStateStatus().Return(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING) + s.mutableState.EXPECT().GetWorkflowType().Return(workflowType).AnyTimes() + s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ + TaskQueue: taskQueue, + WorkflowTaskAttempt: 0, // Not in failed state + }).AnyTimes() + // Return false so it triggers "workflow closed before task started" error + s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(false) + s.mutableState.EXPECT().HasCompletedAnyWorkflowTask().Return(false) + + ctx := contextutil.WithMetadataContext(context.Background()) + + request := &historyservice.QueryWorkflowRequest{ + NamespaceId: namespaceID.String(), + Request: &workflowservice.QueryWorkflowRequest{ + Namespace: "test-namespace", + Execution: &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + RunId: runID, + }, + Query: &querypb.WorkflowQuery{ + QueryType: "test-query", + }, + }, + } + + _, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, nil, nil) + s.Error(err) // Should return 
ErrWorkflowClosedBeforeWorkflowTaskStarted + + // Verify context metadata was populated even when query fails with error + contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.True(ok, "context workflow type MUST be set even when query fails") + s.Equal(workflowType.GetName(), contextWorkflowType) + + contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok, "context task queue MUST be set even when query fails") + s.Equal(taskQueue, contextTaskQueue) + }) + + s.Run("GetWorkflowLease error - metadata NOT set", func() { + // Test error BEFORE SetContextMetadata is called + namespaceID := namespace.ID(uuid.NewString()) + workflowID := "test-workflow-lease-error" + runID := "test-run-lease-error" + + nsEntry := namespace.NewLocalNamespaceForTest( + &persistencespb.NamespaceInfo{Name: "test-namespace"}, + &persistencespb.NamespaceConfig{}, + "test-cluster", + ) + + s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes() + s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry) + s.namespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(nsEntry, nil) + + workflowKey := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) + // Return error from GetWorkflowLease - this happens BEFORE SetContextMetadata + s.workflowConsistencyChecker.EXPECT().GetWorkflowLease( + gomock.Any(), + nil, + workflowKey, + locks.PriorityHigh, + ).Return(nil, serviceerror.NewNotFound("workflow not found")) + + ctx := contextutil.WithMetadataContext(context.Background()) + + request := &historyservice.QueryWorkflowRequest{ + NamespaceId: namespaceID.String(), + Request: &workflowservice.QueryWorkflowRequest{ + Namespace: "test-namespace", + Execution: &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + RunId: runID, + }, + Query: &querypb.WorkflowQuery{ + QueryType: "test-query", + }, + }, + } + + _, err := 
Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, nil, nil) + s.Error(err, "should error when GetWorkflowLease fails") + + // Verify context metadata was NOT set (error before SetContextMetadata) + _, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.False(ok, "metadata should NOT be set - error before SetContextMetadata call") + }) + + s.Run("NOT_COMPLETED_CLEANLY reject condition", func() { + // Test QUERY_REJECT_CONDITION_NOT_COMPLETED_CLEANLY path + namespaceID := namespace.ID(uuid.NewString()) + workflowID := "test-workflow-not-cleanly" + runID := "test-run-not-cleanly" + workflowType := &commonpb.WorkflowType{Name: "test-wf-type-cleanly"} + taskQueue := "test-queue-cleanly" + + nsEntry := namespace.NewLocalNamespaceForTest( + &persistencespb.NamespaceInfo{Name: "test-namespace"}, + &persistencespb.NamespaceConfig{}, + "test-cluster", + ) + + s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes() + s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry) + s.namespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(nsEntry, nil) + + workflowKey := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) + s.workflowConsistencyChecker.EXPECT().GetWorkflowLease( + gomock.Any(), + nil, + workflowKey, + locks.PriorityHigh, + ).Return(s.workflowLease, nil) + + s.mutableState.EXPECT().SetContextMetadata(gomock.Any()).Do(func(ctx context.Context) { + contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowType, workflowType.GetName()) + contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowTaskQueue, taskQueue) + }) + // Return FAILED status (not completed cleanly) + s.mutableState.EXPECT().GetWorkflowStateStatus().Return( + enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED, + enumspb.WORKFLOW_EXECUTION_STATUS_FAILED, + ) + s.mutableState.EXPECT().GetWorkflowType().Return(workflowType).AnyTimes() + 
s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ + TaskQueue: taskQueue, + }).AnyTimes() + + ctx := contextutil.WithMetadataContext(context.Background()) + + request := &historyservice.QueryWorkflowRequest{ + NamespaceId: namespaceID.String(), + Request: &workflowservice.QueryWorkflowRequest{ + Namespace: "test-namespace", + Execution: &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + RunId: runID, + }, + Query: &querypb.WorkflowQuery{ + QueryType: "test-query", + }, + // This should trigger rejection because workflow is FAILED (not completed cleanly) + QueryRejectCondition: enumspb.QUERY_REJECT_CONDITION_NOT_COMPLETED_CLEANLY, + }, + } + + resp, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, nil, nil) + s.NoError(err) + s.NotNil(resp) + s.NotNil(resp.Response.QueryRejected) + s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_FAILED, resp.Response.QueryRejected.Status) + + // Verify metadata was set + contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.True(ok, "context workflow type MUST be set for NOT_COMPLETED_CLEANLY path") + s.Equal(workflowType.GetName(), contextWorkflowType) + + contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok, "context task queue MUST be set for NOT_COMPLETED_CLEANLY path") + s.Equal(taskQueue, contextTaskQueue) + }) + + s.Run("WorkflowTask failing repeatedly (attempt >= 3)", func() { + // Test fail-fast path when workflow task keeps failing + namespaceID := namespace.ID(uuid.NewString()) + workflowID := "test-workflow-task-failing" + runID := "test-run-task-failing" + workflowType := &commonpb.WorkflowType{Name: "test-wf-type-failing"} + taskQueue := "test-queue-failing" + + nsEntry := namespace.NewLocalNamespaceForTest( + &persistencespb.NamespaceInfo{Name: "test-namespace"}, + &persistencespb.NamespaceConfig{}, + "test-cluster", + ) + + 
s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes() + s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry) + s.namespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(nsEntry, nil) + s.shardContext.EXPECT().GetLogger().Return(log.NewNoopLogger()).AnyTimes() + + workflowKey := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) + s.workflowConsistencyChecker.EXPECT().GetWorkflowLease( + gomock.Any(), + nil, + workflowKey, + locks.PriorityHigh, + ).Return(s.workflowLease, nil) + + s.mutableState.EXPECT().SetContextMetadata(gomock.Any()).Do(func(ctx context.Context) { + contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowType, workflowType.GetName()) + contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowTaskQueue, taskQueue) + }) + s.mutableState.EXPECT().GetWorkflowStateStatus().Return( + enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, + enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, + ) + s.mutableState.EXPECT().GetWorkflowType().Return(workflowType).AnyTimes() + s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ + TaskQueue: taskQueue, + WorkflowTaskAttempt: 5, // >= 3, should fail fast + }).AnyTimes() + // !IsWorkflowExecutionRunning() && !HasCompletedAnyWorkflowTask() - returns true so short circuits, HasCompletedAnyWorkflowTask never called + // !HadOrHasWorkflowTask() - returns true so this is false, block is skipped + // GetExecutionInfo().WorkflowTaskAttempt >= 3 - this triggers the fail-fast error + s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true) + s.mutableState.EXPECT().HadOrHasWorkflowTask().Return(true) + + ctx := contextutil.WithMetadataContext(context.Background()) + + request := &historyservice.QueryWorkflowRequest{ + NamespaceId: namespaceID.String(), + Request: &workflowservice.QueryWorkflowRequest{ + Namespace: "test-namespace", + Execution: &commonpb.WorkflowExecution{ + WorkflowId: 
workflowID, + RunId: runID, + }, + Query: &querypb.WorkflowQuery{ + QueryType: "test-query", + }, + }, + } + + _, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, nil, nil) + s.Error(err, "should fail fast when workflow task attempt >= 3") + + // VERIFY: Context metadata set before failing fast + contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.True(ok, "context workflow type MUST be set even when failing fast") + s.Equal(workflowType.GetName(), contextWorkflowType) + + contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok, "context task queue MUST be set even when failing fast") + s.Equal(taskQueue, contextTaskQueue) + }) + + s.Run("Namespace not active in cluster - dispatch directly", func() { + // Test safeToDispatchDirectly when namespace is not active + namespaceID := namespace.ID(uuid.NewString()) + workflowID := "test-workflow-not-active" + runID := "test-run-not-active" + workflowType := &commonpb.WorkflowType{Name: "test-wf-type-not-active"} + taskQueue := "test-queue-not-active" + + // Namespace is active in "other-cluster", NOT in "test-cluster" + nsEntry := namespace.NewLocalNamespaceForTest( + &persistencespb.NamespaceInfo{Name: "test-namespace"}, + &persistencespb.NamespaceConfig{}, + "other-cluster", // Different from current cluster + ) + + mockMatchingClient := matchingservicemock.NewMockMatchingServiceClient(s.controller) + mockMatchingClient.EXPECT().QueryWorkflow(gomock.Any(), gomock.Any()).Return( + &matchingservice.QueryWorkflowResponse{ + QueryResult: payloads.EncodeString("query-result-not-active"), + }, nil) + + s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes() + s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry) + s.namespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(nsEntry, nil) + 
s.shardContext.EXPECT().GetClusterMetadata().Return(s.clusterMetadata).AnyTimes() + s.clusterMetadata.EXPECT().GetCurrentClusterName().Return("test-cluster").AnyTimes() + + workflowKey := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) + s.workflowConsistencyChecker.EXPECT().GetWorkflowLease( + gomock.Any(), + nil, + workflowKey, + locks.PriorityHigh, + ).Return(s.workflowLease, nil) + + s.mutableState.EXPECT().SetContextMetadata(gomock.Any()).Do(func(ctx context.Context) { + contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowType, workflowType.GetName()) + contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowTaskQueue, taskQueue) + }) + s.mutableState.EXPECT().GetWorkflowStateStatus().Return( + enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, + enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, + ).AnyTimes() + s.mutableState.EXPECT().GetWorkflowType().Return(workflowType).AnyTimes() + s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ + TaskQueue: taskQueue, + WorkflowTaskAttempt: 0, + VersionHistories: &historyspb.VersionHistories{ + Histories: []*historyspb.VersionHistory{ + { + BranchToken: []byte("branch-token"), + Items: []*historyspb.VersionHistoryItem{{EventId: 10, Version: 1}}, + }, + }, + }, + }).AnyTimes() + s.mutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{ + RunId: runID, + Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, + }).AnyTimes() + s.mutableState.EXPECT().GetCurrentBranchToken().Return([]byte("branch-token"), nil).AnyTimes() + s.mutableState.EXPECT().GetNextEventID().Return(int64(10)).AnyTimes() + s.mutableState.EXPECT().GetLastFirstEventIDTxnID().Return(int64(1), int64(1)).AnyTimes() + s.mutableState.EXPECT().GetLastCompletedWorkflowTaskStartedEventId().Return(int64(5)).AnyTimes() + s.mutableState.EXPECT().IsStickyTaskQueueSet().Return(false).AnyTimes() + s.mutableState.EXPECT().GetAssignedBuildId().Return("").AnyTimes() + 
s.mutableState.EXPECT().GetInheritedBuildId().Return("").AnyTimes() + s.mutableState.EXPECT().GetMostRecentWorkerVersionStamp().Return(nil).AnyTimes() + s.mutableState.EXPECT().HadOrHasWorkflowTask().Return(true).AnyTimes() + s.mutableState.EXPECT().HasCompletedAnyWorkflowTask().Return(true).AnyTimes() + s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true).AnyTimes() + // These are checked when determining safeToDispatchDirectly + s.mutableState.EXPECT().HasPendingWorkflowTask().Return(false).AnyTimes() + s.mutableState.EXPECT().HasStartedWorkflowTask().Return(false).AnyTimes() + + ctx := contextutil.WithMetadataContext(context.Background()) + + request := &historyservice.QueryWorkflowRequest{ + NamespaceId: namespaceID.String(), + Request: &workflowservice.QueryWorkflowRequest{ + Namespace: "test-namespace", + Execution: &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + RunId: runID, + }, + Query: &querypb.WorkflowQuery{ + QueryType: "test-query", + }, + }, + } + + resp, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, mockMatchingClient, mockMatchingClient) + s.NoError(err, "should succeed when dispatching directly (namespace not active)") + s.NotNil(resp) + s.NotNil(resp.Response) + + // VERIFY: Context metadata set for namespace not active path + contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.True(ok, "context workflow type MUST be set for direct dispatch path") + s.Equal(workflowType.GetName(), contextWorkflowType) + + contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok, "context task queue MUST be set for direct dispatch path") + s.Equal(taskQueue, contextTaskQueue) + }) + + s.Run("GetCurrentWorkflowRunID error path", func() { + // Test error when fetching current run ID + namespaceID := namespace.ID(uuid.NewString()) + workflowID := "test-workflow-runid-error" + + nsEntry := 
namespace.NewLocalNamespaceForTest( + &persistencespb.NamespaceInfo{Name: "test-namespace"}, + &persistencespb.NamespaceConfig{}, + "test-cluster", + ) + + s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes() + s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry) + s.namespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(nsEntry, nil) + + // Return error from GetCurrentWorkflowRunID + s.workflowConsistencyChecker.EXPECT().GetCurrentWorkflowRunID( + gomock.Any(), + namespaceID.String(), + workflowID, + locks.PriorityHigh, + ).Return("", serviceerror.NewNotFound("workflow not found")) + + ctx := contextutil.WithMetadataContext(context.Background()) + + request := &historyservice.QueryWorkflowRequest{ + NamespaceId: namespaceID.String(), + Request: &workflowservice.QueryWorkflowRequest{ + Namespace: "test-namespace", + Execution: &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + // RunId is EMPTY - will try to fetch but fail + }, + Query: &querypb.WorkflowQuery{ + QueryType: "test-query", + }, + }, + } + + _, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, nil, nil) + s.Error(err, "should error when GetCurrentWorkflowRunID fails") + + // Verify metadata was NOT set (error before SetContextMetadata) + _, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.False(ok, "metadata should NOT be set - error before SetContextMetadata") + }) + + s.Run("Workflow not running - dispatch directly", func() { + // Test safeToDispatchDirectly when workflow is not running + namespaceID := namespace.ID(uuid.NewString()) + workflowID := "test-workflow-not-running" + runID := "test-run-not-running" + workflowType := &commonpb.WorkflowType{Name: "test-wf-type-not-running"} + taskQueue := "test-queue-not-running" + + nsEntry := namespace.NewLocalNamespaceForTest( + &persistencespb.NamespaceInfo{Name: "test-namespace"}, + &persistencespb.NamespaceConfig{}, + 
"test-cluster", + ) + + mockMatchingClient := matchingservicemock.NewMockMatchingServiceClient(s.controller) + mockMatchingClient.EXPECT().QueryWorkflow(gomock.Any(), gomock.Any()).Return( + &matchingservice.QueryWorkflowResponse{ + QueryResult: payloads.EncodeString("query-result-not-running"), + }, nil) + + s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes() + s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry) + s.namespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(nsEntry, nil) + s.shardContext.EXPECT().GetClusterMetadata().Return(s.clusterMetadata).AnyTimes() + s.clusterMetadata.EXPECT().GetCurrentClusterName().Return("test-cluster").AnyTimes() + + workflowKey := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) + s.workflowConsistencyChecker.EXPECT().GetWorkflowLease( + gomock.Any(), + nil, + workflowKey, + locks.PriorityHigh, + ).Return(s.workflowLease, nil) + + s.mutableState.EXPECT().SetContextMetadata(gomock.Any()).Do(func(ctx context.Context) { + contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowType, workflowType.GetName()) + contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowTaskQueue, taskQueue) + }) + s.mutableState.EXPECT().GetWorkflowStateStatus().Return( + enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED, + enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, + ).AnyTimes() + s.mutableState.EXPECT().GetWorkflowType().Return(workflowType).AnyTimes() + s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ + TaskQueue: taskQueue, + WorkflowTaskAttempt: 0, + VersionHistories: &historyspb.VersionHistories{ + Histories: []*historyspb.VersionHistory{ + { + BranchToken: []byte("branch-token"), + Items: []*historyspb.VersionHistoryItem{{EventId: 10, Version: 1}}, + }, + }, + }, + }).AnyTimes() + s.mutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{ + RunId: runID, + Status: 
enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, + }).AnyTimes() + s.mutableState.EXPECT().GetCurrentBranchToken().Return([]byte("branch-token"), nil).AnyTimes() + s.mutableState.EXPECT().GetNextEventID().Return(int64(10)).AnyTimes() + s.mutableState.EXPECT().GetLastFirstEventIDTxnID().Return(int64(1), int64(1)).AnyTimes() + s.mutableState.EXPECT().GetLastCompletedWorkflowTaskStartedEventId().Return(int64(5)).AnyTimes() + s.mutableState.EXPECT().IsStickyTaskQueueSet().Return(false).AnyTimes() + s.mutableState.EXPECT().GetAssignedBuildId().Return("").AnyTimes() + s.mutableState.EXPECT().GetInheritedBuildId().Return("").AnyTimes() + s.mutableState.EXPECT().GetMostRecentWorkerVersionStamp().Return(nil).AnyTimes() + s.mutableState.EXPECT().HadOrHasWorkflowTask().Return(true).AnyTimes() + s.mutableState.EXPECT().HasCompletedAnyWorkflowTask().Return(true).AnyTimes() + // Workflow is NOT running - this makes safeToDispatchDirectly = true + s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(false).AnyTimes() + + ctx := contextutil.WithMetadataContext(context.Background()) + + request := &historyservice.QueryWorkflowRequest{ + NamespaceId: namespaceID.String(), + Request: &workflowservice.QueryWorkflowRequest{ + Namespace: "test-namespace", + Execution: &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + RunId: runID, + }, + Query: &querypb.WorkflowQuery{ + QueryType: "test-query", + }, + }, + } + + resp, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, mockMatchingClient, mockMatchingClient) + s.NoError(err, "should succeed when dispatching directly (workflow not running)") + s.NotNil(resp) + s.NotNil(resp.Response) + + // VERIFY: Context metadata set for workflow not running path + contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.True(ok, "context workflow type MUST be set for direct dispatch path") + s.Equal(workflowType.GetName(), contextWorkflowType) + + contextTaskQueue, ok := 
contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok, "context task queue MUST be set for direct dispatch path") + s.Equal(taskQueue, contextTaskQueue) + }) + + s.Run("No pending/started workflow tasks - dispatch directly", func() { + // Test safeToDispatchDirectly when no pending/started tasks + namespaceID := namespace.ID(uuid.NewString()) + workflowID := "test-workflow-no-pending" + runID := "test-run-no-pending" + workflowType := &commonpb.WorkflowType{Name: "test-wf-type-no-pending"} + taskQueue := "test-queue-no-pending" + + nsEntry := namespace.NewLocalNamespaceForTest( + &persistencespb.NamespaceInfo{Name: "test-namespace"}, + &persistencespb.NamespaceConfig{}, + "test-cluster", + ) + + mockMatchingClient := matchingservicemock.NewMockMatchingServiceClient(s.controller) + mockMatchingClient.EXPECT().QueryWorkflow(gomock.Any(), gomock.Any()).Return( + &matchingservice.QueryWorkflowResponse{ + QueryResult: payloads.EncodeString("query-result-no-pending"), + }, nil) + + s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes() + s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry) + s.namespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(nsEntry, nil) + s.shardContext.EXPECT().GetClusterMetadata().Return(s.clusterMetadata).AnyTimes() + s.clusterMetadata.EXPECT().GetCurrentClusterName().Return("test-cluster").AnyTimes() + + workflowKey := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) + s.workflowConsistencyChecker.EXPECT().GetWorkflowLease( + gomock.Any(), + nil, + workflowKey, + locks.PriorityHigh, + ).Return(s.workflowLease, nil) + + s.mutableState.EXPECT().SetContextMetadata(gomock.Any()).Do(func(ctx context.Context) { + contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowType, workflowType.GetName()) + contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowTaskQueue, taskQueue) + }) + 
s.mutableState.EXPECT().GetWorkflowStateStatus().Return( + enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, + enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, + ).AnyTimes() + s.mutableState.EXPECT().GetWorkflowType().Return(workflowType).AnyTimes() + s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ + TaskQueue: taskQueue, + WorkflowTaskAttempt: 0, + VersionHistories: &historyspb.VersionHistories{ + Histories: []*historyspb.VersionHistory{ + { + BranchToken: []byte("branch-token"), + Items: []*historyspb.VersionHistoryItem{{EventId: 10, Version: 1}}, + }, + }, + }, + }).AnyTimes() + s.mutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{ + RunId: runID, + Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, + }).AnyTimes() + s.mutableState.EXPECT().GetCurrentBranchToken().Return([]byte("branch-token"), nil).AnyTimes() + s.mutableState.EXPECT().GetNextEventID().Return(int64(10)).AnyTimes() + s.mutableState.EXPECT().GetLastFirstEventIDTxnID().Return(int64(1), int64(1)).AnyTimes() + s.mutableState.EXPECT().GetLastCompletedWorkflowTaskStartedEventId().Return(int64(5)).AnyTimes() + s.mutableState.EXPECT().IsStickyTaskQueueSet().Return(false).AnyTimes() + s.mutableState.EXPECT().GetAssignedBuildId().Return("").AnyTimes() + s.mutableState.EXPECT().GetInheritedBuildId().Return("").AnyTimes() + s.mutableState.EXPECT().GetMostRecentWorkerVersionStamp().Return(nil).AnyTimes() + s.mutableState.EXPECT().HadOrHasWorkflowTask().Return(true).AnyTimes() + s.mutableState.EXPECT().HasCompletedAnyWorkflowTask().Return(true).AnyTimes() + // Workflow IS running, namespace IS active in cluster, but NO pending/started tasks + // This makes safeToDispatchDirectly = true + s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true).AnyTimes() + s.mutableState.EXPECT().HasPendingWorkflowTask().Return(false) + s.mutableState.EXPECT().HasStartedWorkflowTask().Return(false) + + ctx := 
contextutil.WithMetadataContext(context.Background()) + + request := &historyservice.QueryWorkflowRequest{ + NamespaceId: namespaceID.String(), + Request: &workflowservice.QueryWorkflowRequest{ + Namespace: "test-namespace", + Execution: &commonpb.WorkflowExecution{ + WorkflowId: workflowID, + RunId: runID, + }, + Query: &querypb.WorkflowQuery{ + QueryType: "test-query", + }, + }, + } + + resp, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, mockMatchingClient, mockMatchingClient) + s.NoError(err, "should succeed when no pending/started tasks") + s.NotNil(resp) + s.NotNil(resp.Response) + + // VERIFY: Context metadata set for no pending/started tasks path + contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.True(ok, "context workflow type MUST be set for direct dispatch path") + s.Equal(workflowType.GetName(), contextWorkflowType) + + contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok, "context task queue MUST be set for direct dispatch path") + s.Equal(taskQueue, contextTaskQueue) + }) + +} diff --git a/service/history/api/respondworkflowtaskcompleted/api.go b/service/history/api/respondworkflowtaskcompleted/api.go index 6961e1ee75..750119c145 100644 --- a/service/history/api/respondworkflowtaskcompleted/api.go +++ b/service/history/api/respondworkflowtaskcompleted/api.go @@ -180,10 +180,10 @@ func (handler *WorkflowTaskCompletedHandler) Invoke( } } - if len(request.Commands) == 0 && len(request.Messages) > 0 { + if len(request.Commands) == 0 { // Context metadata is automatically set during mutable state transaction close. For RespondWorkflowTaskCompleted - // with only `update.Rejection` messages, the transaction is never closed. We explicitly call SetContextMetadata - // here to ensure readonly messages have workflow metadata populated in the context. 
+ // with no commands (e.g., workflow task heartbeat or only readonly messages like `update.Rejection`), the transaction + // is never closed. We explicitly call SetContextMetadata here to ensure workflow metadata is populated in the context. workflowLease.GetMutableState().SetContextMetadata(ctx) } diff --git a/service/history/api/respondworkflowtaskcompleted/api_test.go b/service/history/api/respondworkflowtaskcompleted/api_test.go index 7230f19ff1..7e0fd09613 100644 --- a/service/history/api/respondworkflowtaskcompleted/api_test.go +++ b/service/history/api/respondworkflowtaskcompleted/api_test.go @@ -24,6 +24,7 @@ import ( "go.temporal.io/server/common" "go.temporal.io/server/common/clock" "go.temporal.io/server/common/cluster" + "go.temporal.io/server/common/contextutil" "go.temporal.io/server/common/effect" "go.temporal.io/server/common/locks" "go.temporal.io/server/common/log" @@ -171,7 +172,10 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() { updRequestMsg, upd, serializedTaskToken := s.createSentUpdate(tv, wfContext) s.NotNil(upd) - _, err = s.workflowTaskCompletedHandler.Invoke(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{ + // Use context with metadata to verify it's set via transaction close + ctx := contextutil.WithMetadataContext(context.Background()) + + _, err = s.workflowTaskCompletedHandler.Invoke(ctx, &historyservice.RespondWorkflowTaskCompletedRequest{ NamespaceId: tv.NamespaceID().String(), CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ TaskToken: serializedTaskToken, @@ -193,6 +197,15 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() { 4 WorkflowTaskCompleted 5 WorkflowExecutionUpdateAccepted 6 WorkflowExecutionUpdateCompleted`, <-writtenHistoryCh) + + // VERIFY: Context metadata set via transaction close (normal case WITH commands) + contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.True(ok, "context 
workflow type MUST be set in normal case with commands") + s.Equal(tv.WorkflowType().GetName(), contextWorkflowType) + + contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok, "context task queue MUST be set in normal case with commands") + s.Equal(tv.TaskQueue().GetName(), contextTaskQueue) }) s.Run("Reject", func() { @@ -204,7 +217,10 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() { updRequestMsg, upd, serializedTaskToken := s.createSentUpdate(tv, wfContext) s.NotNil(upd) - _, err := s.workflowTaskCompletedHandler.Invoke(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{ + // Use context with metadata support to verify SetContextMetadata is called + ctx := contextutil.WithMetadataContext(context.Background()) + + _, err := s.workflowTaskCompletedHandler.Invoke(ctx, &historyservice.RespondWorkflowTaskCompletedRequest{ NamespaceId: tv.NamespaceID().String(), CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ TaskToken: serializedTaskToken, @@ -218,6 +234,46 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() { s.NoError(err) s.Equal(enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED.String(), updStatus.Stage.String()) s.Equal("rejection-of-"+tv.UpdateID(), updStatus.Outcome.GetFailure().GetMessage()) + + // VERIFY: Context metadata set for readonly operation (no commands, only messages) + contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.True(ok, "context workflow type MUST be set for readonly operation") + s.Equal(tv.WorkflowType().GetName(), contextWorkflowType) + + contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok, "context task queue MUST be set for readonly operation") + s.Equal(tv.TaskQueue().GetName(), contextTaskQueue) + }) + + s.Run("Heartbeat (empty completion)", func() { + tv := 
testvars.New(s.T()) + tv = tv.WithRunID(tv.Any().RunID()) + s.mockNamespaceCache.EXPECT().GetNamespaceByID(tv.NamespaceID()).Return(tv.Namespace(), nil).AnyTimes() + wfContext := s.createStartedWorkflow(tv) + + _, _, serializedTaskToken := s.createSentUpdate(tv, wfContext) + + // Use context with metadata support to verify SetContextMetadata is called + ctx := contextutil.WithMetadataContext(context.Background()) + + _, err := s.workflowTaskCompletedHandler.Invoke(ctx, &historyservice.RespondWorkflowTaskCompletedRequest{ + NamespaceId: tv.NamespaceID().String(), + CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ + TaskToken: serializedTaskToken, + // No Commands, no Messages - this is a workflow task heartbeat + Identity: tv.Any().String(), + }, + }) + s.NoError(err) + + // VERIFY: Context metadata set for heartbeat (no commands, no messages) + contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.True(ok, "context workflow type MUST be set for heartbeat") + s.Equal(tv.WorkflowType().GetName(), contextWorkflowType) + + contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok, "context task queue MUST be set for heartbeat") + s.Equal(tv.TaskQueue().GetName(), contextTaskQueue) }) s.Run("Write failed on normal task queue", func() { @@ -232,7 +288,9 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() { updRequestMsg, upd, serializedTaskToken := s.createSentUpdate(tv, wfContext) s.NotNil(upd) - _, err := s.workflowTaskCompletedHandler.Invoke(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{ + ctx := contextutil.WithMetadataContext(context.Background()) + + _, err := s.workflowTaskCompletedHandler.Invoke(ctx, &historyservice.RespondWorkflowTaskCompletedRequest{ NamespaceId: tv.NamespaceID().String(), CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ TaskToken: serializedTaskToken, 
@@ -244,6 +302,15 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() { s.ErrorIs(err, writeErr) s.Nil(wfContext.(*workflow.ContextImpl).MutableState, "mutable state must be cleared") + + // VERIFY: Context metadata set even when write fails (transaction close is attempted) + contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.True(ok, "context workflow type MUST be set even when write fails") + s.Equal(tv.WorkflowType().GetName(), contextWorkflowType) + + contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok, "context task queue MUST be set even when write fails") + s.Equal(tv.TaskQueue().GetName(), contextTaskQueue) }) s.Run("Write failed on sticky task queue", func() { @@ -261,7 +328,9 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() { updRequestMsg, upd, serializedTaskToken := s.createSentUpdate(tv, wfContext) s.NotNil(upd) - _, err := s.workflowTaskCompletedHandler.Invoke(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{ + ctx := contextutil.WithMetadataContext(context.Background()) + + _, err := s.workflowTaskCompletedHandler.Invoke(ctx, &historyservice.RespondWorkflowTaskCompletedRequest{ NamespaceId: tv.NamespaceID().String(), CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ TaskToken: serializedTaskToken, @@ -274,6 +343,15 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() { s.ErrorIs(err, writeErr) s.Nil(wfContext.(*workflow.ContextImpl).MutableState, "mutable state must be cleared") + + // VERIFY: Context metadata set even when sticky write fails + contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.True(ok, "context workflow type MUST be set even when sticky write fails") + s.Equal(tv.WorkflowType().GetName(), contextWorkflowType) + + contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, 
contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok, "context task queue MUST be set even when sticky write fails") + s.Equal(tv.TaskQueue().GetName(), contextTaskQueue) }) s.Run("GetHistory failed", func() { @@ -290,7 +368,9 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() { readHistoryErr := errors.New("get history failed") s.mockExecutionMgr.EXPECT().ReadHistoryBranch(gomock.Any(), gomock.Any()).Return(nil, readHistoryErr) - _, err := s.workflowTaskCompletedHandler.Invoke(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{ + ctx := contextutil.WithMetadataContext(context.Background()) + + _, err := s.workflowTaskCompletedHandler.Invoke(ctx, &historyservice.RespondWorkflowTaskCompletedRequest{ NamespaceId: tv.NamespaceID().String(), CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ TaskToken: serializedTaskToken, @@ -316,6 +396,15 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() { 6 WorkflowExecutionUpdateCompleted 7 WorkflowTaskScheduled 8 WorkflowTaskStarted`, <-writtenHistoryCh) + + // VERIFY: Context metadata set even when ReadHistoryBranch fails + contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.True(ok, "context workflow type MUST be set even when ReadHistoryBranch fails") + s.Equal(tv.WorkflowType().GetName(), contextWorkflowType) + + contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok, "context task queue MUST be set even when ReadHistoryBranch fails") + s.Equal(tv.TaskQueue().GetName(), contextTaskQueue) }) s.Run("Discard speculative WFT with events", func() { @@ -346,7 +435,9 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() { updRequestMsg, upd, serializedTaskToken := s.createSentUpdate(tv, wfContext) s.NotNil(upd) - _, err = s.workflowTaskCompletedHandler.Invoke(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{ 
+ ctx := contextutil.WithMetadataContext(context.Background()) + + _, err = s.workflowTaskCompletedHandler.Invoke(ctx, &historyservice.RespondWorkflowTaskCompletedRequest{ NamespaceId: tv.NamespaceID().String(), CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ TaskToken: serializedTaskToken, @@ -374,6 +465,15 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() { s.EqualHistoryEvents(` 3 TimerFired // No WFT events in between 2 and 3. `, <-writtenHistoryCh) + + // VERIFY: Context metadata set for discard speculative WFT case + contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.True(ok, "context workflow type MUST be set when discarding speculative WFT") + s.Equal(tv.WorkflowType().GetName(), contextWorkflowType) + + contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok, "context task queue MUST be set when discarding speculative WFT") + s.Equal(tv.TaskQueue().GetName(), contextTaskQueue) }) s.Run("Do not discard speculative WFT with more than 10 events", func() { @@ -416,7 +516,9 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() { updRequestMsg, upd, serializedTaskToken := s.createSentUpdate(tv, wfContext) s.NotNil(upd) - _, err = s.workflowTaskCompletedHandler.Invoke(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{ + ctx := contextutil.WithMetadataContext(context.Background()) + + _, err = s.workflowTaskCompletedHandler.Invoke(ctx, &historyservice.RespondWorkflowTaskCompletedRequest{ NamespaceId: tv.NamespaceID().String(), CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ TaskToken: serializedTaskToken, @@ -439,6 +541,15 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() { 14 WorkflowTaskStarted 15 WorkflowTaskCompleted `, <-writtenHistoryCh) + + // VERIFY: Context metadata set when NOT discarding speculative WFT (>10 events) + 
contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.True(ok, "context workflow type MUST be set when keeping speculative WFT") + s.Equal(tv.WorkflowType().GetName(), contextWorkflowType) + + contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok, "context task queue MUST be set when keeping speculative WFT") + s.Equal(tv.TaskQueue().GetName(), contextTaskQueue) }) } @@ -453,7 +564,9 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestForceCreateNewWorkflowTaskOnPaus s.NoError(err) s.True(ms.IsWorkflowExecutionStatusPaused()) - _, err = s.workflowTaskCompletedHandler.Invoke(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{ + ctx := contextutil.WithMetadataContext(context.Background()) + + _, err = s.workflowTaskCompletedHandler.Invoke(ctx, &historyservice.RespondWorkflowTaskCompletedRequest{ NamespaceId: tv.NamespaceID().String(), CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{ TaskToken: serializedTaskToken, @@ -465,6 +578,15 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestForceCreateNewWorkflowTaskOnPaus var failedPrecondition *serviceerror.FailedPrecondition s.ErrorAs(err, &failedPrecondition) s.Contains(err.Error(), "Workflow is paused and force create new workflow task is not allowed") + + // VERIFY: Context metadata set even when paused workflow returns error + contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.True(ok, "context workflow type MUST be set even for paused workflow error") + s.Equal(tv.WorkflowType().GetName(), contextWorkflowType) + + contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok, "context task queue MUST be set even for paused workflow error") + s.Equal(tv.TaskQueue().GetName(), contextTaskQueue) }) } From 1ed923e811c8970d712585f46582a51cd2fdfb9b Mon Sep 17 00:00:00 
2001 From: Nikki Dag Date: Tue, 10 Feb 2026 19:31:19 -0600 Subject: [PATCH 3/5] update query workflow tests --- service/history/api/queryworkflow/api_test.go | 1093 ----------------- service/history/history_engine_test.go | 121 +- 2 files changed, 106 insertions(+), 1108 deletions(-) delete mode 100644 service/history/api/queryworkflow/api_test.go diff --git a/service/history/api/queryworkflow/api_test.go b/service/history/api/queryworkflow/api_test.go deleted file mode 100644 index 0842f30a20..0000000000 --- a/service/history/api/queryworkflow/api_test.go +++ /dev/null @@ -1,1093 +0,0 @@ -package queryworkflow - -import ( - "context" - "testing" - - "github.com/google/uuid" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - commonpb "go.temporal.io/api/common/v1" - enumspb "go.temporal.io/api/enums/v1" - querypb "go.temporal.io/api/query/v1" - "go.temporal.io/api/serviceerror" - "go.temporal.io/api/workflowservice/v1" - enumsspb "go.temporal.io/server/api/enums/v1" - historyspb "go.temporal.io/server/api/history/v1" - "go.temporal.io/server/api/historyservice/v1" - "go.temporal.io/server/api/matchingservice/v1" - "go.temporal.io/server/api/matchingservicemock/v1" - persistencespb "go.temporal.io/server/api/persistence/v1" - "go.temporal.io/server/common/cluster" - "go.temporal.io/server/common/contextutil" - "go.temporal.io/server/common/definition" - "go.temporal.io/server/common/locks" - "go.temporal.io/server/common/log" - "go.temporal.io/server/common/metrics" - "go.temporal.io/server/common/namespace" - "go.temporal.io/server/common/payloads" - "go.temporal.io/server/service/history/api" - historyi "go.temporal.io/server/service/history/interfaces" - "go.uber.org/mock/gomock" -) - -type ( - apiSuite struct { - suite.Suite - *require.Assertions - - controller *gomock.Controller - workflowConsistencyChecker *api.MockWorkflowConsistencyChecker - workflowLease api.WorkflowLease - workflowContext *historyi.MockWorkflowContext - 
mutableState *historyi.MockMutableState - shardContext *historyi.MockShardContext - namespaceRegistry *namespace.MockRegistry - clusterMetadata *cluster.MockMetadata - } -) - -func TestAPISuite(t *testing.T) { - s := new(apiSuite) - suite.Run(t, s) -} - -func (s *apiSuite) SetupTest() { - s.Assertions = require.New(s.T()) -} - -func (s *apiSuite) SetupSubTest() { - s.controller = gomock.NewController(s.T()) - s.workflowContext = historyi.NewMockWorkflowContext(s.controller) - s.mutableState = historyi.NewMockMutableState(s.controller) - s.shardContext = historyi.NewMockShardContext(s.controller) - s.namespaceRegistry = namespace.NewMockRegistry(s.controller) - s.clusterMetadata = cluster.NewMockMetadata(s.controller) - s.workflowConsistencyChecker = api.NewMockWorkflowConsistencyChecker(s.controller) - - s.workflowLease = api.NewWorkflowLease( - s.workflowContext, - func(err error) {}, - s.mutableState, - ) -} - -func (s *apiSuite) TearDownSubTest() { - s.controller.Finish() -} - -func (s *apiSuite) TestContextMetadataPopulated() { - // This test suite verifies that SetContextMetadata is ALWAYS called for QueryWorkflow operations, - // regardless of which code path is taken (success, rejected, paused, error, etc.). - // SetContextMetadata is called early in the API handler, before any branching logic. 
- - s.Run("SUCCESS CASE - Query executed successfully", func() { - // This is the MAIN case - a query that actually runs and returns results - namespaceID := namespace.ID(uuid.NewString()) - workflowID := "test-workflow-id-success" - runID := "test-run-id-success" - workflowType := &commonpb.WorkflowType{Name: "test-workflow-type-success"} - taskQueue := "test-task-queue-success" - - nsEntry := namespace.NewLocalNamespaceForTest( - &persistencespb.NamespaceInfo{Name: "test-namespace"}, - &persistencespb.NamespaceConfig{}, - "test-cluster", - ) - - // Mock matching client to return successful query response - mockMatchingClient := matchingservicemock.NewMockMatchingServiceClient(s.controller) - mockMatchingClient.EXPECT().QueryWorkflow(gomock.Any(), gomock.Any()).Return( - &matchingservice.QueryWorkflowResponse{ - QueryResult: payloads.EncodeString("query-success"), - }, nil) - - s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes() - s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry) - s.namespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(nsEntry, nil) - s.shardContext.EXPECT().GetClusterMetadata().Return(s.clusterMetadata).AnyTimes() - s.clusterMetadata.EXPECT().GetCurrentClusterName().Return("test-cluster").AnyTimes() - - workflowKey := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) - s.workflowConsistencyChecker.EXPECT().GetWorkflowLease( - gomock.Any(), - nil, - workflowKey, - locks.PriorityHigh, - ).Return(s.workflowLease, nil) - - // THIS IS THE KEY: SetContextMetadata is called - s.mutableState.EXPECT().SetContextMetadata(gomock.Any()).Do(func(ctx context.Context) { - contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowType, workflowType.GetName()) - contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowTaskQueue, taskQueue) - }) - - // Setup for successful query dispatch - s.mutableState.EXPECT().GetWorkflowStateStatus().Return( - 
enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED, - enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, - ).AnyTimes() - s.mutableState.EXPECT().GetWorkflowType().Return(workflowType).AnyTimes() - s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ - TaskQueue: taskQueue, - WorkflowTaskAttempt: 0, - VersionHistories: &historyspb.VersionHistories{ - Histories: []*historyspb.VersionHistory{ - { - BranchToken: []byte("branch-token"), - Items: []*historyspb.VersionHistoryItem{{EventId: 10, Version: 1}}, - }, - }, - }, - }).AnyTimes() - s.mutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{ - RunId: runID, - Status: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, - }).AnyTimes() - s.mutableState.EXPECT().GetCurrentBranchToken().Return([]byte("branch-token"), nil).AnyTimes() - s.mutableState.EXPECT().GetNextEventID().Return(int64(10)).AnyTimes() - s.mutableState.EXPECT().GetLastFirstEventIDTxnID().Return(int64(1), int64(1)).AnyTimes() - s.mutableState.EXPECT().GetLastCompletedWorkflowTaskStartedEventId().Return(int64(5)).AnyTimes() - s.mutableState.EXPECT().IsStickyTaskQueueSet().Return(false).AnyTimes() - s.mutableState.EXPECT().GetAssignedBuildId().Return("").AnyTimes() - s.mutableState.EXPECT().GetInheritedBuildId().Return("").AnyTimes() - s.mutableState.EXPECT().GetMostRecentWorkerVersionStamp().Return(nil).AnyTimes() - s.mutableState.EXPECT().HadOrHasWorkflowTask().Return(true).AnyTimes() - s.mutableState.EXPECT().HasCompletedAnyWorkflowTask().Return(true).AnyTimes() - s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(false).AnyTimes() - - ctx := contextutil.WithMetadataContext(context.Background()) - - request := &historyservice.QueryWorkflowRequest{ - NamespaceId: namespaceID.String(), - Request: &workflowservice.QueryWorkflowRequest{ - Namespace: "test-namespace", - Execution: &commonpb.WorkflowExecution{ - WorkflowId: workflowID, - RunId: runID, - }, - Query: &querypb.WorkflowQuery{ - QueryType: 
"test-query", - }, - QueryRejectCondition: enumspb.QUERY_REJECT_CONDITION_NONE, - }, - } - - resp, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, mockMatchingClient, mockMatchingClient) - s.NoError(err, "MAIN success case should not error") - s.NotNil(resp) - s.NotNil(resp.Response) - s.NotNil(resp.Response.QueryResult) - - // VERIFY: Context metadata was populated in the SUCCESS case - contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) - s.True(ok, "context workflow type MUST be set in success case") - s.Equal(workflowType.GetName(), contextWorkflowType) - - contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) - s.True(ok, "context task queue MUST be set in success case") - s.Equal(taskQueue, contextTaskQueue) - }) - - s.Run("Completed workflow (rejected)", func() { - namespaceID := namespace.ID(uuid.NewString()) - workflowID := "test-workflow-id" - runID := "test-run-id" - workflowType := &commonpb.WorkflowType{Name: "test-workflow-type"} - taskQueue := "test-task-queue" - - nsEntry := namespace.NewLocalNamespaceForTest( - &persistencespb.NamespaceInfo{Name: "test-namespace"}, - &persistencespb.NamespaceConfig{}, - "test-cluster", - ) - - s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes() - s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry) - s.namespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(nsEntry, nil) - - workflowKey := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) - s.workflowConsistencyChecker.EXPECT().GetWorkflowLease( - gomock.Any(), - nil, - workflowKey, - locks.PriorityHigh, - ).Return(s.workflowLease, nil) - - s.mutableState.EXPECT().SetContextMetadata(gomock.Any()).Do(func(ctx context.Context) { - contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowType, workflowType.GetName()) - contextutil.ContextMetadataSet(ctx, 
contextutil.MetadataKeyWorkflowTaskQueue, taskQueue) - }) - s.mutableState.EXPECT().GetWorkflowStateStatus().Return(enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED, enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED) - s.mutableState.EXPECT().GetWorkflowType().Return(workflowType).AnyTimes() - s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ - TaskQueue: taskQueue, - }).AnyTimes() - - ctx := contextutil.WithMetadataContext(context.Background()) - - request := &historyservice.QueryWorkflowRequest{ - NamespaceId: namespaceID.String(), - Request: &workflowservice.QueryWorkflowRequest{ - Namespace: "test-namespace", - Execution: &commonpb.WorkflowExecution{ - WorkflowId: workflowID, - RunId: runID, - }, - Query: &querypb.WorkflowQuery{ - QueryType: "test-query", - }, - QueryRejectCondition: enumspb.QUERY_REJECT_CONDITION_NOT_OPEN, - }, - } - - resp, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, nil, nil) - s.NoError(err) - s.NotNil(resp) - s.NotNil(resp.Response.QueryRejected) - - // Verify context metadata was populated - contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) - s.True(ok, "context workflow type MUST be set") - s.Equal(workflowType.GetName(), contextWorkflowType) - - contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) - s.True(ok, "context task queue MUST be set") - s.Equal(taskQueue, contextTaskQueue) - }) - - s.Run("Paused workflow", func() { - namespaceID := namespace.ID(uuid.NewString()) - workflowID := "test-workflow-id-2" - runID := "test-run-id-2" - workflowType := &commonpb.WorkflowType{Name: "test-workflow-type-2"} - taskQueue := "test-task-queue-2" - - nsEntry := namespace.NewLocalNamespaceForTest( - &persistencespb.NamespaceInfo{Name: "test-namespace"}, - &persistencespb.NamespaceConfig{}, - "test-cluster", - ) - - 
s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes() - s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry) - s.namespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(nsEntry, nil) - - workflowKey := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) - s.workflowConsistencyChecker.EXPECT().GetWorkflowLease( - gomock.Any(), - nil, - workflowKey, - locks.PriorityHigh, - ).Return(s.workflowLease, nil) - - s.mutableState.EXPECT().SetContextMetadata(gomock.Any()).Do(func(ctx context.Context) { - contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowType, workflowType.GetName()) - contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowTaskQueue, taskQueue) - }) - // Return PAUSED status to trigger the paused workflow path - s.mutableState.EXPECT().GetWorkflowStateStatus().Return(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED) - s.mutableState.EXPECT().GetWorkflowType().Return(workflowType).AnyTimes() - s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ - TaskQueue: taskQueue, - }).AnyTimes() - - ctx := contextutil.WithMetadataContext(context.Background()) - - request := &historyservice.QueryWorkflowRequest{ - NamespaceId: namespaceID.String(), - Request: &workflowservice.QueryWorkflowRequest{ - Namespace: "test-namespace", - Execution: &commonpb.WorkflowExecution{ - WorkflowId: workflowID, - RunId: runID, - }, - Query: &querypb.WorkflowQuery{ - QueryType: "test-query", - }, - }, - } - - resp, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, nil, nil) - s.NoError(err) - s.NotNil(resp) - s.NotNil(resp.Response.QueryRejected) - s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED, resp.Response.QueryRejected.Status) - - // Verify context metadata was populated even for paused workflow - contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, 
contextutil.MetadataKeyWorkflowType) - s.True(ok, "context workflow type MUST be set for paused workflow") - s.Equal(workflowType.GetName(), contextWorkflowType) - - contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) - s.True(ok, "context task queue MUST be set for paused workflow") - s.Equal(taskQueue, contextTaskQueue) - }) - - s.Run("Namespace validation error - metadata NOT set", func() { - // Test early exit BEFORE SetContextMetadata - s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler) - - ctx := contextutil.WithMetadataContext(context.Background()) - - request := &historyservice.QueryWorkflowRequest{ - NamespaceId: "invalid-uuid", - Request: &workflowservice.QueryWorkflowRequest{ - Namespace: "test-namespace", - Execution: &commonpb.WorkflowExecution{ - WorkflowId: "test", - RunId: "test", - }, - Query: &querypb.WorkflowQuery{ - QueryType: "test-query", - }, - }, - } - - _, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, nil, nil) - s.Error(err) // Should error on namespace validation - - // Verify context metadata was NOT set (error before SetContextMetadata) - _, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) - s.False(ok, "workflow type should NOT be set - exited before SetContextMetadata") - }) - - s.Run("Namespace registry error - metadata NOT set", func() { - // Test error when GetNamespaceByID fails - namespaceID := namespace.ID(uuid.NewString()) - - s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler) - s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry) - s.namespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(nil, serviceerror.NewNotFound("namespace not found")) - - ctx := contextutil.WithMetadataContext(context.Background()) - - request := &historyservice.QueryWorkflowRequest{ - NamespaceId: namespaceID.String(), - Request: 
&workflowservice.QueryWorkflowRequest{ - Namespace: "test-namespace", - Execution: &commonpb.WorkflowExecution{ - WorkflowId: "test-wf", - RunId: "test-run", - }, - Query: &querypb.WorkflowQuery{ - QueryType: "test-query", - }, - }, - } - - _, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, nil, nil) - s.Error(err, "should error when GetNamespaceByID fails") - - // Verify metadata was NOT set (error before SetContextMetadata) - _, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) - s.False(ok, "metadata should NOT be set - error before SetContextMetadata") - }) - - s.Run("GetCurrentWorkflowRunID path", func() { - // Test when RunID is empty and needs to be fetched (lines 52-62) - namespaceID := namespace.ID(uuid.NewString()) - workflowID := "test-workflow-runid" - runID := "fetched-run-id" - workflowType := &commonpb.WorkflowType{Name: "test-wf-type"} - taskQueue := "test-queue" - - nsEntry := namespace.NewLocalNamespaceForTest( - &persistencespb.NamespaceInfo{Name: "test-namespace"}, - &persistencespb.NamespaceConfig{}, - "test-cluster", - ) - - s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes() - s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry) - s.namespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(nsEntry, nil) - - // Mock GetCurrentWorkflowRunID - s.workflowConsistencyChecker.EXPECT().GetCurrentWorkflowRunID( - gomock.Any(), - namespaceID.String(), - workflowID, - locks.PriorityHigh, - ).Return(runID, nil) - - workflowKey := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) - s.workflowConsistencyChecker.EXPECT().GetWorkflowLease( - gomock.Any(), - nil, - workflowKey, - locks.PriorityHigh, - ).Return(s.workflowLease, nil) - - s.mutableState.EXPECT().SetContextMetadata(gomock.Any()).Do(func(ctx context.Context) { - contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowType, workflowType.GetName()) - 
contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowTaskQueue, taskQueue) - }) - s.mutableState.EXPECT().GetWorkflowStateStatus().Return( - enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED, - enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, - ) - s.mutableState.EXPECT().GetWorkflowType().Return(workflowType).AnyTimes() - s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ - TaskQueue: taskQueue, - }).AnyTimes() - - ctx := contextutil.WithMetadataContext(context.Background()) - - request := &historyservice.QueryWorkflowRequest{ - NamespaceId: namespaceID.String(), - Request: &workflowservice.QueryWorkflowRequest{ - Namespace: "test-namespace", - Execution: &commonpb.WorkflowExecution{ - WorkflowId: workflowID, - // RunId is EMPTY - will be fetched - }, - Query: &querypb.WorkflowQuery{ - QueryType: "test-query", - }, - QueryRejectCondition: enumspb.QUERY_REJECT_CONDITION_NOT_OPEN, - }, - } - - resp, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, nil, nil) - s.NoError(err) - s.NotNil(resp) - - // VERIFY: Context metadata set even when RunID was fetched - contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) - s.True(ok, "context workflow type MUST be set after RunID fetch") - s.Equal(workflowType.GetName(), contextWorkflowType) - - contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) - s.True(ok, "context task queue MUST be set after RunID fetch") - s.Equal(taskQueue, contextTaskQueue) - }) - - s.Run("Running workflow with no reject condition", func() { - namespaceID := namespace.ID(uuid.NewString()) - workflowID := "test-workflow-id-3" - runID := "test-run-id-3" - workflowType := &commonpb.WorkflowType{Name: "test-workflow-type-3"} - taskQueue := "test-task-queue-3" - - nsEntry := namespace.NewLocalNamespaceForTest( - &persistencespb.NamespaceInfo{Name: "test-namespace"}, - 
&persistencespb.NamespaceConfig{}, - "test-cluster", - ) - - s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes() - s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry) - s.namespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(nsEntry, nil) - - workflowKey := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) - s.workflowConsistencyChecker.EXPECT().GetWorkflowLease( - gomock.Any(), - nil, - workflowKey, - locks.PriorityHigh, - ).Return(s.workflowLease, nil) - - s.mutableState.EXPECT().SetContextMetadata(gomock.Any()).Do(func(ctx context.Context) { - contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowType, workflowType.GetName()) - contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowTaskQueue, taskQueue) - }) - s.mutableState.EXPECT().GetWorkflowStateStatus().Return(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING) - s.mutableState.EXPECT().GetWorkflowType().Return(workflowType).AnyTimes() - s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ - TaskQueue: taskQueue, - WorkflowTaskAttempt: 0, // Not in failed state - }).AnyTimes() - // Return false so it triggers "workflow closed before task started" error - s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(false) - s.mutableState.EXPECT().HasCompletedAnyWorkflowTask().Return(false) - - ctx := contextutil.WithMetadataContext(context.Background()) - - request := &historyservice.QueryWorkflowRequest{ - NamespaceId: namespaceID.String(), - Request: &workflowservice.QueryWorkflowRequest{ - Namespace: "test-namespace", - Execution: &commonpb.WorkflowExecution{ - WorkflowId: workflowID, - RunId: runID, - }, - Query: &querypb.WorkflowQuery{ - QueryType: "test-query", - }, - }, - } - - _, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, nil, nil) - s.Error(err) // Should return 
ErrWorkflowClosedBeforeWorkflowTaskStarted - - // Verify context metadata was populated even when query fails with error - contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) - s.True(ok, "context workflow type MUST be set even when query fails") - s.Equal(workflowType.GetName(), contextWorkflowType) - - contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) - s.True(ok, "context task queue MUST be set even when query fails") - s.Equal(taskQueue, contextTaskQueue) - }) - - s.Run("GetWorkflowLease error - metadata NOT set", func() { - // Test error BEFORE SetContextMetadata is called - namespaceID := namespace.ID(uuid.NewString()) - workflowID := "test-workflow-lease-error" - runID := "test-run-lease-error" - - nsEntry := namespace.NewLocalNamespaceForTest( - &persistencespb.NamespaceInfo{Name: "test-namespace"}, - &persistencespb.NamespaceConfig{}, - "test-cluster", - ) - - s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes() - s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry) - s.namespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(nsEntry, nil) - - workflowKey := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) - // Return error from GetWorkflowLease - this happens BEFORE SetContextMetadata - s.workflowConsistencyChecker.EXPECT().GetWorkflowLease( - gomock.Any(), - nil, - workflowKey, - locks.PriorityHigh, - ).Return(nil, serviceerror.NewNotFound("workflow not found")) - - ctx := contextutil.WithMetadataContext(context.Background()) - - request := &historyservice.QueryWorkflowRequest{ - NamespaceId: namespaceID.String(), - Request: &workflowservice.QueryWorkflowRequest{ - Namespace: "test-namespace", - Execution: &commonpb.WorkflowExecution{ - WorkflowId: workflowID, - RunId: runID, - }, - Query: &querypb.WorkflowQuery{ - QueryType: "test-query", - }, - }, - } - - _, err := 
Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, nil, nil) - s.Error(err, "should error when GetWorkflowLease fails") - - // Verify context metadata was NOT set (error before SetContextMetadata) - _, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) - s.False(ok, "metadata should NOT be set - error before SetContextMetadata call") - }) - - s.Run("NOT_COMPLETED_CLEANLY reject condition", func() { - // Test QUERY_REJECT_CONDITION_NOT_COMPLETED_CLEANLY path - namespaceID := namespace.ID(uuid.NewString()) - workflowID := "test-workflow-not-cleanly" - runID := "test-run-not-cleanly" - workflowType := &commonpb.WorkflowType{Name: "test-wf-type-cleanly"} - taskQueue := "test-queue-cleanly" - - nsEntry := namespace.NewLocalNamespaceForTest( - &persistencespb.NamespaceInfo{Name: "test-namespace"}, - &persistencespb.NamespaceConfig{}, - "test-cluster", - ) - - s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes() - s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry) - s.namespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(nsEntry, nil) - - workflowKey := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) - s.workflowConsistencyChecker.EXPECT().GetWorkflowLease( - gomock.Any(), - nil, - workflowKey, - locks.PriorityHigh, - ).Return(s.workflowLease, nil) - - s.mutableState.EXPECT().SetContextMetadata(gomock.Any()).Do(func(ctx context.Context) { - contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowType, workflowType.GetName()) - contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowTaskQueue, taskQueue) - }) - // Return FAILED status (not completed cleanly) - s.mutableState.EXPECT().GetWorkflowStateStatus().Return( - enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED, - enumspb.WORKFLOW_EXECUTION_STATUS_FAILED, - ) - s.mutableState.EXPECT().GetWorkflowType().Return(workflowType).AnyTimes() - 
s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ - TaskQueue: taskQueue, - }).AnyTimes() - - ctx := contextutil.WithMetadataContext(context.Background()) - - request := &historyservice.QueryWorkflowRequest{ - NamespaceId: namespaceID.String(), - Request: &workflowservice.QueryWorkflowRequest{ - Namespace: "test-namespace", - Execution: &commonpb.WorkflowExecution{ - WorkflowId: workflowID, - RunId: runID, - }, - Query: &querypb.WorkflowQuery{ - QueryType: "test-query", - }, - // This should trigger rejection because workflow is FAILED (not completed cleanly) - QueryRejectCondition: enumspb.QUERY_REJECT_CONDITION_NOT_COMPLETED_CLEANLY, - }, - } - - resp, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, nil, nil) - s.NoError(err) - s.NotNil(resp) - s.NotNil(resp.Response.QueryRejected) - s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_FAILED, resp.Response.QueryRejected.Status) - - // Verify metadata was set - contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) - s.True(ok, "context workflow type MUST be set for NOT_COMPLETED_CLEANLY path") - s.Equal(workflowType.GetName(), contextWorkflowType) - - contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) - s.True(ok, "context task queue MUST be set for NOT_COMPLETED_CLEANLY path") - s.Equal(taskQueue, contextTaskQueue) - }) - - s.Run("WorkflowTask failing repeatedly (attempt >= 3)", func() { - // Test fail-fast path when workflow task keeps failing - namespaceID := namespace.ID(uuid.NewString()) - workflowID := "test-workflow-task-failing" - runID := "test-run-task-failing" - workflowType := &commonpb.WorkflowType{Name: "test-wf-type-failing"} - taskQueue := "test-queue-failing" - - nsEntry := namespace.NewLocalNamespaceForTest( - &persistencespb.NamespaceInfo{Name: "test-namespace"}, - &persistencespb.NamespaceConfig{}, - "test-cluster", - ) - - 
s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes() - s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry) - s.namespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(nsEntry, nil) - s.shardContext.EXPECT().GetLogger().Return(log.NewNoopLogger()).AnyTimes() - - workflowKey := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) - s.workflowConsistencyChecker.EXPECT().GetWorkflowLease( - gomock.Any(), - nil, - workflowKey, - locks.PriorityHigh, - ).Return(s.workflowLease, nil) - - s.mutableState.EXPECT().SetContextMetadata(gomock.Any()).Do(func(ctx context.Context) { - contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowType, workflowType.GetName()) - contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowTaskQueue, taskQueue) - }) - s.mutableState.EXPECT().GetWorkflowStateStatus().Return( - enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, - enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, - ) - s.mutableState.EXPECT().GetWorkflowType().Return(workflowType).AnyTimes() - s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ - TaskQueue: taskQueue, - WorkflowTaskAttempt: 5, // >= 3, should fail fast - }).AnyTimes() - // !IsWorkflowExecutionRunning() && !HasCompletedAnyWorkflowTask() - returns true so short circuits, HasCompletedAnyWorkflowTask never called - // !HadOrHasWorkflowTask() - returns true so this is false, block is skipped - // GetExecutionInfo().WorkflowTaskAttempt >= 3 - this triggers the fail-fast error - s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true) - s.mutableState.EXPECT().HadOrHasWorkflowTask().Return(true) - - ctx := contextutil.WithMetadataContext(context.Background()) - - request := &historyservice.QueryWorkflowRequest{ - NamespaceId: namespaceID.String(), - Request: &workflowservice.QueryWorkflowRequest{ - Namespace: "test-namespace", - Execution: &commonpb.WorkflowExecution{ - WorkflowId: 
workflowID, - RunId: runID, - }, - Query: &querypb.WorkflowQuery{ - QueryType: "test-query", - }, - }, - } - - _, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, nil, nil) - s.Error(err, "should fail fast when workflow task attempt >= 3") - - // VERIFY: Context metadata set before failing fast - contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) - s.True(ok, "context workflow type MUST be set even when failing fast") - s.Equal(workflowType.GetName(), contextWorkflowType) - - contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) - s.True(ok, "context task queue MUST be set even when failing fast") - s.Equal(taskQueue, contextTaskQueue) - }) - - s.Run("Namespace not active in cluster - dispatch directly", func() { - // Test safeToDispatchDirectly when namespace is not active - namespaceID := namespace.ID(uuid.NewString()) - workflowID := "test-workflow-not-active" - runID := "test-run-not-active" - workflowType := &commonpb.WorkflowType{Name: "test-wf-type-not-active"} - taskQueue := "test-queue-not-active" - - // Namespace is active in "other-cluster", NOT in "test-cluster" - nsEntry := namespace.NewLocalNamespaceForTest( - &persistencespb.NamespaceInfo{Name: "test-namespace"}, - &persistencespb.NamespaceConfig{}, - "other-cluster", // Different from current cluster - ) - - mockMatchingClient := matchingservicemock.NewMockMatchingServiceClient(s.controller) - mockMatchingClient.EXPECT().QueryWorkflow(gomock.Any(), gomock.Any()).Return( - &matchingservice.QueryWorkflowResponse{ - QueryResult: payloads.EncodeString("query-result-not-active"), - }, nil) - - s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes() - s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry) - s.namespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(nsEntry, nil) - 
s.shardContext.EXPECT().GetClusterMetadata().Return(s.clusterMetadata).AnyTimes() - s.clusterMetadata.EXPECT().GetCurrentClusterName().Return("test-cluster").AnyTimes() - - workflowKey := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) - s.workflowConsistencyChecker.EXPECT().GetWorkflowLease( - gomock.Any(), - nil, - workflowKey, - locks.PriorityHigh, - ).Return(s.workflowLease, nil) - - s.mutableState.EXPECT().SetContextMetadata(gomock.Any()).Do(func(ctx context.Context) { - contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowType, workflowType.GetName()) - contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowTaskQueue, taskQueue) - }) - s.mutableState.EXPECT().GetWorkflowStateStatus().Return( - enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, - enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, - ).AnyTimes() - s.mutableState.EXPECT().GetWorkflowType().Return(workflowType).AnyTimes() - s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ - TaskQueue: taskQueue, - WorkflowTaskAttempt: 0, - VersionHistories: &historyspb.VersionHistories{ - Histories: []*historyspb.VersionHistory{ - { - BranchToken: []byte("branch-token"), - Items: []*historyspb.VersionHistoryItem{{EventId: 10, Version: 1}}, - }, - }, - }, - }).AnyTimes() - s.mutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{ - RunId: runID, - Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, - }).AnyTimes() - s.mutableState.EXPECT().GetCurrentBranchToken().Return([]byte("branch-token"), nil).AnyTimes() - s.mutableState.EXPECT().GetNextEventID().Return(int64(10)).AnyTimes() - s.mutableState.EXPECT().GetLastFirstEventIDTxnID().Return(int64(1), int64(1)).AnyTimes() - s.mutableState.EXPECT().GetLastCompletedWorkflowTaskStartedEventId().Return(int64(5)).AnyTimes() - s.mutableState.EXPECT().IsStickyTaskQueueSet().Return(false).AnyTimes() - s.mutableState.EXPECT().GetAssignedBuildId().Return("").AnyTimes() - 
s.mutableState.EXPECT().GetInheritedBuildId().Return("").AnyTimes() - s.mutableState.EXPECT().GetMostRecentWorkerVersionStamp().Return(nil).AnyTimes() - s.mutableState.EXPECT().HadOrHasWorkflowTask().Return(true).AnyTimes() - s.mutableState.EXPECT().HasCompletedAnyWorkflowTask().Return(true).AnyTimes() - s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true).AnyTimes() - // These are checked when determining safeToDispatchDirectly - s.mutableState.EXPECT().HasPendingWorkflowTask().Return(false).AnyTimes() - s.mutableState.EXPECT().HasStartedWorkflowTask().Return(false).AnyTimes() - - ctx := contextutil.WithMetadataContext(context.Background()) - - request := &historyservice.QueryWorkflowRequest{ - NamespaceId: namespaceID.String(), - Request: &workflowservice.QueryWorkflowRequest{ - Namespace: "test-namespace", - Execution: &commonpb.WorkflowExecution{ - WorkflowId: workflowID, - RunId: runID, - }, - Query: &querypb.WorkflowQuery{ - QueryType: "test-query", - }, - }, - } - - resp, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, mockMatchingClient, mockMatchingClient) - s.NoError(err, "should succeed when dispatching directly (namespace not active)") - s.NotNil(resp) - s.NotNil(resp.Response) - - // VERIFY: Context metadata set for namespace not active path - contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) - s.True(ok, "context workflow type MUST be set for direct dispatch path") - s.Equal(workflowType.GetName(), contextWorkflowType) - - contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) - s.True(ok, "context task queue MUST be set for direct dispatch path") - s.Equal(taskQueue, contextTaskQueue) - }) - - s.Run("GetCurrentWorkflowRunID error path", func() { - // Test error when fetching current run ID - namespaceID := namespace.ID(uuid.NewString()) - workflowID := "test-workflow-runid-error" - - nsEntry := 
namespace.NewLocalNamespaceForTest( - &persistencespb.NamespaceInfo{Name: "test-namespace"}, - &persistencespb.NamespaceConfig{}, - "test-cluster", - ) - - s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes() - s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry) - s.namespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(nsEntry, nil) - - // Return error from GetCurrentWorkflowRunID - s.workflowConsistencyChecker.EXPECT().GetCurrentWorkflowRunID( - gomock.Any(), - namespaceID.String(), - workflowID, - locks.PriorityHigh, - ).Return("", serviceerror.NewNotFound("workflow not found")) - - ctx := contextutil.WithMetadataContext(context.Background()) - - request := &historyservice.QueryWorkflowRequest{ - NamespaceId: namespaceID.String(), - Request: &workflowservice.QueryWorkflowRequest{ - Namespace: "test-namespace", - Execution: &commonpb.WorkflowExecution{ - WorkflowId: workflowID, - // RunId is EMPTY - will try to fetch but fail - }, - Query: &querypb.WorkflowQuery{ - QueryType: "test-query", - }, - }, - } - - _, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, nil, nil) - s.Error(err, "should error when GetCurrentWorkflowRunID fails") - - // Verify metadata was NOT set (error before SetContextMetadata) - _, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) - s.False(ok, "metadata should NOT be set - error before SetContextMetadata") - }) - - s.Run("Workflow not running - dispatch directly", func() { - // Test safeToDispatchDirectly when workflow is not running - namespaceID := namespace.ID(uuid.NewString()) - workflowID := "test-workflow-not-running" - runID := "test-run-not-running" - workflowType := &commonpb.WorkflowType{Name: "test-wf-type-not-running"} - taskQueue := "test-queue-not-running" - - nsEntry := namespace.NewLocalNamespaceForTest( - &persistencespb.NamespaceInfo{Name: "test-namespace"}, - &persistencespb.NamespaceConfig{}, - 
"test-cluster", - ) - - mockMatchingClient := matchingservicemock.NewMockMatchingServiceClient(s.controller) - mockMatchingClient.EXPECT().QueryWorkflow(gomock.Any(), gomock.Any()).Return( - &matchingservice.QueryWorkflowResponse{ - QueryResult: payloads.EncodeString("query-result-not-running"), - }, nil) - - s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes() - s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry) - s.namespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(nsEntry, nil) - s.shardContext.EXPECT().GetClusterMetadata().Return(s.clusterMetadata).AnyTimes() - s.clusterMetadata.EXPECT().GetCurrentClusterName().Return("test-cluster").AnyTimes() - - workflowKey := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) - s.workflowConsistencyChecker.EXPECT().GetWorkflowLease( - gomock.Any(), - nil, - workflowKey, - locks.PriorityHigh, - ).Return(s.workflowLease, nil) - - s.mutableState.EXPECT().SetContextMetadata(gomock.Any()).Do(func(ctx context.Context) { - contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowType, workflowType.GetName()) - contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowTaskQueue, taskQueue) - }) - s.mutableState.EXPECT().GetWorkflowStateStatus().Return( - enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED, - enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, - ).AnyTimes() - s.mutableState.EXPECT().GetWorkflowType().Return(workflowType).AnyTimes() - s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ - TaskQueue: taskQueue, - WorkflowTaskAttempt: 0, - VersionHistories: &historyspb.VersionHistories{ - Histories: []*historyspb.VersionHistory{ - { - BranchToken: []byte("branch-token"), - Items: []*historyspb.VersionHistoryItem{{EventId: 10, Version: 1}}, - }, - }, - }, - }).AnyTimes() - s.mutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{ - RunId: runID, - Status: 
enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, - }).AnyTimes() - s.mutableState.EXPECT().GetCurrentBranchToken().Return([]byte("branch-token"), nil).AnyTimes() - s.mutableState.EXPECT().GetNextEventID().Return(int64(10)).AnyTimes() - s.mutableState.EXPECT().GetLastFirstEventIDTxnID().Return(int64(1), int64(1)).AnyTimes() - s.mutableState.EXPECT().GetLastCompletedWorkflowTaskStartedEventId().Return(int64(5)).AnyTimes() - s.mutableState.EXPECT().IsStickyTaskQueueSet().Return(false).AnyTimes() - s.mutableState.EXPECT().GetAssignedBuildId().Return("").AnyTimes() - s.mutableState.EXPECT().GetInheritedBuildId().Return("").AnyTimes() - s.mutableState.EXPECT().GetMostRecentWorkerVersionStamp().Return(nil).AnyTimes() - s.mutableState.EXPECT().HadOrHasWorkflowTask().Return(true).AnyTimes() - s.mutableState.EXPECT().HasCompletedAnyWorkflowTask().Return(true).AnyTimes() - // Workflow is NOT running - this makes safeToDispatchDirectly = true - s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(false).AnyTimes() - - ctx := contextutil.WithMetadataContext(context.Background()) - - request := &historyservice.QueryWorkflowRequest{ - NamespaceId: namespaceID.String(), - Request: &workflowservice.QueryWorkflowRequest{ - Namespace: "test-namespace", - Execution: &commonpb.WorkflowExecution{ - WorkflowId: workflowID, - RunId: runID, - }, - Query: &querypb.WorkflowQuery{ - QueryType: "test-query", - }, - }, - } - - resp, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, mockMatchingClient, mockMatchingClient) - s.NoError(err, "should succeed when dispatching directly (workflow not running)") - s.NotNil(resp) - s.NotNil(resp.Response) - - // VERIFY: Context metadata set for workflow not running path - contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) - s.True(ok, "context workflow type MUST be set for direct dispatch path") - s.Equal(workflowType.GetName(), contextWorkflowType) - - contextTaskQueue, ok := 
contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) - s.True(ok, "context task queue MUST be set for direct dispatch path") - s.Equal(taskQueue, contextTaskQueue) - }) - - s.Run("No pending/started workflow tasks - dispatch directly", func() { - // Test safeToDispatchDirectly when no pending/started tasks - namespaceID := namespace.ID(uuid.NewString()) - workflowID := "test-workflow-no-pending" - runID := "test-run-no-pending" - workflowType := &commonpb.WorkflowType{Name: "test-wf-type-no-pending"} - taskQueue := "test-queue-no-pending" - - nsEntry := namespace.NewLocalNamespaceForTest( - &persistencespb.NamespaceInfo{Name: "test-namespace"}, - &persistencespb.NamespaceConfig{}, - "test-cluster", - ) - - mockMatchingClient := matchingservicemock.NewMockMatchingServiceClient(s.controller) - mockMatchingClient.EXPECT().QueryWorkflow(gomock.Any(), gomock.Any()).Return( - &matchingservice.QueryWorkflowResponse{ - QueryResult: payloads.EncodeString("query-result-no-pending"), - }, nil) - - s.shardContext.EXPECT().GetMetricsHandler().Return(metrics.NoopMetricsHandler).AnyTimes() - s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry) - s.namespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(nsEntry, nil) - s.shardContext.EXPECT().GetClusterMetadata().Return(s.clusterMetadata).AnyTimes() - s.clusterMetadata.EXPECT().GetCurrentClusterName().Return("test-cluster").AnyTimes() - - workflowKey := definition.NewWorkflowKey(namespaceID.String(), workflowID, runID) - s.workflowConsistencyChecker.EXPECT().GetWorkflowLease( - gomock.Any(), - nil, - workflowKey, - locks.PriorityHigh, - ).Return(s.workflowLease, nil) - - s.mutableState.EXPECT().SetContextMetadata(gomock.Any()).Do(func(ctx context.Context) { - contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowType, workflowType.GetName()) - contextutil.ContextMetadataSet(ctx, contextutil.MetadataKeyWorkflowTaskQueue, taskQueue) - }) - 
s.mutableState.EXPECT().GetWorkflowStateStatus().Return( - enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, - enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, - ).AnyTimes() - s.mutableState.EXPECT().GetWorkflowType().Return(workflowType).AnyTimes() - s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ - TaskQueue: taskQueue, - WorkflowTaskAttempt: 0, - VersionHistories: &historyspb.VersionHistories{ - Histories: []*historyspb.VersionHistory{ - { - BranchToken: []byte("branch-token"), - Items: []*historyspb.VersionHistoryItem{{EventId: 10, Version: 1}}, - }, - }, - }, - }).AnyTimes() - s.mutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{ - RunId: runID, - Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, - }).AnyTimes() - s.mutableState.EXPECT().GetCurrentBranchToken().Return([]byte("branch-token"), nil).AnyTimes() - s.mutableState.EXPECT().GetNextEventID().Return(int64(10)).AnyTimes() - s.mutableState.EXPECT().GetLastFirstEventIDTxnID().Return(int64(1), int64(1)).AnyTimes() - s.mutableState.EXPECT().GetLastCompletedWorkflowTaskStartedEventId().Return(int64(5)).AnyTimes() - s.mutableState.EXPECT().IsStickyTaskQueueSet().Return(false).AnyTimes() - s.mutableState.EXPECT().GetAssignedBuildId().Return("").AnyTimes() - s.mutableState.EXPECT().GetInheritedBuildId().Return("").AnyTimes() - s.mutableState.EXPECT().GetMostRecentWorkerVersionStamp().Return(nil).AnyTimes() - s.mutableState.EXPECT().HadOrHasWorkflowTask().Return(true).AnyTimes() - s.mutableState.EXPECT().HasCompletedAnyWorkflowTask().Return(true).AnyTimes() - // Workflow IS running, namespace IS active in cluster, but NO pending/started tasks - // This makes safeToDispatchDirectly = true - s.mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true).AnyTimes() - s.mutableState.EXPECT().HasPendingWorkflowTask().Return(false) - s.mutableState.EXPECT().HasStartedWorkflowTask().Return(false) - - ctx := 
contextutil.WithMetadataContext(context.Background()) - - request := &historyservice.QueryWorkflowRequest{ - NamespaceId: namespaceID.String(), - Request: &workflowservice.QueryWorkflowRequest{ - Namespace: "test-namespace", - Execution: &commonpb.WorkflowExecution{ - WorkflowId: workflowID, - RunId: runID, - }, - Query: &querypb.WorkflowQuery{ - QueryType: "test-query", - }, - }, - } - - resp, err := Invoke(ctx, request, s.shardContext, s.workflowConsistencyChecker, mockMatchingClient, mockMatchingClient) - s.NoError(err, "should succeed when no pending/started tasks") - s.NotNil(resp) - s.NotNil(resp.Response) - - // VERIFY: Context metadata set for no pending/started tasks path - contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) - s.True(ok, "context workflow type MUST be set for direct dispatch path") - s.Equal(workflowType.GetName(), contextWorkflowType) - - contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) - s.True(ok, "context task queue MUST be set for direct dispatch path") - s.Equal(taskQueue, contextTaskQueue) - }) - -} diff --git a/service/history/history_engine_test.go b/service/history/history_engine_test.go index 7ce8a939b0..6e30b86870 100644 --- a/service/history/history_engine_test.go +++ b/service/history/history_engine_test.go @@ -37,6 +37,7 @@ import ( "go.temporal.io/server/common" "go.temporal.io/server/common/clock" "go.temporal.io/server/common/cluster" + "go.temporal.io/server/common/contextutil" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/failure" "go.temporal.io/server/common/headers" @@ -481,9 +482,10 @@ func (s *engineSuite) TestQueryWorkflow_RejectBasedOnCompleted() { } taskqueue := "testTaskQueue" identity := "testIdentity" + workflowType := "wType" ms := workflow.TestLocalMutableState(s.historyEngine.shardContext, s.eventsCache, tests.LocalNamespaceEntry, execution.GetWorkflowId(), execution.GetRunId(), 
log.NewTestLogger()) - addWorkflowExecutionStartedEvent(ms, &execution, "wType", taskqueue, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, identity) + addWorkflowExecutionStartedEvent(ms, &execution, workflowType, taskqueue, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) event := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, taskqueue, identity) wt.StartedEventID = event.GetEventId() @@ -493,6 +495,7 @@ func (s *engineSuite) TestQueryWorkflow_RejectBasedOnCompleted() { gweResponse := &persistence.GetWorkflowExecutionResponse{State: wfMs} s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(gweResponse, nil) + ctx := contextutil.WithMetadataContext(context.Background()) request := &historyservice.QueryWorkflowRequest{ NamespaceId: tests.NamespaceID.String(), Request: &workflowservice.QueryWorkflowRequest{ @@ -501,11 +504,20 @@ func (s *engineSuite) TestQueryWorkflow_RejectBasedOnCompleted() { QueryRejectCondition: enumspb.QUERY_REJECT_CONDITION_NOT_OPEN, }, } - resp, err := s.historyEngine.QueryWorkflow(context.Background(), request) + resp, err := s.historyEngine.QueryWorkflow(ctx, request) s.NoError(err) s.Nil(resp.GetResponse().QueryResult) s.NotNil(resp.GetResponse().QueryRejected) s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, resp.GetResponse().GetQueryRejected().GetStatus()) + + // Verify context metadata was set + contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.True(ok, "context workflow type must be set") + s.Equal(workflowType, contextWorkflowType) + + contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok, "context task queue must be set") + s.Equal(taskqueue, contextTaskQueue) } func (s *engineSuite) TestQueryWorkflow_RejectBasedOnFailed() { @@ -515,9 +527,10 @@ func (s 
*engineSuite) TestQueryWorkflow_RejectBasedOnFailed() { } taskqueue := "testTaskQueue" identity := "testIdentity" + workflowType := "wType" ms := workflow.TestLocalMutableState(s.historyEngine.shardContext, s.eventsCache, tests.LocalNamespaceEntry, execution.GetWorkflowId(), execution.GetRunId(), log.NewTestLogger()) - addWorkflowExecutionStartedEvent(ms, &execution, "wType", taskqueue, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, identity) + addWorkflowExecutionStartedEvent(ms, &execution, workflowType, taskqueue, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) event := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, taskqueue, identity) wt.StartedEventID = event.GetEventId() @@ -527,6 +540,7 @@ func (s *engineSuite) TestQueryWorkflow_RejectBasedOnFailed() { gweResponse := &persistence.GetWorkflowExecutionResponse{State: wfMs} s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(gweResponse, nil) + ctx := contextutil.WithMetadataContext(context.Background()) request := &historyservice.QueryWorkflowRequest{ NamespaceId: tests.NamespaceID.String(), Request: &workflowservice.QueryWorkflowRequest{ @@ -535,12 +549,23 @@ func (s *engineSuite) TestQueryWorkflow_RejectBasedOnFailed() { QueryRejectCondition: enumspb.QUERY_REJECT_CONDITION_NOT_OPEN, }, } - resp, err := s.historyEngine.QueryWorkflow(context.Background(), request) + resp, err := s.historyEngine.QueryWorkflow(ctx, request) s.NoError(err) s.Nil(resp.GetResponse().QueryResult) s.NotNil(resp.GetResponse().QueryRejected) s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_FAILED, resp.GetResponse().GetQueryRejected().GetStatus()) + // Verify context metadata was set + contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.True(ok, "context workflow type must be set") + s.Equal(workflowType, contextWorkflowType) + + 
contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok, "context task queue must be set") + s.Equal(taskqueue, contextTaskQueue) + + // Second query with different reject condition + ctx2 := contextutil.WithMetadataContext(context.Background()) request = &historyservice.QueryWorkflowRequest{ NamespaceId: tests.NamespaceID.String(), Request: &workflowservice.QueryWorkflowRequest{ @@ -549,11 +574,20 @@ func (s *engineSuite) TestQueryWorkflow_RejectBasedOnFailed() { QueryRejectCondition: enumspb.QUERY_REJECT_CONDITION_NOT_COMPLETED_CLEANLY, }, } - resp, err = s.historyEngine.QueryWorkflow(context.Background(), request) + resp, err = s.historyEngine.QueryWorkflow(ctx2, request) s.NoError(err) s.Nil(resp.GetResponse().QueryResult) s.NotNil(resp.GetResponse().QueryRejected) s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_FAILED, resp.GetResponse().GetQueryRejected().GetStatus()) + + // Verify context metadata was set for second query + contextWorkflowType, ok = contextutil.ContextMetadataGet(ctx2, contextutil.MetadataKeyWorkflowType) + s.True(ok, "context workflow type must be set") + s.Equal(workflowType, contextWorkflowType) + + contextTaskQueue, ok = contextutil.ContextMetadataGet(ctx2, contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok, "context task queue must be set") + s.Equal(taskqueue, contextTaskQueue) } func (s *engineSuite) TestQueryWorkflow_DirectlyThroughMatching() { @@ -563,9 +597,10 @@ func (s *engineSuite) TestQueryWorkflow_DirectlyThroughMatching() { } taskqueue := "testTaskQueue" identity := "testIdentity" + workflowType := "wType" ms := workflow.TestLocalMutableState(s.historyEngine.shardContext, s.eventsCache, tests.LocalNamespaceEntry, execution.GetWorkflowId(), execution.GetRunId(), log.NewTestLogger()) - addWorkflowExecutionStartedEvent(ms, &execution, "wType", taskqueue, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, identity) + 
addWorkflowExecutionStartedEvent(ms, &execution, workflowType, taskqueue, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) startedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, taskqueue, identity) addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, startedEvent.EventId, identity) @@ -575,6 +610,8 @@ func (s *engineSuite) TestQueryWorkflow_DirectlyThroughMatching() { s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(gweResponse, nil) s.mockMatchingClient.EXPECT().QueryWorkflow(gomock.Any(), gomock.Any()).Return(&matchingservice.QueryWorkflowResponse{QueryResult: payloads.EncodeBytes([]byte{1, 2, 3})}, nil) s.historyEngine.matchingClient = s.mockMatchingClient + + ctx := contextutil.WithMetadataContext(context.Background()) request := &historyservice.QueryWorkflowRequest{ NamespaceId: tests.NamespaceID.String(), Request: &workflowservice.QueryWorkflowRequest{ @@ -584,7 +621,7 @@ func (s *engineSuite) TestQueryWorkflow_DirectlyThroughMatching() { QueryRejectCondition: enumspb.QUERY_REJECT_CONDITION_NOT_OPEN, }, } - resp, err := s.historyEngine.QueryWorkflow(context.Background(), request) + resp, err := s.historyEngine.QueryWorkflow(ctx, request) s.NoError(err) s.NotNil(resp.GetResponse().QueryResult) s.Nil(resp.GetResponse().QueryRejected) @@ -593,6 +630,15 @@ func (s *engineSuite) TestQueryWorkflow_DirectlyThroughMatching() { err = payloads.Decode(resp.GetResponse().GetQueryResult(), &queryResult) s.NoError(err) s.Equal([]byte{1, 2, 3}, queryResult) + + // Verify context metadata was set + contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.True(ok, "context workflow type must be set") + s.Equal(workflowType, contextWorkflowType) + + contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok, "context task queue must 
be set") + s.Equal(taskqueue, contextTaskQueue) } func (s *engineSuite) TestQueryWorkflow_WorkflowTaskDispatch_Timeout() { @@ -602,8 +648,9 @@ func (s *engineSuite) TestQueryWorkflow_WorkflowTaskDispatch_Timeout() { } taskqueue := "testTaskQueue" identity := "testIdentity" + workflowType := "wType" ms := workflow.TestLocalMutableState(s.historyEngine.shardContext, s.eventsCache, tests.LocalNamespaceEntry, execution.GetWorkflowId(), execution.GetRunId(), log.NewTestLogger()) - addWorkflowExecutionStartedEvent(ms, &execution, "wType", taskqueue, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, identity) + addWorkflowExecutionStartedEvent(ms, &execution, workflowType, taskqueue, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) startedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, taskqueue, identity) addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, startedEvent.EventId, identity) @@ -626,11 +673,22 @@ func (s *engineSuite) TestQueryWorkflow_WorkflowTaskDispatch_Timeout() { wg := &sync.WaitGroup{} wg.Add(1) go func() { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + metadataCtx := contextutil.WithMetadataContext(context.Background()) + ctx, cancel := context.WithTimeout(metadataCtx, time.Second*2) defer cancel() resp, err := s.historyEngine.QueryWorkflow(ctx, request) s.Error(err) s.Nil(resp) + + // Verify context metadata was set even though query timed out + contextWorkflowType, ok := contextutil.ContextMetadataGet(metadataCtx, contextutil.MetadataKeyWorkflowType) + s.True(ok, "context workflow type must be set") + s.Equal(workflowType, contextWorkflowType) + + contextTaskQueue, ok := contextutil.ContextMetadataGet(metadataCtx, contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok, "context task queue must be set") + s.Equal(taskqueue, contextTaskQueue) + wg.Done() }() @@ -656,8 +714,9 @@ func 
(s *engineSuite) TestQueryWorkflow_ConsistentQueryBufferFull() { } taskqueue := "testTaskQueue" identity := "testIdentity" + workflowType := "wType" ms := workflow.TestLocalMutableState(s.historyEngine.shardContext, s.eventsCache, tests.LocalNamespaceEntry, execution.GetWorkflowId(), execution.GetRunId(), log.NewTestLogger()) - addWorkflowExecutionStartedEvent(ms, &execution, "wType", taskqueue, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, identity) + addWorkflowExecutionStartedEvent(ms, &execution, workflowType, taskqueue, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) startedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, taskqueue, identity) addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, startedEvent.EventId, identity) @@ -685,6 +744,7 @@ func (s *engineSuite) TestQueryWorkflow_ConsistentQueryBufferFull() { loadedMS.(*workflow.MutableStateImpl).QueryRegistry = qr release(nil) + metadataCtx := contextutil.WithMetadataContext(context.Background()) request := &historyservice.QueryWorkflowRequest{ NamespaceId: tests.NamespaceID.String(), Request: &workflowservice.QueryWorkflowRequest{ @@ -692,10 +752,19 @@ func (s *engineSuite) TestQueryWorkflow_ConsistentQueryBufferFull() { Query: &querypb.WorkflowQuery{}, }, } - resp, err := s.historyEngine.QueryWorkflow(context.Background(), request) + resp, err := s.historyEngine.QueryWorkflow(metadataCtx, request) s.Nil(resp) s.Equal(consts.ErrConsistentQueryBufferExceeded, err) + // Verify context metadata was set even though query failed + contextWorkflowType, ok := contextutil.ContextMetadataGet(metadataCtx, contextutil.MetadataKeyWorkflowType) + s.True(ok, "context workflow type must be set") + s.Equal(workflowType, contextWorkflowType) + + contextTaskQueue, ok := contextutil.ContextMetadataGet(metadataCtx, contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok, "context 
task queue must be set") + s.Equal(taskqueue, contextTaskQueue) + // verify that after last query error, the previous pending query is still in the buffer pendingBufferedQueries := qr.GetBufferedIDs() s.Equal(1, len(pendingBufferedQueries)) @@ -709,8 +778,9 @@ func (s *engineSuite) TestQueryWorkflow_WorkflowTaskDispatch_Complete() { } taskqueue := "testTaskQueue" identity := "testIdentity" + workflowType := "wType" ms := workflow.TestLocalMutableState(s.historyEngine.shardContext, s.eventsCache, tests.LocalNamespaceEntry, execution.GetWorkflowId(), execution.GetRunId(), log.NewTestLogger()) - addWorkflowExecutionStartedEvent(ms, &execution, "wType", taskqueue, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, identity) + addWorkflowExecutionStartedEvent(ms, &execution, workflowType, taskqueue, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) startedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, taskqueue, identity) addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, startedEvent.EventId, identity) @@ -747,6 +817,7 @@ func (s *engineSuite) TestQueryWorkflow_WorkflowTaskDispatch_Complete() { } } + ctx := contextutil.WithMetadataContext(context.Background()) request := &historyservice.QueryWorkflowRequest{ NamespaceId: tests.NamespaceID.String(), Request: &workflowservice.QueryWorkflowRequest{ @@ -756,7 +827,7 @@ func (s *engineSuite) TestQueryWorkflow_WorkflowTaskDispatch_Complete() { } go asyncQueryUpdate(time.Second*2, []byte{1, 2, 3}) start := time.Now().UTC() - resp, err := s.historyEngine.QueryWorkflow(context.Background(), request) + resp, err := s.historyEngine.QueryWorkflow(ctx, request) s.True(time.Now().UTC().After(start.Add(time.Second))) s.NoError(err) @@ -765,6 +836,15 @@ func (s *engineSuite) TestQueryWorkflow_WorkflowTaskDispatch_Complete() { s.NoError(err) s.Equal([]byte{1, 2, 3}, queryResult) + // Verify 
context metadata was set + contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.True(ok, "context workflow type must be set") + s.Equal(workflowType, contextWorkflowType) + + contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok, "context task queue must be set") + s.Equal(taskqueue, contextTaskQueue) + ms1 := s.getMutableState(tests.NamespaceID, &execution) s.NotNil(ms1) qr := ms1.GetQueryRegistry() @@ -780,8 +860,9 @@ func (s *engineSuite) TestQueryWorkflow_WorkflowTaskDispatch_Unblocked() { } taskqueue := "testTaskQueue" identity := "testIdentity" + workflowType := "wType" ms := workflow.TestLocalMutableState(s.historyEngine.shardContext, s.eventsCache, tests.LocalNamespaceEntry, execution.GetWorkflowId(), execution.GetRunId(), log.NewTestLogger()) - addWorkflowExecutionStartedEvent(ms, &execution, "wType", taskqueue, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, identity) + addWorkflowExecutionStartedEvent(ms, &execution, workflowType, taskqueue, payloads.EncodeString("input"), 100*time.Second, 50*time.Second, 200*time.Second, identity) wt := addWorkflowTaskScheduledEvent(ms) startedEvent := addWorkflowTaskStartedEvent(ms, wt.ScheduledEventID, taskqueue, identity) addWorkflowTaskCompletedEvent(&s.Suite, ms, wt.ScheduledEventID, startedEvent.EventId, identity) @@ -810,6 +891,7 @@ func (s *engineSuite) TestQueryWorkflow_WorkflowTaskDispatch_Unblocked() { } } + ctx := contextutil.WithMetadataContext(context.Background()) request := &historyservice.QueryWorkflowRequest{ NamespaceId: tests.NamespaceID.String(), Request: &workflowservice.QueryWorkflowRequest{ @@ -819,7 +901,7 @@ func (s *engineSuite) TestQueryWorkflow_WorkflowTaskDispatch_Unblocked() { } go asyncQueryUpdate(time.Second*2, []byte{1, 2, 3}) start := time.Now().UTC() - resp, err := s.historyEngine.QueryWorkflow(context.Background(), request) + resp, err := 
s.historyEngine.QueryWorkflow(ctx, request) s.True(time.Now().UTC().After(start.Add(time.Second))) s.NoError(err) @@ -828,6 +910,15 @@ func (s *engineSuite) TestQueryWorkflow_WorkflowTaskDispatch_Unblocked() { s.NoError(err) s.Equal([]byte{1, 2, 3}, queryResult) + // Verify context metadata was set + contextWorkflowType, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowType) + s.True(ok, "context workflow type must be set") + s.Equal(workflowType, contextWorkflowType) + + contextTaskQueue, ok := contextutil.ContextMetadataGet(ctx, contextutil.MetadataKeyWorkflowTaskQueue) + s.True(ok, "context task queue must be set") + s.Equal(taskqueue, contextTaskQueue) + ms1 := s.getMutableState(tests.NamespaceID, &execution) s.NotNil(ms1) qr := ms1.GetQueryRegistry() From 9fe77c47c79a4ad9c86b1e26845586253d183bfe Mon Sep 17 00:00:00 2001 From: Nikki Dag Date: Tue, 10 Feb 2026 19:32:41 -0600 Subject: [PATCH 4/5] move SetContextMetadata to before defer --- .../api/respondworkflowtaskcompleted/api.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/service/history/api/respondworkflowtaskcompleted/api.go b/service/history/api/respondworkflowtaskcompleted/api.go index 750119c145..8fa1a0e441 100644 --- a/service/history/api/respondworkflowtaskcompleted/api.go +++ b/service/history/api/respondworkflowtaskcompleted/api.go @@ -147,6 +147,14 @@ func (handler *WorkflowTaskCompletedHandler) Invoke( weContext := workflowLease.GetContext() ms := workflowLease.GetMutableState() currentWorkflowTask := ms.GetWorkflowTaskByID(token.GetScheduledEventId()) + + if len(request.Commands) == 0 { + // Context metadata is automatically set during mutable state transaction close. For RespondWorkflowTaskCompleted + // with no commands (e.g., workflow task heartbeat or only readonly messages like `update.Rejection`), the transaction + // is never closed. 
We explicitly call SetContextMetadata here to ensure workflow metadata is populated in the context. + ms.SetContextMetadata(ctx) + } + defer func() { var errForRelease error if releaseLeaseWithError { @@ -180,12 +188,6 @@ func (handler *WorkflowTaskCompletedHandler) Invoke( } } - if len(request.Commands) == 0 { - // Context metadata is automatically set during mutable state transaction close. For RespondWorkflowTaskCompleted - // with no commands (e.g., workflow task heartbeat or only readonly messages like `update.Rejection`), the transaction - // is never closed. We explicitly call SetContextMetadata here to ensure workflow metadata is populated in the context. - workflowLease.GetMutableState().SetContextMetadata(ctx) - } workflowLease.GetReleaseFn()(errForRelease) }() From 69c3140233241c2072e3bacff7580debe91ed884 Mon Sep 17 00:00:00 2001 From: Nikki Dag Date: Tue, 10 Feb 2026 19:36:26 -0600 Subject: [PATCH 5/5] fmt and lint --- .../api/respondworkflowtaskcompleted/api.go | 1 - service/history/history_engine_test.go | 21 ++++++++++++------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/service/history/api/respondworkflowtaskcompleted/api.go b/service/history/api/respondworkflowtaskcompleted/api.go index 8fa1a0e441..dfb5a161fb 100644 --- a/service/history/api/respondworkflowtaskcompleted/api.go +++ b/service/history/api/respondworkflowtaskcompleted/api.go @@ -188,7 +188,6 @@ func (handler *WorkflowTaskCompletedHandler) Invoke( } } - workflowLease.GetReleaseFn()(errForRelease) }() diff --git a/service/history/history_engine_test.go b/service/history/history_engine_test.go index 6e30b86870..fb9fd13dff 100644 --- a/service/history/history_engine_test.go +++ b/service/history/history_engine_test.go @@ -672,6 +672,10 @@ func (s *engineSuite) TestQueryWorkflow_WorkflowTaskDispatch_Timeout() { wg := &sync.WaitGroup{} wg.Add(1) + var capturedWorkflowType interface{} + var capturedWorkflowTypeOk bool + var capturedTaskQueue interface{} + var 
capturedTaskQueueOk bool go func() { metadataCtx := contextutil.WithMetadataContext(context.Background()) ctx, cancel := context.WithTimeout(metadataCtx, time.Second*2) @@ -680,14 +684,9 @@ func (s *engineSuite) TestQueryWorkflow_WorkflowTaskDispatch_Timeout() { s.Error(err) s.Nil(resp) - // Verify context metadata was set even though query timed out - contextWorkflowType, ok := contextutil.ContextMetadataGet(metadataCtx, contextutil.MetadataKeyWorkflowType) - s.True(ok, "context workflow type must be set") - s.Equal(workflowType, contextWorkflowType) - - contextTaskQueue, ok := contextutil.ContextMetadataGet(metadataCtx, contextutil.MetadataKeyWorkflowTaskQueue) - s.True(ok, "context task queue must be set") - s.Equal(taskqueue, contextTaskQueue) + // Capture context metadata to verify after goroutine completes + capturedWorkflowType, capturedWorkflowTypeOk = contextutil.ContextMetadataGet(metadataCtx, contextutil.MetadataKeyWorkflowType) + capturedTaskQueue, capturedTaskQueueOk = contextutil.ContextMetadataGet(metadataCtx, contextutil.MetadataKeyWorkflowTaskQueue) wg.Done() }() @@ -701,6 +700,12 @@ func (s *engineSuite) TestQueryWorkflow_WorkflowTaskDispatch_Timeout() { s.False(qr.HasUnblockedQuery()) s.False(qr.HasFailedQuery()) wg.Wait() + + // Verify context metadata was set even though query timed out + s.True(capturedWorkflowTypeOk, "context workflow type must be set") + s.Equal(workflowType, capturedWorkflowType) + s.True(capturedTaskQueueOk, "context task queue must be set") + s.Equal(taskqueue, capturedTaskQueue) s.False(qr.HasBufferedQuery()) s.False(qr.HasCompletedQuery()) s.False(qr.HasUnblockedQuery())