Skip to content

Commit 71cecc7

Browse files
committed
feat: hook into Process forking to reset metric readers
1 parent aacd8c8 commit 71cecc7

File tree

3 files changed

+156
-0
lines changed

3 files changed

+156
-0
lines changed

metrics_sdk/lib/opentelemetry/sdk/metrics.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ module Metrics
1616
require 'opentelemetry/sdk/metrics/aggregation'
1717
require 'opentelemetry/sdk/metrics/configuration_patch'
1818
require 'opentelemetry/sdk/metrics/export'
19+
require 'opentelemetry/sdk/metrics/fork_hooks'
1920
require 'opentelemetry/sdk/metrics/instrument'
2021
require 'opentelemetry/sdk/metrics/meter'
2122
require 'opentelemetry/sdk/metrics/meter_provider'
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright The OpenTelemetry Authors
4+
#
5+
# SPDX-License-Identifier: Apache-2.0
6+
7+
module OpenTelemetry
8+
module SDK
9+
module Metrics
10+
# ForkHooks implements methods to run callbacks before and after forking a Process by overriding Process::_fork
11+
# This is used to ensure that the PeriodicMetricReader is restarted after forking
12+
module ForkHooks
13+
def self.attach!
14+
return if @fork_hooks_attached
15+
16+
Process.singleton_class.prepend(ForkHooks)
17+
@fork_hooks_attached = true
18+
end
19+
20+
def self.before_fork
21+
::OpenTelemetry.meter_provider.metric_readers.each do |reader|
22+
reader.before_fork if reader.respond_to?(:before_fork)
23+
end
24+
end
25+
26+
def self.after_fork
27+
::OpenTelemetry.meter_provider.metric_readers.each do |reader|
28+
reader.after_fork if reader.respond_to?(:after_fork)
29+
end
30+
end
31+
32+
def _fork
33+
ForkHooks.before_fork
34+
parent_pid = Process.pid
35+
super.tap do
36+
ForkHooks.after_fork unless Process.pid == parent_pid
37+
end
38+
end
39+
end
40+
end
41+
end
42+
end
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright The OpenTelemetry Authors
4+
#
5+
# SPDX-License-Identifier: Apache-2.0
6+
7+
require 'test_helper'
8+
require 'opentelemetry-exporter-otlp-metrics' unless RUBY_ENGINE == 'jruby'
9+
10+
describe OpenTelemetry::SDK::Metrics::ForkHooks do
11+
def fork_with_fork_hooks(before_fork_lambda, after_fork_lambda)
12+
with_pipe do |inner_read_io, inner_write_io|
13+
child_pid = fork do # fork twice to avoid prepending fork in the parent process
14+
setup_fork_hooks(before_fork_lambda, after_fork_lambda) do
15+
grandchild_pid = fork {}
16+
Process.waitpid(grandchild_pid)
17+
inner_write_io.puts grandchild_pid
18+
end
19+
end
20+
Process.waitpid(child_pid)
21+
grandchild_pid = inner_read_io.gets.chomp.to_i
22+
refute_equal(child_pid, Process.pid)
23+
refute_equal(child_pid, grandchild_pid)
24+
[child_pid, grandchild_pid]
25+
end
26+
end
27+
28+
def setup_fork_hooks(before_hook, after_hook)
29+
OpenTelemetry::SDK::Metrics::ForkHooks.stub(:before_fork, before_hook) do
30+
OpenTelemetry::SDK::Metrics::ForkHooks.stub(:after_fork, after_hook) do
31+
Process.singleton_class.prepend(OpenTelemetry::SDK::Metrics::ForkHooks)
32+
yield if block_given?
33+
end
34+
end
35+
end
36+
37+
def with_pipe
38+
read_io, write_io = IO.pipe
39+
yield(read_io, write_io)
40+
ensure
41+
read_io.close unless read_io.closed?
42+
write_io.close unless write_io.closed?
43+
end
44+
45+
it 'runs the before_hook before forking' do
46+
with_pipe do |inner_read_io, inner_write_io|
47+
before_fork_lambda = proc do
48+
inner_write_io.puts "before_fork was called on #{Process.pid}"
49+
end
50+
after_fork_lambda = proc {}
51+
forking_pid, _forked_pid = fork_with_fork_hooks(before_fork_lambda, after_fork_lambda)
52+
53+
before_fork_message = inner_read_io.gets.chomp
54+
assert_equal(before_fork_message, "before_fork was called on #{forking_pid}")
55+
end
56+
end
57+
58+
it 'runs the after_hook after forking' do
59+
with_pipe do |after_fork_read_io, after_fork_write_io|
60+
before_fork_lambda = proc {}
61+
after_fork_lambda = proc do
62+
after_fork_write_io.puts Process.pid
63+
end
64+
65+
forking_pid, forked_pid = fork_with_fork_hooks(before_fork_lambda, after_fork_lambda)
66+
pid_from_after_fork = after_fork_read_io.gets.chomp.to_i
67+
68+
refute_equal(pid_from_after_fork, Process.pid)
69+
refute_equal(pid_from_after_fork, forking_pid)
70+
assert_equal(forked_pid, pid_from_after_fork)
71+
end
72+
end
73+
74+
it 'calls before_fork on metric readers' do
75+
reader1 = Class.new do
76+
attr_reader :before_fork_called
77+
78+
def before_fork
79+
@before_fork_called = true
80+
end
81+
end.new
82+
83+
reader2 = OpenStruct.new
84+
85+
meter_provider = OpenTelemetry::SDK::Metrics::MeterProvider.new
86+
meter_provider.add_metric_reader(reader1)
87+
meter_provider.add_metric_reader(reader2)
88+
::OpenTelemetry.stub(:meter_provider, meter_provider) do
89+
OpenTelemetry::SDK::Metrics::ForkHooks.before_fork
90+
end
91+
assert(reader1.before_fork_called)
92+
end
93+
94+
it 'calls after_fork on metric readers' do
95+
reader1 = Class.new do
96+
attr_reader :after_fork_called
97+
98+
def after_fork
99+
@after_fork_called = true
100+
end
101+
end.new
102+
103+
reader2 = OpenStruct.new
104+
105+
meter_provider = OpenTelemetry::SDK::Metrics::MeterProvider.new
106+
meter_provider.add_metric_reader(reader1)
107+
meter_provider.add_metric_reader(reader2)
108+
::OpenTelemetry.stub(:meter_provider, meter_provider) do
109+
OpenTelemetry::SDK::Metrics::ForkHooks.after_fork
110+
end
111+
assert(reader1.after_fork_called)
112+
end
113+
end

0 commit comments

Comments
 (0)