diff --git a/chasm/lib/tests/library.go b/chasm/lib/tests/library.go index a173d095a33..a4076b3576d 100644 --- a/chasm/lib/tests/library.go +++ b/chasm/lib/tests/library.go @@ -16,7 +16,13 @@ func (l *library) Name() string { func (l *library) Components() []*chasm.RegistrableComponent { return []*chasm.RegistrableComponent{ - chasm.NewRegistrableComponent[*PayloadStore]("payloadStore"), + chasm.NewRegistrableComponent[*PayloadStore]("payloadStore", + chasm.WithSearchAttributes( + PayloadTotalCountSearchAttribute, + PayloadTotalSizeSearchAttribute, + chasm.SearchAttributeTemporalScheduledByID, + ), + ), } } diff --git a/chasm/lib/tests/payload.go b/chasm/lib/tests/payload.go index 81ba2eff10d..11f7ae7ad4c 100644 --- a/chasm/lib/tests/payload.go +++ b/chasm/lib/tests/payload.go @@ -6,7 +6,6 @@ import ( "go.temporal.io/server/chasm" "go.temporal.io/server/chasm/lib/tests/gen/testspb/v1" "go.temporal.io/server/common" - "go.temporal.io/server/common/searchattribute" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -15,15 +14,16 @@ const ( TotalSizeMemoFieldName = "TotalSize" ) -// TODO: Register proper SA for TotalCount and TotalSize -// For now, CHASM framework does NOT support Per-Component SearchAttributes -// so just update a random existing pre-defined SA to make sure the logic works. const ( - TestKeywordSAFieldName = searchattribute.TemporalScheduledById - TestKeywordSAFieldValue = "test-keyword-value" + TestScheduleID = "TestScheduleID" + PayloadTotalCountSAAlias = "PayloadTotalCount" + PayloadTotalSizeSAAlias = "PayloadTotalSize" ) var ( + PayloadTotalCountSearchAttribute = chasm.NewSearchAttributeInt(PayloadTotalCountSAAlias, chasm.SearchAttributeFieldInt01) + PayloadTotalSizeSearchAttribute = chasm.NewSearchAttributeInt(PayloadTotalSizeSAAlias, chasm.SearchAttributeFieldInt02) + _ chasm.VisibilitySearchAttributesProvider = (*PayloadStore)(nil) _ chasm.VisibilityMemoProvider = (*PayloadStore)(nil) ) @@ -147,12 +147,12 @@ func (s *PayloadStore) LifecycleState( // SearchAttributes implements chasm.VisibilitySearchAttributesProvider interface func (s *PayloadStore) SearchAttributes( - _ chasm.Context, -) map[string]chasm.VisibilityValue { - // TODO: UpsertSearchAttribute as well when CHASM framework supports Per-Component SearchAttributes - // For now, we just update a random existing pre-defined SA to make sure the logic works. - return map[string]chasm.VisibilityValue{ - TestKeywordSAFieldName: chasm.VisibilityValueString(TestKeywordSAFieldValue), + ctx chasm.Context, +) []chasm.SearchAttributeKeyValue { + return []chasm.SearchAttributeKeyValue{ + PayloadTotalCountSearchAttribute.Value(s.State.TotalCount), + PayloadTotalSizeSearchAttribute.Value(s.State.TotalSize), + chasm.SearchAttributeTemporalScheduledByID.Value(TestScheduleID), } } diff --git a/chasm/registrable_component.go b/chasm/registrable_component.go index 49ea0e1f630..2a5c1cd412d 100644 --- a/chasm/registrable_component.go +++ b/chasm/registrable_component.go @@ -1,7 +1,10 @@ package chasm import ( + "fmt" "reflect" + + enumspb "go.temporal.io/api/enums/v1" ) type ( @@ -13,6 +16,8 @@ type ( ephemeral bool singleCluster bool shardingFn func(EntityKey) string + + searchAttributesMapper *VisibilitySearchAttributesMapper } RegistrableComponentOption func(*RegistrableComponent) @@ -56,6 +61,40 @@ func WithShardingFn( } } +func WithSearchAttributes( + searchAttributes ...SearchAttribute, +) RegistrableComponentOption { + return func(rc *RegistrableComponent) { + if len(searchAttributes) == 0 { + return + } + rc.searchAttributesMapper = &VisibilitySearchAttributesMapper{ + aliasToField: make(map[string]string, len(searchAttributes)), + fieldToAlias: make(map[string]string, len(searchAttributes)), + saTypeMap: make(map[string]enumspb.IndexedValueType, len(searchAttributes)), + } + + for _, sa := range searchAttributes { + alias := sa.definition().alias + field := sa.definition().field + valueType := sa.definition().valueType + + if _, ok := rc.searchAttributesMapper.aliasToField[alias]; ok { + //nolint:forbidigo + panic(fmt.Sprintf("registrable component validation error: search attribute alias %q is already defined", alias)) + } + if _, ok := rc.searchAttributesMapper.fieldToAlias[field]; ok { + //nolint:forbidigo + panic(fmt.Sprintf("registrable component validation error: search attribute field %q is already defined", field)) + } + + rc.searchAttributesMapper.aliasToField[alias] = field + rc.searchAttributesMapper.fieldToAlias[field] = alias + rc.searchAttributesMapper.saTypeMap[field] = valueType + } + } +} + // fqType returns the fully qualified name of the component, which is a combination of // the library name and the component type. This is used to uniquely identify // the component in the registry. diff --git a/chasm/search_attribute.go b/chasm/search_attribute.go new file mode 100644 index 00000000000..9324ea275bb --- /dev/null +++ b/chasm/search_attribute.go @@ -0,0 +1,384 @@ +package chasm + +import ( + "fmt" + "time" + + enumspb "go.temporal.io/api/enums/v1" + "go.temporal.io/server/common/searchattribute" +) + +// CHASM Search Attribute User Guide: +// +// This contains CHASM search attribute field constants. These predefined fields correspond to the exact column name in Visibility storage. +// For each root component, search attributes can be mapped from a user defined alias to these fields. +// +// To define a CHASM search attribute, create this as a package/global scoped variable. Below is an example: +// var testComponentCompletedSearchAttribute = NewSearchAttributeBool("Completed", SearchAttributeFieldBool01) +// var testComponentFailedSearchAttribute = NewSearchAttributeBool("Failed", SearchAttributeFieldBool02) +// var testComponentStartTimeSearchAttribute = NewSearchAttributeTime("StartTime", SearchAttributeFieldDateTime01) +// +// Each CHASM search attribute field is associated with a specific indexed value type. The Value() method of a search attribute +// specifies the supported value type to set at compile time. eg. DateTime values must be set with a time.Time typed value. +// +// Each root component can ONLY use a predefined search attribute field ONCE. Developers should NOT reassign aliases to different fields. +// Reassiging aliases to different fields will result in incorrect visibility query results. +// +// To register these search attributes with the CHASM Registry, use the WithSearchAttributes() option when creating the component in the library. +// eg. +// NewRegistrableComponent[T]("testcomponent", WithSearchAttributes(testComponentCompletedSearchAttribute, testComponentStartTimeSearchAttribute)) +var ( + SearchAttributeFieldBool01 = newSearchAttributeFieldBool(1) + SearchAttributeFieldBool02 = newSearchAttributeFieldBool(2) + + SearchAttributeFieldDateTime01 = newSearchAttributeFieldDateTime(1) + SearchAttributeFieldDateTime02 = newSearchAttributeFieldDateTime(2) + + SearchAttributeFieldInt01 = newSearchAttributeFieldInt(1) + SearchAttributeFieldInt02 = newSearchAttributeFieldInt(2) + + SearchAttributeFieldDouble01 = newSearchAttributeFieldDouble(1) + SearchAttributeFieldDouble02 = newSearchAttributeFieldDouble(2) + + SearchAttributeFieldKeyword01 = newSearchAttributeFieldKeyword(1) + SearchAttributeFieldKeyword02 = newSearchAttributeFieldKeyword(2) + SearchAttributeFieldKeyword03 = newSearchAttributeFieldKeyword(3) + SearchAttributeFieldKeyword04 = newSearchAttributeFieldKeyword(4) + + SearchAttributeFieldKeywordList01 = newSearchAttributeFieldKeywordList(1) + SearchAttributeFieldKeywordList02 = newSearchAttributeFieldKeywordList(2) + + SearchAttributeTemporalChangeVersion = newSearchAttributeKeywordListByField(searchattribute.TemporalChangeVersion) + SearchAttributeBinaryChecksums = newSearchAttributeKeywordListByField(searchattribute.BinaryChecksums) + SearchAttributeBuildIds = newSearchAttributeKeywordListByField(searchattribute.BuildIds) + SearchAttributeBatcherNamespace = newSearchAttributeKeywordByField(searchattribute.BatcherNamespace) + SearchAttributeBatcherUser = newSearchAttributeKeywordByField(searchattribute.BatcherUser) + SearchAttributeTemporalScheduledStartTime = newSearchAttributeDateTimeByField(searchattribute.TemporalScheduledStartTime) + SearchAttributeTemporalScheduledByID = newSearchAttributeKeywordByField(searchattribute.TemporalScheduledById) + SearchAttributeTemporalSchedulePaused = newSearchAttributeBoolByField(searchattribute.TemporalSchedulePaused) + SearchAttributeTemporalNamespaceDivision = newSearchAttributeKeywordByField(searchattribute.TemporalNamespaceDivision) + SearchAttributeTemporalPauseInfo = newSearchAttributeKeywordListByField(searchattribute.TemporalPauseInfo) + SearchAttributeTemporalReportedProblems = newSearchAttributeKeywordListByField(searchattribute.TemporalReportedProblems) + SearchAttributeTemporalWorkerDeploymentVersion = newSearchAttributeKeywordByField(searchattribute.TemporalWorkerDeploymentVersion) + SearchAttributeTemporalWorkflowVersioningBehavior = newSearchAttributeKeywordByField(searchattribute.TemporalWorkflowVersioningBehavior) + SearchAttributeTemporalWorkerDeployment = newSearchAttributeKeywordByField(searchattribute.TemporalWorkerDeployment) +) + +var ( + _ SearchAttribute = (*searchAttributeDefinition)(nil) + _ SearchAttribute = (*SearchAttributeBool)(nil) + _ SearchAttribute = (*SearchAttributeDateTime)(nil) + _ SearchAttribute = (*SearchAttributeInt)(nil) + _ SearchAttribute = (*SearchAttributeDouble)(nil) + _ SearchAttribute = (*SearchAttributeKeyword)(nil) + _ SearchAttribute = (*SearchAttributeKeywordList)(nil) +) + +type ( + // SearchAttribute is a shared interface for all search attribute types. Each type must embed searchAttributeDefinition. + SearchAttribute interface { + definition() searchAttributeDefinition + } + + searchAttributeDefinition struct { + alias string + field string + valueType enumspb.IndexedValueType + } + + // SearchAttributeKeyValue is a key value pair of a search attribute. + // Represents the current value of a search attribute in a CHASM Component during a transaction. + SearchAttributeKeyValue struct { + // Alias refers to the user defined name of the search attribute + Alias string + // Field refers to a fully formed schema field, which is a Predefined CHASM search attribute + Field string + // Value refers to the current value of the search attribute. Must support encoding to a Payload. + Value VisibilityValue + } +) + +// SearchAttributeFieldBool is a search attribute field for a boolean value. +type SearchAttributeFieldBool struct { + field string +} + +func newSearchAttributeFieldBool(index int) SearchAttributeFieldBool { + return SearchAttributeFieldBool{ + field: resolveFieldName(enumspb.INDEXED_VALUE_TYPE_BOOL, index), + } +} + +// SearchAttributeFieldDateTime is a search attribute field for a datetime value. +type SearchAttributeFieldDateTime struct { + field string +} + +func newSearchAttributeFieldDateTime(index int) SearchAttributeFieldDateTime { + return SearchAttributeFieldDateTime{ + field: resolveFieldName(enumspb.INDEXED_VALUE_TYPE_DATETIME, index), + } +} + +// SearchAttributeFieldInt is a search attribute field for an integer value. +type SearchAttributeFieldInt struct { + field string +} + +func newSearchAttributeFieldInt(index int) SearchAttributeFieldInt { + return SearchAttributeFieldInt{ + field: resolveFieldName(enumspb.INDEXED_VALUE_TYPE_INT, index), + } +} + +// SearchAttributeFieldDouble is a search attribute field for a double value. +type SearchAttributeFieldDouble struct { + field string +} + +func newSearchAttributeFieldDouble(index int) SearchAttributeFieldDouble { + return SearchAttributeFieldDouble{ + field: resolveFieldName(enumspb.INDEXED_VALUE_TYPE_DOUBLE, index), + } +} + +// SearchAttributeFieldKeyword is a search attribute field for a keyword value. +type SearchAttributeFieldKeyword struct { + field string +} + +func newSearchAttributeFieldKeyword(index int) SearchAttributeFieldKeyword { + return SearchAttributeFieldKeyword{ + field: resolveFieldName(enumspb.INDEXED_VALUE_TYPE_KEYWORD, index), + } +} + +// SearchAttributeFieldKeywordList is a search attribute field for a keyword list value. +type SearchAttributeFieldKeywordList struct { + field string +} + +func newSearchAttributeFieldKeywordList(index int) SearchAttributeFieldKeywordList { + return SearchAttributeFieldKeywordList{ + field: resolveFieldName(enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST, index), + } +} + +func resolveFieldName(valueType enumspb.IndexedValueType, index int) string { + // Columns are named like TemporalBool01, TemporalDatetime01, TemporalDouble01, TemporalInt01. + return fmt.Sprintf("%s%s%02d", searchattribute.ReservedPrefix, valueType.String(), index) +} + +func (s searchAttributeDefinition) definition() searchAttributeDefinition { + return s +} + +// SearchAttributeBool is a search attribute for a boolean value. +type SearchAttributeBool struct { + searchAttributeDefinition +} + +// NewSearchAttributeBool creates a new boolean search attribute given a predefined chasm field +func NewSearchAttributeBool(alias string, boolField SearchAttributeFieldBool) SearchAttributeBool { + return SearchAttributeBool{ + searchAttributeDefinition: searchAttributeDefinition{ + alias: alias, + field: boolField.field, + valueType: enumspb.INDEXED_VALUE_TYPE_BOOL, + }, + } +} + +func newSearchAttributeBoolByField(field string) SearchAttributeBool { + return SearchAttributeBool{ + searchAttributeDefinition: searchAttributeDefinition{ + alias: field, + field: field, + valueType: enumspb.INDEXED_VALUE_TYPE_BOOL, + }, + } +} + +// Value sets the boolean value of the search attribute. +func (s SearchAttributeBool) Value(value bool) SearchAttributeKeyValue { + return SearchAttributeKeyValue{ + Alias: s.alias, + Field: s.field, + Value: VisibilityValueBool(value), + } +} + +// SearchAttributeDateTime is a search attribute for a datetime value. +type SearchAttributeDateTime struct { + searchAttributeDefinition +} + +// NewSearchAttributeDateTime creates a new date time search attribute given a predefined chasm field +func NewSearchAttributeDateTime(alias string, datetimeField SearchAttributeFieldDateTime) SearchAttributeDateTime { + return SearchAttributeDateTime{ + searchAttributeDefinition: searchAttributeDefinition{ + alias: alias, + field: datetimeField.field, + valueType: enumspb.INDEXED_VALUE_TYPE_DATETIME, + }, + } +} + +func newSearchAttributeDateTimeByField(field string) SearchAttributeDateTime { + return SearchAttributeDateTime{ + searchAttributeDefinition: searchAttributeDefinition{ + alias: field, + field: field, + valueType: enumspb.INDEXED_VALUE_TYPE_DATETIME, + }, + } +} + +// Value sets the date time value of the search attribute. +func (s SearchAttributeDateTime) Value(value time.Time) SearchAttributeKeyValue { + return SearchAttributeKeyValue{ + Alias: s.alias, + Field: s.field, + Value: VisibilityValueTime(value), + } +} + +// SearchAttributeInt is a search attribute for an integer value. +type SearchAttributeInt struct { + searchAttributeDefinition +} + +// NewSearchAttributeInt creates a new integer search attribute given a predefined chasm field +func NewSearchAttributeInt(alias string, intField SearchAttributeFieldInt) SearchAttributeInt { + return SearchAttributeInt{ + searchAttributeDefinition: searchAttributeDefinition{ + alias: alias, + field: intField.field, + valueType: enumspb.INDEXED_VALUE_TYPE_INT, + }, + } +} + +func newSearchAttributeIntByField(field string) SearchAttributeInt { + return SearchAttributeInt{ + searchAttributeDefinition: searchAttributeDefinition{ + alias: field, + field: field, + valueType: enumspb.INDEXED_VALUE_TYPE_INT, + }, + } +} + +// Value sets the integer value of the search attribute. +func (s SearchAttributeInt) Value(value int64) SearchAttributeKeyValue { + return SearchAttributeKeyValue{ + Alias: s.alias, + Field: s.field, + Value: VisibilityValueInt64(value), + } +} + +// SearchAttributeDouble is a search attribute for a double value. +type SearchAttributeDouble struct { + searchAttributeDefinition +} + +// NewSearchAttributeDouble creates a new double search attribute given a predefined chasm field +func NewSearchAttributeDouble(alias string, doubleField SearchAttributeFieldDouble) SearchAttributeDouble { + return SearchAttributeDouble{ + searchAttributeDefinition: searchAttributeDefinition{ + alias: alias, + field: doubleField.field, + valueType: enumspb.INDEXED_VALUE_TYPE_DOUBLE, + }, + } +} + +func newSearchAttributeDoubleByField(field string) SearchAttributeDouble { + return SearchAttributeDouble{ + searchAttributeDefinition: searchAttributeDefinition{ + alias: field, + field: field, + valueType: enumspb.INDEXED_VALUE_TYPE_DOUBLE, + }, + } +} + +// Value sets the double value of the search attribute. +func (s SearchAttributeDouble) Value(value float64) SearchAttributeKeyValue { + return SearchAttributeKeyValue{ + Alias: s.alias, + Field: s.field, + Value: VisibilityValueFloat64(value), + } +} + +// SearchAttributeKeyword is a search attribute for a keyword value. +type SearchAttributeKeyword struct { + searchAttributeDefinition +} + +// NewSearchAttributeKeyword creates a new keyword search attribute given a predefined chasm field +func NewSearchAttributeKeyword(alias string, keywordField SearchAttributeFieldKeyword) SearchAttributeKeyword { + return SearchAttributeKeyword{ + searchAttributeDefinition: searchAttributeDefinition{ + alias: alias, + field: keywordField.field, + valueType: enumspb.INDEXED_VALUE_TYPE_KEYWORD, + }, + } +} + +func newSearchAttributeKeywordByField(field string) SearchAttributeKeyword { + return SearchAttributeKeyword{ + searchAttributeDefinition: searchAttributeDefinition{ + alias: field, + field: field, + valueType: enumspb.INDEXED_VALUE_TYPE_KEYWORD, + }, + } +} + +// Value sets the string value of the search attribute. +func (s SearchAttributeKeyword) Value(value string) SearchAttributeKeyValue { + return SearchAttributeKeyValue{ + Alias: s.alias, + Field: s.field, + Value: VisibilityValueString(value), + } +} + +// SearchAttributeKeywordList is a search attribute for a keyword list value. +type SearchAttributeKeywordList struct { + searchAttributeDefinition +} + +// NewSearchAttributeKeywordList creates a new keyword list search attribute given a predefined chasm field +func NewSearchAttributeKeywordList(alias string, keywordListField SearchAttributeFieldKeywordList) SearchAttributeKeywordList { + return SearchAttributeKeywordList{ + searchAttributeDefinition: searchAttributeDefinition{ + alias: alias, + field: keywordListField.field, + valueType: enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST, + }, + } +} + +func newSearchAttributeKeywordListByField(field string) SearchAttributeKeywordList { + return SearchAttributeKeywordList{ + searchAttributeDefinition: searchAttributeDefinition{ + alias: field, + field: field, + valueType: enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST, + }, + } +} + +// Value sets the string list value of the search attribute. +func (s SearchAttributeKeywordList) Value(value []string) SearchAttributeKeyValue { + return SearchAttributeKeyValue{ + Alias: s.alias, + Field: s.field, + Value: VisibilityValueStringSlice(value), + } +} diff --git a/chasm/test_component_test.go b/chasm/test_component_test.go index 68d2d02d957..a97448c370d 100644 --- a/chasm/test_component_test.go +++ b/chasm/test_component_test.go @@ -63,8 +63,17 @@ type ( ) const ( - testComponentStartTimeSAKey = "StartTimeSAKey" - testComponentStartTimeMemoKey = "StartTimeMemoKey" + TestComponentStartTimeSAKey = "StartTimeSAKey" + TestComponentRunIDSAKey = "RunIdSAKey" + TestComponentStartTimeMemoKey = "StartTimeMemoKey" +) + +var ( + TestComponentStartTimeSearchAttribute = NewSearchAttributeDateTime(TestComponentStartTimeSAKey, SearchAttributeFieldDateTime01) + TestComponentRunIDPredefinedSA = newSearchAttributeKeywordByField(TestComponentRunIDSAKey) + + _ VisibilitySearchAttributesProvider = (*TestComponent)(nil) + _ VisibilityMemoProvider = (*TestComponent)(nil) ) func (tc *TestComponent) LifecycleState(_ Context) LifecycleState { @@ -95,16 +104,18 @@ func (tc *TestComponent) Fail(_ MutableContext) { } // SearchAttributes implements VisibilitySearchAttributesProvider interface. -func (tc *TestComponent) SearchAttributes(_ Context) map[string]VisibilityValue { - return map[string]VisibilityValue{ - testComponentStartTimeSAKey: VisibilityValueTime(tc.ComponentData.GetStartTime().AsTime()), +func (tc *TestComponent) SearchAttributes(_ Context) []SearchAttributeKeyValue { + return []SearchAttributeKeyValue{ + TestComponentStartTimeSearchAttribute.Value(tc.ComponentData.GetStartTime().AsTime()), + TestComponentRunIDPredefinedSA.Value(tc.ComponentData.GetRunId()), + SearchAttributeTemporalScheduledByID.Value(tc.ComponentData.GetRunId()), } } // Memo implements VisibilityMemoProvider interface. func (tc *TestComponent) Memo(_ Context) map[string]VisibilityValue { return map[string]VisibilityValue{ - testComponentStartTimeMemoKey: VisibilityValueTime(tc.ComponentData.GetStartTime().AsTime()), + TestComponentStartTimeMemoKey: VisibilityValueTime(tc.ComponentData.GetStartTime().AsTime()), } } diff --git a/chasm/test_library_test.go b/chasm/test_library_test.go index cf9178735c8..509ee5ae0fc 100644 --- a/chasm/test_library_test.go +++ b/chasm/test_library_test.go @@ -39,7 +39,8 @@ func (l *TestLibrary) Name() string { func (l *TestLibrary) Components() []*RegistrableComponent { return []*RegistrableComponent{ - NewRegistrableComponent[*TestComponent]("test_component"), + NewRegistrableComponent[*TestComponent]("test_component", + WithSearchAttributes(TestComponentStartTimeSearchAttribute)), NewRegistrableComponent[*TestSubComponent1]("test_sub_component_1"), NewRegistrableComponent[*TestSubComponent11]("test_sub_component_11"), NewRegistrableComponent[*TestSubComponent2]("test_sub_component_2"), diff --git a/chasm/tree.go b/chasm/tree.go index 158665ac2f1..3f09b53aab0 100644 --- a/chasm/tree.go +++ b/chasm/tree.go @@ -307,7 +307,8 @@ func newTreeInitSearchAttributesAndMemo( // and currentMemo will just never be used. if saProvider, ok := rootComponent.(VisibilitySearchAttributesProvider); ok { - root.currentSA = saProvider.SearchAttributes(immutableContext) + saSlice := saProvider.SearchAttributes(immutableContext) + root.currentSA = searchAttributeKeyValuesToMap(saSlice) } if memoProvider, ok := rootComponent.(VisibilityMemoProvider); ok { root.currentMemo = memoProvider.Memo(immutableContext) @@ -316,6 +317,14 @@ func newTreeInitSearchAttributesAndMemo( return nil } +func searchAttributeKeyValuesToMap(saSlice []SearchAttributeKeyValue) map[string]VisibilityValue { + result := make(map[string]VisibilityValue, len(saSlice)) + for _, sa := range saSlice { + result[sa.Field] = sa.Value + } + return result +} + func (n *Node) SetRootComponent( rootComponent Component, ) { @@ -1452,7 +1461,8 @@ func (n *Node) closeTransactionForceUpdateVisibility( saProvider, ok := rootComponent.(VisibilitySearchAttributesProvider) if ok { - newSA := saProvider.SearchAttributes(immutableContext) + saSlice := saProvider.SearchAttributes(immutableContext) + newSA := searchAttributeKeyValuesToMap(saSlice) if !maps.EqualFunc(n.currentSA, newSA, isVisibilityValueEqual) { needUpdate = true } @@ -2029,7 +2039,8 @@ func (n *Node) ApplyMutation( } saProvider, ok := rootComponent.(VisibilitySearchAttributesProvider) if ok { - n.currentSA = saProvider.SearchAttributes(immutableContext) + saSlice := saProvider.SearchAttributes(immutableContext) + n.currentSA = searchAttributeKeyValuesToMap(saSlice) } memoProvider, ok := rootComponent.(VisibilityMemoProvider) if ok { diff --git a/chasm/tree_test.go b/chasm/tree_test.go index 7d1de490882..ee9db013de0 100644 --- a/chasm/tree_test.go +++ b/chasm/tree_test.go @@ -825,7 +825,7 @@ func (s *nodeSuite) TestApplyMutation() { } root, err := s.newTestTree(persistenceNodes) s.NoError(err) - s.Len(root.currentSA, 1) + s.Len(root.currentSA, 3) s.Len(root.currentMemo, 1) // Manually deserialize some tasks to populate the taskValueCache @@ -898,10 +898,11 @@ func (s *nodeSuite) TestApplyMutation() { // Validate root node got updated. s.Equal(updatedRoot, root.serializedNode) s.NotNil(root.value) - s.Len(root.currentSA, 1) + s.Len(root.currentSA, 3) s.Len(root.currentMemo, 1) - s.True(root.currentSA[testComponentStartTimeSAKey].(VisibilityValueTime).Equal(VisibilityValueTime(now))) - s.True(root.currentMemo[testComponentStartTimeMemoKey].(VisibilityValueTime).Equal(VisibilityValueTime(now))) + s.Contains(root.currentSA, "TemporalDatetime01") + s.True(root.currentSA["TemporalDatetime01"].(VisibilityValueTime).Equal(VisibilityValueTime(now))) + s.True(root.currentMemo[TestComponentStartTimeMemoKey].(VisibilityValueTime).Equal(VisibilityValueTime(now))) // Validate the "child" node got updated. nodeSC1, ok := root.children["SubComponent1"] @@ -1032,10 +1033,11 @@ func (s *nodeSuite) TestApplySnapshot() { s.Equal(expectedMutation, root.mutation) // Validate visibility search attributes and memo are updated as well. - s.Len(root.currentSA, 1) + s.Len(root.currentSA, 3) s.Len(root.currentMemo, 1) - s.True(root.currentSA[testComponentStartTimeSAKey].(VisibilityValueTime).Equal(VisibilityValueTime(now.AsTime()))) - s.True(root.currentMemo[testComponentStartTimeMemoKey].(VisibilityValueTime).Equal(VisibilityValueTime(now.AsTime()))) + s.Contains(root.currentSA, "TemporalDatetime01") + s.True(root.currentSA["TemporalDatetime01"].(VisibilityValueTime).Equal(VisibilityValueTime(now.AsTime()))) + s.True(root.currentMemo[TestComponentStartTimeMemoKey].(VisibilityValueTime).Equal(VisibilityValueTime(now.AsTime()))) } func (s *nodeSuite) TestApplyMutation_OutOfOrder() { diff --git a/chasm/visibility.go b/chasm/visibility.go index e787a5f73c0..945e93f9587 100644 --- a/chasm/visibility.go +++ b/chasm/visibility.go @@ -2,8 +2,11 @@ package chasm import ( "context" + "fmt" commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" + "go.temporal.io/api/serviceerror" persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common/payload" ) @@ -17,10 +20,8 @@ const ( // allows the CHASM framework to automatically determine, at the end of // a transaction, if a visibility task needs to be generated to update the // visibility record with the returned search attributes. -// -// TODO: Improve this interface after support registering CHASM search attributes. type VisibilitySearchAttributesProvider interface { - SearchAttributes(Context) map[string]VisibilityValue + SearchAttributes(Context) []SearchAttributeKeyValue } // VisibilityMemoProvider if implemented by the root Component, @@ -31,6 +32,45 @@ type VisibilityMemoProvider interface { Memo(Context) map[string]VisibilityValue } +// VisibilitySearchAttributesMapper is a mapper for CHASM search attributes. +type VisibilitySearchAttributesMapper struct { + aliasToField map[string]string + fieldToAlias map[string]string + saTypeMap map[string]enumspb.IndexedValueType +} + +// Alias returns the alias for a given field. +func (v *VisibilitySearchAttributesMapper) Alias(field string) (string, error) { + if v == nil { + return "", serviceerror.NewInvalidArgument("visibility search attributes mapper not defined") + } + alias, ok := v.fieldToAlias[field] + if !ok { + return "", serviceerror.NewInvalidArgument(fmt.Sprintf("visibility search attributes mapper has no registered field %q", field)) + } + return alias, nil +} + +// Field returns the field for a given alias. +func (v *VisibilitySearchAttributesMapper) Field(alias string) (string, error) { + if v == nil { + return "", serviceerror.NewInvalidArgument("visibility search attributes mapper not defined") + } + field, ok := v.aliasToField[alias] + if !ok { + return "", serviceerror.NewInvalidArgument(fmt.Sprintf("visibility search attributes mapper has no registered alias %q", alias)) + } + return field, nil +} + +// SATypeMap returns the type map for the CHASM search attributes. +func (v *VisibilitySearchAttributesMapper) SATypeMap() map[string]enumspb.IndexedValueType { + if v == nil { + return nil + } + return v.saTypeMap +} + type Visibility struct { UnimplementedComponent diff --git a/chasm/visibility_value.go b/chasm/visibility_value.go index 98d4ff4813a..e50cfc8404a 100644 --- a/chasm/visibility_value.go +++ b/chasm/visibility_value.go @@ -5,7 +5,9 @@ import ( "time" commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/server/common/payload" + "go.temporal.io/server/common/searchattribute" ) type VisibilityValue interface { @@ -17,6 +19,7 @@ type VisibilityValueInt int func (v VisibilityValueInt) MustEncode() *commonpb.Payload { p, _ := payload.Encode(int(v)) + searchattribute.SetMetadataType(p, enumspb.INDEXED_VALUE_TYPE_INT) return p } @@ -32,6 +35,7 @@ type VisibilityValueInt32 int32 func (v VisibilityValueInt32) MustEncode() *commonpb.Payload { p, _ := payload.Encode(int32(v)) + searchattribute.SetMetadataType(p, enumspb.INDEXED_VALUE_TYPE_INT) return p } @@ -47,6 +51,7 @@ type VisibilityValueInt64 int64 func (v VisibilityValueInt64) MustEncode() *commonpb.Payload { p, _ := payload.Encode(int64(v)) + searchattribute.SetMetadataType(p, enumspb.INDEXED_VALUE_TYPE_INT) return p } @@ -61,7 +66,9 @@ func (v VisibilityValueInt64) Equal(other VisibilityValue) bool { type VisibilityValueString string func (v VisibilityValueString) MustEncode() *commonpb.Payload { - return payload.EncodeString(string(v)) + p := payload.EncodeString(string(v)) + searchattribute.SetMetadataType(p, enumspb.INDEXED_VALUE_TYPE_KEYWORD) + return p } func (v VisibilityValueString) Equal(other VisibilityValue) bool { @@ -76,6 +83,7 @@ type VisibilityValueBool bool func (v VisibilityValueBool) MustEncode() *commonpb.Payload { p, _ := payload.Encode(bool(v)) + searchattribute.SetMetadataType(p, enumspb.INDEXED_VALUE_TYPE_BOOL) return p } @@ -91,6 +99,7 @@ type VisibilityValueFloat64 float64 func (v VisibilityValueFloat64) MustEncode() *commonpb.Payload { p, _ := payload.Encode(float64(v)) + searchattribute.SetMetadataType(p, enumspb.INDEXED_VALUE_TYPE_DOUBLE) return p } @@ -106,6 +115,7 @@ type VisibilityValueTime time.Time func (v VisibilityValueTime) MustEncode() *commonpb.Payload { p, _ := payload.Encode(time.Time(v)) + searchattribute.SetMetadataType(p, enumspb.INDEXED_VALUE_TYPE_DATETIME) return p } @@ -135,6 +145,7 @@ type VisibilityValueStringSlice []string func (v VisibilityValueStringSlice) MustEncode() *commonpb.Payload { p, _ := payload.Encode([]string(v)) + searchattribute.SetMetadataType(p, enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST) return p } diff --git a/common/persistence/visibility/store/sql/visibility_store.go b/common/persistence/visibility/store/sql/visibility_store.go index cf50e3726ee..c5872a0cb6d 100644 --- a/common/persistence/visibility/store/sql/visibility_store.go +++ b/common/persistence/visibility/store/sql/visibility_store.go @@ -469,6 +469,11 @@ func (s *VisibilityStore) processRowSearchAttributes( // In SQLite, keyword list can return a string when there's only one element. // This changes it into a slice. for name, value := range rowSearchAttributes { + // TODO: CHASM search attributes are not in the typeMap and SQL only stores raw values (no metadata). + // The Encode() call below will fail to add type metadata, causing decode issues. + if searchattribute.IsChasmSearchAttribute(name) { + continue + } tp, err := saTypeMap.GetType(name) if err != nil { return nil, err diff --git a/common/searchattribute/defs.go b/common/searchattribute/defs.go index a0675b6cb3a..d11f45f2ed0 100644 --- a/common/searchattribute/defs.go +++ b/common/searchattribute/defs.go @@ -279,6 +279,18 @@ func IsPreallocatedCSAFieldName(name string, valueType enumspb.IndexedValueType) return re != nil && re.MatchString(name) } +var chasmSearchAttributePattern = regexp.MustCompile(`^Temporal(Bool|Datetime|Int|Double|Text|Keyword|KeywordList)(0[1-9]|[1-9][0-9])$`) + +// IsChasmSearchAttribute checks if a field name matches the pattern for CHASM search attributes. +// CHASM search attributes follow the pattern: Temporal where NN is 01-99 +// Examples: TemporalInt01, TemporalDatetime02, TemporalDouble01, etc. +func IsChasmSearchAttribute(name string) bool { + if !strings.HasPrefix(name, ReservedPrefix) { + return false + } + return chasmSearchAttributePattern.MatchString(name) +} + // QueryWithAnyNamespaceDivision returns a modified workflow visibility query that disables // special handling of namespace division and so matches workflows in all namespace divisions. // Normally a query that didn't explicitly mention TemporalNamespaceDivision would be limited diff --git a/common/searchattribute/encode.go b/common/searchattribute/encode.go index 5341a5ea15f..373eca5de6f 100644 --- a/common/searchattribute/encode.go +++ b/common/searchattribute/encode.go @@ -28,11 +28,14 @@ func Encode(searchAttributes map[string]interface{}, typeMap *NameTypeMap) (*com saType := enumspb.INDEXED_VALUE_TYPE_UNSPECIFIED if typeMap != nil { saType, err = typeMap.getType(saName, customCategory|predefinedCategory) - if err != nil { + if err != nil && !IsChasmSearchAttribute(saName) { lastErr = err continue } - setMetadataType(valPayload, saType) + // TODO: CHASM search attributes read from visibility stores (e.g., during queries) + // will not have type metadata set, which may cause issues on the decode path. + // This is acceptable for now as CHASM query support is not yet implemented. + SetMetadataType(valPayload, saType) } } return &commonpb.SearchAttributes{IndexedFields: indexedFields}, lastErr @@ -59,7 +62,11 @@ func Decode( var err error saType, err = typeMap.getType(saName, customCategory|predefinedCategory) if err != nil { - lastErr = err + // If the search attribute name is not in typeMap but the payload has type metadata, + // we can still decode it (e.g., for CHASM search attributes). + if _, hasTypeMetadata := saPayload.Metadata[MetadataType]; !hasTypeMetadata { + lastErr = err + } } } diff --git a/common/searchattribute/encode_test.go b/common/searchattribute/encode_test.go index 3b4b74371cf..ca4b1c1fbb6 100644 --- a/common/searchattribute/encode_test.go +++ b/common/searchattribute/encode_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" enumspb "go.temporal.io/api/enums/v1" ) @@ -195,8 +196,7 @@ func Test_Decode_Error(t *testing.T) { }}, true, ) - assert.Error(err) - assert.ErrorIs(err, ErrInvalidName) + require.NoError(t, err) assert.Len(sa.IndexedFields, 3) assert.Equal("val1", vals["key1"]) assert.Equal(int64(2), vals["key2"]) diff --git a/common/searchattribute/encode_value.go b/common/searchattribute/encode_value.go index 939391351b4..8b7b9c4be64 100644 --- a/common/searchattribute/encode_value.go +++ b/common/searchattribute/encode_value.go @@ -20,7 +20,7 @@ func EncodeValue(val interface{}, t enumspb.IndexedValueType) (*commonpb.Payload return nil, err } - setMetadataType(valPayload, t) + SetMetadataType(valPayload, t) return valPayload, nil } diff --git a/common/searchattribute/search_attirbute.go b/common/searchattribute/search_attirbute.go index 6e18ed7bc27..3103cb17f68 100644 --- a/common/searchattribute/search_attirbute.go +++ b/common/searchattribute/search_attirbute.go @@ -43,11 +43,11 @@ func ApplyTypeMap(searchAttributes *commonpb.SearchAttributes, typeMap NameTypeM if err != nil { continue } - setMetadataType(saPayload, valueType) + SetMetadataType(saPayload, valueType) } } -func setMetadataType(p *commonpb.Payload, t enumspb.IndexedValueType) { +func SetMetadataType(p *commonpb.Payload, t enumspb.IndexedValueType) { if t == enumspb.INDEXED_VALUE_TYPE_UNSPECIFIED { return } diff --git a/common/searchattribute/stringify.go b/common/searchattribute/stringify.go index a41f6f96abb..8c212a7da91 100644 --- a/common/searchattribute/stringify.go +++ b/common/searchattribute/stringify.go @@ -125,7 +125,7 @@ func parseValueOrArray(valStr string, t enumspb.IndexedValueType) (*commonpb.Pay return nil, err } - setMetadataType(valPayload, t) + SetMetadataType(valPayload, t) return valPayload, nil } diff --git a/service/history/chasm_engine_test.go b/service/history/chasm_engine_test.go index bbfde594637..5d872f96662 100644 --- a/service/history/chasm_engine_test.go +++ b/service/history/chasm_engine_test.go @@ -608,6 +608,8 @@ const ( ) var ( + testComponentPausedSearchAttribute = chasm.NewSearchAttributeBool(testComponentPausedSAName, chasm.SearchAttributeFieldBool01) + _ chasm.VisibilitySearchAttributesProvider = (*testComponent)(nil) _ chasm.VisibilityMemoProvider = (*testComponent)(nil) ) @@ -622,9 +624,9 @@ func (l *testComponent) LifecycleState(_ chasm.Context) chasm.LifecycleState { return chasm.LifecycleStateRunning } -func (l *testComponent) SearchAttributes(_ chasm.Context) map[string]chasm.VisibilityValue { - return map[string]chasm.VisibilityValue{ - testComponentPausedSAName: chasm.VisibilityValueBool(l.ActivityInfo.Paused), +func (l *testComponent) SearchAttributes(_ chasm.Context) []chasm.SearchAttributeKeyValue { + return []chasm.SearchAttributeKeyValue{ + testComponentPausedSearchAttribute.Value(l.ActivityInfo.Paused), } } @@ -652,6 +654,7 @@ func (l *testChasmLibrary) Name() string { func (l *testChasmLibrary) Components() []*chasm.RegistrableComponent { return []*chasm.RegistrableComponent{ - chasm.NewRegistrableComponent[*testComponent]("test_component"), + chasm.NewRegistrableComponent[*testComponent]("test_component", + chasm.WithSearchAttributes(testComponentPausedSearchAttribute)), } } diff --git a/service/history/visibility_queue_task_executor.go b/service/history/visibility_queue_task_executor.go index e1ff4e82769..3d7913cefe8 100644 --- a/service/history/visibility_queue_task_executor.go +++ b/service/history/visibility_queue_task_executor.go @@ -409,8 +409,8 @@ func (t *visibilityQueueTaskExecutor) processChasmTask( return err } if saProvider, ok := rootComponent.(chasm.VisibilitySearchAttributesProvider); ok { - for key, value := range saProvider.SearchAttributes(visTaskContext) { - searchattributes[key] = value.MustEncode() + for _, sa := range saProvider.SearchAttributes(visTaskContext) { + searchattributes[sa.Field] = sa.Value.MustEncode() } } if memoProvider, ok := rootComponent.(chasm.VisibilityMemoProvider); ok { diff --git a/service/history/visibility_queue_task_executor_test.go b/service/history/visibility_queue_task_executor_test.go index cf24ea09421..c839ec4ed67 100644 --- a/service/history/visibility_queue_task_executor_test.go +++ b/service/history/visibility_queue_task_executor_test.go @@ -617,7 +617,8 @@ func (s *visibilityQueueTaskExecutorSuite) TestProcessChasmTask_RunningExecution s.Equal("TestLibrary.test_component", actualArchetype) var paused bool - err = payload.Decode(request.SearchAttributes.IndexedFields[testComponentPausedSAName], &paused) + // SearchAttribute now uses field name (TemporalBool01) instead of alias (PausedSA) + err = payload.Decode(request.SearchAttributes.IndexedFields["TemporalBool01"], &paused) s.NoError(err) s.True(paused) diff --git a/tests/chasm_test.go b/tests/chasm_test.go index 7739d0a0794..1bc7e9017fa 100644 --- a/tests/chasm_test.go +++ b/tests/chasm_test.go @@ -119,9 +119,15 @@ func (s *ChasmTestSuite) TestPayloadStoreVisibility() { s.True(ok) s.NoError(payload.Decode(p, &intVal)) s.Equal(0, intVal) - var strVal string - s.NoError(payload.Decode(visRecord.SearchAttributes.IndexedFields[tests.TestKeywordSAFieldName], &strVal)) - s.Equal(tests.TestKeywordSAFieldValue, strVal) + var totalCount int + s.NoError(payload.Decode(visRecord.SearchAttributes.IndexedFields["TemporalInt01"], &totalCount)) + s.Equal(0, totalCount) + var totalSize int + s.NoError(payload.Decode(visRecord.SearchAttributes.IndexedFields["TemporalInt02"], &totalSize)) + var scheduledByID string + s.NoError(payload.Decode(visRecord.SearchAttributes.IndexedFields["TemporalScheduledById"], &scheduledByID)) + s.Equal(tests.TestScheduleID, scheduledByID) + s.Equal(0, totalSize) addPayloadResp, err := tests.AddPayloadHandler( engineContext,