Iterative consumption of flows #3274

Open
@lowasser

This is the product of some library design work we've been doing at Google, and we wanted to take this upstream in case there was interest. The Flow API is almost exclusively "push"-based, not "pull"-based. The main alternative is to adapt the Flow to a Channel, but that often introduces inconvenient concurrency issues (arbitrary buffering, the risk of forgetting to cancel the channel).

What we want is more or less a scoped iteration method. Here is a sketch of some examples that we think would be better with the proposed API than what Flow can do today.

suspend fun <T> Flow<T>.all(predicate: suspend (T) -> Boolean): Boolean = iterate {
  while (hasNext()) {
    if (!predicate(next())) return@iterate false
  }
  true
}

fun <T> Flow<T>.pairElements(): Flow<Pair<T, T>> = flow {
  iterate {
    while (hasNext()) {
      val a = next()
      if (!hasNext()) break
      val b = next()
      emit(Pair(a, b))
    }
  }
}

// this API sort of generalizes e.g. https://github.com/cashapp/turbine
fun testStreamingRpcResponse() = runTest {
  val responses = stub.doRpc(request)
  responses.iterate {
    assertThat(next()).isEqualTo(response1)
    doStimulus(1)
    assertThat(next()).isEqualTo(response2)
    doStimulus(2)
    assertThat(hasNext()).isFalse()
  }
}

In short, what we're proposing is

suspend fun <T, R> Flow<T>.iterate(block: ??Iterator<T>.() -> R): R

It's not precisely clear what the ??Iterator should be. We might reuse ChannelIterator, but we're not particularly fond of its requirement to call hasNext() before each next(), which is especially awkward in the test case, and of course this isn't actually a Channel. For ourselves, we are inclined to introduce a separate FlowIterator type.

We have an implementation that does not introduce undue concurrency. To be a little more specific, it does not continue the backing Flow.collect until the next call to hasNext, etc.; it's a linear transformation in the same way as map and filter, but not buffer or conflate. We can PR it if there's interest.
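To make the "does not run ahead" property concrete, here is a minimal sketch of one way such an `iterate` could behave. It is not the implementation mentioned above, which has not been shared. It is an approximation built on two rendezvous channels, and it does use a helper coroutine, but keeps that coroutine in lock-step with the consumer. The `FlowIterator` interface shape, the `suspend` modifier on `block`, and the choice to let `next()` work without a preceding `hasNext()` are all assumptions made for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Hypothetical iterator type; the name comes from the proposal, the shape is assumed.
interface FlowIterator<T> {
    suspend fun hasNext(): Boolean
    suspend fun next(): T
}

// Assumption: `block` is a suspend lambda, since the examples call suspend
// functions (predicate, emit) inside it.
suspend fun <T, R> Flow<T>.iterate(block: suspend FlowIterator<T>.() -> R): R = coroutineScope {
    val demand = Channel<Unit>()   // rendezvous: consumer asks for one more element
    val elements = Channel<T>()    // rendezvous: collector hands over one element

    val collector = launch {
        demand.receive()           // do not start the upstream before the first request
        this@iterate.collect { value ->
            elements.send(value)
            demand.receive()       // hold the upstream here until the next request
        }
        elements.close()           // normal completion means "no more elements"
    }

    val iterator = object : FlowIterator<T> {
        // Result of the last request that has not been consumed by next() yet.
        private var pending: ChannelResult<T>? = null

        override suspend fun hasNext(): Boolean {
            val current = pending ?: run {
                demand.send(Unit)
                elements.receiveCatching().also { pending = it }
            }
            return current.isSuccess
        }

        override suspend fun next(): T {
            if (!hasNext()) throw NoSuchElementException("Flow has no more elements")
            val result = pending!!
            pending = null
            return result.getOrThrow()
        }
    }

    try {
        iterator.block()
    } finally {
        collector.cancel()         // stop the upstream when the block exits early
    }
}
```

In this sketch the upstream only advances when the consumer calls `hasNext()` or `next()`. For example, `flowOf(1, 2, 3).iterate { listOf(next(), next()) }` takes two elements and then cancels the collector without requesting the third. An exception thrown by the upstream fails the helper coroutine, which cancels the enclosing `coroutineScope` and is rethrown from `iterate`.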
