1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-21 04:52:26 -05:00

refactor(cli): use new sanitizer for resources (#22125)

Step 1 of the Rustification of sanitizers, which unblocks the faster
timers.

This replaces the resource sanitizer with a Rust one, using the new APIs
in deno_core.
This commit is contained in:
Matt Mastracci 2024-01-26 17:24:16 -05:00 committed by GitHub
parent 6109717c4a
commit 84fb2ad71b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 239 additions and 204 deletions

View file

@ -15,10 +15,8 @@ const {
MapPrototypeGet,
MapPrototypeHas,
MapPrototypeSet,
ObjectKeys,
Promise,
SafeArrayIterator,
Set,
SymbolToStringTag,
TypeError,
} = primordials;
@ -342,182 +340,6 @@ function assertOps(fn) {
};
}
function prettyResourceNames(name) {
switch (name) {
case "fsFile":
return ["A file", "opened", "closed"];
case "fetchRequest":
return ["A fetch request", "started", "finished"];
case "fetchRequestBody":
return ["A fetch request body", "created", "closed"];
case "fetchResponse":
return ["A fetch response body", "created", "consumed"];
case "httpClient":
return ["An HTTP client", "created", "closed"];
case "dynamicLibrary":
return ["A dynamic library", "loaded", "unloaded"];
case "httpConn":
return ["An inbound HTTP connection", "accepted", "closed"];
case "httpStream":
return ["An inbound HTTP request", "accepted", "closed"];
case "tcpStream":
return ["A TCP connection", "opened/accepted", "closed"];
case "unixStream":
return ["A Unix connection", "opened/accepted", "closed"];
case "tlsStream":
return ["A TLS connection", "opened/accepted", "closed"];
case "tlsListener":
return ["A TLS listener", "opened", "closed"];
case "unixListener":
return ["A Unix listener", "opened", "closed"];
case "unixDatagram":
return ["A Unix datagram", "opened", "closed"];
case "tcpListener":
return ["A TCP listener", "opened", "closed"];
case "udpSocket":
return ["A UDP socket", "opened", "closed"];
case "timer":
return ["A timer", "started", "fired/cleared"];
case "textDecoder":
return ["A text decoder", "created", "finished"];
case "messagePort":
return ["A message port", "created", "closed"];
case "webSocketStream":
return ["A WebSocket", "opened", "closed"];
case "fsEvents":
return ["A file system watcher", "created", "closed"];
case "childStdin":
return ["A child process stdin", "opened", "closed"];
case "childStdout":
return ["A child process stdout", "opened", "closed"];
case "childStderr":
return ["A child process stderr", "opened", "closed"];
case "child":
return ["A child process", "started", "closed"];
case "signal":
return ["A signal listener", "created", "fired/cleared"];
case "stdin":
return ["The stdin pipe", "opened", "closed"];
case "stdout":
return ["The stdout pipe", "opened", "closed"];
case "stderr":
return ["The stderr pipe", "opened", "closed"];
case "compression":
return ["A CompressionStream", "created", "closed"];
default:
return [`A "${name}" resource`, "created", "cleaned up"];
}
}
function resourceCloseHint(name) {
switch (name) {
case "fsFile":
return "Close the file handle by calling `file.close()`.";
case "fetchRequest":
return "Await the promise returned from `fetch()` or abort the fetch with an abort signal.";
case "fetchRequestBody":
return "Terminate the request body `ReadableStream` by closing or erroring it.";
case "fetchResponse":
return "Consume or close the response body `ReadableStream`, e.g `await resp.text()` or `await resp.body.cancel()`.";
case "httpClient":
return "Close the HTTP client by calling `httpClient.close()`.";
case "dynamicLibrary":
return "Unload the dynamic library by calling `dynamicLibrary.close()`.";
case "httpConn":
return "Close the inbound HTTP connection by calling `httpConn.close()`.";
case "httpStream":
return "Close the inbound HTTP request by responding with `e.respondWith()` or closing the HTTP connection.";
case "tcpStream":
return "Close the TCP connection by calling `tcpConn.close()`.";
case "unixStream":
return "Close the Unix socket connection by calling `unixConn.close()`.";
case "tlsStream":
return "Close the TLS connection by calling `tlsConn.close()`.";
case "tlsListener":
return "Close the TLS listener by calling `tlsListener.close()`.";
case "unixListener":
return "Close the Unix socket listener by calling `unixListener.close()`.";
case "unixDatagram":
return "Close the Unix datagram socket by calling `unixDatagram.close()`.";
case "tcpListener":
return "Close the TCP listener by calling `tcpListener.close()`.";
case "udpSocket":
return "Close the UDP socket by calling `udpSocket.close()`.";
case "timer":
return "Clear the timer by calling `clearInterval` or `clearTimeout`.";
case "textDecoder":
return "Close the text decoder by calling `textDecoder.decode('')` or `await textDecoderStream.readable.cancel()`.";
case "messagePort":
return "Close the message port by calling `messagePort.close()`.";
case "webSocketStream":
return "Close the WebSocket by calling `webSocket.close()`.";
case "fsEvents":
return "Close the file system watcher by calling `watcher.close()`.";
case "childStdin":
return "Close the child process stdin by calling `proc.stdin.close()`.";
case "childStdout":
return "Close the child process stdout by calling `proc.stdout.close()` or `await child.stdout.cancel()`.";
case "childStderr":
return "Close the child process stderr by calling `proc.stderr.close()` or `await child.stderr.cancel()`.";
case "child":
return "Close the child process by calling `proc.kill()` or `proc.close()`.";
case "signal":
return "Clear the signal listener by calling `Deno.removeSignalListener`.";
case "stdin":
return "Close the stdin pipe by calling `Deno.stdin.close()`.";
case "stdout":
return "Close the stdout pipe by calling `Deno.stdout.close()`.";
case "stderr":
return "Close the stderr pipe by calling `Deno.stderr.close()`.";
case "compression":
return "Close the compression stream by calling `await stream.writable.close()`.";
default:
return "Close the resource before the end of the test.";
}
}
// Wrap test function in additional assertion that makes sure
// the test case does not "leak" resources - ie. resource table after
// the test has exactly the same contents as before the test.
function assertResources(fn) {
/** @param desc {TestDescription | TestStepDescription} */
return async function resourceSanitizer(desc) {
const pre = core.resources();
const innerResult = await fn(desc);
if (innerResult) return innerResult;
const post = core.resources();
const allResources = new Set([
...new SafeArrayIterator(ObjectKeys(pre)),
...new SafeArrayIterator(ObjectKeys(post)),
]);
const details = [];
for (const resource of allResources) {
const preResource = pre[resource];
const postResource = post[resource];
if (preResource === postResource) continue;
if (preResource === undefined) {
const [name, action1, action2] = prettyResourceNames(postResource);
const hint = resourceCloseHint(postResource);
const detail =
`${name} (rid ${resource}) was ${action1} during the test, but not ${action2} during the test. ${hint}`;
ArrayPrototypePush(details, detail);
} else {
const [name, action1, action2] = prettyResourceNames(preResource);
const detail =
`${name} (rid ${resource}) was ${action1} before the test started, but was ${action2} during the test. Do not close resources in a test that were not created during that test.`;
ArrayPrototypePush(details, detail);
}
}
if (details.length == 0) {
return null;
}
return { failed: { leakedResources: details } };
};
}
// Wrap test function in additional assertion that makes sure
// that the test case does not accidentally exit prematurely.
function assertExit(fn, isTest) {
@ -720,6 +542,8 @@ function testInner(
testDesc.name,
testDesc.ignore,
testDesc.only,
false, /*testDesc.sanitizeOps*/
testDesc.sanitizeResources,
testDesc.location.fileName,
testDesc.location.lineNumber,
testDesc.location.columnNumber,
@ -910,9 +734,6 @@ function wrapTest(desc) {
if (desc.sanitizeOps) {
testFn = assertOps(testFn);
}
if (desc.sanitizeResources) {
testFn = assertResources(testFn);
}
if (desc.sanitizeExit) {
testFn = assertExit(testFn, true);
}

View file

@ -363,7 +363,9 @@ impl TestRun {
test::TestResult::Ignored => summary.ignored += 1,
test::TestResult::Failed(error) => {
summary.failed += 1;
summary.failures.push((description.clone(), error.clone()));
summary
.failures
.push(((&description).into(), error.clone()));
}
test::TestResult::Cancelled => {
summary.failed += 1;

View file

@ -123,6 +123,8 @@ fn op_register_test(
#[string] name: String,
ignore: bool,
only: bool,
sanitize_ops: bool,
sanitize_resources: bool,
#[string] file_name: String,
#[smi] line_number: u32,
#[smi] column_number: u32,
@ -141,6 +143,8 @@ fn op_register_test(
name,
ignore,
only,
sanitize_ops,
sanitize_resources,
origin: origin.clone(),
location: TestLocation {
file_name,

View file

@ -6,8 +6,10 @@ leak ... FAILED ([WILDCARD])
leak => ./test/resource_sanitizer.ts:[WILDCARD]
error: Leaking resources:
- The stdin pipe (rid 0) was opened before the test started, but was closed during the test. Do not close resources in a test that were not created during that test.
- A file (rid 3) was opened during the test, but not closed during the test. Close the file handle by calling `file.close()`.
[UNORDERED_START]
- The stdin pipe was opened before the test started, but was closed during the test. Do not close resources in a test that were not created during that test.
- A file was opened during the test, but not closed during the test. Close the file handle by calling `file.close()`.
[UNORDERED_END]
FAILURES

View file

@ -1,5 +1,11 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
use deno_core::stats::RuntimeActivity;
use deno_core::stats::RuntimeActivityDiff;
use deno_core::stats::RuntimeActivityType;
use std::borrow::Cow;
use std::ops::AddAssign;
use super::*;
pub fn to_relative_path_or_remote_url(cwd: &Url, path_or_url: &str) -> String {
@ -74,3 +80,136 @@ pub fn format_test_error(js_error: &JsError) -> String {
.to_string();
format_js_error(&js_error)
}
pub fn format_sanitizer_diff(diff: RuntimeActivityDiff) -> Vec<String> {
let mut output = format_sanitizer_accum(diff.appeared, true);
output.extend(format_sanitizer_accum(diff.disappeared, false));
output.sort();
output
}
fn format_sanitizer_accum(
activities: Vec<RuntimeActivity>,
appeared: bool,
) -> Vec<String> {
let mut accum = HashMap::new();
for activity in activities {
let item = format_sanitizer_accum_item(activity);
accum.entry(item).or_insert(0).add_assign(1);
}
let mut output = vec![];
for ((item_type, item_name), count) in accum.into_iter() {
if item_type == RuntimeActivityType::Resource {
// TODO(mmastrac): until we implement the new timers and op sanitization, these must be ignored in this path
if item_name == "timer" {
continue;
}
let (name, action1, action2) = pretty_resource_name(&item_name);
let hint = resource_close_hint(&item_name);
if appeared {
output.push(format!("{name} was {action1} during the test, but not {action2} during the test. {hint}"));
} else {
output.push(format!("{name} was {action1} before the test started, but was {action2} during the test. \
Do not close resources in a test that were not created during that test."));
}
} else {
// TODO(mmastrac): this will be done in a later PR
unimplemented!(
"Unhandled diff: {appeared} {} {:?} {}",
count,
item_type,
item_name
);
}
}
output
}
fn format_sanitizer_accum_item(
activity: RuntimeActivity,
) -> (RuntimeActivityType, Cow<'static, str>) {
let activity_type = activity.activity();
match activity {
RuntimeActivity::AsyncOp(_, name) => (activity_type, name.into()),
RuntimeActivity::Interval(_) => (activity_type, "".into()),
RuntimeActivity::Resource(_, name) => (activity_type, name.into()),
RuntimeActivity::Timer(_) => (activity_type, "".into()),
}
}
fn pretty_resource_name(
name: &str,
) -> (Cow<'static, str>, &'static str, &'static str) {
let (name, action1, action2) = match name {
"fsFile" => ("A file", "opened", "closed"),
"fetchRequest" => ("A fetch request", "started", "finished"),
"fetchRequestBody" => ("A fetch request body", "created", "closed"),
"fetchResponse" => ("A fetch response body", "created", "consumed"),
"httpClient" => ("An HTTP client", "created", "closed"),
"dynamicLibrary" => ("A dynamic library", "loaded", "unloaded"),
"httpConn" => ("An inbound HTTP connection", "accepted", "closed"),
"httpStream" => ("An inbound HTTP request", "accepted", "closed"),
"tcpStream" => ("A TCP connection", "opened/accepted", "closed"),
"unixStream" => ("A Unix connection", "opened/accepted", "closed"),
"tlsStream" => ("A TLS connection", "opened/accepted", "closed"),
"tlsListener" => ("A TLS listener", "opened", "closed"),
"unixListener" => ("A Unix listener", "opened", "closed"),
"unixDatagram" => ("A Unix datagram", "opened", "closed"),
"tcpListener" => ("A TCP listener", "opened", "closed"),
"udpSocket" => ("A UDP socket", "opened", "closed"),
"timer" => ("A timer", "started", "fired/cleared"),
"textDecoder" => ("A text decoder", "created", "finished"),
"messagePort" => ("A message port", "created", "closed"),
"webSocketStream" => ("A WebSocket", "opened", "closed"),
"fsEvents" => ("A file system watcher", "created", "closed"),
"childStdin" => ("A child process stdin", "opened", "closed"),
"childStdout" => ("A child process stdout", "opened", "closed"),
"childStderr" => ("A child process stderr", "opened", "closed"),
"child" => ("A child process", "started", "closed"),
"signal" => ("A signal listener", "created", "fired/cleared"),
"stdin" => ("The stdin pipe", "opened", "closed"),
"stdout" => ("The stdout pipe", "opened", "closed"),
"stderr" => ("The stderr pipe", "opened", "closed"),
"compression" => ("A CompressionStream", "created", "closed"),
_ => return (format!("\"{name}\"").into(), "created", "cleaned up"),
};
(name.into(), action1, action2)
}
fn resource_close_hint(name: &str) -> &'static str {
match name {
"fsFile" => "Close the file handle by calling `file.close()`.",
"fetchRequest" => "Await the promise returned from `fetch()` or abort the fetch with an abort signal.",
"fetchRequestBody" => "Terminate the request body `ReadableStream` by closing or erroring it.",
"fetchResponse" => "Consume or close the response body `ReadableStream`, e.g `await resp.text()` or `await resp.body.cancel()`.",
"httpClient" => "Close the HTTP client by calling `httpClient.close()`.",
"dynamicLibrary" => "Unload the dynamic library by calling `dynamicLibrary.close()`.",
"httpConn" => "Close the inbound HTTP connection by calling `httpConn.close()`.",
"httpStream" => "Close the inbound HTTP request by responding with `e.respondWith()` or closing the HTTP connection.",
"tcpStream" => "Close the TCP connection by calling `tcpConn.close()`.",
"unixStream" => "Close the Unix socket connection by calling `unixConn.close()`.",
"tlsStream" => "Close the TLS connection by calling `tlsConn.close()`.",
"tlsListener" => "Close the TLS listener by calling `tlsListener.close()`.",
"unixListener" => "Close the Unix socket listener by calling `unixListener.close()`.",
"unixDatagram" => "Close the Unix datagram socket by calling `unixDatagram.close()`.",
"tcpListener" => "Close the TCP listener by calling `tcpListener.close()`.",
"udpSocket" => "Close the UDP socket by calling `udpSocket.close()`.",
"timer" => "Clear the timer by calling `clearInterval` or `clearTimeout`.",
"textDecoder" => "Close the text decoder by calling `textDecoder.decode('')` or `await textDecoderStream.readable.cancel()`.",
"messagePort" => "Close the message port by calling `messagePort.close()`.",
"webSocketStream" => "Close the WebSocket by calling `webSocket.close()`.",
"fsEvents" => "Close the file system watcher by calling `watcher.close()`.",
"childStdin" => "Close the child process stdin by calling `proc.stdin.close()`.",
"childStdout" => "Close the child process stdout by calling `proc.stdout.close()` or `await child.stdout.cancel()`.",
"childStderr" => "Close the child process stderr by calling `proc.stderr.close()` or `await child.stderr.cancel()`.",
"child" => "Close the child process by calling `proc.kill()` or `proc.close()`.",
"signal" => "Clear the signal listener by calling `Deno.removeSignalListener`.",
"stdin" => "Close the stdin pipe by calling `Deno.stdin.close()`.",
"stdout" => "Close the stdout pipe by calling `Deno.stdout.close()`.",
"stderr" => "Close the stderr pipe by calling `Deno.stderr.close()`.",
"compression" => "Close the compression stream by calling `await stream.writable.close()`.",
_ => "Close the resource before the end of the test.",
}
}

View file

@ -40,6 +40,8 @@ use deno_core::futures::StreamExt;
use deno_core::located_script_name;
use deno_core::parking_lot::Mutex;
use deno_core::serde_v8;
use deno_core::stats::RuntimeActivityStats;
use deno_core::stats::RuntimeActivityStatsFilter;
use deno_core::unsync::spawn;
use deno_core::unsync::spawn_blocking;
use deno_core::url::Url;
@ -87,6 +89,7 @@ use tokio::sync::mpsc::WeakUnboundedSender;
pub mod fmt;
pub mod reporters;
use fmt::format_sanitizer_diff;
pub use fmt::format_test_error;
use reporters::CompoundTestReporter;
use reporters::DotTestReporter;
@ -175,6 +178,29 @@ pub struct TestDescription {
pub only: bool,
pub origin: String,
pub location: TestLocation,
pub sanitize_ops: bool,
pub sanitize_resources: bool,
}
/// May represent a failure of a test or test step.
#[derive(Debug, Clone, PartialEq, Deserialize, Eq, Hash)]
#[serde(rename_all = "camelCase")]
pub struct TestFailureDescription {
pub id: usize,
pub name: String,
pub origin: String,
pub location: TestLocation,
}
impl From<&TestDescription> for TestFailureDescription {
fn from(value: &TestDescription) -> Self {
Self {
id: value.id,
name: value.name.clone(),
origin: value.origin.clone(),
location: value.location.clone(),
}
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
@ -332,7 +358,7 @@ pub struct TestSummary {
pub ignored_steps: usize,
pub filtered_out: usize,
pub measured: usize,
pub failures: Vec<(TestDescription, TestFailure)>,
pub failures: Vec<(TestFailureDescription, TestFailure)>,
pub uncaught_errors: Vec<(String, Box<JsError>)>,
}
@ -547,6 +573,17 @@ pub async fn run_tests_for_worker(
used_only,
}))?;
let mut had_uncaught_error = false;
let stats = worker.js_runtime.runtime_activity_stats_factory();
let ops = worker.js_runtime.op_names();
let op_id_host_recv_message = ops
.iter()
.position(|op| *op == "op_host_recv_message")
.unwrap();
let op_id_host_recv_ctrl = ops
.iter()
.position(|op| *op == "op_host_recv_ctrl")
.unwrap();
for (desc, function) in tests {
if fail_fast_tracker.should_stop() {
break;
@ -561,15 +598,11 @@ pub async fn run_tests_for_worker(
}
sender.send(TestEvent::Wait(desc.id))?;
// TODO(bartlomieju): this is a nasty (beautiful) hack, that was required
// when switching `JsRuntime` from `FuturesUnordered` to `JoinSet`. With
// `JoinSet` all pending ops are immediately polled and that caused a problem
// when some async ops were fired and canceled before running tests (giving
// false positives in the ops sanitizer). We should probably rewrite sanitizers
// to be done in Rust instead of in JS (40_testing.js).
// Poll event loop once, to allow all ops that are already resolved, but haven't
// responded to settle.
// TODO(mmastrac): we should provide an API to poll the event loop until no futher
// progress is made.
{
// Poll event loop once, this will allow all ops that are already resolved,
// but haven't responded to settle.
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let _ = worker
@ -577,6 +610,22 @@ pub async fn run_tests_for_worker(
.poll_event_loop(&mut cx, PollEventLoopOptions::default());
}
let mut filter = RuntimeActivityStatsFilter::default();
if desc.sanitize_ops {
filter = filter.with_ops().with_timers();
filter = filter.omit_op(op_id_host_recv_ctrl as _);
filter = filter.omit_op(op_id_host_recv_message as _);
}
if desc.sanitize_resources {
filter = filter.with_resources();
}
let before = if !filter.is_empty() {
Some(stats.clone().capture(&filter))
} else {
None
};
let earlier = SystemTime::now();
let call = worker.js_runtime.call(&function);
let result = match worker
@ -600,6 +649,22 @@ pub async fn run_tests_for_worker(
}
}
};
if let Some(before) = before {
let after = stats.clone().capture(&filter);
let diff = RuntimeActivityStats::diff(&before, &after);
let formatted = format_sanitizer_diff(diff);
if !formatted.is_empty() {
let failure = TestFailure::LeakedResources(formatted);
let elapsed = SystemTime::now().duration_since(earlier)?.as_millis();
sender.send(TestEvent::Result(
desc.id,
TestResult::Failed(failure),
elapsed as u64,
))?;
continue;
}
}
let scope = &mut worker.js_runtime.handle_scope();
let result = v8::Local::new(scope, result);
let result = serde_v8::from_v8::<TestResult>(scope, result)?;

View file

@ -33,7 +33,10 @@ pub(super) fn format_test_step_ancestry(
result
}
pub fn format_test_for_summary(cwd: &Url, desc: &TestDescription) -> String {
pub fn format_test_for_summary(
cwd: &Url,
desc: &TestFailureDescription,
) -> String {
format!(
"{} {}",
&desc.name,
@ -78,7 +81,7 @@ pub(super) fn report_sigint(
let mut formatted_pending = BTreeSet::new();
for id in tests_pending {
if let Some(desc) = tests.get(id) {
formatted_pending.insert(format_test_for_summary(cwd, desc));
formatted_pending.insert(format_test_for_summary(cwd, &desc.into()));
}
if let Some(desc) = test_steps.get(id) {
formatted_pending
@ -107,7 +110,10 @@ pub(super) fn report_summary(
#[allow(clippy::type_complexity)] // Type alias doesn't look better here
let mut failures_by_origin: BTreeMap<
String,
(Vec<(&TestDescription, &TestFailure)>, Option<&JsError>),
(
Vec<(&TestFailureDescription, &TestFailure)>,
Option<&JsError>,
),
> = BTreeMap::default();
let mut failure_titles = vec![];
for (description, failure) in &summary.failures {

View file

@ -113,7 +113,7 @@ impl TestReporter for DotTestReporter {
self
.summary
.failures
.push((description.clone(), failure.clone()));
.push((description.into(), failure.clone()));
}
TestResult::Cancelled => {
self.summary.failed += 1;
@ -162,11 +162,9 @@ impl TestReporter for DotTestReporter {
TestStepResult::Failed(failure) => {
self.summary.failed_steps += 1;
self.summary.failures.push((
TestDescription {
TestFailureDescription {
id: desc.id,
name: common::format_test_step_ancestry(desc, tests, test_steps),
ignore: false,
only: false,
origin: desc.origin.clone(),
location: desc.location.clone(),
},

View file

@ -233,7 +233,7 @@ impl TestReporter for PrettyTestReporter {
self
.summary
.failures
.push((description.clone(), failure.clone()));
.push((description.into(), failure.clone()));
}
TestResult::Cancelled => {
self.summary.failed += 1;
@ -318,11 +318,9 @@ impl TestReporter for PrettyTestReporter {
TestStepResult::Failed(failure) => {
self.summary.failed_steps += 1;
self.summary.failures.push((
TestDescription {
TestFailureDescription {
id: desc.id,
name: common::format_test_step_ancestry(desc, tests, test_steps),
ignore: false,
only: false,
origin: desc.origin.clone(),
location: desc.location.clone(),
},