49 changes: 49 additions & 0 deletions common/clock/event_time_source.go
@@ -33,6 +33,15 @@ type (
// channel on which the current time is sent when a timer fires
c chan time.Time
}

// fakeTicker is a fake implementation of [Ticker].
fakeTicker struct {
timeSource *EventTimeSource
interval time.Duration
ch chan time.Time
timer *fakeTimer
stopped bool
}
)

var _ TimeSource = (*EventTimeSource)(nil)
@@ -95,6 +104,20 @@ func (ts *EventTimeSource) NewTimer(d time.Duration) (<-chan time.Time, Timer) {
return c, timer
}

// NewTicker creates a Ticker that sends the current time on a channel after each tick.
// It returns the channel and the Ticker.
func (ts *EventTimeSource) NewTicker(d time.Duration) (<-chan time.Time, Ticker) {
c := make(chan time.Time, 1)
ticker := &fakeTicker{
timeSource: ts,
interval: d,
ch: c,
}
// Schedule the first tick
ticker.scheduleNextTick()
return c, ticker
}

func (ts *EventTimeSource) addTimer(t *fakeTimer) {
ts.mu.Lock()
defer ts.mu.Unlock()
@@ -219,3 +242,29 @@ func (t *fakeTimer) Stop() bool {

return true
}

func (t *fakeTicker) scheduleNextTick() {
target := t.timeSource.Now().Add(t.interval)
t.timer = &fakeTimer{
timeSource: t.timeSource,
deadline: target,
callback: func() {
if !t.stopped {
select {
case t.ch <- target:
default:
}
// Schedule next tick
t.scheduleNextTick()
}
},
}
t.timeSource.addTimer(t.timer)
}

func (t *fakeTicker) Stop() {
t.stopped = true
if t.timer != nil {
t.timer.Stop()
}
}
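Reviewer note: the select with an empty default in the tick callback is what keeps a slow receiver from stalling the fake clock. If the previous tick is still sitting in the buffered channel, the new tick is simply dropped, which matches the documented behavior of time.Ticker. A standalone illustration of that drop semantics using the real ticker (not code from this PR):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	// Let roughly ten ticks elapse without reading. The tick channel has a
	// buffer of one, so all but one of those ticks are dropped rather than
	// queued or allowed to block the ticker.
	time.Sleep(100 * time.Millisecond)

	first := <-ticker.C
	fmt.Println("received one buffered tick:", first)
	fmt.Println("ticks still pending:", len(ticker.C)) // almost always 0
}
```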
11 changes: 11 additions & 0 deletions common/clock/time_source.go
@@ -6,12 +6,17 @@ import (
)

type (
// Ticker represents a ticker that can be stopped.
Ticker interface {
Stop()
}
// TimeSource is an interface to make it easier to test code that uses time.
TimeSource interface {
Now() time.Time
Since(t time.Time) time.Duration
AfterFunc(d time.Duration, f func()) Timer
NewTimer(d time.Duration) (<-chan time.Time, Timer)
NewTicker(d time.Duration) (<-chan time.Time, Ticker)
}
// Timer is a timer returned by TimeSource.AfterFunc. Unlike the timers returned by [time.NewTimer] or time.Ticker,
// this timer does not have a channel. That is because the callback already reacts to the timer firing.
@@ -54,3 +59,9 @@ func (ts RealTimeSource) NewTimer(d time.Duration) (<-chan time.Time, Timer) {
t := time.NewTimer(d)
return t.C, t
}

// NewTicker is a pass-through to time.NewTicker.
func (ts RealTimeSource) NewTicker(d time.Duration) (<-chan time.Time, Ticker) {
t := time.NewTicker(d)
return t.C, t
}
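A sketch of the kind of caller this interface addition is aimed at (the function and its names are illustrative, not from this PR): production code passes clock.NewRealTimeSource() and gets an ordinary time.Ticker, while tests can substitute the package's fake time source.

```go
package example

import (
	"context"
	"time"

	"go.temporal.io/server/common/clock"
)

// pollLoop runs work on every tick until the context is cancelled. It depends
// only on the TimeSource and Ticker interfaces added in this change.
func pollLoop(ctx context.Context, ts clock.TimeSource, interval time.Duration, work func(time.Time)) {
	ch, ticker := ts.NewTicker(interval)
	defer ticker.Stop() // Ticker deliberately exposes only Stop

	for {
		select {
		case <-ctx.Done():
			return
		case now := <-ch:
			work(now)
		}
	}
}
```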
2 changes: 2 additions & 0 deletions service/matching/backlog_manager_test.go
@@ -16,6 +16,7 @@ import (
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/primitives/timestamp"
@@ -79,6 +80,7 @@ func (s *BacklogManagerTestSuite) SetupTest() {
s.ptqMgr.EXPECT().QueueKey().Return(queue).AnyTimes()
s.ptqMgr.EXPECT().ProcessSpooledTask(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
s.ptqMgr.EXPECT().GetFairnessWeightOverrides().AnyTimes().Return(fairnessWeightOverrides{ /* To avoid deadlock with gomock method */ })
s.ptqMgr.EXPECT().TimeSource().Return(clock.NewRealTimeSource()).AnyTimes()

var ctx context.Context
ctx, s.cancelCtx = context.WithCancel(context.Background())
3 changes: 3 additions & 0 deletions service/matching/handler.go
@@ -11,6 +11,7 @@ import (
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/api/matchingservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/membership"
@@ -70,6 +71,7 @@ type (
RateLimiter TaskDispatchRateLimiter `optional:"true"`
WorkersRegistry workers.Registry
Serializer serialization.Serializer
TaskClock clock.TimeSource
}
)

@@ -112,6 +114,7 @@ func NewHandler(
params.SearchAttributeMapperProvider,
params.RateLimiter,
params.Serializer,
params.TaskClock,
),
namespaceRegistry: params.NamespaceRegistry,
workersRegistry: params.WorkersRegistry,
5 changes: 3 additions & 2 deletions service/matching/matcher_test.go
@@ -20,6 +20,7 @@
"go.temporal.io/server/api/matchingservicemock/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
taskqueuespb "go.temporal.io/server/api/taskqueue/v1"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/payloads"
@@ -88,11 +89,11 @@
t.childConfig = tlCfg
t.fwdr, err = newForwarder(&t.childConfig.forwarderConfig, t.queue, t.client)
t.Assert().NoError(err)
t.childMatcher = newTaskMatcher(tlCfg, t.fwdr, metrics.NoopMetricsHandler, t.newDefaultRateLimiter())
t.childMatcher = newTaskMatcher(tlCfg, t.fwdr, metrics.NoopMetricsHandler, t.newDefaultRateLimiter(), clock.NewRealTimeSource())

Check failure on line 92 in service/matching/matcher_test.go (GitHub Actions / golangci, Unit test): too many arguments in call to newTaskMatcher
t.childMatcher.Start()

t.rootConfig = newTaskQueueConfig(prtn.TaskQueue(), cfg, "test-namespace")
t.rootMatcher = newTaskMatcher(t.rootConfig, nil, metrics.NoopMetricsHandler, t.newDefaultRateLimiter())
t.rootMatcher = newTaskMatcher(t.rootConfig, nil, metrics.NoopMetricsHandler, t.newDefaultRateLimiter(), clock.NewRealTimeSource())

Check failure on line 96 in service/matching/matcher_test.go (GitHub Actions / golangci, Unit test): too many arguments in call to newTaskMatcher
t.rootMatcher.Start()
}

69 changes: 38 additions & 31 deletions service/matching/matching_engine.go
@@ -112,22 +112,27 @@ type (

// Implements matching.Engine
matchingEngineImpl struct {
status int32
taskManager persistence.TaskManager
fairTaskManager persistence.FairTaskManager
historyClient resource.HistoryClient
matchingRawClient resource.MatchingRawClient
workerDeploymentClient workerdeployment.Client
tokenSerializer *tasktoken.Serializer
historySerializer serialization.Serializer
logger log.Logger
throttledLogger log.ThrottledLogger
namespaceRegistry namespace.Registry
hostInfoProvider membership.HostInfoProvider
serviceResolver membership.ServiceResolver
membershipChangedCh chan *membership.ChangedEvent
clusterMeta cluster.Metadata
timeSource clock.TimeSource
status int32
taskManager persistence.TaskManager
fairTaskManager persistence.FairTaskManager
historyClient resource.HistoryClient
matchingRawClient resource.MatchingRawClient
workerDeploymentClient workerdeployment.Client
tokenSerializer *tasktoken.Serializer
historySerializer serialization.Serializer
logger log.Logger
throttledLogger log.ThrottledLogger
namespaceRegistry namespace.Registry
hostInfoProvider membership.HostInfoProvider
serviceResolver membership.ServiceResolver
membershipChangedCh chan *membership.ChangedEvent
clusterMeta cluster.Metadata
// systemClock is always real wall-clock time. Used for infrastructure
// concerns: liveness timers, gradual config rollouts. Never faked in tests.
systemClock clock.TimeSource
// taskClock is used for business/semantic logic: sticky worker availability,
// poll timing, task tracking. May be faked in tests.
taskClock clock.TimeSource
visibilityManager manager.VisibilityManager
nexusEndpointClient *nexusEndpointClient
nexusEndpointsOwnershipLostCh chan struct{}
@@ -216,6 +221,7 @@ func NewEngine(
saMapperProvider searchattribute.MapperProvider,
rateLimiter TaskDispatchRateLimiter,
historySerializer serialization.Serializer,
timeSource clock.TimeSource,
) Engine {
scopedMetricsHandler := metricsHandler.WithTags(metrics.OperationTag(metrics.MatchingEngineScope))
e := &matchingEngineImpl{
@@ -234,7 +240,8 @@
serviceResolver: resolver,
membershipChangedCh: make(chan *membership.ChangedEvent, 1), // allow one signal to be buffered while we're working
clusterMeta: clusterMeta,
timeSource: clock.NewRealTimeSource(), // No need to mock this at the moment
systemClock: clock.NewRealTimeSource(),
taskClock: timeSource,
visibilityManager: visibilityManager,
nexusEndpointClient: newEndpointClient(config.NexusEndpointsRefreshInterval, nexusEndpointManager),
nexusEndpointsOwnershipLostCh: make(chan struct{}),
@@ -530,7 +537,7 @@ func (e *matchingEngineImpl) AddWorkflowTask(
pm, _, err := e.getTaskQueuePartitionManager(ctx, partition, !sticky, loadCauseTask)
if err != nil {
return "", false, err
} else if sticky && !stickyWorkerAvailable(pm) {
} else if sticky && !e.stickyWorkerAvailable(pm) {
return "", false, serviceerrors.NewStickyWorkerUnavailable()
}

@@ -1080,7 +1087,7 @@ func (e *matchingEngineImpl) QueryWorkflow(
pm, _, err := e.getTaskQueuePartitionManager(ctx, partition, !sticky, loadCauseQuery)
if err != nil {
return nil, err
} else if sticky && !stickyWorkerAvailable(pm) {
} else if sticky && !e.stickyWorkerAvailable(pm) {
return nil, serviceerrors.NewStickyWorkerUnavailable()
}

@@ -1655,7 +1662,7 @@ func (e *matchingEngineImpl) UpdateWorkerVersioningRules(
)
}

updatedClock := hlc.Next(clk, e.timeSource)
updatedClock := hlc.Next(clk, e.taskClock)
var versioningData *persistencespb.VersioningData
switch req.GetOperation().(type) {
case *workflowservice.UpdateWorkerVersioningRulesRequest_InsertAssignmentRule:
@@ -1843,7 +1850,7 @@ func (e *matchingEngineImpl) UpdateWorkerBuildIdCompatibility(
tmp := hlc.Zero(e.clusterMeta.GetClusterID())
clk = tmp
}
updatedClock := hlc.Next(clk, e.timeSource)
updatedClock := hlc.Next(clk, e.taskClock)
var versioningData *persistencespb.VersioningData
switch req.GetOperation().(type) {
case *matchingservice.UpdateWorkerBuildIdCompatibilityRequest_ApplyPublicRequest_:
@@ -1893,7 +1900,7 @@ func (e *matchingEngineImpl) UpdateWorkerBuildIdCompatibility(
if operationCreatedTombstones {
opts := UserDataUpdateOptions{Source: "UpdateWorkerBuildIdCompatibility/clear-tombstones"}
_, err = pm.GetUserDataManager().UpdateUserData(ctx, opts, func(data *persistencespb.TaskQueueUserData) (*persistencespb.TaskQueueUserData, bool, error) {
updatedClock := hlc.Next(data.GetClock(), e.timeSource)
updatedClock := hlc.Next(data.GetClock(), e.taskClock)
// Avoid mutation
ret := common.CloneProto(data)
ret.Clock = updatedClock
@@ -1978,7 +1985,7 @@ func (e *matchingEngineImpl) SyncDeploymentUserData(
if clk == nil {
clk = hlc.Zero(e.clusterMeta.GetClusterID())
}
now := hlc.Next(clk, e.timeSource)
now := hlc.Next(clk, e.taskClock)
// clone the whole thing so we can just mutate
data = common.CloneProto(data)

@@ -2577,7 +2584,7 @@ func (e *matchingEngineImpl) CreateNexusEndpoint(ctx context.Context, request *m
res, err := e.nexusEndpointClient.CreateNexusEndpoint(ctx, &internalCreateNexusEndpointRequest{
spec: request.GetSpec(),
clusterID: e.clusterMeta.GetClusterID(),
timeSource: e.timeSource,
timeSource: e.taskClock,
})
if err != nil {
e.logger.Error("Failed to create Nexus endpoint", tag.Error(err), tag.Endpoint(request.GetSpec().GetName()))
@@ -2594,7 +2601,7 @@ func (e *matchingEngineImpl) UpdateNexusEndpoint(ctx context.Context, request *m
version: request.GetVersion(),
spec: request.GetSpec(),
clusterID: e.clusterMeta.GetClusterID(),
timeSource: e.timeSource,
timeSource: e.taskClock,
})
if err != nil {
e.logger.Error("Failed to update Nexus endpoint", tag.Error(err), tag.Endpoint(request.GetSpec().GetName()))
@@ -2699,7 +2706,7 @@ func (e *matchingEngineImpl) getUserDataBatcher(namespaceId namespace.ID) *strea
fn := func(batch []*userDataUpdate) error {
return e.applyUserDataUpdateBatch(namespaceId, batch)
}
newBatcher := stream_batcher.NewBatcher[*userDataUpdate, error](fn, userDataBatcherOptions, e.timeSource)
newBatcher := stream_batcher.NewBatcher[*userDataUpdate, error](fn, userDataBatcherOptions, e.taskClock)
batcher, _ := e.userDataUpdateBatchers.GetOrSet(namespaceId, newBatcher)
return batcher
}
@@ -2763,7 +2770,7 @@ func (e *matchingEngineImpl) pollTask(
return nil, false, err
}

pollMetadata.localPollStartTime = e.timeSource.Now()
pollMetadata.localPollStartTime = e.taskClock.Now()

// We need to set a shorter timeout than the original ctx; otherwise, by the time ctx deadline is
// reached, instead of emptyTask, context timeout error is returned to the frontend by the rpc stack,
@@ -3238,7 +3245,7 @@ func newRecordTaskStartedContext(
func (e *matchingEngineImpl) reviveBuildId(ns *namespace.Namespace, taskQueue string, buildId *persistencespb.BuildId) *persistencespb.BuildId {
// Bump the stamp and ensure it's newer than the deletion stamp.
prevStamp := common.CloneProto(buildId.StateUpdateTimestamp)
stamp := hlc.Next(prevStamp, e.timeSource)
stamp := hlc.Next(prevStamp, e.taskClock)
stamp.ClusterId = e.clusterMeta.GetClusterID()
e.logger.Info("Revived build ID while applying replication event",
tag.WorkflowNamespace(ns.Name().String()),
@@ -3254,8 +3261,8 @@ func (e *matchingEngineImpl) reviveBuildId(ns *namespace.Namespace, taskQueue st

// We use a very short timeout for considering a sticky worker available, since tasks can also
// be processed on the normal queue.
func stickyWorkerAvailable(pm taskQueuePartitionManager) bool {
return pm != nil && pm.HasPollerAfter("", time.Now().Add(-stickyPollerUnavailableWindow))
func (e *matchingEngineImpl) stickyWorkerAvailable(pm taskQueuePartitionManager) bool {
return pm != nil && pm.HasPollerAfter("", e.taskClock.Now().Add(-stickyPollerUnavailableWindow))
}

func buildRateLimitConfig(update *workflowservice.UpdateTaskQueueConfigRequest_RateLimitUpdate, updateTime *timestamppb.Timestamp, updateIdentity string) *taskqueuepb.RateLimitConfig {
@@ -3357,7 +3364,7 @@ func (e *matchingEngineImpl) UpdateTaskQueueConfig(
if existingClock == nil {
existingClock = hlc.Zero(e.clusterMeta.GetClusterID())
}
now := hlc.Next(existingClock, e.timeSource)
now := hlc.Next(existingClock, e.taskClock)
protoTs := hlc.ProtoTimestamp(now)

// Update relevant config fields
3 changes: 2 additions & 1 deletion service/matching/matching_engine_test.go
@@ -262,7 +262,8 @@ func newMatchingEngine(
serviceResolver: mockServiceResolver,
membershipChangedCh: make(chan *membership.ChangedEvent, 1),
clusterMeta: clustertest.NewMetadataForTest(cluster.NewTestClusterMetadataConfig(false, true)),
timeSource: clock.NewRealTimeSource(),
systemClock: clock.NewRealTimeSource(),
taskClock: clock.NewRealTimeSource(),
visibilityManager: mockVisibilityManager,
nexusEndpointClient: newEndpointClient(config.NexusEndpointsRefreshInterval, nexusEndpointManager),
nexusEndpointsOwnershipLostCh: make(chan struct{}),
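This test wires both fields to the real clock, so behavior is unchanged here. The point of splitting out taskClock is that a test needing deterministic business-logic time can now swap in the package's fake clock while liveness and other infrastructure timers stay on the wall clock. A sketch of such a helper inside package matching (NewEventTimeSource, Update, and Advance are assumed from common/clock's existing API and are not part of this diff):

```go
// withFakeTaskClock is a hypothetical test helper, not part of this PR.
func withFakeTaskClock(e *matchingEngineImpl) *clock.EventTimeSource {
	fake := clock.NewEventTimeSource() // assumed existing fake from common/clock
	fake.Update(time.Now())            // assumed: pins the fake clock to a known start time

	e.systemClock = clock.NewRealTimeSource() // liveness and config rollouts stay on wall-clock time
	e.taskClock = fake                        // sticky checks, HLC updates, poll timing become controllable
	return fake
}

// Example: make stickyWorkerAvailable treat a just-seen poller as stale by
// advancing only the business-logic clock.
//   fake := withFakeTaskClock(engine)
//   fake.Advance(stickyPollerUnavailableWindow + time.Second)
```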
14 changes: 9 additions & 5 deletions service/matching/physical_task_queue_manager.go
@@ -161,10 +161,10 @@ func newPhysicalTaskQueueManager(
}
pqMgr.deploymentRegistrationCh <- struct{}{} // seed

pqMgr.pollerHistory = newPollerHistory(partitionMgr.config.PollerHistoryTTL())
pqMgr.pollerHistory = newPollerHistory(partitionMgr.config.PollerHistoryTTL(), e.taskClock)

pqMgr.liveness = newLiveness(
clock.NewRealTimeSource(),
e.systemClock,
config.MaxTaskQueueIdleTime,
func() { pqMgr.UnloadFromPartitionManager(unloadCauseIdle) },
)
@@ -827,6 +827,10 @@ func (c *physicalTaskQueueManagerImpl) QueueKey() *PhysicalTaskQueueKey {
return c.queue
}

func (c *physicalTaskQueueManagerImpl) TimeSource() clock.TimeSource {
return c.partitionMgr.engine.taskClock
}

func (c *physicalTaskQueueManagerImpl) UnloadFromPartitionManager(unloadCause unloadCause) {
c.partitionMgr.unloadPhysicalQueue(c, unloadCause)
}
@@ -852,7 +856,7 @@ func (c *physicalTaskQueueManagerImpl) makePollerScalingDecisionImpl(
pollStartTime time.Time,
statsFn func() *taskqueuepb.TaskQueueStats,
) *taskqueuepb.PollerScalingDecision {
pollWaitTime := c.partitionMgr.engine.timeSource.Since(pollStartTime)
pollWaitTime := c.partitionMgr.engine.taskClock.Since(pollStartTime)
// If a poller has waited around a while, we can always suggest a decrease.
if pollWaitTime >= c.partitionMgr.config.PollerScalingWaitTime() {
// Decrease if any poll matched after sitting idle for some configured period
@@ -933,8 +937,8 @@ func (c *physicalTaskQueueManagerImpl) getOrCreateTaskTracker(
}

// Initialize all task trackers together; or the timeframes won't line up.
c.tasksAdded[priorityKey] = newTaskTracker(c.partitionMgr.engine.timeSource)
c.tasksDispatched[priorityKey] = newTaskTracker(c.partitionMgr.engine.timeSource)
c.tasksAdded[priorityKey] = newTaskTracker(c.partitionMgr.engine.taskClock)
c.tasksDispatched[priorityKey] = newTaskTracker(c.partitionMgr.engine.taskClock)

return intervals[priorityKey]
}
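The poller-scaling change above pairs with pollMetadata.localPollStartTime now being captured from taskClock in matching_engine.go: an elapsed-time measurement is only meaningful when both endpoints are read from the same clock. A small standalone demonstration using common/clock's EventTimeSource (the NewEventTimeSource, Update, and Advance calls are assumed from its existing API, not from this diff):

```go
package main

import (
	"fmt"
	"time"

	"go.temporal.io/server/common/clock"
)

func main() {
	fake := clock.NewEventTimeSource() // assumed constructor for the fake clock
	fake.Update(time.Unix(0, 0))       // assumed: sets the fake "now"

	start := fake.Now()
	fake.Advance(5 * time.Second) // assumed: moves fake time forward

	fmt.Println(fake.Since(start)) // 5s: start and Since read the same clock
	fmt.Println(time.Since(start)) // decades: fake start time measured against the wall clock
}
```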