Skip to content

Commit 007f9fe

Browse files
committed
PushMetricExporter hide implementation for ResourceMetrics
1 parent fa692d8 commit 007f9fe

File tree

19 files changed

+421
-245
lines changed

19 files changed

+421
-245
lines changed

opentelemetry-otlp/src/exporter/http/metrics.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@ use crate::metric::MetricsClient;
44
use http::{header::CONTENT_TYPE, Method};
55
use opentelemetry::otel_debug;
66
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
7-
use opentelemetry_sdk::metrics::data::ResourceMetrics;
7+
use opentelemetry_sdk::metrics::exporter::ResourceMetrics;
88

99
use super::OtlpHttpClient;
1010

1111
impl MetricsClient for OtlpHttpClient {
12-
async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
12+
async fn export(&self, metrics: ResourceMetrics<'_>) -> OTelSdkResult {
1313
let client = self
1414
.client
1515
.lock()

opentelemetry-otlp/src/exporter/http/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use std::time::Duration;
2727
mod metrics;
2828

2929
#[cfg(feature = "metrics")]
30-
use opentelemetry_sdk::metrics::data::ResourceMetrics;
30+
use opentelemetry_sdk::metrics::exporter::ResourceMetrics;
3131

3232
#[cfg(feature = "logs")]
3333
pub(crate) mod logs;
@@ -326,7 +326,7 @@ impl OtlpHttpClient {
326326
#[cfg(feature = "metrics")]
327327
fn build_metrics_export_body(
328328
&self,
329-
metrics: &ResourceMetrics,
329+
metrics: ResourceMetrics<'_>,
330330
) -> Option<(Vec<u8>, &'static str)> {
331331
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
332332

opentelemetry-otlp/src/exporter/tonic/metrics.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use opentelemetry_proto::tonic::collector::metrics::v1::{
66
metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest,
77
};
88
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
9-
use opentelemetry_sdk::metrics::data::ResourceMetrics;
9+
use opentelemetry_sdk::metrics::exporter::ResourceMetrics;
1010
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
1111

1212
use super::BoxInterceptor;
@@ -52,7 +52,7 @@ impl TonicMetricsClient {
5252
}
5353

5454
impl MetricsClient for TonicMetricsClient {
55-
async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
55+
async fn export(&self, metrics: ResourceMetrics<'_>) -> OTelSdkResult {
5656
let (mut client, metadata, extensions) = self
5757
.inner
5858
.lock()

opentelemetry-otlp/src/metric.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@ use crate::{ExporterBuildError, NoExporterBuilderSet};
1717
use core::fmt;
1818
use opentelemetry_sdk::error::OTelSdkResult;
1919

20-
use opentelemetry_sdk::metrics::{
21-
data::ResourceMetrics, exporter::PushMetricExporter, Temporality,
22-
};
20+
use opentelemetry_sdk::metrics::exporter::ResourceMetrics;
21+
use opentelemetry_sdk::metrics::{exporter::PushMetricExporter, Temporality};
2322
use std::fmt::{Debug, Formatter};
2423
use std::time::Duration;
2524

@@ -123,7 +122,7 @@ impl HasHttpConfig for MetricExporterBuilder<HttpExporterBuilderSet> {
123122
pub(crate) trait MetricsClient: fmt::Debug + Send + Sync + 'static {
124123
fn export(
125124
&self,
126-
metrics: &ResourceMetrics,
125+
metrics: ResourceMetrics<'_>,
127126
) -> impl std::future::Future<Output = OTelSdkResult> + Send;
128127
fn shutdown(&self) -> OTelSdkResult;
129128
}
@@ -149,7 +148,7 @@ impl Debug for MetricExporter {
149148
}
150149

151150
impl PushMetricExporter for MetricExporter {
152-
async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
151+
async fn export(&self, metrics: ResourceMetrics<'_>) -> OTelSdkResult {
153152
match &self.client {
154153
#[cfg(feature = "grpc-tonic")]
155154
SupportedTransportClient::Tonic(client) => client.export(metrics).await,

opentelemetry-proto/src/transform/metrics.rs

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ pub mod tonic {
1111
use opentelemetry_sdk::metrics::data::{
1212
AggregatedMetrics, Exemplar as SdkExemplar,
1313
ExponentialHistogram as SdkExponentialHistogram, Gauge as SdkGauge,
14-
Histogram as SdkHistogram, Metric as SdkMetric, MetricData, ResourceMetrics,
15-
ScopeMetrics as SdkScopeMetrics, Sum as SdkSum,
14+
Histogram as SdkHistogram, MetricData, Sum as SdkSum,
1615
};
16+
use opentelemetry_sdk::metrics::exporter::{Metric, ResourceMetrics, ScopeMetrics};
1717
use opentelemetry_sdk::metrics::Temporality;
1818
use opentelemetry_sdk::Resource as SdkResource;
1919

@@ -110,12 +110,18 @@ pub mod tonic {
110110
}
111111
}
112112

113-
impl From<&ResourceMetrics> for ExportMetricsServiceRequest {
114-
fn from(rm: &ResourceMetrics) -> Self {
113+
impl From<ResourceMetrics<'_>> for ExportMetricsServiceRequest {
114+
fn from(rm: ResourceMetrics<'_>) -> Self {
115+
let mut scope_metrics = Vec::new();
116+
rm.scope_metrics.collect(|mut iter| {
117+
while let Some(scope_metric) = iter.next_scope_metrics() {
118+
scope_metrics.push(scope_metric.into());
119+
}
120+
});
115121
ExportMetricsServiceRequest {
116122
resource_metrics: vec![TonicResourceMetrics {
117-
resource: Some((&rm.resource).into()),
118-
scope_metrics: rm.scope_metrics.iter().map(Into::into).collect(),
123+
resource: Some(rm.resource.into()),
124+
scope_metrics,
119125
schema_url: rm.resource.schema_url().map(Into::into).unwrap_or_default(),
120126
}],
121127
}
@@ -131,11 +137,15 @@ pub mod tonic {
131137
}
132138
}
133139

134-
impl From<&SdkScopeMetrics> for TonicScopeMetrics {
135-
fn from(sm: &SdkScopeMetrics) -> Self {
140+
impl From<ScopeMetrics<'_>> for TonicScopeMetrics {
141+
fn from(mut sm: ScopeMetrics<'_>) -> Self {
142+
let mut metrics = Vec::new();
143+
while let Some(metric) = sm.metrics.next_metric() {
144+
metrics.push(metric.into());
145+
}
136146
TonicScopeMetrics {
137-
scope: Some((&sm.scope, None).into()),
138-
metrics: sm.metrics.iter().map(Into::into).collect(),
147+
scope: Some((sm.scope, None).into()),
148+
metrics,
139149
schema_url: sm
140150
.scope
141151
.schema_url()
@@ -145,12 +155,12 @@ pub mod tonic {
145155
}
146156
}
147157

148-
impl From<&SdkMetric> for TonicMetric {
149-
fn from(metric: &SdkMetric) -> Self {
158+
impl From<Metric<'_>> for TonicMetric {
159+
fn from(metric: Metric<'_>) -> Self {
150160
TonicMetric {
151-
name: metric.name.to_string(),
152-
description: metric.description.to_string(),
153-
unit: metric.unit.to_string(),
161+
name: metric.instrument.name.to_string(),
162+
description: metric.instrument.description.to_string(),
163+
unit: metric.instrument.unit.to_string(),
154164
metadata: vec![], // internal and currently unused
155165
data: Some(match &metric.data {
156166
AggregatedMetrics::F64(data) => data.into(),

opentelemetry-sdk/CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,15 @@
22

33
## vNext
44

5+
- *Breaking* change for `PushMetricExporter::export` from accepting
6+
`metrics: &ResourceMetrics`, to accepting `metrics: ResourceMetrics<'_>`.
7+
In addition, `ResourceMetrics` was also changed to allow improving underlying
8+
metric collection without any allocations in the future.
9+
[#2957](https://github.com/open-telemetry/opentelemetry-rust/pull/2957)
10+
- *Breaking* change for `Metric::data` field: From dynamic `Box<dyn Aggregation>`
11+
to new enum `AggregatedMetrics`.
12+
[#2857](https://github.com/open-telemetry/opentelemetry-rust/pull/2857)
13+
514
- **Feature**: Added context based telemetry suppression. [#2868](https://github.com/open-telemetry/opentelemetry-rust/pull/2868)
615
- `SdkLogger`, `SdkTracer` modified to respect telemetry suppression based on
716
`Context`. In other words, if the current context has telemetry suppression
@@ -48,6 +57,7 @@ also modified to suppress telemetry before invoking exporters.
4857
- The `export` method on `PushMetricExporter` now accepts `&ResourceMetrics`
4958
instead of `&mut ResourceMetrics`.
5059

60+
5161
## 0.29.0
5262

5363
Released 2025-Mar-21

opentelemetry-sdk/benches/metric.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ use opentelemetry::{
66
use opentelemetry_sdk::{
77
error::OTelSdkResult,
88
metrics::{
9-
data::ResourceMetrics, new_view, reader::MetricReader, Aggregation, Instrument,
10-
InstrumentKind, ManualReader, Pipeline, SdkMeterProvider, Stream, Temporality, View,
9+
new_view,
10+
reader::{MetricReader, ResourceMetricsData},
11+
Aggregation, Instrument, InstrumentKind, ManualReader, Pipeline, SdkMeterProvider, Stream,
12+
Temporality, View,
1113
},
1214
Resource,
1315
};
@@ -23,7 +25,7 @@ impl MetricReader for SharedReader {
2325
self.0.register_pipeline(pipeline)
2426
}
2527

26-
fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
28+
fn collect(&self, rm: &mut ResourceMetricsData) -> OTelSdkResult {
2729
self.0.collect(rm)
2830
}
2931

@@ -240,7 +242,7 @@ fn counters(c: &mut Criterion) {
240242
});
241243

242244
let (rdr, cntr) = bench_counter(None, "cumulative");
243-
let mut rm = ResourceMetrics {
245+
let mut rm = ResourceMetricsData {
244246
resource: Resource::builder_empty().build(),
245247
scope_metrics: Vec::new(),
246248
};
@@ -337,7 +339,7 @@ fn benchmark_collect_histogram(b: &mut Bencher, n: usize) {
337339
h.record(1, &[]);
338340
}
339341

340-
let mut rm = ResourceMetrics {
342+
let mut rm = ResourceMetricsData {
341343
resource: Resource::builder_empty().build(),
342344
scope_metrics: Vec::new(),
343345
};

opentelemetry-sdk/src/metrics/data/mod.rs

Lines changed: 2 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,11 @@
11
//! Types for delivery of pre-aggregated metric time series data.
22
3-
use std::{borrow::Cow, time::SystemTime};
3+
use std::time::SystemTime;
44

5-
use opentelemetry::{InstrumentationScope, KeyValue};
6-
7-
use crate::Resource;
5+
use opentelemetry::KeyValue;
86

97
use super::Temporality;
108

11-
/// A collection of [ScopeMetrics] and the associated [Resource] that created them.
12-
#[derive(Debug)]
13-
pub struct ResourceMetrics {
14-
/// The entity that collected the metrics.
15-
pub resource: Resource,
16-
/// The collection of metrics with unique [InstrumentationScope]s.
17-
pub scope_metrics: Vec<ScopeMetrics>,
18-
}
19-
20-
/// A collection of metrics produced by a meter.
21-
#[derive(Default, Debug)]
22-
pub struct ScopeMetrics {
23-
/// The [InstrumentationScope] that the meter was created with.
24-
pub scope: InstrumentationScope,
25-
/// The list of aggregations created by the meter.
26-
pub metrics: Vec<Metric>,
27-
}
28-
29-
/// A collection of one or more aggregated time series from an [Instrument].
30-
///
31-
/// [Instrument]: crate::metrics::Instrument
32-
#[derive(Debug)]
33-
pub struct Metric {
34-
/// The name of the instrument that created this data.
35-
pub name: Cow<'static, str>,
36-
/// The description of the instrument, which can be used in documentation.
37-
pub description: Cow<'static, str>,
38-
/// The unit in which the instrument reports.
39-
pub unit: Cow<'static, str>,
40-
/// The aggregated data from an instrument.
41-
pub data: AggregatedMetrics,
42-
}
43-
449
/// Aggregated metrics data from an instrument
4510
#[derive(Debug)]
4611
pub enum AggregatedMetrics {

opentelemetry-sdk/src/metrics/exporter.rs

Lines changed: 110 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,116 @@
11
//! Interfaces for exporting metrics
22
3-
use crate::error::OTelSdkResult;
4-
use std::time::Duration;
3+
use opentelemetry::InstrumentationScope;
54

6-
use crate::metrics::data::ResourceMetrics;
5+
use crate::{error::OTelSdkResult, Resource};
6+
use std::{fmt::Debug, slice::Iter, time::Duration};
77

8-
use super::Temporality;
8+
use super::{
9+
data::AggregatedMetrics,
10+
reader::{ResourceMetricsData, ScopeMetricsData},
11+
InstrumentInfo, Temporality,
12+
};
13+
14+
/// Stores borrowed metrics and provide a way to collect them
15+
#[derive(Debug)]
16+
pub struct ScopeMetricsCollector<'a> {
17+
iter: ScopeMetricsLendingIter<'a>,
18+
}
19+
20+
impl ScopeMetricsCollector<'_> {
21+
/// Start collecting all metrics
22+
pub fn collect(self, process: impl FnOnce(ScopeMetricsLendingIter<'_>)) {
23+
process(self.iter)
24+
}
25+
}
26+
27+
/// A collection of [`BatchScopeMetrics`] and the associated [Resource] that created them.
28+
#[derive(Debug)]
29+
pub struct ResourceMetrics<'a> {
30+
/// The entity that collected the metrics.
31+
pub resource: &'a Resource,
32+
/// The collection of metrics with unique [InstrumentationScope]s.
33+
pub scope_metrics: ScopeMetricsCollector<'a>,
34+
}
35+
36+
/// Iterator over libraries instrumentation scopes ([`InstrumentationScope`]) together with metrics.
37+
/// Doesn't implement standard [`Iterator`], because it returns borrowed items. AKA "LendingIterator".
38+
pub struct ScopeMetricsLendingIter<'a> {
39+
iter: Iter<'a, ScopeMetricsData>,
40+
}
41+
42+
/// A collection of metrics produced by a [`InstrumentationScope`] meter.
43+
#[derive(Debug)]
44+
pub struct ScopeMetrics<'a> {
45+
/// The [InstrumentationScope] that the meter was created with.
46+
pub scope: &'a InstrumentationScope,
47+
/// The list of aggregations created by the meter.
48+
pub metrics: MetricsLendingIter<'a>,
49+
}
50+
51+
/// Iterator over aggregations created by the meter.
52+
/// Doesn't implement standard [`Iterator`], because it returns borrowed items. AKA "LendingIterator".
53+
pub struct MetricsLendingIter<'a> {
54+
iter: Iter<'a, super::reader::MetricsData>,
55+
}
56+
57+
/// A collection of one or more aggregated time series from an [Instrument].
58+
///
59+
/// [Instrument]: crate::metrics::Instrument
60+
#[derive(Debug)]
61+
pub struct Metric<'a> {
62+
/// The name of the instrument that created this data.
63+
pub instrument: &'a InstrumentInfo,
64+
/// The aggregated data from an instrument.
65+
pub data: &'a AggregatedMetrics,
66+
}
67+
68+
impl<'a> ResourceMetrics<'a> {
69+
pub(crate) fn new(data: &'a ResourceMetricsData) -> Self {
70+
Self {
71+
resource: &data.resource,
72+
scope_metrics: ScopeMetricsCollector {
73+
iter: ScopeMetricsLendingIter {
74+
iter: data.scope_metrics.iter(),
75+
},
76+
},
77+
}
78+
}
79+
}
80+
81+
impl Debug for ScopeMetricsLendingIter<'_> {
82+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83+
f.debug_struct("BatchScopeMetrics").finish()
84+
}
85+
}
86+
87+
impl ScopeMetricsLendingIter<'_> {
88+
/// Advances the iterator and returns the next value.
89+
pub fn next_scope_metrics(&mut self) -> Option<ScopeMetrics<'_>> {
90+
self.iter.next().map(|item| ScopeMetrics {
91+
scope: &item.scope,
92+
metrics: MetricsLendingIter {
93+
iter: item.metrics.iter(),
94+
},
95+
})
96+
}
97+
}
98+
99+
impl MetricsLendingIter<'_> {
100+
/// Advances the iterator and returns the next value.
101+
pub fn next_metric(&mut self) -> Option<Metric<'_>> {
102+
self.iter.next().map(|item| Metric {
103+
instrument: &item.instrument,
104+
data: &item.data,
105+
})
106+
}
107+
}
108+
109+
impl Debug for MetricsLendingIter<'_> {
110+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111+
f.debug_struct("BatchMetrics").finish()
112+
}
113+
}
9114

10115
/// Exporter handles the delivery of metric data to external receivers.
11116
///
@@ -18,7 +123,7 @@ pub trait PushMetricExporter: Send + Sync + 'static {
18123
/// considered unrecoverable and will be logged.
19124
fn export(
20125
&self,
21-
metrics: &ResourceMetrics,
126+
metrics: ResourceMetrics<'_>,
22127
) -> impl std::future::Future<Output = OTelSdkResult> + Send;
23128

24129
/// Flushes any metric data held by an exporter.

0 commit comments

Comments
 (0)