From f4e321342f0b0e574beeff9f779db185afeab101 Mon Sep 17 00:00:00 2001 From: Luca Casonato Date: Thu, 26 Dec 2024 09:01:39 +0100 Subject: [PATCH] feat(unstable): add OTEL MeterProvider (#27240) This commit replaces `Deno.telemetry.MetricsExporter` with `Deno.telemetry.MeterProvider`. Signed-off-by: Luca Casonato Co-authored-by: snek --- cli/args/flags.rs | 2 + ext/telemetry/lib.rs | 1451 ++++++++++++--------- ext/telemetry/telemetry.ts | 852 +++++++----- tests/specs/cli/otel_basic/__test__.jsonc | 21 +- tests/specs/cli/otel_basic/metric.out | 284 ++++ tests/specs/cli/otel_basic/metric.ts | 87 +- 6 files changed, 1764 insertions(+), 933 deletions(-) diff --git a/cli/args/flags.rs b/cli/args/flags.rs index 418edcf34b..2b0b9a2908 100644 --- a/cli/args/flags.rs +++ b/cli/args/flags.rs @@ -1006,6 +1006,8 @@ impl Flags { OtelConfig { tracing_enabled: !disabled && otel_var("OTEL_DENO_TRACING").unwrap_or(default), + metrics_enabled: !disabled + && otel_var("OTEL_DENO_METRICS").unwrap_or(default), console: match std::env::var("OTEL_DENO_CONSOLE").as_deref() { Ok(_) if disabled => OtelConsoleConfig::Ignore, Ok("ignore") => OtelConsoleConfig::Ignore, diff --git a/ext/telemetry/lib.rs b/ext/telemetry/lib.rs index 816e838743..8018843dc4 100644 --- a/ext/telemetry/lib.rs +++ b/ext/telemetry/lib.rs @@ -6,16 +6,22 @@ use deno_core::futures::channel::mpsc; use deno_core::futures::channel::mpsc::UnboundedSender; use deno_core::futures::future::BoxFuture; use deno_core::futures::stream; +use deno_core::futures::FutureExt; use deno_core::futures::Stream; use deno_core::futures::StreamExt; use deno_core::op2; use deno_core::v8; +use deno_core::GarbageCollected; use deno_core::OpState; use once_cell::sync::Lazy; use once_cell::sync::OnceCell; use opentelemetry::logs::AnyValue; use opentelemetry::logs::LogRecord as LogRecordTrait; use opentelemetry::logs::Severity; +use opentelemetry::metrics::AsyncInstrumentBuilder; +use opentelemetry::metrics::InstrumentBuilder; +use opentelemetry::metrics::MeterProvider; +use opentelemetry::otel_debug; use opentelemetry::otel_error; use opentelemetry::trace::SpanContext; use opentelemetry::trace::SpanId; @@ -28,7 +34,6 @@ use opentelemetry::KeyValue; use opentelemetry::StringValue; use opentelemetry::Value; use opentelemetry_otlp::HttpExporterBuilder; -use opentelemetry_otlp::MetricExporter; use opentelemetry_otlp::Protocol; use opentelemetry_otlp::WithExportConfig; use opentelemetry_otlp::WithHttpConfig; @@ -36,10 +41,11 @@ use opentelemetry_sdk::export::trace::SpanData; use opentelemetry_sdk::logs::BatchLogProcessor; use opentelemetry_sdk::logs::LogProcessor; use opentelemetry_sdk::logs::LogRecord; -use opentelemetry_sdk::metrics::data::Metric; -use opentelemetry_sdk::metrics::data::ResourceMetrics; -use opentelemetry_sdk::metrics::data::ScopeMetrics; use opentelemetry_sdk::metrics::exporter::PushMetricExporter; +use opentelemetry_sdk::metrics::reader::MetricReader; +use opentelemetry_sdk::metrics::ManualReader; +use opentelemetry_sdk::metrics::MetricResult; +use opentelemetry_sdk::metrics::SdkMeterProvider; use opentelemetry_sdk::metrics::Temporality; use opentelemetry_sdk::trace::BatchSpanProcessor; use opentelemetry_sdk::trace::SpanProcessor; @@ -52,14 +58,21 @@ use opentelemetry_semantic_conventions::resource::TELEMETRY_SDK_VERSION; use serde::Deserialize; use serde::Serialize; use std::borrow::Cow; +use std::cell::RefCell; +use std::collections::HashMap; use std::env; use std::fmt::Debug; use std::pin::Pin; +use std::rc::Rc; +use std::sync::Arc; +use std::sync::Mutex; use std::task::Context; use std::task::Poll; use std::thread; use std::time::Duration; use std::time::SystemTime; +use tokio::sync::oneshot; +use tokio::task::JoinSet; deno_core::extension!( deno_telemetry, @@ -75,23 +88,24 @@ deno_core::extension!( op_otel_span_attribute3, op_otel_span_set_dropped, op_otel_span_flush, - op_otel_metrics_resource_attribute, - op_otel_metrics_resource_attribute2, - op_otel_metrics_resource_attribute3, - op_otel_metrics_scope, - op_otel_metrics_sum, - op_otel_metrics_gauge, - op_otel_metrics_sum_or_gauge_data_point, - op_otel_metrics_histogram, - op_otel_metrics_histogram_data_point, - op_otel_metrics_histogram_data_point_entry_final, - op_otel_metrics_histogram_data_point_entry1, - op_otel_metrics_histogram_data_point_entry2, - op_otel_metrics_histogram_data_point_entry3, - op_otel_metrics_data_point_attribute, - op_otel_metrics_data_point_attribute2, - op_otel_metrics_data_point_attribute3, - op_otel_metrics_submit, + op_otel_metric_create_counter, + op_otel_metric_create_up_down_counter, + op_otel_metric_create_gauge, + op_otel_metric_create_histogram, + op_otel_metric_create_observable_counter, + op_otel_metric_create_observable_gauge, + op_otel_metric_create_observable_up_down_counter, + op_otel_metric_attribute3, + op_otel_metric_record0, + op_otel_metric_record1, + op_otel_metric_record2, + op_otel_metric_record3, + op_otel_metric_observable_record0, + op_otel_metric_observable_record1, + op_otel_metric_observable_record2, + op_otel_metric_observable_record3, + op_otel_metric_wait_to_observe, + op_otel_metric_observation_done, ], esm = ["telemetry.ts", "util.ts"], ); @@ -105,6 +119,7 @@ pub struct OtelRuntimeConfig { #[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct OtelConfig { pub tracing_enabled: bool, + pub metrics_enabled: bool, pub console: OtelConsoleConfig, pub deterministic: bool, } @@ -113,6 +128,7 @@ impl OtelConfig { pub fn as_v8(&self) -> Box<[u8]> { Box::new([ self.tracing_enabled as u8, + self.metrics_enabled as u8, self.console as u8, self.deterministic as u8, ]) @@ -137,6 +153,10 @@ static OTEL_SHARED_RUNTIME_SPAWN_TASK_TX: Lazy< UnboundedSender>, > = Lazy::new(otel_create_shared_runtime); +static OTEL_PRE_COLLECT_CALLBACKS: Lazy< + Mutex>>>, +> = Lazy::new(Default::default); + fn otel_create_shared_runtime() -> UnboundedSender> { let (spawn_task_tx, mut spawn_task_rx) = mpsc::unbounded::>(); @@ -273,6 +293,181 @@ impl Stream for BatchMessageChannelReceiver { } } +enum DenoPeriodicReaderMessage { + Register(std::sync::Weak), + Export, + ForceFlush(oneshot::Sender>), + Shutdown(oneshot::Sender>), +} + +#[derive(Debug)] +struct DenoPeriodicReader { + tx: tokio::sync::mpsc::Sender, + temporality: Temporality, +} + +impl MetricReader for DenoPeriodicReader { + fn register_pipeline( + &self, + pipeline: std::sync::Weak, + ) { + let _ = self + .tx + .try_send(DenoPeriodicReaderMessage::Register(pipeline)); + } + + fn collect( + &self, + _rm: &mut opentelemetry_sdk::metrics::data::ResourceMetrics, + ) -> opentelemetry_sdk::metrics::MetricResult<()> { + unreachable!("collect should not be called on DenoPeriodicReader"); + } + + fn force_flush(&self) -> opentelemetry_sdk::metrics::MetricResult<()> { + let (tx, rx) = oneshot::channel(); + let _ = self.tx.try_send(DenoPeriodicReaderMessage::ForceFlush(tx)); + deno_core::futures::executor::block_on(rx).unwrap()?; + Ok(()) + } + + fn shutdown(&self) -> opentelemetry_sdk::metrics::MetricResult<()> { + let (tx, rx) = oneshot::channel(); + let _ = self.tx.try_send(DenoPeriodicReaderMessage::Shutdown(tx)); + deno_core::futures::executor::block_on(rx).unwrap()?; + Ok(()) + } + + fn temporality( + &self, + _kind: opentelemetry_sdk::metrics::InstrumentKind, + ) -> Temporality { + self.temporality + } +} + +const METRIC_EXPORT_INTERVAL_NAME: &str = "OTEL_METRIC_EXPORT_INTERVAL"; +const DEFAULT_INTERVAL: Duration = Duration::from_secs(60); + +impl DenoPeriodicReader { + fn new(exporter: opentelemetry_otlp::MetricExporter) -> Self { + let interval = env::var(METRIC_EXPORT_INTERVAL_NAME) + .ok() + .and_then(|v| v.parse().map(Duration::from_millis).ok()) + .unwrap_or(DEFAULT_INTERVAL); + + let (tx, mut rx) = tokio::sync::mpsc::channel(256); + + let temporality = PushMetricExporter::temporality(&exporter); + + let worker = async move { + let inner = ManualReader::builder() + .with_temporality(PushMetricExporter::temporality(&exporter)) + .build(); + + let collect_and_export = |collect_observed: bool| { + let inner = &inner; + let exporter = &exporter; + async move { + let mut resource_metrics = + opentelemetry_sdk::metrics::data::ResourceMetrics { + resource: Default::default(), + scope_metrics: Default::default(), + }; + if collect_observed { + let callbacks = { + let mut callbacks = OTEL_PRE_COLLECT_CALLBACKS.lock().unwrap(); + std::mem::take(&mut *callbacks) + }; + let mut futures = JoinSet::new(); + for callback in callbacks { + let (tx, rx) = oneshot::channel(); + if let Ok(()) = callback.send(tx) { + futures.spawn(rx); + } + } + while futures.join_next().await.is_some() {} + } + inner.collect(&mut resource_metrics)?; + if resource_metrics.scope_metrics.is_empty() { + return Ok(()); + } + exporter.export(&mut resource_metrics).await?; + Ok(()) + } + }; + + let mut ticker = tokio::time::interval(interval); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + ticker.tick().await; + + loop { + let message = tokio::select! { + _ = ticker.tick() => DenoPeriodicReaderMessage::Export, + message = rx.recv() => if let Some(message) = message { + message + } else { + break; + }, + }; + + match message { + DenoPeriodicReaderMessage::Register(new_pipeline) => { + inner.register_pipeline(new_pipeline); + } + DenoPeriodicReaderMessage::Export => { + otel_debug!( + name: "DenoPeriodicReader.ExportTriggered", + message = "Export message received.", + ); + if let Err(err) = collect_and_export(true).await { + otel_error!( + name: "DenoPeriodicReader.ExportFailed", + message = "Failed to export metrics", + reason = format!("{}", err)); + } + } + DenoPeriodicReaderMessage::ForceFlush(sender) => { + otel_debug!( + name: "DenoPeriodicReader.ForceFlushCalled", + message = "Flush message received.", + ); + let res = collect_and_export(false).await; + if let Err(send_error) = sender.send(res) { + otel_debug!( + name: "DenoPeriodicReader.Flush.SendResultError", + message = "Failed to send flush result.", + reason = format!("{:?}", send_error), + ); + } + } + DenoPeriodicReaderMessage::Shutdown(sender) => { + otel_debug!( + name: "DenoPeriodicReader.ShutdownCalled", + message = "Shutdown message received", + ); + let res = collect_and_export(false).await; + let _ = exporter.shutdown(); + if let Err(send_error) = sender.send(res) { + otel_debug!( + name: "DenoPeriodicReader.Shutdown.SendResultError", + message = "Failed to send shutdown result", + reason = format!("{:?}", send_error), + ); + } + break; + } + } + } + }; + + (*OTEL_SHARED_RUNTIME_SPAWN_TASK_TX) + .unbounded_send(worker.boxed()) + .expect("failed to send task to shared OpenTelemetry runtime"); + + DenoPeriodicReader { tx, temporality } + } +} + mod hyper_client { use http_body_util::BodyExt; use http_body_util::Full; @@ -353,66 +548,10 @@ mod hyper_client { } } -enum MetricProcessorMessage { - ResourceMetrics(ResourceMetrics), - Flush(tokio::sync::oneshot::Sender<()>), -} - -struct MetricProcessor { - tx: tokio::sync::mpsc::Sender, -} - -impl MetricProcessor { - fn new(exporter: MetricExporter) -> Self { - let (tx, mut rx) = tokio::sync::mpsc::channel(2048); - let future = async move { - while let Some(message) = rx.recv().await { - match message { - MetricProcessorMessage::ResourceMetrics(mut rm) => { - if let Err(err) = exporter.export(&mut rm).await { - otel_error!( - name: "MetricProcessor.Export.Error", - error = format!("{}", err) - ); - } - } - MetricProcessorMessage::Flush(tx) => { - if let Err(()) = tx.send(()) { - otel_error!( - name: "MetricProcessor.Flush.SendResultError", - error = "()", - ); - } - } - } - } - }; - - (*OTEL_SHARED_RUNTIME_SPAWN_TASK_TX) - .unbounded_send(Box::pin(future)) - .expect("failed to send task to shared OpenTelemetry runtime"); - - Self { tx } - } - - fn submit(&self, rm: ResourceMetrics) { - let _ = self - .tx - .try_send(MetricProcessorMessage::ResourceMetrics(rm)); - } - - fn force_flush(&self) -> Result<(), anyhow::Error> { - let (tx, rx) = tokio::sync::oneshot::channel(); - self.tx.try_send(MetricProcessorMessage::Flush(tx))?; - deno_core::futures::executor::block_on(rx)?; - Ok(()) - } -} - struct Processors { spans: BatchSpanProcessor, logs: BatchLogProcessor, - metrics: MetricProcessor, + meter_provider: SdkMeterProvider, } static OTEL_PROCESSORS: OnceCell = OnceCell::new(); @@ -421,7 +560,7 @@ static BUILT_IN_INSTRUMENTATION_SCOPE: OnceCell< opentelemetry::InstrumentationScope, > = OnceCell::new(); -pub fn init(config: OtelRuntimeConfig) -> anyhow::Result<()> { +pub fn init(rt_config: OtelRuntimeConfig) -> anyhow::Result<()> { // Parse the `OTEL_EXPORTER_OTLP_PROTOCOL` variable. The opentelemetry_* // crates don't do this automatically. // TODO(piscisaureus): enable GRPC support. @@ -454,8 +593,8 @@ pub fn init(config: OtelRuntimeConfig) -> anyhow::Result<()> { // Add the runtime name and version to the resource attributes. Also override // the `telemetry.sdk` attributes to include the Deno runtime. resource = resource.merge(&Resource::new(vec![ - KeyValue::new(PROCESS_RUNTIME_NAME, config.runtime_name), - KeyValue::new(PROCESS_RUNTIME_VERSION, config.runtime_version.clone()), + KeyValue::new(PROCESS_RUNTIME_NAME, rt_config.runtime_name), + KeyValue::new(PROCESS_RUNTIME_VERSION, rt_config.runtime_version.clone()), KeyValue::new( TELEMETRY_SDK_LANGUAGE, format!( @@ -474,7 +613,7 @@ pub fn init(config: OtelRuntimeConfig) -> anyhow::Result<()> { TELEMETRY_SDK_VERSION, format!( "{}-{}", - config.runtime_version, + rt_config.runtime_version, resource.get(Key::new(TELEMETRY_SDK_VERSION)).unwrap() ), ), @@ -494,11 +633,30 @@ pub fn init(config: OtelRuntimeConfig) -> anyhow::Result<()> { BatchSpanProcessor::builder(span_exporter, OtelSharedRuntime).build(); span_processor.set_resource(&resource); + let temporality_preference = + env::var("OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE") + .ok() + .map(|s| s.to_lowercase()); + let temporality = match temporality_preference.as_deref() { + None | Some("cumulative") => Temporality::Cumulative, + Some("delta") => Temporality::Delta, + Some("lowmemory") => Temporality::LowMemory, + Some(other) => { + return Err(anyhow!( + "Invalid value for OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: {}", + other + )); + } + }; let metric_exporter = HttpExporterBuilder::default() .with_http_client(client.clone()) .with_protocol(protocol) - .build_metrics_exporter(Temporality::Cumulative)?; - let metric_processor = MetricProcessor::new(metric_exporter); + .build_metrics_exporter(temporality)?; + let metric_reader = DenoPeriodicReader::new(metric_exporter); + let meter_provider = SdkMeterProvider::builder() + .with_reader(metric_reader) + .with_resource(resource.clone()) + .build(); let log_exporter = HttpExporterBuilder::default() .with_http_client(client) @@ -512,13 +670,13 @@ pub fn init(config: OtelRuntimeConfig) -> anyhow::Result<()> { .set(Processors { spans: span_processor, logs: log_processor, - metrics: metric_processor, + meter_provider, }) .map_err(|_| anyhow!("failed to init otel"))?; let builtin_instrumentation_scope = opentelemetry::InstrumentationScope::builder("deno") - .with_version(config.runtime_version.clone()) + .with_version(rt_config.runtime_version.clone()) .build(); BUILT_IN_INSTRUMENTATION_SCOPE .set(builtin_instrumentation_scope) @@ -534,12 +692,12 @@ pub fn flush() { if let Some(Processors { spans, logs, - metrics, + meter_provider, }) = OTEL_PROCESSORS.get() { let _ = spans.force_flush(); let _ = logs.force_flush(); - let _ = metrics.force_flush(); + let _ = meter_provider.force_flush(); } } @@ -659,8 +817,8 @@ fn parse_span_id( } } -macro_rules! attr { - ($scope:ident, $attributes:expr $(=> $dropped_attributes_count:expr)?, $name:expr, $value:expr) => { +macro_rules! attr_raw { + ($scope:ident, $name:expr, $value:expr) => {{ let name = if let Ok(name) = $name.try_cast() { let view = v8::ValueView::new($scope, name); match view.data() { @@ -695,7 +853,18 @@ macro_rules! attr { None }; if let (Some(name), Some(value)) = (name, value) { - $attributes.push(KeyValue::new(name, value)); + Some(KeyValue::new(name, value)) + } else { + None + } + }}; +} + +macro_rules! attr { + ($scope:ident, $attributes:expr $(=> $dropped_attributes_count:expr)?, $name:expr, $value:expr) => { + let attr = attr_raw!($scope, $name, $value); + if let Some(kv) = attr { + $attributes.push(kv); } $( else { @@ -909,7 +1078,8 @@ fn op_otel_span_attribute<'s>( ) { if let Some(temporary_span) = state.try_borrow_mut::() { temporary_span.0.attributes.reserve_exact( - (capacity as usize) - temporary_span.0.attributes.capacity(), + (capacity as usize) + .saturating_sub(temporary_span.0.attributes.capacity()), ); attr!(scope, temporary_span.0.attributes => temporary_span.0.dropped_attributes_count, key, value); } @@ -927,7 +1097,8 @@ fn op_otel_span_attribute2<'s>( ) { if let Some(temporary_span) = state.try_borrow_mut::() { temporary_span.0.attributes.reserve_exact( - (capacity as usize) - temporary_span.0.attributes.capacity(), + (capacity as usize) + .saturating_sub(temporary_span.0.attributes.capacity()), ); attr!(scope, temporary_span.0.attributes => temporary_span.0.dropped_attributes_count, key1, value1); attr!(scope, temporary_span.0.attributes => temporary_span.0.dropped_attributes_count, key2, value2); @@ -949,7 +1120,8 @@ fn op_otel_span_attribute3<'s>( ) { if let Some(temporary_span) = state.try_borrow_mut::() { temporary_span.0.attributes.reserve_exact( - (capacity as usize) - temporary_span.0.attributes.capacity(), + (capacity as usize) + .saturating_sub(temporary_span.0.attributes.capacity()), ); attr!(scope, temporary_span.0.attributes => temporary_span.0.dropped_attributes_count, key1, value1); attr!(scope, temporary_span.0.attributes => temporary_span.0.dropped_attributes_count, key2, value2); @@ -984,538 +1156,572 @@ fn op_otel_span_flush(state: &mut OpState) { spans.on_end(temporary_span.0); } -// Holds data being built from JS before -// it is submitted to the rust processor. -struct TemporaryMetricsExport { - resource_attributes: Vec, - scope_metrics: Vec, - metric: Option, +enum Instrument { + Counter(opentelemetry::metrics::Counter), + UpDownCounter(opentelemetry::metrics::UpDownCounter), + Gauge(opentelemetry::metrics::Gauge), + Histogram(opentelemetry::metrics::Histogram), + Observable(Arc, f64>>>), } -struct TemporaryMetric { - name: String, - description: String, - unit: String, - data: TemporaryMetricData, -} +impl GarbageCollected for Instrument {} -enum TemporaryMetricData { - Sum(opentelemetry_sdk::metrics::data::Sum), - Gauge(opentelemetry_sdk::metrics::data::Gauge), - Histogram(opentelemetry_sdk::metrics::data::Histogram), -} - -impl From for Metric { - fn from(value: TemporaryMetric) -> Self { - Metric { - name: Cow::Owned(value.name), - description: Cow::Owned(value.description), - unit: Cow::Owned(value.unit), - data: match value.data { - TemporaryMetricData::Sum(sum) => Box::new(sum), - TemporaryMetricData::Gauge(gauge) => Box::new(gauge), - TemporaryMetricData::Histogram(histogram) => Box::new(histogram), - }, - } - } -} - -#[op2(fast)] -fn op_otel_metrics_resource_attribute<'s>( - scope: &mut v8::HandleScope<'s>, +fn create_instrument<'a, T>( + cb: impl FnOnce( + &'_ opentelemetry::metrics::Meter, + String, + ) -> InstrumentBuilder<'_, T>, + cb2: impl FnOnce(InstrumentBuilder<'_, T>) -> Instrument, state: &mut OpState, - #[smi] capacity: u32, - key: v8::Local<'s, v8::Value>, - value: v8::Local<'s, v8::Value>, -) { - let metrics_export = if let Some(metrics_export) = - state.try_borrow_mut::() - { - metrics_export.resource_attributes.reserve_exact( - (capacity as usize) - metrics_export.resource_attributes.capacity(), - ); - metrics_export - } else { - state.put(TemporaryMetricsExport { - resource_attributes: Vec::with_capacity(capacity as usize), - scope_metrics: vec![], - metric: None, - }); - state.borrow_mut() + scope: &mut v8::HandleScope<'a>, + name: v8::Local<'a, v8::Value>, + description: v8::Local<'a, v8::Value>, + unit: v8::Local<'a, v8::Value>, +) -> Result { + let Some(InstrumentationScope(instrumentation_scope)) = + state.try_borrow::() + else { + return Err(anyhow!("instrumentation scope not available")); }; - attr!(scope, metrics_export.resource_attributes, key, value); -} -#[op2(fast)] -fn op_otel_metrics_resource_attribute2<'s>( - scope: &mut v8::HandleScope<'s>, - state: &mut OpState, - #[smi] capacity: u32, - key1: v8::Local<'s, v8::Value>, - value1: v8::Local<'s, v8::Value>, - key2: v8::Local<'s, v8::Value>, - value2: v8::Local<'s, v8::Value>, -) { - let metrics_export = if let Some(metrics_export) = - state.try_borrow_mut::() - { - metrics_export.resource_attributes.reserve_exact( - (capacity as usize) - metrics_export.resource_attributes.capacity(), - ); - metrics_export - } else { - state.put(TemporaryMetricsExport { - resource_attributes: Vec::with_capacity(capacity as usize), - scope_metrics: vec![], - metric: None, - }); - state.borrow_mut() + let meter = OTEL_PROCESSORS + .get() + .unwrap() + .meter_provider + .meter_with_scope(instrumentation_scope.clone()); + + let name = owned_string(scope, name.try_cast()?); + let mut builder = cb(&meter, name); + if !description.is_null_or_undefined() { + let description = owned_string(scope, description.try_cast()?); + builder = builder.with_description(description); }; - attr!(scope, metrics_export.resource_attributes, key1, value1); - attr!(scope, metrics_export.resource_attributes, key2, value2); -} - -#[allow(clippy::too_many_arguments)] -#[op2(fast)] -fn op_otel_metrics_resource_attribute3<'s>( - scope: &mut v8::HandleScope<'s>, - state: &mut OpState, - #[smi] capacity: u32, - key1: v8::Local<'s, v8::Value>, - value1: v8::Local<'s, v8::Value>, - key2: v8::Local<'s, v8::Value>, - value2: v8::Local<'s, v8::Value>, - key3: v8::Local<'s, v8::Value>, - value3: v8::Local<'s, v8::Value>, -) { - let metrics_export = if let Some(metrics_export) = - state.try_borrow_mut::() - { - metrics_export.resource_attributes.reserve_exact( - (capacity as usize) - metrics_export.resource_attributes.capacity(), - ); - metrics_export - } else { - state.put(TemporaryMetricsExport { - resource_attributes: Vec::with_capacity(capacity as usize), - scope_metrics: vec![], - metric: None, - }); - state.borrow_mut() + if !unit.is_null_or_undefined() { + let unit = owned_string(scope, unit.try_cast()?); + builder = builder.with_unit(unit); }; - attr!(scope, metrics_export.resource_attributes, key1, value1); - attr!(scope, metrics_export.resource_attributes, key2, value2); - attr!(scope, metrics_export.resource_attributes, key3, value3); + + Ok(cb2(builder)) } -#[op2(fast)] -fn op_otel_metrics_scope<'s>( - scope: &mut v8::HandleScope<'s>, +#[op2] +#[cppgc] +fn op_otel_metric_create_counter<'s>( state: &mut OpState, + scope: &mut v8::HandleScope<'s>, name: v8::Local<'s, v8::Value>, - schema_url: v8::Local<'s, v8::Value>, - version: v8::Local<'s, v8::Value>, -) { - let name = owned_string(scope, name.cast()); - - let scope_builder = opentelemetry::InstrumentationScope::builder(name); - let scope_builder = if schema_url.is_null_or_undefined() { - scope_builder - } else { - scope_builder.with_schema_url(owned_string(scope, schema_url.cast())) - }; - let scope_builder = if version.is_null_or_undefined() { - scope_builder - } else { - scope_builder.with_version(owned_string(scope, version.cast())) - }; - let scope = scope_builder.build(); - let scope_metric = ScopeMetrics { + description: v8::Local<'s, v8::Value>, + unit: v8::Local<'s, v8::Value>, +) -> Result { + create_instrument( + |meter, name| meter.f64_counter(name), + |i| Instrument::Counter(i.build()), + state, scope, - metrics: vec![], - }; - - match state.try_borrow_mut::() { - Some(temp) => { - if let Some(current_metric) = temp.metric.take() { - let metric = Metric::from(current_metric); - temp.scope_metrics.last_mut().unwrap().metrics.push(metric); - } - temp.scope_metrics.push(scope_metric); - } - None => { - state.put(TemporaryMetricsExport { - resource_attributes: vec![], - scope_metrics: vec![scope_metric], - metric: None, - }); - } - } -} - -#[op2(fast)] -fn op_otel_metrics_sum<'s>( - scope: &mut v8::HandleScope<'s>, - state: &mut OpState, - name: v8::Local<'s, v8::Value>, - description: v8::Local<'s, v8::Value>, - unit: v8::Local<'s, v8::Value>, - #[smi] temporality: u8, - is_monotonic: bool, -) { - let Some(temp) = state.try_borrow_mut::() else { - return; - }; - - if let Some(current_metric) = temp.metric.take() { - let metric = Metric::from(current_metric); - temp.scope_metrics.last_mut().unwrap().metrics.push(metric); - } - - let name = owned_string(scope, name.cast()); - let description = owned_string(scope, description.cast()); - let unit = owned_string(scope, unit.cast()); - let temporality = match temporality { - 0 => Temporality::Delta, - 1 => Temporality::Cumulative, - _ => return, - }; - let sum = opentelemetry_sdk::metrics::data::Sum { - data_points: vec![], - temporality, - is_monotonic, - }; - - temp.metric = Some(TemporaryMetric { name, description, unit, - data: TemporaryMetricData::Sum(sum), - }); + ) } -#[op2(fast)] -fn op_otel_metrics_gauge<'s>( - scope: &mut v8::HandleScope<'s>, +#[op2] +#[cppgc] +fn op_otel_metric_create_up_down_counter<'s>( state: &mut OpState, + scope: &mut v8::HandleScope<'s>, name: v8::Local<'s, v8::Value>, description: v8::Local<'s, v8::Value>, unit: v8::Local<'s, v8::Value>, -) { - let Some(temp) = state.try_borrow_mut::() else { - return; - }; - - if let Some(current_metric) = temp.metric.take() { - let metric = Metric::from(current_metric); - temp.scope_metrics.last_mut().unwrap().metrics.push(metric); - } - - let name = owned_string(scope, name.cast()); - let description = owned_string(scope, description.cast()); - let unit = owned_string(scope, unit.cast()); - - let gauge = opentelemetry_sdk::metrics::data::Gauge { - data_points: vec![], - }; - - temp.metric = Some(TemporaryMetric { +) -> Result { + create_instrument( + |meter, name| meter.f64_up_down_counter(name), + |i| Instrument::UpDownCounter(i.build()), + state, + scope, name, description, unit, - data: TemporaryMetricData::Gauge(gauge), + ) +} + +#[op2] +#[cppgc] +fn op_otel_metric_create_gauge<'s>( + state: &mut OpState, + scope: &mut v8::HandleScope<'s>, + name: v8::Local<'s, v8::Value>, + description: v8::Local<'s, v8::Value>, + unit: v8::Local<'s, v8::Value>, +) -> Result { + create_instrument( + |meter, name| meter.f64_gauge(name), + |i| Instrument::Gauge(i.build()), + state, + scope, + name, + description, + unit, + ) +} + +#[op2] +#[cppgc] +fn op_otel_metric_create_histogram<'s>( + state: &mut OpState, + scope: &mut v8::HandleScope<'s>, + name: v8::Local<'s, v8::Value>, + description: v8::Local<'s, v8::Value>, + unit: v8::Local<'s, v8::Value>, + #[serde] boundaries: Option>, +) -> Result { + let Some(InstrumentationScope(instrumentation_scope)) = + state.try_borrow::() + else { + return Err(anyhow!("instrumentation scope not available")); + }; + + let meter = OTEL_PROCESSORS + .get() + .unwrap() + .meter_provider + .meter_with_scope(instrumentation_scope.clone()); + + let name = owned_string(scope, name.try_cast()?); + let mut builder = meter.f64_histogram(name); + if !description.is_null_or_undefined() { + let description = owned_string(scope, description.try_cast()?); + builder = builder.with_description(description); + }; + if !unit.is_null_or_undefined() { + let unit = owned_string(scope, unit.try_cast()?); + builder = builder.with_unit(unit); + }; + if let Some(boundaries) = boundaries { + builder = builder.with_boundaries(boundaries); + } + + Ok(Instrument::Histogram(builder.build())) +} + +fn create_async_instrument<'a, T>( + cb: impl FnOnce( + &'_ opentelemetry::metrics::Meter, + String, + ) -> AsyncInstrumentBuilder<'_, T, f64>, + cb2: impl FnOnce(AsyncInstrumentBuilder<'_, T, f64>), + state: &mut OpState, + scope: &mut v8::HandleScope<'a>, + name: v8::Local<'a, v8::Value>, + description: v8::Local<'a, v8::Value>, + unit: v8::Local<'a, v8::Value>, +) -> Result { + let Some(InstrumentationScope(instrumentation_scope)) = + state.try_borrow::() + else { + return Err(anyhow!("instrumentation scope not available")); + }; + + let meter = OTEL_PROCESSORS + .get() + .unwrap() + .meter_provider + .meter_with_scope(instrumentation_scope.clone()); + + let name = owned_string(scope, name.try_cast()?); + let mut builder = cb(&meter, name); + if !description.is_null_or_undefined() { + let description = owned_string(scope, description.try_cast()?); + builder = builder.with_description(description); + }; + if !unit.is_null_or_undefined() { + let unit = owned_string(scope, unit.try_cast()?); + builder = builder.with_unit(unit); + }; + + let data_share = Arc::new(Mutex::new(HashMap::new())); + let data_share_: Arc, f64>>> = data_share.clone(); + builder = builder.with_callback(move |i| { + let data = { + let mut data = data_share_.lock().unwrap(); + std::mem::take(&mut *data) + }; + for (attributes, value) in data { + i.observe(value, &attributes); + } }); + cb2(builder); + + Ok(Instrument::Observable(data_share)) +} + +#[op2] +#[cppgc] +fn op_otel_metric_create_observable_counter<'s>( + state: &mut OpState, + scope: &mut v8::HandleScope<'s>, + name: v8::Local<'s, v8::Value>, + description: v8::Local<'s, v8::Value>, + unit: v8::Local<'s, v8::Value>, +) -> Result { + create_async_instrument( + |meter, name| meter.f64_observable_counter(name), + |i| { + i.build(); + }, + state, + scope, + name, + description, + unit, + ) +} + +#[op2] +#[cppgc] +fn op_otel_metric_create_observable_up_down_counter<'s>( + state: &mut OpState, + scope: &mut v8::HandleScope<'s>, + name: v8::Local<'s, v8::Value>, + description: v8::Local<'s, v8::Value>, + unit: v8::Local<'s, v8::Value>, +) -> Result { + create_async_instrument( + |meter, name| meter.f64_observable_up_down_counter(name), + |i| { + i.build(); + }, + state, + scope, + name, + description, + unit, + ) +} + +#[op2] +#[cppgc] +fn op_otel_metric_create_observable_gauge<'s>( + state: &mut OpState, + scope: &mut v8::HandleScope<'s>, + name: v8::Local<'s, v8::Value>, + description: v8::Local<'s, v8::Value>, + unit: v8::Local<'s, v8::Value>, +) -> Result { + create_async_instrument( + |meter, name| meter.f64_observable_gauge(name), + |i| { + i.build(); + }, + state, + scope, + name, + description, + unit, + ) +} + +struct MetricAttributes { + attributes: Vec, } #[op2(fast)] -fn op_otel_metrics_sum_or_gauge_data_point( +fn op_otel_metric_record0( state: &mut OpState, + #[cppgc] instrument: &Instrument, value: f64, - start_time: f64, - time: f64, ) { - let Some(temp) = state.try_borrow_mut::() else { - return; + let values = state.try_take::(); + let attributes = match &values { + Some(values) => &*values.attributes, + None => &[], }; - - let start_time = SystemTime::UNIX_EPOCH - .checked_add(std::time::Duration::from_secs_f64(start_time)) - .unwrap(); - let time = SystemTime::UNIX_EPOCH - .checked_add(std::time::Duration::from_secs_f64(time)) - .unwrap(); - - let data_point = opentelemetry_sdk::metrics::data::DataPoint { - value, - start_time: Some(start_time), - time: Some(time), - attributes: vec![], - exemplars: vec![], - }; - - match &mut temp.metric { - Some(TemporaryMetric { - data: TemporaryMetricData::Sum(sum), - .. - }) => sum.data_points.push(data_point), - Some(TemporaryMetric { - data: TemporaryMetricData::Gauge(gauge), - .. - }) => gauge.data_points.push(data_point), + match instrument { + Instrument::Counter(counter) => counter.add(value, attributes), + Instrument::UpDownCounter(counter) => counter.add(value, attributes), + Instrument::Gauge(gauge) => gauge.record(value, attributes), + Instrument::Histogram(histogram) => histogram.record(value, attributes), _ => {} } } #[op2(fast)] -fn op_otel_metrics_histogram<'s>( - scope: &mut v8::HandleScope<'s>, +fn op_otel_metric_record1( state: &mut OpState, - name: v8::Local<'s, v8::Value>, - description: v8::Local<'s, v8::Value>, - unit: v8::Local<'s, v8::Value>, - #[smi] temporality: u8, + scope: &mut v8::HandleScope<'_>, + instrument: v8::Local<'_, v8::Value>, + value: f64, + key1: v8::Local<'_, v8::Value>, + value1: v8::Local<'_, v8::Value>, ) { - let Some(temp) = state.try_borrow_mut::() else { + let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::( + &mut *scope, + instrument, + ) else { return; }; - - if let Some(current_metric) = temp.metric.take() { - let metric = Metric::from(current_metric); - temp.scope_metrics.last_mut().unwrap().metrics.push(metric); - } - - let name = owned_string(scope, name.cast()); - let description = owned_string(scope, description.cast()); - let unit = owned_string(scope, unit.cast()); - - let temporality = match temporality { - 0 => Temporality::Delta, - 1 => Temporality::Cumulative, - _ => return, - }; - let histogram = opentelemetry_sdk::metrics::data::Histogram { - data_points: vec![], - temporality, - }; - - temp.metric = Some(TemporaryMetric { - name, - description, - unit, - data: TemporaryMetricData::Histogram(histogram), - }); -} - -#[allow(clippy::too_many_arguments)] -#[op2(fast)] -fn op_otel_metrics_histogram_data_point( - state: &mut OpState, - #[number] count: u64, - min: f64, - max: f64, - sum: f64, - start_time: f64, - time: f64, - #[smi] buckets: u32, -) { - let Some(temp) = state.try_borrow_mut::() else { - return; - }; - - let min = if min.is_nan() { None } else { Some(min) }; - let max = if max.is_nan() { None } else { Some(max) }; - - let start_time = SystemTime::UNIX_EPOCH - .checked_add(std::time::Duration::from_secs_f64(start_time)) - .unwrap(); - let time = SystemTime::UNIX_EPOCH - .checked_add(std::time::Duration::from_secs_f64(time)) - .unwrap(); - - let data_point = opentelemetry_sdk::metrics::data::HistogramDataPoint { - bounds: Vec::with_capacity(buckets as usize), - bucket_counts: Vec::with_capacity((buckets as usize) + 1), - count, - sum, - min, - max, - start_time, - time, - attributes: vec![], - exemplars: vec![], - }; - - if let Some(TemporaryMetric { - data: TemporaryMetricData::Histogram(histogram), - .. - }) = &mut temp.metric - { - histogram.data_points.push(data_point); - } -} - -#[op2(fast)] -fn op_otel_metrics_histogram_data_point_entry_final( - state: &mut OpState, - #[number] count1: u64, -) { - let Some(temp) = state.try_borrow_mut::() else { - return; - }; - - if let Some(TemporaryMetric { - data: TemporaryMetricData::Histogram(histogram), - .. - }) = &mut temp.metric - { - histogram - .data_points - .last_mut() - .unwrap() - .bucket_counts - .push(count1) - } -} - -#[op2(fast)] -fn op_otel_metrics_histogram_data_point_entry1( - state: &mut OpState, - #[number] count1: u64, - bound1: f64, -) { - let Some(temp) = state.try_borrow_mut::() else { - return; - }; - - if let Some(TemporaryMetric { - data: TemporaryMetricData::Histogram(histogram), - .. - }) = &mut temp.metric - { - let data_point = histogram.data_points.last_mut().unwrap(); - data_point.bucket_counts.push(count1); - data_point.bounds.push(bound1); - } -} - -#[op2(fast)] -fn op_otel_metrics_histogram_data_point_entry2( - state: &mut OpState, - #[number] count1: u64, - bound1: f64, - #[number] count2: u64, - bound2: f64, -) { - let Some(temp) = state.try_borrow_mut::() else { - return; - }; - - if let Some(TemporaryMetric { - data: TemporaryMetricData::Histogram(histogram), - .. - }) = &mut temp.metric - { - let data_point = histogram.data_points.last_mut().unwrap(); - data_point.bucket_counts.push(count1); - data_point.bounds.push(bound1); - data_point.bucket_counts.push(count2); - data_point.bounds.push(bound2); - } -} - -#[op2(fast)] -fn op_otel_metrics_histogram_data_point_entry3( - state: &mut OpState, - #[number] count1: u64, - bound1: f64, - #[number] count2: u64, - bound2: f64, - #[number] count3: u64, - bound3: f64, -) { - let Some(temp) = state.try_borrow_mut::() else { - return; - }; - - if let Some(TemporaryMetric { - data: TemporaryMetricData::Histogram(histogram), - .. - }) = &mut temp.metric - { - let data_point = histogram.data_points.last_mut().unwrap(); - data_point.bucket_counts.push(count1); - data_point.bounds.push(bound1); - data_point.bucket_counts.push(count2); - data_point.bounds.push(bound2); - data_point.bucket_counts.push(count3); - data_point.bounds.push(bound3); - } -} - -#[op2(fast)] -fn op_otel_metrics_data_point_attribute<'s>( - scope: &mut v8::HandleScope<'s>, - state: &mut OpState, - #[smi] capacity: u32, - key: v8::Local<'s, v8::Value>, - value: v8::Local<'s, v8::Value>, -) { - if let Some(TemporaryMetricsExport { - metric: Some(metric), - .. - }) = state.try_borrow_mut::() - { - let attributes = match &mut metric.data { - TemporaryMetricData::Sum(sum) => { - &mut sum.data_points.last_mut().unwrap().attributes + let mut values = state.try_take::(); + let attr1 = attr_raw!(scope, key1, value1); + let attributes = match &mut values { + Some(values) => { + if let Some(kv) = attr1 { + values.attributes.reserve_exact(1); + values.attributes.push(kv); } - TemporaryMetricData::Gauge(gauge) => { - &mut gauge.data_points.last_mut().unwrap().attributes - } - TemporaryMetricData::Histogram(histogram) => { - &mut histogram.data_points.last_mut().unwrap().attributes - } - }; - attributes.reserve_exact((capacity as usize) - attributes.capacity()); - attr!(scope, attributes, key, value); - } -} - -#[op2(fast)] -fn op_otel_metrics_data_point_attribute2<'s>( - scope: &mut v8::HandleScope<'s>, - state: &mut OpState, - #[smi] capacity: u32, - key1: v8::Local<'s, v8::Value>, - value1: v8::Local<'s, v8::Value>, - key2: v8::Local<'s, v8::Value>, - value2: v8::Local<'s, v8::Value>, -) { - if let Some(TemporaryMetricsExport { - metric: Some(metric), - .. - }) = state.try_borrow_mut::() - { - let attributes = match &mut metric.data { - TemporaryMetricData::Sum(sum) => { - &mut sum.data_points.last_mut().unwrap().attributes - } - TemporaryMetricData::Gauge(gauge) => { - &mut gauge.data_points.last_mut().unwrap().attributes - } - TemporaryMetricData::Histogram(histogram) => { - &mut histogram.data_points.last_mut().unwrap().attributes - } - }; - attributes.reserve_exact((capacity as usize) - attributes.capacity()); - attr!(scope, attributes, key1, value1); - attr!(scope, attributes, key2, value2); + &*values.attributes + } + None => match attr1 { + Some(kv1) => &[kv1] as &[KeyValue], + None => &[], + }, + }; + match &*instrument { + Instrument::Counter(counter) => counter.add(value, attributes), + Instrument::UpDownCounter(counter) => counter.add(value, attributes), + Instrument::Gauge(gauge) => gauge.record(value, attributes), + Instrument::Histogram(histogram) => histogram.record(value, attributes), + _ => {} } } #[allow(clippy::too_many_arguments)] #[op2(fast)] -fn op_otel_metrics_data_point_attribute3<'s>( +fn op_otel_metric_record2( + state: &mut OpState, + scope: &mut v8::HandleScope<'_>, + instrument: v8::Local<'_, v8::Value>, + value: f64, + key1: v8::Local<'_, v8::Value>, + value1: v8::Local<'_, v8::Value>, + key2: v8::Local<'_, v8::Value>, + value2: v8::Local<'_, v8::Value>, +) { + let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::( + &mut *scope, + instrument, + ) else { + return; + }; + let mut values = state.try_take::(); + let attr1 = attr_raw!(scope, key1, value1); + let attr2 = attr_raw!(scope, key2, value2); + let attributes = match &mut values { + Some(values) => { + values.attributes.reserve_exact(2); + if let Some(kv1) = attr1 { + values.attributes.push(kv1); + } + if let Some(kv2) = attr2 { + values.attributes.push(kv2); + } + &*values.attributes + } + None => match (attr1, attr2) { + (Some(kv1), Some(kv2)) => &[kv1, kv2] as &[KeyValue], + (Some(kv1), None) => &[kv1], + (None, Some(kv2)) => &[kv2], + (None, None) => &[], + }, + }; + match &*instrument { + Instrument::Counter(counter) => counter.add(value, attributes), + Instrument::UpDownCounter(counter) => counter.add(value, attributes), + Instrument::Gauge(gauge) => gauge.record(value, attributes), + Instrument::Histogram(histogram) => histogram.record(value, attributes), + _ => {} + } +} + +#[allow(clippy::too_many_arguments)] +#[op2(fast)] +fn op_otel_metric_record3( + state: &mut OpState, + scope: &mut v8::HandleScope<'_>, + instrument: v8::Local<'_, v8::Value>, + value: f64, + key1: v8::Local<'_, v8::Value>, + value1: v8::Local<'_, v8::Value>, + key2: v8::Local<'_, v8::Value>, + value2: v8::Local<'_, v8::Value>, + key3: v8::Local<'_, v8::Value>, + value3: v8::Local<'_, v8::Value>, +) { + let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::( + &mut *scope, + instrument, + ) else { + return; + }; + let mut values = state.try_take::(); + let attr1 = attr_raw!(scope, key1, value1); + let attr2 = attr_raw!(scope, key2, value2); + let attr3 = attr_raw!(scope, key3, value3); + let attributes = match &mut values { + Some(values) => { + values.attributes.reserve_exact(3); + if let Some(kv1) = attr1 { + values.attributes.push(kv1); + } + if let Some(kv2) = attr2 { + values.attributes.push(kv2); + } + if let Some(kv3) = attr3 { + values.attributes.push(kv3); + } + &*values.attributes + } + None => match (attr1, attr2, attr3) { + (Some(kv1), Some(kv2), Some(kv3)) => &[kv1, kv2, kv3] as &[KeyValue], + (Some(kv1), Some(kv2), None) => &[kv1, kv2], + (Some(kv1), None, Some(kv3)) => &[kv1, kv3], + (None, Some(kv2), Some(kv3)) => &[kv2, kv3], + (Some(kv1), None, None) => &[kv1], + (None, Some(kv2), None) => &[kv2], + (None, None, Some(kv3)) => &[kv3], + (None, None, None) => &[], + }, + }; + match &*instrument { + Instrument::Counter(counter) => counter.add(value, attributes), + Instrument::UpDownCounter(counter) => counter.add(value, attributes), + Instrument::Gauge(gauge) => gauge.record(value, attributes), + Instrument::Histogram(histogram) => histogram.record(value, attributes), + _ => {} + } +} + +#[op2(fast)] +fn op_otel_metric_observable_record0( + state: &mut OpState, + #[cppgc] instrument: &Instrument, + value: f64, +) { + let values = state.try_take::(); + let attributes = values.map(|attr| attr.attributes).unwrap_or_default(); + if let Instrument::Observable(data_share) = instrument { + let mut data = data_share.lock().unwrap(); + data.insert(attributes, value); + } +} + +#[op2(fast)] +fn op_otel_metric_observable_record1( + state: &mut OpState, + scope: &mut v8::HandleScope<'_>, + instrument: v8::Local<'_, v8::Value>, + value: f64, + key1: v8::Local<'_, v8::Value>, + value1: v8::Local<'_, v8::Value>, +) { + let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::( + &mut *scope, + instrument, + ) else { + return; + }; + let values = state.try_take::(); + let attr1 = attr_raw!(scope, key1, value1); + let mut attributes = values + .map(|mut attr| { + attr.attributes.reserve_exact(1); + attr.attributes + }) + .unwrap_or_else(|| Vec::with_capacity(1)); + if let Some(kv1) = attr1 { + attributes.push(kv1); + } + if let Instrument::Observable(data_share) = &*instrument { + let mut data = data_share.lock().unwrap(); + data.insert(attributes, value); + } +} + +#[allow(clippy::too_many_arguments)] +#[op2(fast)] +fn op_otel_metric_observable_record2( + state: &mut OpState, + scope: &mut v8::HandleScope<'_>, + instrument: v8::Local<'_, v8::Value>, + value: f64, + key1: v8::Local<'_, v8::Value>, + value1: v8::Local<'_, v8::Value>, + key2: v8::Local<'_, v8::Value>, + value2: v8::Local<'_, v8::Value>, +) { + let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::( + &mut *scope, + instrument, + ) else { + return; + }; + let values = state.try_take::(); + let mut attributes = values + .map(|mut attr| { + attr.attributes.reserve_exact(2); + attr.attributes + }) + .unwrap_or_else(|| Vec::with_capacity(2)); + let attr1 = attr_raw!(scope, key1, value1); + let attr2 = attr_raw!(scope, key2, value2); + if let Some(kv1) = attr1 { + attributes.push(kv1); + } + if let Some(kv2) = attr2 { + attributes.push(kv2); + } + if let Instrument::Observable(data_share) = &*instrument { + let mut data = data_share.lock().unwrap(); + data.insert(attributes, value); + } +} + +#[allow(clippy::too_many_arguments)] +#[op2(fast)] +fn op_otel_metric_observable_record3( + state: &mut OpState, + scope: &mut v8::HandleScope<'_>, + instrument: v8::Local<'_, v8::Value>, + value: f64, + key1: v8::Local<'_, v8::Value>, + value1: v8::Local<'_, v8::Value>, + key2: v8::Local<'_, v8::Value>, + value2: v8::Local<'_, v8::Value>, + key3: v8::Local<'_, v8::Value>, + value3: v8::Local<'_, v8::Value>, +) { + let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::( + &mut *scope, + instrument, + ) else { + return; + }; + let values = state.try_take::(); + let mut attributes = values + .map(|mut attr| { + attr.attributes.reserve_exact(3); + attr.attributes + }) + .unwrap_or_else(|| Vec::with_capacity(3)); + let attr1 = attr_raw!(scope, key1, value1); + let attr2 = attr_raw!(scope, key2, value2); + let attr3 = attr_raw!(scope, key3, value3); + if let Some(kv1) = attr1 { + attributes.push(kv1); + } + if let Some(kv2) = attr2 { + attributes.push(kv2); + } + if let Some(kv3) = attr3 { + attributes.push(kv3); + } + if let Instrument::Observable(data_share) = &*instrument { + let mut data = data_share.lock().unwrap(); + data.insert(attributes, value); + } +} + +#[allow(clippy::too_many_arguments)] +#[op2(fast)] +fn op_otel_metric_attribute3<'s>( scope: &mut v8::HandleScope<'s>, state: &mut OpState, #[smi] capacity: u32, @@ -1526,49 +1732,60 @@ fn op_otel_metrics_data_point_attribute3<'s>( key3: v8::Local<'s, v8::Value>, value3: v8::Local<'s, v8::Value>, ) { - if let Some(TemporaryMetricsExport { - metric: Some(metric), - .. - }) = state.try_borrow_mut::() + let mut values = state.try_borrow_mut::(); + let attr1 = attr_raw!(scope, key1, value1); + let attr2 = attr_raw!(scope, key2, value2); + let attr3 = attr_raw!(scope, key3, value3); + if let Some(values) = &mut values { + values.attributes.reserve_exact( + (capacity as usize).saturating_sub(values.attributes.capacity()), + ); + if let Some(kv1) = attr1 { + values.attributes.push(kv1); + } + if let Some(kv2) = attr2 { + values.attributes.push(kv2); + } + if let Some(kv3) = attr3 { + values.attributes.push(kv3); + } + } else { + let mut attributes = Vec::with_capacity(capacity as usize); + if let Some(kv1) = attr1 { + attributes.push(kv1); + } + if let Some(kv2) = attr2 { + attributes.push(kv2); + } + if let Some(kv3) = attr3 { + attributes.push(kv3); + } + state.put(MetricAttributes { attributes }); + } +} + +struct ObservationDone(oneshot::Sender<()>); + +#[op2(async)] +async fn op_otel_metric_wait_to_observe(state: Rc>) -> bool { + let (tx, rx) = oneshot::channel(); { - let attributes = match &mut metric.data { - TemporaryMetricData::Sum(sum) => { - &mut sum.data_points.last_mut().unwrap().attributes - } - TemporaryMetricData::Gauge(gauge) => { - &mut gauge.data_points.last_mut().unwrap().attributes - } - TemporaryMetricData::Histogram(histogram) => { - &mut histogram.data_points.last_mut().unwrap().attributes - } - }; - attributes.reserve_exact((capacity as usize) - attributes.capacity()); - attr!(scope, attributes, key1, value1); - attr!(scope, attributes, key2, value2); - attr!(scope, attributes, key3, value3); + OTEL_PRE_COLLECT_CALLBACKS + .lock() + .expect("mutex poisoned") + .push(tx); + } + if let Ok(done) = rx.await { + state.borrow_mut().put(ObservationDone(done)); + true + } else { + false } } #[op2(fast)] -fn op_otel_metrics_submit(state: &mut OpState) { - let Some(mut temp) = state.try_take::() else { - return; - }; - - let Some(Processors { metrics, .. }) = OTEL_PROCESSORS.get() else { - return; - }; - - if let Some(current_metric) = temp.metric { - let metric = Metric::from(current_metric); - temp.scope_metrics.last_mut().unwrap().metrics.push(metric); +fn op_otel_metric_observation_done(state: &mut OpState) { + if let Some(ObservationDone(done)) = state.try_take::() { + let _ = done.send(()); } - - let resource = Resource::new(temp.resource_attributes); - let scope_metrics = temp.scope_metrics; - - metrics.submit(ResourceMetrics { - resource, - scope_metrics, - }); } diff --git a/ext/telemetry/telemetry.ts b/ext/telemetry/telemetry.ts index d1335f65b5..86b4fe059d 100644 --- a/ext/telemetry/telemetry.ts +++ b/ext/telemetry/telemetry.ts @@ -7,23 +7,24 @@ import { op_otel_instrumentation_scope_enter, op_otel_instrumentation_scope_enter_builtin, op_otel_log, - op_otel_metrics_data_point_attribute, - op_otel_metrics_data_point_attribute2, - op_otel_metrics_data_point_attribute3, - op_otel_metrics_gauge, - op_otel_metrics_histogram, - op_otel_metrics_histogram_data_point, - op_otel_metrics_histogram_data_point_entry1, - op_otel_metrics_histogram_data_point_entry2, - op_otel_metrics_histogram_data_point_entry3, - op_otel_metrics_histogram_data_point_entry_final, - op_otel_metrics_resource_attribute, - op_otel_metrics_resource_attribute2, - op_otel_metrics_resource_attribute3, - op_otel_metrics_scope, - op_otel_metrics_submit, - op_otel_metrics_sum, - op_otel_metrics_sum_or_gauge_data_point, + op_otel_metric_attribute3, + op_otel_metric_create_counter, + op_otel_metric_create_gauge, + op_otel_metric_create_histogram, + op_otel_metric_create_observable_counter, + op_otel_metric_create_observable_gauge, + op_otel_metric_create_observable_up_down_counter, + op_otel_metric_create_up_down_counter, + op_otel_metric_observable_record0, + op_otel_metric_observable_record1, + op_otel_metric_observable_record2, + op_otel_metric_observable_record3, + op_otel_metric_observation_done, + op_otel_metric_record0, + op_otel_metric_record1, + op_otel_metric_record2, + op_otel_metric_record3, + op_otel_metric_wait_to_observe, op_otel_span_attribute, op_otel_span_attribute2, op_otel_span_attribute3, @@ -36,25 +37,32 @@ import { Console } from "ext:deno_console/01_console.js"; import { performance } from "ext:deno_web/15_performance.js"; const { - SafeWeakMap, Array, - ObjectEntries, - ReflectApply, - SymbolFor, + ArrayPrototypePush, Error, - Uint8Array, - TypedArrayPrototypeSubarray, ObjectAssign, ObjectDefineProperty, - WeakRefPrototypeDeref, + ObjectEntries, + ObjectPrototypeIsPrototypeOf, + ReflectApply, + SafeIterator, + SafeMap, + SafePromiseAll, + SafeSet, + SafeWeakMap, + SafeWeakRef, + SafeWeakSet, String, StringPrototypePadStart, - ObjectPrototypeIsPrototypeOf, - SafeWeakRef, + SymbolFor, + TypedArrayPrototypeSubarray, + Uint8Array, + WeakRefPrototypeDeref, } = primordials; const { AsyncVariable, setAsyncContext } = core; export let TRACING_ENABLED = false; +export let METRICS_ENABLED = false; let DETERMINISTIC = false; // Note: These start at 0 in the JS library, @@ -202,30 +210,9 @@ const instrumentationScopes = new SafeWeakMap< >(); let activeInstrumentationLibrary: WeakRef | null = null; -function submitSpan( - spanId: string | Uint8Array, - traceId: string | Uint8Array, - traceFlags: number, - parentSpanId: string | Uint8Array | null, - span: Omit< - ReadableSpan, - | "spanContext" - | "startTime" - | "endTime" - | "parentSpanId" - | "duration" - | "ended" - | "resource" - >, - startTime: number, - endTime: number, +function activateInstrumentationLibrary( + instrumentationLibrary: InstrumentationLibrary, ) { - if (!TRACING_ENABLED) return; - if (!(traceFlags & TRACE_FLAG_SAMPLED)) return; - - // TODO(@lucacasonato): `resource` is ignored for now, should we implement it? - - const instrumentationLibrary = span.instrumentationLibrary; if ( !activeInstrumentationLibrary || WeakRefPrototypeDeref(activeInstrumentationLibrary) !== @@ -255,6 +242,32 @@ function submitSpan( } } } +} + +function submitSpan( + spanId: string | Uint8Array, + traceId: string | Uint8Array, + traceFlags: number, + parentSpanId: string | Uint8Array | null, + span: Omit< + ReadableSpan, + | "spanContext" + | "startTime" + | "endTime" + | "parentSpanId" + | "duration" + | "ended" + | "resource" + >, + startTime: number, + endTime: number, +) { + if (!TRACING_ENABLED) return; + if (!(traceFlags & TRACE_FLAG_SAMPLED)) return; + + // TODO(@lucacasonato): `resource` is ignored for now, should we implement it? + + activateInstrumentationLibrary(span.instrumentationLibrary); op_otel_span_start( traceId, @@ -368,7 +381,7 @@ export let endSpan: (span: Span) => void; export class Span { #traceId: string | Uint8Array; - #spanId: Uint8Array; + #spanId: string | Uint8Array; #traceFlags = TRACE_FLAG_SAMPLED; #spanContext: SpanContext | null = null; @@ -687,260 +700,510 @@ class ContextManager { } } -function attributeValue(value: IAnyValue) { - return value.boolValue ?? value.stringValue ?? value.doubleValue ?? - value.intValue; +// metrics + +interface MeterOptions { + schemaUrl?: string; } -function submitMetrics(resource, scopeMetrics) { - let i = 0; - while (i < resource.attributes.length) { - if (i + 2 < resource.attributes.length) { - op_otel_metrics_resource_attribute3( - resource.attributes.length, - resource.attributes[i].key, - attributeValue(resource.attributes[i].value), - resource.attributes[i + 1].key, - attributeValue(resource.attributes[i + 1].value), - resource.attributes[i + 2].key, - attributeValue(resource.attributes[i + 2].value), - ); - i += 3; - } else if (i + 1 < resource.attributes.length) { - op_otel_metrics_resource_attribute2( - resource.attributes.length, - resource.attributes[i].key, - attributeValue(resource.attributes[i].value), - resource.attributes[i + 1].key, - attributeValue(resource.attributes[i + 1].value), - ); - i += 2; - } else { - op_otel_metrics_resource_attribute( - resource.attributes.length, - resource.attributes[i].key, - attributeValue(resource.attributes[i].value), - ); - i += 1; - } +interface MetricOptions { + description?: string; + + unit?: string; + + valueType?: ValueType; + + advice?: MetricAdvice; +} + +enum ValueType { + INT = 0, + DOUBLE = 1, +} + +interface MetricAdvice { + /** + * Hint the explicit bucket boundaries for SDK if the metric is been + * aggregated with a HistogramAggregator. + */ + explicitBucketBoundaries?: number[]; +} + +export class MeterProvider { + getMeter(name: string, version?: string, options?: MeterOptions): Meter { + return new Meter({ name, version, schemaUrl: options?.schemaUrl }); + } +} + +type MetricAttributes = Attributes; + +type Instrument = { __key: "instrument" }; + +let batchResultHasObservables: ( + res: BatchObservableResult, + observables: Observable[], +) => boolean; + +class BatchObservableResult { + #observables: WeakSet; + + constructor(observables: WeakSet) { + this.#observables = observables; } - for (let smi = 0; smi < scopeMetrics.length; smi += 1) { - const { scope, metrics } = scopeMetrics[smi]; + static { + batchResultHasObservables = (cb, observables) => { + for (const observable of new SafeIterator(observables)) { + if (!cb.#observables.has(observable)) return false; + } + return true; + }; + } - op_otel_metrics_scope(scope.name, scope.schemaUrl, scope.version); + observe( + metric: Observable, + value: number, + attributes?: MetricAttributes, + ): void { + if (!this.#observables.has(metric)) return; + getObservableResult(metric).observe(value, attributes); + } +} - for (let mi = 0; mi < metrics.length; mi += 1) { - const metric = metrics[mi]; - switch (metric.dataPointType) { - case 3: - op_otel_metrics_sum( - metric.descriptor.name, - // deno-lint-ignore prefer-primordials - metric.descriptor.description, - metric.descriptor.unit, - metric.aggregationTemporality, - metric.isMonotonic, - ); - for (let di = 0; di < metric.dataPoints.length; di += 1) { - const dataPoint = metric.dataPoints[di]; - op_otel_metrics_sum_or_gauge_data_point( - dataPoint.value, - hrToSecs(dataPoint.startTime), - hrToSecs(dataPoint.endTime), - ); - const attributes = ObjectEntries(dataPoint.attributes); - let i = 0; - while (i < attributes.length) { - if (i + 2 < attributes.length) { - op_otel_metrics_data_point_attribute3( - attributes.length, - attributes[i][0], - attributes[i][1], - attributes[i + 1][0], - attributes[i + 1][1], - attributes[i + 2][0], - attributes[i + 2][1], - ); - i += 3; - } else if (i + 1 < attributes.length) { - op_otel_metrics_data_point_attribute2( - attributes.length, - attributes[i][0], - attributes[i][1], - attributes[i + 1][0], - attributes[i + 1][1], - ); - i += 2; - } else { - op_otel_metrics_data_point_attribute( - attributes.length, - attributes[i][0], - attributes[i][1], - ); - i += 1; - } - } - } - break; - case 2: - op_otel_metrics_gauge( - metric.descriptor.name, - // deno-lint-ignore prefer-primordials - metric.descriptor.description, - metric.descriptor.unit, - ); - for (let di = 0; di < metric.dataPoints.length; di += 1) { - const dataPoint = metric.dataPoints[di]; - op_otel_metrics_sum_or_gauge_data_point( - dataPoint.value, - hrToSecs(dataPoint.startTime), - hrToSecs(dataPoint.endTime), - ); - const attributes = ObjectEntries(dataPoint.attributes); - let i = 0; - while (i < attributes.length) { - if (i + 2 < attributes.length) { - op_otel_metrics_data_point_attribute3( - attributes.length, - attributes[i][0], - attributes[i][1], - attributes[i + 1][0], - attributes[i + 1][1], - attributes[i + 2][0], - attributes[i + 2][1], - ); - i += 3; - } else if (i + 1 < attributes.length) { - op_otel_metrics_data_point_attribute2( - attributes.length, - attributes[i][0], - attributes[i][1], - attributes[i + 1][0], - attributes[i + 1][1], - ); - i += 2; - } else { - op_otel_metrics_data_point_attribute( - attributes.length, - attributes[i][0], - attributes[i][1], - ); - i += 1; - } - } - } - break; - case 0: - op_otel_metrics_histogram( - metric.descriptor.name, - // deno-lint-ignore prefer-primordials - metric.descriptor.description, - metric.descriptor.unit, - metric.aggregationTemporality, - ); - for (let di = 0; di < metric.dataPoints.length; di += 1) { - const dataPoint = metric.dataPoints[di]; - const { boundaries, counts } = dataPoint.value.buckets; - op_otel_metrics_histogram_data_point( - dataPoint.value.count, - dataPoint.value.min ?? NaN, - dataPoint.value.max ?? NaN, - dataPoint.value.sum, - hrToSecs(dataPoint.startTime), - hrToSecs(dataPoint.endTime), - boundaries.length, - ); - let j = 0; - while (j < boundaries.length) { - if (j + 3 < boundaries.length) { - op_otel_metrics_histogram_data_point_entry3( - counts[j], - boundaries[j], - counts[j + 1], - boundaries[j + 1], - counts[j + 2], - boundaries[j + 2], - ); - j += 3; - } else if (j + 2 < boundaries.length) { - op_otel_metrics_histogram_data_point_entry2( - counts[j], - boundaries[j], - counts[j + 1], - boundaries[j + 1], - ); - j += 2; - } else { - op_otel_metrics_histogram_data_point_entry1( - counts[j], - boundaries[j], - ); - j += 1; - } - } - op_otel_metrics_histogram_data_point_entry_final(counts[j]); - const attributes = ObjectEntries(dataPoint.attributes); - let i = 0; - while (i < attributes.length) { - if (i + 2 < attributes.length) { - op_otel_metrics_data_point_attribute3( - attributes.length, - attributes[i][0], - attributes[i][1], - attributes[i + 1][0], - attributes[i + 1][1], - attributes[i + 2][0], - attributes[i + 2][1], - ); - i += 3; - } else if (i + 1 < attributes.length) { - op_otel_metrics_data_point_attribute2( - attributes.length, - attributes[i][0], - attributes[i][1], - attributes[i + 1][0], - attributes[i + 1][1], - ); - i += 2; - } else { - op_otel_metrics_data_point_attribute( - attributes.length, - attributes[i][0], - attributes[i][1], - ); - i += 1; - } - } - } - break; - default: - continue; +const BATCH_CALLBACKS = new SafeMap< + BatchObservableCallback, + BatchObservableResult +>(); +const INDIVIDUAL_CALLBACKS = new SafeMap>(); + +class Meter { + #instrumentationLibrary: InstrumentationLibrary; + + constructor(instrumentationLibrary: InstrumentationLibrary) { + this.#instrumentationLibrary = instrumentationLibrary; + } + + createCounter( + name: string, + options?: MetricOptions, + ): Counter { + if (options?.valueType !== undefined && options?.valueType !== 1) { + throw new Error("Only valueType: DOUBLE is supported"); + } + if (!METRICS_ENABLED) return new Counter(null, false); + activateInstrumentationLibrary(this.#instrumentationLibrary); + const instrument = op_otel_metric_create_counter( + name, + // deno-lint-ignore prefer-primordials + options?.description, + options?.unit, + ) as Instrument; + return new Counter(instrument, false); + } + + createUpDownCounter( + name: string, + options?: MetricOptions, + ): Counter { + if (options?.valueType !== undefined && options?.valueType !== 1) { + throw new Error("Only valueType: DOUBLE is supported"); + } + if (!METRICS_ENABLED) return new Counter(null, true); + activateInstrumentationLibrary(this.#instrumentationLibrary); + const instrument = op_otel_metric_create_up_down_counter( + name, + // deno-lint-ignore prefer-primordials + options?.description, + options?.unit, + ) as Instrument; + return new Counter(instrument, true); + } + + createGauge( + name: string, + options?: MetricOptions, + ): Gauge { + if (options?.valueType !== undefined && options?.valueType !== 1) { + throw new Error("Only valueType: DOUBLE is supported"); + } + if (!METRICS_ENABLED) return new Gauge(null); + activateInstrumentationLibrary(this.#instrumentationLibrary); + const instrument = op_otel_metric_create_gauge( + name, + // deno-lint-ignore prefer-primordials + options?.description, + options?.unit, + ) as Instrument; + return new Gauge(instrument); + } + + createHistogram( + name: string, + options?: MetricOptions, + ): Histogram { + if (options?.valueType !== undefined && options?.valueType !== 1) { + throw new Error("Only valueType: DOUBLE is supported"); + } + if (!METRICS_ENABLED) return new Histogram(null); + activateInstrumentationLibrary(this.#instrumentationLibrary); + const instrument = op_otel_metric_create_histogram( + name, + // deno-lint-ignore prefer-primordials + options?.description, + options?.unit, + options?.advice?.explicitBucketBoundaries, + ) as Instrument; + return new Histogram(instrument); + } + + createObservableCounter( + name: string, + options?: MetricOptions, + ): Observable { + if (options?.valueType !== undefined && options?.valueType !== 1) { + throw new Error("Only valueType: DOUBLE is supported"); + } + if (!METRICS_ENABLED) new Observable(new ObservableResult(null, true)); + activateInstrumentationLibrary(this.#instrumentationLibrary); + const instrument = op_otel_metric_create_observable_counter( + name, + // deno-lint-ignore prefer-primordials + options?.description, + options?.unit, + ) as Instrument; + return new Observable(new ObservableResult(instrument, true)); + } + + createObservableGauge( + name: string, + options?: MetricOptions, + ): Observable { + if (options?.valueType !== undefined && options?.valueType !== 1) { + throw new Error("Only valueType: DOUBLE is supported"); + } + if (!METRICS_ENABLED) new Observable(new ObservableResult(null, false)); + activateInstrumentationLibrary(this.#instrumentationLibrary); + const instrument = op_otel_metric_create_observable_gauge( + name, + // deno-lint-ignore prefer-primordials + options?.description, + options?.unit, + ) as Instrument; + return new Observable(new ObservableResult(instrument, false)); + } + + createObservableUpDownCounter( + name: string, + options?: MetricOptions, + ): Observable { + if (options?.valueType !== undefined && options?.valueType !== 1) { + throw new Error("Only valueType: DOUBLE is supported"); + } + if (!METRICS_ENABLED) new Observable(new ObservableResult(null, false)); + activateInstrumentationLibrary(this.#instrumentationLibrary); + const instrument = op_otel_metric_create_observable_up_down_counter( + name, + // deno-lint-ignore prefer-primordials + options?.description, + options?.unit, + ) as Instrument; + return new Observable(new ObservableResult(instrument, false)); + } + + addBatchObservableCallback( + callback: BatchObservableCallback, + observables: Observable[], + ): void { + if (!METRICS_ENABLED) return; + const result = new BatchObservableResult(new SafeWeakSet(observables)); + startObserving(); + BATCH_CALLBACKS.set(callback, result); + } + + removeBatchObservableCallback( + callback: BatchObservableCallback, + observables: Observable[], + ): void { + if (!METRICS_ENABLED) return; + const result = BATCH_CALLBACKS.get(callback); + if (result && batchResultHasObservables(result, observables)) { + BATCH_CALLBACKS.delete(callback); + } + } +} + +type BatchObservableCallback = ( + observableResult: BatchObservableResult, +) => void | Promise; + +function record( + instrument: Instrument | null, + value: number, + attributes?: MetricAttributes, +) { + if (instrument === null) return; + if (attributes === undefined) { + op_otel_metric_record0(instrument, value); + } else { + const attrs = ObjectEntries(attributes); + if (attrs.length === 0) { + op_otel_metric_record0(instrument, value); + } + let i = 0; + while (i < attrs.length) { + const remaining = attrs.length - i; + if (remaining > 3) { + op_otel_metric_attribute3( + instrument, + value, + attrs[i][0], + attrs[i][1], + attrs[i + 1][0], + attrs[i + 1][1], + attrs[i + 2][0], + attrs[i + 2][1], + ); + i += 3; + } else if (remaining === 3) { + op_otel_metric_record3( + instrument, + value, + attrs[i][0], + attrs[i][1], + attrs[i + 1][0], + attrs[i + 1][1], + attrs[i + 2][0], + attrs[i + 2][1], + ); + i += 3; + } else if (remaining === 2) { + op_otel_metric_record2( + instrument, + value, + attrs[i][0], + attrs[i][1], + attrs[i + 1][0], + attrs[i + 1][1], + ); + i += 2; + } else if (remaining === 1) { + op_otel_metric_record1( + instrument, + value, + attrs[i][0], + attrs[i][1], + ); + i += 1; } } } - - op_otel_metrics_submit(); } -class MetricExporter { - export(metrics, resultCallback: (result: ExportResult) => void) { - try { - submitMetrics(metrics.resource, metrics.scopeMetrics); - resultCallback({ code: 0 }); - } catch (error) { - resultCallback({ - code: 1, - error: ObjectPrototypeIsPrototypeOf(error, Error) - ? error as Error - : new Error(String(error)), - }); +function recordObservable( + instrument: Instrument | null, + value: number, + attributes?: MetricAttributes, +) { + if (instrument === null) return; + if (attributes === undefined) { + op_otel_metric_observable_record0(instrument, value); + } else { + const attrs = ObjectEntries(attributes); + if (attrs.length === 0) { + op_otel_metric_observable_record0(instrument, value); + } + let i = 0; + while (i < attrs.length) { + const remaining = attrs.length - i; + if (remaining > 3) { + op_otel_metric_attribute3( + instrument, + value, + attrs[i][0], + attrs[i][1], + attrs[i + 1][0], + attrs[i + 1][1], + attrs[i + 2][0], + attrs[i + 2][1], + ); + i += 3; + } else if (remaining === 3) { + op_otel_metric_observable_record3( + instrument, + value, + attrs[i][0], + attrs[i][1], + attrs[i + 1][0], + attrs[i + 1][1], + attrs[i + 2][0], + attrs[i + 2][1], + ); + i += 3; + } else if (remaining === 2) { + op_otel_metric_observable_record2( + instrument, + value, + attrs[i][0], + attrs[i][1], + attrs[i + 1][0], + attrs[i + 1][1], + ); + i += 2; + } else if (remaining === 1) { + op_otel_metric_observable_record1( + instrument, + value, + attrs[i][0], + attrs[i][1], + ); + i += 1; + } } } +} - async forceFlush() {} +class Counter { + #instrument: Instrument | null; + #upDown: boolean; - async shutdown() {} + constructor(instrument: Instrument | null, upDown: boolean) { + this.#instrument = instrument; + this.#upDown = upDown; + } + + add(value: number, attributes?: MetricAttributes, _context?: Context): void { + if (value < 0 && !this.#upDown) { + throw new Error("Counter can only be incremented"); + } + record(this.#instrument, value, attributes); + } +} + +class Gauge { + #instrument: Instrument | null; + + constructor(instrument: Instrument | null) { + this.#instrument = instrument; + } + + record( + value: number, + attributes?: MetricAttributes, + _context?: Context, + ): void { + record(this.#instrument, value, attributes); + } +} + +class Histogram { + #instrument: Instrument | null; + + constructor(instrument: Instrument | null) { + this.#instrument = instrument; + } + + record( + value: number, + attributes?: MetricAttributes, + _context?: Context, + ): void { + record(this.#instrument, value, attributes); + } +} + +type ObservableCallback = ( + observableResult: ObservableResult, +) => void | Promise; + +let getObservableResult: (observable: Observable) => ObservableResult; + +class Observable { + #result: ObservableResult; + + constructor(result: ObservableResult) { + this.#result = result; + } + + static { + getObservableResult = (observable) => observable.#result; + } + + addCallback(callback: ObservableCallback): void { + const res = INDIVIDUAL_CALLBACKS.get(this); + if (res) res.add(callback); + else INDIVIDUAL_CALLBACKS.set(this, new SafeSet([callback])); + startObserving(); + } + + removeCallback(callback: ObservableCallback): void { + const res = INDIVIDUAL_CALLBACKS.get(this); + if (res) res.delete(callback); + if (res?.size === 0) INDIVIDUAL_CALLBACKS.delete(this); + } +} + +class ObservableResult { + #instrument: Instrument | null; + #isRegularCounter: boolean; + + constructor(instrument: Instrument | null, isRegularCounter: boolean) { + this.#instrument = instrument; + this.#isRegularCounter = isRegularCounter; + } + + observe( + this: ObservableResult, + value: number, + attributes?: MetricAttributes, + ): void { + if (this.#isRegularCounter) { + if (value < 0) { + throw new Error("Observable counters can only be incremented"); + } + } + recordObservable(this.#instrument, value, attributes); + } +} + +async function observe(): Promise { + const promises: Promise[] = []; + // Primordials are not needed, because this is a SafeMap. + // deno-lint-ignore prefer-primordials + for (const { 0: observable, 1: callbacks } of INDIVIDUAL_CALLBACKS) { + const result = getObservableResult(observable); + // Primordials are not needed, because this is a SafeSet. + // deno-lint-ignore prefer-primordials + for (const callback of callbacks) { + // PromiseTry is not in primordials? + // deno-lint-ignore prefer-primordials + ArrayPrototypePush(promises, Promise.try(callback, result)); + } + } + // Primordials are not needed, because this is a SafeMap. + // deno-lint-ignore prefer-primordials + for (const { 0: callback, 1: result } of BATCH_CALLBACKS) { + // PromiseTry is not in primordials? + // deno-lint-ignore prefer-primordials + ArrayPrototypePush(promises, Promise.try(callback, result)); + } + await SafePromiseAll(promises); +} + +let isObserving = false; +function startObserving() { + if (!isObserving) { + isObserving = true; + (async () => { + while (true) { + const promise = op_otel_metric_wait_to_observe(); + core.unrefOpPromise(promise); + const ok = await promise; + if (!ok) break; + await observe(); + op_otel_metric_observation_done(); + } + })(); + } } const otelConsoleConfig = { @@ -951,14 +1214,21 @@ const otelConsoleConfig = { export function bootstrap( config: [ + 0 | 1, 0 | 1, typeof otelConsoleConfig[keyof typeof otelConsoleConfig], 0 | 1, ], ): void { - const { 0: tracingEnabled, 1: consoleConfig, 2: deterministic } = config; + const { + 0: tracingEnabled, + 1: metricsEnabled, + 2: consoleConfig, + 3: deterministic, + } = config; TRACING_ENABLED = tracingEnabled === 1; + METRICS_ENABLED = metricsEnabled === 1; DETERMINISTIC = deterministic === 1; switch (consoleConfig) { @@ -980,5 +1250,5 @@ export function bootstrap( export const telemetry = { SpanExporter, ContextManager, - MetricExporter, + MeterProvider, }; diff --git a/tests/specs/cli/otel_basic/__test__.jsonc b/tests/specs/cli/otel_basic/__test__.jsonc index e7f8d17c7a..f9826671e8 100644 --- a/tests/specs/cli/otel_basic/__test__.jsonc +++ b/tests/specs/cli/otel_basic/__test__.jsonc @@ -1,28 +1,27 @@ { - "steps": [ - { + "tests": { + "basic": { "args": "run -A main.ts basic.ts", "output": "basic.out" }, - { + "natural_exit": { "args": "run -A main.ts natural_exit.ts", "output": "natural_exit.out" }, - { + "deno_dot_exit": { "args": "run -A main.ts deno_dot_exit.ts", "output": "deno_dot_exit.out" }, - { + "uncaught": { "args": "run -A main.ts uncaught.ts", "output": "uncaught.out" }, - { + "metric": { + "envs": { + "OTEL_METRIC_EXPORT_INTERVAL": "1000" + }, "args": "run -A main.ts metric.ts", "output": "metric.out" - }, - { - "args": "run -A --unstable-otel context.ts", - "output": "" } - ] + } } diff --git a/tests/specs/cli/otel_basic/metric.out b/tests/specs/cli/otel_basic/metric.out index 26ed4a23c6..dd53734230 100644 --- a/tests/specs/cli/otel_basic/metric.out +++ b/tests/specs/cli/otel_basic/metric.out @@ -56,6 +56,31 @@ "isMonotonic": false } }, + { + "name": "gauge", + "description": "Example of a Gauge", + "unit": "", + "metadata": [], + "gauge": { + "dataPoints": [ + { + "attributes": [ + { + "key": "attribute", + "value": { + "doubleValue": 1 + } + } + ], + "startTimeUnixNano": "[WILDCARD]", + "timeUnixNano": "[WILDCARD]", + "exemplars": [], + "flags": 0, + "asDouble": 1 + } + ] + } + }, { "name": "histogram", "description": "Example of a Histogram", @@ -119,6 +144,265 @@ ], "aggregationTemporality": 2 } + }, + { + "name": "observable_counter", + "description": "Example of a ObservableCounter", + "unit": "", + "metadata": [], + "sum": { + "dataPoints": [ + { + "attributes": [], + "startTimeUnixNano": "[WILDCARD]", + "timeUnixNano": "[WILDCARD]", + "exemplars": [], + "flags": 0, + "asDouble": 1 + } + ], + "aggregationTemporality": 2, + "isMonotonic": true + } + }, + { + "name": "observable_up_down_counter", + "description": "Example of a ObservableUpDownCounter", + "unit": "", + "metadata": [], + "sum": { + "dataPoints": [ + { + "attributes": [], + "startTimeUnixNano": "[WILDCARD]", + "timeUnixNano": "[WILDCARD]", + "exemplars": [], + "flags": 0, + "asDouble": 1 + } + ], + "aggregationTemporality": 2, + "isMonotonic": false + } + }, + { + "name": "observable_gauge", + "description": "Example of a ObservableGauge", + "unit": "", + "metadata": [], + "gauge": { + "dataPoints": [ + { + "attributes": [], + "startTimeUnixNano": "[WILDCARD]", + "timeUnixNano": "[WILDCARD]", + "exemplars": [], + "flags": 0, + "asDouble": 1 + } + ] + } + }, + { + "name": "counter", + "description": "Example of a Counter", + "unit": "", + "metadata": [], + "sum": { + "dataPoints": [ + { + "attributes": [ + { + "key": "attribute", + "value": { + "doubleValue": 1 + } + } + ], + "startTimeUnixNano": "[WILDCARD]", + "timeUnixNano": "[WILDCARD]", + "exemplars": [], + "flags": 0, + "asDouble": 1 + } + ], + "aggregationTemporality": 2, + "isMonotonic": true + } + }, + { + "name": "up_down_counter", + "description": "Example of a UpDownCounter", + "unit": "", + "metadata": [], + "sum": { + "dataPoints": [ + { + "attributes": [ + { + "key": "attribute", + "value": { + "doubleValue": 1 + } + } + ], + "startTimeUnixNano": "[WILDCARD]", + "timeUnixNano": "[WILDCARD]", + "exemplars": [], + "flags": 0, + "asDouble": -1 + } + ], + "aggregationTemporality": 2, + "isMonotonic": false + } + }, + { + "name": "gauge", + "description": "Example of a Gauge", + "unit": "", + "metadata": [], + "gauge": { + "dataPoints": [ + { + "attributes": [ + { + "key": "attribute", + "value": { + "doubleValue": 1 + } + } + ], + "startTimeUnixNano": "[WILDCARD]", + "timeUnixNano": "[WILDCARD]", + "exemplars": [], + "flags": 0, + "asDouble": 1 + } + ] + } + }, + { + "name": "histogram", + "description": "Example of a Histogram", + "unit": "", + "metadata": [], + "histogram": { + "dataPoints": [ + { + "attributes": [ + { + "key": "attribute", + "value": { + "doubleValue": 1 + } + } + ], + "startTimeUnixNano": "[WILDCARD]", + "timeUnixNano": "[WILDCARD]", + "count": 1, + "sum": 1, + "bucketCounts": [ + 0, + 1, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0 + ], + "explicitBounds": [ + 0, + 5, + 10, + 25, + 50, + 75, + 100, + 250, + 500, + 750, + 1000, + 2500, + 5000, + 7500, + 10000 + ], + "exemplars": [], + "flags": 0, + "min": 1, + "max": 1 + } + ], + "aggregationTemporality": 2 + } + }, + { + "name": "observable_counter", + "description": "Example of a ObservableCounter", + "unit": "", + "metadata": [], + "sum": { + "dataPoints": [ + { + "attributes": [], + "startTimeUnixNano": "[WILDCARD]", + "timeUnixNano": "[WILDCARD]", + "exemplars": [], + "flags": 0, + "asDouble": 1 + } + ], + "aggregationTemporality": 2, + "isMonotonic": true + } + }, + { + "name": "observable_up_down_counter", + "description": "Example of a ObservableUpDownCounter", + "unit": "", + "metadata": [], + "sum": { + "dataPoints": [ + { + "attributes": [], + "startTimeUnixNano": "[WILDCARD]", + "timeUnixNano": "[WILDCARD]", + "exemplars": [], + "flags": 0, + "asDouble": 1 + } + ], + "aggregationTemporality": 2, + "isMonotonic": false + } + }, + { + "name": "observable_gauge", + "description": "Example of a ObservableGauge", + "unit": "", + "metadata": [], + "gauge": { + "dataPoints": [ + { + "attributes": [], + "startTimeUnixNano": "[WILDCARD]", + "timeUnixNano": "[WILDCARD]", + "exemplars": [], + "flags": 0, + "asDouble": 1 + } + ] + } } ] } diff --git a/tests/specs/cli/otel_basic/metric.ts b/tests/specs/cli/otel_basic/metric.ts index 7d332f0432..2b472a6fb8 100644 --- a/tests/specs/cli/otel_basic/metric.ts +++ b/tests/specs/cli/otel_basic/metric.ts @@ -1,18 +1,8 @@ -import { - MeterProvider, - PeriodicExportingMetricReader, -} from "npm:@opentelemetry/sdk-metrics@1.28.0"; +import { metrics } from "npm:@opentelemetry/api@1"; -const meterProvider = new MeterProvider(); +metrics.setGlobalMeterProvider(new Deno.telemetry.MeterProvider()); -meterProvider.addMetricReader( - new PeriodicExportingMetricReader({ - exporter: new Deno.telemetry.MetricExporter(), - exportIntervalMillis: 100, - }), -); - -const meter = meterProvider.getMeter("m"); +const meter = metrics.getMeter("m"); const counter = meter.createCounter("counter", { description: "Example of a Counter", @@ -22,13 +12,82 @@ const upDownCounter = meter.createUpDownCounter("up_down_counter", { description: "Example of a UpDownCounter", }); +const gauge = meter.createGauge("gauge", { + description: "Example of a Gauge", +}); + const histogram = meter.createHistogram("histogram", { description: "Example of a Histogram", }); +const observableCounterPromise = Promise.withResolvers(); +const observableCounter = meter.createObservableCounter("observable_counter", { + description: "Example of a ObservableCounter", +}); +observableCounter.addCallback((res) => { + res.observe(1); + observableCounterPromise.resolve(); +}); + +const observableUpDownCounterPromise = Promise.withResolvers(); +const observableUpDownCounter = meter + .createObservableUpDownCounter("observable_up_down_counter", { + description: "Example of a ObservableUpDownCounter", + }); +observableUpDownCounter.addCallback((res) => { + res.observe(1); + observableUpDownCounterPromise.resolve(); +}); + +const observableGaugePromise = Promise.withResolvers(); +const observableGauge = meter.createObservableGauge("observable_gauge", { + description: "Example of a ObservableGauge", +}); +observableGauge.addCallback((res) => { + res.observe(1); + observableGaugePromise.resolve(); +}); + +const observableCounterBatch = meter.createObservableCounter( + "observable_counter_batch", + { description: "Example of a ObservableCounter, written in batch" }, +); +const observableUpDownCounterBatch = meter.createObservableUpDownCounter( + "observable_up_down_counter_batch", + { description: "Example of a ObservableUpDownCounter, written in batch" }, +); +const observableGaugeBatch = meter.createObservableGauge( + "observable_gauge_batch", + { + description: "Example of a ObservableGauge, written in batch", + }, +); + +const observableBatchPromise = Promise.withResolvers(); +meter.addBatchObservableCallback((observer) => { + observer.observe(observableCounter, 2); + observer.observe(observableUpDownCounter, 2); + observer.observe(observableGauge, 2); + observableBatchPromise.resolve(); +}, [ + observableCounterBatch, + observableUpDownCounterBatch, + observableGaugeBatch, +]); + const attributes = { attribute: 1 }; counter.add(1, attributes); upDownCounter.add(-1, attributes); +gauge.record(1, attributes); histogram.record(1, attributes); -await meterProvider.forceFlush(); +const timer = setTimeout(() => {}, 100000); + +await Promise.all([ + observableCounterPromise.promise, + observableUpDownCounterPromise.promise, + observableGaugePromise.promise, + observableBatchPromise.promise, +]); + +clearTimeout(timer);