1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-21 21:50:00 -05:00
denoland-deno/ext/node/polyfills/diagnostics_channel.js
2024-12-31 19:12:39 +00:00

430 lines
10 KiB
JavaScript

// Copyright 2018-2025 the Deno authors. MIT license.
// Copyright Joyent and Node contributors. All rights reserved. MIT license.
// TODO(petamoriken): enable prefer-primordials for node polyfills
// deno-lint-ignore-file prefer-primordials ban-untagged-todo
import { ERR_INVALID_ARG_TYPE } from "ext:deno_node/internal/errors.ts";
import { validateFunction } from "ext:deno_node/internal/validators.mjs";
import { nextTick } from "node:process";
import { primordials } from "ext:core/mod.js";
const {
ArrayPrototypeAt,
ArrayPrototypeIndexOf,
ArrayPrototypePush,
ArrayPrototypeSplice,
ObjectDefineProperty,
ObjectGetPrototypeOf,
ObjectSetPrototypeOf,
Promise,
PromisePrototypeThen,
PromiseReject,
PromiseResolve,
ReflectApply,
SafeFinalizationRegistry,
SafeMap,
SymbolHasInstance,
} = primordials;
import { WeakReference } from "ext:deno_node/internal/util.mjs";
// Can't delete when weakref count reaches 0 as it could increment again.
// Only GC can be used as a valid time to clean up the channels map.
/**
 * Map whose values are wrapped in `WeakReference` objects.
 *
 * `set` wraps the value and registers it with a finalization registry keyed
 * by the map key; `get` unwraps it again. `incRef`/`decRef` forward to the
 * stored `WeakReference`, if any.
 */
class WeakRefMap extends SafeMap {
  #finalizers = new SafeFinalizationRegistry((key) => {
    // Finalization callbacks are not synchronous with GC: by the time this
    // runs, a new value may already have been stored under the same key.
    // Only drop the entry when it no longer resolves to a live value.
    // NOTE(review): relies on WeakReference#get() returning undefined once
    // its target has been collected - confirm against internal/util.mjs.
    if (this.get(key) === undefined) {
      this.delete(key);
    }
  });

  /**
   * Store `value` under `key`, held through a new `WeakReference`.
   * @returns the map itself (result of `SafeMap.prototype.set`).
   */
  set(key, value) {
    this.#finalizers.register(value, key);
    return super.set(key, new WeakReference(value));
  }

  /** Return the value stored under `key`, or `undefined` if there is none. */
  get(key) {
    return super.get(key)?.get();
  }

  /** Forward to `incRef()` on the reference stored under `key`, if any. */
  incRef(key) {
    return super.get(key)?.incRef();
  }

  /** Forward to `decRef()` on the reference stored under `key`, if any. */
  decRef(key) {
    return super.get(key)?.decRef();
  }
}
/**
 * Switch `channel` over to the active implementation.
 *
 * The object is re-parented onto `ActiveChannel.prototype` and receives a
 * fresh, empty subscriber list and an empty store map.
 */
function markActive(channel) {
  const activePrototype = ActiveChannel.prototype;
  ObjectSetPrototypeOf(channel, activePrototype);

  channel._subscribers = [];
  channel._stores = new SafeMap();
}
/**
 * Return `channel` to the inactive implementation once nothing is attached.
 *
 * Does nothing while the channel still has a subscriber or a bound store.
 * Otherwise the object is re-parented onto `Channel.prototype` and its
 * `_subscribers` / `_stores` fields are reset to `undefined`.
 */
function maybeMarkInactive(channel) {
  // Still in use: keep the active prototype.
  if (channel._subscribers.length || channel._stores.size) {
    return;
  }

  // Nothing attached any more, restore the fast prototype.
  ObjectSetPrototypeOf(channel, Channel.prototype);
  channel._subscribers = undefined;
  channel._stores = undefined;
}
/**
 * Identity transform: hands the published data back unchanged.
 * Used as the default value of `wrapStoreRun`'s `transform` parameter.
 */
function defaultTransform(data) {
  return data;
}
/**
 * Build a thunk that runs `next` inside `store`.
 *
 * When the returned function is called, `transform(data)` is evaluated and
 * its result is passed to `store.run(value, next)`. If `transform` throws,
 * the error is re-thrown from a `nextTick` callback and `next` is invoked
 * directly, without entering the store.
 *
 * @param store object exposing `run(value, fn)`
 * @param data value handed to `transform`
 * @param next function to execute
 * @param transform maps `data` to the value given to the store
 * @returns a zero-argument function returning whatever `next` returns
 */
function wrapStoreRun(store, data, next, transform = defaultTransform) {
  return () => {
    let storeValue;
    let transformError;
    let transformFailed = false;

    try {
      storeValue = transform(data);
    } catch (err) {
      transformFailed = true;
      transformError = err;
    }

    if (transformFailed) {
      nextTick(() => {
        // TODO(bartlomieju): in Node.js this is using `triggerUncaughtException` API, need
        // to clarify if we need that or if just throwing the error is enough here.
        throw transformError;
        // triggerUncaughtException(err, false);
      });
      return next();
    }

    return store.run(storeValue, next);
  };
}
/**
 * Channel behaviour used while at least one subscriber or bound store is
 * attached. `markActive` re-parents a channel onto this prototype and
 * initialises `_subscribers` (array) and `_stores` (map of store -> transform).
 *
 * `_subscribers` is treated as copy-on-write: `subscribe` and `unsubscribe`
 * replace the array instead of mutating it, so a `publish` that is already
 * iterating keeps a stable view of the subscriber list.
 */
class ActiveChannel {
  /**
   * Add `subscription` to the channel.
   * Throws (via `validateFunction`) when `subscription` is not a function.
   * A subscriber added while a publish is in progress is first invoked on
   * the next publish.
   */
  subscribe(subscription) {
    validateFunction(subscription, "subscription");

    const current = this._subscribers;
    const updated = [];
    for (let i = 0; i < current.length; i++) {
      ArrayPrototypePush(updated, current[i]);
    }
    ArrayPrototypePush(updated, subscription);
    this._subscribers = updated;

    channels.incRef(this.name);
  }

  /**
   * Remove the first occurrence of `subscription`.
   * @returns `true` if it was subscribed, `false` otherwise.
   */
  unsubscribe(subscription) {
    const current = this._subscribers;
    const index = ArrayPrototypeIndexOf(current, subscription);
    if (index === -1) return false;

    // Build a new array rather than splicing in place: an in-flight
    // `publish` iterates by index, and shifting elements underneath it
    // would make it skip the subscriber that follows the removed one.
    const remaining = [];
    for (let i = 0; i < current.length; i++) {
      if (i !== index) ArrayPrototypePush(remaining, current[i]);
    }
    this._subscribers = remaining;

    channels.decRef(this.name);
    maybeMarkInactive(this);
    return true;
  }

  /**
   * Bind `store` to the channel with an optional `transform`.
   * Re-binding an already bound store only replaces its transform.
   */
  bindStore(store, transform) {
    const replacing = this._stores.has(store);
    if (!replacing) channels.incRef(this.name);
    this._stores.set(store, transform);
  }

  /**
   * Remove a previously bound `store`.
   * @returns `true` if the store was bound, `false` otherwise.
   */
  unbindStore(store) {
    if (!this._stores.has(store)) {
      return false;
    }
    this._stores.delete(store);
    channels.decRef(this.name);
    maybeMarkInactive(this);
    return true;
  }

  /** Always `true` on the active prototype. */
  get hasSubscribers() {
    return true;
  }

  /**
   * Call every subscriber with `(data, this.name)`.
   * An error thrown by a subscriber does not stop delivery to the others;
   * it is re-thrown from a `nextTick` callback instead.
   */
  publish(data) {
    // Iterate the array captured here; subscribe/unsubscribe calls made by a
    // subscriber replace `this._subscribers` and do not disturb this loop.
    const subscribers = this._subscribers;
    for (let i = 0; i < (subscribers?.length || 0); i++) {
      try {
        const onMessage = subscribers[i];
        onMessage(data, this.name);
      } catch (err) {
        nextTick(() => {
          // TODO(bartlomieju): in Node.js this is using `triggerUncaughtException` API, need
          // to clarify if we need that or if just throwing the error is enough here.
          throw err;
          // triggerUncaughtException(err, false);
        });
      }
    }
  }

  /**
   * Publish `data` and call `fn` with `thisArg` and `args`, nested inside
   * `run` of every bound store (each wrapped through `wrapStoreRun`).
   * @returns the return value of `fn`.
   */
  runStores(data, fn, thisArg, ...args) {
    let run = () => {
      this.publish(data);
      return ReflectApply(fn, thisArg, args);
    };
    // Each store wraps the previous thunk, so the last bound store ends up
    // outermost.
    for (const entry of this._stores.entries()) {
      const store = entry[0];
      const transform = entry[1];
      run = wrapStoreRun(store, data, run, transform);
    }
    return run();
  }
}
/**
 * Inactive channel: the state of a channel with no subscribers and no bound
 * stores. `publish` is a no-op and `runStores` just calls the function.
 *
 * The first `subscribe` or `bindStore` call switches the instance to
 * `ActiveChannel.prototype` via `markActive` and then re-dispatches the call,
 * which now resolves to the `ActiveChannel` method of the same name.
 */
class Channel {
  /** Create the channel and register it in `channels` under `name`. */
  constructor(name) {
    this._subscribers = undefined;
    this._stores = undefined;
    this.name = name;
    channels.set(name, this);
  }

  /**
   * Custom `instanceof` check: an object counts as a `Channel` when its
   * direct prototype is `Channel.prototype` or `ActiveChannel.prototype`,
   * so instances remain `instanceof Channel` after being marked active.
   */
  static [SymbolHasInstance](instance) {
    const prototype = ObjectGetPrototypeOf(instance);
    return prototype === Channel.prototype ||
      prototype === ActiveChannel.prototype;
  }

  /**
   * Activate the channel and add `subscription`.
   * Throws (via `validateFunction`) when `subscription` is not a function.
   */
  subscribe(subscription) {
    // Validate before switching prototypes; otherwise a rejected argument
    // would leave the channel active (hasSubscribers === true) with an
    // empty subscriber list.
    validateFunction(subscription, "subscription");
    markActive(this);
    this.subscribe(subscription);
  }

  /** Nothing is subscribed on an inactive channel; always `false`. */
  unsubscribe() {
    return false;
  }

  /** Activate the channel and bind `store` with the optional `transform`. */
  bindStore(store, transform) {
    markActive(this);
    this.bindStore(store, transform);
  }

  /** Nothing is bound on an inactive channel; always `false`. */
  unbindStore() {
    return false;
  }

  /** Always `false` on the inactive prototype. */
  get hasSubscribers() {
    return false;
  }

  /** No subscribers to notify. */
  publish() {}

  /** Call `fn` with `thisArg` and `args`; no publishing, no stores. */
  runStores(_data, fn, thisArg, ...args) {
    return ReflectApply(fn, thisArg, args);
  }
}
// Module-wide registry of channels keyed by channel name. The Channel
// constructor adds entries; values are stored through WeakRefMap.
const channels = new WeakRefMap();
/**
 * Look up the channel registered under `name`, creating it on first use.
 *
 * @param name channel name; must be a string or a symbol
 * @returns the existing or newly created channel
 * @throws ERR_INVALID_ARG_TYPE when no channel exists for `name` and `name`
 *   is neither a string nor a symbol
 */
export function channel(name) {
  const existing = channels.get(name);
  if (existing) {
    return existing;
  }

  const nameType = typeof name;
  if (nameType === "string" || nameType === "symbol") {
    return new Channel(name);
  }

  throw new ERR_INVALID_ARG_TYPE("channel", ["string", "symbol"], name);
}
/**
 * Subscribe `subscription` to the channel called `name`.
 * The channel is obtained through `channel(name)`.
 * @returns whatever the channel's `subscribe` method returns
 */
export function subscribe(name, subscription) {
  const target = channel(name);
  return target.subscribe(subscription);
}
/**
 * Remove `subscription` from the channel called `name`.
 * The channel is obtained through `channel(name)`.
 * @returns whatever the channel's `unsubscribe` method returns
 */
export function unsubscribe(name, subscription) {
  const target = channel(name);
  return target.unsubscribe(subscription);
}
/**
 * Report whether the channel called `name` has subscribers.
 * Unlike `channel(name)`, this never creates a channel.
 * @returns `false` when no channel is registered under `name`, otherwise
 *   that channel's `hasSubscribers` value
 */
export function hasSubscribers(name) {
  const registered = channels.get(name);
  return registered ? registered.hasSubscribers : false;
}
// Names of the individual channels that make up a TracingChannel. Each name
// becomes a property on TracingChannel instances and is the key looked up on
// the handler objects passed to TracingChannel#subscribe / #unsubscribe.
const traceEvents = [
"start",
"end",
"asyncStart",
"asyncEnd",
"error",
];
/**
 * Ensure `value` is a `Channel`.
 *
 * @param value candidate to check with `instanceof Channel`
 * @param name argument/property name reported in the error
 * @throws ERR_INVALID_ARG_TYPE when the check fails
 */
function assertChannel(value, name) {
  if (value instanceof Channel) {
    return;
  }
  throw new ERR_INVALID_ARG_TYPE(name, ["Channel"], value);
}
/**
 * Resolve the channel for one trace event.
 *
 * - string: returns `channel("tracing:<nameOrChannels>:<name>")`.
 * - non-null object: returns `nameOrChannels[name]` after checking it with
 *   `assertChannel`.
 *
 * @param nameOrChannels base name or object holding the channels
 * @param name trace event name
 * @throws ERR_INVALID_ARG_TYPE for any other kind of `nameOrChannels`
 */
function tracingChannelFrom(nameOrChannels, name) {
  switch (typeof nameOrChannels) {
    case "string":
      return channel(`tracing:${nameOrChannels}:${name}`);
    case "object": {
      // typeof null is "object"; null falls through to the error below.
      if (nameOrChannels === null) break;
      const provided = nameOrChannels[name];
      assertChannel(provided, `nameOrChannels.${name}`);
      return provided;
    }
  }

  throw new ERR_INVALID_ARG_TYPE("nameOrChannels", [
    "string",
    "object",
    "Channel",
  ], nameOrChannels);
}
/**
 * Bundle of the five trace-event channels (`start`, `end`, `asyncStart`,
 * `asyncEnd`, `error`) plus helpers that publish to them around a call.
 */
class TracingChannel {
  /**
   * Define one property per entry of `traceEvents`, each resolved through
   * `tracingChannelFrom(nameOrChannels, eventName)`.
   */
  constructor(nameOrChannels) {
    for (const eventName of traceEvents) {
      const eventChannel = tracingChannelFrom(nameOrChannels, eventName);
      ObjectDefineProperty(this, eventName, {
        __proto__: null,
        value: eventChannel,
      });
    }
  }

  /** Truthy when any of the five channels reports subscribers. */
  get hasSubscribers() {
    const { start, end, asyncStart, asyncEnd, error } = this;
    return start.hasSubscribers ||
      end.hasSubscribers ||
      asyncStart.hasSubscribers ||
      asyncEnd.hasSubscribers ||
      error.hasSubscribers;
  }

  /**
   * Subscribe each handler present on `handlers` (keyed by trace event name)
   * to the matching channel. Missing handlers are skipped.
   */
  subscribe(handlers) {
    for (const name of traceEvents) {
      if (handlers[name]) {
        this[name]?.subscribe(handlers[name]);
      }
    }
  }

  /**
   * Unsubscribe each handler present on `handlers` from the matching channel.
   * @returns `true` only if every attempted unsubscribe succeeded
   */
  unsubscribe(handlers) {
    let allRemoved = true;
    for (const name of traceEvents) {
      if (handlers[name]) {
        const removed = this[name]?.unsubscribe(handlers[name]);
        if (!removed) {
          allRemoved = false;
        }
      }
    }
    return allRemoved;
  }

  /**
   * Trace a synchronous call of `fn` with `thisArg` and `args`.
   *
   * Without subscribers `fn` is simply called. Otherwise the call runs
   * inside `start.runStores(context, ...)`; the return value is stored on
   * `context.result`, a thrown error is stored on `context.error`, published
   * on `error` and re-thrown, and `end` is published in either case.
   *
   * @returns the return value of `fn`
   */
  traceSync(fn, context = {}, thisArg, ...args) {
    if (!this.hasSubscribers) {
      return ReflectApply(fn, thisArg, args);
    }

    const { start, end, error } = this;

    const tracedCall = () => {
      try {
        const result = ReflectApply(fn, thisArg, args);
        context.result = result;
        return result;
      } catch (err) {
        context.error = err;
        error.publish(context);
        throw err;
      } finally {
        end.publish(context);
      }
    };

    return start.runStores(context, tracedCall);
  }

  /**
   * Trace a promise-returning call of `fn` with `thisArg` and `args`.
   *
   * Without subscribers `fn` is simply called. Otherwise the call runs
   * inside `start.runStores(context, ...)` and `end` is published once `fn`
   * returns or throws. On settlement, `context.result` or `context.error` is
   * set (`error` is published on rejection) and then `asyncStart` and
   * `asyncEnd` are published back to back.
   *
   * @returns a promise settling like the one produced by `fn`
   */
  tracePromise(fn, context = {}, thisArg, ...args) {
    if (!this.hasSubscribers) {
      return ReflectApply(fn, thisArg, args);
    }

    const { start, end, asyncStart, asyncEnd, error } = this;

    const onRejected = (err) => {
      context.error = err;
      error.publish(context);
      asyncStart.publish(context);
      // TODO: Is there a way to have asyncEnd _after_ the continuation?
      asyncEnd.publish(context);
      return PromiseReject(err);
    };

    const onFulfilled = (result) => {
      context.result = result;
      asyncStart.publish(context);
      // TODO: Is there a way to have asyncEnd _after_ the continuation?
      asyncEnd.publish(context);
      return result;
    };

    const tracedCall = () => {
      try {
        const returned = ReflectApply(fn, thisArg, args);
        // Convert thenables to native promises
        const promise = returned instanceof Promise
          ? returned
          : PromiseResolve(returned);
        return PromisePrototypeThen(promise, onFulfilled, onRejected);
      } catch (err) {
        context.error = err;
        error.publish(context);
        throw err;
      } finally {
        end.publish(context);
      }
    };

    return start.runStores(context, tracedCall);
  }

  /**
   * Trace a callback-style call of `fn` with `thisArg` and `args`.
   *
   * The argument at `position` (default: last) must be a function; it is
   * replaced by a wrapper before `fn` is called. The synchronous part is
   * traced like `traceSync` (without recording `context.result`). When the
   * wrapper is called, a truthy first argument is stored on `context.error`
   * and published on `error`, otherwise the second argument is stored on
   * `context.result`; the original callback then runs inside
   * `asyncStart.runStores`, followed by an `asyncEnd` publish.
   *
   * @returns the return value of `fn`
   * @throws via `validateFunction` when the selected argument is not a
   *   function (only checked when there are subscribers)
   */
  traceCallback(fn, position = -1, context = {}, thisArg, ...args) {
    if (!this.hasSubscribers) {
      return ReflectApply(fn, thisArg, args);
    }

    const { start, end, asyncStart, asyncEnd, error } = this;

    const callback = ArrayPrototypeAt(args, position);
    validateFunction(callback, "callback");

    // Must stay a regular function: the arrow below forwards this wrapper's
    // own `this` and `arguments` to the original callback.
    function wrappedCallback(err, res) {
      if (err) {
        context.error = err;
        error.publish(context);
      } else {
        context.result = res;
      }

      // Using runStores here enables manual context failure recovery
      asyncStart.runStores(context, () => {
        try {
          return ReflectApply(callback, this, arguments);
        } finally {
          asyncEnd.publish(context);
        }
      });
    }

    ArrayPrototypeSplice(args, position, 1, wrappedCallback);

    const tracedCall = () => {
      try {
        return ReflectApply(fn, thisArg, args);
      } catch (err) {
        context.error = err;
        error.publish(context);
        throw err;
      } finally {
        end.publish(context);
      }
    };

    return start.runStores(context, tracedCall);
  }
}
/**
 * Create a `TracingChannel`.
 * @param nameOrChannels passed straight to the `TracingChannel` constructor
 * @returns the new `TracingChannel` instance
 */
export function tracingChannel(nameOrChannels) {
  const tracing = new TracingChannel(nameOrChannels);
  return tracing;
}
// Default export: the same functions that are exported by name above, plus
// the Channel class.
export default {
channel,
hasSubscribers,
subscribe,
tracingChannel,
unsubscribe,
Channel,
};