Skip to content

Commit 5513f0d

Browse files
committed
Lazy Concurrency Per Evaluation Layer
This change adds support for the concurrent resolution of lazy objects. While it is currently possible to return something like a `::Concurrent::Promise.execute` from a GraphQL method, there is currently no way to make this work in tandem with lazy objects. Consider the case in which we would like to use a Gem like `graphql-batch` (or a thread-safe alternative) but execute queries in parallel whenever possible: ``` { post(id: 10) { author { name } comments { count } } } ``` In this query you could imagine that the each of `post`, `author`, and `comments` are separate DB calls. While it may not be possible for `post` and `author` to be executed in parallel, certainly `author` and `comments` can. But we would still like to perform this operation lazily because if this query is expanded: ``` { a: post(id: 10) { author { name } comments { count } } b: post(id: 11) { author { name } comments { count } } } ``` We would like to be able to load the authors for post 10 and 11 in a single query. I have implemented this solution by allowing the `lazy_resolve` directive to accept an additional method name (the concurrent execution method). This method will be called on all layers (breadth first) before the `value` method is called. This ensures that concurrent execution can be delayed until the last possible moment (to enable batching) but it also ensures that multiple batches can be run in parallel if they are resolved in the same graph layer. Although intended for concurrent execution, it is not necessary for this new method to actually perform an operation concurrently (i.e it does not need to return a Thread or anything like that). This allows `graphql-ruby` to not enforce any specific parallel execution primitive (threads or `concurrent-ruby` could be used interchangeably). I know this is a large PR, so I am happy to split it up into multiple PRs if the overall approach is agreeable.
1 parent 87db4e3 commit 5513f0d

File tree

11 files changed

+289
-65
lines changed

11 files changed

+289
-65
lines changed

lib/graphql/execution/execute.rb

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -115,20 +115,29 @@ def resolve_field(object, field_ctx)
115115
end
116116

117117
if field_ctx.schema.lazy?(raw_value)
118-
field_ctx.value = Execution::Lazy.new {
119-
inner_value = field_ctx.trace("execute_field_lazy", {context: field_ctx}) {
120-
begin
118+
field_ctx.value = Execution::Lazy.new(
119+
value: -> {
120+
inner_value = field_ctx.trace("execute_field_lazy", {context: field_ctx}) {
121121
begin
122-
field_ctx.field.lazy_resolve(raw_value, arguments, field_ctx)
123-
rescue GraphQL::UnauthorizedError => err
124-
field_ctx.schema.unauthorized_object(err)
122+
begin
123+
field_ctx.field.lazy_resolve(raw_value, arguments, field_ctx)
124+
rescue GraphQL::UnauthorizedError => err
125+
field_ctx.schema.unauthorized_object(err)
126+
end
127+
rescue GraphQL::ExecutionError => err
128+
err
125129
end
126-
rescue GraphQL::ExecutionError => err
127-
err
130+
}
131+
continue_or_wait(inner_value, field_ctx.type, field_ctx)
132+
},
133+
exec: -> {
134+
if field_ctx.schema.concurrent?(raw_value)
135+
field_ctx.trace("execute_field_concurrent", {context: field_ctx}) {
136+
field_ctx.field.concurrent_exec(raw_value, arguments, field_ctx)
137+
}
128138
end
129139
}
130-
continue_or_wait(inner_value, field_ctx.type, field_ctx)
131-
}
140+
)
132141
else
133142
continue_or_wait(raw_value, field_ctx.type, field_ctx)
134143
end
@@ -143,8 +152,10 @@ def resolve_field(object, field_ctx)
143152
# and resolve child fields
144153
def continue_or_wait(raw_value, field_type, field_ctx)
145154
if field_ctx.schema.lazy?(raw_value)
146-
field_ctx.value = Execution::Lazy.new {
147-
inner_value = begin
155+
field_ctx.value = Execution::Lazy.new(
156+
value: -> {
157+
inner_value =
158+
begin
148159
begin
149160
field_ctx.schema.sync_lazy(raw_value)
150161
rescue GraphQL::UnauthorizedError => err
@@ -154,8 +165,14 @@ def continue_or_wait(raw_value, field_type, field_ctx)
154165
err
155166
end
156167

157-
field_ctx.value = continue_or_wait(inner_value, field_type, field_ctx)
158-
}
168+
field_ctx.value = continue_or_wait(inner_value, field_type, field_ctx)
169+
},
170+
exec: -> {
171+
if field_ctx.schema.concurrent?(raw_value)
172+
field_ctx.schema.exec_concurrent(raw_value)
173+
end
174+
}
175+
)
159176
else
160177
field_ctx.value = continue_resolve_field(raw_value, field_type, field_ctx)
161178
end

lib/graphql/execution/lazy.rb

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,39 @@ def self.resolve(val)
2020
end
2121

2222
# Create a {Lazy} which will get its inner value by calling the block
23-
# @param get_value_func [Proc] a block to get the inner value (later)
24-
def initialize(&get_value_func)
25-
@get_value_func = get_value_func
23+
# @param value_proc [Proc] a block to get the inner value (later)
24+
def initialize(original = nil, value:, exec:)
25+
@original = original
26+
@value_proc = value
27+
@exec_proc = exec
2628
@resolved = false
2729
end
2830

31+
def execute
32+
return if @resolved
33+
34+
exec =
35+
begin
36+
e = @exec_proc.call
37+
if e.is_a?(Lazy)
38+
e = e.execute
39+
end
40+
e
41+
rescue GraphQL::ExecutionError => err
42+
err
43+
end
44+
45+
if exec.is_a?(StandardError)
46+
raise exec
47+
end
48+
end
49+
2950
# @return [Object] The wrapped value, calling the lazy block if necessary
3051
def value
3152
if !@resolved
3253
@resolved = true
3354
@value = begin
34-
v = @get_value_func.call
55+
v = @value_proc.call
3556
if v.is_a?(Lazy)
3657
v = v.value
3758
end
@@ -50,22 +71,24 @@ def value
5071

5172
# @return [Lazy] A {Lazy} whose value depends on another {Lazy}, plus any transformations in `block`
5273
def then
53-
self.class.new {
54-
yield(value)
55-
}
74+
self.class.new(
75+
value: -> { yield(value) },
76+
exec: -> { execute }
77+
)
5678
end
5779

5880
# @param lazies [Array<Object>] Maybe-lazy objects
5981
# @return [Lazy] A lazy which will sync all of `lazies`
6082
def self.all(lazies)
61-
self.new {
62-
lazies.map { |l| l.is_a?(Lazy) ? l.value : l }
63-
}
83+
self.new(
84+
value: -> { lazies.map { |l| l.is_a?(Lazy) ? l.value : l } },
85+
exec: -> { lazies.each { |l| l.is_a?(Lazy) ? l.execute : l } }
86+
)
6487
end
6588

6689
# This can be used for fields which _had no_ lazy results
6790
# @api private
68-
NullResult = Lazy.new(){}
91+
NullResult = Lazy.new(value: -> {}, exec: -> {})
6992
NullResult.value
7093
end
7194
end

lib/graphql/execution/lazy/lazy_method_map.rb

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,13 @@ def initialize_copy(other)
2424
@storage = other.storage.dup
2525
end
2626

27+
LazySpec = Struct.new(:value_method, :exec_method)
28+
private_constant :LazySpec
29+
2730
# @param lazy_class [Class] A class which represents a lazy value (subclasses may also be used)
2831
# @param lazy_value_method [Symbol] The method to call on this class to get its value
29-
def set(lazy_class, lazy_value_method)
30-
@storage[lazy_class] = lazy_value_method
32+
def set(lazy_class, lazy_value_method, concurrent_exec_method)
33+
@storage[lazy_class] = LazySpec.new(lazy_value_method, concurrent_exec_method)
3134
end
3235

3336
# @param value [Object] an object which may have a `lazy_value_method` registered for its class or superclasses

lib/graphql/execution/lazy/resolve.rb

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,17 @@ def self.resolve_in_place(value)
3535
if acc.empty?
3636
Lazy::NullResult
3737
else
38-
Lazy.new {
39-
acc.each_with_index { |ctx, idx|
40-
acc[idx] = ctx.value.value
41-
}
42-
resolve_in_place(acc)
43-
}
38+
Lazy.new(
39+
value: -> {
40+
acc.each { |ctx| ctx.value.execute }
41+
42+
acc.each_with_index { |ctx, idx|
43+
acc[idx] = ctx.value.value
44+
}
45+
resolve_in_place(acc)
46+
},
47+
exec: -> {}
48+
)
4449
end
4550
end
4651

lib/graphql/field.rb

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ class Field
123123
include GraphQL::Define::InstanceDefinable
124124
accepts_definitions :name, :description, :deprecation_reason,
125125
:resolve, :lazy_resolve,
126+
:concurrent_exec,
126127
:type, :arguments,
127128
:property, :hash_key, :complexity,
128129
:mutation, :function,
@@ -138,6 +139,7 @@ class Field
138139
:name, :deprecation_reason, :description, :description=, :property, :hash_key,
139140
:mutation, :arguments, :complexity, :function,
140141
:resolve, :resolve=, :lazy_resolve, :lazy_resolve=, :lazy_resolve_proc, :resolve_proc,
142+
:concurrent_exec, :concurrent_exec=, :concurrent_exec_proc,
141143
:type, :type=, :name=, :property=, :hash_key=,
142144
:relay_node_field, :relay_nodes_field, :edges?, :edge_class, :subscription_scope,
143145
:introspection?
@@ -155,6 +157,9 @@ class Field
155157
# @return [<#call(obj, args, ctx)>] A proc-like object which can be called trigger a lazy resolution
156158
attr_reader :lazy_resolve_proc
157159

160+
# @return [<#call(obj, args, ctx)>] A proc-like object which can be called trigger a concurrent execution
161+
attr_reader :concurrent_exec_proc
162+
158163
# @return [String] The name of this field on its {GraphQL::ObjectType} (or {GraphQL::InterfaceType})
159164
attr_reader :name
160165
alias :graphql_name :name
@@ -218,6 +223,7 @@ def initialize
218223
@arguments = {}
219224
@resolve_proc = build_default_resolver
220225
@lazy_resolve_proc = DefaultLazyResolve
226+
@concurrent_exec_proc = DefaultConcurrentExec
221227
@relay_node_field = false
222228
@connection = false
223229
@connection_max_page_size = nil
@@ -307,12 +313,21 @@ def lazy_resolve=(new_lazy_resolve_proc)
307313
@lazy_resolve_proc = new_lazy_resolve_proc
308314
end
309315

316+
def concurrent_exec(obj, args, ctx)
317+
@concurrent_exec_proc.call(obj, args, ctx)
318+
end
319+
320+
def concurrent_exec=(new_concurrent_exec_proc)
321+
@concurrent_exec_proc = new_concurrent_exec_proc
322+
end
323+
310324
# Prepare a lazy value for this field. It may be `then`-ed and resolved later.
311325
# @return [GraphQL::Execution::Lazy] A lazy wrapper around `obj` and its registered method name
312326
def prepare_lazy(obj, args, ctx)
313-
GraphQL::Execution::Lazy.new {
314-
lazy_resolve(obj, args, ctx)
315-
}
327+
GraphQL::Execution::Lazy.new(
328+
value: -> { lazy_resolve(obj, args, ctx) },
329+
exec: -> { concurrent_exec(obj, args, ctx) }
330+
)
316331
end
317332

318333
private
@@ -326,5 +341,11 @@ def self.call(obj, args, ctx)
326341
ctx.schema.sync_lazy(obj)
327342
end
328343
end
344+
345+
module DefaultConcurrentExec
346+
def self.call(obj, args, ctx)
347+
ctx.schema.exec_concurrent(obj)
348+
end
349+
end
329350
end
330351
end

lib/graphql/schema.rb

Lines changed: 51 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,9 @@ class Schema
9292
query_analyzer: ->(schema, analyzer) { schema.query_analyzers << analyzer },
9393
multiplex_analyzer: ->(schema, analyzer) { schema.multiplex_analyzers << analyzer },
9494
middleware: ->(schema, middleware) { schema.middleware << middleware },
95-
lazy_resolve: ->(schema, lazy_class, lazy_value_method) { schema.lazy_methods.set(lazy_class, lazy_value_method) },
95+
lazy_resolve: ->(schema, lazy_class, lazy_value_method, concurrent_exec_method = nil) {
96+
schema.lazy_methods.set(lazy_class, lazy_value_method, concurrent_exec_method)
97+
},
9698
rescue_from: ->(schema, err_class, &block) { schema.rescue_from(err_class, &block)},
9799
tracer: ->(schema, tracer) { schema.tracers.push(tracer) }
98100

@@ -157,7 +159,7 @@ def initialize
157159
@parse_error_proc = DefaultParseError
158160
@instrumenters = Hash.new { |h, k| h[k] = [] }
159161
@lazy_methods = GraphQL::Execution::Lazy::LazyMethodMap.new
160-
@lazy_methods.set(GraphQL::Execution::Lazy, :value)
162+
@lazy_methods.set(GraphQL::Execution::Lazy, :value, :execute)
161163
@cursor_encoder = Base64Encoder
162164
# Default to the built-in execution strategy:
163165
@query_execution_strategy = self.class.default_execution_strategy
@@ -604,14 +606,26 @@ class InvalidDocumentError < Error; end;
604606

605607
# @return [Symbol, nil] The method name to lazily resolve `obj`, or nil if `obj`'s class wasn't registered wtih {#lazy_resolve}.
606608
def lazy_method_name(obj)
607-
@lazy_methods.get(obj)
609+
spec = @lazy_methods.get(obj)
610+
spec && spec.value_method
611+
end
612+
613+
# @return [Symbol, nil] The method name to concurrently resolve `obj`, or nil if `obj`'s class wasn't registered wtih {#lazy_resolve} with a concurrent method.
614+
def concurrent_method_name(obj)
615+
spec = @lazy_methods.get(obj)
616+
spec && spec.exec_method
608617
end
609618

610619
# @return [Boolean] True if this object should be lazily resolved
611620
def lazy?(obj)
612621
!!lazy_method_name(obj)
613622
end
614623

624+
# @return [Boolean] True if this object should be concurrently executed
625+
def concurrent?(obj)
626+
!!concurrent_method_name(obj)
627+
end
628+
615629
# Return the GraphQL IDL for the schema
616630
# @param context [Hash]
617631
# @param only [<#call(member, ctx)>]
@@ -656,7 +670,8 @@ class << self
656670
:execute, :multiplex,
657671
:static_validator, :introspection_system,
658672
:query_analyzers, :tracers, :instrumenters,
659-
:validate, :multiplex_analyzers, :lazy?, :lazy_method_name, :after_lazy, :sync_lazy,
673+
:validate, :multiplex_analyzers,
674+
:lazy?, :lazy_method_name, :concurrent_method_name, :after_lazy, :sync_lazy,
660675
# Configuration
661676
:max_complexity=, :max_depth=,
662677
:metadata,
@@ -717,8 +732,8 @@ def to_graphql
717732
end
718733
end
719734
schema_defn.instrumenters[:query] << GraphQL::Schema::Member::Instrumentation
720-
lazy_classes.each do |lazy_class, value_method|
721-
schema_defn.lazy_methods.set(lazy_class, value_method)
735+
lazy_classes.each do |lazy_class, (value_method, exec_method)|
736+
schema_defn.lazy_methods.set(lazy_class, value_method, exec_method)
722737
end
723738
if @rescues
724739
@rescues.each do |err_class, handler|
@@ -915,8 +930,8 @@ def type_error(type_err, ctx)
915930
DefaultTypeError.call(type_err, ctx)
916931
end
917932

918-
def lazy_resolve(lazy_class, value_method)
919-
lazy_classes[lazy_class] = value_method
933+
def lazy_resolve(lazy_class, value_method, exec_method = nil)
934+
lazy_classes[lazy_class] = [value_method, exec_method]
920935
end
921936

922937
def instrument(instrument_step, instrumenter, options = {})
@@ -1028,13 +1043,16 @@ def resolve_type(type, obj, ctx = :__undefined__)
10281043
# @api private
10291044
def after_lazy(value)
10301045
if lazy?(value)
1031-
GraphQL::Execution::Lazy.new do
1032-
result = sync_lazy(value)
1033-
# The returned result might also be lazy, so check it, too
1034-
after_lazy(result) do |final_result|
1035-
yield(final_result) if block_given?
1036-
end
1037-
end
1046+
GraphQL::Execution::Lazy.new(
1047+
value: -> {
1048+
result = sync_lazy(value)
1049+
# The returned result might also be lazy, so check it, too
1050+
after_lazy(result) do |final_result|
1051+
yield(final_result) if block_given?
1052+
end
1053+
},
1054+
exec: -> { exec_concurrent(value) }
1055+
)
10381056
else
10391057
yield(value) if block_given?
10401058
end
@@ -1062,6 +1080,24 @@ def sync_lazy(value)
10621080
}
10631081
end
10641082

1083+
# Override this method to handle lazy concurrent objects in a custom way.
1084+
# @param value [Object] an instance of a class registered with {.lazy_resolve}
1085+
# @param ctx [GraphQL::Query::Context] the context for this query
1086+
# @return [Object] A GraphQL-ready (non-lazy) object
1087+
def self.exec_concurrent(value)
1088+
yield(value)
1089+
end
1090+
1091+
# @see Schema.exec_concurrent for a hook to override
1092+
# @api private
1093+
def exec_concurrent(value)
1094+
self.class.exec_concurrent(value) do |v|
1095+
if concurrent_method = concurrent_method_name(v)
1096+
value.public_send(concurrent_method)
1097+
end
1098+
end
1099+
end
1100+
10651101
protected
10661102

10671103
def rescues?

0 commit comments

Comments
 (0)