Open
Labels: feature (a feature request or enhancement)
Description
Problem
mirai launches tasks eagerly. If result consumers are slower than producers, results accumulate in memory with no backpressure.
Proposal
Add a dispatcher option to gate new task launches until a configurable number of results have been collected.
Sketch:
daemons(dispatcher = TRUE, max_uncollected = 10)
Usage example:
mirai::daemons(10, dispatcher = TRUE, max_uncollected = 10)
ps <- lapply(1:20, function(x) {
  mirai::mirai({
    # fast background producer
    lock <- filelock::lock("test.log")
    cat(as.character(Sys.time()), "\n", file = "test.log", append = TRUE)
    filelock::unlock(lock)
  }) |>
    promises::then(function(x) {
      # slow main-thread consumer
      Sys.sleep(2)
    })
})
p <- promises::promise_all(.list = ps)
Workers are blocked once max_uncollected results are waiting to be collected in the main process. New tasks are dispatched only after the main thread consumes items.
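For comparison, here is a minimal sketch of the kind of manual queue this option would replace, using only existing mirai calls (`daemons()`, `mirai()`, `call_mirai()`). The helper `run_bounded()` and its `produce` / `consume` arguments are hypothetical names introduced for illustration, not part of mirai. Unlike the promises-based example above, this sketch collects results with a blocking loop, so it only approximates the proposed behaviour.

```r
library(mirai)

# Hypothetical helper (not part of mirai): run `n_tasks` tasks while keeping
# at most `max_uncollected` launched-but-uncollected tasks at any time.
run_bounded <- function(n_tasks, max_uncollected, produce, consume) {
  in_flight <- list()               # FIFO of mirai objects not yet collected
  out <- vector("list", n_tasks)
  next_task <- 1L
  collected <- 0L
  while (collected < n_tasks) {
    # Launch new tasks only while the window has room.
    while (next_task <= n_tasks && length(in_flight) < max_uncollected) {
      in_flight[[length(in_flight) + 1L]] <-
        mirai(f(i), f = produce, i = next_task)
      next_task <- next_task + 1L
    }
    # Wait for the oldest task, then hand its value to the (slow) consumer.
    value <- call_mirai(in_flight[[1L]])$data
    in_flight <- in_flight[-1L]
    collected <- collected + 1L
    out[collected] <- list(consume(value))
  }
  out
}

daemons(2)
res <- run_bounded(
  n_tasks = 6, max_uncollected = 2,
  produce = function(i) i * 2,      # stands in for the fast producer
  consume = function(x) x + 1       # stands in for the slow consumer
)
daemons(0)
```

The window bounds memory because a new task is launched only after an older result has been consumed. A dispatcher-level `max_uncollected` would give the same bound without this bookkeeping in user code.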
Benefits
- Prevents unbounded memory growth.
- Removes need for implementing a manual queue.
- Matches the channel(maxsize) semantics common in other systems.
Example Use Cases
- ML dataloader: avoid preloading large batches faster than they can be consumed by the model.
- DB write operations: prevent results from piling up while waiting on a single-threaded writer.
References
cc: @dfalbel