From 71cecc7577a953898e83d42ce12c075e33e1e96f Mon Sep 17 00:00:00 2001 From: Christopher Holmes Date: Thu, 6 Mar 2025 09:10:31 +0000 Subject: [PATCH 1/5] feat: hook into Process forking to reset metric readers --- metrics_sdk/lib/opentelemetry/sdk/metrics.rb | 1 + .../opentelemetry/sdk/metrics/fork_hooks.rb | 42 +++++++ .../sdk/metrics/fork_hooks_test.rb | 113 ++++++++++++++++++ 3 files changed, 156 insertions(+) create mode 100644 metrics_sdk/lib/opentelemetry/sdk/metrics/fork_hooks.rb create mode 100644 metrics_sdk/test/opentelemetry/sdk/metrics/fork_hooks_test.rb diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics.rb index 35593185f9..30036f318a 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics.rb @@ -16,6 +16,7 @@ module Metrics require 'opentelemetry/sdk/metrics/aggregation' require 'opentelemetry/sdk/metrics/configuration_patch' require 'opentelemetry/sdk/metrics/export' +require 'opentelemetry/sdk/metrics/fork_hooks' require 'opentelemetry/sdk/metrics/instrument' require 'opentelemetry/sdk/metrics/meter' require 'opentelemetry/sdk/metrics/meter_provider' diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/fork_hooks.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/fork_hooks.rb new file mode 100644 index 0000000000..87df7f1a2d --- /dev/null +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/fork_hooks.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +module OpenTelemetry + module SDK + module Metrics + # ForkHooks implements methods to run callbacks before and after forking a Process by overriding Process::_fork + # This is used to ensure that the PeriodicMetricReader is restarted after forking + module ForkHooks + def self.attach! + return if @fork_hooks_attached + + Process.singleton_class.prepend(ForkHooks) + @fork_hooks_attached = true + end + + def self.before_fork + ::OpenTelemetry.meter_provider.metric_readers.each do |reader| + reader.before_fork if reader.respond_to?(:before_fork) + end + end + + def self.after_fork + ::OpenTelemetry.meter_provider.metric_readers.each do |reader| + reader.after_fork if reader.respond_to?(:after_fork) + end + end + + def _fork + ForkHooks.before_fork + parent_pid = Process.pid + super.tap do + ForkHooks.after_fork unless Process.pid == parent_pid + end + end + end + end + end +end diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/fork_hooks_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/fork_hooks_test.rb new file mode 100644 index 0000000000..543ab2bac1 --- /dev/null +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/fork_hooks_test.rb @@ -0,0 +1,113 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +require 'test_helper' +require 'opentelemetry-exporter-otlp-metrics' unless RUBY_ENGINE == 'jruby' + +describe OpenTelemetry::SDK::Metrics::ForkHooks do + def fork_with_fork_hooks(before_fork_lambda, after_fork_lambda) + with_pipe do |inner_read_io, inner_write_io| + child_pid = fork do # fork twice to avoid prepending fork in the parent process + setup_fork_hooks(before_fork_lambda, after_fork_lambda) do + grandchild_pid = fork {} + Process.waitpid(grandchild_pid) + inner_write_io.puts grandchild_pid + end + end + Process.waitpid(child_pid) + grandchild_pid = inner_read_io.gets.chomp.to_i + refute_equal(child_pid, Process.pid) + refute_equal(child_pid, grandchild_pid) + [child_pid, grandchild_pid] + end + end + + def setup_fork_hooks(before_hook, after_hook) + OpenTelemetry::SDK::Metrics::ForkHooks.stub(:before_fork, before_hook) do + OpenTelemetry::SDK::Metrics::ForkHooks.stub(:after_fork, after_hook) do + Process.singleton_class.prepend(OpenTelemetry::SDK::Metrics::ForkHooks) + yield if block_given? + end + end + end + + def with_pipe + read_io, write_io = IO.pipe + yield(read_io, write_io) + ensure + read_io.close unless read_io.closed? + write_io.close unless write_io.closed? + end + + it 'runs the before_hook before forking' do + with_pipe do |inner_read_io, inner_write_io| + before_fork_lambda = proc do + inner_write_io.puts "before_fork was called on #{Process.pid}" + end + after_fork_lambda = proc {} + forking_pid, _forked_pid = fork_with_fork_hooks(before_fork_lambda, after_fork_lambda) + + before_fork_message = inner_read_io.gets.chomp + assert_equal(before_fork_message, "before_fork was called on #{forking_pid}") + end + end + + it 'runs the after_hook after forking' do + with_pipe do |after_fork_read_io, after_fork_write_io| + before_fork_lambda = proc {} + after_fork_lambda = proc do + after_fork_write_io.puts Process.pid + end + + forking_pid, forked_pid = fork_with_fork_hooks(before_fork_lambda, after_fork_lambda) + pid_from_after_fork = after_fork_read_io.gets.chomp.to_i + + refute_equal(pid_from_after_fork, Process.pid) + refute_equal(pid_from_after_fork, forking_pid) + assert_equal(forked_pid, pid_from_after_fork) + end + end + + it 'calls before_fork on metric readers' do + reader1 = Class.new do + attr_reader :before_fork_called + + def before_fork + @before_fork_called = true + end + end.new + + reader2 = OpenStruct.new + + meter_provider = OpenTelemetry::SDK::Metrics::MeterProvider.new + meter_provider.add_metric_reader(reader1) + meter_provider.add_metric_reader(reader2) + ::OpenTelemetry.stub(:meter_provider, meter_provider) do + OpenTelemetry::SDK::Metrics::ForkHooks.before_fork + end + assert(reader1.before_fork_called) + end + + it 'calls after_fork on metric readers' do + reader1 = Class.new do + attr_reader :after_fork_called + + def after_fork + @after_fork_called = true + end + end.new + + reader2 = OpenStruct.new + + meter_provider = OpenTelemetry::SDK::Metrics::MeterProvider.new + meter_provider.add_metric_reader(reader1) + meter_provider.add_metric_reader(reader2) + ::OpenTelemetry.stub(:meter_provider, meter_provider) do + OpenTelemetry::SDK::Metrics::ForkHooks.after_fork + end + assert(reader1.after_fork_called) + end +end From 851f2bfb19772c0bb9d30bae752dcda65101d3a0 Mon Sep 17 00:00:00 2001 From: Christopher Holmes Date: Thu, 6 Mar 2025 09:47:38 +0000 Subject: [PATCH 2/5] fix: recover PeriodicMetricReader in forked processes --- .../sdk/metrics/configuration_patch.rb | 5 +++ .../metrics/export/periodic_metric_reader.rb | 7 ++++ .../periodic_metric_reader_test.rb | 39 +++++++++++++++++++ metrics_sdk/test/test_helper.rb | 2 +- 4 files changed, 52 insertions(+), 1 deletion(-) diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/configuration_patch.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/configuration_patch.rb index f9ee2c07fa..567ea4fdbc 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/configuration_patch.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/configuration_patch.rb @@ -25,6 +25,7 @@ def initialize def metrics_configuration_hook OpenTelemetry.meter_provider = Metrics::MeterProvider.new(resource: @resource) configure_metric_readers + attach_fork_hooks! end def configure_metric_readers @@ -52,6 +53,10 @@ def wrapped_metric_exporters_from_env end end end + + def attach_fork_hooks! + ForkHooks.attach! + end end end end diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb index 948f98976c..1e83eacc16 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/export/periodic_metric_reader.rb @@ -59,6 +59,13 @@ def force_flush(timeout: nil) Export::FAILURE end + def after_fork + @exporter.reset if @exporter.respond_to?(:reset) + collect # move past previously reported metrics from parent process + @thread = nil + start + end + private def start diff --git a/metrics_sdk/test/integration/periodic_metric_reader_test.rb b/metrics_sdk/test/integration/periodic_metric_reader_test.rb index 5abdfbbf8d..45e322aff3 100644 --- a/metrics_sdk/test/integration/periodic_metric_reader_test.rb +++ b/metrics_sdk/test/integration/periodic_metric_reader_test.rb @@ -5,6 +5,7 @@ # SPDX-License-Identifier: Apache-2.0 require 'test_helper' +require 'json' describe OpenTelemetry::SDK do describe '#periodic_metric_reader' do @@ -81,5 +82,43 @@ _(snapshot.size).must_equal(1) _(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false end + + it 'is restarted after forking' do + OpenTelemetry::SDK.configure + + metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new + periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(export_interval_millis: 5000, export_timeout_millis: 5000, exporter: metric_exporter) + + OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader) + + read, write = IO.pipe + + pid = fork do + meter = OpenTelemetry.meter_provider.meter('test') + counter = meter.create_counter('counter', unit: 'smidgen', description: 'a small amount of something') + + counter.add(1) + counter.add(2, attributes: { 'a' => 'b' }) + counter.add(2, attributes: { 'a' => 'b' }) + counter.add(3, attributes: { 'b' => 'c' }) + counter.add(4, attributes: { 'd' => 'e' }) + + sleep(8) + snapshot = metric_exporter.metric_snapshots + + json = snapshot.map { |reading| { name: reading.name } }.to_json + write.puts json + end + + Timeout.timeout(10) do + Process.waitpid(pid) + end + + periodic_metric_reader.shutdown + snapshot = JSON.parse(read.gets.chomp) + _(snapshot.size).must_equal(1) + _(snapshot[0]).must_equal('name' => 'counter') + _(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false + end end end diff --git a/metrics_sdk/test/test_helper.rb b/metrics_sdk/test/test_helper.rb index 11076bde18..cd2775c758 100644 --- a/metrics_sdk/test/test_helper.rb +++ b/metrics_sdk/test/test_helper.rb @@ -20,7 +20,7 @@ def reset_metrics_sdk :@meter_provider, OpenTelemetry::Internal::ProxyMeterProvider.new ) - + OpenTelemetry::SDK::Metrics::ForkHooks.instance_variable_set(:@fork_hooks_attached, false) OpenTelemetry.logger = Logger.new(File::NULL) OpenTelemetry.error_handler = nil end From 67a62298d66304b231da6ae28c8e2e8176a5af40 Mon Sep 17 00:00:00 2001 From: Christopher Holmes Date: Thu, 6 Mar 2025 10:09:59 +0000 Subject: [PATCH 3/5] tests: guard against errors in process forking Add a little bit of guarding to help surface errors in future should problems cause the suite to fail --- .../sdk/metrics/fork_hooks_test.rb | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/fork_hooks_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/fork_hooks_test.rb index 543ab2bac1..4a4c20d64a 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/fork_hooks_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/fork_hooks_test.rb @@ -5,7 +5,7 @@ # SPDX-License-Identifier: Apache-2.0 require 'test_helper' -require 'opentelemetry-exporter-otlp-metrics' unless RUBY_ENGINE == 'jruby' +require 'json' describe OpenTelemetry::SDK::Metrics::ForkHooks do def fork_with_fork_hooks(before_fork_lambda, after_fork_lambda) @@ -13,12 +13,18 @@ def fork_with_fork_hooks(before_fork_lambda, after_fork_lambda) child_pid = fork do # fork twice to avoid prepending fork in the parent process setup_fork_hooks(before_fork_lambda, after_fork_lambda) do grandchild_pid = fork {} - Process.waitpid(grandchild_pid) - inner_write_io.puts grandchild_pid + Timeout.timeout(5) { Process.waitpid(grandchild_pid) } + message = { 'child_pid' => Process.pid, 'grandchild_pid' => grandchild_pid }.to_json + inner_write_io.puts message + rescue StandardError => e + message = { 'error' => e.message }.to_json + inner_write_io.puts message end end - Process.waitpid(child_pid) - grandchild_pid = inner_read_io.gets.chomp.to_i + Timeout.timeout(10) { Process.waitpid(child_pid) } + received_from_child = JSON.parse(inner_read_io.gets.chomp) + refute_includes(received_from_child, 'error') + grandchild_pid = received_from_child['grandchild_pid'] refute_equal(child_pid, Process.pid) refute_equal(child_pid, grandchild_pid) [child_pid, grandchild_pid] @@ -59,11 +65,12 @@ def with_pipe with_pipe do |after_fork_read_io, after_fork_write_io| before_fork_lambda = proc {} after_fork_lambda = proc do - after_fork_write_io.puts Process.pid + message = { 'after_fork_pid' => Process.pid }.to_json + after_fork_write_io.puts message end - forking_pid, forked_pid = fork_with_fork_hooks(before_fork_lambda, after_fork_lambda) - pid_from_after_fork = after_fork_read_io.gets.chomp.to_i + forking_pid, forked_pid = fork_with_fork_hooks(after_fork_lambda) + pid_from_after_fork = JSON.parse(after_fork_read_io.gets.chomp)['after_fork_pid'].to_i refute_equal(pid_from_after_fork, Process.pid) refute_equal(pid_from_after_fork, forking_pid) From dedabe7d38a320b7cfed78290672566535349eaa Mon Sep 17 00:00:00 2001 From: Christopher Holmes Date: Thu, 6 Mar 2025 10:10:29 +0000 Subject: [PATCH 4/5] feat: remove the before_hook as it is unused --- .../opentelemetry/sdk/metrics/fork_hooks.rb | 7 --- .../sdk/metrics/fork_hooks_test.rb | 48 +++---------------- 2 files changed, 6 insertions(+), 49 deletions(-) diff --git a/metrics_sdk/lib/opentelemetry/sdk/metrics/fork_hooks.rb b/metrics_sdk/lib/opentelemetry/sdk/metrics/fork_hooks.rb index 87df7f1a2d..0f3f29ce92 100644 --- a/metrics_sdk/lib/opentelemetry/sdk/metrics/fork_hooks.rb +++ b/metrics_sdk/lib/opentelemetry/sdk/metrics/fork_hooks.rb @@ -17,12 +17,6 @@ def self.attach! @fork_hooks_attached = true end - def self.before_fork - ::OpenTelemetry.meter_provider.metric_readers.each do |reader| - reader.before_fork if reader.respond_to?(:before_fork) - end - end - def self.after_fork ::OpenTelemetry.meter_provider.metric_readers.each do |reader| reader.after_fork if reader.respond_to?(:after_fork) @@ -30,7 +24,6 @@ def self.after_fork end def _fork - ForkHooks.before_fork parent_pid = Process.pid super.tap do ForkHooks.after_fork unless Process.pid == parent_pid diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/fork_hooks_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/fork_hooks_test.rb index 4a4c20d64a..20893b254b 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/fork_hooks_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/fork_hooks_test.rb @@ -8,10 +8,10 @@ require 'json' describe OpenTelemetry::SDK::Metrics::ForkHooks do - def fork_with_fork_hooks(before_fork_lambda, after_fork_lambda) + def fork_with_fork_hooks(after_fork_lambda) with_pipe do |inner_read_io, inner_write_io| child_pid = fork do # fork twice to avoid prepending fork in the parent process - setup_fork_hooks(before_fork_lambda, after_fork_lambda) do + setup_fork_hooks(after_fork_lambda) do grandchild_pid = fork {} Timeout.timeout(5) { Process.waitpid(grandchild_pid) } message = { 'child_pid' => Process.pid, 'grandchild_pid' => grandchild_pid }.to_json @@ -31,12 +31,10 @@ def fork_with_fork_hooks(before_fork_lambda, after_fork_lambda) end end - def setup_fork_hooks(before_hook, after_hook) - OpenTelemetry::SDK::Metrics::ForkHooks.stub(:before_fork, before_hook) do - OpenTelemetry::SDK::Metrics::ForkHooks.stub(:after_fork, after_hook) do - Process.singleton_class.prepend(OpenTelemetry::SDK::Metrics::ForkHooks) - yield if block_given? - end + def setup_fork_hooks(after_hook) + OpenTelemetry::SDK::Metrics::ForkHooks.stub(:after_fork, after_hook) do + Process.singleton_class.prepend(OpenTelemetry::SDK::Metrics::ForkHooks) + yield if block_given? end end @@ -48,22 +46,8 @@ def with_pipe write_io.close unless write_io.closed? end - it 'runs the before_hook before forking' do - with_pipe do |inner_read_io, inner_write_io| - before_fork_lambda = proc do - inner_write_io.puts "before_fork was called on #{Process.pid}" - end - after_fork_lambda = proc {} - forking_pid, _forked_pid = fork_with_fork_hooks(before_fork_lambda, after_fork_lambda) - - before_fork_message = inner_read_io.gets.chomp - assert_equal(before_fork_message, "before_fork was called on #{forking_pid}") - end - end - it 'runs the after_hook after forking' do with_pipe do |after_fork_read_io, after_fork_write_io| - before_fork_lambda = proc {} after_fork_lambda = proc do message = { 'after_fork_pid' => Process.pid }.to_json after_fork_write_io.puts message @@ -78,26 +62,6 @@ def with_pipe end end - it 'calls before_fork on metric readers' do - reader1 = Class.new do - attr_reader :before_fork_called - - def before_fork - @before_fork_called = true - end - end.new - - reader2 = OpenStruct.new - - meter_provider = OpenTelemetry::SDK::Metrics::MeterProvider.new - meter_provider.add_metric_reader(reader1) - meter_provider.add_metric_reader(reader2) - ::OpenTelemetry.stub(:meter_provider, meter_provider) do - OpenTelemetry::SDK::Metrics::ForkHooks.before_fork - end - assert(reader1.before_fork_called) - end - it 'calls after_fork on metric readers' do reader1 = Class.new do attr_reader :after_fork_called From 857b773cefccdd80b42ab5164c9a91ba6fbc1503 Mon Sep 17 00:00:00 2001 From: Christopher Holmes Date: Fri, 7 Mar 2025 10:32:35 +0000 Subject: [PATCH 5/5] skip forking tests on windows and java platforms --- .../periodic_metric_reader_test.rb | 56 ++++++++++--------- .../sdk/metrics/fork_hooks_test.rb | 2 + 2 files changed, 31 insertions(+), 27 deletions(-) diff --git a/metrics_sdk/test/integration/periodic_metric_reader_test.rb b/metrics_sdk/test/integration/periodic_metric_reader_test.rb index 45e322aff3..8ccd25c02c 100644 --- a/metrics_sdk/test/integration/periodic_metric_reader_test.rb +++ b/metrics_sdk/test/integration/periodic_metric_reader_test.rb @@ -83,42 +83,44 @@ _(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false end - it 'is restarted after forking' do - OpenTelemetry::SDK.configure + unless Gem.win_platform? || %w[jruby truffleruby].include?(RUBY_ENGINE) # forking is not available on these platforms or runtimes + it 'is restarted after forking' do + OpenTelemetry::SDK.configure - metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new - periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(export_interval_millis: 5000, export_timeout_millis: 5000, exporter: metric_exporter) + metric_exporter = OpenTelemetry::SDK::Metrics::Export::InMemoryMetricPullExporter.new + periodic_metric_reader = OpenTelemetry::SDK::Metrics::Export::PeriodicMetricReader.new(export_interval_millis: 5000, export_timeout_millis: 5000, exporter: metric_exporter) - OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader) + OpenTelemetry.meter_provider.add_metric_reader(periodic_metric_reader) - read, write = IO.pipe + read, write = IO.pipe - pid = fork do - meter = OpenTelemetry.meter_provider.meter('test') - counter = meter.create_counter('counter', unit: 'smidgen', description: 'a small amount of something') + pid = fork do + meter = OpenTelemetry.meter_provider.meter('test') + counter = meter.create_counter('counter', unit: 'smidgen', description: 'a small amount of something') - counter.add(1) - counter.add(2, attributes: { 'a' => 'b' }) - counter.add(2, attributes: { 'a' => 'b' }) - counter.add(3, attributes: { 'b' => 'c' }) - counter.add(4, attributes: { 'd' => 'e' }) + counter.add(1) + counter.add(2, attributes: { 'a' => 'b' }) + counter.add(2, attributes: { 'a' => 'b' }) + counter.add(3, attributes: { 'b' => 'c' }) + counter.add(4, attributes: { 'd' => 'e' }) - sleep(8) - snapshot = metric_exporter.metric_snapshots + sleep(8) + snapshot = metric_exporter.metric_snapshots - json = snapshot.map { |reading| { name: reading.name } }.to_json - write.puts json - end + json = snapshot.map { |reading| { name: reading.name } }.to_json + write.puts json + end - Timeout.timeout(10) do - Process.waitpid(pid) - end + Timeout.timeout(10) do + Process.waitpid(pid) + end - periodic_metric_reader.shutdown - snapshot = JSON.parse(read.gets.chomp) - _(snapshot.size).must_equal(1) - _(snapshot[0]).must_equal('name' => 'counter') - _(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false + periodic_metric_reader.shutdown + snapshot = JSON.parse(read.gets.chomp) + _(snapshot.size).must_equal(1) + _(snapshot[0]).must_equal('name' => 'counter') + _(periodic_metric_reader.instance_variable_get(:@thread).alive?).must_equal false + end end end end diff --git a/metrics_sdk/test/opentelemetry/sdk/metrics/fork_hooks_test.rb b/metrics_sdk/test/opentelemetry/sdk/metrics/fork_hooks_test.rb index 20893b254b..a4f129ffd4 100644 --- a/metrics_sdk/test/opentelemetry/sdk/metrics/fork_hooks_test.rb +++ b/metrics_sdk/test/opentelemetry/sdk/metrics/fork_hooks_test.rb @@ -4,6 +4,8 @@ # # SPDX-License-Identifier: Apache-2.0 +return if Gem.win_platform? || %w[jruby truffleruby].include?(RUBY_ENGINE) # forking is not available on these platforms or runtimes + require 'test_helper' require 'json'