Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 118 additions & 141 deletions src/manifold/deferred.clj
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@

to-completable-future

when-complete when-complete-async)
when-complete when-complete-async

cs-default-executor)

;; The potemkin abstract type for
;; implementations such as CompletionStage
Expand All @@ -84,94 +86,99 @@
ADeferred
CompletionStage
(thenApply [d f]
(then-apply d f))
(then-apply d f nil))
(thenApplyAsync [d f]
(then-apply-async d f))
(then-apply d f (cs-default-executor)))
(thenApplyAsync [d f executor]
(then-apply-async d f executor))
(then-apply d f executor))

(thenAccept [d f]
(then-accept d f))
(then-accept d f nil))
(thenAcceptAsync [d f]
(then-accept-async d f))
(then-accept d f (cs-default-executor)))
(thenAcceptAsync [d f executor]
(then-accept-async d f executor))
(then-accept d f executor))

(thenRun [d f]
(then-run d f))
(then-run d f nil))
(thenRunAsync [d f]
(then-run-async d f))
(then-run d f (cs-default-executor)))
(thenRunAsync [d f executor]
(then-run-async d f executor))
(then-run d f executor))

(thenCombine [d other f]
(then-combine d other f))
(then-combine d other f nil))
(thenCombineAsync [d other f]
(then-combine-async d other f))
(then-combine d other f (cs-default-executor)))
(thenCombineAsync [d other f executor]
(then-combine-async d other f executor))
(then-combine d other f executor))

(thenAcceptBoth [d other f]
(then-accept-both d other f))
(then-accept-both d other f nil))
(thenAcceptBothAsync [d other f]
(then-accept-both-async d other f))
(then-accept-both d other f (cs-default-executor)))
(thenAcceptBothAsync [d other f executor]
(then-accept-both-async d other f executor))
(then-accept-both d other f executor))

(runAfterBoth [d other f]
(run-after-both d other f))
(run-after-both d other f nil))
(runAfterBothAsync [d other f]
(run-after-both-async d other f))
(run-after-both d other f (cs-default-executor)))
(runAfterBothAsync [d other f executor]
(run-after-both-async d other f executor))
(run-after-both d other f executor))

(applyToEither [d other f]
(apply-to-either d other f))
(apply-to-either d other f nil))
(applyToEitherAsync [d other f]
(apply-to-either-async d other f))
(apply-to-either d other f (cs-default-executor)))
(applyToEitherAsync [d other f executor]
(apply-to-either-async d other f executor))
(apply-to-either d other f executor))

(acceptEither [d other f]
(accept-either d other f))
(accept-either d other f nil))
(acceptEitherAsync [d other f]
(accept-either-async d other f))
(accept-either d other f (cs-default-executor)))
(acceptEitherAsync [d other f executor]
(accept-either-async d other f executor))
(accept-either d other f executor))

(runAfterEither [d other f]
(run-after-either d other f))
(run-after-either d other f nil))
(runAfterEitherAsync [d other f]
(run-after-either-async d other f))
(run-after-either d other f (cs-default-executor)))
(runAfterEitherAsync [d other f executor]
(run-after-either-async d other f executor))
(run-after-either d other f executor))

(thenCompose [d f]
(then-compose d f))
(then-compose d f nil))
(thenComposeAsync [d f]
(then-compose-async d f))
(then-compose d f (cs-default-executor)))
(thenComposeAsync [d f executor]
(then-compose-async d f executor))
(then-compose d f executor))

(handle [d f]
(then-handle d f))
(then-handle d f nil))
(handleAsync [d f]
(then-handle-async d f))
(then-handle d f (cs-default-executor)))
(handleAsync [d f executor]
(then-handle-async d f executor))
(then-handle d f executor))

(exceptionally [d f]
(then-exceptionally d f))

(toCompletableFuture [d]
(to-completable-future d))
(then-exceptionally d f nil))
;; Only available since Java 12
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there some kind of conditional to add these so they won't get forgotten?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there is a way to implement methods conditionally depending on the Java version. Are you aware of one? I mainly put these in for completion's sake (no pun intended 😄) so that we could uncomment them in the future when we decide to drop support for Java versions < 12 (which Clojure itself likely will do from 1.13 on).

;; (exceptionallyAsync [d f]
;; (then-exceptionally d f (cs-default-executor)))
;; (exceptionallyAsync [d f executor]
;; (then-exceptionally d executor))

(whenComplete [d f]
(when-complete d f))
(when-complete d f nil))
(whenCompleteAsync [d f]
(when-complete-async d f))
(when-complete d f (cs-default-executor)))
(whenCompleteAsync [d f executor]
(when-complete-async d f executor)))
(when-complete d f executor))

(toCompletableFuture [d]
(to-completable-future d)))

(definline realized?
"Returns true if the manifold deferred is realized."
Expand Down Expand Up @@ -1538,141 +1545,111 @@
;; CompletionStage helper fns
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(defmacro ^:no-doc def-async-for
"Defines a CompletionStage async version of the function associated with
the given symbol, with '-async' appended."
[fn-name]
(let [async-name (symbol (str (name fn-name) "-async"))]
`(defn- ~async-name
([d# f#]
(~async-name d# f# (or (ex/executor) (ex/execute-pool))))
([d# f# executor#]
(~fn-name (onto d# executor#) f#)))))

(defmacro ^:no-doc def-async-for-dual
"Defines a CompletionStage async version of the two-deferred
function associated with the given symbol, with '-async' appended."
[fn-name]
(let [async-name (symbol (str (name fn-name) "-async"))]
`(defn- ~async-name
([d# d2# f#]
(~async-name d# d2# f# (or (ex/executor) (ex/execute-pool))))
([d# d2# f# executor#]
(~fn-name (onto d# executor#) d2# f#)))))

(defn- fmap-deferred
"Returns a new deferred with function `f` applies to realized value of `d`.
(Like fmap but for deferreds.)

This function does not unwrap the result of f; it will only be applied to
the immediate value of `d`. This is for mimicking CompletionStage's
behavior."
(defn- cs-default-executor []
(or (ex/executor) (ex/execute-pool)))

(defn- shallow-connect
"Like `connect` but without implicit unwrapping of conveyed value."
[from to]
(on-realized from
(fn [val] (success! to val))
(fn [error] (error! to error))))

(defn- shallow-chain
"Returns a new deferred with function `f` applied to realized value of `d`.

Unlike `chain`, this function does not unwrap the result of `f`; it will only be applied to the
immediate value of `d`. This is for mimicking `CompletionStage`'s behavior."
[d f]
(let [d' (deferred)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this deferred run on the default executor regardless of where d is getting executed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the executor of d' will be nil which means its callbacks will be invoked on the same executor / thread which realizes it. So basically, d' inherits the executor of d here. This behavior isn't documented very well, alas. The best I could find is this paragraph:

In these cases, we can move the stream or deferred `onto` an executor, guaranteeing that all actions resulting from an operation will be enqueued onto a thread pool rather than immediately executed. This executor can be generated via `manifold.executor/instrumented-executor`, or using the convenience methods `fixed-thread-executor` and `utilization-executor`. In addition to providing automatic instrumentation, these executors will guarantee that any streams or deferred created within their scope will also be "on" that executor. For this reason, it's often sufficient to only call `onto` on a single stream in a topology, as everything downstream of it will transitively be executed on the executor.

The implementation lives here:

(clojure.core/loop []
(when-let [^IDeferredListener l# (.poll ~'listeners)]
(try
(if (nil? ~executor)
(~(if success? `.onSuccess `.onError) ^IDeferredListener l# ~val)
(.execute ~(with-meta executor {:tag "java.util.concurrent.Executor"})
(fn []
(try
(~(if success? `.onSuccess `.onError) l# ~val)
(catch Throwable e#
#_(.printStackTrace e#)
(log/error e# "error in deferred handler"))))))
(catch Throwable e#
#_(.printStackTrace e#)
(log/error e# "error in deferred handler")))
(recur)))

And finally, here's an example which empirically confirms this behavior:

(let [ex (ex/fixed-thread-executor
          1
          {:thread-factory (ex/thread-factory
                            (constantly "my executor")
                            (deliver (promise) nil))})
      d (deferred ex)
      d' (shallow-chain d #(prn :one % (.getName (Thread/currentThread))))]
  (chain d' #(prn :two % (.getName (Thread/currentThread))))
  (success! d :ok))

Output:

:one :ok "my executor"
:two nil "my executor"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

every day i learn something new

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here! 😄

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very well pointed! I should probably have documented this detail with a comment in the original implementation, but I'm glad you've written it out explicitly

(on-realized d
(fn [val] (success! d' (f val)))
(fn [error] (error! d' error)))
d'))

(defn- then-apply [d ^Function f]
(assert-some f)
(fmap-deferred d #(.apply f %)))

(def-async-for then-apply)

(defn- then-accept [d ^Consumer c]
(assert-some c)
(fmap-deferred d #(.accept c %)))
(defn- shallow-onto [^IDeferred d executor]
(if (identical? executor (.executor d))
d
(let [d' (deferred executor)]
(shallow-connect d d')
d')))

(def-async-for then-accept)
(defn- shallow-chain-onto [d f executor]
(-> d
(shallow-onto executor)
(shallow-chain f)))

(defn- then-run [d ^Runnable f]
(defn- then-apply [d ^Function f executor]
(assert-some f)
(fmap-deferred d (fn [_] (.run f))))
(shallow-chain-onto d #(.apply f %) executor))

(def-async-for then-run)
(defn- then-accept [d ^Consumer c executor]
(assert-some c)
(shallow-chain-onto d #(.accept c %) executor))

(defn- then-run [d ^Runnable f executor]
(assert-some f)
(shallow-chain-onto d (fn [_] (.run f)) executor))

(defn- then-combine [d other ^BiFunction f]
(defn- then-combine [d other ^BiFunction f executor]
(assert-some other f)
(fmap-deferred (zip d other)
(fn [[x y]] (.apply f x y))))

(def-async-for-dual then-combine)
(shallow-chain-onto (zip d other)
(fn [[x y]] (.apply f x y))
executor))


(defn- then-accept-both [d other ^BiConsumer f]
(defn- then-accept-both [d other ^BiConsumer f executor]
(assert-some other f)
(fmap-deferred (zip d other)
(fn [[x y]] (.accept f x y))))

(def-async-for-dual then-accept-both)
(shallow-chain-onto (zip d other)
(fn [[x y]] (.accept f x y))
executor))


(defn- run-after-both [d other ^Runnable f]
(defn- run-after-both [d other ^Runnable f executor]
(assert-some other f)
(fmap-deferred (zip d other)
(fn [[_ _]] (.run f))))


(def-async-for-dual run-after-both)
(shallow-chain-onto (zip d other)
(fn [[_ _]] (.run f))
executor))


(defn- apply-to-either [d other ^Function f]
(defn- apply-to-either [d other ^Function f executor]
(assert-some other f)
(then-apply (alt d other) f))

(def-async-for-dual apply-to-either)
(then-apply (alt d other) f executor))


(defn- accept-either [d other ^Function f]
(defn- accept-either [d other ^Function f executor]
(assert-some other f)
(then-accept (alt d other) f))

(def-async-for-dual accept-either)

(then-accept (alt d other) f executor))

(defn- run-after-either [d other ^Function f]
(defn- run-after-either [d other ^Function f executor]
(assert-some other f)
(then-run (alt d other) f))

(def-async-for-dual run-after-either)

(then-run (alt d other) f executor))

(defn- then-compose [d ^Function f]
(defn- then-compose [d ^Function f executor]
(assert-some f)
(let [d' (deferred)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still necessary? Shouldn't the provided target (result) be used instead?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit subtle indeed 😅 The purpose of d' is to unwrap the deferred returned from f (= fd), so AFAICT we still need it. If it helps: thenCompose is basically the bind / flatMap operation of the CompletionStage monad 😬

(on-realized d
(fn [val]
(on-realized (->deferred (.apply f val))
#(success! d' %)
#(error! d' %)))
(fn [error] (error! d' error)))
(-> (shallow-chain-onto d #(->deferred (.apply f %)) executor)
(on-realized (fn [fd]
(shallow-connect fd d'))
(fn [error]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I loved the join . fmap implementation for then-compose using shallow-connect here, well done!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks 🙏 Though I have to admit that I'm not fluent in Monadic, so I am not familiar with the join . fmap idiom. The code here is just the nicest way I could come up with to express this operation 😅

(error! d' error))))
d'))

(def-async-for then-compose)


(defn- then-handle [d ^BiFunction f]
(defn- then-handle [d ^BiFunction f executor]
(assert-some f)
;; Can't use `shallow-chain-onto` here because it only covers the success case.
(let [d' (deferred)]
(on-realized
d
(shallow-onto d executor)
(fn [val] (success! d' (.apply f val nil)))
(fn [error] (success! d' (.apply f nil error))))
d'))


(def-async-for then-handle)


(defn- then-exceptionally [d ^Function f]
(defn- then-exceptionally [d ^Function f executor]
(assert-some f)
;; Can't use `shallow-chain-onto` here because it only covers
;; the success case.
(let [d' (deferred)]
(on-realized
d
(fn [val] (success! d' val))
(fn [error] (success! d' (.apply f error))))
(shallow-onto d executor)
(fn [val] (success! d' val))
(fn [error] (success! d' (.apply f error))))
d'))

(defn- to-completable-future [d]
Expand All @@ -1685,10 +1662,12 @@

result))

(defn- when-complete [d ^BiConsumer f]
(defn- when-complete [d ^BiConsumer f executor]
(assert-some f)
;; Can't use `shallow-chain-onto` here because it only covers
;; the success case.
(let [d' (deferred)]
(on-realized d
(on-realized (shallow-onto d executor)
(fn [val]
(try (.accept f val nil)
(success! d' val)
Expand All @@ -1701,8 +1680,6 @@
(error! d' err)))))
d'))

(def-async-for when-complete)

;;;

(alter-meta! #'->Deferred assoc :private true)
Expand Down