Skip to content

Commit 6b5a732

Browse files
Eliminate *BatchObserver in favour of Publisher
`reactive-streams` has become the de-facto standard for reactive frameworks; we thus use this as a base to allow seamless interop (rather than prompt an extra adapter layer).
1 parent 2032e33 commit 6b5a732

13 files changed

+105
-191
lines changed

build.gradle

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ def getDevelopmentVersion() {
3030
version
3131
}
3232

33-
def slf4jVersion = '1.7.30'
3433
def releaseVersion = System.env.RELEASE_VERSION
3534
version = releaseVersion ? releaseVersion : getDevelopmentVersion()
3635
group = 'com.graphql-java'
@@ -64,12 +63,18 @@ jar {
6463
}
6564
}
6665

66+
def slf4jVersion = '1.7.30'
67+
def reactiveStreamsVersion = '1.0.3'
68+
6769
dependencies {
6870
api 'org.slf4j:slf4j-api:' + slf4jVersion
71+
api 'org.reactivestreams:reactive-streams:' + reactiveStreamsVersion
72+
6973
testImplementation 'org.slf4j:slf4j-simple:' + slf4jVersion
7074
testImplementation 'junit:junit:4.12'
7175
testImplementation 'org.awaitility:awaitility:2.0.0'
7276
testImplementation 'com.github.ben-manes.caffeine:caffeine:2.9.0'
77+
testImplementation 'io.projectreactor:reactor-core:3.6.6'
7378
}
7479

7580
task sourcesJar(type: Jar) {

src/main/java/org/dataloader/BatchObserver.java

Lines changed: 0 additions & 33 deletions
This file was deleted.

src/main/java/org/dataloader/DataLoaderHelper.java

Lines changed: 45 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import org.dataloader.stats.context.IncrementCacheHitCountStatisticsContext;
1111
import org.dataloader.stats.context.IncrementLoadCountStatisticsContext;
1212
import org.dataloader.stats.context.IncrementLoadErrorCountStatisticsContext;
13+
import org.reactivestreams.Subscriber;
14+
import org.reactivestreams.Subscription;
1315

1416
import java.time.Clock;
1517
import java.time.Instant;
@@ -246,7 +248,7 @@ private CompletableFuture<List<V>> dispatchQueueBatch(List<K> keys, List<Object>
246248
return batchLoad
247249
.thenApply(values -> {
248250
assertResultSize(keys, values);
249-
if (isObserverLoader() || isMapObserverLoader()) {
251+
if (isPublisherLoader() || isMappedPublisherLoader()) {
250252
// We have already completed the queued futures by the time the overall batchLoad future has completed.
251253
return values;
252254
}
@@ -428,10 +430,10 @@ CompletableFuture<List<V>> invokeLoader(List<K> keys, List<Object> keyContexts,
428430
.context(context).keyContexts(keys, keyContexts).build();
429431
if (isMapLoader()) {
430432
batchLoad = invokeMapBatchLoader(keys, environment);
431-
} else if (isObserverLoader()) {
432-
batchLoad = invokeObserverBatchLoader(keys, keyContexts, queuedFutures, environment);
433-
} else if (isMapObserverLoader()) {
434-
batchLoad = invokeMappedObserverBatchLoader(keys, keyContexts, queuedFutures, environment);
433+
} else if (isPublisherLoader()) {
434+
batchLoad = invokePublisherBatchLoader(keys, keyContexts, queuedFutures, environment);
435+
} else if (isMappedPublisherLoader()) {
436+
batchLoad = invokeMappedPublisherBatchLoader(keys, keyContexts, queuedFutures, environment);
435437
} else {
436438
batchLoad = invokeListBatchLoader(keys, environment);
437439
}
@@ -503,46 +505,46 @@ private CompletableFuture<List<V>> invokeMapBatchLoader(List<K> keys, BatchLoade
503505
});
504506
}
505507

506-
private CompletableFuture<List<V>> invokeObserverBatchLoader(List<K> keys, List<Object> keyContexts, List<CompletableFuture<V>> queuedFutures, BatchLoaderEnvironment environment) {
508+
private CompletableFuture<List<V>> invokePublisherBatchLoader(List<K> keys, List<Object> keyContexts, List<CompletableFuture<V>> queuedFutures, BatchLoaderEnvironment environment) {
507509
CompletableFuture<List<V>> loadResult = new CompletableFuture<>();
508-
BatchObserver<V> observer = new BatchObserverImpl(loadResult, keys, keyContexts, queuedFutures);
510+
Subscriber<V> subscriber = new DataLoaderSubscriber(loadResult, keys, keyContexts, queuedFutures);
509511

510512
BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler();
511-
if (batchLoadFunction instanceof ObserverBatchLoaderWithContext) {
512-
ObserverBatchLoaderWithContext<K, V> loadFunction = (ObserverBatchLoaderWithContext<K, V>) batchLoadFunction;
513+
if (batchLoadFunction instanceof PublisherBatchLoaderWithContext) {
514+
PublisherBatchLoaderWithContext<K, V> loadFunction = (PublisherBatchLoaderWithContext<K, V>) batchLoadFunction;
513515
if (batchLoaderScheduler != null) {
514-
BatchLoaderScheduler.ScheduledObserverBatchLoaderCall loadCall = () -> loadFunction.load(keys, observer, environment);
516+
BatchLoaderScheduler.ScheduledObserverBatchLoaderCall loadCall = () -> loadFunction.load(keys, subscriber, environment);
515517
batchLoaderScheduler.scheduleObserverBatchLoader(loadCall, keys, environment);
516518
} else {
517-
loadFunction.load(keys, observer, environment);
519+
loadFunction.load(keys, subscriber, environment);
518520
}
519521
} else {
520-
ObserverBatchLoader<K, V> loadFunction = (ObserverBatchLoader<K, V>) batchLoadFunction;
522+
PublisherBatchLoader<K, V> loadFunction = (PublisherBatchLoader<K, V>) batchLoadFunction;
521523
if (batchLoaderScheduler != null) {
522-
BatchLoaderScheduler.ScheduledObserverBatchLoaderCall loadCall = () -> loadFunction.load(keys, observer);
524+
BatchLoaderScheduler.ScheduledObserverBatchLoaderCall loadCall = () -> loadFunction.load(keys, subscriber);
523525
batchLoaderScheduler.scheduleObserverBatchLoader(loadCall, keys, null);
524526
} else {
525-
loadFunction.load(keys, observer);
527+
loadFunction.load(keys, subscriber);
526528
}
527529
}
528530
return loadResult;
529531
}
530532

531-
private CompletableFuture<List<V>> invokeMappedObserverBatchLoader(List<K> keys, List<Object> keyContexts, List<CompletableFuture<V>> queuedFutures, BatchLoaderEnvironment environment) {
533+
private CompletableFuture<List<V>> invokeMappedPublisherBatchLoader(List<K> keys, List<Object> keyContexts, List<CompletableFuture<V>> queuedFutures, BatchLoaderEnvironment environment) {
532534
CompletableFuture<List<V>> loadResult = new CompletableFuture<>();
533-
MappedBatchObserver<K, V> observer = new MappedBatchObserverImpl(loadResult, keys, keyContexts, queuedFutures);
535+
Subscriber<Map.Entry<K, V>> observer = new DataLoaderMapEntrySubscriber(loadResult, keys, keyContexts, queuedFutures);
534536

535537
BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler();
536-
if (batchLoadFunction instanceof MappedObserverBatchLoaderWithContext) {
537-
MappedObserverBatchLoaderWithContext<K, V> loadFunction = (MappedObserverBatchLoaderWithContext<K, V>) batchLoadFunction;
538+
if (batchLoadFunction instanceof MappedPublisherBatchLoaderWithContext) {
539+
MappedPublisherBatchLoaderWithContext<K, V> loadFunction = (MappedPublisherBatchLoaderWithContext<K, V>) batchLoadFunction;
538540
if (batchLoaderScheduler != null) {
539541
BatchLoaderScheduler.ScheduledObserverBatchLoaderCall loadCall = () -> loadFunction.load(keys, observer, environment);
540542
batchLoaderScheduler.scheduleObserverBatchLoader(loadCall, keys, environment);
541543
} else {
542544
loadFunction.load(keys, observer, environment);
543545
}
544546
} else {
545-
MappedObserverBatchLoader<K, V> loadFunction = (MappedObserverBatchLoader<K, V>) batchLoadFunction;
547+
MappedPublisherBatchLoader<K, V> loadFunction = (MappedPublisherBatchLoader<K, V>) batchLoadFunction;
546548
if (batchLoaderScheduler != null) {
547549
BatchLoaderScheduler.ScheduledObserverBatchLoaderCall loadCall = () -> loadFunction.load(keys, observer);
548550
batchLoaderScheduler.scheduleObserverBatchLoader(loadCall, keys, null);
@@ -557,12 +559,12 @@ private boolean isMapLoader() {
557559
return batchLoadFunction instanceof MappedBatchLoader || batchLoadFunction instanceof MappedBatchLoaderWithContext;
558560
}
559561

560-
private boolean isObserverLoader() {
561-
return batchLoadFunction instanceof ObserverBatchLoader;
562+
private boolean isPublisherLoader() {
563+
return batchLoadFunction instanceof PublisherBatchLoader;
562564
}
563565

564-
private boolean isMapObserverLoader() {
565-
return batchLoadFunction instanceof MappedObserverBatchLoader;
566+
private boolean isMappedPublisherLoader() {
567+
return batchLoadFunction instanceof MappedPublisherBatchLoader;
566568
}
567569

568570
int dispatchDepth() {
@@ -616,7 +618,8 @@ private static <T> DispatchResult<T> emptyDispatchResult() {
616618
return (DispatchResult<T>) EMPTY_DISPATCH_RESULT;
617619
}
618620

619-
private class BatchObserverImpl implements BatchObserver<V> {
621+
private class DataLoaderSubscriber implements Subscriber<V> {
622+
620623
private final CompletableFuture<List<V>> valuesFuture;
621624
private final List<K> keys;
622625
private final List<Object> callContexts;
@@ -628,7 +631,7 @@ private class BatchObserverImpl implements BatchObserver<V> {
628631
private boolean onErrorCalled = false;
629632
private boolean onCompletedCalled = false;
630633

631-
private BatchObserverImpl(
634+
private DataLoaderSubscriber(
632635
CompletableFuture<List<V>> valuesFuture,
633636
List<K> keys,
634637
List<Object> callContexts,
@@ -640,6 +643,11 @@ private BatchObserverImpl(
640643
this.queuedFutures = queuedFutures;
641644
}
642645

646+
@Override
647+
public void onSubscribe(Subscription subscription) {
648+
subscription.request(keys.size());
649+
}
650+
643651
@Override
644652
public void onNext(V value) {
645653
assert !onErrorCalled && !onCompletedCalled;
@@ -671,7 +679,7 @@ public void onNext(V value) {
671679
}
672680

673681
@Override
674-
public void onCompleted() {
682+
public void onComplete() {
675683
assert !onErrorCalled;
676684
onCompletedCalled = true;
677685

@@ -701,7 +709,7 @@ public void onError(Throwable ex) {
701709
}
702710
}
703711

704-
private class MappedBatchObserverImpl implements MappedBatchObserver<K, V> {
712+
private class DataLoaderMapEntrySubscriber implements Subscriber<Map.Entry<K, V>> {
705713
private final CompletableFuture<List<V>> valuesFuture;
706714
private final List<K> keys;
707715
private final List<Object> callContexts;
@@ -714,7 +722,7 @@ private class MappedBatchObserverImpl implements MappedBatchObserver<K, V> {
714722
private boolean onErrorCalled = false;
715723
private boolean onCompletedCalled = false;
716724

717-
private MappedBatchObserverImpl(
725+
private DataLoaderMapEntrySubscriber(
718726
CompletableFuture<List<V>> valuesFuture,
719727
List<K> keys,
720728
List<Object> callContexts,
@@ -737,8 +745,15 @@ private MappedBatchObserverImpl(
737745
}
738746

739747
@Override
740-
public void onNext(K key, V value) {
748+
public void onSubscribe(Subscription subscription) {
749+
subscription.request(keys.size());
750+
}
751+
752+
@Override
753+
public void onNext(Map.Entry<K, V> entry) {
741754
assert !onErrorCalled && !onCompletedCalled;
755+
K key = entry.getKey();
756+
V value = entry.getValue();
742757

743758
Object callContext = callContextByKey.get(key);
744759
CompletableFuture<V> future = queuedFutureByKey.get(key);
@@ -765,7 +780,7 @@ public void onNext(K key, V value) {
765780
}
766781

767782
@Override
768-
public void onCompleted() {
783+
public void onComplete() {
769784
assert !onErrorCalled;
770785
onCompletedCalled = true;
771786

src/main/java/org/dataloader/MappedBatchObserver.java

Lines changed: 0 additions & 34 deletions
This file was deleted.

src/main/java/org/dataloader/MappedObserverBatchLoaderWithContext.java

Lines changed: 0 additions & 10 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,20 @@
11
package org.dataloader;
22

3+
import org.reactivestreams.Subscriber;
4+
35
import java.util.List;
6+
import java.util.Map;
47

58
/**
69
* A function that is invoked for batch loading a stream of data values indicated by the provided list of keys.
710
* <p>
8-
* The function will call the provided {@link MappedBatchObserver} to process the key/value pairs it has retrieved to allow
11+
* The function will call the provided {@link Subscriber} to process the key/value pairs it has retrieved to allow
912
* the future returned by {@link DataLoader#load(Object)} to complete as soon as the individual value is available
1013
* (rather than when all values have been retrieved).
1114
*
1215
* @param <K> type parameter indicating the type of keys to use for data load requests.
1316
* @param <V> type parameter indicating the type of values returned
1417
*/
15-
public interface MappedObserverBatchLoader<K, V> {
16-
void load(List<K> keys, MappedBatchObserver<K, V> observer);
18+
public interface MappedPublisherBatchLoader<K, V> {
19+
void load(List<K> keys, Subscriber<Map.Entry<K, V>> subscriber);
1720
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package org.dataloader;
2+
3+
import org.reactivestreams.Subscriber;
4+
5+
import java.util.List;
6+
import java.util.Map;
7+
8+
/**
9+
* A {@link MappedPublisherBatchLoader} with a {@link BatchLoaderEnvironment} provided as an extra parameter to {@link #load}.
10+
*/
11+
public interface MappedPublisherBatchLoaderWithContext<K, V> {
12+
void load(List<K> keys, Subscriber<Map.Entry<K, V>> subscriber, BatchLoaderEnvironment environment);
13+
}

src/main/java/org/dataloader/ObserverBatchLoaderWithContext.java

Lines changed: 0 additions & 10 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package org.dataloader;
22

3+
import org.reactivestreams.Subscriber;
4+
35
import java.util.List;
46

57
/**
68
* A function that is invoked for batch loading a stream of data values indicated by the provided list of keys.
79
* <p>
8-
* The function will call the provided {@link BatchObserver} to process the values it has retrieved to allow
10+
* The function will call the provided {@link Subscriber} to process the values it has retrieved to allow
911
* the future returned by {@link DataLoader#load(Object)} to complete as soon as the individual value is available
1012
* (rather than when all values have been retrieved).
1113
* <p>
@@ -14,6 +16,6 @@
1416
* @param <K> type parameter indicating the type of keys to use for data load requests.
1517
* @param <V> type parameter indicating the type of values returned
1618
*/
17-
public interface ObserverBatchLoader<K, V> {
18-
void load(List<K> keys, BatchObserver<V> observer);
19+
public interface PublisherBatchLoader<K, V> {
20+
void load(List<K> keys, Subscriber<V> subscriber);
1921
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.dataloader;
2+
3+
import org.reactivestreams.Subscriber;
4+
5+
import java.util.List;
6+
7+
/**
8+
* An {@link PublisherBatchLoader} with a {@link BatchLoaderEnvironment} provided as an extra parameter to {@link #load}.
9+
*/
10+
public interface PublisherBatchLoaderWithContext<K, V> {
11+
void load(List<K> keys, Subscriber<V> subscriber, BatchLoaderEnvironment environment);
12+
}

0 commit comments

Comments
 (0)