@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit
31
31
import javax.inject.Inject
32
32
import kotlinx.coroutines.delay
33
33
import kotlinx.coroutines.launch
34
+ import kotlinx.coroutines.test.runTest
34
35
import misk.hibernate.Transacter
35
36
import misk.hibernate.load
36
37
import misk.scope.ActionScope
@@ -84,7 +85,7 @@ class BackfillRunnerTest {
84
85
assertThat(partition.run_state).isEqualTo(BackfillState .RUNNING )
85
86
}
86
87
87
- runBlockingTestCancellable {
88
+ runTest {
88
89
runner.start(this )
89
90
}
90
91
@@ -111,7 +112,7 @@ class BackfillRunnerTest {
111
112
assertThat(partition.pkey_cursor).isNull()
112
113
assertThat(partition.state).isEqualTo(BackfillState .RUNNING )
113
114
114
- runBlockingTestCancellable {
115
+ runTest {
115
116
runner.start(this )
116
117
}
117
118
@@ -147,7 +148,7 @@ class BackfillRunnerTest {
147
148
fakeBackfilaClientServiceClient.dontBlockGetNextBatch()
148
149
val runner = startBackfill(numThreads = 1 )
149
150
150
- runBlockingTestCancellable {
151
+ runTest {
151
152
runner.start(this )
152
153
153
154
// We should only get numthreads=1 calls in parallel, then it must wait for more room.
@@ -170,7 +171,7 @@ class BackfillRunnerTest {
170
171
fakeBackfilaClientServiceClient.dontBlockGetNextBatch()
171
172
val runner = startBackfill(numThreads = 1 )
172
173
173
- runBlockingTestCancellable {
174
+ runTest {
174
175
runner.start(this )
175
176
val firstRequest = fakeBackfilaClientServiceClient.runBatchRequests.receive()
176
177
assertThat(firstRequest.parameters).containsEntry(" cheese" , " cheddar" .encodeUtf8())
@@ -188,7 +189,7 @@ class BackfillRunnerTest {
188
189
)
189
190
}
190
191
191
- runBlockingTestCancellable {
192
+ runTest {
192
193
runner.start(this )
193
194
194
195
val firstRequest = fakeBackfilaClientServiceClient.runBatchRequests.receive()
@@ -201,7 +202,7 @@ class BackfillRunnerTest {
201
202
fakeBackfilaClientServiceClient.dontBlockGetNextBatch()
202
203
val runner = startBackfill(numThreads = 3 )
203
204
204
- runBlockingTestCancellable {
205
+ runTest {
205
206
runner.start(this )
206
207
207
208
// We should only get numthreads=3 calls in parallel, then it must wait for more room.
@@ -256,7 +257,7 @@ class BackfillRunnerTest {
256
257
partition.precomputing_done = true
257
258
}
258
259
259
- runBlockingTestCancellable {
260
+ runTest {
260
261
runner.start(this )
261
262
262
263
assertThat(fakeBackfilaClientServiceClient.getNextBatchRangeRequests.receive()).isNotNull()
@@ -296,7 +297,7 @@ class BackfillRunnerTest {
296
297
partition.precomputing_done = true
297
298
}
298
299
299
- runBlockingTestCancellable {
300
+ runTest {
300
301
launch { runner.start(this ) }
301
302
302
303
// Process 4 getNextBatchRangeRequests
@@ -376,7 +377,7 @@ class BackfillRunnerTest {
376
377
fakeBackfilaClientServiceClient.dontBlockGetNextBatch()
377
378
val runner = startBackfill(numThreads = 1 )
378
379
379
- runBlockingTestCancellable {
380
+ runTest {
380
381
launch { runner.start(this ) }
381
382
// Leave awaiting run batch response
382
383
fakeBackfilaClientServiceClient.runBatchRequests.receive()
@@ -407,7 +408,7 @@ class BackfillRunnerTest {
407
408
fakeBackfilaClientServiceClient.dontBlockGetNextBatch()
408
409
val runner = startBackfill(numThreads = 2 )
409
410
410
- runBlockingTestCancellable {
411
+ runTest {
411
412
launch { runner.start(this ) }
412
413
413
414
val firstRequest = fakeBackfilaClientServiceClient.runBatchRequests.receive()
@@ -451,7 +452,7 @@ class BackfillRunnerTest {
451
452
fakeBackfilaClientServiceClient.dontBlockGetNextBatch()
452
453
val runner = startBackfill(numThreads = 1 )
453
454
454
- runBlockingTestCancellable {
455
+ runTest {
455
456
launch { runner.start(this ) }
456
457
457
458
val firstRequest = fakeBackfilaClientServiceClient.runBatchRequests.receive()
@@ -492,7 +493,7 @@ class BackfillRunnerTest {
492
493
fakeBackfilaClientServiceClient.dontBlockGetNextBatch()
493
494
val runner = startBackfill(numThreads = 2 )
494
495
495
- runBlockingTestCancellable {
496
+ runTest {
496
497
launch { runner.start(this ) }
497
498
498
499
val firstRequest = fakeBackfilaClientServiceClient.runBatchRequests.receive()
@@ -550,7 +551,7 @@ class BackfillRunnerTest {
550
551
partition.precomputing_done = true
551
552
}
552
553
553
- runBlockingTestCancellable {
554
+ runTest {
554
555
launch { runner.start(this ) }
555
556
556
557
assertThat(fakeBackfilaClientServiceClient.getNextBatchRangeRequests.receive()).isNotNull()
@@ -576,7 +577,7 @@ class BackfillRunnerTest {
576
577
fakeBackfilaClientServiceClient.dontBlockGetNextBatch()
577
578
val runner = startBackfill(numThreads = 1 )
578
579
579
- runBlockingTestCancellable {
580
+ runTest {
580
581
launch { runner.start(this ) }
581
582
582
583
fakeBackfilaClientServiceClient.runBatchRequests.receive()
@@ -591,7 +592,7 @@ class BackfillRunnerTest {
591
592
// Nothing sent yet - the backoff is 1000ms
592
593
assertThat(fakeBackfilaClientServiceClient.runBatchRequests.tryReceive().getOrNull()).isNull()
593
594
delay(500 )
594
- assertThat(fakeBackfilaClientServiceClient.runBatchRequests.tryReceive().getOrNull ()).isNotNull()
595
+ assertThat(fakeBackfilaClientServiceClient.runBatchRequests.receive ()).isNotNull()
595
596
fakeBackfilaClientServiceClient.runBatchResponses.send(
596
597
Result .success(RunBatchResponse .Builder ().build()),
597
598
)
@@ -609,7 +610,7 @@ class BackfillRunnerTest {
609
610
fakeBackfilaClientServiceClient.dontBlockGetNextBatch()
610
611
val runner = startBackfill(numThreads = 1 , extraSleepMs = 1000L )
611
612
612
- runBlockingTestCancellable {
613
+ runTest {
613
614
launch { runner.start(this ) }
614
615
615
616
fakeBackfilaClientServiceClient.runBatchRequests.receive()
@@ -621,7 +622,7 @@ class BackfillRunnerTest {
621
622
// Nothing sent yet - the backoff is 1000ms
622
623
assertThat(fakeBackfilaClientServiceClient.runBatchRequests.tryReceive().getOrNull()).isNull()
623
624
delay(500 )
624
- assertThat(fakeBackfilaClientServiceClient.runBatchRequests.tryReceive().getOrNull ()).isNotNull()
625
+ assertThat(fakeBackfilaClientServiceClient.runBatchRequests.receive ()).isNotNull()
625
626
fakeBackfilaClientServiceClient.runBatchResponses.send(
626
627
Result .success(RunBatchResponse .Builder ().build()),
627
628
)
@@ -639,7 +640,7 @@ class BackfillRunnerTest {
639
640
fakeBackfilaClientServiceClient.dontBlockGetNextBatch()
640
641
val runner = startBackfill(numThreads = 2 , extraSleepMs = 1000L )
641
642
642
- runBlockingTestCancellable {
643
+ runTest {
643
644
launch { runner.start(this ) }
644
645
645
646
fakeBackfilaClientServiceClient.runBatchRequests.receive()
@@ -655,7 +656,7 @@ class BackfillRunnerTest {
655
656
// Nothing sent yet - the backoff is 1000ms
656
657
assertThat(fakeBackfilaClientServiceClient.runBatchRequests.tryReceive().getOrNull()).isNull()
657
658
delay(500 )
658
- assertThat(fakeBackfilaClientServiceClient.runBatchRequests.tryReceive().getOrNull ()).isNotNull()
659
+ assertThat(fakeBackfilaClientServiceClient.runBatchRequests.receive ()).isNotNull()
659
660
fakeBackfilaClientServiceClient.runBatchResponses.send(
660
661
Result .success(RunBatchResponse .Builder ().build()),
661
662
)
@@ -678,7 +679,7 @@ class BackfillRunnerTest {
678
679
partition.precomputing_done = true
679
680
}
680
681
681
- runBlockingTestCancellable {
682
+ runTest {
682
683
launch { runner.start(this ) }
683
684
684
685
assertThat(fakeBackfilaClientServiceClient.getNextBatchRangeRequests.receive()).isNotNull()
@@ -697,9 +698,9 @@ class BackfillRunnerTest {
697
698
// The first batch has no matching records, so no delay is added.
698
699
// The second batch, with nonzero matching count, gets run immediately.
699
700
val runBatchRequest =
700
- fakeBackfilaClientServiceClient.runBatchRequests.tryReceive().getOrNull ()
701
+ fakeBackfilaClientServiceClient.runBatchRequests.receive ()
701
702
assertThat(runBatchRequest).isNotNull()
702
- assertThat(runBatchRequest!! .batch_range).isEqualTo(
703
+ assertThat(runBatchRequest.batch_range).isEqualTo(
703
704
KeyRange (" 100" .encodeUtf8(), " 199" .encodeUtf8()),
704
705
)
705
706
runner.stop()
@@ -710,13 +711,13 @@ class BackfillRunnerTest {
710
711
fakeBackfilaClientServiceClient.dontBlockGetNextBatch()
711
712
val runner = startBackfill(numThreads = 3 )
712
713
713
- runBlockingTestCancellable {
714
+ runTest {
714
715
launch { runner.start(this ) }
715
716
716
717
// We should only get numthreads=3 calls in parallel, then it must wait for more room.
717
- assertThat(fakeBackfilaClientServiceClient.runBatchRequests.tryReceive().getOrNull ()).isNotNull()
718
- assertThat(fakeBackfilaClientServiceClient.runBatchRequests.tryReceive().getOrNull ()).isNotNull()
719
- assertThat(fakeBackfilaClientServiceClient.runBatchRequests.tryReceive().getOrNull ()).isNotNull()
718
+ assertThat(fakeBackfilaClientServiceClient.runBatchRequests.receive ()).isNotNull()
719
+ assertThat(fakeBackfilaClientServiceClient.runBatchRequests.receive ()).isNotNull()
720
+ assertThat(fakeBackfilaClientServiceClient.runBatchRequests.receive ()).isNotNull()
720
721
assertThat(fakeBackfilaClientServiceClient.runBatchRequests.tryReceive().getOrNull()).isNull()
721
722
722
723
scope.fakeCaller(user = " molly" ) {
@@ -735,9 +736,9 @@ class BackfillRunnerTest {
735
736
fakeBackfilaClientServiceClient.runBatchResponses.send(
736
737
Result .success(RunBatchResponse .Builder ().build()),
737
738
)
738
- assertThat(fakeBackfilaClientServiceClient.runBatchRequests.tryReceive().getOrNull ()).isNotNull()
739
- assertThat(fakeBackfilaClientServiceClient.runBatchRequests.tryReceive().getOrNull ()).isNotNull()
740
- assertThat(fakeBackfilaClientServiceClient.runBatchRequests.tryReceive().getOrNull ()).isNotNull()
739
+ assertThat(fakeBackfilaClientServiceClient.runBatchRequests.receive ()).isNotNull()
740
+ assertThat(fakeBackfilaClientServiceClient.runBatchRequests.receive ()).isNotNull()
741
+ assertThat(fakeBackfilaClientServiceClient.runBatchRequests.receive ()).isNotNull()
741
742
assertThat(fakeBackfilaClientServiceClient.runBatchRequests.tryReceive().getOrNull()).isNull()
742
743
runner.stop()
743
744
}
@@ -753,7 +754,7 @@ class BackfillRunnerTest {
753
754
fakeBackfilaClientServiceClient.dontBlockGetNextBatch()
754
755
val runner = startBackfill(numThreads = 1 )
755
756
756
- runBlockingTestCancellable {
757
+ runTest {
757
758
launch { runner.start(this ) }
758
759
759
760
val initialRequest = fakeBackfilaClientServiceClient.runBatchRequests.receive()
@@ -834,7 +835,7 @@ class BackfillRunnerTest {
834
835
fakeBackfilaClientServiceClient.dontBlockGetNextBatch()
835
836
val runner = startBackfill(numThreads = 3 )
836
837
837
- runBlockingTestCancellable {
838
+ runTest {
838
839
launch { runner.start(this ) }
839
840
840
841
// We should only get numthreads=3 calls in parallel, then it must wait for more room.
0 commit comments