From 4894e500cf8c60c2971d186d6a21b994bf36e7d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Sat, 4 Mar 2023 20:10:31 -0400 Subject: [PATCH] refactor: move TaskQueue from deno_runtime to deno_core (#18016) This utility is useful in several contexts so it seems reasonable to have it in `deno_core`. --- core/lib.rs | 2 + core/task_queue.rs | 148 +++++++++++++++++++++++++++++++++++++++++++ runtime/ops/io.rs | 3 +- runtime/ops/utils.rs | 147 ------------------------------------------ 4 files changed, 151 insertions(+), 149 deletions(-) create mode 100644 core/task_queue.rs diff --git a/core/lib.rs b/core/lib.rs index 51a03493d6..7ec40e3119 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -20,6 +20,7 @@ mod resources; mod runtime; pub mod snapshot_util; mod source_map; +mod task_queue; // Re-exports pub use anyhow; @@ -116,6 +117,7 @@ pub use crate::runtime::Snapshot; pub use crate::runtime::V8_WRAPPER_OBJECT_INDEX; pub use crate::runtime::V8_WRAPPER_TYPE_INDEX; pub use crate::source_map::SourceMapGetter; +pub use crate::task_queue::TaskQueue; pub fn v8_version() -> &'static str { v8::V8::get_version() diff --git a/core/task_queue.rs b/core/task_queue.rs new file mode 100644 index 0000000000..839c47655c --- /dev/null +++ b/core/task_queue.rs @@ -0,0 +1,148 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +use futures::task::AtomicWaker; +use futures::Future; +use parking_lot::Mutex; +use std::collections::LinkedList; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; + +#[derive(Default)] +struct TaskQueueTaskWaker { + is_ready: AtomicBool, + waker: AtomicWaker, +} + +#[derive(Default)] +struct TaskQueueTasks { + is_running: bool, + wakers: LinkedList>, +} + +/// A queue that executes tasks sequentially one after the other +/// ensuring order and that no task runs at the same time as another. +/// +/// Note that tokio's semaphore doesn't seem to maintain order +/// and so we can't use that in the code that uses this or use +/// that here. +#[derive(Clone, Default)] +pub struct TaskQueue { + tasks: Arc>, +} + +impl TaskQueue { + /// Alternate API that acquires a permit internally + /// for the duration of the future. + #[cfg(test)] + pub async fn queue(&self, future: impl Future) -> R { + let _permit = self.acquire().await; + future.await + } + + /// Acquires a permit where the tasks are executed one at a time + /// and in the order that they were acquired. + pub async fn acquire(&self) -> TaskQueuePermit { + let acquire = TaskQueuePermitAcquire::new(self.tasks.clone()); + acquire.await; + TaskQueuePermit { + tasks: self.tasks.clone(), + } + } +} + +/// A permit that when dropped will allow another task to proceed. +pub struct TaskQueuePermit { + tasks: Arc>, +} + +impl Drop for TaskQueuePermit { + fn drop(&mut self) { + let next_item = { + let mut tasks = self.tasks.lock(); + let next_item = tasks.wakers.pop_front(); + tasks.is_running = next_item.is_some(); + next_item + }; + if let Some(next_item) = next_item { + next_item.is_ready.store(true, Ordering::SeqCst); + next_item.waker.wake(); + } + } +} + +struct TaskQueuePermitAcquire { + tasks: Arc>, + initialized: AtomicBool, + waker: Arc, +} + +impl TaskQueuePermitAcquire { + pub fn new(tasks: Arc>) -> Self { + Self { + tasks, + initialized: Default::default(), + waker: Default::default(), + } + } +} + +impl Future for TaskQueuePermitAcquire { + type Output = (); + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + // update with the latest waker + self.waker.waker.register(cx.waker()); + + // ensure this is initialized + if !self.initialized.swap(true, Ordering::SeqCst) { + let mut tasks = self.tasks.lock(); + if !tasks.is_running { + tasks.is_running = true; + return std::task::Poll::Ready(()); + } + tasks.wakers.push_back(self.waker.clone()); + return std::task::Poll::Pending; + } + + // check if we're ready to run + if self.waker.is_ready.load(Ordering::SeqCst) { + std::task::Poll::Ready(()) + } else { + std::task::Poll::Pending + } + } +} + +#[cfg(test)] +mod tests { + use parking_lot::Mutex; + use std::sync::Arc; + + use super::TaskQueue; + + #[tokio::test] + async fn task_queue_runs_one_after_other() { + let task_queue = TaskQueue::default(); + let mut tasks = Vec::new(); + let data = Arc::new(Mutex::new(0)); + for i in 0..100 { + let data = data.clone(); + tasks.push(task_queue.queue(async move { + tokio::task::spawn_blocking(move || { + let mut data = data.lock(); + if *data != i { + panic!("Value was not equal."); + } + *data = i + 1; + }) + .await + .unwrap(); + })); + } + futures::future::join_all(tasks).await; + } +} diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs index 55eae6373f..d0ee116a5b 100644 --- a/runtime/ops/io.rs +++ b/runtime/ops/io.rs @@ -16,6 +16,7 @@ use deno_core::OpState; use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; +use deno_core::TaskQueue; use once_cell::sync::Lazy; use std::borrow::Cow; use std::cell::RefCell; @@ -170,8 +171,6 @@ pub fn init_stdio(stdio: Stdio) -> Extension { #[cfg(unix)] use nix::sys::termios; -use super::utils::TaskQueue; - #[derive(Default)] pub struct TtyMetadata { #[cfg(unix)] diff --git a/runtime/ops/utils.rs b/runtime/ops/utils.rs index bdbe7f6d06..e7bf02c108 100644 --- a/runtime/ops/utils.rs +++ b/runtime/ops/utils.rs @@ -2,13 +2,6 @@ use deno_core::error::custom_error; use deno_core::error::AnyError; -use deno_core::futures::task::AtomicWaker; -use deno_core::futures::Future; -use deno_core::parking_lot::Mutex; -use std::collections::LinkedList; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering; -use std::sync::Arc; /// A utility function to map OsStrings to Strings pub fn into_string(s: std::ffi::OsString) -> Result { @@ -17,143 +10,3 @@ pub fn into_string(s: std::ffi::OsString) -> Result { custom_error("InvalidData", message) }) } - -#[derive(Default)] -struct TaskQueueTaskWaker { - is_ready: AtomicBool, - waker: AtomicWaker, -} - -#[derive(Default)] -struct TaskQueueTasks { - is_running: bool, - wakers: LinkedList>, -} - -/// A queue that executes tasks sequentially one after the other -/// ensuring order and that no task runs at the same time as another. -/// -/// Note that tokio's semaphore doesn't seem to maintain order -/// and so we can't use that in the code that uses this or use -/// that here. -#[derive(Clone, Default)] -pub struct TaskQueue { - tasks: Arc>, -} - -impl TaskQueue { - /// Alternate API that acquires a permit internally - /// for the duration of the future. - #[cfg(test)] - pub async fn queue(&self, future: impl Future) -> R { - let _permit = self.acquire().await; - future.await - } - - /// Acquires a permit where the tasks are executed one at a time - /// and in the order that they were acquired. - pub async fn acquire(&self) -> TaskQueuePermit { - let acquire = TaskQueuePermitAcquire::new(self.tasks.clone()); - acquire.await; - TaskQueuePermit { - tasks: self.tasks.clone(), - } - } -} - -/// A permit that when dropped will allow another task to proceed. -pub struct TaskQueuePermit { - tasks: Arc>, -} - -impl Drop for TaskQueuePermit { - fn drop(&mut self) { - let next_item = { - let mut tasks = self.tasks.lock(); - let next_item = tasks.wakers.pop_front(); - tasks.is_running = next_item.is_some(); - next_item - }; - if let Some(next_item) = next_item { - next_item.is_ready.store(true, Ordering::SeqCst); - next_item.waker.wake(); - } - } -} - -struct TaskQueuePermitAcquire { - tasks: Arc>, - initialized: AtomicBool, - waker: Arc, -} - -impl TaskQueuePermitAcquire { - pub fn new(tasks: Arc>) -> Self { - Self { - tasks, - initialized: Default::default(), - waker: Default::default(), - } - } -} - -impl Future for TaskQueuePermitAcquire { - type Output = (); - - fn poll( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll { - // update with the latest waker - self.waker.waker.register(cx.waker()); - - // ensure this is initialized - if !self.initialized.swap(true, Ordering::SeqCst) { - let mut tasks = self.tasks.lock(); - if !tasks.is_running { - tasks.is_running = true; - return std::task::Poll::Ready(()); - } - tasks.wakers.push_back(self.waker.clone()); - return std::task::Poll::Pending; - } - - // check if we're ready to run - if self.waker.is_ready.load(Ordering::SeqCst) { - std::task::Poll::Ready(()) - } else { - std::task::Poll::Pending - } - } -} - -#[cfg(test)] -mod tests { - use deno_core::futures; - use deno_core::parking_lot::Mutex; - use std::sync::Arc; - - use super::TaskQueue; - - #[tokio::test] - async fn task_queue_runs_one_after_other() { - let task_queue = TaskQueue::default(); - let mut tasks = Vec::new(); - let data = Arc::new(Mutex::new(0)); - for i in 0..100 { - let data = data.clone(); - tasks.push(task_queue.queue(async move { - tokio::task::spawn_blocking(move || { - let mut data = data.lock(); - if *data != i { - panic!("Value was not equal."); - } - *data = i + 1; - }) - .await - .unwrap(); - })); - } - futures::future::join_all(tasks).await; - } -}