18
18
package io.bazel.kotlin.builder.tasks
19
19
20
20
import com.google.common.truth.Truth.assertThat
21
+ import com.google.common.truth.Truth.assertWithMessage
21
22
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest
22
23
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse
23
24
import kotlinx.coroutines.CoroutineName
@@ -27,10 +28,10 @@ import kotlinx.coroutines.channels.Channel
27
28
import kotlinx.coroutines.channels.sendBlocking
28
29
import kotlinx.coroutines.coroutineScope
29
30
import kotlinx.coroutines.debug.junit4.CoroutinesTimeout
30
- import kotlinx.coroutines.delay
31
31
import kotlinx.coroutines.launch
32
32
import kotlinx.coroutines.runBlocking
33
33
import kotlinx.coroutines.test.runBlockingTest
34
+ import kotlinx.coroutines.yield
34
35
import org.junit.Rule
35
36
import org.junit.Test
36
37
import java.io.ByteArrayOutputStream
@@ -68,8 +69,6 @@ class BazelWorkerTest {
68
69
69
70
@Test
70
71
fun persistentWorker () {
71
- val workerInput = WorkerChannel (" in" )
72
- val workerOutput = WorkerChannel (" out" )
73
72
runBlockingTest {
74
73
val program = object : CommandLineProgram {
75
74
override fun apply (workingDir : Path , args : List <String >): Int {
@@ -83,23 +82,27 @@ class BazelWorkerTest {
83
82
}
84
83
}
85
84
85
+ val workerInput = WorkerChannel (" in" )
86
+ val workerOutput = WorkerChannel (" out" )
86
87
val execution = ByteArrayOutputStream ()
87
88
88
- val done = GlobalScope .async(CoroutineName (" worker" )) {
89
+ val worker = GlobalScope .async(CoroutineName (" worker" )) {
89
90
WorkerIO (workerInput.input,
90
91
PrintStream (workerOutput.output),
91
- execution) {}.use { io ->
92
- PersistentWorker (io, program).run (listOf ())
93
- }
92
+ execution) {}
93
+ .use { io ->
94
+ PersistentWorker (io, program).run (listOf ())
95
+ }
94
96
}
95
97
96
- // asserts scope to ensure all asserts are run.
98
+ // asserts scope to ensure all cases are run.
97
99
// messy, can be cleaned up -- since kotlin channels love to block and can easily starve a
98
100
// dispatcher, it's necessary to read each channel in a different coroutine.
99
101
// The coroutineScope prevents the assertions from happening outside of the expected
100
102
// asynchronicity.
101
103
coroutineScope {
102
104
launch {
105
+ assertWithMessage(" worker is active" ).that(worker.isActive).isTrue()
103
106
workerInput.send(request(1 , " ok" ))
104
107
}
105
108
launch {
@@ -110,6 +113,7 @@ class BazelWorkerTest {
110
113
111
114
coroutineScope {
112
115
launch {
116
+ assertWithMessage(" worker is active" ).that(worker.isActive).isTrue()
113
117
workerInput.send(request(2 , " fail" ))
114
118
}
115
119
launch {
@@ -120,6 +124,7 @@ class BazelWorkerTest {
120
124
121
125
coroutineScope {
122
126
launch {
127
+ assertWithMessage(" worker is active" ).that(worker.isActive).isTrue()
123
128
workerInput.send(request(3 , " error" ))
124
129
}
125
130
launch {
@@ -131,6 +136,7 @@ class BazelWorkerTest {
131
136
// an interrupt kills the worker.
132
137
coroutineScope {
133
138
launch {
139
+ assertWithMessage(" worker is active" ).that(worker.isActive).isTrue()
134
140
workerInput.send(request(4 , " interrupt" ))
135
141
workerInput.close()
136
142
}
@@ -140,22 +146,20 @@ class BazelWorkerTest {
140
146
}
141
147
}
142
148
143
-
144
- assertThat(done.await()).isEqualTo(0 )
149
+ assertThat(worker.await()).isEqualTo(0 )
145
150
}
146
151
}
147
152
148
- private fun request (id : Int , vararg args : String ) = with ( WorkRequest .newBuilder()) {
149
- setRequestId(id)
153
+ private fun request (id : Int , vararg args : String ) = WorkRequest .newBuilder(). apply {
154
+ requestId = id
150
155
addAllArguments(args.asList())
151
156
}.build()
152
157
153
- private fun response (id : Int , exitCode : Int , output : String = "") =
154
- with (WorkResponse .newBuilder()) {
155
- setExitCode(exitCode)
156
- setOutput(output)
157
- setRequestId(id)
158
- }.build()
158
+ private fun response (id : Int , code : Int , out : String = "") = WorkResponse .newBuilder().apply {
159
+ exitCode = code
160
+ output = out
161
+ requestId = id
162
+ }.build()
159
163
160
164
/* * WorkerChannel encapsulates the communication between the test and the worker. */
161
165
class WorkerChannel (
@@ -181,10 +185,8 @@ class BazelWorkerTest {
181
185
class ChannelInputStream (val channel : Channel <Byte >, val name : String ) : InputStream() {
182
186
override fun read (): Int {
183
187
return runBlocking(CoroutineName (" $name .read()" )) {
184
- // since pipes block until the next event, this simulates that without starving
185
- // other routines.
186
- while (channel.isEmpty) {
187
- delay(5L )
188
+ if (channel.isEmpty) {
189
+ yield ()
188
190
}
189
191
// read blocking -- this better simulates the java InputStream behaviour.
190
192
return @runBlocking channel.receive().toInt()
@@ -193,14 +195,11 @@ class BazelWorkerTest {
193
195
194
196
override fun read (b : ByteArray , off : Int , len : Int ): Int {
195
197
return runBlocking(CoroutineName (" $name .read(ByteArray,Int,Int)" )) {
196
- // since pipes block until the next event, this simulates that without starving
197
- // other routines.
198
- while (channel.isEmpty) {
199
- delay(5L )
198
+ if (channel.isEmpty) {
199
+ yield ()
200
200
}
201
- val end = Math .min(b.size, off + len - 1 )
202
201
var read = 0
203
- for (i in off.. end ) {
202
+ for (i in off.. b.size.coerceAtMost(off + len - 1 ) ) {
204
203
val rb = channel.receive()
205
204
b[i] = rb
206
205
read++
@@ -218,10 +217,8 @@ class BazelWorkerTest {
218
217
219
218
override fun write (ba : ByteArray , off : Int , len : Int ) {
220
219
runBlocking(CoroutineName (" $name .write(ByteArray, Int, Int)" )) {
221
- var sent = 0
222
- for (i in off.. Math .min(ba.size, off + len - 1 )) {
220
+ for (i in off.. ba.size.coerceAtMost(off + len - 1 )) {
223
221
channel.sendBlocking(ba[i])
224
- sent++
225
222
}
226
223
}
227
224
}
0 commit comments