
Gating dispatcher based on uncollected results #454

@t-kalinowski

Problem

mirai launches tasks eagerly. If the code consuming results is slower than the daemons producing them, completed results accumulate in memory in the main process, and nothing applies backpressure to the producers.

Proposal

Add a dispatcher option that stops launching new tasks once a configurable number of results are waiting to be collected, and resumes launching as those results are consumed.

Sketch:

daemons(dispatcher = TRUE, max_uncollected = 10)

Usage example:

# max_uncollected is the proposed argument; it does not exist in mirai yet
mirai::daemons(10, dispatcher = TRUE, max_uncollected = 10)

ps <- lapply(1:20, function(i) {
  mirai::mirai({
    # fast background producer: append a timestamp to test.log under a file lock
    lock <- filelock::lock("test.log")
    cat(as.character(Sys.time()), "\n", file = "test.log", append = TRUE)
    filelock::unlock(lock)
  }) |>
    promises::then(function(value) {
      # slow main-thread consumer
      Sys.sleep(2)
    })
})

p <- promises::promise_all(.list = ps)

Once max_uncollected results are waiting to be collected in the main process, the dispatcher stops sending new tasks, so the daemons sit idle. Dispatch resumes only as the main thread consumes results.
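To make the intended behaviour concrete, here is a toy, single-process R model of the gate. It is not mirai code: `simulate_dispatch()` and its arguments are hypothetical. It assumes every dispatched task finishes within one time step and that the consumer collects a fixed number of results per step. It also assumes that tasks already dispatched count against the limit, which is one possible reading of the proposal and keeps the backlog at or below `max_uncollected`.

```r
# Toy, single-process model of the proposed `max_uncollected` gate.
# Hypothetical helper for illustration only; it does not call mirai.
simulate_dispatch <- function(n_tasks, n_workers, max_uncollected = Inf,
                              consume_per_step = 1) {
  stopifnot(max_uncollected >= 1, consume_per_step >= 1)
  pending <- n_tasks   # tasks not yet sent to a worker
  running <- 0         # tasks sent last step; each finishes within one step
  uncollected <- 0     # finished results waiting in the main process
  collected <- 0       # results the consumer has taken
  peak <- 0            # largest backlog of uncollected results seen
  steps <- 0
  while (collected < n_tasks) {
    steps <- steps + 1
    # Fast producer: everything dispatched last step has now finished.
    uncollected <- uncollected + running
    running <- 0
    peak <- max(peak, uncollected)
    # Slow consumer: takes at most `consume_per_step` results per step.
    taken <- min(uncollected, consume_per_step)
    uncollected <- uncollected - taken
    collected <- collected + taken
    # Gate: launch only as many tasks as there is room for under the cap,
    # so even after they finish the backlog cannot exceed `max_uncollected`.
    n_new <- min(pending, n_workers, max_uncollected - uncollected)
    pending <- pending - n_new
    running <- n_new
  }
  list(peak = peak, steps = steps)
}

simulate_dispatch(20, n_workers = 10)$peak                        # 19
simulate_dispatch(20, n_workers = 10, max_uncollected = 10)$peak  # 10
```

With 20 tasks and 10 workers, as in the usage example, the ungated run peaks at 19 waiting results while the gated run peaks at 10. Both runs take 21 steps in this model, because the slow consumer, not the dispatcher, sets the pace.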

Benefits

  • Prevents unbounded memory growth.
  • Removes the need to implement a manual queue.
  • Matches the channel(maxsize) semantics common in other systems.
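For comparison, the channel(maxsize) semantics referred to above can be sketched in base R as a bounded FIFO. This is roughly the kind of manual queue the proposal would make unnecessary. `bounded_channel()` and its `try_put()`/`take()`/`size()` methods are hypothetical names; a real producer would wait and retry when `try_put()` returns FALSE rather than drop the item.

```r
# Bounded FIFO in base R, mimicking a channel(maxsize) capacity limit.
# Hypothetical helper for illustration; not part of mirai.
bounded_channel <- function(maxsize) {
  items <- list()
  list(
    # Refuses the item (returns FALSE) when the channel is full;
    # the producer is expected to wait and retry.
    try_put = function(x) {
      if (length(items) >= maxsize) return(FALSE)
      items <<- c(items, list(x))
      TRUE
    },
    # Removes and returns the oldest item.
    take = function() {
      if (length(items) == 0L) stop("channel is empty")
      x <- items[[1L]]
      items <<- items[-1L]
      x
    },
    size = function() length(items)
  )
}

ch <- bounded_channel(2)
ch$try_put("a")  # TRUE
ch$try_put("b")  # TRUE
ch$try_put("c")  # FALSE: full, the producer must wait
ch$take()        # "a"
ch$try_put("c")  # TRUE: a slot was freed
```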

Example Use Cases

  • ML data loader: avoid preloading large batches faster than the model can consume them.
  • DB write operations: prevent results from piling up while they wait on a single-threaded writer.

References

cc: @dfalbel

Metadata

Assignees: No one assigned
Labels: feature (a feature request or enhancement)
Type: No type
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests