Skip to content

Commit 9da4779

Browse files
authored
Merge pull request #128 from graphql-java/batch_scheduler
Batch scheduler function support
2 parents db61b14 + 23492f2 commit 9da4779

File tree

7 files changed

+395
-5
lines changed

7 files changed

+395
-5
lines changed

README.md

+46-1
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,52 @@ and there are also gains to this different mode of operation:
510510
However, with batch execution control comes responsibility! If you forget to make the call to `dispatch()` then the futures
511511
in the load request queue will never be batched, and thus _will never complete_! So be careful when crafting your loader designs.
512512

513-
## Scheduled Dispatching
513+
## The BatchLoader Scheduler
514+
515+
By default, when `dataLoader.dispatch()` is called, the `BatchLoader` / `MappedBatchLoader` function will be invoked
516+
immediately.
517+
518+
However, you can provide your own `BatchLoaderScheduler` that allows this call to be done some time into
519+
the future.
520+
521+
You will be passed a callback (`ScheduledBatchLoaderCall` / `ScheduledMapBatchLoaderCall`) and you are expected
522+
to eventually call this callback method to make the batch loading happen.
523+
524+
The following is a `BatchLoaderScheduler` that waits 10 milliseconds before invoking the batch loading functions.
525+
526+
```java
527+
new BatchLoaderScheduler() {
528+
529+
@Override
530+
public <K, V> CompletionStage<List<V>> scheduleBatchLoader(ScheduledBatchLoaderCall<V> scheduledCall, List<K> keys, BatchLoaderEnvironment environment) {
531+
return CompletableFuture.supplyAsync(() -> {
532+
snooze(10);
533+
return scheduledCall.invoke();
534+
}).thenCompose(Function.identity());
535+
}
536+
537+
@Override
538+
public <K, V> CompletionStage<Map<K, V>> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall<K, V> scheduledCall, List<K> keys, BatchLoaderEnvironment environment) {
539+
return CompletableFuture.supplyAsync(() -> {
540+
snooze(10);
541+
return scheduledCall.invoke();
542+
}).thenCompose(Function.identity());
543+
}
544+
};
545+
```
546+
547+
You are given the keys to be loaded and an optional `BatchLoaderEnvironment` for informative purposes. You can't change the list of
548+
keys that will be loaded via this mechanism say.
549+
550+
Also note, because there is a max batch size, it is possible for this scheduling to happen N times for a given `dispatch()`
551+
call. The total set of keys will be sliced into batches themselves and then the `BatchLoaderScheduler` will be called for
552+
each batch of keys.
553+
554+
Do not assume that a single call to `dispatch()` results in a single call to `BatchLoaderScheduler`.
555+
556+
This code is inspired from the scheduling code in the [reference JS implementation](https://github.com/graphql/dataloader#batch-scheduling)
557+
558+
## Scheduled Registry Dispatching
514559

515560
`ScheduledDataLoaderRegistry` is a registry that allows for dispatching to be done on a schedule. It contains a
516561
predicate that is evaluated (per data loader contained within) when `dispatchAll` is invoked.

src/main/java/org/dataloader/DataLoaderHelper.java

+31-4
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import org.dataloader.annotations.GuardedBy;
44
import org.dataloader.annotations.Internal;
55
import org.dataloader.impl.CompletableFutureKit;
6+
import org.dataloader.scheduler.BatchLoaderScheduler;
67
import org.dataloader.stats.StatisticsCollector;
78
import org.dataloader.stats.context.IncrementBatchLoadCountByStatisticsContext;
89
import org.dataloader.stats.context.IncrementBatchLoadExceptionCountStatisticsContext;
@@ -417,10 +418,23 @@ CompletableFuture<List<V>> invokeLoader(List<K> keys, List<Object> keyContexts)
417418
@SuppressWarnings("unchecked")
418419
private CompletableFuture<List<V>> invokeListBatchLoader(List<K> keys, BatchLoaderEnvironment environment) {
419420
CompletionStage<List<V>> loadResult;
421+
BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler();
420422
if (batchLoadFunction instanceof BatchLoaderWithContext) {
421-
loadResult = ((BatchLoaderWithContext<K, V>) batchLoadFunction).load(keys, environment);
423+
BatchLoaderWithContext<K, V> loadFunction = (BatchLoaderWithContext<K, V>) batchLoadFunction;
424+
if (batchLoaderScheduler != null) {
425+
BatchLoaderScheduler.ScheduledBatchLoaderCall<V> loadCall = () -> loadFunction.load(keys, environment);
426+
loadResult = batchLoaderScheduler.scheduleBatchLoader(loadCall, keys, environment);
427+
} else {
428+
loadResult = loadFunction.load(keys, environment);
429+
}
422430
} else {
423-
loadResult = ((BatchLoader<K, V>) batchLoadFunction).load(keys);
431+
BatchLoader<K, V> loadFunction = (BatchLoader<K, V>) batchLoadFunction;
432+
if (batchLoaderScheduler != null) {
433+
BatchLoaderScheduler.ScheduledBatchLoaderCall<V> loadCall = () -> loadFunction.load(keys);
434+
loadResult = batchLoaderScheduler.scheduleBatchLoader(loadCall, keys, null);
435+
} else {
436+
loadResult = loadFunction.load(keys);
437+
}
424438
}
425439
return nonNull(loadResult, () -> "Your batch loader function MUST return a non null CompletionStage").toCompletableFuture();
426440
}
@@ -434,10 +448,23 @@ private CompletableFuture<List<V>> invokeListBatchLoader(List<K> keys, BatchLoad
434448
private CompletableFuture<List<V>> invokeMapBatchLoader(List<K> keys, BatchLoaderEnvironment environment) {
435449
CompletionStage<Map<K, V>> loadResult;
436450
Set<K> setOfKeys = new LinkedHashSet<>(keys);
451+
BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler();
437452
if (batchLoadFunction instanceof MappedBatchLoaderWithContext) {
438-
loadResult = ((MappedBatchLoaderWithContext<K, V>) batchLoadFunction).load(setOfKeys, environment);
453+
MappedBatchLoaderWithContext<K, V> loadFunction = (MappedBatchLoaderWithContext<K, V>) batchLoadFunction;
454+
if (batchLoaderScheduler != null) {
455+
BatchLoaderScheduler.ScheduledMappedBatchLoaderCall<K, V> loadCall = () -> loadFunction.load(setOfKeys, environment);
456+
loadResult = batchLoaderScheduler.scheduleMappedBatchLoader(loadCall, keys, environment);
457+
} else {
458+
loadResult = loadFunction.load(setOfKeys, environment);
459+
}
439460
} else {
440-
loadResult = ((MappedBatchLoader<K, V>) batchLoadFunction).load(setOfKeys);
461+
MappedBatchLoader<K, V> loadFunction = (MappedBatchLoader<K, V>) batchLoadFunction;
462+
if (batchLoaderScheduler != null) {
463+
BatchLoaderScheduler.ScheduledMappedBatchLoaderCall<K, V> loadCall = () -> loadFunction.load(setOfKeys);
464+
loadResult = batchLoaderScheduler.scheduleMappedBatchLoader(loadCall, keys, null);
465+
} else {
466+
loadResult = loadFunction.load(setOfKeys);
467+
}
441468
}
442469
CompletableFuture<Map<K, V>> mapBatchLoad = nonNull(loadResult, () -> "Your batch loader function MUST return a non null CompletionStage").toCompletableFuture();
443470
return mapBatchLoad.thenApply(map -> {

src/main/java/org/dataloader/DataLoaderOptions.java

+24
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.dataloader.annotations.PublicApi;
2020
import org.dataloader.impl.Assertions;
21+
import org.dataloader.scheduler.BatchLoaderScheduler;
2122
import org.dataloader.stats.NoOpStatisticsCollector;
2223
import org.dataloader.stats.StatisticsCollector;
2324

@@ -46,6 +47,7 @@ public class DataLoaderOptions {
4647
private Supplier<StatisticsCollector> statisticsCollector;
4748
private BatchLoaderContextProvider environmentProvider;
4849
private ValueCacheOptions valueCacheOptions;
50+
private BatchLoaderScheduler batchLoaderScheduler;
4951

5052
/**
5153
* Creates a new data loader options with default settings.
@@ -58,6 +60,7 @@ public DataLoaderOptions() {
5860
statisticsCollector = NoOpStatisticsCollector::new;
5961
environmentProvider = NULL_PROVIDER;
6062
valueCacheOptions = ValueCacheOptions.newOptions();
63+
batchLoaderScheduler = null;
6164
}
6265

6366
/**
@@ -77,6 +80,7 @@ public DataLoaderOptions(DataLoaderOptions other) {
7780
this.statisticsCollector = other.statisticsCollector;
7881
this.environmentProvider = other.environmentProvider;
7982
this.valueCacheOptions = other.valueCacheOptions;
83+
batchLoaderScheduler = other.batchLoaderScheduler;
8084
}
8185

8286
/**
@@ -304,4 +308,24 @@ public DataLoaderOptions setValueCacheOptions(ValueCacheOptions valueCacheOption
304308
this.valueCacheOptions = Assertions.nonNull(valueCacheOptions);
305309
return this;
306310
}
311+
312+
/**
313+
* @return the {@link BatchLoaderScheduler} to use, which can be null
314+
*/
315+
public BatchLoaderScheduler getBatchLoaderScheduler() {
316+
return batchLoaderScheduler;
317+
}
318+
319+
/**
320+
* Sets in a new {@link BatchLoaderScheduler} that allows the call to a {@link BatchLoader} function to be scheduled
321+
* to some future time.
322+
*
323+
* @param batchLoaderScheduler the scheduler
324+
*
325+
* @return the data loader options for fluent coding
326+
*/
327+
public DataLoaderOptions setBatchLoaderScheduler(BatchLoaderScheduler batchLoaderScheduler) {
328+
this.batchLoaderScheduler = batchLoaderScheduler;
329+
return this;
330+
}
307331
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package org.dataloader.scheduler;
2+
3+
import org.dataloader.BatchLoader;
4+
import org.dataloader.BatchLoaderEnvironment;
5+
import org.dataloader.DataLoader;
6+
import org.dataloader.DataLoaderOptions;
7+
import org.dataloader.MappedBatchLoader;
8+
9+
import java.util.List;
10+
import java.util.Map;
11+
import java.util.concurrent.CompletionStage;
12+
13+
/**
14+
* By default, when {@link DataLoader#dispatch()} is called, the {@link BatchLoader} / {@link MappedBatchLoader} function will be invoked
15+
* immediately. However, you can provide your own {@link BatchLoaderScheduler} that allows this call to be done some time into
16+
* the future. You will be passed a callback ({@link ScheduledBatchLoaderCall} / {@link ScheduledMappedBatchLoaderCall} and you are expected
17+
* to eventually call this callback method to make the batch loading happen.
18+
* <p>
19+
* Note: Because there is a {@link DataLoaderOptions#maxBatchSize()} it is possible for this scheduling to happen N times for a given {@link DataLoader#dispatch()}
20+
* call. The total set of keys will be sliced into batches themselves and then the {@link BatchLoaderScheduler} will be called for
21+
* each batch of keys. Do not assume that a single call to {@link DataLoader#dispatch()} results in a single call to {@link BatchLoaderScheduler}.
22+
*/
23+
public interface BatchLoaderScheduler {
24+
25+
26+
/**
27+
* This represents a callback that will invoke a {@link BatchLoader} function under the covers
28+
*
29+
* @param <V> the value type
30+
*/
31+
interface ScheduledBatchLoaderCall<V> {
32+
CompletionStage<List<V>> invoke();
33+
}
34+
35+
/**
36+
* This represents a callback that will invoke a {@link MappedBatchLoader} function under the covers
37+
*
38+
* @param <K> the key type
39+
* @param <V> the value type
40+
*/
41+
interface ScheduledMappedBatchLoaderCall<K, V> {
42+
CompletionStage<Map<K, V>> invoke();
43+
}
44+
45+
/**
46+
* This is called to schedule a {@link BatchLoader} call.
47+
*
48+
* @param scheduledCall the callback that needs to be invoked to allow the {@link BatchLoader} to proceed.
49+
* @param keys this is the list of keys that will be passed to the {@link BatchLoader}.
50+
* This is provided only for informative reasons and you cant change the keys that are used
51+
* @param environment this is the {@link BatchLoaderEnvironment} in place,
52+
* which can be null if it's a simple {@link BatchLoader} call
53+
* @param <K> the key type
54+
* @param <V> the value type
55+
*
56+
* @return a promise to the values that come from the {@link BatchLoader}
57+
*/
58+
<K, V> CompletionStage<List<V>> scheduleBatchLoader(ScheduledBatchLoaderCall<V> scheduledCall, List<K> keys, BatchLoaderEnvironment environment);
59+
60+
/**
61+
* This is called to schedule a {@link MappedBatchLoader} call.
62+
*
63+
* @param scheduledCall the callback that needs to be invoked to allow the {@link MappedBatchLoader} to proceed.
64+
* @param keys this is the list of keys that will be passed to the {@link MappedBatchLoader}.
65+
* This is provided only for informative reasons and you cant change the keys that are used
66+
* @param environment this is the {@link BatchLoaderEnvironment} in place,
67+
* which can be null if it's a simple {@link MappedBatchLoader} call
68+
* @param <K> the key type
69+
* @param <V> the value type
70+
*
71+
* @return a promise to the values that come from the {@link BatchLoader}
72+
*/
73+
<K, V> CompletionStage<Map<K, V>> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall<K, V> scheduledCall, List<K> keys, BatchLoaderEnvironment environment);
74+
}

src/test/java/ReadmeExamples.java

+26
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.dataloader.fixtures.UserManager;
1313
import org.dataloader.registries.DispatchPredicate;
1414
import org.dataloader.registries.ScheduledDataLoaderRegistry;
15+
import org.dataloader.scheduler.BatchLoaderScheduler;
1516
import org.dataloader.stats.Statistics;
1617
import org.dataloader.stats.ThreadLocalStatisticsCollector;
1718

@@ -23,6 +24,7 @@
2324
import java.util.Set;
2425
import java.util.concurrent.CompletableFuture;
2526
import java.util.concurrent.CompletionStage;
27+
import java.util.function.Function;
2628
import java.util.stream.Collectors;
2729

2830
import static java.lang.String.format;
@@ -278,6 +280,30 @@ private void statsConfigExample() {
278280
DataLoader<String, User> userDataLoader = DataLoaderFactory.newDataLoader(userBatchLoader, options);
279281
}
280282

283+
private void snooze(int i) {
284+
}
285+
286+
private void BatchLoaderSchedulerExample() {
287+
new BatchLoaderScheduler() {
288+
289+
@Override
290+
public <K, V> CompletionStage<List<V>> scheduleBatchLoader(ScheduledBatchLoaderCall<V> scheduledCall, List<K> keys, BatchLoaderEnvironment environment) {
291+
return CompletableFuture.supplyAsync(() -> {
292+
snooze(10);
293+
return scheduledCall.invoke();
294+
}).thenCompose(Function.identity());
295+
}
296+
297+
@Override
298+
public <K, V> CompletionStage<Map<K, V>> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall<K, V> scheduledCall, List<K> keys, BatchLoaderEnvironment environment) {
299+
return CompletableFuture.supplyAsync(() -> {
300+
snooze(10);
301+
return scheduledCall.invoke();
302+
}).thenCompose(Function.identity());
303+
}
304+
};
305+
}
306+
281307
private void ScheduledDispatche() {
282308
DispatchPredicate depthOrTimePredicate = DispatchPredicate.dispatchIfDepthGreaterThan(10)
283309
.or(DispatchPredicate.dispatchIfLongerThan(Duration.ofMillis(200)));

src/test/java/org/dataloader/fixtures/TestKit.java

+27
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
11
package org.dataloader.fixtures;
22

33
import org.dataloader.BatchLoader;
4+
import org.dataloader.BatchLoaderWithContext;
45
import org.dataloader.DataLoader;
56
import org.dataloader.DataLoaderFactory;
67
import org.dataloader.DataLoaderOptions;
8+
import org.dataloader.MappedBatchLoader;
9+
import org.dataloader.MappedBatchLoaderWithContext;
710

811
import java.util.ArrayList;
912
import java.util.Collection;
13+
import java.util.HashMap;
1014
import java.util.List;
15+
import java.util.Map;
16+
import java.util.Set;
1117
import java.util.concurrent.CompletableFuture;
1218

1319
import static java.util.stream.Collectors.toList;
@@ -19,6 +25,27 @@ public static <T> BatchLoader<T, T> keysAsValues() {
1925
return CompletableFuture::completedFuture;
2026
}
2127

28+
public static <T> BatchLoaderWithContext<T, T> keysAsValuesWithContext() {
29+
return (keys, env) -> CompletableFuture.completedFuture(keys);
30+
}
31+
32+
public static <K, V> MappedBatchLoader<K, V> keysAsMapOfValues() {
33+
return keys -> mapOfKeys(keys);
34+
}
35+
36+
public static <K, V> MappedBatchLoaderWithContext<K, V> keysAsMapOfValuesWithContext() {
37+
return (keys, env) -> mapOfKeys(keys);
38+
}
39+
40+
private static <K, V> CompletableFuture<Map<K, V>> mapOfKeys(Set<K> keys) {
41+
Map<K, V> map = new HashMap<>();
42+
for (K key : keys) {
43+
//noinspection unchecked
44+
map.put(key, (V) key);
45+
}
46+
return CompletableFuture.completedFuture(map);
47+
}
48+
2249
public static <K, V> BatchLoader<K, V> keysAsValues(List<List<K>> loadCalls) {
2350
return keys -> {
2451
List<K> ks = new ArrayList<>(keys);

0 commit comments

Comments
 (0)