@@ -61,14 +61,24 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
61
61
val ec = ExecutionContext .fromExecutor(fjp)
62
62
63
63
// Map of request id to the runnable responsible for executing that request id
64
- val activeRequests = new ConcurrentHashMap [Int , CancellableTask [Int ]](poolSize)
64
+ val activeRequests = new ConcurrentHashMap [Int , ( WorkerProtocol . WorkRequest , CancellableTask [Int ]) ](poolSize)
65
65
66
66
def writeResponse (
67
67
requestId : Int ,
68
68
maybeOutStream : Option [OutputStream ],
69
69
maybeExitCode : Option [Int ],
70
70
wasCancelled : Boolean = false ,
71
71
): Unit = {
72
+ // Remove the request from our book keeping right before we respond to Bazel. If
73
+ // we respond to Bazel about the request before removing it,then there is a race:
74
+ // Bazel could make a request with the same requestId to this worker before the
75
+ // requestId is removed from the worker's book keeping.
76
+ //
77
+ // Ideally Bazel will not send a request to this worker with the same requestId
78
+ // as another request before we've responded to the original request. If that
79
+ // happens, then there's a race regardless of what we do.
80
+ activeRequests.remove(requestId)
81
+
72
82
// Defined here so all writes to stdout are synchronized
73
83
stdout.synchronized {
74
84
val builder = WorkerProtocol .WorkResponse .newBuilder
@@ -88,8 +98,6 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
88
98
.build()
89
99
.writeDelimitedTo(stdout)
90
100
}
91
-
92
- activeRequests.remove(requestId)
93
101
}
94
102
95
103
/**
@@ -128,10 +136,10 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
128
136
129
137
// From the Bazel doc: "The server may send cancel requests for requests that the worker
130
138
// has already responded to, in which case the cancel request must be ignored."
131
- Option (activeRequests.get(requestId)).foreach { activeRequest =>
139
+ Option (activeRequests.get(requestId)).foreach { case (_, workTask) =>
132
140
// Cancel will wait for the thread to complete or be interrupted, so we do it in a future
133
141
// to prevent blocking the worker from processing more requests
134
- Future (activeRequest .cancel(mayInterruptIfRunning = mayInterruptWorkerTasks))(
142
+ Future (workTask .cancel(mayInterruptIfRunning = mayInterruptWorkerTasks))(
135
143
scala.concurrent.ExecutionContext .global,
136
144
)
137
145
}
@@ -227,9 +235,14 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
227
235
// for this requestId. If that's the case, we have a book keeping error or there are
228
236
// two active requests with the same ID. Either of which is not good and something we
229
237
// should just crash on.
230
- if (activeRequests.putIfAbsent(requestId, workTask) != null ) {
238
+ val alreadyActiveRequest = activeRequests.putIfAbsent(requestId, (request, workTask))
239
+ if (alreadyActiveRequest != null ) {
240
+ val (activeRequest, _) = alreadyActiveRequest
231
241
throw new AnnexDuplicateActiveRequestException (
232
- s " Received a WorkRequest with an already active request id: ${requestId}" ,
242
+ s """ Received a WorkRequest with an already active request id: ${requestId}.
243
+ Currently active request: ${activeRequest.toString}
244
+ New request with the same id: ${request.toString}
245
+ """ ,
233
246
)
234
247
} else {
235
248
workTask.execute(ec)
0 commit comments