Skip to content

Commit 1673ddf

Browse files
authored
Reimplement Multi.flatMapIterable + TCK test (#1467)
1 parent 859745f commit 1673ddf

File tree

7 files changed

+723
-4
lines changed

7 files changed

+723
-4
lines changed

common/reactive/src/main/java/io/helidon/common/reactive/Multi.java

+17-4
Original file line numberDiff line numberDiff line change
@@ -160,10 +160,23 @@ default <U> Multi<U> flatMap(Function<T, Flow.Publisher<U>> mapper, long maxConc
160160
* @param <U> output item type
161161
* @return Multi
162162
*/
163-
default <U> Multi<U> flatMapIterable(Function<T, Iterable<U>> iterableMapper) {
164-
MultiFlatMapProcessor<T, U> processor = MultiFlatMapProcessor.fromIterableMapper(iterableMapper);
165-
this.subscribe(processor);
166-
return processor;
163+
default <U> Multi<U> flatMapIterable(Function<? super T, ? extends Iterable<? extends U>> iterableMapper) {
164+
return flatMapIterable(iterableMapper, 32);
165+
}
166+
167+
/**
168+
* Transform item with supplied function and flatten resulting {@link Iterable} to downstream.
169+
*
170+
* @param iterableMapper {@link Function} receiving item as parameter and returning {@link Iterable}
171+
* @param prefetch the number of upstream items to request upfront, then 75% of this value after
172+
* 75% received and mapped
173+
* @param <U> output item type
174+
* @return Multi
175+
*/
176+
default <U> Multi<U> flatMapIterable(Function<? super T, ? extends Iterable<? extends U>> iterableMapper,
177+
int prefetch) {
178+
Objects.requireNonNull(iterableMapper, "iterableMapper is null");
179+
return new MultiFlatMapIterable<>(this, iterableMapper, prefetch);
167180
}
168181

169182
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
/*
2+
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.helidon.common.reactive;
19+
20+
import java.util.Iterator;
21+
import java.util.Objects;
22+
import java.util.concurrent.ConcurrentLinkedQueue;
23+
import java.util.concurrent.Flow;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
import java.util.concurrent.atomic.AtomicLong;
26+
import java.util.function.Function;
27+
28+
/**
29+
* Map each upstream item into an Iterable and stream their values.
30+
* @param <T> the upstream item type
31+
* @param <R> the output item type
32+
*/
33+
final class MultiFlatMapIterable<T, R> implements Multi<R> {
34+
35+
private final Multi<T> source;
36+
37+
private final Function<? super T, ? extends Iterable<? extends R>> mapper;
38+
39+
private final int prefetch;
40+
41+
MultiFlatMapIterable(Multi<T> source,
42+
Function<? super T, ? extends Iterable<? extends R>> mapper,
43+
int prefetch) {
44+
this.source = source;
45+
this.mapper = mapper;
46+
this.prefetch = prefetch;
47+
}
48+
49+
@Override
50+
public void subscribe(Flow.Subscriber<? super R> subscriber) {
51+
source.subscribe(new FlatMapIterableSubscriber<>(subscriber, mapper, prefetch));
52+
}
53+
54+
static final class FlatMapIterableSubscriber<T, R>
55+
extends AtomicInteger
56+
implements Flow.Subscriber<T>, Flow.Subscription {
57+
58+
private final Flow.Subscriber<? super R> downstream;
59+
60+
private final Function<? super T, ? extends Iterable<? extends R>> mapper;
61+
62+
private final int prefetch;
63+
64+
private final AtomicLong requested;
65+
66+
private final ConcurrentLinkedQueue<T> queue;
67+
68+
private Flow.Subscription upstream;
69+
70+
private long emitted;
71+
72+
private volatile boolean upstreamDone;
73+
private Throwable error;
74+
75+
private volatile boolean canceled;
76+
77+
private Iterator<? extends R> currentIterator;
78+
79+
private int upstreamConsumed;
80+
81+
FlatMapIterableSubscriber(Flow.Subscriber<? super R> downstream,
82+
Function<? super T, ? extends Iterable<? extends R>> mapper,
83+
int prefetch) {
84+
this.downstream = downstream;
85+
this.mapper = mapper;
86+
this.prefetch = prefetch;
87+
this.requested = new AtomicLong();
88+
this.queue = new ConcurrentLinkedQueue<>();
89+
}
90+
91+
@Override
92+
public void onSubscribe(Flow.Subscription subscription) {
93+
SubscriptionHelper.validate(upstream, subscription);
94+
upstream = subscription;
95+
downstream.onSubscribe(this);
96+
subscription.request(prefetch);
97+
}
98+
99+
@Override
100+
public void onNext(T item) {
101+
queue.offer(item);
102+
drain();
103+
}
104+
105+
@Override
106+
public void onError(Throwable throwable) {
107+
error = throwable;
108+
upstreamDone = true;
109+
drain();
110+
}
111+
112+
@Override
113+
public void onComplete() {
114+
upstreamDone = true;
115+
drain();
116+
}
117+
118+
@Override
119+
public void request(long n) {
120+
if (n <= 0L) {
121+
onError(new IllegalArgumentException("Rule §3.9 violated: non-positive requests are forbidden!"));
122+
} else {
123+
SubscriptionHelper.addRequest(requested, n);
124+
drain();
125+
}
126+
}
127+
128+
@Override
129+
public void cancel() {
130+
canceled = true;
131+
upstream.cancel();
132+
drain();
133+
}
134+
135+
void drain() {
136+
if (getAndIncrement() != 0) {
137+
return;
138+
}
139+
140+
Iterator<? extends R> iterator = currentIterator;
141+
Flow.Subscriber<? super R> downstream = this.downstream;
142+
long e = emitted;
143+
int limit = prefetch - (prefetch >> 2);
144+
145+
int missed = 1;
146+
outer:
147+
for (;;) {
148+
149+
if (canceled) {
150+
iterator = null;
151+
currentIterator = null;
152+
queue.clear();
153+
} else {
154+
if (upstreamDone) {
155+
Throwable ex = error;
156+
if (ex != null) {
157+
canceled = true;
158+
downstream.onError(ex);
159+
continue;
160+
}
161+
}
162+
if (iterator == null) {
163+
boolean d = upstreamDone;
164+
T item = queue.poll();
165+
boolean empty = item == null;
166+
167+
if (d && empty) {
168+
canceled = true;
169+
downstream.onComplete();
170+
continue;
171+
}
172+
173+
if (!empty) {
174+
175+
int c = upstreamConsumed + 1;
176+
if (c == limit) {
177+
upstreamConsumed = 0;
178+
upstream.request(limit);
179+
} else {
180+
upstreamConsumed = c;
181+
}
182+
183+
boolean hasNext;
184+
try {
185+
iterator = Objects.requireNonNull(
186+
mapper.apply(item).iterator(),
187+
"The Iterable returned a null iterator"
188+
);
189+
190+
if (canceled) {
191+
continue;
192+
}
193+
194+
hasNext = iterator.hasNext();
195+
} catch (Throwable ex) {
196+
canceled = true;
197+
downstream.onError(ex);
198+
continue;
199+
}
200+
201+
if (!hasNext) {
202+
iterator = null;
203+
continue;
204+
}
205+
currentIterator = iterator;
206+
}
207+
}
208+
209+
if (iterator != null) {
210+
long r = requested.get();
211+
212+
while (e != r) {
213+
214+
if (canceled) {
215+
continue outer;
216+
}
217+
218+
R result;
219+
220+
try {
221+
result = Objects.requireNonNull(iterator.next(),
222+
"The iterator returned a null item");
223+
} catch (Throwable ex) {
224+
canceled = true;
225+
downstream.onError(ex);
226+
continue outer;
227+
}
228+
229+
if (canceled) {
230+
continue outer;
231+
}
232+
233+
downstream.onNext(result);
234+
e++;
235+
236+
if (canceled) {
237+
continue outer;
238+
}
239+
240+
boolean hasNext;
241+
try {
242+
hasNext = iterator.hasNext();
243+
} catch (Throwable ex) {
244+
canceled = true;
245+
downstream.onError(ex);
246+
continue outer;
247+
}
248+
249+
if (canceled) {
250+
continue outer;
251+
}
252+
253+
if (!hasNext) {
254+
iterator = null;
255+
currentIterator = null;
256+
continue outer;
257+
}
258+
}
259+
}
260+
}
261+
262+
emitted = e;
263+
missed = addAndGet(-missed);
264+
if (missed == 0) {
265+
break;
266+
}
267+
}
268+
}
269+
}
270+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
package io.helidon.common.reactive;
18+
19+
import org.reactivestreams.tck.TestEnvironment;
20+
import org.reactivestreams.tck.flow.FlowPublisherVerification;
21+
22+
import java.util.Collections;
23+
import java.util.concurrent.Flow;
24+
import java.util.stream.IntStream;
25+
26+
public class MultiFlatMapIterableManyToManyTckTest extends FlowPublisherVerification<Integer> {
27+
28+
public MultiFlatMapIterableManyToManyTckTest() {
29+
super(new TestEnvironment(50));
30+
}
31+
32+
@Override
33+
public Flow.Publisher<Integer> createFlowPublisher(long l) {
34+
return Multi.just(l >> 1, l - (l >> 1))
35+
.flatMapIterable(v -> () -> IntStream.range(0, v.intValue()).boxed().iterator());
36+
}
37+
38+
@Override
39+
public Flow.Publisher<Integer> createFailedFlowPublisher() {
40+
return null;
41+
}
42+
43+
@Override
44+
public long maxElementsFromPublisher() {
45+
return 10;
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
package io.helidon.common.reactive;
18+
19+
import org.reactivestreams.tck.TestEnvironment;
20+
import org.reactivestreams.tck.flow.FlowPublisherVerification;
21+
22+
import java.util.Collections;
23+
import java.util.concurrent.Flow;
24+
import java.util.stream.IntStream;
25+
26+
public class MultiFlatMapIterableManyToOneTckTest extends FlowPublisherVerification<Integer> {
27+
28+
public MultiFlatMapIterableManyToOneTckTest() {
29+
super(new TestEnvironment(50));
30+
}
31+
32+
@Override
33+
public Flow.Publisher<Integer> createFlowPublisher(long l) {
34+
return Multi.from(() -> IntStream.range(0, (int)l).boxed().iterator())
35+
.flatMapIterable(Collections::singleton);
36+
}
37+
38+
@Override
39+
public Flow.Publisher<Integer> createFailedFlowPublisher() {
40+
return null;
41+
}
42+
43+
@Override
44+
public long maxElementsFromPublisher() {
45+
return 10;
46+
}
47+
}

0 commit comments

Comments
 (0)