diff --git a/src/manifold/deferred.clj b/src/manifold/deferred.clj index 8b7e84d9..ca3f7e19 100644 --- a/src/manifold/deferred.clj +++ b/src/manifold/deferred.clj @@ -74,7 +74,9 @@ to-completable-future - when-complete when-complete-async) + when-complete when-complete-async + + cs-default-executor) ;; The potemkin abstract type for ;; implementations such as CompletionStage @@ -84,94 +86,99 @@ ADeferred CompletionStage (thenApply [d f] - (then-apply d f)) + (then-apply d f nil)) (thenApplyAsync [d f] - (then-apply-async d f)) + (then-apply d f (cs-default-executor))) (thenApplyAsync [d f executor] - (then-apply-async d f executor)) + (then-apply d f executor)) (thenAccept [d f] - (then-accept d f)) + (then-accept d f nil)) (thenAcceptAsync [d f] - (then-accept-async d f)) + (then-accept d f (cs-default-executor))) (thenAcceptAsync [d f executor] - (then-accept-async d f executor)) + (then-accept d f executor)) (thenRun [d f] - (then-run d f)) + (then-run d f nil)) (thenRunAsync [d f] - (then-run-async d f)) + (then-run d f (cs-default-executor))) (thenRunAsync [d f executor] - (then-run-async d f executor)) + (then-run d f executor)) (thenCombine [d other f] - (then-combine d other f)) + (then-combine d other f nil)) (thenCombineAsync [d other f] - (then-combine-async d other f)) + (then-combine d other f (cs-default-executor))) (thenCombineAsync [d other f executor] - (then-combine-async d other f executor)) + (then-combine d other f executor)) (thenAcceptBoth [d other f] - (then-accept-both d other f)) + (then-accept-both d other f nil)) (thenAcceptBothAsync [d other f] - (then-accept-both-async d other f)) + (then-accept-both d other f (cs-default-executor))) (thenAcceptBothAsync [d other f executor] - (then-accept-both-async d other f executor)) + (then-accept-both d other f executor)) (runAfterBoth [d other f] - (run-after-both d other f)) + (run-after-both d other f nil)) (runAfterBothAsync [d other f] - (run-after-both-async d other f)) + (run-after-both d other f (cs-default-executor))) (runAfterBothAsync [d other f executor] - (run-after-both-async d other f executor)) + (run-after-both d other f executor)) (applyToEither [d other f] - (apply-to-either d other f)) + (apply-to-either d other f nil)) (applyToEitherAsync [d other f] - (apply-to-either-async d other f)) + (apply-to-either d other f (cs-default-executor))) (applyToEitherAsync [d other f executor] - (apply-to-either-async d other f executor)) + (apply-to-either d other f executor)) (acceptEither [d other f] - (accept-either d other f)) + (accept-either d other f nil)) (acceptEitherAsync [d other f] - (accept-either-async d other f)) + (accept-either d other f (cs-default-executor))) (acceptEitherAsync [d other f executor] - (accept-either-async d other f executor)) + (accept-either d other f executor)) (runAfterEither [d other f] - (run-after-either d other f)) + (run-after-either d other f nil)) (runAfterEitherAsync [d other f] - (run-after-either-async d other f)) + (run-after-either d other f (cs-default-executor))) (runAfterEitherAsync [d other f executor] - (run-after-either-async d other f executor)) + (run-after-either d other f executor)) (thenCompose [d f] - (then-compose d f)) + (then-compose d f nil)) (thenComposeAsync [d f] - (then-compose-async d f)) + (then-compose d f (cs-default-executor))) (thenComposeAsync [d f executor] - (then-compose-async d f executor)) + (then-compose d f executor)) (handle [d f] - (then-handle d f)) + (then-handle d f nil)) (handleAsync [d f] - (then-handle-async d f)) + (then-handle d f (cs-default-executor))) (handleAsync [d f executor] - (then-handle-async d f executor)) + (then-handle d f executor)) (exceptionally [d f] - (then-exceptionally d f)) - - (toCompletableFuture [d] - (to-completable-future d)) + (then-exceptionally d f nil)) + ;; Only available since Java 12 + ;; (exceptionallyAsync [d f] + ;; (then-exceptionally d f (cs-default-executor))) + ;; (exceptionallyAsync [d f executor] + ;; (then-exceptionally d executor)) (whenComplete [d f] - (when-complete d f)) + (when-complete d f nil)) (whenCompleteAsync [d f] - (when-complete-async d f)) + (when-complete d f (cs-default-executor))) (whenCompleteAsync [d f executor] - (when-complete-async d f executor))) + (when-complete d f executor)) + + (toCompletableFuture [d] + (to-completable-future d))) (definline realized? "Returns true if the manifold deferred is realized." @@ -1538,35 +1545,21 @@ ;; CompletionStage helper fns ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(defmacro ^:no-doc def-async-for - "Defines a CompletionStage async version of the function associated with - the given symbol, with '-async' appended." - [fn-name] - (let [async-name (symbol (str (name fn-name) "-async"))] - `(defn- ~async-name - ([d# f#] - (~async-name d# f# (or (ex/executor) (ex/execute-pool)))) - ([d# f# executor#] - (~fn-name (onto d# executor#) f#))))) - -(defmacro ^:no-doc def-async-for-dual - "Defines a CompletionStage async version of the two-deferred - function associated with the given symbol, with '-async' appended." - [fn-name] - (let [async-name (symbol (str (name fn-name) "-async"))] - `(defn- ~async-name - ([d# d2# f#] - (~async-name d# d2# f# (or (ex/executor) (ex/execute-pool)))) - ([d# d2# f# executor#] - (~fn-name (onto d# executor#) d2# f#))))) - -(defn- fmap-deferred - "Returns a new deferred with function `f` applies to realized value of `d`. - (Like fmap but for deferreds.) - - This function does not unwrap the result of f; it will only be applied to - the immediate value of `d`. This is for mimicking CompletionStage's - behavior." +(defn- cs-default-executor [] + (or (ex/executor) (ex/execute-pool))) + +(defn- shallow-connect + "Like `connect` but without implicit unwrapping of conveyed value." + [from to] + (on-realized from + (fn [val] (success! to val)) + (fn [error] (error! to error)))) + +(defn- shallow-chain + "Returns a new deferred with function `f` applied to realized value of `d`. + + Unlike `chain`, this function does not unwrap the result of `f`; it will only be applied to the + immediate value of `d`. This is for mimicking `CompletionStage`'s behavior." [d f] (let [d' (deferred)] (on-realized d @@ -1574,105 +1567,89 @@ (fn [error] (error! d' error))) d')) -(defn- then-apply [d ^Function f] - (assert-some f) - (fmap-deferred d #(.apply f %))) - -(def-async-for then-apply) - -(defn- then-accept [d ^Consumer c] - (assert-some c) - (fmap-deferred d #(.accept c %))) +(defn- shallow-onto [^IDeferred d executor] + (if (identical? executor (.executor d)) + d + (let [d' (deferred executor)] + (shallow-connect d d') + d'))) -(def-async-for then-accept) +(defn- shallow-chain-onto [d f executor] + (-> d + (shallow-onto executor) + (shallow-chain f))) -(defn- then-run [d ^Runnable f] +(defn- then-apply [d ^Function f executor] (assert-some f) - (fmap-deferred d (fn [_] (.run f)))) + (shallow-chain-onto d #(.apply f %) executor)) -(def-async-for then-run) +(defn- then-accept [d ^Consumer c executor] + (assert-some c) + (shallow-chain-onto d #(.accept c %) executor)) +(defn- then-run [d ^Runnable f executor] + (assert-some f) + (shallow-chain-onto d (fn [_] (.run f)) executor)) -(defn- then-combine [d other ^BiFunction f] +(defn- then-combine [d other ^BiFunction f executor] (assert-some other f) - (fmap-deferred (zip d other) - (fn [[x y]] (.apply f x y)))) - -(def-async-for-dual then-combine) + (shallow-chain-onto (zip d other) + (fn [[x y]] (.apply f x y)) + executor)) - -(defn- then-accept-both [d other ^BiConsumer f] +(defn- then-accept-both [d other ^BiConsumer f executor] (assert-some other f) - (fmap-deferred (zip d other) - (fn [[x y]] (.accept f x y)))) - -(def-async-for-dual then-accept-both) + (shallow-chain-onto (zip d other) + (fn [[x y]] (.accept f x y)) + executor)) - -(defn- run-after-both [d other ^Runnable f] +(defn- run-after-both [d other ^Runnable f executor] (assert-some other f) - (fmap-deferred (zip d other) - (fn [[_ _]] (.run f)))) - - -(def-async-for-dual run-after-both) + (shallow-chain-onto (zip d other) + (fn [[_ _]] (.run f)) + executor)) - -(defn- apply-to-either [d other ^Function f] +(defn- apply-to-either [d other ^Function f executor] (assert-some other f) - (then-apply (alt d other) f)) - -(def-async-for-dual apply-to-either) + (then-apply (alt d other) f executor)) - -(defn- accept-either [d other ^Function f] +(defn- accept-either [d other ^Function f executor] (assert-some other f) - (then-accept (alt d other) f)) - -(def-async-for-dual accept-either) - + (then-accept (alt d other) f executor)) -(defn- run-after-either [d other ^Function f] +(defn- run-after-either [d other ^Function f executor] (assert-some other f) - (then-run (alt d other) f)) - -(def-async-for-dual run-after-either) - + (then-run (alt d other) f executor)) -(defn- then-compose [d ^Function f] +(defn- then-compose [d ^Function f executor] (assert-some f) (let [d' (deferred)] - (on-realized d - (fn [val] - (on-realized (->deferred (.apply f val)) - #(success! d' %) - #(error! d' %))) - (fn [error] (error! d' error))) + (-> (shallow-chain-onto d #(->deferred (.apply f %)) executor) + (on-realized (fn [fd] + (shallow-connect fd d')) + (fn [error] + (error! d' error)))) d')) -(def-async-for then-compose) - - -(defn- then-handle [d ^BiFunction f] +(defn- then-handle [d ^BiFunction f executor] (assert-some f) + ;; Can't use `shallow-chain-onto` here because it only covers the success case. (let [d' (deferred)] (on-realized - d + (shallow-onto d executor) (fn [val] (success! d' (.apply f val nil))) (fn [error] (success! d' (.apply f nil error)))) d')) - -(def-async-for then-handle) - - -(defn- then-exceptionally [d ^Function f] +(defn- then-exceptionally [d ^Function f executor] (assert-some f) + ;; Can't use `shallow-chain-onto` here because it only covers + ;; the success case. (let [d' (deferred)] (on-realized - d - (fn [val] (success! d' val)) - (fn [error] (success! d' (.apply f error)))) + (shallow-onto d executor) + (fn [val] (success! d' val)) + (fn [error] (success! d' (.apply f error)))) d')) (defn- to-completable-future [d] @@ -1685,10 +1662,12 @@ result)) -(defn- when-complete [d ^BiConsumer f] +(defn- when-complete [d ^BiConsumer f executor] (assert-some f) + ;; Can't use `shallow-chain-onto` here because it only covers + ;; the success case. (let [d' (deferred)] - (on-realized d + (on-realized (shallow-onto d executor) (fn [val] (try (.accept f val nil) (success! d' val) @@ -1701,8 +1680,6 @@ (error! d' err))))) d')) -(def-async-for when-complete) - ;;; (alter-meta! #'->Deferred assoc :private true)