Skip to content

Commit 152d90b

Browse files
committed
Improve atomicity of shared compiler classpath jar copying
We were copying a file used by multiple threads directly to its destination. Problem is that copying is not an atomic action, so we could end up in states where the file wasn't correct when it was used. This should avoid that issue by first copying the file to a temp file and then using an atomic move to move the file to the destination used by other threads.
1 parent beed4d1 commit 152d90b

File tree

1 file changed

+92
-34
lines changed

1 file changed

+92
-34
lines changed

src/main/scala/higherkindness/rules_scala/workers/common/AnnexScalaInstance.scala

Lines changed: 92 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,41 @@ package higherkindness.rules_scala
22
package workers.common
33

44
import xsbti.compile.ScalaInstance
5-
import java.io.File
5+
import java.io.{File, IOException}
66
import java.net.URLClassLoader
7-
import java.nio.file.{FileAlreadyExistsException, Files, Path, Paths}
7+
import java.nio.file.{AtomicMoveNotSupportedException, FileAlreadyExistsException, Files, Path, Paths, StandardCopyOption}
88
import java.util.Properties
99
import java.util.concurrent.ConcurrentHashMap
1010
import scala.collection.immutable.TreeMap
11+
import scala.util.control.NonFatal
1112

1213
object AnnexScalaInstance {
1314
// See the comment on getAnnexScalaInstance as to why this is necessary
1415
private val instanceCache: ConcurrentHashMap[Set[Path], AnnexScalaInstance] =
1516
new ConcurrentHashMap[Set[Path], AnnexScalaInstance]()
1617

18+
/**
19+
* The worker will use this directory to store temp files in order to better perform atomic file copies.
20+
*/
21+
private val tmpWorkerJarDir = Paths.get("annex-tmp-worker-jars")
22+
Files.createDirectories(tmpWorkerJarDir)
23+
24+
/**
25+
* The worker will store compiler classpath jars in this directory to enable sharing of classloaders used by the Scala
26+
* compiler across compilation requests.
27+
*/
28+
private val workerJarDir = Paths.get("work-request-jars")
29+
Files.createDirectories(workerJarDir)
30+
1731
/**
1832
* We only need to care about minimizing the number of AnnexScalaInstances we create if things are being run as a
1933
* worker. Otherwise just create the AnnexScalaInstance and be done with it because the process won't be long lived.
2034
*/
21-
def getAnnexScalaInstance(allJars: Array[File], workDir: Path, isWorker: Boolean): AnnexScalaInstance = {
35+
def getAnnexScalaInstance(
36+
allJars: Array[File],
37+
workDir: Path,
38+
isWorker: Boolean,
39+
): AnnexScalaInstance = {
2240
if (isWorker) {
2341
getAnnexScalaInstance(allJars, workDir)
2442
} else {
@@ -81,7 +99,7 @@ object AnnexScalaInstance {
8199
absoluteWorkDir.relativize(absoluteJarPath),
82100
replaceExternal = false,
83101
)
84-
mapBuilder.addOne(jar.toPath -> comparablePath)
102+
mapBuilder.addOne(jar.toPath -> workerJarDir.resolve(comparablePath))
85103
keyBuilder.addOne(comparablePath)
86104
}
87105
val workRequestJarToWorkerJar = mapBuilder.result()
@@ -101,40 +119,80 @@ object AnnexScalaInstance {
101119
val key = keyBuilder.result()
102120

103121
Option(instanceCache.get(key)).getOrElse {
104-
// Copy all the jars to the worker's directory because in a sandboxed world the
105-
// jars can go away after the work request, so we can't rely on them sticking around.
106-
// This should only happen once per compiler version, so it shouldn't happen often.
107-
workRequestJarToWorkerJar.foreach { case (workRequestJar, workerJar) =>
108-
this.synchronized {
109-
// Check for existence of the file just in case another request is also writing these jars
110-
// Copying a file is not atomic, so we don't want to end up in a funky state where two
111-
// copies of the same file happen at the same time and cause something bad to happen.
112-
if (!Files.exists(workerJar)) {
113-
try {
114-
Files.createDirectories(workerJar.getParent())
115-
Files.copy(workRequestJar, workerJar)
116-
} catch {
117-
// We do not care if the file already exists
118-
case _: FileAlreadyExistsException => {}
119-
case e: Throwable => throw new Exception("Error adding file to instance cache", e)
122+
this.synchronized {
123+
// Requests that need the same Scala instance will likely race to this point to create
124+
// the same Scala instance. This is especially true as the worker is first starting up.
125+
// Considering that, we first check if the desired instance now exists to avoid duplicate work.
126+
Option(instanceCache.get(key)).getOrElse {
127+
// Copy all the jars to the worker's directory because in a sandboxed world the
128+
// jars can go away after the work request, so we can't rely on them sticking around.
129+
// This should only happen once per compiler version, so it shouldn't happen often.
130+
workRequestJarToWorkerJar.foreach { case (workRequestJar, workerJar) =>
131+
// Do a more atomic copy of a file by creating a temp file and then moving
132+
// the temp file to the destination. We can do a move atomically, but cannot do
133+
// a copy atomically. Copying directly to the destination file risks the file existing
134+
// at the destination in a partially completed state.
135+
if (Files.notExists(workerJar)) {
136+
var tmpWorkerJar: Option[Path] = None
137+
138+
try {
139+
tmpWorkerJar = Some(Files.createTempFile(tmpWorkerJarDir, workerJar.getFileName.toString, "tmp"))
140+
141+
Files.copy(
142+
workRequestJar,
143+
tmpWorkerJar.get,
144+
StandardCopyOption.REPLACE_EXISTING,
145+
StandardCopyOption.COPY_ATTRIBUTES,
146+
)
147+
Files.createDirectories(workerJar.getParent())
148+
149+
try {
150+
Files.move(tmpWorkerJar.get, workerJar, StandardCopyOption.ATOMIC_MOVE)
151+
} catch {
152+
case e: AtomicMoveNotSupportedException =>
153+
// Fall back to regular move when ATOMIC_MOVE isn't supported.
154+
// Because it's not atomic, there's a risk the file may already exist.
155+
try {
156+
Files.move(tmpWorkerJar.get, workerJar)
157+
} catch {
158+
case e: FileAlreadyExistsException => {}
159+
}
160+
}
161+
} catch {
162+
case e @ (_: IOException | _: InterruptedException) =>
163+
// An error occurred which may have left a partially written file, so we delete the
164+
// file to be safe.
165+
// Note that this could be a ClosedByInterruptException, which is a subtype of
166+
// IOException and indicates the operation was interrupted (very likely because
167+
// the Bazel request was cancelled).
168+
Files.deleteIfExists(workerJar)
169+
throw e
170+
case NonFatal(e) =>
171+
throw new Exception(s"Error copying worker jar: ${workerJar}", e)
172+
} finally {
173+
tmpWorkerJar.foreach { tmpWorkerJar =>
174+
Files.deleteIfExists(tmpWorkerJar)
175+
}
176+
}
177+
} else if (!Files.exists(workerJar)) {
178+
// Files.exists is not the complement of Files.notExists because both return false
179+
// when the existence of the file cannot be determined.
180+
throw new Exception(s"Cannot determine existence of worker jar: ${workerJar}")
120181
}
121182
}
122-
}
123-
}
124183

125-
val instance = new AnnexScalaInstance(Array.from(workRequestJarToWorkerJar.values.map(_.toFile())))
126-
val instanceInsertedByOtherThreadOrNull = instanceCache.putIfAbsent(key, instance)
184+
val instance = new AnnexScalaInstance(Array.from(workRequestJarToWorkerJar.values.map(_.toFile())))
185+
val instanceInsertedByOtherThreadOrNull = instanceCache.putIfAbsent(key, instance)
127186

128-
// putIfAbsent is atomic, but there exists time between the get and the putIfAbsent.
129-
// This handles the scenario in which the AnnexScalaInstance is created and inserted
130-
// by another thread after we ran our .get.
131-
// We could also handle this by generating the AnnexScalaInstance every time and only
132-
// using a putIfAbsent, but that's likely more expensive because of all the classloaders
133-
// that get constructed when creating an AnnexScalaInstance.
134-
if (instanceInsertedByOtherThreadOrNull == null) {
135-
instance
136-
} else {
137-
instanceInsertedByOtherThreadOrNull
187+
// putIfAbsent is atomic, but there could exist a time between the get and the putIfAbsent
188+
// in which the AnnexScalaInstance is created and inserted by another thread. Depends on
189+
// how things are synchronized.
190+
if (instanceInsertedByOtherThreadOrNull == null) {
191+
instance
192+
} else {
193+
instanceInsertedByOtherThreadOrNull
194+
}
195+
}
138196
}
139197
}
140198
}

0 commit comments

Comments
 (0)