@@ -19,20 +19,30 @@ use crate::{__macros_impl::LogfireValue, LogfireTracer};
19
19
/// this layer.
20
20
pub struct LogfireTracingLayer < S > {
21
21
tracer : LogfireTracer ,
22
+ /// This odd structure with two inner layers is deliberate; we don't want to send any events
23
+ /// to the `otel_layer` and we only send (some) events to the `metrics_layer`.
22
24
otel_layer : tracing_opentelemetry:: OpenTelemetryLayer < S , opentelemetry_sdk:: trace:: Tracer > ,
25
+ metrics_layer : Option < tracing_opentelemetry:: MetricsLayer < S > > ,
23
26
}
24
27
25
28
impl < S > LogfireTracingLayer < S >
26
29
where
27
30
S : Subscriber + for < ' span > LookupSpan < ' span > ,
28
31
{
29
32
/// Create a new `LogfireTracingLayer` with the given tracer.
30
- pub ( crate ) fn new ( tracer : LogfireTracer ) -> Self {
33
+ pub ( crate ) fn new ( tracer : LogfireTracer , enable_tracing_metrics : bool ) -> Self {
31
34
let otel_layer = tracing_opentelemetry:: layer ( )
32
35
. with_error_records_to_exceptions ( true )
33
36
. with_tracer ( tracer. inner . clone ( ) ) ;
34
37
35
- LogfireTracingLayer { tracer, otel_layer }
38
+ let metrics_layer = enable_tracing_metrics
39
+ . then ( || tracing_opentelemetry:: MetricsLayer :: new ( tracer. meter_provider . clone ( ) ) ) ;
40
+
41
+ LogfireTracingLayer {
42
+ tracer,
43
+ otel_layer,
44
+ metrics_layer,
45
+ }
36
46
}
37
47
}
38
48
49
59
// Delegate to OpenTelemetry layer first
50
60
self . otel_layer . on_new_span ( attrs, id, ctx. clone ( ) ) ;
51
61
62
+ // Delegate to MetricsLayer as well
63
+ self . metrics_layer . on_new_span ( attrs, id, ctx. clone ( ) ) ;
64
+
52
65
// Add Logfire-specific attributes
53
66
let span = ctx. span ( id) . expect ( "span not found" ) ;
54
67
let mut extensions = span. extensions_mut ( ) ;
@@ -95,7 +108,23 @@ where
95
108
///
96
109
/// Instead we need to handle them here and write them to the logfire writer.
97
110
fn on_event ( & self , event : & tracing:: Event < ' _ > , ctx : tracing_subscriber:: layer:: Context < ' _ , S > ) {
98
- // Don't delegate events to OpenTelemetry layer, we emit them as log spans instead.
111
+ let is_metrics_event = self . metrics_layer . is_some ( )
112
+ && event. fields ( ) . any ( |field| {
113
+ let name = field. name ( ) ;
114
+
115
+ name. starts_with ( "counter." )
116
+ || name. starts_with ( "monotonic_counter." )
117
+ || name. starts_with ( "histogram." )
118
+ || name. starts_with ( "monotonic_histogram." )
119
+ } ) ;
120
+
121
+ // Allow the metrics layer to see all events, so it can record metrics as needed.
122
+ if is_metrics_event {
123
+ self . metrics_layer . on_event ( event, ctx. clone ( ) ) ;
124
+ }
125
+
126
+ // However we don't want to allow the opentelemetry layer to see events, it will record them
127
+ // as span events. Instead we handle them here and emit them as log spans.
99
128
let event_span = ctx. event_span ( event) . and_then ( |span| ctx. span ( & span. id ( ) ) ) ;
100
129
let mut event_span_extensions = event_span. as_ref ( ) . map ( |s| s. extensions_mut ( ) ) ;
101
130
@@ -161,6 +190,43 @@ where
161
190
unsafe { self . otel_layer . downcast_raw ( id) }
162
191
}
163
192
}
193
+
194
+ fn on_register_dispatch ( & self , subscriber : & tracing:: Dispatch ) {
195
+ self . otel_layer . on_register_dispatch ( subscriber) ;
196
+ self . metrics_layer . on_register_dispatch ( subscriber) ;
197
+ }
198
+
199
+ fn on_layer ( & mut self , subscriber : & mut S ) {
200
+ self . otel_layer . on_layer ( subscriber) ;
201
+ self . metrics_layer . on_layer ( subscriber) ;
202
+ }
203
+
204
+ fn enabled (
205
+ & self ,
206
+ metadata : & tracing:: Metadata < ' _ > ,
207
+ ctx : tracing_subscriber:: layer:: Context < ' _ , S > ,
208
+ ) -> bool {
209
+ self . otel_layer . enabled ( metadata, ctx. clone ( ) ) || self . metrics_layer . enabled ( metadata, ctx)
210
+ }
211
+
212
+ fn event_enabled (
213
+ & self ,
214
+ event : & tracing:: Event < ' _ > ,
215
+ ctx : tracing_subscriber:: layer:: Context < ' _ , S > ,
216
+ ) -> bool {
217
+ self . otel_layer . event_enabled ( event, ctx. clone ( ) )
218
+ || self . metrics_layer . event_enabled ( event, ctx)
219
+ }
220
+
221
+ fn on_id_change (
222
+ & self ,
223
+ old : & tracing:: span:: Id ,
224
+ new : & tracing:: span:: Id ,
225
+ ctx : tracing_subscriber:: layer:: Context < ' _ , S > ,
226
+ ) {
227
+ self . otel_layer . on_id_change ( old, new, ctx. clone ( ) ) ;
228
+ self . metrics_layer . on_id_change ( old, new, ctx) ;
229
+ }
164
230
}
165
231
166
232
/// Dummy struct to mark that we've already entered this span.
@@ -1971,4 +2037,233 @@ mod tests {
1971
2037
[2m1970-01-01T00:00:00.000009Z[0m[34m DEBUG[0m [2;3mopentelemetry_sdk::logs::logger_provider[0m [1m[0m [3mname[0m=LoggerProvider.ShutdownInvokedByUser
1972
2038
" ) ;
1973
2039
}
2040
+
2041
+ #[ tokio:: test]
2042
+ async fn test_tracing_metrics_layer ( ) {
2043
+ use crate :: test_utils:: make_deterministic_resource_metrics;
2044
+ use opentelemetry_sdk:: metrics:: {
2045
+ InMemoryMetricExporterBuilder , ManualReader , data:: ResourceMetrics ,
2046
+ exporter:: PushMetricExporter , reader:: MetricReader ,
2047
+ } ;
2048
+ use std:: sync:: Arc ;
2049
+
2050
+ #[ derive( Clone , Debug ) ]
2051
+ struct SharedManualReader {
2052
+ reader : Arc < ManualReader > ,
2053
+ }
2054
+
2055
+ impl SharedManualReader {
2056
+ fn new ( reader : ManualReader ) -> Self {
2057
+ Self {
2058
+ reader : Arc :: new ( reader) ,
2059
+ }
2060
+ }
2061
+
2062
+ async fn export < E : PushMetricExporter > ( & self , exporter : & E ) {
2063
+ let mut metrics = ResourceMetrics :: default ( ) ;
2064
+ self . reader . collect ( & mut metrics) . unwrap ( ) ;
2065
+ exporter. export ( & mut metrics) . await . unwrap ( ) ;
2066
+ }
2067
+ }
2068
+
2069
+ impl MetricReader for SharedManualReader {
2070
+ fn register_pipeline (
2071
+ & self ,
2072
+ pipeline : std:: sync:: Weak < opentelemetry_sdk:: metrics:: Pipeline > ,
2073
+ ) {
2074
+ self . reader . register_pipeline ( pipeline) ;
2075
+ }
2076
+
2077
+ fn collect (
2078
+ & self ,
2079
+ rm : & mut opentelemetry_sdk:: metrics:: data:: ResourceMetrics ,
2080
+ ) -> opentelemetry_sdk:: error:: OTelSdkResult {
2081
+ self . reader . collect ( rm)
2082
+ }
2083
+
2084
+ fn force_flush ( & self ) -> opentelemetry_sdk:: error:: OTelSdkResult {
2085
+ self . reader . force_flush ( )
2086
+ }
2087
+
2088
+ fn shutdown ( & self ) -> opentelemetry_sdk:: error:: OTelSdkResult {
2089
+ self . reader . shutdown ( )
2090
+ }
2091
+
2092
+ fn shutdown_with_timeout (
2093
+ & self ,
2094
+ timeout : std:: time:: Duration ,
2095
+ ) -> opentelemetry_sdk:: error:: OTelSdkResult {
2096
+ self . reader . shutdown_with_timeout ( timeout)
2097
+ }
2098
+
2099
+ fn temporality (
2100
+ & self ,
2101
+ kind : opentelemetry_sdk:: metrics:: InstrumentKind ,
2102
+ ) -> opentelemetry_sdk:: metrics:: Temporality {
2103
+ self . reader . temporality ( kind)
2104
+ }
2105
+ }
2106
+
2107
+ let mut exporter = InMemoryMetricExporterBuilder :: new ( ) . build ( ) ;
2108
+
2109
+ let reader = SharedManualReader :: new (
2110
+ ManualReader :: builder ( )
2111
+ . with_temporality ( opentelemetry_sdk:: metrics:: Temporality :: Delta )
2112
+ . build ( ) ,
2113
+ ) ;
2114
+
2115
+ let handler = crate :: configure ( )
2116
+ . local ( )
2117
+ . send_to_logfire ( false )
2118
+ . with_metrics ( Some (
2119
+ crate :: config:: MetricsOptions :: default ( ) . with_additional_reader ( reader. clone ( ) ) ,
2120
+ ) )
2121
+ . install_panic_handler ( )
2122
+ . with_default_level_filter ( LevelFilter :: TRACE )
2123
+ . with_advanced_options (
2124
+ AdvancedOptions :: default ( )
2125
+ . with_resource (
2126
+ opentelemetry_sdk:: Resource :: builder_empty ( )
2127
+ . with_service_name ( "test" )
2128
+ . build ( ) ,
2129
+ )
2130
+ . with_id_generator ( DeterministicIdGenerator :: new ( ) )
2131
+ . with_tracing_metrics ( true ) ,
2132
+ )
2133
+ . finish ( )
2134
+ . unwrap ( ) ;
2135
+
2136
+ let _guard = set_local_logfire ( handler. clone ( ) ) ;
2137
+
2138
+ tracing:: info!( counter. test_counter = 1 , "test counter event" ) ;
2139
+ tracing:: info!( histogram. test_histogram = 2.5 , "test histogram event" ) ;
2140
+ tracing:: info!(
2141
+ monotonic_counter. test_monotonic = 3 ,
2142
+ "test monotonic counter event"
2143
+ ) ;
2144
+
2145
+ tracing:: info!( counter. test_counter = 2 , "test counter event" ) ;
2146
+ tracing:: info!( histogram. test_histogram = 3.5 , "test histogram event" ) ;
2147
+ tracing:: info!(
2148
+ monotonic_counter. test_monotonic = 4 ,
2149
+ "test monotonic counter event"
2150
+ ) ;
2151
+
2152
+ reader. export ( & mut exporter) . await ;
2153
+
2154
+ tracing:: info!( counter. test_counter = 3 , "test counter event" ) ;
2155
+ tracing:: info!( histogram. test_histogram = 4.5 , "test histogram event" ) ;
2156
+ tracing:: info!(
2157
+ monotonic_counter. test_monotonic = 5 ,
2158
+ "test monotonic counter event"
2159
+ ) ;
2160
+
2161
+ reader. export ( & mut exporter) . await ;
2162
+
2163
+ handler. shutdown ( ) . unwrap ( ) ;
2164
+
2165
+ let metrics = exporter. get_finished_metrics ( ) . unwrap ( ) ;
2166
+ let metrics = make_deterministic_resource_metrics ( metrics) ;
2167
+
2168
+ assert_debug_snapshot ! ( metrics, @r#"
2169
+ [
2170
+ DeterministicResourceMetrics {
2171
+ resource: Resource {
2172
+ inner: ResourceInner {
2173
+ attrs: {
2174
+ Static(
2175
+ "service.name",
2176
+ ): String(
2177
+ Static(
2178
+ "test",
2179
+ ),
2180
+ ),
2181
+ },
2182
+ schema_url: None,
2183
+ },
2184
+ },
2185
+ scope_metrics: [
2186
+ DeterministicScopeMetrics {
2187
+ scope: InstrumentationScope {
2188
+ name: "tracing/tracing-opentelemetry",
2189
+ version: Some(
2190
+ "0.31.0",
2191
+ ),
2192
+ schema_url: None,
2193
+ attributes: [],
2194
+ },
2195
+ metrics: [
2196
+ DeterministicMetric {
2197
+ name: "test_counter",
2198
+ values: [
2199
+ 3,
2200
+ ],
2201
+ },
2202
+ DeterministicMetric {
2203
+ name: "test_histogram",
2204
+ values: [
2205
+ 2,
2206
+ ],
2207
+ },
2208
+ DeterministicMetric {
2209
+ name: "test_monotonic",
2210
+ values: [
2211
+ 7,
2212
+ ],
2213
+ },
2214
+ ],
2215
+ },
2216
+ ],
2217
+ },
2218
+ DeterministicResourceMetrics {
2219
+ resource: Resource {
2220
+ inner: ResourceInner {
2221
+ attrs: {
2222
+ Static(
2223
+ "service.name",
2224
+ ): String(
2225
+ Static(
2226
+ "test",
2227
+ ),
2228
+ ),
2229
+ },
2230
+ schema_url: None,
2231
+ },
2232
+ },
2233
+ scope_metrics: [
2234
+ DeterministicScopeMetrics {
2235
+ scope: InstrumentationScope {
2236
+ name: "tracing/tracing-opentelemetry",
2237
+ version: Some(
2238
+ "0.31.0",
2239
+ ),
2240
+ schema_url: None,
2241
+ attributes: [],
2242
+ },
2243
+ metrics: [
2244
+ DeterministicMetric {
2245
+ name: "test_counter",
2246
+ values: [
2247
+ 6,
2248
+ ],
2249
+ },
2250
+ DeterministicMetric {
2251
+ name: "test_histogram",
2252
+ values: [
2253
+ 1,
2254
+ ],
2255
+ },
2256
+ DeterministicMetric {
2257
+ name: "test_monotonic",
2258
+ values: [
2259
+ 5,
2260
+ ],
2261
+ },
2262
+ ],
2263
+ },
2264
+ ],
2265
+ },
2266
+ ]
2267
+ "# ) ;
2268
+ }
1974
2269
}
0 commit comments