25
25
import static org .junit .Assert .assertNull ;
26
26
import static org .junit .Assert .assertTrue ;
27
27
import static org .junit .Assert .fail ;
28
+ import static org .mockito .Mockito .mock ;
29
+ import static org .mockito .Mockito .when ;
28
30
29
31
import com .google .common .base .Strings ;
30
32
import com .google .common .util .concurrent .UncheckedExecutionException ;
@@ -411,6 +413,27 @@ public interface TestWorkflow2 {
411
413
List <String > getTrace ();
412
414
}
413
415
416
+ public interface TestWorkflow3 {
417
+
418
+ @ WorkflowMethod
419
+ String execute (String taskList );
420
+
421
+ @ SignalMethod (name = "testSignal" )
422
+ void signal1 (String arg );
423
+
424
+ @ QueryMethod (name = "getState" )
425
+ String getState ();
426
+ }
427
+
428
+ public interface TestWorkflowQuery {
429
+
430
+ @ WorkflowMethod ()
431
+ String execute (String taskList );
432
+
433
+ @ QueryMethod ()
434
+ String query ();
435
+ }
436
+
414
437
public static class TestSyncWorkflowImpl implements TestWorkflow1 {
415
438
416
439
@ Override
@@ -4334,13 +4357,13 @@ public void testGetVersion2() {
4334
4357
4335
4358
static CompletableFuture <Boolean > executionStarted = new CompletableFuture <>();
4336
4359
4337
- public static class TestGetVersionWithoutDecisionEventWorkflowImpl
4338
- implements TestWorkflowSignaled {
4360
+ public static class TestGetVersionWithoutDecisionEventWorkflowImpl implements TestWorkflow3 {
4339
4361
4340
4362
CompletablePromise <Boolean > signalReceived = Workflow .newPromise ();
4363
+ String result = "" ;
4341
4364
4342
4365
@ Override
4343
- public String execute () {
4366
+ public String execute (String taskList ) {
4344
4367
try {
4345
4368
if (!getVersionExecuted .contains ("getVersionWithoutDecisionEvent" )) {
4346
4369
// Execute getVersion in non-replay mode.
@@ -4353,10 +4376,11 @@ public String execute() {
4353
4376
int version = Workflow .getVersion ("test_change" , Workflow .DEFAULT_VERSION , 1 );
4354
4377
if (version == Workflow .DEFAULT_VERSION ) {
4355
4378
signalReceived .get ();
4356
- return "result 1" ;
4379
+ result = "result 1" ;
4357
4380
} else {
4358
- return "result 2" ;
4381
+ result = "result 2" ;
4359
4382
}
4383
+ return result ;
4360
4384
}
4361
4385
} catch (Exception e ) {
4362
4386
throw new RuntimeException ("failed to get from signal" );
@@ -4369,6 +4393,11 @@ public String execute() {
4369
4393
public void signal1 (String arg ) {
4370
4394
signalReceived .complete (true );
4371
4395
}
4396
+
4397
+ @ Override
4398
+ public String getState () {
4399
+ return result ;
4400
+ }
4372
4401
}
4373
4402
4374
4403
@ Test
@@ -4377,25 +4406,26 @@ public void testGetVersionWithoutDecisionEvent() throws Exception {
4377
4406
executionStarted = new CompletableFuture <>();
4378
4407
getVersionExecuted .remove ("getVersionWithoutDecisionEvent" );
4379
4408
startWorkerFor (TestGetVersionWithoutDecisionEventWorkflowImpl .class );
4380
- TestWorkflowSignaled workflowStub =
4409
+ TestWorkflow3 workflowStub =
4381
4410
workflowClient .newWorkflowStub (
4382
- TestWorkflowSignaled .class , newWorkflowOptionsBuilder (taskList ).build ());
4383
- WorkflowClient .start (workflowStub ::execute );
4411
+ TestWorkflow3 .class , newWorkflowOptionsBuilder (taskList ).build ());
4412
+ WorkflowClient .start (workflowStub ::execute , taskList );
4384
4413
executionStarted .get ();
4385
4414
workflowStub .signal1 ("test signal" );
4386
- String result = workflowStub .execute ();
4415
+ String result = workflowStub .execute (taskList );
4387
4416
assertEquals ("result 1" , result );
4417
+ assertEquals ("result 1" , workflowStub .getState ());
4388
4418
}
4389
4419
4390
4420
// The following test covers the scenario where getVersion call is removed before a
4391
4421
// non-version-marker decision.
4392
- public static class TestGetVersionRemovedInReplayWorkflowImpl implements TestWorkflow1 {
4422
+ public static class TestGetVersionRemovedInReplayWorkflowImpl implements TestWorkflowQuery {
4423
+ String result = "" ;
4393
4424
4394
4425
@ Override
4395
4426
public String execute (String taskList ) {
4396
4427
TestActivities testActivities =
4397
4428
Workflow .newActivityStub (TestActivities .class , newActivityOptions1 (taskList ));
4398
- String result ;
4399
4429
// Test removing a version check in replay code.
4400
4430
if (!getVersionExecuted .contains (taskList )) {
4401
4431
int version = Workflow .getVersion ("test_change" , Workflow .DEFAULT_VERSION , 1 );
@@ -4412,25 +4442,33 @@ public String execute(String taskList) {
4412
4442
result += testActivities .activity ();
4413
4443
return result ;
4414
4444
}
4445
+
4446
+ @ Override
4447
+ public String query () {
4448
+ return result ;
4449
+ }
4415
4450
}
4416
4451
4417
4452
@ Test
4418
4453
public void testGetVersionRemovedInReplay () {
4419
4454
startWorkerFor (TestGetVersionRemovedInReplayWorkflowImpl .class );
4420
- TestWorkflow1 workflowStub =
4455
+ TestWorkflowQuery workflowStub =
4421
4456
workflowClient .newWorkflowStub (
4422
- TestWorkflow1 .class , newWorkflowOptionsBuilder (taskList ).build ());
4457
+ TestWorkflowQuery .class , newWorkflowOptionsBuilder (taskList ).build ());
4423
4458
String result = workflowStub .execute (taskList );
4424
4459
assertEquals ("activity22activity" , result );
4425
4460
tracer .setExpected (
4461
+ "registerQuery TestWorkflowQuery::query" ,
4426
4462
"getVersion" ,
4427
4463
"executeActivity TestActivities::activity2" ,
4428
4464
"executeActivity TestActivities::activity" );
4465
+ assertEquals ("activity22activity" , workflowStub .query ());
4429
4466
}
4430
4467
4431
4468
// The following test covers the scenario where getVersion call is removed before another
4432
4469
// version-marker decision.
4433
- public static class TestGetVersionRemovedInReplay2WorkflowImpl implements TestWorkflow1 {
4470
+ public static class TestGetVersionRemovedInReplay2WorkflowImpl implements TestWorkflowQuery {
4471
+ String result = "" ;
4434
4472
4435
4473
@ Override
4436
4474
public String execute (String taskList ) {
@@ -4445,19 +4483,30 @@ public String execute(String taskList) {
4445
4483
Workflow .getVersion ("test_change_2" , Workflow .DEFAULT_VERSION , 2 );
4446
4484
}
4447
4485
4448
- return testActivities .activity ();
4486
+ result = testActivities .activity ();
4487
+ return result ;
4488
+ }
4489
+
4490
+ @ Override
4491
+ public String query () {
4492
+ return result ;
4449
4493
}
4450
4494
}
4451
4495
4452
4496
@ Test
4453
4497
public void testGetVersionRemovedInReplay2 () {
4454
4498
startWorkerFor (TestGetVersionRemovedInReplay2WorkflowImpl .class );
4455
- TestWorkflow1 workflowStub =
4499
+ TestWorkflowQuery workflowStub =
4456
4500
workflowClient .newWorkflowStub (
4457
- TestWorkflow1 .class , newWorkflowOptionsBuilder (taskList ).build ());
4501
+ TestWorkflowQuery .class , newWorkflowOptionsBuilder (taskList ).build ());
4458
4502
String result = workflowStub .execute (taskList );
4459
4503
assertEquals ("activity" , result );
4460
- tracer .setExpected ("getVersion" , "getVersion" , "executeActivity TestActivities::activity" );
4504
+ tracer .setExpected (
4505
+ "registerQuery TestWorkflowQuery::query" ,
4506
+ "getVersion" ,
4507
+ "getVersion" ,
4508
+ "executeActivity TestActivities::activity" );
4509
+ assertEquals ("activity" , workflowStub .query ());
4461
4510
}
4462
4511
4463
4512
public static class TestVersionNotSupportedWorkflowImpl implements TestWorkflow1 {
@@ -5162,15 +5211,6 @@ public void testParallelLocalActivityExecutionWorkflow() {
5162
5211
result );
5163
5212
}
5164
5213
5165
- public interface TestWorkflowQuery {
5166
-
5167
- @ WorkflowMethod ()
5168
- String execute (String taskList );
5169
-
5170
- @ QueryMethod ()
5171
- String query ();
5172
- }
5173
-
5174
5214
public static final class TestLocalActivityAndQueryWorkflow implements TestWorkflowQuery {
5175
5215
5176
5216
String message = "initial value" ;
@@ -5846,4 +5886,73 @@ public void upsertSearchAttributes(Map<String, Object> searchAttributes) {
5846
5886
next .upsertSearchAttributes (searchAttributes );
5847
5887
}
5848
5888
}
5889
+
5890
+ public static class TestGetVersionWorkflowRetryImpl implements TestWorkflow3 {
5891
+ private String result = "" ;
5892
+
5893
+ @ Override
5894
+ public String execute (String taskList ) {
5895
+ int version = Workflow .getVersion ("test_change" , Workflow .DEFAULT_VERSION , 1 );
5896
+ int act = 0 ;
5897
+ if (version == 1 ) {
5898
+ ActivityOptions options =
5899
+ new ActivityOptions .Builder ()
5900
+ .setTaskList (taskList )
5901
+ .setHeartbeatTimeout (Duration .ofSeconds (5 ))
5902
+ .setScheduleToCloseTimeout (Duration .ofSeconds (5 ))
5903
+ .setScheduleToStartTimeout (Duration .ofSeconds (5 ))
5904
+ .setStartToCloseTimeout (Duration .ofSeconds (10 ))
5905
+ .setRetryOptions (
5906
+ new RetryOptions .Builder ()
5907
+ .setMaximumAttempts (3 )
5908
+ .setInitialInterval (Duration .ofSeconds (1 ))
5909
+ .build ())
5910
+ .build ();
5911
+
5912
+ TestActivities testActivities = Workflow .newActivityStub (TestActivities .class , options );
5913
+ act = testActivities .activity1 (1 );
5914
+ }
5915
+
5916
+ result += "activity" + act ;
5917
+ return result ;
5918
+ }
5919
+
5920
+ @ Override
5921
+ public void signal1 (String arg ) {
5922
+ Workflow .sleep (1000 );
5923
+ }
5924
+
5925
+ @ Override
5926
+ public String getState () {
5927
+ return result ;
5928
+ }
5929
+ }
5930
+
5931
+ @ Test
5932
+ public void testGetVersionRetry () throws ExecutionException , InterruptedException {
5933
+ TestActivities activity = mock (TestActivities .class );
5934
+ when (activity .activity1 (1 )).thenReturn (1 );
5935
+ worker .registerActivitiesImplementations (activity );
5936
+
5937
+ startWorkerFor (TestGetVersionWorkflowRetryImpl .class );
5938
+ TestWorkflow3 workflowStub =
5939
+ workflowClient .newWorkflowStub (
5940
+ TestWorkflow3 .class , newWorkflowOptionsBuilder (taskList ).build ());
5941
+ CompletableFuture <String > result = WorkflowClient .execute (workflowStub ::execute , taskList );
5942
+ workflowStub .signal1 ("test" );
5943
+ assertEquals ("activity1" , result .get ());
5944
+
5945
+ // test replay
5946
+ assertEquals ("activity1" , workflowStub .getState ());
5947
+ }
5948
+
5949
+ @ Test
5950
+ public void testGetVersionWithRetryReplay () throws Exception {
5951
+ // Avoid executing 4 times
5952
+ Assume .assumeFalse ("skipping for docker tests" , useExternalService );
5953
+ Assume .assumeFalse ("skipping for sticky off" , disableStickyExecution );
5954
+
5955
+ WorkflowReplayer .replayWorkflowExecutionFromResource (
5956
+ "testGetVersionWithRetryHistory.json" , TestGetVersionWorkflowRetryImpl .class );
5957
+ }
5849
5958
}
0 commit comments