@@ -11,19 +11,20 @@ import kotlin.coroutines.*
11
11
@BenchmarkMode(Mode .Throughput , Mode .AverageTime )
12
12
@OutputTimeUnit(TimeUnit .MICROSECONDS )
13
13
@State(Scope .Benchmark )
14
+ @Threads(3 )
14
15
@Fork(1 )
15
16
open class BufferedChannelBenchmark {
16
17
@Param(" 1" , " 2" )
17
18
var capacity: Int = 0
18
19
19
20
@Benchmark
20
21
fun channelPipeline (): Int = runBlocking {
21
- run (Dispatchers .IO .limitedParallelism( 3 ) )
22
+ run (Dispatchers .Unconfined )
22
23
}
23
24
24
- private suspend inline fun run (context : CoroutineContext ): Int =
25
- Channel .range( 100_000 , context)
26
- .filter( context) { it % 4 == 0 }
25
+ private suspend inline fun run (context : CoroutineContext ) =
26
+ Channel
27
+ .range( 100_000 , context)
27
28
.fold(0 ) { a, b -> a + b }
28
29
29
30
private fun Channel.Factory.range (count : Int , context : CoroutineContext ) = GlobalScope .produce(context, capacity) {
@@ -33,13 +34,6 @@ open class BufferedChannelBenchmark {
33
34
34
35
// Migrated from deprecated operators, are good only for stressing channels
35
36
36
- private fun ReceiveChannel<Int>.filter (context : CoroutineContext = Dispatchers .Unconfined , predicate : suspend (Int ) -> Boolean ): ReceiveChannel <Int > =
37
- GlobalScope .produce(context, onCompletion = { cancel() }) {
38
- for (e in this @filter) {
39
- if (predicate(e)) send(e)
40
- }
41
- }
42
-
43
37
private suspend inline fun <E , R > ReceiveChannel<E>.fold (initial : R , operation : (acc: R , E ) -> R ): R {
44
38
var accumulator = initial
45
39
consumeEach {
0 commit comments