Skip to content

Commit 81e0ea3

Browse files
committed
squashed
1 parent a735c19 commit 81e0ea3

File tree

4 files changed

+465
-45
lines changed

4 files changed

+465
-45
lines changed

core/src/main/scala/ox/either.scala

+72-34
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,31 @@ import scala.util.boundary.{Label, break}
77
import scala.util.control.NonFatal
88

99
object either:
10-
1110
private type NotNested = NotGiven[Label[Either[Nothing, Nothing]]]
1211

13-
/** Within an [[either]] block, allows unwrapping [[Either]] and [[Option]] values using [[ok()]]. The result is the right-value of an
12+
/** This technique allows to prevent implicit search from finding the givens thanks to ambiguity. In the scope that returns `A` it will be
13+
* impossible to find a given [[Supervised]] and [[Forked]].
14+
*/
15+
private type WithoutScopeMarkers[A] = (Supervised, Supervised) ?=> (Forked, Forked) ?=> A
16+
17+
private inline def availableInScope[A]: Boolean =
18+
summonFrom {
19+
case _: NotGiven[A] => false
20+
case _: A => true
21+
}
22+
23+
/** Within an [[either]] block, allows unwrapping [[Either]] and [[Option]] values using [[#ok()]]. The result is the right-value of an
1424
* `Either`, or the defined-value of the `Option`. In case a failure is encountered (a left-value of an `Either`, or a `None`), the
15-
* computation is short-circuited and the failure becomes the result. Failures can also be reported using [[fail()]].
25+
* computation is short-circuited and the failure becomes the result. Failures can also be reported using [[#fail()]].
1626
*
1727
* Uses the [[boundary]]-break mechanism.
1828
*
29+
* Uses ambiguity-based given removal technique (given't) to enable usage of [[#ok()]] combinator in [[either]] blocks nested inside
30+
* [[ox.fork]] blocks.
31+
*
1932
* @param body
20-
* The code block, within which [[Either]]s and [[Option]]s can be unwrapped using [[ok()]]. Failures can be reported using [[fail()]].
21-
* Both [[ok()]] and [[fail()]] are extension methods.
33+
* The code block, within which [[Either]]s and [[Option]]s can be unwrapped using [[#ok()]]. Failures can be reported using
34+
* [[#fail()]]. Both [[#ok()]] and [[#fail()]] are extension methods.
2235
* @tparam E
2336
* The error type.
2437
* @tparam A
@@ -36,39 +49,56 @@ object either:
3649
* v1.ok() ++ v2.ok()
3750
* }}}
3851
*/
39-
inline def apply[E, A](inline body: Label[Either[E, A]] ?=> A)(using
52+
inline def apply[E, A](inline body: WithoutScopeMarkers[Label[Either[E, A]] ?=> A])(using
4053
@implicitNotFound(
4154
"Nesting of either blocks is not allowed as it's error prone, due to type inference. Consider extracting the nested either block to a separate function."
4255
) nn: NotNested
43-
): Either[E, A] = boundary(Right(body))
56+
): Either[E, A] =
57+
given Forked = ForkedEvidence // just to satisfy the context function
58+
given Supervised = SupervisedEvidence
59+
boundary(Right(body))
4460

4561
extension [E, A](inline t: Either[E, A])
46-
/** Unwrap the value of the `Either`, short-circuiting the computation to the enclosing [[either]], in case this is a left-value. */
62+
/** Unwrap the value of the `Either`, short-circuiting the computation to the enclosing [[either]], in case this is a left-value. Can't
63+
* be used in forked blocks without an either block in fork to prevent escaped Breaks that crash forked threads.
64+
*/
4765
transparent inline def ok(): A =
48-
summonFrom {
49-
case given boundary.Label[Either[E, Nothing]] =>
50-
t match
51-
case Left(e) => break(Left(e))
52-
case Right(a) => a
53-
case given boundary.Label[Either[Nothing, Nothing]] =>
54-
error("The enclosing `either` call uses a different error type.\nIf it's explicitly typed, is the error type correct?")
55-
case _ => error("`.ok()` can only be used within an `either` call.\nIs it present?")
56-
}
66+
inline if availableInScope[Forked] && !availableInScope[Supervised] then
67+
error(
68+
"This use of .ok() belongs to either block outside of the fork and is therefore illegal. Use either block inside of the forked block."
69+
)
70+
else
71+
summonFrom {
72+
case given boundary.Label[Either[E, Nothing]] =>
73+
t match
74+
case Left(e) => break(Left(e))
75+
case Right(a) => a
76+
case given boundary.Label[Either[Nothing, Nothing]] =>
77+
error("The enclosing `either` call uses a different error type.\nIf it's explicitly typed, is the error type correct?")
78+
case _ => error("`.ok()` can only be used within an `either` call.\nIs it present?")
79+
}
5780

5881
extension [A](inline t: Option[A])
59-
/** Unwrap the value of the `Option`, short-circuiting the computation to the enclosing [[either]], in case this is a `None`. */
82+
/** Unwrap the value of the `Option`, short-circuiting the computation to the enclosing [[either]], in case this is a `None`. Can't be
83+
* used in forked blocks without an either block in fork to prevent escaped Breaks that crash forked threads.
84+
*/
6085
transparent inline def ok(): A =
61-
summonFrom {
62-
case given boundary.Label[Either[Unit, Nothing]] =>
63-
t match
64-
case None => break(Left(()))
65-
case Some(a) => a
66-
case given boundary.Label[Either[Nothing, Nothing]] =>
67-
error(
68-
"The enclosing `either` call uses a different error type.\nIf it's explicitly typed, is the error type correct?\nNote that for options, the error type must contain a `Unit`."
69-
)
70-
case _ => error("`.ok()` can only be used within an `either` call.\nIs it present?")
71-
}
86+
inline if availableInScope[Forked] && !availableInScope[Supervised] then
87+
error(
88+
"This use of .ok() belongs to either block outside of the fork and is therefore illegal. Use either block inside of the forked block."
89+
)
90+
else
91+
summonFrom {
92+
case given boundary.Label[Either[Unit, Nothing]] =>
93+
t match
94+
case None => break(Left(()))
95+
case Some(a) => a
96+
case given boundary.Label[Either[Nothing, Nothing]] =>
97+
error(
98+
"The enclosing `either` call uses a different error type.\nIf it's explicitly typed, is the error type correct?\nNote that for options, the error type must contain a `Unit`."
99+
)
100+
case _ => error("`.ok()` can only be used within an `either` call.\nIs it present?")
101+
}
72102

73103
extension [E, A](inline f: Fork[Either[E, A]])
74104
/** Join the fork and unwrap the value of its `Either` result, short-circuiting the computation to the enclosing [[either]], in case
@@ -80,12 +110,20 @@ object either:
80110
transparent inline def ok(): A = f.join().ok()
81111

82112
extension [E](e: E)
113+
/** Fail the computation by short-circuiting the enclosing [[either]] block with en error of type `E`. Can't be used in forked blocks
114+
* without an either block in fork to prevent escaped Breaks that crash forked threads.
115+
*/
83116
transparent inline def fail(): Nothing =
84-
summonFrom {
85-
case given boundary.Label[Either[E, Nothing]] => break(Left(e))
86-
case given boundary.Label[Either[Nothing, Nothing]] =>
87-
error("The enclosing `either` call uses a different error type.\nIf it's explicitly typed, is the error type correct?")
88-
}
117+
inline if availableInScope[Forked] && !availableInScope[Supervised] then
118+
error(
119+
"This use of .ok() belongs to either block outside of the fork and is therefore illegal. Use either block inside of the forked block."
120+
)
121+
else
122+
summonFrom {
123+
case given boundary.Label[Either[E, Nothing]] => break(Left(e))
124+
case given boundary.Label[Either[Nothing, Nothing]] =>
125+
error("The enclosing `either` call uses a different error type.\nIf it's explicitly typed, is the error type correct?")
126+
}
89127

90128
/** Catches non-fatal exceptions that occur when evaluating `t` and returns them as the left side of the returned `Either`. */
91129
inline def catching[T](inline t: => T): Either[Throwable, T] =

core/src/main/scala/ox/fork.scala

+19-8
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@ import java.util.concurrent.{CompletableFuture, Semaphore}
55
import scala.concurrent.ExecutionException
66
import scala.util.control.NonFatal
77

8+
/** Implicit evidence that given block of code will be evaluated in a forked scope on a separate thread and therefore that capture of
9+
* `scala.util.boundary.Label` instances is unsafe.
10+
*/
11+
private[ox] sealed abstract class Forked
12+
private[ox] case object ForkedEvidence extends Forked
13+
814
/** Starts a fork (logical thread of execution), which is guaranteed to complete before the enclosing [[supervised]] or [[supervisedError]]
915
* block completes.
1016
*
@@ -18,7 +24,7 @@ import scala.util.control.NonFatal
1824
* For alternate behaviors regarding ending the scope, see [[forkUser]], [[forkError]], [[forkUserError]], [[forkCancellable]] and
1925
* [[forkUnsupervised]].
2026
*/
21-
def fork[T](f: => T)(using Ox): Fork[T] = forkError(using summon[Ox].asNoErrorMode)(f)
27+
def fork[T](f: Forked ?=> T)(using Ox): Fork[T] = forkError(using summon[Ox].asNoErrorMode)(f)
2228

2329
/** Starts a fork (logical thread of execution), which is guaranteed to complete before the enclosing [[supervisedError]] block completes.
2430
*
@@ -30,13 +36,14 @@ def fork[T](f: => T)(using Ox): Fork[T] = forkError(using summon[Ox].asNoErrorMo
3036
* enclosing scope. If the [[ErrorMode]] provided when creating the scope using [[supervisedError]] classifies a fork return value as an
3137
* error, the scope ends (cancelling all other running forks).
3238
*/
33-
def forkError[E, F[_], T](using OxError[E, F])(f: => F[T]): Fork[T] =
39+
def forkError[E, F[_], T](using OxError[E, F])(f: Forked ?=> F[T]): Fork[T] =
3440
val oxError = summon[OxError[E, F]]
3541
// the separate result future is needed to wait for the result, as there's no .join on individual tasks (only whole scopes can be joined)
3642
val result = new CompletableFuture[T]()
3743
oxError.scope.fork { () =>
3844
val supervisor = oxError.supervisor
3945
try
46+
given Forked = ForkedEvidence
4047
val resultOrError = f
4148
val errorMode = oxError.errorMode
4249
if errorMode.isError(resultOrError) then
@@ -63,7 +70,7 @@ def forkError[E, F[_], T](using OxError[E, F])(f: => F[T]): Fork[T] =
6370
*
6471
* For alternate behaviors, see [[fork]], [[forkError]], [[forkUserError]], [[forkCancellable]] and [[forkUnsupervised]].
6572
*/
66-
def forkUser[T](f: => T)(using Ox): Fork[T] = forkUserError(using summon[Ox].asNoErrorMode)(f)
73+
def forkUser[T](f: Forked ?=> T)(using Ox): Fork[T] = forkUserError(using summon[Ox].asNoErrorMode)(f)
6774

6875
/** Starts a fork (logical thread of execution), which is guaranteed to complete before the enclosing [[supervisedError]] block completes.
6976
*
@@ -75,13 +82,14 @@ def forkUser[T](f: => T)(using Ox): Fork[T] = forkUserError(using summon[Ox].asN
7582
* enclosing scope. If the [[ErrorMode]] provided when creating the scope using [[supervisedError]] classifies a fork return value as an
7683
* error, the scope ends (cancelling all other running forks).
7784
*/
78-
def forkUserError[E, F[_], T](using OxError[E, F])(f: => F[T]): Fork[T] =
85+
def forkUserError[E, F[_], T](using OxError[E, F])(f: Forked ?=> F[T]): Fork[T] =
7986
val oxError = summon[OxError[E, F]]
8087
val result = new CompletableFuture[T]()
8188
oxError.supervisor.forkStarts()
8289
oxError.scope.fork { () =>
8390
val supervisor = oxError.supervisor.asInstanceOf[DefaultSupervisor[E]]
8491
try
92+
given Forked = ForkedEvidence
8593
val resultOrError = f
8694
val errorMode = oxError.errorMode
8795
if errorMode.isError(resultOrError) then
@@ -105,9 +113,10 @@ def forkUserError[E, F[_], T](using OxError[E, F])(f: => F[T]): Fork[T] =
105113
*
106114
* For alternate behaviors, see [[fork]], [[forkUser]] and [[forkCancellable]].
107115
*/
108-
def forkUnsupervised[T](f: => T)(using OxUnsupervised): Fork[T] =
116+
def forkUnsupervised[T](f: Forked ?=> T)(using OxUnsupervised): Fork[T] =
109117
val result = new CompletableFuture[T]()
110118
summon[OxUnsupervised].scope.fork { () =>
119+
given Forked = ForkedEvidence
111120
try result.complete(f)
112121
catch case e: Throwable => result.completeExceptionally(e)
113122
}
@@ -118,7 +127,7 @@ def forkUnsupervised[T](f: => T)(using OxUnsupervised): Fork[T] =
118127
*
119128
* If ran in a [[supervised]] scope, all forks behave as daemon threads (see [[fork]] for details).
120129
*/
121-
def forkAll[T](fs: Seq[() => T])(using Ox): Fork[Seq[T]] =
130+
def forkAll[T](fs: Seq[Forked ?=> () => T])(using Ox): Fork[Seq[T]] =
122131
val forks = fs.map(f => fork(f()))
123132
new Fork[Seq[T]]:
124133
override def join(): Seq[T] = forks.map(_.join())
@@ -136,7 +145,7 @@ def forkAll[T](fs: Seq[() => T])(using Ox): Fork[Seq[T]] =
136145
* Implementation note: a cancellable fork is created by starting a nested scope in a fork, and then starting a fork there. Hence, it is
137146
* more expensive than [[fork]], as two virtual threads are started.
138147
*/
139-
def forkCancellable[T](f: => T)(using OxUnsupervised): CancellableFork[T] =
148+
def forkCancellable[T](f: Forked ?=> T)(using OxUnsupervised): CancellableFork[T] =
140149
val result = new CompletableFuture[T]()
141150
// forks can be never run, if they are cancelled immediately - we need to detect this, not to await on result.get()
142151
val started = new AtomicBoolean(false)
@@ -149,7 +158,9 @@ def forkCancellable[T](f: => T)(using OxUnsupervised): CancellableFork[T] =
149158
nestedOx.scope.fork { () =>
150159
// "else" means that the fork is already cancelled, so doing nothing in that case
151160
if !started.getAndSet(true) then
152-
try result.complete(f).discard
161+
try
162+
given Forked = ForkedEvidence
163+
result.complete(f).discard
153164
catch case e: Throwable => result.completeExceptionally(e).discard
154165

155166
done.release() // the nested scope can now finish

core/src/main/scala/ox/supervised.scala

+26-3
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,17 @@ package ox
33
import java.util.concurrent.atomic.AtomicInteger
44
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
55
import scala.reflect.ClassTag
6+
import scala.util.NotGiven
7+
import scala.util.boundary.Label
8+
9+
sealed abstract class Supervised
10+
object SupervisedEvidence extends Supervised
11+
12+
private inline def availableInScope[A]: Boolean =
13+
compiletime.summonFrom {
14+
case _: NotGiven[A] => false
15+
case _: A => true
16+
}
617

718
/** Starts a new concurrency scope, which allows starting forks in the given code block `f`. Forks can be started using [[fork]],
819
* [[forkUser]], [[forkCancellable]] and [[forkUnsupervised]]. All forks are guaranteed to complete before this scope completes.
@@ -24,7 +35,19 @@ import scala.reflect.ClassTag
2435
* @see
2536
* [[supervisedError]] Starts a scope in supervised mode, with the additional ability to report application errors
2637
*/
27-
def supervised[T](f: Ox ?=> T): T = supervisedError(NoErrorMode)(f)
38+
inline def supervised[T](f: Supervised ?=> Ox ?=> T): T =
39+
inline if availableInScope[Label[Either[Nothing, Nothing]]] && availableInScope[Forked] then
40+
compiletime.error(
41+
"Nesting supervised scopes along with fork and either blocks is disallowed to prevent unsafe .ok() combinator usage on forks."
42+
)
43+
else supervisedErrorInternal(NoErrorMode)(f)
44+
45+
inline def supervisedError[E, F[_], T](em: ErrorMode[E, F])(f: Supervised ?=> OxError[E, F] ?=> F[T]): F[T] =
46+
inline if availableInScope[Label[Either[Nothing, Nothing]]] && availableInScope[Forked] then
47+
compiletime.error(
48+
"Nesting supervised scopes along with fork and either blocks is disallowed to prevent unsafe .ok() combinator usage on forks."
49+
)
50+
else supervisedErrorInternal(em)(f)
2851

2952
/** Starts a new concurrency scope, which allows starting forks in the given code block `f`. Forks can be started using [[fork]],
3053
* [[forkError]], [[forkUser]], [[forkUserError]], [[forkCancellable]] and [[forkUnsupervised]]. All forks are guaranteed to complete
@@ -36,12 +59,12 @@ def supervised[T](f: Ox ?=> T): T = supervisedError(NoErrorMode)(f)
3659
* @see
3760
* [[forkError]] On details how to use application errors.
3861
*/
39-
def supervisedError[E, F[_], T](em: ErrorMode[E, F])(f: OxError[E, F] ?=> F[T]): F[T] =
62+
def supervisedErrorInternal[E, F[_], T](em: ErrorMode[E, F])(f: Supervised ?=> OxError[E, F] ?=> F[T]): F[T] =
4063
val s = DefaultSupervisor[E]
4164
val capability = OxError(s, em)
4265
try
4366
val scopeResult = scopedWithCapability(capability) {
44-
val mainBodyFork = forkUserError(using capability)(f(using capability))
67+
val mainBodyFork = forkUserError(using capability)(f(using SupervisedEvidence)(using capability))
4568
val supervisorResult = s.join() // might throw if any supervised fork threw
4669
if supervisorResult == ErrorModeSupervisorResult.Success then
4770
// if no exceptions, the main f-fork must be done by now

0 commit comments

Comments
 (0)