Skip to content

Commit b31b83a

Browse files
committed
Fix dropped errors in CompletionStage's *EitherAsync methods
For `*Async` variants of `CompletionStage` methods, the (first) deferred argument was wrapped with `onto` so that its callbacks are invoked on the given `executor`. This results in a new deferred connected to the original one. In case of the `*EitherAsync` methods, that new deferred would then leak when `alt` chooses the `other` one but the `onto` deferred ends up in an error state since there was no way for the caller to attach an error handler to it. Furthermore, for async variants of methods accepting two completion stages, `f` was never executed on the given executor. For example, `then-combine` was defined like this: (defn- then-combine [d other ^BiFunction f] (assert-some other f) (fmap-deferred (zip d other) (fn [[x y]] (.apply f x y)))) Now `then-combine-async` (defined via `def-async-for-dual`) would wrap `d` with `(onto d executor)` before calling `then-combine`. However, the deferred passed to `fmap-deferred` was the one returned from `(zip d other)` which would not have the given executor attached. Thus, `f` would have been executed on a different executor than desired. This patch addresses both issues by adding a `result` deferred as the final argument to every method implementation. This deferred is bound to the desired `executor`. The operation `f` is then attached as a callback to that `result` deferred and the deferred of the main operation is connected to it. For synchronous implementations, `nil` can be passed as `result` in which case the callback is attached directly to the main operation deferred to save some overhead. This means that now the original deferred is passed to `alt` so that all error states can be handled by the caller. It also removes the `def-async-for` and `def-async-for-dual` macros and opts to instead always explicitly pass the `result` deferred in all method implementations. This isn't much more verbose but hopefully serves to make the code bit easier to follow. Finally, it renames `fmap-deferred` to `shallow-chain` for consistency with the established `chain` naming and introduces the analogous `shallow-connect` (also private for now).
1 parent 84c1c37 commit b31b83a

File tree

1 file changed

+122
-145
lines changed

1 file changed

+122
-145
lines changed

src/manifold/deferred.clj

Lines changed: 122 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,9 @@
7474

7575
to-completable-future
7676

77-
when-complete when-complete-async)
77+
when-complete when-complete-async
78+
79+
deferred cs-default-executor)
7880

7981
;; The potemkin abstract type for
8082
;; implementations such as CompletionStage
@@ -84,94 +86,99 @@
8486
ADeferred
8587
CompletionStage
8688
(thenApply [d f]
87-
(then-apply d f))
89+
(then-apply d f nil))
8890
(thenApplyAsync [d f]
89-
(then-apply-async d f))
91+
(then-apply d f (deferred (cs-default-executor))))
9092
(thenApplyAsync [d f executor]
91-
(then-apply-async d f executor))
93+
(then-apply d f (deferred executor)))
9294

9395
(thenAccept [d f]
94-
(then-accept d f))
96+
(then-accept d f nil))
9597
(thenAcceptAsync [d f]
96-
(then-accept-async d f))
98+
(then-accept d f (deferred (cs-default-executor))))
9799
(thenAcceptAsync [d f executor]
98-
(then-accept-async d f executor))
100+
(then-accept d f (deferred executor)))
99101

100102
(thenRun [d f]
101-
(then-run d f))
103+
(then-run d f nil))
102104
(thenRunAsync [d f]
103-
(then-run-async d f))
105+
(then-run d f (deferred (cs-default-executor))))
104106
(thenRunAsync [d f executor]
105-
(then-run-async d f executor))
107+
(then-run d f (deferred executor)))
106108

107109
(thenCombine [d other f]
108-
(then-combine d other f))
110+
(then-combine d other f nil))
109111
(thenCombineAsync [d other f]
110-
(then-combine-async d other f))
112+
(then-combine d other f (deferred (cs-default-executor))))
111113
(thenCombineAsync [d other f executor]
112-
(then-combine-async d other f executor))
114+
(then-combine d other f (deferred executor)))
113115

114116
(thenAcceptBoth [d other f]
115-
(then-accept-both d other f))
117+
(then-accept-both d other f nil))
116118
(thenAcceptBothAsync [d other f]
117-
(then-accept-both-async d other f))
119+
(then-accept-both d other f (deferred (cs-default-executor))))
118120
(thenAcceptBothAsync [d other f executor]
119-
(then-accept-both-async d other f executor))
121+
(then-accept-both d other f (deferred executor)))
120122

121123
(runAfterBoth [d other f]
122-
(run-after-both d other f))
124+
(run-after-both d other f nil))
123125
(runAfterBothAsync [d other f]
124-
(run-after-both-async d other f))
126+
(run-after-both d other f (deferred (cs-default-executor))))
125127
(runAfterBothAsync [d other f executor]
126-
(run-after-both-async d other f executor))
128+
(run-after-both d other f (deferred executor)))
127129

128130
(applyToEither [d other f]
129-
(apply-to-either d other f))
131+
(apply-to-either d other f nil))
130132
(applyToEitherAsync [d other f]
131-
(apply-to-either-async d other f))
133+
(apply-to-either d other f (deferred (cs-default-executor))))
132134
(applyToEitherAsync [d other f executor]
133-
(apply-to-either-async d other f executor))
135+
(apply-to-either d other f (deferred executor)))
134136

135137
(acceptEither [d other f]
136-
(accept-either d other f))
138+
(accept-either d other f nil))
137139
(acceptEitherAsync [d other f]
138-
(accept-either-async d other f))
140+
(accept-either d other f (deferred (cs-default-executor))))
139141
(acceptEitherAsync [d other f executor]
140-
(accept-either-async d other f executor))
142+
(accept-either d other f (deferred executor)))
141143

142144
(runAfterEither [d other f]
143-
(run-after-either d other f))
145+
(run-after-either d other f nil))
144146
(runAfterEitherAsync [d other f]
145-
(run-after-either-async d other f))
147+
(run-after-either d other f (deferred (cs-default-executor))))
146148
(runAfterEitherAsync [d other f executor]
147-
(run-after-either-async d other f executor))
149+
(run-after-either d other f (deferred executor)))
148150

149151
(thenCompose [d f]
150-
(then-compose d f))
152+
(then-compose d f nil))
151153
(thenComposeAsync [d f]
152-
(then-compose-async d f))
154+
(then-compose d f (deferred (cs-default-executor))))
153155
(thenComposeAsync [d f executor]
154-
(then-compose-async d f executor))
156+
(then-compose d f (deferred executor)))
155157

156158
(handle [d f]
157-
(then-handle d f))
159+
(then-handle d f nil))
158160
(handleAsync [d f]
159-
(then-handle-async d f))
161+
(then-handle d f (deferred (cs-default-executor))))
160162
(handleAsync [d f executor]
161-
(then-handle-async d f executor))
163+
(then-handle d f (deferred executor)))
162164

163165
(exceptionally [d f]
164-
(then-exceptionally d f))
165-
166-
(toCompletableFuture [d]
167-
(to-completable-future d))
166+
(then-exceptionally d f nil))
167+
;; Only available since Java 12
168+
;; (exceptionallyAsync [d f]
169+
;; (then-exceptionally d f (deferred (cs-default-executor))))
170+
;; (exceptionallyAsync [d f executor]
171+
;; (then-exceptionally d (deferred executor)))
168172

169173
(whenComplete [d f]
170-
(when-complete d f))
174+
(when-complete d f nil))
171175
(whenCompleteAsync [d f]
172-
(when-complete-async d f))
176+
(when-complete d f (deferred (cs-default-executor))))
173177
(whenCompleteAsync [d f executor]
174-
(when-complete-async d f executor)))
178+
(when-complete d f (deferred executor)))
179+
180+
(toCompletableFuture [d]
181+
(to-completable-future d)))
175182

176183
(definline realized?
177184
"Returns true if the manifold deferred is realized."
@@ -1538,157 +1545,129 @@
15381545
;; CompletionStage helper fns
15391546
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
15401547

1541-
(defmacro ^:no-doc def-async-for
1542-
"Defines a CompletionStage async version of the function associated with
1543-
the given symbol, with '-async' appended."
1544-
[fn-name]
1545-
(let [async-name (symbol (str (name fn-name) "-async"))]
1546-
`(defn- ~async-name
1547-
([d# f#]
1548-
(~async-name d# f# (or (ex/executor) (ex/execute-pool))))
1549-
([d# f# executor#]
1550-
(~fn-name (onto d# executor#) f#)))))
1551-
1552-
(defmacro ^:no-doc def-async-for-dual
1553-
"Defines a CompletionStage async version of the two-deferred
1554-
function associated with the given symbol, with '-async' appended."
1555-
[fn-name]
1556-
(let [async-name (symbol (str (name fn-name) "-async"))]
1557-
`(defn- ~async-name
1558-
([d# d2# f#]
1559-
(~async-name d# d2# f# (or (ex/executor) (ex/execute-pool))))
1560-
([d# d2# f# executor#]
1561-
(~fn-name (onto d# executor#) d2# f#)))))
1562-
1563-
(defn- fmap-deferred
1564-
"Returns a new deferred with function `f` applies to realized value of `d`.
1565-
(Like fmap but for deferreds.)
1566-
1567-
This function does not unwrap the result of f; it will only be applied to
1568-
the immediate value of `d`. This is for mimicking CompletionStage's
1569-
behavior."
1548+
(defn- cs-default-executor []
1549+
(or (ex/executor) (ex/execute-pool)))
1550+
1551+
(defn- shallow-connect
1552+
"Like `connect` but without implicit unwrapping of conveyed value."
1553+
[from to]
1554+
(on-realized from
1555+
(fn [val] (success! to val))
1556+
(fn [error] (error! to error))))
1557+
1558+
(defn- shallow-chain
1559+
"Returns a new deferred with function `f` applied to realized value of `d`.
1560+
1561+
Unlike `chain`, this function does not unwrap the result of `f`; it will only be applied to the
1562+
immediate value of `d`. This is for mimicking `CompletionStage`'s behavior."
15701563
[d f]
15711564
(let [d' (deferred)]
15721565
(on-realized d
15731566
(fn [val] (success! d' (f val)))
15741567
(fn [error] (error! d' error)))
15751568
d'))
15761569

1577-
(defn- then-apply [d ^Function f]
1578-
(assert-some f)
1579-
(fmap-deferred d #(.apply f %)))
1570+
(defn- completion-stage-result [d f to]
1571+
(when to
1572+
(shallow-connect d to))
1573+
(shallow-chain (or to d) f))
15801574

1581-
(def-async-for then-apply)
1575+
(defn- then-apply [d ^Function f to]
1576+
(assert-some f)
1577+
(completion-stage-result d #(.apply f %) to))
15821578

1583-
(defn- then-accept [d ^Consumer c]
1579+
(defn- then-accept [d ^Consumer c to]
15841580
(assert-some c)
1585-
(fmap-deferred d #(.accept c %)))
1586-
1587-
(def-async-for then-accept)
1581+
(completion-stage-result d #(.accept c %) to))
15881582

1589-
(defn- then-run [d ^Runnable f]
1583+
(defn- then-run [d ^Runnable f to]
15901584
(assert-some f)
1591-
(fmap-deferred d (fn [_] (.run f))))
1592-
1593-
(def-async-for then-run)
1585+
(completion-stage-result d (fn [_] (.run f)) to))
15941586

1595-
1596-
(defn- then-combine [d other ^BiFunction f]
1587+
(defn- then-combine [d other ^BiFunction f to]
15971588
(assert-some other f)
1598-
(fmap-deferred (zip d other)
1599-
(fn [[x y]] (.apply f x y))))
1600-
1601-
(def-async-for-dual then-combine)
1589+
(completion-stage-result (zip d other)
1590+
(fn [[x y]] (.apply f x y))
1591+
to))
16021592

1603-
1604-
(defn- then-accept-both [d other ^BiConsumer f]
1593+
(defn- then-accept-both [d other ^BiConsumer f to]
16051594
(assert-some other f)
1606-
(fmap-deferred (zip d other)
1607-
(fn [[x y]] (.accept f x y))))
1608-
1609-
(def-async-for-dual then-accept-both)
1595+
(completion-stage-result (zip d other)
1596+
(fn [[x y]] (.accept f x y))
1597+
to))
16101598

1611-
1612-
(defn- run-after-both [d other ^Runnable f]
1599+
(defn- run-after-both [d other ^Runnable f to]
16131600
(assert-some other f)
1614-
(fmap-deferred (zip d other)
1615-
(fn [[_ _]] (.run f))))
1616-
1601+
(completion-stage-result (zip d other)
1602+
(fn [[_ _]] (.run f))
1603+
to))
16171604

1618-
(def-async-for-dual run-after-both)
1619-
1620-
1621-
(defn- apply-to-either [d other ^Function f]
1605+
(defn- apply-to-either [d other ^Function f to]
16221606
(assert-some other f)
1623-
(then-apply (alt d other) f))
1624-
1625-
(def-async-for-dual apply-to-either)
1607+
(then-apply (alt d other) f to))
16261608

1627-
1628-
(defn- accept-either [d other ^Function f]
1609+
(defn- accept-either [d other ^Function f to]
16291610
(assert-some other f)
1630-
(then-accept (alt d other) f))
1631-
1632-
(def-async-for-dual accept-either)
1611+
(then-accept (alt d other) f to))
16331612

1634-
1635-
(defn- run-after-either [d other ^Function f]
1613+
(defn- run-after-either [d other ^Function f to]
16361614
(assert-some other f)
1637-
(then-run (alt d other) f))
1638-
1639-
(def-async-for-dual run-after-either)
1615+
(then-run (alt d other) f to))
16401616

1641-
1642-
(defn- then-compose [d ^Function f]
1617+
(defn- then-compose [d ^Function f to]
16431618
(assert-some f)
16441619
(let [d' (deferred)]
1645-
(on-realized d
1646-
(fn [val]
1647-
(on-realized (->deferred (.apply f val))
1648-
#(success! d' %)
1649-
#(error! d' %)))
1650-
(fn [error] (error! d' error)))
1620+
(-> (completion-stage-result d #(->deferred (.apply f %)) to)
1621+
(on-realized (fn [fd]
1622+
(shallow-connect fd d'))
1623+
(fn [error]
1624+
(error! d' error))))
16511625
d'))
16521626

1653-
(def-async-for then-compose)
1654-
1655-
1656-
(defn- then-handle [d ^BiFunction f]
1627+
(defn- then-handle [d ^BiFunction f to]
16571628
(assert-some f)
1629+
;; Can't use `completion-stage-result` here because it only covers
1630+
;; the success case.
1631+
(when to
1632+
(shallow-connect d to))
16581633
(let [d' (deferred)]
16591634
(on-realized
1660-
d
1635+
(or to d)
16611636
(fn [val] (success! d' (.apply f val nil)))
16621637
(fn [error] (success! d' (.apply f nil error))))
16631638
d'))
16641639

1665-
1666-
(def-async-for then-handle)
1667-
1668-
1669-
(defn- then-exceptionally [d ^Function f]
1640+
(defn- then-exceptionally [d ^Function f to]
16701641
(assert-some f)
1642+
;; Can't use `completion-stage-result` here because it only covers
1643+
;; the success case.
1644+
(when to
1645+
(shallow-connect d to))
16711646
(let [d' (deferred)]
16721647
(on-realized
1673-
d
1674-
(fn [val] (success! d' val))
1675-
(fn [error] (success! d' (.apply f error))))
1648+
(or to d)
1649+
(fn [val] (success! d' val))
1650+
(fn [error] (success! d' (.apply f error))))
16761651
d'))
16771652

16781653
(defn- to-completable-future [d]
16791654

1680-
(let [result (CompletableFuture.)]
1655+
(let [to (CompletableFuture.)]
16811656

16821657
(on-realized d
1683-
#(.complete result %)
1684-
#(.completeExceptionally result %))
1658+
#(.complete to %)
1659+
#(.completeExceptionally to %))
16851660

1686-
result))
1661+
to))
16871662

1688-
(defn- when-complete [d ^BiConsumer f]
1663+
(defn- when-complete [d ^BiConsumer f to]
16891664
(assert-some f)
1665+
;; Can't use `completion-stage-result` here because it only covers
1666+
;; the success case.
1667+
(when to
1668+
(shallow-connect d to))
16901669
(let [d' (deferred)]
1691-
(on-realized d
1670+
(on-realized (or to d)
16921671
(fn [val]
16931672
(try (.accept f val nil)
16941673
(success! d' val)
@@ -1701,8 +1680,6 @@
17011680
(error! d' err)))))
17021681
d'))
17031682

1704-
(def-async-for when-complete)
1705-
17061683
;;;
17071684

17081685
(alter-meta! #'->Deferred assoc :private true)

0 commit comments

Comments
 (0)