1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-21 21:50:00 -05:00
denoland-deno/ext/telemetry/lib.rs
Leo Kettmeir ea30e188a8
refactor: update deno_core for error refactor (#26867)
Closes #26171

---------

Co-authored-by: David Sherret <dsherret@gmail.com>
2025-01-08 14:52:32 -08:00

2077 lines
59 KiB
Rust

// Copyright 2018-2025 the Deno authors. MIT license.
#![allow(clippy::too_many_arguments)]
use std::borrow::Cow;
use std::cell::RefCell;
use std::collections::HashMap;
use std::env;
use std::fmt::Debug;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::sync::Mutex;
use std::task::Context;
use std::task::Poll;
use std::thread;
use std::time::Duration;
use std::time::SystemTime;
use deno_core::futures::channel::mpsc;
use deno_core::futures::channel::mpsc::UnboundedSender;
use deno_core::futures::future::BoxFuture;
use deno_core::futures::stream;
use deno_core::futures::FutureExt;
use deno_core::futures::Stream;
use deno_core::futures::StreamExt;
use deno_core::op2;
use deno_core::v8;
use deno_core::v8::DataError;
use deno_core::GarbageCollected;
use deno_core::OpState;
use deno_error::JsError;
use deno_error::JsErrorBox;
use once_cell::sync::Lazy;
use once_cell::sync::OnceCell;
use opentelemetry::logs::AnyValue;
use opentelemetry::logs::LogRecord as LogRecordTrait;
use opentelemetry::logs::Severity;
use opentelemetry::metrics::AsyncInstrumentBuilder;
use opentelemetry::metrics::InstrumentBuilder;
use opentelemetry::metrics::MeterProvider as _;
use opentelemetry::otel_debug;
use opentelemetry::otel_error;
use opentelemetry::trace::SpanContext;
use opentelemetry::trace::SpanId;
use opentelemetry::trace::SpanKind;
use opentelemetry::trace::Status as SpanStatus;
use opentelemetry::trace::TraceFlags;
use opentelemetry::trace::TraceId;
use opentelemetry::trace::TraceState;
use opentelemetry::InstrumentationScope;
use opentelemetry::Key;
use opentelemetry::KeyValue;
use opentelemetry::StringValue;
use opentelemetry::Value;
use opentelemetry_otlp::HttpExporterBuilder;
use opentelemetry_otlp::Protocol;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_otlp::WithHttpConfig;
use opentelemetry_sdk::export::trace::SpanData;
use opentelemetry_sdk::logs::BatchLogProcessor;
use opentelemetry_sdk::logs::LogProcessor;
use opentelemetry_sdk::logs::LogRecord;
use opentelemetry_sdk::metrics::exporter::PushMetricExporter;
use opentelemetry_sdk::metrics::reader::MetricReader;
use opentelemetry_sdk::metrics::ManualReader;
use opentelemetry_sdk::metrics::MetricResult;
use opentelemetry_sdk::metrics::SdkMeterProvider;
use opentelemetry_sdk::metrics::Temporality;
use opentelemetry_sdk::trace::BatchSpanProcessor;
use opentelemetry_sdk::trace::IdGenerator;
use opentelemetry_sdk::trace::RandomIdGenerator;
use opentelemetry_sdk::trace::SpanEvents;
use opentelemetry_sdk::trace::SpanLinks;
use opentelemetry_sdk::trace::SpanProcessor as _;
use opentelemetry_sdk::Resource;
use opentelemetry_semantic_conventions::resource::PROCESS_RUNTIME_NAME;
use opentelemetry_semantic_conventions::resource::PROCESS_RUNTIME_VERSION;
use opentelemetry_semantic_conventions::resource::TELEMETRY_SDK_LANGUAGE;
use opentelemetry_semantic_conventions::resource::TELEMETRY_SDK_NAME;
use opentelemetry_semantic_conventions::resource::TELEMETRY_SDK_VERSION;
use serde::Deserialize;
use serde::Serialize;
use thiserror::Error;
use tokio::sync::oneshot;
use tokio::task::JoinSet;
deno_core::extension!(
deno_telemetry,
ops = [
op_otel_log,
op_otel_log_foreign,
op_otel_span_attribute1,
op_otel_span_attribute2,
op_otel_span_attribute3,
op_otel_span_update_name,
op_otel_metric_attribute3,
op_otel_metric_record0,
op_otel_metric_record1,
op_otel_metric_record2,
op_otel_metric_record3,
op_otel_metric_observable_record0,
op_otel_metric_observable_record1,
op_otel_metric_observable_record2,
op_otel_metric_observable_record3,
op_otel_metric_wait_to_observe,
op_otel_metric_observation_done,
],
objects = [OtelTracer, OtelMeter, OtelSpan],
esm = ["telemetry.ts", "util.ts"],
);
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OtelRuntimeConfig {
pub runtime_name: Cow<'static, str>,
pub runtime_version: Cow<'static, str>,
}
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct OtelConfig {
pub tracing_enabled: bool,
pub metrics_enabled: bool,
pub console: OtelConsoleConfig,
pub deterministic: bool,
}
impl OtelConfig {
pub fn as_v8(&self) -> Box<[u8]> {
Box::new([
self.tracing_enabled as u8,
self.metrics_enabled as u8,
self.console as u8,
self.deterministic as u8,
])
}
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[repr(u8)]
pub enum OtelConsoleConfig {
Ignore = 0,
Capture = 1,
Replace = 2,
}
impl Default for OtelConsoleConfig {
fn default() -> Self {
Self::Ignore
}
}
static OTEL_SHARED_RUNTIME_SPAWN_TASK_TX: Lazy<
UnboundedSender<BoxFuture<'static, ()>>,
> = Lazy::new(otel_create_shared_runtime);
static OTEL_PRE_COLLECT_CALLBACKS: Lazy<
Mutex<Vec<oneshot::Sender<oneshot::Sender<()>>>>,
> = Lazy::new(Default::default);
fn otel_create_shared_runtime() -> UnboundedSender<BoxFuture<'static, ()>> {
let (spawn_task_tx, mut spawn_task_rx) =
mpsc::unbounded::<BoxFuture<'static, ()>>();
thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
// This limits the number of threads for blocking operations (like for
// synchronous fs ops) or CPU bound tasks like when we run dprint in
// parallel for deno fmt.
// The default value is 512, which is an unhelpfully large thread pool. We
// don't ever want to have more than a couple dozen threads.
.max_blocking_threads(if cfg!(windows) {
// on windows, tokio uses blocking tasks for child process IO, make sure
// we have enough available threads for other tasks to run
4 * std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(8)
} else {
32
})
.build()
.unwrap();
rt.block_on(async move {
while let Some(task) = spawn_task_rx.next().await {
tokio::spawn(task);
}
});
});
spawn_task_tx
}
#[derive(Clone, Copy)]
struct OtelSharedRuntime;
impl hyper::rt::Executor<BoxFuture<'static, ()>> for OtelSharedRuntime {
fn execute(&self, fut: BoxFuture<'static, ()>) {
(*OTEL_SHARED_RUNTIME_SPAWN_TASK_TX)
.unbounded_send(fut)
.expect("failed to send task to shared OpenTelemetry runtime");
}
}
impl opentelemetry_sdk::runtime::Runtime for OtelSharedRuntime {
type Interval = Pin<Box<dyn Stream<Item = ()> + Send + 'static>>;
type Delay = Pin<Box<tokio::time::Sleep>>;
fn interval(&self, period: Duration) -> Self::Interval {
stream::repeat(())
.then(move |_| tokio::time::sleep(period))
.boxed()
}
fn spawn(&self, future: BoxFuture<'static, ()>) {
(*OTEL_SHARED_RUNTIME_SPAWN_TASK_TX)
.unbounded_send(future)
.expect("failed to send task to shared OpenTelemetry runtime");
}
fn delay(&self, duration: Duration) -> Self::Delay {
Box::pin(tokio::time::sleep(duration))
}
}
impl opentelemetry_sdk::runtime::RuntimeChannel for OtelSharedRuntime {
type Receiver<T: Debug + Send> = BatchMessageChannelReceiver<T>;
type Sender<T: Debug + Send> = BatchMessageChannelSender<T>;
fn batch_message_channel<T: Debug + Send>(
&self,
capacity: usize,
) -> (Self::Sender<T>, Self::Receiver<T>) {
let (batch_tx, batch_rx) = tokio::sync::mpsc::channel::<T>(capacity);
(batch_tx.into(), batch_rx.into())
}
}
#[derive(Debug)]
pub struct BatchMessageChannelSender<T: Send> {
sender: tokio::sync::mpsc::Sender<T>,
}
impl<T: Send> From<tokio::sync::mpsc::Sender<T>>
for BatchMessageChannelSender<T>
{
fn from(sender: tokio::sync::mpsc::Sender<T>) -> Self {
Self { sender }
}
}
impl<T: Send> opentelemetry_sdk::runtime::TrySend
for BatchMessageChannelSender<T>
{
type Message = T;
fn try_send(
&self,
item: Self::Message,
) -> Result<(), opentelemetry_sdk::runtime::TrySendError> {
self.sender.try_send(item).map_err(|err| match err {
tokio::sync::mpsc::error::TrySendError::Full(_) => {
opentelemetry_sdk::runtime::TrySendError::ChannelFull
}
tokio::sync::mpsc::error::TrySendError::Closed(_) => {
opentelemetry_sdk::runtime::TrySendError::ChannelClosed
}
})
}
}
pub struct BatchMessageChannelReceiver<T> {
receiver: tokio::sync::mpsc::Receiver<T>,
}
impl<T> From<tokio::sync::mpsc::Receiver<T>>
for BatchMessageChannelReceiver<T>
{
fn from(receiver: tokio::sync::mpsc::Receiver<T>) -> Self {
Self { receiver }
}
}
impl<T> Stream for BatchMessageChannelReceiver<T> {
type Item = T;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
self.receiver.poll_recv(cx)
}
}
enum DenoPeriodicReaderMessage {
Register(std::sync::Weak<opentelemetry_sdk::metrics::Pipeline>),
Export,
ForceFlush(oneshot::Sender<MetricResult<()>>),
Shutdown(oneshot::Sender<MetricResult<()>>),
}
#[derive(Debug)]
struct DenoPeriodicReader {
tx: tokio::sync::mpsc::Sender<DenoPeriodicReaderMessage>,
temporality: Temporality,
}
impl MetricReader for DenoPeriodicReader {
fn register_pipeline(
&self,
pipeline: std::sync::Weak<opentelemetry_sdk::metrics::Pipeline>,
) {
let _ = self
.tx
.try_send(DenoPeriodicReaderMessage::Register(pipeline));
}
fn collect(
&self,
_rm: &mut opentelemetry_sdk::metrics::data::ResourceMetrics,
) -> opentelemetry_sdk::metrics::MetricResult<()> {
unreachable!("collect should not be called on DenoPeriodicReader");
}
fn force_flush(&self) -> opentelemetry_sdk::metrics::MetricResult<()> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.try_send(DenoPeriodicReaderMessage::ForceFlush(tx));
deno_core::futures::executor::block_on(rx).unwrap()?;
Ok(())
}
fn shutdown(&self) -> opentelemetry_sdk::metrics::MetricResult<()> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.try_send(DenoPeriodicReaderMessage::Shutdown(tx));
deno_core::futures::executor::block_on(rx).unwrap()?;
Ok(())
}
fn temporality(
&self,
_kind: opentelemetry_sdk::metrics::InstrumentKind,
) -> Temporality {
self.temporality
}
}
const METRIC_EXPORT_INTERVAL_NAME: &str = "OTEL_METRIC_EXPORT_INTERVAL";
const DEFAULT_INTERVAL: Duration = Duration::from_secs(60);
impl DenoPeriodicReader {
fn new(exporter: opentelemetry_otlp::MetricExporter) -> Self {
let interval = env::var(METRIC_EXPORT_INTERVAL_NAME)
.ok()
.and_then(|v| v.parse().map(Duration::from_millis).ok())
.unwrap_or(DEFAULT_INTERVAL);
let (tx, mut rx) = tokio::sync::mpsc::channel(256);
let temporality = PushMetricExporter::temporality(&exporter);
let worker = async move {
let inner = ManualReader::builder()
.with_temporality(PushMetricExporter::temporality(&exporter))
.build();
let collect_and_export = |collect_observed: bool| {
let inner = &inner;
let exporter = &exporter;
async move {
let mut resource_metrics =
opentelemetry_sdk::metrics::data::ResourceMetrics {
resource: Default::default(),
scope_metrics: Default::default(),
};
if collect_observed {
let callbacks = {
let mut callbacks = OTEL_PRE_COLLECT_CALLBACKS.lock().unwrap();
std::mem::take(&mut *callbacks)
};
let mut futures = JoinSet::new();
for callback in callbacks {
let (tx, rx) = oneshot::channel();
if let Ok(()) = callback.send(tx) {
futures.spawn(rx);
}
}
while futures.join_next().await.is_some() {}
}
inner.collect(&mut resource_metrics)?;
if resource_metrics.scope_metrics.is_empty() {
return Ok(());
}
exporter.export(&mut resource_metrics).await?;
Ok(())
}
};
let mut ticker = tokio::time::interval(interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
ticker.tick().await;
loop {
let message = tokio::select! {
_ = ticker.tick() => DenoPeriodicReaderMessage::Export,
message = rx.recv() => if let Some(message) = message {
message
} else {
break;
},
};
match message {
DenoPeriodicReaderMessage::Register(new_pipeline) => {
inner.register_pipeline(new_pipeline);
}
DenoPeriodicReaderMessage::Export => {
otel_debug!(
name: "DenoPeriodicReader.ExportTriggered",
message = "Export message received.",
);
if let Err(err) = collect_and_export(true).await {
otel_error!(
name: "DenoPeriodicReader.ExportFailed",
message = "Failed to export metrics",
reason = format!("{}", err));
}
}
DenoPeriodicReaderMessage::ForceFlush(sender) => {
otel_debug!(
name: "DenoPeriodicReader.ForceFlushCalled",
message = "Flush message received.",
);
let res = collect_and_export(false).await;
if let Err(send_error) = sender.send(res) {
otel_debug!(
name: "DenoPeriodicReader.Flush.SendResultError",
message = "Failed to send flush result.",
reason = format!("{:?}", send_error),
);
}
}
DenoPeriodicReaderMessage::Shutdown(sender) => {
otel_debug!(
name: "DenoPeriodicReader.ShutdownCalled",
message = "Shutdown message received",
);
let res = collect_and_export(false).await;
let _ = exporter.shutdown();
if let Err(send_error) = sender.send(res) {
otel_debug!(
name: "DenoPeriodicReader.Shutdown.SendResultError",
message = "Failed to send shutdown result",
reason = format!("{:?}", send_error),
);
}
break;
}
}
}
};
(*OTEL_SHARED_RUNTIME_SPAWN_TASK_TX)
.unbounded_send(worker.boxed())
.expect("failed to send task to shared OpenTelemetry runtime");
DenoPeriodicReader { tx, temporality }
}
}
mod hyper_client {
use std::fmt::Debug;
use std::pin::Pin;
use std::task::Poll;
use std::task::{self};
use http_body_util::BodyExt;
use http_body_util::Full;
use hyper::body::Body as HttpBody;
use hyper::body::Frame;
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client;
use opentelemetry_http::Bytes;
use opentelemetry_http::HttpError;
use opentelemetry_http::Request;
use opentelemetry_http::Response;
use opentelemetry_http::ResponseExt;
use super::OtelSharedRuntime;
// same as opentelemetry_http::HyperClient except it uses OtelSharedRuntime
#[derive(Debug, Clone)]
pub struct HyperClient {
inner: Client<HttpConnector, Body>,
}
impl HyperClient {
pub fn new() -> Self {
Self {
inner: Client::builder(OtelSharedRuntime).build(HttpConnector::new()),
}
}
}
#[async_trait::async_trait]
impl opentelemetry_http::HttpClient for HyperClient {
async fn send(
&self,
request: Request<Vec<u8>>,
) -> Result<Response<Bytes>, HttpError> {
let (parts, body) = request.into_parts();
let request = Request::from_parts(parts, Body(Full::from(body)));
let mut response = self.inner.request(request).await?;
let headers = std::mem::take(response.headers_mut());
let mut http_response = Response::builder()
.status(response.status())
.body(response.into_body().collect().await?.to_bytes())?;
*http_response.headers_mut() = headers;
Ok(http_response.error_for_status()?)
}
}
#[pin_project::pin_project]
pub struct Body(#[pin] Full<Bytes>);
impl HttpBody for Body {
type Data = Bytes;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
#[inline]
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
self.project().0.poll_frame(cx).map_err(Into::into)
}
#[inline]
fn is_end_stream(&self) -> bool {
self.0.is_end_stream()
}
#[inline]
fn size_hint(&self) -> hyper::body::SizeHint {
self.0.size_hint()
}
}
}
struct OtelGlobals {
span_processor: BatchSpanProcessor<OtelSharedRuntime>,
log_processor: BatchLogProcessor<OtelSharedRuntime>,
id_generator: DenoIdGenerator,
meter_provider: SdkMeterProvider,
builtin_instrumentation_scope: InstrumentationScope,
}
static OTEL_GLOBALS: OnceCell<OtelGlobals> = OnceCell::new();
pub fn init(
rt_config: OtelRuntimeConfig,
config: &OtelConfig,
) -> deno_core::anyhow::Result<()> {
// Parse the `OTEL_EXPORTER_OTLP_PROTOCOL` variable. The opentelemetry_*
// crates don't do this automatically.
// TODO(piscisaureus): enable GRPC support.
let protocol = match env::var("OTEL_EXPORTER_OTLP_PROTOCOL").as_deref() {
Ok("http/protobuf") => Protocol::HttpBinary,
Ok("http/json") => Protocol::HttpJson,
Ok("") | Err(env::VarError::NotPresent) => Protocol::HttpBinary,
Ok(protocol) => {
return Err(deno_core::anyhow::anyhow!(
"Env var OTEL_EXPORTER_OTLP_PROTOCOL specifies an unsupported protocol: {}",
protocol
));
}
Err(err) => {
return Err(deno_core::anyhow::anyhow!(
"Failed to read env var OTEL_EXPORTER_OTLP_PROTOCOL: {}",
err
));
}
};
// Define the resource attributes that will be attached to all log records.
// These attributes are sourced as follows (in order of precedence):
// * The `service.name` attribute from the `OTEL_SERVICE_NAME` env var.
// * Additional attributes from the `OTEL_RESOURCE_ATTRIBUTES` env var.
// * Default attribute values defined here.
// TODO(piscisaureus): add more default attributes (e.g. script path).
let mut resource = Resource::default();
// Add the runtime name and version to the resource attributes. Also override
// the `telemetry.sdk` attributes to include the Deno runtime.
resource = resource.merge(&Resource::new(vec![
KeyValue::new(PROCESS_RUNTIME_NAME, rt_config.runtime_name),
KeyValue::new(PROCESS_RUNTIME_VERSION, rt_config.runtime_version.clone()),
KeyValue::new(
TELEMETRY_SDK_LANGUAGE,
format!(
"deno-{}",
resource.get(Key::new(TELEMETRY_SDK_LANGUAGE)).unwrap()
),
),
KeyValue::new(
TELEMETRY_SDK_NAME,
format!(
"deno-{}",
resource.get(Key::new(TELEMETRY_SDK_NAME)).unwrap()
),
),
KeyValue::new(
TELEMETRY_SDK_VERSION,
format!(
"{}-{}",
rt_config.runtime_version,
resource.get(Key::new(TELEMETRY_SDK_VERSION)).unwrap()
),
),
]));
// The OTLP endpoint is automatically picked up from the
// `OTEL_EXPORTER_OTLP_ENDPOINT` environment variable. Additional headers can
// be specified using `OTEL_EXPORTER_OTLP_HEADERS`.
let client = hyper_client::HyperClient::new();
let span_exporter = HttpExporterBuilder::default()
.with_http_client(client.clone())
.with_protocol(protocol)
.build_span_exporter()?;
let mut span_processor =
BatchSpanProcessor::builder(span_exporter, OtelSharedRuntime).build();
span_processor.set_resource(&resource);
let temporality_preference =
env::var("OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE")
.ok()
.map(|s| s.to_lowercase());
let temporality = match temporality_preference.as_deref() {
None | Some("cumulative") => Temporality::Cumulative,
Some("delta") => Temporality::Delta,
Some("lowmemory") => Temporality::LowMemory,
Some(other) => {
return Err(deno_core::anyhow::anyhow!(
"Invalid value for OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: {}",
other
));
}
};
let metric_exporter = HttpExporterBuilder::default()
.with_http_client(client.clone())
.with_protocol(protocol)
.build_metrics_exporter(temporality)?;
let metric_reader = DenoPeriodicReader::new(metric_exporter);
let meter_provider = SdkMeterProvider::builder()
.with_reader(metric_reader)
.with_resource(resource.clone())
.build();
let log_exporter = HttpExporterBuilder::default()
.with_http_client(client)
.with_protocol(protocol)
.build_log_exporter()?;
let log_processor =
BatchLogProcessor::builder(log_exporter, OtelSharedRuntime).build();
log_processor.set_resource(&resource);
let builtin_instrumentation_scope =
opentelemetry::InstrumentationScope::builder("deno")
.with_version(rt_config.runtime_version.clone())
.build();
let id_generator = if config.deterministic {
DenoIdGenerator::deterministic()
} else {
DenoIdGenerator::random()
};
OTEL_GLOBALS
.set(OtelGlobals {
log_processor,
span_processor,
id_generator,
meter_provider,
builtin_instrumentation_scope,
})
.map_err(|_| deno_core::anyhow::anyhow!("failed to set otel globals"))?;
Ok(())
}
/// This function is called by the runtime whenever it is about to call
/// `process::exit()`, to ensure that all OpenTelemetry logs are properly
/// flushed before the process terminates.
pub fn flush() {
if let Some(OtelGlobals {
span_processor: spans,
log_processor: logs,
meter_provider,
..
}) = OTEL_GLOBALS.get()
{
let _ = spans.force_flush();
let _ = logs.force_flush();
let _ = meter_provider.force_flush();
}
}
pub fn handle_log(record: &log::Record) {
use log::Level;
let Some(OtelGlobals {
log_processor: logs,
builtin_instrumentation_scope,
..
}) = OTEL_GLOBALS.get()
else {
return;
};
let mut log_record = LogRecord::default();
log_record.set_observed_timestamp(SystemTime::now());
log_record.set_severity_number(match record.level() {
Level::Error => Severity::Error,
Level::Warn => Severity::Warn,
Level::Info => Severity::Info,
Level::Debug => Severity::Debug,
Level::Trace => Severity::Trace,
});
log_record.set_severity_text(record.level().as_str());
log_record.set_body(record.args().to_string().into());
log_record.set_target(record.metadata().target().to_string());
struct Visitor<'s>(&'s mut LogRecord);
impl<'s, 'kvs> log::kv::VisitSource<'kvs> for Visitor<'s> {
fn visit_pair(
&mut self,
key: log::kv::Key<'kvs>,
value: log::kv::Value<'kvs>,
) -> Result<(), log::kv::Error> {
#[allow(clippy::manual_map)]
let value = if let Some(v) = value.to_bool() {
Some(AnyValue::Boolean(v))
} else if let Some(v) = value.to_borrowed_str() {
Some(AnyValue::String(v.to_owned().into()))
} else if let Some(v) = value.to_f64() {
Some(AnyValue::Double(v))
} else if let Some(v) = value.to_i64() {
Some(AnyValue::Int(v))
} else {
None
};
if let Some(value) = value {
let key = Key::from(key.as_str().to_owned());
self.0.add_attribute(key, value);
}
Ok(())
}
}
let _ = record.key_values().visit(&mut Visitor(&mut log_record));
logs.emit(&mut log_record, builtin_instrumentation_scope);
}
#[derive(Debug)]
enum DenoIdGenerator {
Random(RandomIdGenerator),
Deterministic {
next_trace_id: AtomicU64,
next_span_id: AtomicU64,
},
}
impl IdGenerator for DenoIdGenerator {
fn new_trace_id(&self) -> TraceId {
match self {
Self::Random(generator) => generator.new_trace_id(),
Self::Deterministic { next_trace_id, .. } => {
let id =
next_trace_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let bytes = id.to_be_bytes();
let bytes = [
0, 0, 0, 0, 0, 0, 0, 0, bytes[0], bytes[1], bytes[2], bytes[3],
bytes[4], bytes[5], bytes[6], bytes[7],
];
TraceId::from_bytes(bytes)
}
}
}
fn new_span_id(&self) -> SpanId {
match self {
Self::Random(generator) => generator.new_span_id(),
Self::Deterministic { next_span_id, .. } => {
let id =
next_span_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
SpanId::from_bytes(id.to_be_bytes())
}
}
}
}
impl DenoIdGenerator {
fn random() -> Self {
Self::Random(RandomIdGenerator::default())
}
fn deterministic() -> Self {
Self::Deterministic {
next_trace_id: AtomicU64::new(1),
next_span_id: AtomicU64::new(1),
}
}
}
fn parse_trace_id(
scope: &mut v8::HandleScope<'_>,
trace_id: v8::Local<'_, v8::Value>,
) -> TraceId {
if let Ok(string) = trace_id.try_cast() {
let value_view = v8::ValueView::new(scope, string);
match value_view.data() {
v8::ValueViewData::OneByte(bytes) => {
TraceId::from_hex(&String::from_utf8_lossy(bytes))
.unwrap_or(TraceId::INVALID)
}
_ => TraceId::INVALID,
}
} else if let Ok(uint8array) = trace_id.try_cast::<v8::Uint8Array>() {
let data = uint8array.data();
let byte_length = uint8array.byte_length();
if byte_length != 16 {
return TraceId::INVALID;
}
// SAFETY: We have ensured that the byte length is 16, so it is safe to
// cast the data to an array of 16 bytes.
let bytes = unsafe { &*(data as *const u8 as *const [u8; 16]) };
TraceId::from_bytes(*bytes)
} else {
TraceId::INVALID
}
}
fn parse_span_id(
scope: &mut v8::HandleScope<'_>,
span_id: v8::Local<'_, v8::Value>,
) -> SpanId {
if let Ok(string) = span_id.try_cast() {
let value_view = v8::ValueView::new(scope, string);
match value_view.data() {
v8::ValueViewData::OneByte(bytes) => {
SpanId::from_hex(&String::from_utf8_lossy(bytes))
.unwrap_or(SpanId::INVALID)
}
_ => SpanId::INVALID,
}
} else if let Ok(uint8array) = span_id.try_cast::<v8::Uint8Array>() {
let data = uint8array.data();
let byte_length = uint8array.byte_length();
if byte_length != 8 {
return SpanId::INVALID;
}
// SAFETY: We have ensured that the byte length is 8, so it is safe to
// cast the data to an array of 8 bytes.
let bytes = unsafe { &*(data as *const u8 as *const [u8; 8]) };
SpanId::from_bytes(*bytes)
} else {
SpanId::INVALID
}
}
macro_rules! attr_raw {
($scope:ident, $name:expr, $value:expr) => {{
let name = if let Ok(name) = $name.try_cast() {
let view = v8::ValueView::new($scope, name);
match view.data() {
v8::ValueViewData::OneByte(bytes) => {
Some(String::from_utf8_lossy(bytes).into_owned())
}
v8::ValueViewData::TwoByte(bytes) => {
Some(String::from_utf16_lossy(bytes))
}
}
} else {
None
};
let value = if let Ok(string) = $value.try_cast::<v8::String>() {
Some(Value::String(StringValue::from({
let x = v8::ValueView::new($scope, string);
match x.data() {
v8::ValueViewData::OneByte(bytes) => {
String::from_utf8_lossy(bytes).into_owned()
}
v8::ValueViewData::TwoByte(bytes) => String::from_utf16_lossy(bytes),
}
})))
} else if let Ok(number) = $value.try_cast::<v8::Number>() {
Some(Value::F64(number.value()))
} else if let Ok(boolean) = $value.try_cast::<v8::Boolean>() {
Some(Value::Bool(boolean.is_true()))
} else if let Ok(bigint) = $value.try_cast::<v8::BigInt>() {
let (i64_value, _lossless) = bigint.i64_value();
Some(Value::I64(i64_value))
} else if let Ok(_array) = $value.try_cast::<v8::Array>() {
// TODO: implement array attributes
None
} else {
None
};
if let (Some(name), Some(value)) = (name, value) {
Some(KeyValue::new(name, value))
} else {
None
}
}};
}
macro_rules! attr {
($scope:ident, $attributes:expr $(=> $dropped_attributes_count:expr)?, $name:expr, $value:expr) => {
let attr = attr_raw!($scope, $name, $value);
if let Some(kv) = attr {
$attributes.push(kv);
}
$(
else {
$dropped_attributes_count += 1;
}
)?
};
}
#[op2(fast)]
fn op_otel_log<'s>(
scope: &mut v8::HandleScope<'s>,
message: v8::Local<'s, v8::Value>,
#[smi] level: i32,
span: v8::Local<'s, v8::Value>,
) {
let Some(OtelGlobals {
log_processor,
builtin_instrumentation_scope,
..
}) = OTEL_GLOBALS.get()
else {
return;
};
// Convert the integer log level that ext/console uses to the corresponding
// OpenTelemetry log severity.
let severity = match level {
..=0 => Severity::Debug,
1 => Severity::Info,
2 => Severity::Warn,
3.. => Severity::Error,
};
let mut log_record = LogRecord::default();
log_record.set_observed_timestamp(SystemTime::now());
let Ok(message) = message.try_cast() else {
return;
};
log_record.set_body(owned_string(scope, message).into());
log_record.set_severity_number(severity);
log_record.set_severity_text(severity.name());
if let Some(span) =
deno_core::_ops::try_unwrap_cppgc_object::<OtelSpan>(scope, span)
{
let state = span.0.borrow();
match &**state {
OtelSpanState::Recording(span) => {
log_record.set_trace_context(
span.span_context.trace_id(),
span.span_context.span_id(),
Some(span.span_context.trace_flags()),
);
}
OtelSpanState::Done(span_context) => {
log_record.set_trace_context(
span_context.trace_id(),
span_context.span_id(),
Some(span_context.trace_flags()),
);
}
}
}
log_processor.emit(&mut log_record, builtin_instrumentation_scope);
}
#[op2(fast)]
fn op_otel_log_foreign(
scope: &mut v8::HandleScope<'_>,
#[string] message: String,
#[smi] level: i32,
trace_id: v8::Local<'_, v8::Value>,
span_id: v8::Local<'_, v8::Value>,
#[smi] trace_flags: u8,
) {
let Some(OtelGlobals {
log_processor,
builtin_instrumentation_scope,
..
}) = OTEL_GLOBALS.get()
else {
return;
};
// Convert the integer log level that ext/console uses to the corresponding
// OpenTelemetry log severity.
let severity = match level {
..=0 => Severity::Debug,
1 => Severity::Info,
2 => Severity::Warn,
3.. => Severity::Error,
};
let trace_id = parse_trace_id(scope, trace_id);
let span_id = parse_span_id(scope, span_id);
let mut log_record = LogRecord::default();
log_record.set_observed_timestamp(SystemTime::now());
log_record.set_body(message.into());
log_record.set_severity_number(severity);
log_record.set_severity_text(severity.name());
if trace_id != TraceId::INVALID && span_id != SpanId::INVALID {
log_record.set_trace_context(
trace_id,
span_id,
Some(TraceFlags::new(trace_flags)),
);
}
log_processor.emit(&mut log_record, builtin_instrumentation_scope);
}
fn owned_string<'s>(
scope: &mut v8::HandleScope<'s>,
string: v8::Local<'s, v8::String>,
) -> String {
let x = v8::ValueView::new(scope, string);
match x.data() {
v8::ValueViewData::OneByte(bytes) => {
String::from_utf8_lossy(bytes).into_owned()
}
v8::ValueViewData::TwoByte(bytes) => String::from_utf16_lossy(bytes),
}
}
struct OtelTracer(InstrumentationScope);
impl deno_core::GarbageCollected for OtelTracer {}
#[op2]
impl OtelTracer {
#[constructor]
#[cppgc]
fn new(
#[string] name: String,
#[string] version: Option<String>,
#[string] schema_url: Option<String>,
) -> OtelTracer {
let mut builder = opentelemetry::InstrumentationScope::builder(name);
if let Some(version) = version {
builder = builder.with_version(version);
}
if let Some(schema_url) = schema_url {
builder = builder.with_schema_url(schema_url);
}
let scope = builder.build();
OtelTracer(scope)
}
#[static_method]
#[cppgc]
fn builtin() -> OtelTracer {
let OtelGlobals {
builtin_instrumentation_scope,
..
} = OTEL_GLOBALS.get().unwrap();
OtelTracer(builtin_instrumentation_scope.clone())
}
#[cppgc]
fn start_span<'s>(
&self,
scope: &mut v8::HandleScope<'s>,
#[cppgc] parent: Option<&OtelSpan>,
name: v8::Local<'s, v8::Value>,
#[smi] span_kind: u8,
start_time: Option<f64>,
#[smi] attribute_count: usize,
) -> Result<OtelSpan, JsErrorBox> {
let OtelGlobals { id_generator, .. } = OTEL_GLOBALS.get().unwrap();
let span_context;
let parent_span_id;
match parent {
Some(parent) => {
let parent = parent.0.borrow();
let parent_span_context = match &**parent {
OtelSpanState::Recording(span) => &span.span_context,
OtelSpanState::Done(span_context) => span_context,
};
span_context = SpanContext::new(
parent_span_context.trace_id(),
id_generator.new_span_id(),
TraceFlags::SAMPLED,
false,
parent_span_context.trace_state().clone(),
);
parent_span_id = parent_span_context.span_id();
}
None => {
span_context = SpanContext::new(
id_generator.new_trace_id(),
id_generator.new_span_id(),
TraceFlags::SAMPLED,
false,
TraceState::NONE,
);
parent_span_id = SpanId::INVALID;
}
}
let name = owned_string(
scope,
name
.try_cast()
.map_err(|e: DataError| JsErrorBox::generic(e.to_string()))?,
);
let span_kind = match span_kind {
0 => SpanKind::Internal,
1 => SpanKind::Server,
2 => SpanKind::Client,
3 => SpanKind::Producer,
4 => SpanKind::Consumer,
_ => return Err(JsErrorBox::generic("invalid span kind")),
};
let start_time = start_time
.map(|start_time| {
SystemTime::UNIX_EPOCH
.checked_add(std::time::Duration::from_secs_f64(start_time))
.ok_or_else(|| JsErrorBox::generic("invalid start time"))
})
.unwrap_or_else(|| Ok(SystemTime::now()))?;
let span_data = SpanData {
span_context,
parent_span_id,
span_kind,
name: Cow::Owned(name),
start_time,
end_time: SystemTime::UNIX_EPOCH,
attributes: Vec::with_capacity(attribute_count),
dropped_attributes_count: 0,
status: SpanStatus::Unset,
events: SpanEvents::default(),
links: SpanLinks::default(),
instrumentation_scope: self.0.clone(),
};
Ok(OtelSpan(RefCell::new(Box::new(OtelSpanState::Recording(
span_data,
)))))
}
#[cppgc]
fn start_span_foreign<'s>(
&self,
scope: &mut v8::HandleScope<'s>,
parent_trace_id: v8::Local<'s, v8::Value>,
parent_span_id: v8::Local<'s, v8::Value>,
name: v8::Local<'s, v8::Value>,
#[smi] span_kind: u8,
start_time: Option<f64>,
#[smi] attribute_count: usize,
) -> Result<OtelSpan, JsErrorBox> {
let parent_trace_id = parse_trace_id(scope, parent_trace_id);
if parent_trace_id == TraceId::INVALID {
return Err(JsErrorBox::generic("invalid trace id"));
};
let parent_span_id = parse_span_id(scope, parent_span_id);
if parent_span_id == SpanId::INVALID {
return Err(JsErrorBox::generic("invalid span id"));
};
let OtelGlobals { id_generator, .. } = OTEL_GLOBALS.get().unwrap();
let span_context = SpanContext::new(
parent_trace_id,
id_generator.new_span_id(),
TraceFlags::SAMPLED,
false,
TraceState::NONE,
);
let name = owned_string(
scope,
name
.try_cast()
.map_err(|e: DataError| JsErrorBox::generic(e.to_string()))?,
);
let span_kind = match span_kind {
0 => SpanKind::Internal,
1 => SpanKind::Server,
2 => SpanKind::Client,
3 => SpanKind::Producer,
4 => SpanKind::Consumer,
_ => return Err(JsErrorBox::generic("invalid span kind")),
};
let start_time = start_time
.map(|start_time| {
SystemTime::UNIX_EPOCH
.checked_add(std::time::Duration::from_secs_f64(start_time))
.ok_or_else(|| JsErrorBox::generic("invalid start time"))
})
.unwrap_or_else(|| Ok(SystemTime::now()))?;
let span_data = SpanData {
span_context,
parent_span_id,
span_kind,
name: Cow::Owned(name),
start_time,
end_time: SystemTime::UNIX_EPOCH,
attributes: Vec::with_capacity(attribute_count),
dropped_attributes_count: 0,
status: SpanStatus::Unset,
events: SpanEvents::default(),
links: SpanLinks::default(),
instrumentation_scope: self.0.clone(),
};
Ok(OtelSpan(RefCell::new(Box::new(OtelSpanState::Recording(
span_data,
)))))
}
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct JsSpanContext {
trace_id: Box<str>,
span_id: Box<str>,
trace_flags: u8,
}
#[derive(Debug, Error, JsError)]
#[error("OtelSpan cannot be constructed.")]
#[class(type)]
struct OtelSpanCannotBeConstructedError;
#[derive(Debug, Error, JsError)]
#[error("invalid span status code")]
#[class(type)]
struct InvalidSpanStatusCodeError;
// boxed because of https://github.com/denoland/rusty_v8/issues/1676
#[derive(Debug)]
struct OtelSpan(RefCell<Box<OtelSpanState>>);
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
enum OtelSpanState {
Recording(SpanData),
Done(SpanContext),
}
impl deno_core::GarbageCollected for OtelSpan {}
#[op2]
impl OtelSpan {
#[constructor]
#[cppgc]
fn new() -> Result<OtelSpan, OtelSpanCannotBeConstructedError> {
Err(OtelSpanCannotBeConstructedError)
}
#[serde]
fn span_context(&self) -> JsSpanContext {
let state = self.0.borrow();
let span_context = match &**state {
OtelSpanState::Recording(span) => &span.span_context,
OtelSpanState::Done(span_context) => span_context,
};
JsSpanContext {
trace_id: format!("{:?}", span_context.trace_id()).into(),
span_id: format!("{:?}", span_context.span_id()).into(),
trace_flags: span_context.trace_flags().to_u8(),
}
}
#[fast]
fn set_status<'s>(
&self,
#[smi] status: u8,
#[string] error_description: String,
) -> Result<(), InvalidSpanStatusCodeError> {
let mut state = self.0.borrow_mut();
let OtelSpanState::Recording(span) = &mut **state else {
return Ok(());
};
span.status = match status {
0 => SpanStatus::Unset,
1 => SpanStatus::Ok,
2 => SpanStatus::Error {
description: Cow::Owned(error_description),
},
_ => return Err(InvalidSpanStatusCodeError),
};
Ok(())
}
#[fast]
fn drop_event(&self) {
let mut state = self.0.borrow_mut();
match &mut **state {
OtelSpanState::Recording(span) => {
span.events.dropped_count += 1;
}
OtelSpanState::Done(_) => {}
}
}
#[fast]
fn drop_link(&self) {
let mut state = self.0.borrow_mut();
match &mut **state {
OtelSpanState::Recording(span) => {
span.links.dropped_count += 1;
}
OtelSpanState::Done(_) => {}
}
}
#[fast]
fn end(&self, end_time: f64) {
let end_time = if end_time.is_nan() {
SystemTime::now()
} else {
SystemTime::UNIX_EPOCH
.checked_add(Duration::from_secs_f64(end_time))
.unwrap()
};
let mut state = self.0.borrow_mut();
if let OtelSpanState::Recording(span) = &mut **state {
let span_context = span.span_context.clone();
if let OtelSpanState::Recording(mut span) = *std::mem::replace(
&mut *state,
Box::new(OtelSpanState::Done(span_context)),
) {
span.end_time = end_time;
let Some(OtelGlobals { span_processor, .. }) = OTEL_GLOBALS.get()
else {
return;
};
span_processor.on_end(span);
}
}
}
}
#[op2(fast)]
fn op_otel_span_attribute1<'s>(
scope: &mut v8::HandleScope<'s>,
span: v8::Local<'_, v8::Value>,
key: v8::Local<'s, v8::Value>,
value: v8::Local<'s, v8::Value>,
) {
let Some(span) =
deno_core::_ops::try_unwrap_cppgc_object::<OtelSpan>(scope, span)
else {
return;
};
let mut state = span.0.borrow_mut();
if let OtelSpanState::Recording(span) = &mut **state {
attr!(scope, span.attributes => span.dropped_attributes_count, key, value);
}
}
#[op2(fast)]
fn op_otel_span_attribute2<'s>(
scope: &mut v8::HandleScope<'s>,
span: v8::Local<'_, v8::Value>,
key1: v8::Local<'s, v8::Value>,
value1: v8::Local<'s, v8::Value>,
key2: v8::Local<'s, v8::Value>,
value2: v8::Local<'s, v8::Value>,
) {
let Some(span) =
deno_core::_ops::try_unwrap_cppgc_object::<OtelSpan>(scope, span)
else {
return;
};
let mut state = span.0.borrow_mut();
if let OtelSpanState::Recording(span) = &mut **state {
attr!(scope, span.attributes => span.dropped_attributes_count, key1, value1);
attr!(scope, span.attributes => span.dropped_attributes_count, key2, value2);
}
}
#[allow(clippy::too_many_arguments)]
#[op2(fast)]
fn op_otel_span_attribute3<'s>(
scope: &mut v8::HandleScope<'s>,
span: v8::Local<'_, v8::Value>,
key1: v8::Local<'s, v8::Value>,
value1: v8::Local<'s, v8::Value>,
key2: v8::Local<'s, v8::Value>,
value2: v8::Local<'s, v8::Value>,
key3: v8::Local<'s, v8::Value>,
value3: v8::Local<'s, v8::Value>,
) {
let Some(span) =
deno_core::_ops::try_unwrap_cppgc_object::<OtelSpan>(scope, span)
else {
return;
};
let mut state = span.0.borrow_mut();
if let OtelSpanState::Recording(span) = &mut **state {
attr!(scope, span.attributes => span.dropped_attributes_count, key1, value1);
attr!(scope, span.attributes => span.dropped_attributes_count, key2, value2);
attr!(scope, span.attributes => span.dropped_attributes_count, key3, value3);
}
}
#[op2(fast)]
fn op_otel_span_update_name<'s>(
scope: &mut v8::HandleScope<'s>,
span: v8::Local<'s, v8::Value>,
name: v8::Local<'s, v8::Value>,
) {
let Ok(name) = name.try_cast() else {
return;
};
let name = owned_string(scope, name);
let Some(span) =
deno_core::_ops::try_unwrap_cppgc_object::<OtelSpan>(scope, span)
else {
return;
};
let mut state = span.0.borrow_mut();
if let OtelSpanState::Recording(span) = &mut **state {
span.name = Cow::Owned(name)
}
}
struct OtelMeter(opentelemetry::metrics::Meter);
impl deno_core::GarbageCollected for OtelMeter {}
#[op2]
impl OtelMeter {
#[constructor]
#[cppgc]
fn new(
#[string] name: String,
#[string] version: Option<String>,
#[string] schema_url: Option<String>,
) -> OtelMeter {
let mut builder = opentelemetry::InstrumentationScope::builder(name);
if let Some(version) = version {
builder = builder.with_version(version);
}
if let Some(schema_url) = schema_url {
builder = builder.with_schema_url(schema_url);
}
let scope = builder.build();
let meter = OTEL_GLOBALS
.get()
.unwrap()
.meter_provider
.meter_with_scope(scope);
OtelMeter(meter)
}
#[cppgc]
fn create_counter<'s>(
&self,
scope: &mut v8::HandleScope<'s>,
name: v8::Local<'s, v8::Value>,
description: v8::Local<'s, v8::Value>,
unit: v8::Local<'s, v8::Value>,
) -> Result<Instrument, JsErrorBox> {
create_instrument(
|name| self.0.f64_counter(name),
|i| Instrument::Counter(i.build()),
scope,
name,
description,
unit,
)
.map_err(|e| JsErrorBox::generic(e.to_string()))
}
#[cppgc]
fn create_up_down_counter<'s>(
&self,
scope: &mut v8::HandleScope<'s>,
name: v8::Local<'s, v8::Value>,
description: v8::Local<'s, v8::Value>,
unit: v8::Local<'s, v8::Value>,
) -> Result<Instrument, JsErrorBox> {
create_instrument(
|name| self.0.f64_up_down_counter(name),
|i| Instrument::UpDownCounter(i.build()),
scope,
name,
description,
unit,
)
.map_err(|e| JsErrorBox::generic(e.to_string()))
}
#[cppgc]
fn create_gauge<'s>(
&self,
scope: &mut v8::HandleScope<'s>,
name: v8::Local<'s, v8::Value>,
description: v8::Local<'s, v8::Value>,
unit: v8::Local<'s, v8::Value>,
) -> Result<Instrument, JsErrorBox> {
create_instrument(
|name| self.0.f64_gauge(name),
|i| Instrument::Gauge(i.build()),
scope,
name,
description,
unit,
)
.map_err(|e| JsErrorBox::generic(e.to_string()))
}
#[cppgc]
fn create_histogram<'s>(
&self,
scope: &mut v8::HandleScope<'s>,
name: v8::Local<'s, v8::Value>,
description: v8::Local<'s, v8::Value>,
unit: v8::Local<'s, v8::Value>,
#[serde] boundaries: Option<Vec<f64>>,
) -> Result<Instrument, JsErrorBox> {
let name = owned_string(
scope,
name
.try_cast()
.map_err(|e: DataError| JsErrorBox::generic(e.to_string()))?,
);
let mut builder = self.0.f64_histogram(name);
if !description.is_null_or_undefined() {
let description = owned_string(
scope,
description
.try_cast()
.map_err(|e: DataError| JsErrorBox::generic(e.to_string()))?,
);
builder = builder.with_description(description);
};
if !unit.is_null_or_undefined() {
let unit = owned_string(
scope,
unit
.try_cast()
.map_err(|e: DataError| JsErrorBox::generic(e.to_string()))?,
);
builder = builder.with_unit(unit);
};
if let Some(boundaries) = boundaries {
builder = builder.with_boundaries(boundaries);
}
Ok(Instrument::Histogram(builder.build()))
}
#[cppgc]
fn create_observable_counter<'s>(
&self,
scope: &mut v8::HandleScope<'s>,
name: v8::Local<'s, v8::Value>,
description: v8::Local<'s, v8::Value>,
unit: v8::Local<'s, v8::Value>,
) -> Result<Instrument, JsErrorBox> {
create_async_instrument(
|name| self.0.f64_observable_counter(name),
|i| {
i.build();
},
scope,
name,
description,
unit,
)
.map_err(|e| JsErrorBox::generic(e.to_string()))
}
#[cppgc]
fn create_observable_up_down_counter<'s>(
&self,
scope: &mut v8::HandleScope<'s>,
name: v8::Local<'s, v8::Value>,
description: v8::Local<'s, v8::Value>,
unit: v8::Local<'s, v8::Value>,
) -> Result<Instrument, JsErrorBox> {
create_async_instrument(
|name| self.0.f64_observable_up_down_counter(name),
|i| {
i.build();
},
scope,
name,
description,
unit,
)
.map_err(|e| JsErrorBox::generic(e.to_string()))
}
#[cppgc]
fn create_observable_gauge<'s>(
&self,
scope: &mut v8::HandleScope<'s>,
name: v8::Local<'s, v8::Value>,
description: v8::Local<'s, v8::Value>,
unit: v8::Local<'s, v8::Value>,
) -> Result<Instrument, JsErrorBox> {
create_async_instrument(
|name| self.0.f64_observable_gauge(name),
|i| {
i.build();
},
scope,
name,
description,
unit,
)
.map_err(|e| JsErrorBox::generic(e.to_string()))
}
}
enum Instrument {
Counter(opentelemetry::metrics::Counter<f64>),
UpDownCounter(opentelemetry::metrics::UpDownCounter<f64>),
Gauge(opentelemetry::metrics::Gauge<f64>),
Histogram(opentelemetry::metrics::Histogram<f64>),
Observable(Arc<Mutex<HashMap<Vec<KeyValue>, f64>>>),
}
impl GarbageCollected for Instrument {}
fn create_instrument<'a, 'b, T>(
cb: impl FnOnce(String) -> InstrumentBuilder<'b, T>,
cb2: impl FnOnce(InstrumentBuilder<'b, T>) -> Instrument,
scope: &mut v8::HandleScope<'a>,
name: v8::Local<'a, v8::Value>,
description: v8::Local<'a, v8::Value>,
unit: v8::Local<'a, v8::Value>,
) -> Result<Instrument, v8::DataError> {
let name = owned_string(scope, name.try_cast()?);
let mut builder = cb(name);
if !description.is_null_or_undefined() {
let description = owned_string(scope, description.try_cast()?);
builder = builder.with_description(description);
};
if !unit.is_null_or_undefined() {
let unit = owned_string(scope, unit.try_cast()?);
builder = builder.with_unit(unit);
};
Ok(cb2(builder))
}
fn create_async_instrument<'a, 'b, T>(
cb: impl FnOnce(String) -> AsyncInstrumentBuilder<'b, T, f64>,
cb2: impl FnOnce(AsyncInstrumentBuilder<'b, T, f64>),
scope: &mut v8::HandleScope<'a>,
name: v8::Local<'a, v8::Value>,
description: v8::Local<'a, v8::Value>,
unit: v8::Local<'a, v8::Value>,
) -> Result<Instrument, DataError> {
let name = owned_string(scope, name.try_cast()?);
let mut builder = cb(name);
if !description.is_null_or_undefined() {
let description = owned_string(scope, description.try_cast()?);
builder = builder.with_description(description);
};
if !unit.is_null_or_undefined() {
let unit = owned_string(scope, unit.try_cast()?);
builder = builder.with_unit(unit);
};
let data_share = Arc::new(Mutex::new(HashMap::new()));
let data_share_: Arc<Mutex<HashMap<Vec<KeyValue>, f64>>> = data_share.clone();
builder = builder.with_callback(move |i| {
let data = {
let mut data = data_share_.lock().unwrap();
std::mem::take(&mut *data)
};
for (attributes, value) in data {
i.observe(value, &attributes);
}
});
cb2(builder);
Ok(Instrument::Observable(data_share))
}
struct MetricAttributes {
attributes: Vec<KeyValue>,
}
#[op2(fast)]
fn op_otel_metric_record0(
state: &mut OpState,
#[cppgc] instrument: &Instrument,
value: f64,
) {
let values = state.try_take::<MetricAttributes>();
let attributes = match &values {
Some(values) => &*values.attributes,
None => &[],
};
match instrument {
Instrument::Counter(counter) => counter.add(value, attributes),
Instrument::UpDownCounter(counter) => counter.add(value, attributes),
Instrument::Gauge(gauge) => gauge.record(value, attributes),
Instrument::Histogram(histogram) => histogram.record(value, attributes),
_ => {}
}
}
#[op2(fast)]
fn op_otel_metric_record1(
state: &mut OpState,
scope: &mut v8::HandleScope<'_>,
instrument: v8::Local<'_, v8::Value>,
value: f64,
key1: v8::Local<'_, v8::Value>,
value1: v8::Local<'_, v8::Value>,
) {
let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::<Instrument>(
&mut *scope,
instrument,
) else {
return;
};
let mut values = state.try_take::<MetricAttributes>();
let attr1 = attr_raw!(scope, key1, value1);
let attributes = match &mut values {
Some(values) => {
if let Some(kv) = attr1 {
values.attributes.reserve_exact(1);
values.attributes.push(kv);
}
&*values.attributes
}
None => match attr1 {
Some(kv1) => &[kv1] as &[KeyValue],
None => &[],
},
};
match &*instrument {
Instrument::Counter(counter) => counter.add(value, attributes),
Instrument::UpDownCounter(counter) => counter.add(value, attributes),
Instrument::Gauge(gauge) => gauge.record(value, attributes),
Instrument::Histogram(histogram) => histogram.record(value, attributes),
_ => {}
}
}
#[allow(clippy::too_many_arguments)]
#[op2(fast)]
fn op_otel_metric_record2(
state: &mut OpState,
scope: &mut v8::HandleScope<'_>,
instrument: v8::Local<'_, v8::Value>,
value: f64,
key1: v8::Local<'_, v8::Value>,
value1: v8::Local<'_, v8::Value>,
key2: v8::Local<'_, v8::Value>,
value2: v8::Local<'_, v8::Value>,
) {
let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::<Instrument>(
&mut *scope,
instrument,
) else {
return;
};
let mut values = state.try_take::<MetricAttributes>();
let attr1 = attr_raw!(scope, key1, value1);
let attr2 = attr_raw!(scope, key2, value2);
let attributes = match &mut values {
Some(values) => {
values.attributes.reserve_exact(2);
if let Some(kv1) = attr1 {
values.attributes.push(kv1);
}
if let Some(kv2) = attr2 {
values.attributes.push(kv2);
}
&*values.attributes
}
None => match (attr1, attr2) {
(Some(kv1), Some(kv2)) => &[kv1, kv2] as &[KeyValue],
(Some(kv1), None) => &[kv1],
(None, Some(kv2)) => &[kv2],
(None, None) => &[],
},
};
match &*instrument {
Instrument::Counter(counter) => counter.add(value, attributes),
Instrument::UpDownCounter(counter) => counter.add(value, attributes),
Instrument::Gauge(gauge) => gauge.record(value, attributes),
Instrument::Histogram(histogram) => histogram.record(value, attributes),
_ => {}
}
}
#[allow(clippy::too_many_arguments)]
#[op2(fast)]
fn op_otel_metric_record3(
state: &mut OpState,
scope: &mut v8::HandleScope<'_>,
instrument: v8::Local<'_, v8::Value>,
value: f64,
key1: v8::Local<'_, v8::Value>,
value1: v8::Local<'_, v8::Value>,
key2: v8::Local<'_, v8::Value>,
value2: v8::Local<'_, v8::Value>,
key3: v8::Local<'_, v8::Value>,
value3: v8::Local<'_, v8::Value>,
) {
let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::<Instrument>(
&mut *scope,
instrument,
) else {
return;
};
let mut values = state.try_take::<MetricAttributes>();
let attr1 = attr_raw!(scope, key1, value1);
let attr2 = attr_raw!(scope, key2, value2);
let attr3 = attr_raw!(scope, key3, value3);
let attributes = match &mut values {
Some(values) => {
values.attributes.reserve_exact(3);
if let Some(kv1) = attr1 {
values.attributes.push(kv1);
}
if let Some(kv2) = attr2 {
values.attributes.push(kv2);
}
if let Some(kv3) = attr3 {
values.attributes.push(kv3);
}
&*values.attributes
}
None => match (attr1, attr2, attr3) {
(Some(kv1), Some(kv2), Some(kv3)) => &[kv1, kv2, kv3] as &[KeyValue],
(Some(kv1), Some(kv2), None) => &[kv1, kv2],
(Some(kv1), None, Some(kv3)) => &[kv1, kv3],
(None, Some(kv2), Some(kv3)) => &[kv2, kv3],
(Some(kv1), None, None) => &[kv1],
(None, Some(kv2), None) => &[kv2],
(None, None, Some(kv3)) => &[kv3],
(None, None, None) => &[],
},
};
match &*instrument {
Instrument::Counter(counter) => counter.add(value, attributes),
Instrument::UpDownCounter(counter) => counter.add(value, attributes),
Instrument::Gauge(gauge) => gauge.record(value, attributes),
Instrument::Histogram(histogram) => histogram.record(value, attributes),
_ => {}
}
}
#[op2(fast)]
fn op_otel_metric_observable_record0(
state: &mut OpState,
#[cppgc] instrument: &Instrument,
value: f64,
) {
let values = state.try_take::<MetricAttributes>();
let attributes = values.map(|attr| attr.attributes).unwrap_or_default();
if let Instrument::Observable(data_share) = instrument {
let mut data = data_share.lock().unwrap();
data.insert(attributes, value);
}
}
#[op2(fast)]
fn op_otel_metric_observable_record1(
state: &mut OpState,
scope: &mut v8::HandleScope<'_>,
instrument: v8::Local<'_, v8::Value>,
value: f64,
key1: v8::Local<'_, v8::Value>,
value1: v8::Local<'_, v8::Value>,
) {
let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::<Instrument>(
&mut *scope,
instrument,
) else {
return;
};
let values = state.try_take::<MetricAttributes>();
let attr1 = attr_raw!(scope, key1, value1);
let mut attributes = values
.map(|mut attr| {
attr.attributes.reserve_exact(1);
attr.attributes
})
.unwrap_or_else(|| Vec::with_capacity(1));
if let Some(kv1) = attr1 {
attributes.push(kv1);
}
if let Instrument::Observable(data_share) = &*instrument {
let mut data = data_share.lock().unwrap();
data.insert(attributes, value);
}
}
#[allow(clippy::too_many_arguments)]
#[op2(fast)]
fn op_otel_metric_observable_record2(
state: &mut OpState,
scope: &mut v8::HandleScope<'_>,
instrument: v8::Local<'_, v8::Value>,
value: f64,
key1: v8::Local<'_, v8::Value>,
value1: v8::Local<'_, v8::Value>,
key2: v8::Local<'_, v8::Value>,
value2: v8::Local<'_, v8::Value>,
) {
let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::<Instrument>(
&mut *scope,
instrument,
) else {
return;
};
let values = state.try_take::<MetricAttributes>();
let mut attributes = values
.map(|mut attr| {
attr.attributes.reserve_exact(2);
attr.attributes
})
.unwrap_or_else(|| Vec::with_capacity(2));
let attr1 = attr_raw!(scope, key1, value1);
let attr2 = attr_raw!(scope, key2, value2);
if let Some(kv1) = attr1 {
attributes.push(kv1);
}
if let Some(kv2) = attr2 {
attributes.push(kv2);
}
if let Instrument::Observable(data_share) = &*instrument {
let mut data = data_share.lock().unwrap();
data.insert(attributes, value);
}
}
#[allow(clippy::too_many_arguments)]
#[op2(fast)]
fn op_otel_metric_observable_record3(
state: &mut OpState,
scope: &mut v8::HandleScope<'_>,
instrument: v8::Local<'_, v8::Value>,
value: f64,
key1: v8::Local<'_, v8::Value>,
value1: v8::Local<'_, v8::Value>,
key2: v8::Local<'_, v8::Value>,
value2: v8::Local<'_, v8::Value>,
key3: v8::Local<'_, v8::Value>,
value3: v8::Local<'_, v8::Value>,
) {
let Some(instrument) = deno_core::_ops::try_unwrap_cppgc_object::<Instrument>(
&mut *scope,
instrument,
) else {
return;
};
let values = state.try_take::<MetricAttributes>();
let mut attributes = values
.map(|mut attr| {
attr.attributes.reserve_exact(3);
attr.attributes
})
.unwrap_or_else(|| Vec::with_capacity(3));
let attr1 = attr_raw!(scope, key1, value1);
let attr2 = attr_raw!(scope, key2, value2);
let attr3 = attr_raw!(scope, key3, value3);
if let Some(kv1) = attr1 {
attributes.push(kv1);
}
if let Some(kv2) = attr2 {
attributes.push(kv2);
}
if let Some(kv3) = attr3 {
attributes.push(kv3);
}
if let Instrument::Observable(data_share) = &*instrument {
let mut data = data_share.lock().unwrap();
data.insert(attributes, value);
}
}
#[allow(clippy::too_many_arguments)]
#[op2(fast)]
fn op_otel_metric_attribute3<'s>(
scope: &mut v8::HandleScope<'s>,
state: &mut OpState,
#[smi] capacity: u32,
key1: v8::Local<'s, v8::Value>,
value1: v8::Local<'s, v8::Value>,
key2: v8::Local<'s, v8::Value>,
value2: v8::Local<'s, v8::Value>,
key3: v8::Local<'s, v8::Value>,
value3: v8::Local<'s, v8::Value>,
) {
let mut values = state.try_borrow_mut::<MetricAttributes>();
let attr1 = attr_raw!(scope, key1, value1);
let attr2 = attr_raw!(scope, key2, value2);
let attr3 = attr_raw!(scope, key3, value3);
if let Some(values) = &mut values {
values.attributes.reserve_exact(
(capacity as usize).saturating_sub(values.attributes.capacity()),
);
if let Some(kv1) = attr1 {
values.attributes.push(kv1);
}
if let Some(kv2) = attr2 {
values.attributes.push(kv2);
}
if let Some(kv3) = attr3 {
values.attributes.push(kv3);
}
} else {
let mut attributes = Vec::with_capacity(capacity as usize);
if let Some(kv1) = attr1 {
attributes.push(kv1);
}
if let Some(kv2) = attr2 {
attributes.push(kv2);
}
if let Some(kv3) = attr3 {
attributes.push(kv3);
}
state.put(MetricAttributes { attributes });
}
}
struct ObservationDone(oneshot::Sender<()>);
#[op2(async)]
async fn op_otel_metric_wait_to_observe(state: Rc<RefCell<OpState>>) -> bool {
let (tx, rx) = oneshot::channel();
{
OTEL_PRE_COLLECT_CALLBACKS
.lock()
.expect("mutex poisoned")
.push(tx);
}
if let Ok(done) = rx.await {
state.borrow_mut().put(ObservationDone(done));
true
} else {
false
}
}
#[op2(fast)]
fn op_otel_metric_observation_done(state: &mut OpState) {
if let Some(ObservationDone(done)) = state.try_take::<ObservationDone>() {
let _ = done.send(());
}
}