Skip to content

Commit 28db815

Browse files
stepangothomasnield
authored andcommitted
combineLatest extensions for Observable and Flowable (#116)
* combineLatest extensions for Observable and Flowable * jdk8
1 parent 7d6cdaf commit 28db815

File tree

5 files changed

+92
-31
lines changed

5 files changed

+92
-31
lines changed

.travis.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
language: java
22

33
jdk:
4-
- oraclejdk7
4+
- oraclejdk8
55

66
sudo: false
77
# as per http://blog.travis-ci.com/2014-12-17-faster-builds-with-container-based-infrastructure/

src/main/kotlin/io/reactivex/rxkotlin/flowable.kt

+14
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.reactivex.rxkotlin
22

33
import io.reactivex.Flowable
4+
import io.reactivex.functions.BiFunction
5+
import io.reactivex.functions.Function3
46

57

68
fun BooleanArray.toFlowable(): Flowable<Boolean> = asIterable().toFlowable()
@@ -63,6 +65,18 @@ private fun <T : Any> Iterator<T>.toIterable() = object : Iterable<T> {
6365
override fun iterator(): Iterator<T> = this@toIterable
6466
}
6567

68+
/**
69+
* Combine latest operator that produces [Pair]
70+
*/
71+
fun <T : Any, R : Any> Flowable<T>.combineLatest(flowable: Flowable<R>): Flowable<Pair<T, R>>
72+
= Flowable.combineLatest(this, flowable, BiFunction(::Pair))
73+
74+
/**
75+
* Combine latest operator that produces [Triple]
76+
*/
77+
fun <T : Any, R : Any, U : Any> Flowable<T>.combineLatest(flowable1: Flowable<R>, flowable2: Flowable<U>): Flowable<Triple<T, R, U>>
78+
= Flowable.combineLatest(this, flowable1, flowable2, Function3(::Triple))
79+
6680
//EXTENSION FUNCTION OPERATORS
6781

6882
/**

src/main/kotlin/io/reactivex/rxkotlin/observable.kt

+13
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.reactivex.rxkotlin
22

33
import io.reactivex.Observable
4+
import io.reactivex.functions.BiFunction
5+
import io.reactivex.functions.Function3
46

57

68
fun BooleanArray.toObservable(): Observable<Boolean> = asIterable().toObservable()
@@ -63,6 +65,17 @@ private fun <T : Any> Iterator<T>.toIterable() = object : Iterable<T> {
6365
override fun iterator(): Iterator<T> = this@toIterable
6466
}
6567

68+
/**
69+
* Combine latest operator that produces [Pair]
70+
*/
71+
fun <T : Any, R : Any> Observable<T>.combineLatest(observable: Observable<R>): Observable<Pair<T, R>>
72+
= Observable.combineLatest(this, observable, BiFunction(::Pair))
73+
74+
/**
75+
* Combine latest operator that produces [Triple]
76+
*/
77+
fun <T : Any, R : Any, U : Any> Observable<T>.combineLatest(observable1: Observable<R>, observable2: Observable<U>): Observable<Triple<T, R, U>>
78+
= Observable.combineLatest(this, observable1, observable2, Function3(::Triple))
6679

6780
// EXTENSION FUNCTION OPERATORS
6881

src/test/kotlin/io/reactivex/rxkotlin/FlowableTest.kt

+49-29
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,41 @@
11
package io.reactivex.rxkotlin
22

3+
import io.reactivex.BackpressureStrategy
34
import io.reactivex.Flowable
4-
import org.junit.Assert
5+
import io.reactivex.Flowable.create
6+
import io.reactivex.FlowableEmitter
7+
import org.junit.Assert.assertEquals
8+
import org.junit.Assert.assertNotNull
59
import org.junit.Ignore
610
import org.junit.Test
711
import java.util.concurrent.atomic.AtomicInteger
812

913
class FlowableTest {
1014

11-
private fun <T: Any> bufferedFlowable(source: (io.reactivex.FlowableEmitter<T>) -> Unit) =
12-
io.reactivex.Flowable.create(source, io.reactivex.BackpressureStrategy.BUFFER)
15+
private fun <T : Any> bufferedFlowable(source: (FlowableEmitter<T>) -> Unit) =
16+
create(source, BackpressureStrategy.BUFFER)
1317

1418
@org.junit.Test fun testCreation() {
15-
val o0: io.reactivex.Flowable<Int> = io.reactivex.Flowable.empty()
19+
val o0: Flowable<Int> = Flowable.empty()
1620
val list = bufferedFlowable<Int> { s ->
1721
s.onNext(1)
1822
s.onNext(777)
1923
s.onComplete()
2024
}.toList().blockingGet()
21-
org.junit.Assert.assertEquals(listOf(1, 777), list)
22-
val o1: io.reactivex.Flowable<Int> = listOf(1, 2, 3).toFlowable()
23-
val o2: io.reactivex.Flowable<List<Int>> = io.reactivex.Flowable.just(listOf(1, 2, 3))
24-
25-
val o3: io.reactivex.Flowable<Int> = io.reactivex.Flowable.defer { bufferedFlowable<Int> { s -> s.onNext(1) } }
26-
val o4: io.reactivex.Flowable<Int> = Array(3) { 0 }.toFlowable()
27-
val o5: io.reactivex.Flowable<Int> = IntArray(3).toFlowable()
28-
29-
org.junit.Assert.assertNotNull(o0)
30-
org.junit.Assert.assertNotNull(o1)
31-
org.junit.Assert.assertNotNull(o2)
32-
org.junit.Assert.assertNotNull(o3)
33-
org.junit.Assert.assertNotNull(o4)
34-
org.junit.Assert.assertNotNull(o5)
25+
assertEquals(listOf(1, 777), list)
26+
val o1: Flowable<Int> = listOf(1, 2, 3).toFlowable()
27+
val o2: Flowable<List<Int>> = Flowable.just(listOf(1, 2, 3))
28+
29+
val o3: Flowable<Int> = Flowable.defer { bufferedFlowable<Int> { s -> s.onNext(1) } }
30+
val o4: Flowable<Int> = Array(3) { 0 }.toFlowable()
31+
val o5: Flowable<Int> = IntArray(3).toFlowable()
32+
33+
assertNotNull(o0)
34+
assertNotNull(o1)
35+
assertNotNull(o2)
36+
assertNotNull(o3)
37+
assertNotNull(o4)
38+
assertNotNull(o5)
3539
}
3640

3741
@org.junit.Test fun testExampleFromReadme() {
@@ -48,34 +52,34 @@ class FlowableTest {
4852
map { it.toString() }.
4953
blockingGet()
5054

51-
Assert.assertEquals("Hello", result)
55+
assertEquals("Hello", result)
5256
}
5357

5458
@Test fun iteratorFlowable() {
55-
Assert.assertEquals(listOf(1, 2, 3), listOf(1, 2, 3).iterator().toFlowable().toList().blockingGet())
59+
assertEquals(listOf(1, 2, 3), listOf(1, 2, 3).iterator().toFlowable().toList().blockingGet())
5660
}
5761

5862
@Test fun intProgressionStep1Empty() {
59-
Assert.assertEquals(listOf(1), (1..1).toFlowable().toList().blockingGet())
63+
assertEquals(listOf(1), (1..1).toFlowable().toList().blockingGet())
6064
}
6165

6266
@Test fun intProgressionStep1() {
63-
Assert.assertEquals((1..10).toList(), (1..10).toFlowable().toList().blockingGet())
67+
assertEquals((1..10).toList(), (1..10).toFlowable().toList().blockingGet())
6468
}
6569

6670
@Test fun intProgressionDownTo() {
67-
Assert.assertEquals((1 downTo 10).toList(), (1 downTo 10).toFlowable().toList().blockingGet())
71+
assertEquals((1 downTo 10).toList(), (1 downTo 10).toFlowable().toList().blockingGet())
6872
}
6973

7074
@Ignore("Too slow")
7175
@Test fun intProgressionOverflow() {
72-
Assert.assertEquals((0..10).toList().reversed(), (-10..Integer.MAX_VALUE).toFlowable().skip(Integer.MAX_VALUE.toLong()).map { Integer.MAX_VALUE - it }.toList().blockingGet())
76+
assertEquals((0..10).toList().reversed(), (-10..Integer.MAX_VALUE).toFlowable().skip(Integer.MAX_VALUE.toLong()).map { Integer.MAX_VALUE - it }.toList().blockingGet())
7377
}
7478

7579

7680
@Test fun testFold() {
7781
val result = listOf(1, 2, 3).toFlowable().reduce(0) { acc, e -> acc + e }.blockingGet()
78-
Assert.assertEquals(6, result)
82+
assertEquals(6, result)
7983
}
8084

8185
@Test fun `kotlin sequence should produce expected items and flowable be able to handle em`() {
@@ -93,24 +97,24 @@ class FlowableTest {
9397
toList().
9498
subscribe()
9599

96-
Assert.assertEquals(100, generated.get())
100+
assertEquals(100, generated.get())
97101
}
98102

99103
@Test fun testFlatMapSequence() {
100-
Assert.assertEquals(
104+
assertEquals(
101105
listOf(1, 2, 3, 2, 3, 4, 3, 4, 5),
102106
listOf(1, 2, 3).toFlowable().flatMapSequence { listOf(it, it + 1, it + 2).asSequence() }.toList().blockingGet()
103107
)
104108
}
105109

106110
@Test fun testCombineLatest() {
107111
val list = listOf(1, 2, 3, 2, 3, 4, 3, 4, 5)
108-
Assert.assertEquals(list, list.map { Flowable.just(it) }.combineLatest { it }.blockingFirst())
112+
assertEquals(list, list.map { Flowable.just(it) }.combineLatest { it }.blockingFirst())
109113
}
110114

111115
@Test fun testZip() {
112116
val list = listOf(1, 2, 3, 2, 3, 4, 3, 4, 5)
113-
Assert.assertEquals(list, list.map { Flowable.just(it) }.zip { it }.blockingFirst())
117+
assertEquals(list, list.map { Flowable.just(it) }.zip { it }.blockingFirst())
114118
}
115119

116120
@Test fun testCast() {
@@ -129,4 +133,20 @@ class FlowableTest {
129133
flowable.test()
130134
.assertError(ClassCastException::class.java)
131135
}
136+
137+
@Test fun combineLatestPair() {
138+
Flowable.just(3)
139+
.combineLatest(Flowable.just(10))
140+
.map { (x, y) -> x * y }
141+
.test()
142+
.assertValues(30)
143+
}
144+
145+
@Test fun combineLatestTriple() {
146+
Flowable.just(3)
147+
.combineLatest(Flowable.just(10), Flowable.just(20))
148+
.map { (x, y, z) -> x * y * z }
149+
.test()
150+
.assertValues(600)
151+
}
132152
}

src/test/kotlin/io/reactivex/rxkotlin/ObservableTest.kt

+15-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package io.reactivex.rxkotlin
22

33
import io.reactivex.Observable
4-
import io.reactivex.observers.TestObserver
54
import org.junit.Assert.assertEquals
65
import org.junit.Assert.assertNotNull
76
import org.junit.Ignore
@@ -172,7 +171,22 @@ class ObservableTest {
172171
.assertValues(BigDecimal.valueOf(15, 1), 2, BigDecimal.valueOf(42), 15)
173172
.assertNoErrors()
174173
.assertComplete()
174+
}
175175

176+
@Test fun combineLatestPair() {
177+
Observable.just(3)
178+
.combineLatest(Observable.just(10))
179+
.map { (x, y) -> x * y }
180+
.test()
181+
.assertValues(30)
182+
}
183+
184+
@Test fun combineLatestTriple() {
185+
Observable.just(3)
186+
.combineLatest(Observable.just(10), Observable.just(20))
187+
.map { (x, y, z) -> x * y * z }
188+
.test()
189+
.assertValues(600)
176190
}
177191

178192
}

0 commit comments

Comments
 (0)