Skip to content

Commit ddc1a41

Browse files
committed
fix: observable instruments should not export stale series (#5950)
Per the OpenTelemetry spec, MetricReader.Collect MUST only receive data points with measurements recorded since the previous collection for asynchronous instruments. The SDK was re-exporting the last known value indefinitely when an observable callback stopped reporting a series. Fixes #5950
1 parent 5e83e37 commit ddc1a41

File tree

5 files changed

+303
-3
lines changed

5 files changed

+303
-3
lines changed

src/OpenTelemetry/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@ Notes](../../RELEASENOTES.md).
66

77
## Unreleased
88

9+
* Fixed observable instruments (ObservableCounter, ObservableUpDownCounter,
10+
ObservableGauge) continuing to export stale data points after a callback
11+
stops reporting a series. Per the spec, only measurements recorded since
12+
the previous collection should be exported for asynchronous instruments.
13+
([#5950](https://github.com/open-telemetry/opentelemetry-dotnet/issues/5950))
14+
915
## 1.15.0
1016

1117
Released 2026-Jan-21

src/OpenTelemetry/Metrics/AggregatorStore.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ internal sealed class AggregatorStore
2020
internal readonly HashSet<string>? TagKeysInteresting;
2121
#endif
2222
internal readonly bool OutputDelta;
23+
internal readonly bool IsAsynchronous;
2324
internal readonly int NumberOfMetricPoints;
2425
internal readonly ConcurrentDictionary<Tags, LookupData>? TagsToMetricPointIndexDictionaryDelta;
2526
internal readonly Func<ExemplarReservoir?>? ExemplarReservoirFactory;
@@ -74,6 +75,7 @@ internal AggregatorStore(
7475
this.currentMetricPointBatch = new int[this.NumberOfMetricPoints];
7576
this.aggType = aggType;
7677
this.OutputDelta = temporality == AggregationTemporality.Delta;
78+
this.IsAsynchronous = metricStreamIdentity.IsAsynchronous;
7779
this.histogramExplicitBounds = new(metricStreamIdentity.HistogramBucketBounds ?? FindDefaultHistogramBounds(in metricStreamIdentity));
7880
this.exponentialHistogramMaxSize = metricStreamIdentity.ExponentialHistogramMaxSize;
7981
this.exponentialHistogramMaxScale = metricStreamIdentity.ExponentialHistogramMaxScale;
@@ -288,6 +290,14 @@ internal void SnapshotCumulative(int indexSnapshot)
288290
continue;
289291
}
290292

293+
// For asynchronous instruments, only export points observed this cycle.
294+
// Synchronous instruments carry forward all cumulative values.
295+
if (this.IsAsynchronous
296+
&& metricPoint.MetricPointStatus == MetricPointStatus.NoCollectPending)
297+
{
298+
continue;
299+
}
300+
291301
this.TakeMetricPointSnapshot(ref metricPoint, outputDelta: false);
292302

293303
this.currentMetricPointBatch[this.batchSize] = i;

src/OpenTelemetry/Metrics/MetricPoint/MetricPoint.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -637,6 +637,13 @@ internal void TakeSnapshot(bool outputDelta)
637637
else
638638
{
639639
this.snapshotValue.AsLong = Interlocked.Read(ref this.runningValue.AsLong);
640+
641+
// For asynchronous instruments, reset status so that points
642+
// not reported in the next callback are treated as stale.
643+
if (this.aggregatorStore.IsAsynchronous)
644+
{
645+
this.MetricPointStatus = MetricPointStatus.NoCollectPending;
646+
}
640647
}
641648

642649
break;
@@ -662,6 +669,13 @@ internal void TakeSnapshot(bool outputDelta)
662669
else
663670
{
664671
this.snapshotValue.AsDouble = InterlockedHelper.Read(ref this.runningValue.AsDouble);
672+
673+
// For asynchronous instruments, reset status so that points
674+
// not reported in the next callback are treated as stale.
675+
if (this.aggregatorStore.IsAsynchronous)
676+
{
677+
this.MetricPointStatus = MetricPointStatus.NoCollectPending;
678+
}
665679
}
666680

667681
break;

src/OpenTelemetry/Metrics/MetricStreamIdentity.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,26 @@ public MetricStreamIdentity(Instrument instrument, MetricStreamConfiguration? me
129129
|| this.InstrumentType == typeof(Histogram<float>)
130130
|| this.InstrumentType == typeof(Histogram<double>);
131131

132+
public bool IsAsynchronous =>
133+
this.InstrumentType == typeof(ObservableCounter<long>)
134+
|| this.InstrumentType == typeof(ObservableCounter<int>)
135+
|| this.InstrumentType == typeof(ObservableCounter<short>)
136+
|| this.InstrumentType == typeof(ObservableCounter<byte>)
137+
|| this.InstrumentType == typeof(ObservableCounter<float>)
138+
|| this.InstrumentType == typeof(ObservableCounter<double>)
139+
|| this.InstrumentType == typeof(ObservableUpDownCounter<long>)
140+
|| this.InstrumentType == typeof(ObservableUpDownCounter<int>)
141+
|| this.InstrumentType == typeof(ObservableUpDownCounter<short>)
142+
|| this.InstrumentType == typeof(ObservableUpDownCounter<byte>)
143+
|| this.InstrumentType == typeof(ObservableUpDownCounter<float>)
144+
|| this.InstrumentType == typeof(ObservableUpDownCounter<double>)
145+
|| this.InstrumentType == typeof(ObservableGauge<long>)
146+
|| this.InstrumentType == typeof(ObservableGauge<int>)
147+
|| this.InstrumentType == typeof(ObservableGauge<short>)
148+
|| this.InstrumentType == typeof(ObservableGauge<byte>)
149+
|| this.InstrumentType == typeof(ObservableGauge<float>)
150+
|| this.InstrumentType == typeof(ObservableGauge<double>);
151+
132152
public static bool operator ==(MetricStreamIdentity metricIdentity1, MetricStreamIdentity metricIdentity2) => metricIdentity1.Equals(metricIdentity2);
133153

134154
public static bool operator !=(MetricStreamIdentity metricIdentity1, MetricStreamIdentity metricIdentity2) => !metricIdentity1.Equals(metricIdentity2);

test/OpenTelemetry.Tests/Metrics/MetricApiTests.cs

Lines changed: 253 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1062,13 +1062,11 @@ public void ObservableUpDownCounterAggregationTest(bool exportDelta)
10621062
Assert.Equal(30, sumReceived);
10631063
}
10641064

1065-
[Theory(Skip = "Known issue. See https://github.com/open-telemetry/opentelemetry-dotnet/issues/5950")]
1065+
[Theory]
10661066
[InlineData(MetricReaderTemporalityPreference.Delta)]
10671067
[InlineData(MetricReaderTemporalityPreference.Cumulative)]
10681068
public void ObservableUpDownCounterReportsActiveMeasurementsOnlyTest(MetricReaderTemporalityPreference temporality)
10691069
{
1070-
// dotnet test --filter "FullyQualifiedName~ObservableUpDownCounterReportsActiveMeasurementsOnlyTest" --framework net10.0
1071-
// Testing
10721070
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricreader
10731071
// For asynchronous instruments with Delta or Cumulative aggregation
10741072
// temporality, MetricReader.Collect MUST only receive data points with
@@ -1169,6 +1167,258 @@ public void ObservableUpDownCounterReportsActiveMeasurementsOnlyTest(MetricReade
11691167
Assert.Empty(exportedItems);
11701168
}
11711169

1170+
[Theory]
1171+
[InlineData(MetricReaderTemporalityPreference.Delta)]
1172+
[InlineData(MetricReaderTemporalityPreference.Cumulative)]
1173+
public void ObservableGaugeReportsActiveMeasurementsOnlyTest(MetricReaderTemporalityPreference temporality)
1174+
{
1175+
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricreader
1176+
// For asynchronous instruments with Delta or Cumulative aggregation
1177+
// temporality, MetricReader.Collect MUST only receive data points with
1178+
// measurements recorded since the previous collection. These rules
1179+
// apply to all metrics, not just those whose point kinds includes an
1180+
// aggregation temporality field.
1181+
1182+
var exportedItems = new List<Metric>();
1183+
var tags1 = new List<KeyValuePair<string, object?>>
1184+
{
1185+
new("key", "value1"),
1186+
};
1187+
1188+
var tags2 = new List<KeyValuePair<string, object?>>
1189+
{
1190+
new("key", "value2"),
1191+
};
1192+
1193+
int callbackInvocationCount = 0;
1194+
1195+
using var meter = new Meter($"{Utils.GetCurrentMethodName()}.{temporality}");
1196+
var gauge = meter.CreateObservableGauge(
1197+
"observable-gauge",
1198+
() =>
1199+
{
1200+
callbackInvocationCount++;
1201+
if (callbackInvocationCount == 1)
1202+
{
1203+
return new List<Measurement<long>>
1204+
{
1205+
new(10L, tags1),
1206+
new(20L, tags2),
1207+
};
1208+
}
1209+
else if (callbackInvocationCount == 2)
1210+
{
1211+
return new List<Measurement<long>>
1212+
{
1213+
new(30L, tags1),
1214+
};
1215+
}
1216+
else
1217+
{
1218+
return new List<Measurement<long>>();
1219+
}
1220+
});
1221+
1222+
using var container = BuildMeterProvider(out var meterProvider, builder => builder
1223+
.AddMeter(meter.Name)
1224+
.AddInMemoryExporter(exportedItems, metricReaderOptions =>
1225+
{
1226+
metricReaderOptions.TemporalityPreference = temporality;
1227+
}));
1228+
1229+
// Export 1: Should get both time series
1230+
meterProvider.ForceFlush(MaxTimeToAllowForFlush);
1231+
Assert.Single(exportedItems);
1232+
var metric = exportedItems[0];
1233+
Assert.Equal("observable-gauge", metric.Name);
1234+
List<MetricPoint> metricPoints = [];
1235+
foreach (ref readonly var mp in metric.GetMetricPoints())
1236+
{
1237+
metricPoints.Add(mp);
1238+
}
1239+
1240+
Assert.Equal(2, metricPoints.Count);
1241+
1242+
var metricPoint1 = metricPoints[0];
1243+
Assert.Equal(10, metricPoint1.GetGaugeLastValueLong());
1244+
ValidateMetricPointTags(tags1, metricPoint1.Tags);
1245+
1246+
var metricPoint2 = metricPoints[1];
1247+
Assert.Equal(20, metricPoint2.GetGaugeLastValueLong());
1248+
ValidateMetricPointTags(tags2, metricPoint2.Tags);
1249+
1250+
// Export 2: Should get only tags1
1251+
exportedItems.Clear();
1252+
meterProvider.ForceFlush(MaxTimeToAllowForFlush);
1253+
Assert.Single(exportedItems);
1254+
metric = exportedItems[0];
1255+
Assert.Equal("observable-gauge", metric.Name);
1256+
metricPoints.Clear();
1257+
foreach (ref readonly var mp in metric.GetMetricPoints())
1258+
{
1259+
metricPoints.Add(mp);
1260+
}
1261+
1262+
Assert.Single(metricPoints);
1263+
metricPoint1 = metricPoints[0];
1264+
Assert.Equal(30, metricPoint1.GetGaugeLastValueLong());
1265+
ValidateMetricPointTags(tags1, metricPoint1.Tags);
1266+
1267+
// Export 3: Should get nothing
1268+
exportedItems.Clear();
1269+
meterProvider.ForceFlush(MaxTimeToAllowForFlush);
1270+
Assert.Empty(exportedItems);
1271+
}
1272+
1273+
[Theory]
1274+
[InlineData(MetricReaderTemporalityPreference.Delta)]
1275+
[InlineData(MetricReaderTemporalityPreference.Cumulative)]
1276+
public void ObservableCounterReportsActiveMeasurementsOnlyTest(MetricReaderTemporalityPreference temporality)
1277+
{
1278+
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricreader
1279+
// For asynchronous instruments with Delta or Cumulative aggregation
1280+
// temporality, MetricReader.Collect MUST only receive data points with
1281+
// measurements recorded since the previous collection. These rules
1282+
// apply to all metrics, not just those whose point kinds includes an
1283+
// aggregation temporality field.
1284+
1285+
var exportedItems = new List<Metric>();
1286+
var tags1 = new List<KeyValuePair<string, object?>>
1287+
{
1288+
new("key", "value1"),
1289+
};
1290+
1291+
var tags2 = new List<KeyValuePair<string, object?>>
1292+
{
1293+
new("key", "value2"),
1294+
};
1295+
1296+
int callbackInvocationCount = 0;
1297+
1298+
using var meter = new Meter($"{Utils.GetCurrentMethodName()}.{temporality}");
1299+
var counter = meter.CreateObservableCounter(
1300+
"observable-counter",
1301+
() =>
1302+
{
1303+
callbackInvocationCount++;
1304+
if (callbackInvocationCount == 1)
1305+
{
1306+
return new List<Measurement<long>>
1307+
{
1308+
new(10L, tags1),
1309+
new(10L, tags2),
1310+
};
1311+
}
1312+
else if (callbackInvocationCount == 2)
1313+
{
1314+
return new List<Measurement<long>>
1315+
{
1316+
new(20L, tags1),
1317+
};
1318+
}
1319+
else
1320+
{
1321+
return new List<Measurement<long>>();
1322+
}
1323+
});
1324+
1325+
using var container = BuildMeterProvider(out var meterProvider, builder => builder
1326+
.AddMeter(meter.Name)
1327+
.AddInMemoryExporter(exportedItems, metricReaderOptions =>
1328+
{
1329+
metricReaderOptions.TemporalityPreference = temporality;
1330+
}));
1331+
1332+
// Export 1: Should get both time series
1333+
meterProvider.ForceFlush(MaxTimeToAllowForFlush);
1334+
Assert.Single(exportedItems);
1335+
var metric = exportedItems[0];
1336+
Assert.Equal("observable-counter", metric.Name);
1337+
List<MetricPoint> metricPoints = [];
1338+
foreach (ref readonly var mp in metric.GetMetricPoints())
1339+
{
1340+
metricPoints.Add(mp);
1341+
}
1342+
1343+
Assert.Equal(2, metricPoints.Count);
1344+
1345+
// Export 2: Should get only tags1
1346+
exportedItems.Clear();
1347+
meterProvider.ForceFlush(MaxTimeToAllowForFlush);
1348+
Assert.Single(exportedItems);
1349+
metric = exportedItems[0];
1350+
Assert.Equal("observable-counter", metric.Name);
1351+
metricPoints.Clear();
1352+
foreach (ref readonly var mp in metric.GetMetricPoints())
1353+
{
1354+
metricPoints.Add(mp);
1355+
}
1356+
1357+
Assert.Single(metricPoints);
1358+
1359+
// Export 3: Should get nothing
1360+
exportedItems.Clear();
1361+
meterProvider.ForceFlush(MaxTimeToAllowForFlush);
1362+
Assert.Empty(exportedItems);
1363+
}
1364+
1365+
[Theory]
1366+
[InlineData(MetricReaderTemporalityPreference.Delta)]
1367+
[InlineData(MetricReaderTemporalityPreference.Cumulative)]
1368+
public void SynchronousCounterCumulativeCarriesForwardTest(MetricReaderTemporalityPreference temporality)
1369+
{
1370+
var exportedItems = new List<Metric>();
1371+
var tags1 = new List<KeyValuePair<string, object?>>
1372+
{
1373+
new("key", "value1"),
1374+
};
1375+
1376+
using var meter = new Meter($"{Utils.GetCurrentMethodName()}.{temporality}");
1377+
var counter = meter.CreateCounter<long>("sync-counter");
1378+
1379+
using var container = BuildMeterProvider(out var meterProvider, builder => builder
1380+
.AddMeter(meter.Name)
1381+
.AddInMemoryExporter(exportedItems, metricReaderOptions =>
1382+
{
1383+
metricReaderOptions.TemporalityPreference = temporality;
1384+
}));
1385+
1386+
// Record a measurement then export
1387+
counter.Add(10, tags1.ToArray());
1388+
meterProvider.ForceFlush(MaxTimeToAllowForFlush);
1389+
Assert.Single(exportedItems);
1390+
var metric = exportedItems[0];
1391+
Assert.Equal("sync-counter", metric.Name);
1392+
List<MetricPoint> metricPoints = [];
1393+
foreach (ref readonly var mp in metric.GetMetricPoints())
1394+
{
1395+
metricPoints.Add(mp);
1396+
}
1397+
1398+
Assert.Single(metricPoints);
1399+
Assert.Equal(10, metricPoints[0].GetSumLong());
1400+
1401+
// Export again without recording new measurements.
1402+
// For Cumulative temporality, synchronous counters must carry forward the total.
1403+
// For Delta temporality, they should report 0 delta (no new measurements).
1404+
exportedItems.Clear();
1405+
meterProvider.ForceFlush(MaxTimeToAllowForFlush);
1406+
1407+
if (temporality == MetricReaderTemporalityPreference.Cumulative)
1408+
{
1409+
Assert.Single(exportedItems);
1410+
metric = exportedItems[0];
1411+
metricPoints.Clear();
1412+
foreach (ref readonly var mp in metric.GetMetricPoints())
1413+
{
1414+
metricPoints.Add(mp);
1415+
}
1416+
1417+
Assert.Single(metricPoints);
1418+
Assert.Equal(10, metricPoints[0].GetSumLong());
1419+
}
1420+
}
1421+
11721422
[Theory]
11731423
[InlineData(true)]
11741424
[InlineData(false)]

0 commit comments

Comments
 (0)