Use case
I have a flow of data that is sent to an HTTP stream. I want keep-alive pings in this stream, i.e. an empty event should be sent whenever nothing arrives in the flow.
So I want a stream transformer that calls a function each time the flow has been idle for long enough.
The Shape of the API
I've implemented it as the following utility function:
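import kotlin.time.Duration
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select

@OptIn(ExperimentalCoroutinesApi::class)
public fun <T> Flow<T>.onIdle(interval: Duration, block: suspend ProducerScope<T>.() -> Unit): Flow<T> = channelFlow {
    // Collect the upstream flow into a channel so it can take part in select.
    val data = produceIn(this)
    var finished = false
    while (!finished) {
        select {
            data.onReceiveCatching {
                if (it.isSuccess) {
                    send(it.getOrThrow()) // forward the upstream element
                } else {
                    finished = true // upstream channel is closed
                }
            }
            // The timeout is re-armed on every loop iteration.
            onTimeout(interval) {
                block()
            }
        }
    }
}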
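For illustration, here is a minimal usage sketch of the keep-alive case described above. The `events()` flow, the `collectWithPings()` helper, and the convention that an empty string stands for a ping are assumptions made for this example only. The operator is repeated so that the snippet compiles on its own.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select

// Same operator as above, repeated so the snippet is self-contained.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.onIdle(interval: Duration, block: suspend ProducerScope<T>.() -> Unit): Flow<T> = channelFlow {
    val data = produceIn(this)
    var finished = false
    while (!finished) {
        select<Unit> {
            data.onReceiveCatching {
                if (it.isSuccess) {
                    send(it.getOrThrow())
                } else {
                    finished = true
                }
            }
            onTimeout(interval) {
                block()
            }
        }
    }
}

// Hypothetical source: two events separated by a quiet period.
fun events(): Flow<String> = flow {
    emit("a")
    delay(350.milliseconds) // longer than the idle interval used below
    emit("b")
}

// Hypothetical helper: an empty string plays the role of the keep-alive ping.
fun collectWithPings(): List<String> = runBlocking {
    events()
        .onIdle(100.milliseconds) { send("") }
        .toList()
}

fun main() {
    // "a" first, then some empty pings during the quiet period, then "b".
    println(collectWithPings())
}
```

The exact number of pings depends on timing, since the timeout restarts after every received element and after every ping.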