1
+ package benchmarks
2
+
3
+ import kotlinx.coroutines.*
4
+ import kotlinx.coroutines.channels.*
5
+ import org.openjdk.jmh.annotations.*
6
+ import java.util.concurrent.*
7
+ import kotlin.coroutines.*
8
+
9
+ @Warmup(iterations = 7 , time = 1 )
10
+ @Measurement(iterations = 5 , time = 1 )
11
+ @BenchmarkMode(Mode .Throughput , Mode .AverageTime )
12
+ @OutputTimeUnit(TimeUnit .MICROSECONDS )
13
+ @State(Scope .Benchmark )
14
+ @Fork(1 )
15
+ open class BufferedChannelBenchmark {
16
+ @Param(" 1" , " 2" )
17
+ var capacity: Int = 0
18
+
19
+ @Benchmark
20
+ fun channelPipeline (): Int = runBlocking {
21
+ run (Dispatchers .IO .limitedParallelism(3 ))
22
+ }
23
+
24
+ private suspend inline fun run (context : CoroutineContext ): Int =
25
+ Channel .range(100_000 , context)
26
+ .filter(context) { it % 4 == 0 }
27
+ .fold(0 ) { a, b -> a + b }
28
+
29
+ private fun Channel.Factory.range (count : Int , context : CoroutineContext ) = GlobalScope .produce(context, capacity) {
30
+ for (i in 0 until count)
31
+ send(i)
32
+ }
33
+
34
+ // Migrated from deprecated operators, are good only for stressing channels
35
+
36
+ private fun ReceiveChannel<Int>.filter (context : CoroutineContext = Dispatchers .Unconfined , predicate : suspend (Int ) -> Boolean ): ReceiveChannel <Int > =
37
+ GlobalScope .produce(context, onCompletion = { cancel() }) {
38
+ for (e in this @filter) {
39
+ if (predicate(e)) send(e)
40
+ }
41
+ }
42
+
43
+ private suspend inline fun <E , R > ReceiveChannel<E>.fold (initial : R , operation : (acc: R , E ) -> R ): R {
44
+ var accumulator = initial
45
+ consumeEach {
46
+ accumulator = operation(accumulator, it)
47
+ }
48
+ return accumulator
49
+ }
50
+ }
0 commit comments