Description
This is the product of some library design work we've been doing at Google, and we wanted to take this upstream in case there was interest. The Flow
API is almost exclusively "push"-based, not "pull" based. The main alternative approach is to adapt it to a Channel
, but that often introduces inconvenient concurrency issues (arbitrary buffering, the risk of forgetting to cancel the channel).
What we want is more or less a scoped iteration method. Here is a sketch of some examples that we think would be better with the proposed API than what Flow
can do today.
suspend fun <T> Flow<T>.all(predicate: suspend (T) -> Boolean): Boolean = iterate {
while (hasNext()) {
if (!predicate(next()) return@iterate false
}
true
}
fun <T> Flow<T>.pairElements(): Flow<Pair<T, T>> = flow {
iterate {
while (hasNext()) {
val a = next()
if (!hasNext()) break
val b = next()
emit(Pair(a, b))
}
}
}
// this API sort of generalizes e.g. https://github.com/cashapp/turbine
fun testStreamingRpcResponse() = runTest {
val responses = stub.doRpc(request)
responses.iterate {
assertThat(next()).isEqualTo(response1)
doStimulus(1)
assertThat(next()).isEqualTo(response2)
doStimulus(2)
assertThat(hasNext()).isFalse()
}
}
In short, what we're proposing is
suspend fun <T, R> Flow<T>.iterate(block: ??Iterator<T>.() -> R): R
It's not precisely clear what the ??Iterator
should be: we might reuse ChannelIterator
, but we're not particularly fond of the requirement to call hasNext()
in the test case especially, and of course this isn't actually a Channel
. For ourselves, we are inclined to introduce a separate FlowIterator
type.
We have an implementation that does not introduce undue concurrency. To be a little more specific, it does not continue the backing Flow.collect
until the the next call to hasNext
, etc.; it's a linear transformation in the same way as map
and filter
but not buffer
or conflate
. We can PR it if there's interest.