1
+ package com.lukaslechner.coroutineusecasesonandroid.playground.flow.channels
2
+
3
+ import kotlinx.coroutines.channels.Channel
4
+ import kotlinx.coroutines.channels.consumeEach
5
+ import kotlinx.coroutines.channels.produce
6
+ import kotlinx.coroutines.coroutineScope
7
+ import kotlinx.coroutines.delay
8
+ import kotlinx.coroutines.flow.Flow
9
+ import kotlinx.coroutines.flow.consumeAsFlow
10
+ import kotlinx.coroutines.flow.flow
11
+ import kotlinx.coroutines.launch
12
+
13
+ /* *
14
+ UseCase: We want to trigger some processing. While the downstream is currently
15
+ busy, we want to drop the current trigger emission.
16
+
17
+ With a SharedFlow, this is not possible, since we need to define a buffer size of
18
+ > 0 for buffer strategies like "DROP_LATEST". Channels however have a buffer sice of 0
19
+ by default.
20
+
21
+ Another option is to use the custom operator "dropIfBusy" (see below)
22
+
23
+ See also: https://stackoverflow.com/questions/64844821/how-to-drop-latest-with-coroutine-flowt/74560222#74560222
24
+ **/
25
+
26
+ fun <T > Flow<T>.dropIfBusy (): Flow <T > = flow {
27
+ coroutineScope {
28
+ val channel = produce {
29
+ collect { trySend(it) }
30
+ }
31
+ channel.consumeEach { emit(it) }
32
+ }
33
+ }
34
+
35
+ suspend fun main (): Unit = coroutineScope {
36
+
37
+ val channel = Channel <Int >()
38
+
39
+ launch {
40
+ channel
41
+ .consumeAsFlow()
42
+ .collect {
43
+ println (" Process $it " )
44
+ delay(1000 )
45
+ println (" $it processed" )
46
+ }
47
+ }
48
+
49
+ launch {
50
+
51
+ delay(100 )
52
+
53
+ // 1 should be processed
54
+ channel.trySend(1 )
55
+ println (" sharedFlow emits 1" )
56
+
57
+ // 2 should not be processed since downstream is busy
58
+ channel.trySend(2 )
59
+ println (" sharedFlow emits 2" )
60
+
61
+ // 3 should be processed again
62
+ delay(2000 )
63
+ channel.trySend(3 )
64
+ println (" sharedFlow emits 3" )
65
+ }
66
+ }
0 commit comments