Skip to content

Commit 9ace98d

Browse files
committed
Add batching primitives
1 parent 8160538 commit 9ace98d

File tree

10 files changed

+421
-73
lines changed

10 files changed

+421
-73
lines changed

lib/graphql/define.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
require "graphql/define/assign_argument"
2+
require "graphql/define/assign_batch_resolve"
23
require "graphql/define/assign_connection"
34
require "graphql/define/assign_enum_value"
45
require "graphql/define/assign_global_id_field"
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
module GraphQL
2+
module Define
3+
module AssignBatchResolve
4+
def self.call(field_defn, loader, *loader_args, resolve_func)
5+
field_defn.batch_loader = GraphQL::Execution::Batch::BatchLoader.new(loader, loader_args)
6+
field_defn.resolve = resolve_func
7+
end
8+
end
9+
end
10+
end

lib/graphql/execution.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1+
require "graphql/execution/batch"
12
require "graphql/execution/deferred_execution"
23
require "graphql/execution/directive_checks"
34
require "graphql/execution/exec_frame"
45
require "graphql/execution/exec_scope"
56
require "graphql/execution/exec_stream"
67
require "graphql/execution/exec_thread"
78
require "graphql/execution/merge_branch_result"
9+
require "graphql/execution/merge_collector"
810
require "graphql/execution/typecast"

lib/graphql/execution/batch.rb

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
module GraphQL
2+
module Execution
3+
module Batch
4+
def self.resolve(item_arg, func)
5+
BatchResolve.new(item_arg, func )
6+
end
7+
8+
class BatchLoader
9+
attr_reader :func, :args
10+
def initialize(func, args)
11+
@func = func
12+
@args = args
13+
end
14+
end
15+
16+
class BatchResolve
17+
attr_reader :item_arg, :func
18+
def initialize(item_arg, func)
19+
@item_arg = item_arg
20+
@func = func
21+
end
22+
end
23+
24+
class Accumulator
25+
def initialize
26+
@storage = init_storage
27+
end
28+
29+
def register(loader, group_args, frame, batch_resolve)
30+
key = [loader, group_args]
31+
callback = [frame, batch_resolve.func]
32+
@storage[key][batch_resolve.item_arg] << callback
33+
end
34+
35+
def any?
36+
@storage.any?
37+
end
38+
39+
def resolve_all(&block)
40+
batches = @storage
41+
@storage = init_storage
42+
batches.each do |(loader, group_args), item_arg_callbacks|
43+
item_args = item_arg_callbacks.keys
44+
loader.call(*group_args, item_args) { |item_arg, result|
45+
callbacks = item_arg_callbacks[item_arg]
46+
callbacks.each do |(frame, func)|
47+
next_result = func.nil? ? result : func.call(result)
48+
yield(frame, next_result)
49+
end
50+
}
51+
end
52+
end
53+
54+
private
55+
56+
def init_storage
57+
# { [loader, group_args] => { item_arg => [callback, ...] } }
58+
Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = [] } }
59+
end
60+
end
61+
end
62+
end
63+
end

lib/graphql/execution/deferred_execution.rb

Lines changed: 120 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ def initialize(field_name, value)
7575
#
7676
# @return [Object] the initial result, without any defers
7777
def execute(ast_operation, root_type, query_object)
78-
collector = query_object.context[CONTEXT_PATCH_TARGET]
78+
collector = query_object.context[CONTEXT_PATCH_TARGET] || MergeCollector.new
7979
irep_root = query_object.internal_representation[ast_operation.name]
8080

8181
scope = ExecScope.new(query_object)
@@ -88,81 +88,93 @@ def execute(ast_operation, root_type, query_object)
8888
)
8989

9090
initial_result = resolve_or_defer_frame(scope, initial_thread, initial_frame)
91+
resolve_batches(scope, initial_thread, query_object, initial_result)
9192

92-
if collector
93-
initial_data = if initial_result == DEFERRED_RESULT
94-
{}
95-
else
96-
initial_result
97-
end
98-
initial_patch = {"data" => initial_data}
93+
initial_data = if initial_result == DEFERRED_RESULT
94+
{}
95+
else
96+
initial_result
97+
end
9998

100-
initial_errors = initial_thread.errors + query_object.context.errors
101-
error_idx = initial_errors.length
99+
initial_patch = {"data" => initial_data}
102100

103-
if initial_errors.any?
104-
initial_patch["errors"] = initial_errors.map(&:to_h)
105-
end
101+
initial_errors = initial_thread.errors + query_object.context.errors
102+
error_idx = initial_errors.length
106103

107-
collector.patch(
108-
path: [],
109-
value: initial_patch
110-
)
104+
if initial_errors.any?
105+
initial_patch["errors"] = initial_errors.map(&:to_h)
106+
end
111107

112-
defers = initial_thread.defers
113-
while defers.any?
114-
next_defers = []
115-
defers.each do |deferral|
116-
deferred_thread = ExecThread.new
117-
case deferral
118-
when ExecFrame
119-
deferred_frame = deferral
120-
deferred_result = resolve_frame(scope, deferred_thread, deferred_frame)
121-
122-
when ExecStream
123-
begin
124-
list_frame = deferral.frame
125-
inner_type = deferral.type
126-
item, idx = deferral.enumerator.next
127-
deferred_frame = ExecFrame.new({
128-
node: list_frame.node,
129-
path: list_frame.path + [idx],
130-
type: inner_type,
131-
value: item,
132-
})
133-
deferred_result = resolve_value(scope, deferred_thread, deferred_frame, item, inner_type)
134-
deferred_thread.defers << deferral
135-
rescue StopIteration
136-
# The enum is done
137-
end
138-
else
139-
raise("Can't continue deferred #{deferred_frame.class.name}")
140-
end
108+
collector.patch(
109+
path: [],
110+
value: initial_patch
111+
)
141112

142-
# TODO: Should I patch nil?
143-
if !deferred_result.nil?
144-
collector.patch(
145-
path: ["data"] + deferred_frame.path,
146-
value: deferred_result
147-
)
113+
defers = initial_thread.defers
114+
while defers.any?
115+
next_defers = []
116+
defers.each do |deferral|
117+
deferred_thread = ExecThread.new
118+
case deferral
119+
when ExecFrame
120+
deferred_frame = deferral
121+
deferred_result = resolve_frame(scope, deferred_thread, deferred_frame)
122+
if deferred_result.is_a?(GraphQL::Execution::Batch::BatchResolve)
123+
# TODO: this will miss nested resolves
124+
res = {}
125+
resolve_batches(scope, deferred_thread, query_object, res)
126+
deferred_result = get_in(res, deferred_frame.path)
148127
end
128+
when ExecStream
129+
begin
130+
list_frame = deferral.frame
131+
inner_type = deferral.type
132+
item, idx = deferral.enumerator.next
133+
deferred_frame = ExecFrame.new({
134+
node: list_frame.node,
135+
path: list_frame.path + [idx],
136+
type: inner_type,
137+
value: item,
138+
})
139+
field_defn = scope.get_field(list_frame.type, list_frame.node.definition_name)
140+
deferred_result = resolve_value(scope, deferred_thread, deferred_frame, item, field_defn, inner_type)
141+
142+
# TODO: deep merge ??
143+
res = {}
144+
resolve_batches(scope, deferred_thread, query_object, res)
145+
if res.any?
146+
deferred_result = get_in(res, deferred_frame.path)
147+
end
149148

150-
deferred_thread.errors.each do |deferred_error|
151-
collector.patch(
152-
path: ["errors", error_idx],
153-
value: deferred_error.to_h
154-
)
155-
error_idx += 1
149+
deferred_thread.defers << deferral
150+
rescue StopIteration
151+
# The enum is done
156152
end
157-
next_defers.push(*deferred_thread.defers)
153+
else
154+
raise("Can't continue deferred #{deferred_frame.class.name}")
155+
end
156+
157+
# TODO: Should I patch nil?
158+
if !deferred_result.nil?
159+
collector.patch(
160+
path: ["data"].concat(deferred_frame.path),
161+
value: deferred_result
162+
)
158163
end
159-
defers = next_defers
164+
165+
deferred_thread.errors.each do |deferred_error|
166+
collector.patch(
167+
path: ["errors", error_idx],
168+
value: deferred_error.to_h
169+
)
170+
error_idx += 1
171+
end
172+
next_defers.push(*deferred_thread.defers)
160173
end
161-
else
162-
query_object.context.errors.concat(initial_thread.errors)
174+
defers = next_defers
163175
end
164176

165-
initial_result
177+
initial_data
166178
end
167179

168180
private
@@ -197,6 +209,7 @@ def resolve_frame(scope, thread, frame)
197209
thread,
198210
frame,
199211
field_result,
212+
field_defn,
200213
return_type_defn,
201214
)
202215
else
@@ -250,7 +263,9 @@ def resolve_selections(scope, thread, outer_frame)
250263
# This value has already been resolved in another type branch
251264
end
252265

253-
if inner_result != DEFERRED_RESULT
266+
if inner_result == DEFERRED_RESULT || inner_result.is_a?(GraphQL::Execution::Batch::BatchResolve)
267+
# This result was dealt with by the thread
268+
else
254269
GraphQL::Execution::MergeBranchResult.merge(selection_result, { selection_name => inner_result })
255270
end
256271
end
@@ -319,13 +334,22 @@ def resolve_field_frame(scope, thread, frame, field_defn)
319334
# expected to be an instance of `type_defn`.
320335
# This coerces terminals and recursively resolves non-terminals (object, list, non-null).
321336
# @return [Object] the response-ready version of `value`
322-
def resolve_value(scope, thread, frame, value, type_defn)
323-
if value.nil? || value.is_a?(GraphQL::ExecutionError)
337+
def resolve_value(scope, thread, frame, value, field_defn, type_defn)
338+
case value
339+
when nil, GraphQL::ExecutionError
324340
if type_defn.kind.non_null?
325341
raise InnerInvalidNullError.new(frame.node.ast_node.name, value)
326342
else
327343
nil
328344
end
345+
when GraphQL::Execution::Batch::BatchResolve
346+
scope.query.accumulator.register(
347+
field_defn.batch_loader.func,
348+
field_defn.batch_loader.args,
349+
frame,
350+
value
351+
)
352+
value
329353
else
330354
case type_defn.kind
331355
when GraphQL::TypeKinds::SCALAR, GraphQL::TypeKinds::ENUM
@@ -337,11 +361,11 @@ def resolve_value(scope, thread, frame, value, type_defn)
337361
if !resolved_type.is_a?(GraphQL::ObjectType) || !possible_types.include?(resolved_type)
338362
raise GraphQL::UnresolvedTypeError.new(frame.node.definition_name, type_defn, frame.node.parent.return_type, resolved_type, possible_types)
339363
else
340-
resolve_value(scope, thread, frame, value, resolved_type)
364+
resolve_value(scope, thread, frame, value, field_defn, resolved_type)
341365
end
342366
when GraphQL::TypeKinds::NON_NULL
343367
wrapped_type = type_defn.of_type
344-
resolve_value(scope, thread, frame, value, wrapped_type)
368+
resolve_value(scope, thread, frame, value, field_defn, wrapped_type)
345369
when GraphQL::TypeKinds::LIST
346370
wrapped_type = type_defn.of_type
347371
items_enumerator = value.map.with_index
@@ -361,7 +385,7 @@ def resolve_value(scope, thread, frame, value, type_defn)
361385
type: wrapped_type,
362386
value: item,
363387
})
364-
resolve_value(scope, thread, inner_frame, item, wrapped_type)
388+
resolve_value(scope, thread, inner_frame, item, field_defn, wrapped_type)
365389
end
366390
resolved_values
367391
end
@@ -378,6 +402,33 @@ def resolve_value(scope, thread, frame, value, type_defn)
378402
end
379403
end
380404
end
405+
406+
def set_in(data, path, value)
407+
path = path.dup
408+
last = path.pop
409+
path.each do |key|
410+
data = data[key] ||= {}
411+
end
412+
data[last] = value
413+
end
414+
415+
def get_in(data, path)
416+
path.each do |key|
417+
data = data[key]
418+
end
419+
data
420+
end
421+
422+
def resolve_batches(scope, thread, query_object, merge_target)
423+
while query_object.accumulator.any?
424+
query_object.accumulator.resolve_all do |frame, value|
425+
# TODO cache on frame
426+
field_defn = scope.get_field(frame.type, frame.node.definition_name)
427+
finished_value = resolve_value(scope, thread, frame, value, field_defn, field_defn.type)
428+
set_in(merge_target, frame.path, finished_value)
429+
end
430+
end
431+
end
381432
end
382433
end
383434
end
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
module GraphQL
2+
module Execution
3+
class MergeCollector
4+
attr_reader :result
5+
def initialize
6+
@result = nil
7+
end
8+
9+
def patch(path:, value:)
10+
if @result.nil?
11+
# first patch
12+
@result = value
13+
else
14+
last = path.pop
15+
target = @result
16+
path.each do |key|
17+
target = target[key]
18+
end
19+
target[last] = value
20+
end
21+
end
22+
end
23+
end
24+
end

0 commit comments

Comments
 (0)