Skip to content

Commit 4256055

Browse files
committed
Fix dropped errors in CompletionStage's *EitherAsync methods
For `*Async` variants of `CompletionStage` methods, the (first) deferred argument was wrapped with `onto` so that its callbacks are invoked on the given `executor`. This results in a new deferred connected to the original one. In case of the `*EitherAsync` methods, that new deferred would then leak when `alt` chooses the `other` one but the `onto` deferred ends up in an error state since there was no way for the caller to attach an error handler to it. Furthermore, for async variants of methods accepting two completion stages, `f` was never executed on the given executor. For example, `then-combine` was defined like this: (defn- then-combine [d other ^BiFunction f] (assert-some other f) (fmap-deferred (zip d other) (fn [[x y]] (.apply f x y)))) Now `then-combine-async` (defined via `def-async-for-dual`) would wrap `d` with `(onto d executor)` before calling `then-combine`. However, the deferred passed to `fmap-deferred` was the one returned from `(zip d other)` which would not have the given executor attached. Thus, `f` would have been executed on a different executor than desired. This patch addresses both issues by adding a `result` deferred as the final argument to every method implementation. This deferred is bound to the desired `executor`. The operation `f` is then attached as a callback to that `result` deferred and the deferred of the main operation is connected to it. For synchronous implementations, `nil` can be passed as `result` in which case the callback is attached directly to the main operation deferred to save some overhead. This means that now the original deferred is passed to `alt` so that all error states can be handled by the caller. It also removes the `def-async-for` and `def-async-for-dual` macros and opts to instead always explicitly pass the `result` deferred in all method implementations. This isn't much more verbose but hopefully serves to make the code bit easier to follow. Finally, it renames `fmap-deferred` to `shallow-chain` for consistency with the established `chain` naming and introduces the analogous `shallow-connect` (also private for now).
1 parent cff7428 commit 4256055

File tree

1 file changed

+118
-141
lines changed

1 file changed

+118
-141
lines changed

src/manifold/deferred.clj

Lines changed: 118 additions & 141 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,9 @@
7474

7575
to-completable-future
7676

77-
when-complete when-complete-async)
77+
when-complete when-complete-async
78+
79+
deferred cs-default-executor)
7880

7981
;; The potemkin abstract type for
8082
;; implementations such as CompletionStage
@@ -84,94 +86,99 @@
8486
ADeferred
8587
CompletionStage
8688
(thenApply [d f]
87-
(then-apply d f))
89+
(then-apply d f nil))
8890
(thenApplyAsync [d f]
89-
(then-apply-async d f))
91+
(then-apply d f (deferred (cs-default-executor))))
9092
(thenApplyAsync [d f executor]
91-
(then-apply-async d f executor))
93+
(then-apply d f (deferred executor)))
9294

9395
(thenAccept [d f]
94-
(then-accept d f))
96+
(then-accept d f nil))
9597
(thenAcceptAsync [d f]
96-
(then-accept-async d f))
98+
(then-accept d f (deferred (cs-default-executor))))
9799
(thenAcceptAsync [d f executor]
98-
(then-accept-async d f executor))
100+
(then-accept d f (deferred executor)))
99101

100102
(thenRun [d f]
101-
(then-run d f))
103+
(then-run d f nil))
102104
(thenRunAsync [d f]
103-
(then-run-async d f))
105+
(then-run d f (deferred (cs-default-executor))))
104106
(thenRunAsync [d f executor]
105-
(then-run-async d f executor))
107+
(then-run d f (deferred executor)))
106108

107109
(thenCombine [d other f]
108-
(then-combine d other f))
110+
(then-combine d other f nil))
109111
(thenCombineAsync [d other f]
110-
(then-combine-async d other f))
112+
(then-combine d other f (deferred (cs-default-executor))))
111113
(thenCombineAsync [d other f executor]
112-
(then-combine-async d other f executor))
114+
(then-combine d other f (deferred executor)))
113115

114116
(thenAcceptBoth [d other f]
115-
(then-accept-both d other f))
117+
(then-accept-both d other f nil))
116118
(thenAcceptBothAsync [d other f]
117-
(then-accept-both-async d other f))
119+
(then-accept-both d other f (deferred (cs-default-executor))))
118120
(thenAcceptBothAsync [d other f executor]
119-
(then-accept-both-async d other f executor))
121+
(then-accept-both d other f (deferred executor)))
120122

121123
(runAfterBoth [d other f]
122-
(run-after-both d other f))
124+
(run-after-both d other f nil))
123125
(runAfterBothAsync [d other f]
124-
(run-after-both-async d other f))
126+
(run-after-both d other f (deferred (cs-default-executor))))
125127
(runAfterBothAsync [d other f executor]
126-
(run-after-both-async d other f executor))
128+
(run-after-both d other f (deferred executor)))
127129

128130
(applyToEither [d other f]
129-
(apply-to-either d other f))
131+
(apply-to-either d other f nil))
130132
(applyToEitherAsync [d other f]
131-
(apply-to-either-async d other f))
133+
(apply-to-either d other f (deferred (cs-default-executor))))
132134
(applyToEitherAsync [d other f executor]
133-
(apply-to-either-async d other f executor))
135+
(apply-to-either d other f (deferred executor)))
134136

135137
(acceptEither [d other f]
136-
(accept-either d other f))
138+
(accept-either d other f nil))
137139
(acceptEitherAsync [d other f]
138-
(accept-either-async d other f))
140+
(accept-either d other f (deferred (cs-default-executor))))
139141
(acceptEitherAsync [d other f executor]
140-
(accept-either-async d other f executor))
142+
(accept-either d other f (deferred executor)))
141143

142144
(runAfterEither [d other f]
143-
(run-after-either d other f))
145+
(run-after-either d other f nil))
144146
(runAfterEitherAsync [d other f]
145-
(run-after-either-async d other f))
147+
(run-after-either d other f (deferred (cs-default-executor))))
146148
(runAfterEitherAsync [d other f executor]
147-
(run-after-either-async d other f executor))
149+
(run-after-either d other f (deferred executor)))
148150

149151
(thenCompose [d f]
150-
(then-compose d f))
152+
(then-compose d f nil))
151153
(thenComposeAsync [d f]
152-
(then-compose-async d f))
154+
(then-compose d f (deferred (cs-default-executor))))
153155
(thenComposeAsync [d f executor]
154-
(then-compose-async d f executor))
156+
(then-compose d f (deferred executor)))
155157

156158
(handle [d f]
157-
(then-handle d f))
159+
(then-handle d f nil))
158160
(handleAsync [d f]
159-
(then-handle-async d f))
161+
(then-handle d f (deferred (cs-default-executor))))
160162
(handleAsync [d f executor]
161-
(then-handle-async d f executor))
163+
(then-handle d f (deferred executor)))
162164

163165
(exceptionally [d f]
164-
(then-exceptionally d f))
165-
166-
(toCompletableFuture [d]
167-
(to-completable-future d))
166+
(then-exceptionally d f nil))
167+
;; Only available since Java 12
168+
;; (exceptionallyAsync [d f]
169+
;; (then-exceptionally d f (deferred (cs-default-executor))))
170+
;; (exceptionallyAsync [d f executor]
171+
;; (then-exceptionally d (deferred f)))
168172

169173
(whenComplete [d f]
170-
(when-complete d f))
174+
(when-complete d f nil))
171175
(whenCompleteAsync [d f]
172-
(when-complete-async d f))
176+
(when-complete d f (deferred (cs-default-executor))))
173177
(whenCompleteAsync [d f executor]
174-
(when-complete-async d f executor)))
178+
(when-complete d f (deferred executor)))
179+
180+
(toCompletableFuture [d]
181+
(to-completable-future d)))
175182

176183
(definline realized?
177184
"Returns true if the manifold deferred is realized."
@@ -1532,141 +1539,109 @@
15321539
;; CompletionStage helper fns
15331540
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
15341541

1535-
(defmacro ^:no-doc def-async-for
1536-
"Defines a CompletionStage async version of the function associated with
1537-
the given symbol, with '-async' appended."
1538-
[fn-name]
1539-
(let [async-name (symbol (str (name fn-name) "-async"))]
1540-
`(defn- ~async-name
1541-
([d# f#]
1542-
(~async-name d# f# (or (ex/executor) (ex/execute-pool))))
1543-
([d# f# executor#]
1544-
(~fn-name (onto d# executor#) f#)))))
1545-
1546-
(defmacro ^:no-doc def-async-for-dual
1547-
"Defines a CompletionStage async version of the two-deferred
1548-
function associated with the given symbol, with '-async' appended."
1549-
[fn-name]
1550-
(let [async-name (symbol (str (name fn-name) "-async"))]
1551-
`(defn- ~async-name
1552-
([d# d2# f#]
1553-
(~async-name d# d2# f# (or (ex/executor) (ex/execute-pool))))
1554-
([d# d2# f# executor#]
1555-
(~fn-name (onto d# executor#) d2# f#)))))
1556-
1557-
(defn- fmap-deferred
1558-
"Returns a new deferred with function `f` applies to realized value of `d`.
1559-
(Like fmap but for deferreds.)
1560-
1561-
This function does not unwrap the result of f; it will only be applied to
1562-
the immediate value of `d`. This is for mimicking CompletionStage's
1563-
behavior."
1542+
(defn- cs-default-executor []
1543+
(or (ex/executor) (ex/execute-pool)))
1544+
1545+
(defn- shallow-connect
1546+
"Like `connect` but without implicit unwrapping of conveyed value."
1547+
[from to]
1548+
(on-realized from
1549+
(fn [val] (success! to val))
1550+
(fn [error] (error! to error))))
1551+
1552+
(defn- shallow-chain
1553+
"Returns a new deferred with function `f` applied to realized value of `d`.
1554+
1555+
Unlike `chain`, this function does not unwrap the result of `f`; it will only be applied to the
1556+
immediate value of `d`. This is for mimicking `CompletionStage`'s behavior."
15641557
[d f]
15651558
(let [d' (deferred)]
15661559
(on-realized d
15671560
(fn [val] (success! d' (f val)))
15681561
(fn [error] (error! d' error)))
15691562
d'))
15701563

1571-
(defn- then-apply [d ^Function f]
1572-
(assert-some f)
1573-
(fmap-deferred d #(.apply f %)))
1564+
(defn- completion-stage-result [d f result]
1565+
(when result
1566+
(shallow-connect d result))
1567+
(shallow-chain (or result d) f))
15741568

1575-
(def-async-for then-apply)
1569+
(defn- then-apply [d ^Function f result]
1570+
(assert-some f)
1571+
(completion-stage-result d #(.apply f %) result))
15761572

1577-
(defn- then-accept [d ^Consumer c]
1573+
(defn- then-accept [d ^Consumer c result]
15781574
(assert-some c)
1579-
(fmap-deferred d #(.accept c %)))
1580-
1581-
(def-async-for then-accept)
1575+
(completion-stage-result d #(.accept c %) result))
15821576

1583-
(defn- then-run [d ^Runnable f]
1577+
(defn- then-run [d ^Runnable f result]
15841578
(assert-some f)
1585-
(fmap-deferred d (fn [_] (.run f))))
1586-
1587-
(def-async-for then-run)
1579+
(completion-stage-result d (fn [_] (.run f)) result))
15881580

1589-
1590-
(defn- then-combine [d other ^BiFunction f]
1581+
(defn- then-combine [d other ^BiFunction f result]
15911582
(assert-some other f)
1592-
(fmap-deferred (zip d other)
1593-
(fn [[x y]] (.apply f x y))))
1594-
1595-
(def-async-for-dual then-combine)
1583+
(completion-stage-result (zip d other)
1584+
(fn [[x y]] (.apply f x y))
1585+
result))
15961586

1597-
1598-
(defn- then-accept-both [d other ^BiConsumer f]
1587+
(defn- then-accept-both [d other ^BiConsumer f result]
15991588
(assert-some other f)
1600-
(fmap-deferred (zip d other)
1601-
(fn [[x y]] (.accept f x y))))
1602-
1603-
(def-async-for-dual then-accept-both)
1589+
(completion-stage-result (zip d other)
1590+
(fn [[x y]] (.accept f x y))
1591+
result))
16041592

1605-
1606-
(defn- run-after-both [d other ^Runnable f]
1593+
(defn- run-after-both [d other ^Runnable f result]
16071594
(assert-some other f)
1608-
(fmap-deferred (zip d other)
1609-
(fn [[_ _]] (.run f))))
1610-
1595+
(completion-stage-result (zip d other)
1596+
(fn [[_ _]] (.run f))
1597+
result))
16111598

1612-
(def-async-for-dual run-after-both)
1613-
1614-
1615-
(defn- apply-to-either [d other ^Function f]
1599+
(defn- apply-to-either [d other ^Function f result]
16161600
(assert-some other f)
1617-
(then-apply (alt d other) f))
1618-
1619-
(def-async-for-dual apply-to-either)
1601+
(then-apply (alt d other) f result))
16201602

1621-
1622-
(defn- accept-either [d other ^Function f]
1603+
(defn- accept-either [d other ^Function f result]
16231604
(assert-some other f)
1624-
(then-accept (alt d other) f))
1625-
1626-
(def-async-for-dual accept-either)
1605+
(then-accept (alt d other) f result))
16271606

1628-
1629-
(defn- run-after-either [d other ^Function f]
1607+
(defn- run-after-either [d other ^Function f result]
16301608
(assert-some other f)
1631-
(then-run (alt d other) f))
1632-
1633-
(def-async-for-dual run-after-either)
1609+
(then-run (alt d other) f result))
16341610

1635-
1636-
(defn- then-compose [d ^Function f]
1611+
(defn- then-compose [d ^Function f result]
16371612
(assert-some f)
16381613
(let [d' (deferred)]
1639-
(on-realized d
1640-
(fn [val]
1641-
(on-realized (->deferred (.apply f val))
1642-
#(success! d' %)
1643-
#(error! d' %)))
1644-
(fn [error] (error! d' error)))
1614+
(-> (completion-stage-result d #(->deferred (.apply f %)) result)
1615+
(on-realized (fn [fd]
1616+
(shallow-connect fd d'))
1617+
(fn [error]
1618+
(error! d' error))))
16451619
d'))
16461620

1647-
(def-async-for then-compose)
1648-
1649-
1650-
(defn- then-handle [d ^BiFunction f]
1621+
(defn- then-handle [d ^BiFunction f result]
16511622
(assert-some f)
1623+
;; Can't use `completion-stage-result` here because it only covers
1624+
;; the success case.
1625+
(when result
1626+
(shallow-connect d result))
16521627
(let [d' (deferred)]
16531628
(on-realized
1654-
d
1629+
(or result d)
16551630
(fn [val] (success! d' (.apply f val nil)))
16561631
(fn [error] (success! d' (.apply f nil error))))
16571632
d'))
16581633

1659-
1660-
(def-async-for then-handle)
1661-
1662-
1663-
(defn- then-exceptionally [d ^Function f]
1634+
(defn- then-exceptionally [d ^Function f result]
16641635
(assert-some f)
1636+
;; Can't use `completion-stage-result` here because it only covers
1637+
;; the success case.
1638+
(when result
1639+
(shallow-connect d result))
16651640
(let [d' (deferred)]
16661641
(on-realized
1667-
d
1668-
(fn [val] (success! d' val))
1669-
(fn [error] (success! d' (.apply f error))))
1642+
(or result d)
1643+
(fn [val] (success! d' val))
1644+
(fn [error] (success! d' (.apply f error))))
16701645
d'))
16711646

16721647
(defn- to-completable-future [d]
@@ -1679,10 +1654,14 @@
16791654

16801655
result))
16811656

1682-
(defn- when-complete [d ^BiConsumer f]
1657+
(defn- when-complete [d ^BiConsumer f result]
16831658
(assert-some f)
1659+
;; Can't use `completion-stage-result` here because it only covers
1660+
;; the success case.
1661+
(when result
1662+
(shallow-connect d result))
16841663
(let [d' (deferred)]
1685-
(on-realized d
1664+
(on-realized (or result d)
16861665
(fn [val]
16871666
(try (.accept f val nil)
16881667
(success! d' val)
@@ -1695,8 +1674,6 @@
16951674
(error! d' err)))))
16961675
d'))
16971676

1698-
(def-async-for when-complete)
1699-
17001677
;;;
17011678

17021679
(alter-meta! #'->Deferred assoc :private true)

0 commit comments

Comments
 (0)