Skip to content

Commit 10c3966

Browse files
authored
Merge pull request #228 from mickverm/patch/blockingsubscribeby
Added Maybe, Single, Completable blockingSubscribeBy
2 parents 98b4fa2 + f41ec48 commit 10c3966

File tree

7 files changed

+64
-1
lines changed

7 files changed

+64
-1
lines changed

README.md

+3
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,9 @@ Learn more about building this project with JitPack [here](https://jitpack.io/#R
225225
|Completable|subscribeBy()|Disposable|Allows named arguments to construct a CompletableObserver|
226226
|Observable<T>|blockingSubscribeBy()|Unit|Allows named arguments to construct a blocking Observer|
227227
|Flowable<T>|blockingSubscribeBy()|Unit|Allows named arguments to construct a blocking Subscriber|
228+
|Single<T>|blockingSubscribeBy()|Unit|Allows named arguments to construct a blocking SingleObserver|
229+
|Maybe<T>|blockingSubscribeBy()|Unit|Allows named arguments to construct a blocking MaybeObserver|
230+
|Completable<T>|blockingSubscribeBy()|Unit|Allows named arguments to construct a blocking CompletableObserver|
228231
|Disposable|addTo()|Disposable|Adds a `Disposable` to the specified `CompositeDisposable`|
229232
|CompositeDisposable|plusAssign()|Disposable|Operator function to add a `Disposable` to this`CompositeDisposable`|
230233

build.gradle.kts

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ val examplesImplementation by configurations.getting {
3939
}
4040

4141
dependencies {
42-
api("io.reactivex.rxjava3:rxjava:3.0.0-RC9")
42+
api("io.reactivex.rxjava3:rxjava:3.0.0")
4343
implementation(kotlin("stdlib"))
4444

4545
testImplementation("org.funktionale:funktionale-partials:1.0.0-final")

src/main/kotlin/io/reactivex/rxkotlin3/subscribers.kt

+28
Original file line numberDiff line numberDiff line change
@@ -107,3 +107,31 @@ fun <T : Any> Flowable<T>.blockingSubscribeBy(
107107
onComplete: () -> Unit = onCompleteStub,
108108
onNext: (T) -> Unit = onNextStub
109109
): Unit = blockingSubscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction())
110+
111+
/**
112+
* Overloaded blockingSubscribe function that allows passing named parameters
113+
*/
114+
@SchedulerSupport(SchedulerSupport.NONE)
115+
fun <T : Any> Maybe<T>.blockingSubscribeBy(
116+
onError: (Throwable) -> Unit = onErrorStub,
117+
onComplete: () -> Unit = onCompleteStub,
118+
onSuccess: (T) -> Unit = onNextStub
119+
) : Unit = blockingSubscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction())
120+
121+
/**
122+
* Overloaded blockingSubscribe function that allows passing named parameters
123+
*/
124+
@SchedulerSupport(SchedulerSupport.NONE)
125+
fun <T : Any> Single<T>.blockingSubscribeBy(
126+
onError: (Throwable) -> Unit = onErrorStub,
127+
onSuccess: (T) -> Unit = onNextStub
128+
) : Unit = blockingSubscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer())
129+
130+
/**
131+
* Overloaded blockingSubscribe function that allows passing named parameters
132+
*/
133+
@SchedulerSupport(SchedulerSupport.NONE)
134+
fun Completable.blockingSubscribeBy(
135+
onError: (Throwable) -> Unit = onErrorStub,
136+
onComplete: () -> Unit = onCompleteStub
137+
): Unit = blockingSubscribe(onComplete.asOnCompleteAction(), onError.asOnErrorConsumer())

src/test/kotlin/io/reactivex/rxkotlin3/CompletableTest.kt

+10
Original file line numberDiff line numberDiff line change
@@ -77,4 +77,14 @@ class CompletableTest : KotlinTests() {
7777
.subscribeBy(onError = {}) as LambdaConsumerIntrospection
7878
Assert.assertTrue(disposable.hasCustomOnError())
7979
}
80+
81+
@Test
82+
fun testBlockingSubscribeBy() {
83+
Completable.complete()
84+
.blockingSubscribeBy {
85+
a.received(Unit)
86+
}
87+
Mockito.verify(a, Mockito.times(1))
88+
.received(Unit)
89+
}
8090
}

src/test/kotlin/io/reactivex/rxkotlin3/FlowableTest.kt

+1
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ class FlowableTest {
194194
}
195195
Assert.assertTrue(first.get() == "Alpha")
196196
}
197+
197198
@Test
198199
fun testPairZip() {
199200

src/test/kotlin/io/reactivex/rxkotlin3/MaybeTest.kt

+11
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,17 @@ class MaybeTest {
3333
Assert.assertTrue(disposable.hasCustomOnError())
3434
}
3535

36+
@Test
37+
fun testBlockingSubscribeBy() {
38+
val first = AtomicReference<String>()
39+
40+
Maybe.just("Alpha")
41+
.blockingSubscribeBy {
42+
first.set(it)
43+
}
44+
Assert.assertTrue(first.get() == "Alpha")
45+
}
46+
3647
@Test fun testConcatAll() {
3748
(0 until 10)
3849
.map { Maybe.just(it) }

src/test/kotlin/io/reactivex/rxkotlin3/SingleTest.kt

+10
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,16 @@ class SingleTest : KotlinTests() {
4141
Assert.assertTrue(disposable.hasCustomOnError())
4242
}
4343

44+
@Test
45+
fun testBlockingSubscribeBy() {
46+
Single.just("Alpha")
47+
.blockingSubscribeBy {
48+
a.received(it)
49+
}
50+
verify(a, Mockito.times(1))
51+
.received("Alpha")
52+
}
53+
4454
@Test fun testConcatAll() {
4555
(0 until 10)
4656
.map { Single.just(it) }

0 commit comments

Comments
 (0)