Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 122 additions & 145 deletions src/manifold/deferred.clj
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@

to-completable-future

when-complete when-complete-async)
when-complete when-complete-async

deferred cs-default-executor)

;; The potemkin abstract type for
;; implementations such as CompletionStage
Expand All @@ -84,94 +86,99 @@
ADeferred
CompletionStage
(thenApply [d f]
(then-apply d f))
(then-apply d f nil))
(thenApplyAsync [d f]
(then-apply-async d f))
(then-apply d f (deferred (cs-default-executor))))
(thenApplyAsync [d f executor]
(then-apply-async d f executor))
(then-apply d f (deferred executor)))

(thenAccept [d f]
(then-accept d f))
(then-accept d f nil))
(thenAcceptAsync [d f]
(then-accept-async d f))
(then-accept d f (deferred (cs-default-executor))))
(thenAcceptAsync [d f executor]
(then-accept-async d f executor))
(then-accept d f (deferred executor)))

(thenRun [d f]
(then-run d f))
(then-run d f nil))
(thenRunAsync [d f]
(then-run-async d f))
(then-run d f (deferred (cs-default-executor))))
(thenRunAsync [d f executor]
(then-run-async d f executor))
(then-run d f (deferred executor)))

(thenCombine [d other f]
(then-combine d other f))
(then-combine d other f nil))
(thenCombineAsync [d other f]
(then-combine-async d other f))
(then-combine d other f (deferred (cs-default-executor))))
(thenCombineAsync [d other f executor]
(then-combine-async d other f executor))
(then-combine d other f (deferred executor)))

(thenAcceptBoth [d other f]
(then-accept-both d other f))
(then-accept-both d other f nil))
(thenAcceptBothAsync [d other f]
(then-accept-both-async d other f))
(then-accept-both d other f (deferred (cs-default-executor))))
(thenAcceptBothAsync [d other f executor]
(then-accept-both-async d other f executor))
(then-accept-both d other f (deferred executor)))

(runAfterBoth [d other f]
(run-after-both d other f))
(run-after-both d other f nil))
(runAfterBothAsync [d other f]
(run-after-both-async d other f))
(run-after-both d other f (deferred (cs-default-executor))))
(runAfterBothAsync [d other f executor]
(run-after-both-async d other f executor))
(run-after-both d other f (deferred executor)))

(applyToEither [d other f]
(apply-to-either d other f))
(apply-to-either d other f nil))
(applyToEitherAsync [d other f]
(apply-to-either-async d other f))
(apply-to-either d other f (deferred (cs-default-executor))))
(applyToEitherAsync [d other f executor]
(apply-to-either-async d other f executor))
(apply-to-either d other f (deferred executor)))

(acceptEither [d other f]
(accept-either d other f))
(accept-either d other f nil))
(acceptEitherAsync [d other f]
(accept-either-async d other f))
(accept-either d other f (deferred (cs-default-executor))))
(acceptEitherAsync [d other f executor]
(accept-either-async d other f executor))
(accept-either d other f (deferred executor)))

(runAfterEither [d other f]
(run-after-either d other f))
(run-after-either d other f nil))
(runAfterEitherAsync [d other f]
(run-after-either-async d other f))
(run-after-either d other f (deferred (cs-default-executor))))
(runAfterEitherAsync [d other f executor]
(run-after-either-async d other f executor))
(run-after-either d other f (deferred executor)))

(thenCompose [d f]
(then-compose d f))
(then-compose d f nil))
(thenComposeAsync [d f]
(then-compose-async d f))
(then-compose d f (deferred (cs-default-executor))))
(thenComposeAsync [d f executor]
(then-compose-async d f executor))
(then-compose d f (deferred executor)))

(handle [d f]
(then-handle d f))
(then-handle d f nil))
(handleAsync [d f]
(then-handle-async d f))
(then-handle d f (deferred (cs-default-executor))))
(handleAsync [d f executor]
(then-handle-async d f executor))
(then-handle d f (deferred executor)))

(exceptionally [d f]
(then-exceptionally d f))

(toCompletableFuture [d]
(to-completable-future d))
(then-exceptionally d f nil))
;; Only available since Java 12
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there some kind of conditional to add these so they won't get forgotten?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there is a way to implement methods conditionally depending on the Java version. Are you aware of one? I mainly put these in for completion's sake (no pun intended 😄) so that we could uncomment them in the future when we decide to drop support for Java versions < 12 (which Clojure itself likely will do from 1.13 on).

;; (exceptionallyAsync [d f]
;; (then-exceptionally d f (deferred (cs-default-executor))))
;; (exceptionallyAsync [d f executor]
;; (then-exceptionally d (deferred executor)))

(whenComplete [d f]
(when-complete d f))
(when-complete d f nil))
(whenCompleteAsync [d f]
(when-complete-async d f))
(when-complete d f (deferred (cs-default-executor))))
(whenCompleteAsync [d f executor]
(when-complete-async d f executor)))
(when-complete d f (deferred executor)))

(toCompletableFuture [d]
(to-completable-future d)))

(definline realized?
"Returns true if the manifold deferred is realized."
Expand Down Expand Up @@ -1538,157 +1545,129 @@
;; CompletionStage helper fns
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(defmacro ^:no-doc def-async-for
"Defines a CompletionStage async version of the function associated with
the given symbol, with '-async' appended."
[fn-name]
(let [async-name (symbol (str (name fn-name) "-async"))]
`(defn- ~async-name
([d# f#]
(~async-name d# f# (or (ex/executor) (ex/execute-pool))))
([d# f# executor#]
(~fn-name (onto d# executor#) f#)))))

(defmacro ^:no-doc def-async-for-dual
"Defines a CompletionStage async version of the two-deferred
function associated with the given symbol, with '-async' appended."
[fn-name]
(let [async-name (symbol (str (name fn-name) "-async"))]
`(defn- ~async-name
([d# d2# f#]
(~async-name d# d2# f# (or (ex/executor) (ex/execute-pool))))
([d# d2# f# executor#]
(~fn-name (onto d# executor#) d2# f#)))))

(defn- fmap-deferred
"Returns a new deferred with function `f` applies to realized value of `d`.
(Like fmap but for deferreds.)
This function does not unwrap the result of f; it will only be applied to
the immediate value of `d`. This is for mimicking CompletionStage's
behavior."
(defn- cs-default-executor []
(or (ex/executor) (ex/execute-pool)))

(defn- shallow-connect
"Like `connect` but without implicit unwrapping of conveyed value."
[from to]
(on-realized from
(fn [val] (success! to val))
(fn [error] (error! to error))))

(defn- shallow-chain
"Returns a new deferred with function `f` applied to realized value of `d`.
Unlike `chain`, this function does not unwrap the result of `f`; it will only be applied to the
immediate value of `d`. This is for mimicking `CompletionStage`'s behavior."
[d f]
(let [d' (deferred)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this deferred run on the default executor regardless of where d is getting executed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the executor of d' will be nil which means its callbacks will be invoked on the same executor / thread which realizes it. So basically, d' inherits the executor of d here. This behavior isn't documented very well, alas. The best I could find is this paragraph:

In these cases, we can move the stream or deferred `onto` an executor, guaranteeing that all actions resulting from an operation will be enqueued onto a thread pool rather than immediately executed. This executor can be generated via `manifold.executor/instrumented-executor`, or using the convenience methods `fixed-thread-executor` and `utilization-executor`. In addition to providing automatic instrumentation, these executors will guarantee that any streams or deferred created within their scope will also be "on" that executor. For this reason, it's often sufficient to only call `onto` on a single stream in a topology, as everything downstream of it will transitively be executed on the executor.

The implementation lives here:

(clojure.core/loop []
(when-let [^IDeferredListener l# (.poll ~'listeners)]
(try
(if (nil? ~executor)
(~(if success? `.onSuccess `.onError) ^IDeferredListener l# ~val)
(.execute ~(with-meta executor {:tag "java.util.concurrent.Executor"})
(fn []
(try
(~(if success? `.onSuccess `.onError) l# ~val)
(catch Throwable e#
#_(.printStackTrace e#)
(log/error e# "error in deferred handler"))))))
(catch Throwable e#
#_(.printStackTrace e#)
(log/error e# "error in deferred handler")))
(recur)))

And finally, here's an example which empirically confirms this behavior:

(let [ex (ex/fixed-thread-executor
          1
          {:thread-factory (ex/thread-factory
                            (constantly "my executor")
                            (deliver (promise) nil))})
      d (deferred ex)
      d' (shallow-chain d #(prn :one % (.getName (Thread/currentThread))))]
  (chain d' #(prn :two % (.getName (Thread/currentThread))))
  (success! d :ok))

Output:

:one :ok "my executor"
:two nil "my executor"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

every day i learn something new

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here! 😄

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very well pointed! I should probably have documented this detail with a comment in the original implementation, but I'm glad you've written it out explicitly

(on-realized d
(fn [val] (success! d' (f val)))
(fn [error] (error! d' error)))
d'))

(defn- then-apply [d ^Function f]
(assert-some f)
(fmap-deferred d #(.apply f %)))
(defn- completion-stage-result [d f to]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: I'm all in for dropping the haskell-like fmap terminology and going for more familiar manifold terms (I loved your decision with shallow-chain), but can't we name this completion-stage-result function to something more meaningful/specifc?
Perhaps shallow-chain-with or shallow-chain-into? I don't think it deserves a documentation string since its implementation is very simple and shallow-chain is well explained (thanks for re-wording its docs btw), but I think this code would benefit from a better name for this, since it's used throughout the implementation.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: I'm all in for dropping the haskell-like fmap terminology and going for more familiar manifold terms (I loved your decision with shallow-chain),

Thanks, glad you like it, too 😊 Always feels a bit intrusive to rename functions like that but harmonizing it with the existing manifold terminology seemed prudent.

but can't we name this completion-stage-result function to something more meaningful/specifc?

Ah yes, thanks for noticing! I'm not too happy with that name myself but forgot to mention it in the description.

Perhaps shallow-chain-with or shallow-chain-into? I don't think it deserves a documentation string since its implementation is very simple and shallow-chain is well explained (thanks for re-wording its docs btw), but I think this code would benefit from a better name for this, since it's used throughout the implementation.

Good ideas. Perhaps even shallow-chain-onto to bring the onto back in? Looking at it now, we could actually change the signature to accept an executor instead of a deferred, making the onto name even more fitting. Originally, I was passing in a deferred even in the synchronous case but as you can see, I have since refactored it so that we're passing nil instead now. Will give this a try!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just pushed 9fb23ad as a fixup commit (will squash before merge later). I realized that I could further decompose it so that we now also have shallow-onto 😄 WDYT?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, this is much better, lgtm

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm much happier with this now, too. Are you still reviewing? Otherwise, would you be so kind to hit "approve" on the PR for posterity? 🙏

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot!

(when to
(shallow-connect d to))
(shallow-chain (or to d) f))

(def-async-for then-apply)
(defn- then-apply [d ^Function f to]
(assert-some f)
(completion-stage-result d #(.apply f %) to))

(defn- then-accept [d ^Consumer c]
(defn- then-accept [d ^Consumer c to]
(assert-some c)
(fmap-deferred d #(.accept c %)))

(def-async-for then-accept)
(completion-stage-result d #(.accept c %) to))

(defn- then-run [d ^Runnable f]
(defn- then-run [d ^Runnable f to]
(assert-some f)
(fmap-deferred d (fn [_] (.run f))))

(def-async-for then-run)
(completion-stage-result d (fn [_] (.run f)) to))


(defn- then-combine [d other ^BiFunction f]
(defn- then-combine [d other ^BiFunction f to]
(assert-some other f)
(fmap-deferred (zip d other)
(fn [[x y]] (.apply f x y))))

(def-async-for-dual then-combine)
(completion-stage-result (zip d other)
(fn [[x y]] (.apply f x y))
to))


(defn- then-accept-both [d other ^BiConsumer f]
(defn- then-accept-both [d other ^BiConsumer f to]
(assert-some other f)
(fmap-deferred (zip d other)
(fn [[x y]] (.accept f x y))))

(def-async-for-dual then-accept-both)
(completion-stage-result (zip d other)
(fn [[x y]] (.accept f x y))
to))


(defn- run-after-both [d other ^Runnable f]
(defn- run-after-both [d other ^Runnable f to]
(assert-some other f)
(fmap-deferred (zip d other)
(fn [[_ _]] (.run f))))

(completion-stage-result (zip d other)
(fn [[_ _]] (.run f))
to))

(def-async-for-dual run-after-both)


(defn- apply-to-either [d other ^Function f]
(defn- apply-to-either [d other ^Function f to]
(assert-some other f)
(then-apply (alt d other) f))

(def-async-for-dual apply-to-either)
(then-apply (alt d other) f to))


(defn- accept-either [d other ^Function f]
(defn- accept-either [d other ^Function f to]
(assert-some other f)
(then-accept (alt d other) f))

(def-async-for-dual accept-either)
(then-accept (alt d other) f to))


(defn- run-after-either [d other ^Function f]
(defn- run-after-either [d other ^Function f to]
(assert-some other f)
(then-run (alt d other) f))

(def-async-for-dual run-after-either)
(then-run (alt d other) f to))


(defn- then-compose [d ^Function f]
(defn- then-compose [d ^Function f to]
(assert-some f)
(let [d' (deferred)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still necessary? Shouldn't the provided target (result) be used instead?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit subtle indeed 😅 The purpose of d' is to unwrap the deferred returned from f (= fd), so AFAICT we still need it. If it helps: thenCompose is basically the bind / flatMap operation of the CompletionStage monad 😬

(on-realized d
(fn [val]
(on-realized (->deferred (.apply f val))
#(success! d' %)
#(error! d' %)))
(fn [error] (error! d' error)))
(-> (completion-stage-result d #(->deferred (.apply f %)) to)
(on-realized (fn [fd]
(shallow-connect fd d'))
(fn [error]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I loved the join . fmap implementation for then-compose using shallow-connect here, well done!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks 🙏 Though I have to admit that I'm not fluent in Monadic, so I am not familiar with the join . fmap idiom. The code here is just the nicest way I could come up with to express this operation 😅

(error! d' error))))
d'))

(def-async-for then-compose)


(defn- then-handle [d ^BiFunction f]
(defn- then-handle [d ^BiFunction f to]
(assert-some f)
;; Can't use `completion-stage-result` here because it only covers
;; the success case.
(when to
(shallow-connect d to))
(let [d' (deferred)]
(on-realized
d
(or to d)
(fn [val] (success! d' (.apply f val nil)))
(fn [error] (success! d' (.apply f nil error))))
d'))


(def-async-for then-handle)


(defn- then-exceptionally [d ^Function f]
(defn- then-exceptionally [d ^Function f to]
(assert-some f)
;; Can't use `completion-stage-result` here because it only covers
;; the success case.
(when to
(shallow-connect d to))
(let [d' (deferred)]
(on-realized
d
(fn [val] (success! d' val))
(fn [error] (success! d' (.apply f error))))
(or to d)
(fn [val] (success! d' val))
(fn [error] (success! d' (.apply f error))))
d'))

(defn- to-completable-future [d]

(let [result (CompletableFuture.)]
(let [to (CompletableFuture.)]

(on-realized d
#(.complete result %)
#(.completeExceptionally result %))
#(.complete to %)
#(.completeExceptionally to %))

result))
to))

(defn- when-complete [d ^BiConsumer f]
(defn- when-complete [d ^BiConsumer f to]
(assert-some f)
;; Can't use `completion-stage-result` here because it only covers
;; the success case.
(when to
(shallow-connect d to))
(let [d' (deferred)]
(on-realized d
(on-realized (or to d)
(fn [val]
(try (.accept f val nil)
(success! d' val)
Expand All @@ -1701,8 +1680,6 @@
(error! d' err)))))
d'))

(def-async-for when-complete)

;;;

(alter-meta! #'->Deferred assoc :private true)
Expand Down