diff --git a/Gemfile b/Gemfile index e006afd08d..0c78517a73 100644 --- a/Gemfile +++ b/Gemfile @@ -7,6 +7,7 @@ gem 'bootsnap' # required by the Rails apps generated in tests gem 'ruby-prof', platform: :ruby gem 'pry' gem 'pry-stack_explorer', platform: :ruby +gem 'graphql-batch' if RUBY_VERSION >= "2.4" gem 'pry-byebug' end diff --git a/Rakefile b/Rakefile index ffa997ea15..b4ce2045d1 100644 --- a/Rakefile +++ b/Rakefile @@ -98,6 +98,12 @@ namespace :bench do prepare_benchmark GraphQLBenchmark.profile_large_result end + + desc "Compare GraphQL-Batch and GraphQL-Dataloader" + task :profile_batch_loaders do + prepare_benchmark + GraphQLBenchmark.profile_batch_loaders + end end namespace :test do diff --git a/benchmark/batch_loading.rb b/benchmark/batch_loading.rb new file mode 100644 index 0000000000..7e014628a3 --- /dev/null +++ b/benchmark/batch_loading.rb @@ -0,0 +1,143 @@ +module BatchLoading + class GraphQLBatchSchema < GraphQL::Schema + DATA = [ + { id: "1", name: "Bulls", player_ids: ["2", "3"] }, + { id: "2", name: "Michael Jordan", team_id: "1" }, + { id: "3", name: "Scottie Pippin", team_id: "1" }, + { id: "4", name: "Braves", player_ids: ["5", "6"] }, + { id: "5", name: "Chipper Jones", team_id: "4" }, + { id: "6", name: "Tom Glavine", team_id: "4" }, + ] + + class DataLoader < GraphQL::Batch::Loader + def initialize(column: :id) + @column = column + end + + def perform(keys) + keys.each do |key| + record = DATA.find { |d| d[@column] == key } + fulfill(key, record) + end + end + end + + class Team < GraphQL::Schema::Object + field :name, String, null: false + field :players, "[BatchLoading::GraphQLBatchSchema::Player]", null: false + + def players + DataLoader.load_many(object[:player_ids]) + end + end + + class Player < GraphQL::Schema::Object + field :name, String, null: false + field :team, Team, null: false + + def team + DataLoader.load(object[:team_id]) + end + end + + class Query < GraphQL::Schema::Object + field :team, Team, null: true do + 
argument :name, String, required: true + end + + def team(name:) + DataLoader.for(column: :name).load(name) + end + end + + query(Query) + use GraphQL::Execution::Interpreter + use GraphQL::Analysis::AST + use GraphQL::Batch + end + + class GraphQLDataloaderSchema < GraphQL::Schema + class DataSource < GraphQL::Dataloader::Source + def initialize(column: :id) + @column = column + end + + def fetch(keys) + keys.map { |key| + d = GraphQLBatchSchema::DATA.find { |d| d[@column] == key } + # p [key, @column, d] + d + } + end + end + + class Team < GraphQL::Schema::Object + field :name, String, null: false + field :players, "[BatchLoading::GraphQLDataloaderSchema::Player]", null: false + + def players + dataloader.with(DataSource).load_all(object[:player_ids]) + end + end + + class Player < GraphQL::Schema::Object + field :name, String, null: false + field :team, Team, null: false + + def team + dataloader.with(DataSource).load(object[:team_id]) + end + end + + class Query < GraphQL::Schema::Object + field :team, Team, null: true do + argument :name, String, required: true + end + + def team(name:) + dataloader.with(DataSource, column: :name).load(name) + end + end + + query(Query) + use GraphQL::Execution::Interpreter + use GraphQL::Analysis::AST + use GraphQL::Dataloader + end + + class GraphQLNoBatchingSchema < GraphQL::Schema + DATA = GraphQLBatchSchema::DATA + + class Team < GraphQL::Schema::Object + field :name, String, null: false + field :players, "[BatchLoading::GraphQLNoBatchingSchema::Player]", null: false + + def players + object[:player_ids].map { |id| DATA.find { |d| d[:id] == id } } + end + end + + class Player < GraphQL::Schema::Object + field :name, String, null: false + field :team, Team, null: false + + def team + DATA.find { |d| d[:id] == object[:team_id] } + end + end + + class Query < GraphQL::Schema::Object + field :team, Team, null: true do + argument :name, String, required: true + end + + def team(name:) + DATA.find { |d| d[:name] == name } + 
end + end + + query(Query) + use GraphQL::Execution::Interpreter + use GraphQL::Analysis::AST + end +end diff --git a/benchmark/run.rb b/benchmark/run.rb index ff335d529c..9b4287670e 100644 --- a/benchmark/run.rb +++ b/benchmark/run.rb @@ -5,6 +5,7 @@ require "benchmark/ips" require "ruby-prof" require "memory_profiler" +require "graphql/batch" module GraphQLBenchmark QUERY_STRING = GraphQL::Introspection::INTROSPECTION_QUERY @@ -123,6 +124,7 @@ class Schema < GraphQL::Schema query QueryType use GraphQL::Execution::Interpreter use GraphQL::Analysis::AST + use GraphQL::Dataloader end ALL_FIELDS = GraphQL.parse <<-GRAPHQL @@ -142,4 +144,71 @@ class Schema < GraphQL::Schema } GRAPHQL end + + def self.profile_batch_loaders + require_relative "./batch_loading" + include BatchLoading + + document = GraphQL.parse <<-GRAPHQL + { + braves: team(name: "Braves") { ...TeamFields } + bulls: team(name: "Bulls") { ...TeamFields } + } + + fragment TeamFields on Team { + players { + team { + players { + team { + name + } + } + } + } + } + GRAPHQL + batch_result = GraphQLBatchSchema.execute(document: document).to_h + dataloader_result = GraphQLDataloaderSchema.execute(document: document).to_h + no_batch_result = GraphQLNoBatchingSchema.execute(document: document).to_h + + results = [batch_result, dataloader_result, no_batch_result].uniq + if results.size > 1 + puts "Batch result:" + pp batch_result + puts "Dataloader result:" + pp dataloader_result + puts "No-batch result:" + pp no_batch_result + raise "Got different results -- fix implementation before benchmarking." + end + + Benchmark.ips do |x| + x.report("GraphQL::Batch") { GraphQLBatchSchema.execute(document: document) } + x.report("GraphQL::Dataloader") { GraphQLDataloaderSchema.execute(document: document) } + x.report("No Batching") { GraphQLNoBatchingSchema.execute(document: document) } + + x.compare! 
+ end + + puts "========== GraphQL-Batch Memory ==============" + report = MemoryProfiler.report do + GraphQLBatchSchema.execute(document: document) + end + + report.pretty_print + + puts "========== Dataloader Memory =================" + report = MemoryProfiler.report do + GraphQLDataloaderSchema.execute(document: document) + end + + report.pretty_print + + puts "========== No Batch Memory ==============" + report = MemoryProfiler.report do + GraphQLNoBatchingSchema.execute(document: document) + end + + report.pretty_print + end end diff --git a/guides/dataloader/adopting.md b/guides/dataloader/adopting.md new file mode 100644 index 0000000000..bb259f910d --- /dev/null +++ b/guides/dataloader/adopting.md @@ -0,0 +1,100 @@ +--- +layout: guide +search: true +section: Dataloader +title: Dataloader vs. GraphQL-Batch +desc: Comparing and Contrasting Batch Loading Options +index: 3 +experimental: true +--- + +{{ "GraphQL::Dataloader" | api_doc }} solves the same problem as [`GraphQL::Batch`](https://github.com/shopify/graphql-batch). There are a few major differences between the modules: + + +- __Concurrency Primitive:__ GraphQL-Batch uses `Promise`s from [`promise.rb`](https://github.com/lgierth/promise.rb); GraphQL::Dataloader uses Ruby's [`Fiber` API](https://ruby-doc.org/core-3.0.0/Fiber.html). These primitives dictate how batch loading code is written (see below for comparisons). +- __Maturity:__ Frankly, GraphQL-Batch is about as old as GraphQL-Ruby, and it's been in production at Shopify, GitHub, and others for many years. GraphQL::Dataloader is new, and although Ruby has supported `Fiber`s since 1.9, they still aren't widely used. +- __Scope:__ It's not currently possible to use `GraphQL::Dataloader` _outside_ GraphQL. + +The incentive in writing `GraphQL::Dataloader` was to leverage `Fiber`'s ability to _transparently_ pause and resume work, which removes the need for `Promise`s (and removes the resulting complexity in the code). 
Additionally, `GraphQL::Dataloader` should _eventually_ support Ruby 3.0's `Fiber.scheduler` API, which runs I/O in the background by default. + +## Comparison: Fetching a single object + +In this example, a single object is batch-loaded to satisfy a GraphQL field. + +- With __GraphQL-Batch__, you call a loader, which returns a `Promise`: + + ```ruby + record_promise = Loaders::Record.load(1) + ``` + + Then, under the hood, GraphQL-Ruby manages the promise (using its `lazy_resolve` feature, upstreamed from GraphQL-Batch many years ago). GraphQL-Ruby will call `.sync` on it when no further execution is possible; `promise.rb` implements `Promise#sync` to execute the pending work. + +- With __GraphQL::Dataloader__, you get a source, then call `.load` on it, which may pause the current Fiber, but it returns the requested object. + + ```ruby + dataloader.with(Sources::Record).load(1) + ``` + + Since the requested object is (eventually) returned from `.load`, nothing else is required. + +## Comparison: Fetching objects in sequence (dependent) + +In this example, one object is loaded, then another object is loaded _based on_ the first one. + +- With __GraphQL-Batch__, `.then { ... }` is used to join dependent code blocks: + + ```ruby + Loaders::Record.load(1).then do |record| + Loaders::OtherRecord.load(record.other_record_id) + end + ``` + + That call returns a `Promise`, which is stored by GraphQL-Ruby, and finally `.sync`ed. + +- With __GraphQL::Dataloader__, `.load(...)` returns the requested object (after a potential `Fiber` pause), so no other method calls are necessary: + + ```ruby + record = dataloader.with(Sources::Record).load(1) + dataloader.with(Sources::OtherRecord).load(record.other_record_id) + ``` + +## Comparison: Fetching objects concurrently (independent) + +Sometimes, you need multiple _independent_ records to perform a calculation. Each record is loaded, then they're combined in some bit of work. 
+ +- With __GraphQL-Batch__, `Promise.all(...)` is used to wait for several pending loads: + + ```ruby + promise_1 = Loaders::Record.load(1) + promise_2 = Loaders::OtherRecord.load(2) + Promise.all([promise_1, promise_2]).then do |record, other_record| + do_something(record, other_record) + end + ``` + + If the objects are loaded from the same loader, then `.load_many` also works: + + ```ruby + Loaders::Record.load_many([1, 2]).then do |record, other_record| + do_something(record, other_record) + end + ``` + +- With __GraphQL::Dataloader__, each request is registered with `.request(...)` (which never pauses the Fiber), then data is loaded with `.load` (which will pause the Fiber as needed): + + ```ruby + # first, make some requests + request_1 = dataloader.with(Sources::Record).request(1) + request_2 = dataloader.with(Sources::OtherRecord).request(2) + # then, load the objects and do something + record = request_1.load + other_record = request_2.load + do_something(record, other_record) + ``` + + If the objects come from the same `Source`, then `.load_all` will return the objects directly: + + ```ruby + record, other_record = dataloader.with(Sources::Record).load_all([1, 2]) + do_something(record, other_record) + ``` diff --git a/guides/dataloader/dataloader.md b/guides/dataloader/dataloader.md new file mode 100644 index 0000000000..b4a343beac --- /dev/null +++ b/guides/dataloader/dataloader.md @@ -0,0 +1,20 @@ +--- +layout: guide +search: true +section: Dataloader +title: Dataloader +desc: The Dataloader orchestrates Fibers and Sources +index: 2 +experimental: true +--- + +{{ "GraphQL::Dataloader" | api_doc }} instances are created for each query (or multiplex) and they: + +- Cache {% internal_link "Source", "/dataloader/sources" %} instances for the duration of GraphQL execution +- Run pending Fibers to resolve data requirements and continue GraphQL execution + +During a query, you can access the dataloader instance with: + +- {{ 
"GraphQL::Query::Context#dataloader" | api_doc }} (`context.dataloader`, anywhere that query context is available) +- {{ "GraphQL::Schema::Object#dataloader" | api_doc }} (`dataloader` inside a resolver method) +- {{ "GraphQL::Schema::Resolver#dataloader" | api_doc }} (`dataloader` inside `def resolve` of a Resolver, Mutation, or Subscription class.) diff --git a/guides/dataloader/overview.md b/guides/dataloader/overview.md new file mode 100644 index 0000000000..20efea7735 --- /dev/null +++ b/guides/dataloader/overview.md @@ -0,0 +1,134 @@ +--- +layout: guide +search: true +section: Dataloader +title: Overview +desc: Getting started with the Fiber-based Dataloader +index: 0 +experimental: true +--- + +GraphQL-Ruby 1.12 includes {{ "GraphQL::Dataloader" | api_doc }}, a module for managing efficient database in a way that's transparent to application code, backed by Ruby's `Fiber` concurrency primitive. + +`GraphQL::Dataloader` is inspired by [`@bessey`'s proof-of-concept](https://github.com/bessey/graphql-fiber-test/tree/no-gem-changes) and [shopify/graphql-batch](https://github.com/shopify/graphql-batch). + +## Batch Loading + +`GraphQL::Dataloader` facilitates a two-stage approach to fetching data from external sources (like database or APIs): + +- First, GraphQL fields register their data requirements (eg, object IDs or query parameters) +- Then, after as many requirements have been gathered as possible, `GraphQL::Dataloader` initiates _actual_ fetches to external services + +That cycle is repeated during execution: data requirements are gathered until no further GraphQL fields can be executed, then `GraphQL::Dataloader` triggers external calls based on those requirements and GraphQL execution resumes. + +## Fibers + +`GraphQL::Dataloader` uses Ruby's `Fiber`, a lightweight concurrency primitive which supports application-level scheduling _within_ a `Thread`. 
By using `Fiber`, `GraphQL::Dataloader` can pause GraphQL execution when data is requested, then resume execution after the data is fetched. + +At a high level, `GraphQL::Dataloader`'s usage of `Fiber` looks like this: + +- GraphQL execution is run inside a Fiber. +- When that Fiber returns, if the Fiber was paused to wait for data, then GraphQL execution resumes with the _next_ (sibling) GraphQL field inside a new Fiber. +- That cycle continues until no further sibling fields are available and all known Fibers are paused. +- `GraphQL::Dataloader` takes the first paused Fiber and resumes it, causing the `GraphQL::Dataloader::Source` to execute its `#fetch(...)` call. That Fiber continues execution as far as it can. +- Likewise, paused Fibers are resumed, causing GraphQL execution to continue, until all paused Fibers are evaluated completely. + +Since _all_ GraphQL execution is inside a Fiber, __`Thread.current[...]` won't be set__. Those assignments are _Fiber-local_, so each new Fiber has an _empty_ `Thread.current`. For `GraphQL::Dataloader`, use GraphQL-Ruby's `context` object to provide "current" values to a GraphQL query. Inside `#fetch(...)` methods, those values can be re-assigned to `Thread.current` if application code requires it. + +## Getting Started + +To install {{ "GraphQL::Dataloader" | api_doc }}, add it to your schema with `use ...`, for example: + +```ruby +class MySchema < GraphQL::Schema + # ... 
+ use GraphQL::Dataloader +end +``` + +Then, inside your schema, you can request batch-loaded objects by their lookup key with `dataloader.with(...).load(...)`: + +```ruby +field :user, Types::User, null: true do + argument :handle, String, required: true +end + +def user(handle:) + dataloader.with(Sources::UserByHandle).load(handle) +end +``` + +Or, load several objects by passing an array of lookup keys to `.load_all(...)`: + +```ruby +field :is_following, Boolean, null: false do + argument :follower_handle, String, required: true + argument :followed_handle, String, required: true +end + +def is_following(follower_handle:, followed_handle:) + follower, followed = dataloader + .with(Sources::UserByHandle) + .load_all([follower_handle, followed_handle]) + + followed && follower && follower.follows?(followed) +end +``` + +To prepare requests from several sources, use `.request(...)`, then call `.load` after all requests are registered: + +```ruby +class AddToList < GraphQL::Schema::Mutation + argument :handle, String, required: true + argument :list, String, required: true, as: :list_name + + field :list, Types::UserList, null: true + + def resolve(handle:, list_name:) + # first, register the requests: + user_request = dataloader.with(Sources::UserByHandle).request(handle) + list_request = dataloader.with(Sources::ListByName, context[:viewer]).request(list_name) + # then, use `.load` to wait for the external call and return the object: + user = user_request.load + list = list_request.load + # Now, all objects are ready. + list.add_user!(user) + { list: list } + end +end +``` + +### `loads:` and `object_from_id` + +`dataloader` is also available as `context.dataloader`, so you can use it to implement `MySchema.object_from_id`. 
For example: + +```ruby +class MySchema < GraphQL::Schema + def self.object_from_id(id, ctx) + model_class, database_id = IdDecoder.decode(id) + ctx.dataloader.with(Sources::RecordById, model_class).load(database_id) + end +end +``` + +Then, any arguments with `loads:` will use that method to fetch objects. For example: + +```ruby +class FollowUser < GraphQL::Schema::Mutation + argument :follow_id, ID, required: true, loads: Types::User + + field :followed, Types::User, null: true + + def resolve(follow:) + # `follow` was fetched using the Schema's `object_from_id` hook + context[:viewer].follow!(follow) + { followed: follow } + end +end +``` + +__Caveat:__ The currently implementation loads all `loads:` objects in the _same fiber_, so data will be fetched sequentially. + +## Data Sources + +To implement batch-loading data sources, see the {% internal_link "Sources guide", "/dataloader/sources" %}. diff --git a/guides/dataloader/sources.md b/guides/dataloader/sources.md new file mode 100644 index 0000000000..79cbe162a0 --- /dev/null +++ b/guides/dataloader/sources.md @@ -0,0 +1,136 @@ +--- +layout: guide +search: true +section: Dataloader +title: Sources +desc: Batch-loading objects for GraphQL::Dataloader +index: 1 +experimental: true +--- + +_Sources_ are what {{ "GraphQL::Dataloader" | api_doc }} uses to fetch data from external services. + +## Source Concepts + +Sources are classes that inherit from `GraphQL::Dataloader::Source`. A Source _must_ implement `def fetch(keys)` to return a list of objects, one for each of the given keys. A source _may_ implement `def initialize(dataloader, ...)` to accept other batching parameters. + +Sources will receive two kinds of inputs from `GraphQL::Dataloader`: + +- _keys_, which correspond to objects requested by the application. + + Keys are passed to `def fetch(keys)`, which must return an object (or `nil`) for each of `keys`, in the same order as `keys`. 
+ + Under the hood, each Source instance maintains a `key => object` cache. + +- _batch parameters_, which are the basis of batched groups. For example, if you're loading records from different database tables, the table name would be a batch parameter. + + Batch parameters are given to `dataloader.with(source_class, *batch_parameters)`, and the default is _no batch parameters_. When you define a source, you should add the batch parameters to `def initialize(dataloader, ...)` and store them in instance variables. + + (`dataloader.with(source_class, *batch_parameters)` returns an instance of `source_class` with the given batch parameters -- but it might be an instance which was cached by `dataloader`.) + +## Example: Loading Strings from Redis by Key + +The simplest source might fetch values based on their keys. For example: + +```ruby +# app/graphql/sources/redis_string.rb +class Sources::RedisString < GraphQL::Dataloader::Source + REDIS = Redis.new + def fetch(keys) + # Redis's `mget` will return a value for each key with a `nil` for any not-found key. + REDIS.mget(*keys) + end +end +``` + +This loader could be used in GraphQL like this: + +```ruby +some_string = dataloader.with(Sources::RedisString).load("some_key") +``` + +Calls to `.load(key)` will be batched, and when `GraphQL::Dataloader` can't go any further, it will dispatch a call to `def fetch(keys)` above. + +## Example: Loading ActiveRecord Objects by ID + +To fetch ActiveRecord objects by ID, the source should also accept the _model class_ as a batching parameter. 
For example: + +```ruby +# app/graphql/sources/active_record_object.rb +class Sources::ActiveRecordObject < GraphQL::Dataloader::Source + def initialize(model_class) + @model_class = model_class + end + + def fetch(ids) + records = @model_class.where(id: ids) + # return a list with `nil` for any ID that wasn't found + ids.map { |id| records.find { |r| r.id == id } } + end +end +``` + +This source could be used for any `model_class`, for example: + +```ruby +author = dataloader.for(Sources::ActiveRecordObject, ::User).load(1) +post = dataloader.for(Sources::ActiveRecordObject, ::Post).load(1) +``` + +## Example: Batched Calculations + +Besides fetching objects, Sources can return values from batched calculations. For example, a system could batch up checks for who a user follows: + +```ruby +# for a given user, batch checks to see whether this user follows another user. +# (The default `user.followings.where(followed_user_id: followed).exists?` would cause N+1 queries.) +class Sources::UserFollowingExists < GraphQL::Dataloader::Source + def initialize(user) + @user = user + end + + def fetch(handles) + # Prepare a `SELECT id FROM users WHERE handle IN(...)` statement + user_ids = ::User.where(handle: handles).select(:id) + # And use it to filter this user's followings: + followings = @user.followings.where(followed_user_id: user_ids) + # Now, for followings that _actually_ hit a user, get the handles for those users: + followed_users = ::User.where(id: followings.select(:followed_user_id)) + # Finally, return a result set, with one entry (true or false) for each of the given `handles` + handles.map { |h| !!followed_users.find { |u| u.handle == h }} + end +end +``` + +It could be used like this: + +```ruby +is_following = dataloader.with(Sources::UserFollowingExists, context[:viewer]).load(handle) +``` + +After all requests were batched, `#fetch` will return a Boolean result to `is_following`. 
+ +## Example: Loading in a background thread + +Inside `Source#fetch(keys)`, you can call `dataloader.yield` to return control to the Dataloader. This way, it will proceed loading other Sources (if there are any), then return the source that yielded. + +A simple example, spinning up a new Thread: + +```ruby +def fetch(keys) + # spin up some work in a background thread + thread = Thread.new { + fetch_external_data(keys) + } + # return control to the dataloader + dataloader.yield + # at this point, + # the dataloader has tried everything else and come back to this source, + # so block if necessary: + thread.value +end +``` + +For a more robust asynchronous task primitive, check out [`Concurrent::Future`](http://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Future.html). + +Ruby 3.0 added built-in support for yielding Fibers that make I/O calls -- hopefully a future GraphQL-Ruby version will work with that! diff --git a/guides/guides.html b/guides/guides.html index 3577ac53aa..451c1f3b78 100644 --- a/guides/guides.html +++ b/guides/guides.html @@ -10,6 +10,7 @@ - name: Errors - name: Pagination - name: Relay + - name: Dataloader - name: Subscriptions - name: GraphQL Pro - name: GraphQL Pro - OperationStore diff --git a/lib/graphql.rb b/lib/graphql.rb index 8c671226bf..1c6e02077d 100644 --- a/lib/graphql.rb +++ b/lib/graphql.rb @@ -148,3 +148,4 @@ def match?(pattern) require "graphql/unauthorized_error" require "graphql/unauthorized_field_error" require "graphql/load_application_object_failed_error" +require "graphql/dataloader" diff --git a/lib/graphql/backtrace/tracer.rb b/lib/graphql/backtrace/tracer.rb index 221e8547ed..37567a7296 100644 --- a/lib/graphql/backtrace/tracer.rb +++ b/lib/graphql/backtrace/tracer.rb @@ -1,29 +1,35 @@ # frozen_string_literal: true module GraphQL class Backtrace + # TODO this is not fiber-friendly module Tracer module_function # Implement the {GraphQL::Tracing} API. 
def trace(key, metadata) - push_data = case key + case key when "lex", "parse" # No context here, don't have a query yet nil when "execute_multiplex", "analyze_multiplex" - metadata[:multiplex].queries + # No query context yet + nil when "validate", "analyze_query", "execute_query", "execute_query_lazy" - metadata[:query] || metadata[:queries] + query = metadata[:query] || metadata[:queries].first + push_data = query + multiplex = query.multiplex when "execute_field", "execute_field_lazy" # The interpreter passes `query:`, legacy passes `context:` - metadata[:context] || ((q = metadata[:query]) && q.context) + context = metadata[:context] || ((q = metadata[:query]) && q.context) + push_data = context + multiplex = context.query.multiplex else # Custom key, no backtrace data for this nil end if push_data - Thread.current[:last_graphql_backtrace_context] = push_data + multiplex.context[:last_graphql_backtrace_context] = push_data end if key == "execute_multiplex" @@ -32,7 +38,7 @@ def trace(key, metadata) rescue StandardError => err # This is an unhandled error from execution, # Re-raise it with a GraphQL trace. 
- potential_context = Thread.current[:last_graphql_backtrace_context] + potential_context = metadata[:multiplex].context[:last_graphql_backtrace_context] if potential_context.is_a?(GraphQL::Query::Context) || potential_context.is_a?(GraphQL::Query::Context::FieldResolutionContext) raise TracedError.new(err, potential_context) @@ -40,7 +46,7 @@ def trace(key, metadata) raise end ensure - Thread.current[:last_graphql_backtrace_context] = nil + metadata[:multiplex].context.delete(:last_graphql_backtrace_context) end else yield diff --git a/lib/graphql/dataloader.rb b/lib/graphql/dataloader.rb new file mode 100644 index 0000000000..f0c0b7ee3a --- /dev/null +++ b/lib/graphql/dataloader.rb @@ -0,0 +1,197 @@ +# frozen_string_literal: true + +require "graphql/dataloader/null_dataloader" +require "graphql/dataloader/request" +require "graphql/dataloader/request_all" +require "graphql/dataloader/source" + +module GraphQL + # This plugin supports Fiber-based concurrency, along with {GraphQL::Dataloader::Source}. + # + # @example Installing Dataloader + # + # class MySchema < GraphQL::Schema + # use GraphQL::Dataloader + # end + # + # @example Waiting for batch-loaded data in a GraphQL field + # + # field :team, Types::Team, null: true + # + # def team + # dataloader.with(Sources::Record, Team).load(object.team_id) + # end + # + class Dataloader + def self.use(schema) + schema.dataloader_class = self + end + + def initialize(multiplex_context) + @context = multiplex_context + @source_cache = Hash.new { |h, source_class| h[source_class] = Hash.new { |h2, batch_parameters| + source = source_class.new(*batch_parameters) + source.setup(self) + h2[batch_parameters] = source + } + } + @waiting_fibers = [] + @yielded_fibers = Set.new + end + + # @return [Hash] the {Multiplex} context + attr_reader :context + + # @api private + attr_reader :yielded_fibers + + # Add some work to this dataloader to be scheduled later. 
+ # @param block Some work to enqueue + # @return [void] + def enqueue(&block) + @waiting_fibers << Fiber.new { + begin + yield + rescue StandardError => exception + exception + end + } + nil + end + + # Tell the dataloader that this fiber is waiting for data. + # + # Dataloader will resume the fiber after the requested data has been loaded (by another Fiber). + # + # @return [void] + def yield + Fiber.yield + nil + end + + # @return [Boolean] Returns true if the current Fiber has yielded once via Dataloader + def yielded? + @yielded_fibers.include?(Fiber.current) + end + + # Run all Fibers until they're all done + # + # Each cycle works like this: + # + # - Run each pending execution fiber (`@waiting_fibers`), + # - Then run each pending Source, preparing more data for those fibers. + # - Run each pending Source _again_ (if one Source requested more data from another Source) + # - Continue until there are no pending sources + # - Repeat: run execution fibers again ... + # + # @return [void] + def run + # Start executing Fibers. This will run until all the Fibers are done. + already_run_fibers = [] + while (current_fiber = @waiting_fibers.pop) + # Run each execution fiber, enqueuing it in `already_run_fibers` + # if it's still `.alive?`. + # Any spin-off continuations will be enqueued in `@waiting_fibers` (via {#enqueue}) + resume_fiber_and_enqueue_continuation(current_fiber, already_run_fibers) + + if @waiting_fibers.empty? + # Now, run all Sources which have become pending _before_ resuming GraphQL execution. + # Sources might queue up other Sources, which is fine -- those will also run before resuming execution. + # + # This is where an evented approach would be even better -- can we tell which + # fibers are ready to continue, and continue execution there? 
+ # + source_fiber_stack = if (first_source_fiber = create_source_fiber) + [first_source_fiber] + else + nil + end + + if source_fiber_stack + while (outer_source_fiber = source_fiber_stack.pop) + resume_fiber_and_enqueue_continuation(outer_source_fiber, source_fiber_stack) + + # If this source caused more sources to become pending, run those before running this one again: + next_source_fiber = create_source_fiber + if next_source_fiber + source_fiber_stack << next_source_fiber + end + end + end + + # We ran all the first round of execution fibers, + # and we ran all the pending sources. + # So pick up any paused execution fibers and repeat. + @waiting_fibers.concat(already_run_fibers) + already_run_fibers.clear + end + end + nil + end + + # Get a Source instance from this dataloader, for calling `.load(...)` or `.request(...)` on. + # + # @param source_class [Class] + # @return [GraphQL::Dataloader::Source] An instance of {source_class}, initialized with `self, *batch_parameters`, + # and cached for the lifetime of this {Multiplex}. + def with(source_class, *batch_parameters) + @source_cache[source_class][batch_parameters] + end + + # @api private + attr_accessor :current_runtime + + private + + # Check if this fiber is still alive. + # If it is, and it should continue, then enqueue a continuation. + # If it is, re-enqueue it in `fiber_queue`. + # Otherwise, clean it up from @yielded_fibers. + # @return [void] + def resume_fiber_and_enqueue_continuation(fiber, fiber_stack) + result = fiber.resume + if result.is_a?(StandardError) + raise result + end + + # This fiber yielded; there's more to do here. + # (If `#alive?` is false, then the fiber concluded without yielding.) + if fiber.alive? 
+ if !@yielded_fibers.include?(fiber) + # This fiber hasn't yielded yet, we should enqueue a continuation fiber + @yielded_fibers.add(fiber) + current_runtime.enqueue_selections_fiber + end + fiber_stack << fiber + else + # Keep this set clean so that fibers can be GC'ed during execution + @yielded_fibers.delete(fiber) + end + end + + # If there are pending sources, return a fiber for running them. + # Otherwise, return `nil`. + # + # @return [Fiber, nil] + def create_source_fiber + pending_sources = nil + @source_cache.each_value do |source_by_batch_params| + source_by_batch_params.each_value do |source| + if source.pending? + pending_sources ||= [] + pending_sources << source + end + end + end + + if pending_sources + source_fiber = Fiber.new do + pending_sources.each(&:run_pending_keys) + end + end + + source_fiber + end + end +end diff --git a/lib/graphql/dataloader/null_dataloader.rb b/lib/graphql/dataloader/null_dataloader.rb new file mode 100644 index 0000000000..32953b331a --- /dev/null +++ b/lib/graphql/dataloader/null_dataloader.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true + +module GraphQL + class Dataloader + # The default implementation of dataloading -- all no-ops. + # + # The Dataloader interface isn't public, but it enables + # simple internal code while adding the option to add Dataloader. + class NullDataloader < Dataloader + def enqueue + yield + end + + # These are all no-ops because code was + # executed sychronously. 
+ def run; end + def yield; end + def yielded?; false; end + end + end +end diff --git a/lib/graphql/dataloader/request.rb b/lib/graphql/dataloader/request.rb new file mode 100644 index 0000000000..8a26154cba --- /dev/null +++ b/lib/graphql/dataloader/request.rb @@ -0,0 +1,24 @@ +# frozen_string_literal: true +module GraphQL + class Dataloader + # @see Source#request which returns an instance of this + class Request + def initialize(source, key) + @source = source + @key = key + end + + # Call this method to cause the current Fiber to wait for the results of this request. + # + # @return [Object] the object loaded for `key` + def load + if @source.results.key?(@key) + @source.results[@key] + else + @source.sync + @source.results[@key] + end + end + end + end +end diff --git a/lib/graphql/dataloader/request_all.rb b/lib/graphql/dataloader/request_all.rb new file mode 100644 index 0000000000..df32d9cf74 --- /dev/null +++ b/lib/graphql/dataloader/request_all.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true +module GraphQL + class Dataloader + # @see Source#request_all which returns an instance of this. + class RequestAll < Request + def initialize(source, keys) + @source = source + @keys = keys + end + + # Call this method to cause the current Fiber to wait for the results of this request. + # + # @return [Array] One object for each of `keys` + def load + if @keys.any? 
{ |k| !@source.results.key?(k) } + @source.sync + end + @keys.map { |k| @source.results[k] } + end + end + end +end diff --git a/lib/graphql/dataloader/source.rb b/lib/graphql/dataloader/source.rb new file mode 100644 index 0000000000..d1c8a11645 --- /dev/null +++ b/lib/graphql/dataloader/source.rb @@ -0,0 +1,93 @@ +# frozen_string_literal: true + +module GraphQL + class Dataloader + class Source + # @api private + attr_reader :results + + # Called by {Dataloader} to prepare the {Source}'s internal state + # @api private + def setup(dataloader) + @pending_keys = [] + @results = {} + @dataloader = dataloader + end + + attr_reader :dataloader + + # @return [Dataloader::Request] a pending request for a value from `key`. Call `.load` on that object to wait for the result. + def request(key) + if !@results.key?(key) + @pending_keys << key + end + Dataloader::Request.new(self, key) + end + + # @return [Dataloader::Request] a pending request for a values from `keys`. Call `.load` on that object to wait for the results. + def request_all(keys) + pending_keys = keys.select { |k| !@results.key?(k) } + @pending_keys.concat(pending_keys) + Dataloader::RequestAll.new(self, keys) + end + + # @param key [Object] A loading key which will be passed to {#fetch} if it isn't already in the internal cache. + # @return [Object] The result from {#fetch} for `key`. If `key` hasn't been loaded yet, the Fiber will yield until it's loaded. + def load(key) + if @results.key?(key) + @results[key] + else + @pending_keys << key + sync + @results[key] + end + end + + # @param keys [Array] Loading keys which will be passed to `#fetch` (or read from the internal cache). + # @return [Object] The result from {#fetch} for `keys`. If `keys` haven't been loaded yet, the Fiber will yield until they're loaded. + def load_all(keys) + if keys.any? 
{ |k| !@results.key?(k) } + pending_keys = keys.select { |k| !@results.key?(k) } + @pending_keys.concat(pending_keys) + sync + end + + keys.map { |k| @results[k] } + end + + # Subclasses must implement this method to return a value for each of `keys` + # @param keys [Array] keys passed to {#load}, {#load_all}, {#request}, or {#request_all} + # @return [Array] A loaded value for each of `keys`. The array must match one-for-one to the list of `keys`. + def fetch(keys) + # somehow retrieve these from the backend + raise "Implement `#{self.class}#fetch(#{keys.inspect}) to return a record for each of the keys" + end + + # Wait for a batch, if there's anything to batch. + # Then run the batch and update the cache. + # @return [void] + def sync + @dataloader.yield + end + + # @return [Boolean] True if this source has any pending requests for data. + def pending? + @pending_keys.any? + end + + # Called by {GraphQL::Dataloader} to resolve and pending requests to this source. + # @api private + # @return [void] + def run_pending_keys + return if @pending_keys.empty? 
+ fetch_keys = @pending_keys.uniq + @pending_keys = [] + results = fetch(fetch_keys) + fetch_keys.each_with_index do |key, idx| + @results[key] = results[idx] + end + nil + end + end + end +end diff --git a/lib/graphql/execution/interpreter.rb b/lib/graphql/execution/interpreter.rb index 63c8c408e6..17bba31045 100644 --- a/lib/graphql/execution/interpreter.rb +++ b/lib/graphql/execution/interpreter.rb @@ -1,4 +1,5 @@ # frozen_string_literal: true +require "fiber" require "graphql/execution/interpreter/argument_value" require "graphql/execution/interpreter/arguments" require "graphql/execution/interpreter/arguments_cache" diff --git a/lib/graphql/execution/interpreter/runtime.rb b/lib/graphql/execution/interpreter/runtime.rb index 65ea486afa..68cfad70a6 100644 --- a/lib/graphql/execution/interpreter/runtime.rb +++ b/lib/graphql/execution/interpreter/runtime.rb @@ -19,8 +19,10 @@ class Runtime def initialize(query:, response:) @query = query + @dataloader = query.multiplex.dataloader @schema = query.schema @context = query.context + @multiplex_context = query.multiplex.context @interpreter_context = @context.namespace(:interpreter) @response = response @dead_paths = {} @@ -54,7 +56,17 @@ def run_eager # Root .authorized? returned false. 
write_in_response(path, nil) else - evaluate_selections(path, context.scoped_context, object_proxy, root_type, root_operation.selections, root_operation_type: root_op_type) + # Prepare this runtime state to be encapsulated in a Fiber + @progress_path = path + @progress_scoped_context = context.scoped_context + @progress_object = object_proxy + @progress_object_type = root_type + @progress_index = nil + @progress_is_eager_selection = root_op_type == "mutation" + @progress_selections = gather_selections(object_proxy, root_type, root_operation.selections) + + # Make the first fiber which will begin execution + enqueue_selections_fiber end delete_interpreter_context(:current_path) delete_interpreter_context(:current_field) @@ -63,7 +75,33 @@ def run_eager nil end - def gather_selections(owner_object, owner_type, selections, selections_by_name) + # Use `@dataloader` to enqueue a fiber that will pick up from the current point. + # @return [void] + def enqueue_selections_fiber + # Read these into local variables so that later assignments don't affect the block below. 
+ path = @progress_path + scoped_context = @progress_scoped_context + owner_object = @progress_object + owner_type = @progress_object_type + idx = @progress_index + is_eager_selection = @progress_is_eager_selection + gathered_selections = @progress_selections + + @dataloader.enqueue { + evaluate_selections( + path, + scoped_context, + owner_object, + owner_type, + is_eager_selection: is_eager_selection, + after: idx, + gathered_selections: gathered_selections, + ) + } + nil + end + + def gather_selections(owner_object, owner_type, selections, selections_by_name = {}) selections.each do |node| # Skip gathering this if the directive says so if !directives_include?(node, owner_object, owner_type) @@ -115,151 +153,185 @@ def gather_selections(owner_object, owner_type, selections, selections_by_name) raise "Invariant: unexpected selection class: #{node.class}" end end + selections_by_name end NO_ARGS = {}.freeze - def evaluate_selections(path, scoped_context, owner_object, owner_type, selections, root_operation_type: nil) + # @return [void] + def evaluate_selections(path, scoped_context, owner_object, owner_type, is_eager_selection:, gathered_selections:, after:) set_all_interpreter_context(owner_object, nil, nil, path) - selections_by_name = {} - gather_selections(owner_object, owner_type, selections, selections_by_name) - selections_by_name.each do |result_name, field_ast_nodes_or_ast_node| - # As a performance optimization, the hash key will be a `Node` if - # there's only one selection of the field. 
But if there are multiple - # selections of the field, it will be an Array of nodes - if field_ast_nodes_or_ast_node.is_a?(Array) - field_ast_nodes = field_ast_nodes_or_ast_node - ast_node = field_ast_nodes.first - else - field_ast_nodes = nil - ast_node = field_ast_nodes_or_ast_node + + @progress_path = path + @progress_scoped_context = scoped_context + @progress_object = owner_object + @progress_object_type = owner_type + @progress_index = nil + @progress_is_eager_selection = is_eager_selection + @progress_selections = gathered_selections + + # Track `idx` manually to avoid an allocation on this hot path + idx = 0 + gathered_selections.each do |result_name, field_ast_nodes_or_ast_node| + prev_idx = idx + idx += 1 + # TODO: this is how a `progress` resumes where this left off. + # Is there a better way to seek in the hash? + # I think we could also use the array of keys; it supports seeking just fine. + if after && prev_idx <= after + next end - field_name = ast_node.name - field_defn = @fields_cache[owner_type][field_name] ||= owner_type.get_field(field_name) - is_introspection = false - if field_defn.nil? - field_defn = if owner_type == schema.query && (entry_point_field = schema.introspection_system.entry_point(name: field_name)) - is_introspection = true - entry_point_field - elsif (dynamic_field = schema.introspection_system.dynamic_field(name: field_name)) - is_introspection = true - dynamic_field - else - raise "Invariant: no field for #{owner_type}.#{field_name}" - end + @progress_index = prev_idx + # This is how the current runtime gives itself to `dataloader` + # so that the dataloader can enqueue another fiber to resume if needed. + @dataloader.current_runtime = self + evaluate_selection(path, result_name, field_ast_nodes_or_ast_node, scoped_context, owner_object, owner_type, is_eager_selection) + # The dataloader knows if ^^ that selection halted and later selections were executed in another fiber. 
+ # If that's the case, then don't continue execution here. + if @dataloader.yielded? + break end - return_type = field_defn.type + end + nil + end - next_path = path.dup - next_path << result_name - next_path.freeze + # @return [void] + def evaluate_selection(path, result_name, field_ast_nodes_or_ast_node, scoped_context, owner_object, owner_type, is_eager_field) + # As a performance optimization, the hash key will be a `Node` if + # there's only one selection of the field. But if there are multiple + # selections of the field, it will be an Array of nodes + if field_ast_nodes_or_ast_node.is_a?(Array) + field_ast_nodes = field_ast_nodes_or_ast_node + ast_node = field_ast_nodes.first + else + field_ast_nodes = nil + ast_node = field_ast_nodes_or_ast_node + end + field_name = ast_node.name + field_defn = @fields_cache[owner_type][field_name] ||= owner_type.get_field(field_name) + is_introspection = false + if field_defn.nil? + field_defn = if owner_type == schema.query && (entry_point_field = schema.introspection_system.entry_point(name: field_name)) + is_introspection = true + entry_point_field + elsif (dynamic_field = schema.introspection_system.dynamic_field(name: field_name)) + is_introspection = true + dynamic_field + else + raise "Invariant: no field for #{owner_type}.#{field_name}" + end + end + return_type = field_defn.type - # This seems janky, but we need to know - # the field's return type at this path in order - # to propagate `null` - set_type_at_path(next_path, return_type) - # Set this before calling `run_with_directives`, so that the directive can have the latest path - set_all_interpreter_context(nil, field_defn, nil, next_path) + next_path = path.dup + next_path << result_name + next_path.freeze - context.scoped_context = scoped_context - object = owner_object + # This seems janky, but we need to know + # the field's return type at this path in order + # to propagate `null` + set_type_at_path(next_path, return_type) + # Set this before calling 
`run_with_directives`, so that the directive can have the latest path + set_all_interpreter_context(nil, field_defn, nil, next_path) - if is_introspection - object = authorized_new(field_defn.owner, object, context, next_path) - end + context.scoped_context = scoped_context + object = owner_object - begin - kwarg_arguments = arguments(object, field_defn, ast_node) - rescue GraphQL::ExecutionError => e - continue_value(next_path, e, owner_type, field_defn, return_type.non_null?, ast_node) - next - end + if is_introspection + object = authorized_new(field_defn.owner, object, context, next_path) + end - after_lazy(kwarg_arguments, owner: owner_type, field: field_defn, path: next_path, scoped_context: context.scoped_context, owner_object: object, arguments: kwarg_arguments) do |resolved_arguments| - if resolved_arguments.is_a? GraphQL::ExecutionError - continue_value(next_path, resolved_arguments, owner_type, field_defn, return_type.non_null?, ast_node) - next - end + begin + kwarg_arguments = arguments(object, field_defn, ast_node) + rescue GraphQL::ExecutionError => e + continue_value(next_path, e, owner_type, field_defn, return_type.non_null?, ast_node) + return + end - kwarg_arguments = if resolved_arguments.empty? && field_defn.extras.empty? - # We can avoid allocating the `{ Symbol => Object }` hash in this case - NO_ARGS - else - # Bundle up the extras, then make a new arguments instance - # that includes the extras, too. 
- extra_args = {} - field_defn.extras.each do |extra| - case extra - when :ast_node - extra_args[:ast_node] = ast_node - when :execution_errors - extra_args[:execution_errors] = ExecutionErrors.new(context, ast_node, next_path) - when :path - extra_args[:path] = next_path - when :lookahead - if !field_ast_nodes - field_ast_nodes = [ast_node] - end - extra_args[:lookahead] = Execution::Lookahead.new( - query: query, - ast_nodes: field_ast_nodes, - field: field_defn, - ) - when :argument_details - # Use this flag to tell Interpreter::Arguments to add itself - # to the keyword args hash _before_ freezing everything. - extra_args[:argument_details] = :__arguments_add_self - else - extra_args[extra] = field_defn.fetch_extra(extra, context) + after_lazy(kwarg_arguments, owner: owner_type, field: field_defn, path: next_path, scoped_context: context.scoped_context, owner_object: object, arguments: kwarg_arguments) do |resolved_arguments| + if resolved_arguments.is_a? GraphQL::ExecutionError + continue_value(next_path, resolved_arguments, owner_type, field_defn, return_type.non_null?, ast_node) + next + end + + kwarg_arguments = if resolved_arguments.empty? && field_defn.extras.empty? + # We can avoid allocating the `{ Symbol => Object }` hash in this case + NO_ARGS + else + # Bundle up the extras, then make a new arguments instance + # that includes the extras, too. 
+ extra_args = {} + field_defn.extras.each do |extra| + case extra + when :ast_node + extra_args[:ast_node] = ast_node + when :execution_errors + extra_args[:execution_errors] = ExecutionErrors.new(context, ast_node, next_path) + when :path + extra_args[:path] = next_path + when :lookahead + if !field_ast_nodes + field_ast_nodes = [ast_node] end - end - resolved_arguments = resolved_arguments.merge_extras(extra_args) - resolved_arguments.keyword_arguments + extra_args[:lookahead] = Execution::Lookahead.new( + query: query, + ast_nodes: field_ast_nodes, + field: field_defn, + ) + when :argument_details + # Use this flag to tell Interpreter::Arguments to add itself + # to the keyword args hash _before_ freezing everything. + extra_args[:argument_details] = :__arguments_add_self + else + extra_args[extra] = field_defn.fetch_extra(extra, context) + end end + resolved_arguments = resolved_arguments.merge_extras(extra_args) + resolved_arguments.keyword_arguments + end - set_all_interpreter_context(nil, nil, kwarg_arguments, nil) + set_all_interpreter_context(nil, nil, kwarg_arguments, nil) - # Optimize for the case that field is selected only once - if field_ast_nodes.nil? || field_ast_nodes.size == 1 - next_selections = ast_node.selections - else - next_selections = [] - field_ast_nodes.each { |f| next_selections.concat(f.selections) } - end + # Optimize for the case that field is selected only once + if field_ast_nodes.nil? 
|| field_ast_nodes.size == 1 + next_selections = ast_node.selections + else + next_selections = [] + field_ast_nodes.each { |f| next_selections.concat(f.selections) } + end - field_result = resolve_with_directives(object, ast_node) do - # Actually call the field resolver and capture the result - app_result = begin - query.with_error_handling do - query.trace("execute_field", {owner: owner_type, field: field_defn, path: next_path, query: query, object: object, arguments: kwarg_arguments}) do - field_defn.resolve(object, kwarg_arguments, context) - end + field_result = resolve_with_directives(object, ast_node) do + # Actually call the field resolver and capture the result + app_result = begin + query.with_error_handling do + query.trace("execute_field", {owner: owner_type, field: field_defn, path: next_path, query: query, object: object, arguments: kwarg_arguments}) do + field_defn.resolve(object, kwarg_arguments, context) end - rescue GraphQL::ExecutionError => err - err end - after_lazy(app_result, owner: owner_type, field: field_defn, path: next_path, scoped_context: context.scoped_context, owner_object: object, arguments: kwarg_arguments) do |inner_result| - continue_value = continue_value(next_path, inner_result, owner_type, field_defn, return_type.non_null?, ast_node) - if RawValue === continue_value - # Write raw value directly to the response without resolving nested objects - write_in_response(next_path, continue_value.resolve) - elsif HALT != continue_value - continue_field(next_path, continue_value, owner_type, field_defn, return_type, ast_node, next_selections, false, object, kwarg_arguments) - end + rescue GraphQL::ExecutionError => err + err + end + after_lazy(app_result, owner: owner_type, field: field_defn, path: next_path, scoped_context: context.scoped_context, owner_object: object, arguments: kwarg_arguments) do |inner_result| + continue_value = continue_value(next_path, inner_result, owner_type, field_defn, return_type.non_null?, ast_node) + if 
RawValue === continue_value + # Write raw value directly to the response without resolving nested objects + write_in_response(next_path, continue_value.resolve) + elsif HALT != continue_value + continue_field(next_path, continue_value, owner_type, field_defn, return_type, ast_node, next_selections, false, object, kwarg_arguments) end end + end - # If this field is a root mutation field, immediately resolve - # all of its child fields before moving on to the next root mutation field. - # (Subselections of this mutation will still be resolved level-by-level.) - if root_operation_type == "mutation" - Interpreter::Resolve.resolve_all([field_result]) - else - field_result - end + # If this field is a root mutation field, immediately resolve + # all of its child fields before moving on to the next root mutation field. + # (Subselections of this mutation will still be resolved level-by-level.) + if is_eager_field + Interpreter::Resolve.resolve_all([field_result]) end + + nil end end @@ -345,7 +417,8 @@ def continue_field(path, value, owner_type, field, current_type, ast_node, next_ if HALT != continue_value response_hash = {} write_in_response(path, response_hash) - evaluate_selections(path, context.scoped_context, continue_value, current_type, next_selections) + gathered_selections = gather_selections(continue_value, current_type, next_selections) + evaluate_selections(path, context.scoped_context, continue_value, current_type, is_eager_selection: false, gathered_selections: gathered_selections, after: nil) response_hash end end diff --git a/lib/graphql/execution/multiplex.rb b/lib/graphql/execution/multiplex.rb index 87ac53ca84..13316ee8bf 100644 --- a/lib/graphql/execution/multiplex.rb +++ b/lib/graphql/execution/multiplex.rb @@ -29,11 +29,13 @@ class Multiplex include Tracing::Traceable - attr_reader :context, :queries, :schema, :max_complexity + attr_reader :context, :queries, :schema, :max_complexity, :dataloader def initialize(schema:, queries:, context:, 
max_complexity:) @schema = schema @queries = queries + @queries.each { |q| q.multiplex = self } @context = context + @context[:dataloader] = @dataloader = @schema.dataloader_class.new(context) @tracers = schema.tracers + (context[:tracers] || []) # Support `context: {backtrace: true}` if context[:backtrace] && !@tracers.include?(GraphQL::Backtrace::Tracer) @@ -79,20 +81,30 @@ def run_as_multiplex(multiplex) multiplex.schema.query_execution_strategy.begin_multiplex(multiplex) queries = multiplex.queries # Do as much eager evaluation of the query as possible - results = queries.map do |query| - begin_query(query, multiplex) + results = [] + queries.each_with_index do |query, idx| + multiplex.dataloader.enqueue { + results[idx] = begin_query(query, multiplex) + } end + multiplex.dataloader.run + # Then, work through lazy results in a breadth-first way - multiplex.schema.query_execution_strategy.finish_multiplex(results, multiplex) + multiplex.dataloader.enqueue { + multiplex.schema.query_execution_strategy.finish_multiplex(results, multiplex) + } + multiplex.dataloader.run # Then, find all errors and assign the result to the query object - results.each_with_index.map do |data_result, idx| + results.each_with_index do |data_result, idx| query = queries[idx] finish_query(data_result, query, multiplex) # Get the Query::Result, not the Hash - query.result + results[idx] = query.result end + + results rescue Exception # TODO rescue at a higher level so it will catch errors in analysis, too # Assign values here so that the query's `@executed` becomes true diff --git a/lib/graphql/query.rb b/lib/graphql/query.rb index f08fa501b2..773cac3156 100644 --- a/lib/graphql/query.rb +++ b/lib/graphql/query.rb @@ -153,6 +153,8 @@ def interpreter? @interpreter end + attr_accessor :multiplex + def subscription_update? @subscription_topic && subscription? 
end diff --git a/lib/graphql/query/context.rb b/lib/graphql/query/context.rb index f830410dd0..68afce4045 100644 --- a/lib/graphql/query/context.rb +++ b/lib/graphql/query/context.rb @@ -158,6 +158,10 @@ def initialize(query:, schema: query.schema, values:, object:) @scoped_context = {} end + def dataloader + @dataloader ||= query.multiplex.dataloader + end + # @api private attr_writer :interpreter diff --git a/lib/graphql/schema.rb b/lib/graphql/schema.rb index 501b70ac2f..4e398f09fb 100644 --- a/lib/graphql/schema.rb +++ b/lib/graphql/schema.rb @@ -1155,6 +1155,14 @@ def union_memberships(type = nil) end end + # @api private + # @see GraphQL::Dataloader + def dataloader_class + @dataloader_class || GraphQL::Dataloader::NullDataloader + end + + attr_writer :dataloader_class + def references_to(to_type = nil, from: nil) @own_references_to ||= Hash.new { |h, k| h[k] = [] } if to_type @@ -1711,6 +1719,7 @@ def lazy_methods else @lazy_methods = GraphQL::Execution::Lazy::LazyMethodMap.new @lazy_methods.set(GraphQL::Execution::Lazy, :value) + @lazy_methods.set(GraphQL::Dataloader::Request, :load) end end @lazy_methods @@ -1969,6 +1978,10 @@ def add_directives_from(owner) end end + def dataloader_class + self.class.dataloader_class + end + # Install these here so that subclasses will also install it. use(GraphQL::Execution::Errors) use(GraphQL::Pagination::Connections) diff --git a/lib/graphql/schema/member/has_arguments.rb b/lib/graphql/schema/member/has_arguments.rb index 718a1f251f..3a300078cf 100644 --- a/lib/graphql/schema/member/has_arguments.rb +++ b/lib/graphql/schema/member/has_arguments.rb @@ -111,21 +111,24 @@ def coerce_arguments(parent_object, values, context) if has_value loads = arg_defn.loads loaded_value = nil + coerced_value = context.schema.error_handler.with_error_handling(context) do + arg_defn.type.coerce_input(value, context) + end + + # TODO this should probably be inside after_lazy if loads && !arg_defn.from_resolver? 
loaded_value = if arg_defn.type.list? - loaded_values = value.map { |val| load_application_object(arg_defn, loads, val, context) } + loaded_values = coerced_value.map { |val| load_application_object(arg_defn, loads, val, context) } context.schema.after_any_lazies(loaded_values) { |result| result } else - load_application_object(arg_defn, loads, value, context) + load_application_object(arg_defn, loads, coerced_value, context) end end coerced_value = if loaded_value loaded_value else - context.schema.error_handler.with_error_handling(context) do - arg_defn.type.coerce_input(value, context) - end + coerced_value end context.schema.after_lazy(coerced_value) do |coerced_value| diff --git a/lib/graphql/schema/object.rb b/lib/graphql/schema/object.rb index e86b21f66c..1de5f78235 100644 --- a/lib/graphql/schema/object.rb +++ b/lib/graphql/schema/object.rb @@ -14,6 +14,11 @@ class Object < GraphQL::Schema::Member # @return [GraphQL::Query::Context] the context instance for this query attr_reader :context + # @return [GraphQL::Dataloader] + def dataloader + context.dataloader + end + # Call this in a field method to return a value that should be returned to the client # without any further handling by GraphQL. def raw_value(obj) diff --git a/lib/graphql/schema/resolver.rb b/lib/graphql/schema/resolver.rb index ead8aa32be..da43b9a8f0 100644 --- a/lib/graphql/schema/resolver.rb +++ b/lib/graphql/schema/resolver.rb @@ -50,6 +50,11 @@ def initialize(object:, context:, field:) # @return [GraphQL::Query::Context] attr_reader :context + # @return [GraphQL::Dataloader] + def dataloader + context.dataloader + end + # @return [GraphQL::Schema::Field] attr_reader :field diff --git a/lib/graphql/schema/subscription.rb b/lib/graphql/schema/subscription.rb index 7a9c11240a..f5aa251645 100644 --- a/lib/graphql/schema/subscription.rb +++ b/lib/graphql/schema/subscription.rb @@ -25,6 +25,22 @@ def initialize(object:, context:, field:) @mode = context.query.subscription_update? ? 
:update : :subscribe end + def resolve_with_support(**args) + result = nil + unsubscribed = true + catch :graphql_subscription_unsubscribed do + result = super + unsubscribed = false + end + + + if unsubscribed + context.skip + else + result + end + end + # Implement the {Resolve} API def resolve(**args) # Dispatch based on `@mode`, which will raise a `NoMethodError` if we ever @@ -55,7 +71,8 @@ def subscribe(args = {}) def resolve_update(**args) ret_val = args.any? ? update(**args) : update if ret_val == :no_update - throw :graphql_no_subscription_update + context.namespace(:subscriptions)[:no_update] = true + context.skip else ret_val end @@ -80,6 +97,7 @@ def load_application_object_failed(err) # Call this to halt execution and remove this subscription from the system def unsubscribe + context.namespace(:subscriptions)[:unsubscribed] = true throw :graphql_subscription_unsubscribed end diff --git a/lib/graphql/subscriptions.rb b/lib/graphql/subscriptions.rb index a2b04b21e6..9af02bfe4d 100644 --- a/lib/graphql/subscriptions.rb +++ b/lib/graphql/subscriptions.rb @@ -107,31 +107,26 @@ def execute_update(subscription_id, event, object) variables = query_data.fetch(:variables) context = query_data.fetch(:context) operation_name = query_data.fetch(:operation_name) - result = nil - # this will be set to `false` unless `.execute` is terminated - # with a `throw :graphql_subscription_unsubscribed` - unsubscribed = true - catch(:graphql_subscription_unsubscribed) do - catch(:graphql_no_subscription_update) do - # Re-evaluate the saved query, - # but if it terminates early with a `throw`, - # it will stay `nil` - result = @schema.execute( - query: query_string, - context: context, - subscription_topic: event.topic, - operation_name: operation_name, - variables: variables, - root_value: object, - ) - end - unsubscribed = false + result = @schema.execute( + query: query_string, + context: context, + subscription_topic: event.topic, + operation_name: operation_name, + 
variables: variables, + root_value: object, + ) + subscriptions_context = result.context.namespace(:subscriptions) + if subscriptions_context[:no_update] + result = nil end + unsubscribed = subscriptions_context[:unsubscribed] + if unsubscribed # `unsubscribe` was called, clean up on our side # TODO also send `{more: false}` to client? delete_subscription(subscription_id) + result = nil end result diff --git a/spec/graphql/dataloader_spec.rb b/spec/graphql/dataloader_spec.rb new file mode 100644 index 0000000000..e854f9343c --- /dev/null +++ b/spec/graphql/dataloader_spec.rb @@ -0,0 +1,336 @@ +# frozen_string_literal: true +require "spec_helper" + +describe GraphQL::Dataloader do + class FiberSchema < GraphQL::Schema + module Database + extend self + DATA = {} + [ + { id: "1", name: "Wheat", type: "Grain" }, + { id: "2", name: "Corn", type: "Grain" }, + { id: "3", name: "Butter", type: "Dairy" }, + { id: "4", name: "Baking Soda", type: "LeaveningAgent" }, + { id: "5", name: "Cornbread", type: "Recipe", ingredient_ids: ["1", "2", "3", "4"] }, + { id: "6", name: "Grits", type: "Recipe", ingredient_ids: ["2", "3", "7"] }, + { id: "7", name: "Cheese", type: "Dairy" }, + ].each { |d| DATA[d[:id]] = d } + + def log + @log ||= [] + end + + def mget(ids) + log << [:mget, ids] + ids.map { |id| DATA[id] } + end + + def find_by(attribute, keys) + log << [:find_by, attribute, keys] + keys.map { |k| DATA.each_value.find { |v| v[attribute] == k } } + end + end + + class DataObject < GraphQL::Dataloader::Source + def initialize(column = :id) + @column = column + end + + def fetch(keys) + if @column == :id + Database.mget(keys) + else + Database.find_by(@column, keys) + end + end + end + + class NestedDataObject < GraphQL::Dataloader::Source + def fetch(ids) + @dataloader.with(DataObject).load_all(ids) + end + end + + class SlowDataObject < GraphQL::Dataloader::Source + def initialize(batch_key) + # This is just so that I can force different instances in test + @batch_key = 
batch_key + end + + def fetch(keys) + t = Thread.new { + sleep 0.5 + Database.mget(keys) + } + dataloader.yield + t.value + end + end + + module Ingredient + include GraphQL::Schema::Interface + field :name, String, null: false + field :id, ID, null: false + end + + class Grain < GraphQL::Schema::Object + implements Ingredient + end + + class LeaveningAgent < GraphQL::Schema::Object + implements Ingredient + end + + class Dairy < GraphQL::Schema::Object + implements Ingredient + end + + class Recipe < GraphQL::Schema::Object + field :name, String, null: false + field :ingredients, [Ingredient], null: false + + def ingredients + ingredients = dataloader.with(DataObject).load_all(object[:ingredient_ids]) + ingredients + end + + field :slow_ingredients, [Ingredient], null: false + + def slow_ingredients + # Use `object[:id]` here to force two different instances of the loader in the test + dataloader.with(SlowDataObject, object[:id]).load_all(object[:ingredient_ids]) + end + end + + class Query < GraphQL::Schema::Object + field :ingredient, Ingredient, null: true do + argument :id, ID, required: true + end + + def ingredient(id:) + dataloader.with(DataObject).load(id) + end + + field :ingredient_by_name, Ingredient, null: true do + argument :name, String, required: true + end + + def ingredient_by_name(name:) + dataloader.with(DataObject, :name).load(name) + end + + field :nested_ingredient, Ingredient, null: true do + argument :id, ID, required: true + end + + def nested_ingredient(id:) + dataloader.with(NestedDataObject).load(id) + end + + field :slow_recipe, Recipe, null: true do + argument :id, ID, required: true + end + + def slow_recipe(id:) + dataloader.with(SlowDataObject, id).load(id) + end + + field :recipe, Recipe, null: true do + argument :id, ID, required: true, loads: Recipe, as: :recipe + end + + def recipe(recipe:) + recipe + end + + field :recipe_ingredient, Ingredient, null: true do + argument :recipe_id, ID, required: true + argument 
:ingredient_number, Int, required: true + end + + def recipe_ingredient(recipe_id:, ingredient_number:) + recipe = dataloader.with(DataObject).load(recipe_id) + ingredient_id = recipe[:ingredient_ids][ingredient_number - 1] + dataloader.with(DataObject).load(ingredient_id) + end + + field :common_ingredients, [Ingredient], null: true do + argument :recipe_1_id, ID, required: true + argument :recipe_2_id, ID, required: true + end + + def common_ingredients(recipe_1_id:, recipe_2_id:) + req1 = dataloader.with(DataObject).request(recipe_1_id) + req2 = dataloader.with(DataObject).request(recipe_2_id) + recipe1 = req1.load + recipe2 = req2.load + common_ids = recipe1[:ingredient_ids] & recipe2[:ingredient_ids] + dataloader.with(DataObject).load_all(common_ids) + end + end + + query(Query) + + def self.object_from_id(id, ctx) + ctx.dataloader.with(DataObject).load(id) + end + + def self.resolve_type(type, obj, ctx) + get_type(obj[:type]) + end + + orphan_types(Grain, Dairy, Recipe, LeaveningAgent) + use GraphQL::Dataloader + end + + def database_log + FiberSchema::Database.log + end + + before do + database_log.clear + end + + it "Works with request(...)" do + res = FiberSchema.execute <<-GRAPHQL + { + commonIngredients(recipe1Id: 5, recipe2Id: 6) { + name + } + } + GRAPHQL + + expected_data = { + "data" => { + "commonIngredients" => [ + { "name" => "Corn" }, + { "name" => "Butter" }, + ] + } + } + assert_equal expected_data, res + assert_equal [[:mget, ["5", "6"]], [:mget, ["2", "3"]]], database_log + end + + it "batch-loads" do + res = FiberSchema.execute <<-GRAPHQL + { + i1: ingredient(id: 1) { name } + i2: ingredient(id: 2) { name } + r1: recipe(id: 5) { + ingredients { name } + } + ri1: recipeIngredient(recipeId: 6, ingredientNumber: 3) { + name + } + } + GRAPHQL + + expected_data = { + "i1" => { "name" => "Wheat" }, + "i2" => { "name" => "Corn" }, + "r1" => { + "ingredients" => [ + { "name" => "Wheat" }, + { "name" => "Corn" }, + { "name" => "Butter" }, + { "name" 
=> "Baking Soda" }, + ], + }, + "ri1" => { + "name" => "Cheese", + }, + } + assert_equal(expected_data, res["data"]) + + expected_log = [ + [:mget, [ + "1", "2", # The first 2 ingredients + "5", # The first recipe + "6", # recipeIngredient recipeId + ]], + [:mget, [ + "7", # recipeIngredient ingredient_id + "3", "4", # The two unfetched ingredients the first recipe + ]], + ] + assert_equal expected_log, database_log + end + + it "caches and batch-loads across a multiplex" do + context = {} + result = FiberSchema.multiplex([ + { query: "{ i1: ingredient(id: 1) { name } i2: ingredient(id: 2) { name } }", }, + { query: "{ i2: ingredient(id: 2) { name } r1: recipe(id: 5) { ingredients { name } } }", }, + { query: "{ i1: ingredient(id: 1) { name } ri1: recipeIngredient(recipeId: 5, ingredientNumber: 2) { name } }", }, + ], context: context) + + expected_result = [ + {"data"=>{"i1"=>{"name"=>"Wheat"}, "i2"=>{"name"=>"Corn"}}}, + {"data"=>{"i2"=>{"name"=>"Corn"}, "r1"=>{"ingredients"=>[{"name"=>"Wheat"}, {"name"=>"Corn"}, {"name"=>"Butter"}, {"name"=>"Baking Soda"}]}}}, + {"data"=>{"i1"=>{"name"=>"Wheat"}, "ri1"=>{"name"=>"Corn"}}}, + ] + assert_equal expected_result, result + expected_log = [ + [:mget, ["1", "5", "2"]], + [:mget, ["3", "4"]], + ] + assert_equal expected_log, database_log + assert_equal 0, context[:dataloader].yielded_fibers.size, "All yielded fibers are cleaned up when they're finished" + end + + it "works with calls within sources" do + res = FiberSchema.execute <<-GRAPHQL + { + i1: nestedIngredient(id: 1) { name } + i2: nestedIngredient(id: 2) { name } + } + GRAPHQL + + expected_data = { "i1" => { "name" => "Wheat" }, "i2" => { "name" => "Corn" } } + assert_equal expected_data, res["data"] + assert_equal [[:mget, ["1", "2"]]], database_log + end + + it "works with batch parameters" do + res = FiberSchema.execute <<-GRAPHQL + { + i1: ingredientByName(name: "Butter") { id } + i2: ingredientByName(name: "Corn") { id } + i3: ingredientByName(name: "Gummi 
Bears") { id } + } + GRAPHQL + + expected_data = { + "i1" => { "id" => "3" }, + "i2" => { "id" => "2" }, + "i3" => nil, + } + assert_equal expected_data, res["data"] + assert_equal [[:find_by, :name, ["Butter", "Corn", "Gummi Bears"]]], database_log + end + + it "works with manual parallelism" do + start = Time.now.to_f + FiberSchema.execute <<-GRAPHQL + { + i1: slowRecipe(id: 5) { slowIngredients { name } } + i2: slowRecipe(id: 6) { slowIngredients { name } } + } + GRAPHQL + finish = Time.now.to_f + + # Each load slept for 0.5 second, so sequentially, this would have been 2s sequentially + assert_in_delta 1, finish - start, 0.1, "Load threads are executed in parallel" + expected_log = [ + # These were separated because of different recipe IDs: + [:mget, ["5"]], + [:mget, ["6"]], + # These were cached separately because of different recipe IDs: + [:mget, ["2", "3", "7"]], + [:mget, ["1", "2", "3", "4"]], + ] + # Sort them because threads may have returned in slightly different order + assert_equal expected_log.sort, database_log.sort + end +end diff --git a/spec/graphql/schema/subscription_spec.rb b/spec/graphql/schema/subscription_spec.rb index 4256c9d1da..84e91d5c65 100644 --- a/spec/graphql/schema/subscription_spec.rb +++ b/spec/graphql/schema/subscription_spec.rb @@ -126,7 +126,9 @@ def toot(body:) subscription(Subscription) rescue_from(StandardError) { |err, *rest| - raise "This should never happen: #{err.class}: #{err.message}" + err2 = RuntimeError.new("This should never happen: #{err.class}: #{err.message}") + err2.set_backtrace(err.backtrace) + raise err2 } def self.object_from_id(id, ctx) diff --git a/spec/support/dummy/schema.rb b/spec/support/dummy/schema.rb index 8698bbf865..0632ad8a47 100644 --- a/spec/support/dummy/schema.rb +++ b/spec/support/dummy/schema.rb @@ -525,6 +525,8 @@ def self.type_error(err, ctx) end end + use GraphQL::Dataloader + lazy_resolve(Proc, :call) end diff --git a/spec/support/jazz.rb b/spec/support/jazz.rb index 
b635ed60e0..059d7e39b0 100644 --- a/spec/support/jazz.rb +++ b/spec/support/jazz.rb @@ -932,6 +932,8 @@ def self.object_from_id(id, ctx) GloballyIdentifiableType.find(id) end + use GraphQL::Dataloader + if !TESTING_INTERPRETER use GraphQL::Execution::Execute use GraphQL::Analysis @@ -945,6 +947,8 @@ class SchemaWithoutIntrospection < GraphQL::Schema disable_introspection_entry_points + use GraphQL::Dataloader + if !TESTING_INTERPRETER use GraphQL::Execution::Execute use GraphQL::Analysis