diff --git a/Cargo.lock b/Cargo.lock index aaff2475f8..68212f05bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -967,6 +967,7 @@ name = "deno_core" version = "0.154.0" dependencies = [ "anyhow", + "bytes", "deno_ast", "deno_ops", "futures", diff --git a/cli/tests/unit/fetch_test.ts b/cli/tests/unit/fetch_test.ts index e2ff0d5e04..a668bb4802 100644 --- a/cli/tests/unit/fetch_test.ts +++ b/cli/tests/unit/fetch_test.ts @@ -869,13 +869,13 @@ Deno.test( ); Deno.test(function responseRedirect() { - const redir = Response.redirect("example.com/newLocation", 301); + const redir = Response.redirect("http://example.com/newLocation", 301); assertEquals(redir.status, 301); assertEquals(redir.statusText, ""); assertEquals(redir.url, ""); assertEquals( redir.headers.get("Location"), - "http://js-unit-tests/foo/example.com/newLocation", + "http://example.com/newLocation", ); assertEquals(redir.type, "default"); }); diff --git a/core/Cargo.toml b/core/Cargo.toml index 82ceab2a0f..57dafa0e50 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -18,6 +18,7 @@ v8_use_custom_libcxx = ["v8/use_custom_libcxx"] [dependencies] anyhow = "1.0.57" +bytes = "1" deno_ops = { path = "../ops", version = "0.32.0" } futures = "0.3.21" # Stay on 1.6 to avoid a dependency cycle in ahash https://github.com/tkaitchuck/aHash/issues/95 diff --git a/core/examples/http_bench_json_ops.rs b/core/examples/http_bench_json_ops.rs index 7c895f326f..6d61d35ecb 100644 --- a/core/examples/http_bench_json_ops.rs +++ b/core/examples/http_bench_json_ops.rs @@ -10,7 +10,6 @@ use deno_core::OpState; use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; -use deno_core::ZeroCopyBuf; use std::cell::RefCell; use std::env; use std::net::SocketAddr; @@ -83,37 +82,23 @@ struct TcpStream { } impl TcpStream { - async fn read( - self: Rc, - mut buf: ZeroCopyBuf, - ) -> Result<(usize, ZeroCopyBuf), Error> { + async fn read(self: Rc, data: &mut [u8]) -> Result { let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; let cancel = RcRef::map(self, |r| &r.cancel); - let nread = rd - .read(&mut buf) - .try_or_cancel(cancel) - .await - .map_err(Error::from)?; - Ok((nread, buf)) + let nread = rd.read(data).try_or_cancel(cancel).await?; + Ok(nread) } - async fn write(self: Rc, buf: ZeroCopyBuf) -> Result { + async fn write(self: Rc, data: &[u8]) -> Result { let mut wr = RcRef::map(self, |r| &r.wr).borrow_mut().await; - wr.write(&buf).await.map_err(Error::from) + let nwritten = wr.write(data).await?; + Ok(nwritten) } } impl Resource for TcpStream { - fn read_return( - self: Rc, - buf: ZeroCopyBuf, - ) -> AsyncResult<(usize, ZeroCopyBuf)> { - Box::pin(self.read(buf)) - } - - fn write(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { - Box::pin(self.write(buf)) - } + deno_core::impl_readable_byob!(); + deno_core::impl_writable!(); fn close(self: Rc) { self.cancel.cancel() diff --git a/core/io.rs b/core/io.rs new file mode 100644 index 0000000000..7baad12e44 --- /dev/null +++ b/core/io.rs @@ -0,0 +1,271 @@ +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. + +use std::ops::Deref; +use std::ops::DerefMut; + +use serde_v8::ZeroCopyBuf; + +/// BufView is a wrapper around an underlying contiguous chunk of bytes. It can +/// be created from a [ZeroCopyBuf], [bytes::Bytes], or [Vec] and implements +/// `Deref<[u8]>` and `AsRef<[u8]>`. +/// +/// The wrapper has the ability to constrain the exposed view to a sub-region of +/// the underlying buffer. This is useful for write operations, because they may +/// have to be called multiple times, with different views onto the buffer to be +/// able to write it entirely. +pub struct BufView { + inner: BufViewInner, + cursor: usize, +} + +enum BufViewInner { + Empty, + Bytes(bytes::Bytes), + ZeroCopy(ZeroCopyBuf), + Vec(Vec), +} + +impl BufView { + fn from_inner(inner: BufViewInner) -> Self { + Self { inner, cursor: 0 } + } + + pub fn empty() -> Self { + Self::from_inner(BufViewInner::Empty) + } + + /// Get the length of the buffer view. This is the length of the underlying + /// buffer minus the cursor position. + pub fn len(&self) -> usize { + match &self.inner { + BufViewInner::Empty => 0, + BufViewInner::Bytes(bytes) => bytes.len() - self.cursor, + BufViewInner::ZeroCopy(zero_copy) => zero_copy.len() - self.cursor, + BufViewInner::Vec(vec) => vec.len() - self.cursor, + } + } + + /// Is the buffer view empty? + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Advance the internal cursor of the buffer view by `n` bytes. + pub fn advance_cursor(&mut self, n: usize) { + assert!(self.len() >= n); + self.cursor += n; + } + + /// Reset the internal cursor of the buffer view to the beginning of the + /// buffer. Returns the old cursor position. + pub fn reset_cursor(&mut self) -> usize { + let old = self.cursor; + self.cursor = 0; + old + } +} + +impl Deref for BufView { + type Target = [u8]; + + fn deref(&self) -> &[u8] { + let buf = match &self.inner { + BufViewInner::Empty => &[], + BufViewInner::Bytes(bytes) => bytes.deref(), + BufViewInner::ZeroCopy(zero_copy) => zero_copy.deref(), + BufViewInner::Vec(vec) => vec.deref(), + }; + &buf[self.cursor..] + } +} + +impl AsRef<[u8]> for BufView { + fn as_ref(&self) -> &[u8] { + self.deref() + } +} + +impl From for BufView { + fn from(buf: ZeroCopyBuf) -> Self { + Self::from_inner(BufViewInner::ZeroCopy(buf)) + } +} + +impl From> for BufView { + fn from(vec: Vec) -> Self { + Self::from_inner(BufViewInner::Vec(vec)) + } +} + +impl From for BufView { + fn from(buf: bytes::Bytes) -> Self { + Self::from_inner(BufViewInner::Bytes(buf)) + } +} + +impl From for bytes::Bytes { + fn from(buf: BufView) -> Self { + match buf.inner { + BufViewInner::Empty => bytes::Bytes::new(), + BufViewInner::Bytes(bytes) => bytes, + BufViewInner::ZeroCopy(zero_copy) => zero_copy.into(), + BufViewInner::Vec(vec) => vec.into(), + } + } +} + +/// BufMutView is a wrapper around an underlying contiguous chunk of writable +/// bytes. It can be created from a `ZeroCopyBuf` or a `Vec` and implements +/// `DerefMut<[u8]>` and `AsMut<[u8]>`. +/// +/// The wrapper has the ability to constrain the exposed view to a sub-region of +/// the underlying buffer. This is useful for write operations, because they may +/// have to be called multiple times, with different views onto the buffer to be +/// able to write it entirely. +/// +/// A `BufMutView` can be turned into a `BufView` by calling `BufMutView::into_view`. +pub struct BufMutView { + inner: BufMutViewInner, + cursor: usize, +} + +enum BufMutViewInner { + ZeroCopy(ZeroCopyBuf), + Vec(Vec), +} + +impl BufMutView { + fn from_inner(inner: BufMutViewInner) -> Self { + Self { inner, cursor: 0 } + } + + pub fn new(len: usize) -> Self { + Self::from_inner(BufMutViewInner::Vec(vec![0; len])) + } + + /// Get the length of the buffer view. This is the length of the underlying + /// buffer minus the cursor position. + pub fn len(&self) -> usize { + match &self.inner { + BufMutViewInner::ZeroCopy(zero_copy) => zero_copy.len() - self.cursor, + BufMutViewInner::Vec(vec) => vec.len() - self.cursor, + } + } + + /// Is the buffer view empty? + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Advance the internal cursor of the buffer view by `n` bytes. + pub fn advance_cursor(&mut self, n: usize) { + assert!(self.len() >= n); + self.cursor += n; + } + + /// Reset the internal cursor of the buffer view to the beginning of the + /// buffer. Returns the old cursor position. + pub fn reset_cursor(&mut self) -> usize { + let old = self.cursor; + self.cursor = 0; + old + } + + /// Turn this `BufMutView` into a `BufView`. + pub fn into_view(self) -> BufView { + let inner = match self.inner { + BufMutViewInner::ZeroCopy(zero_copy) => BufViewInner::ZeroCopy(zero_copy), + BufMutViewInner::Vec(vec) => BufViewInner::Vec(vec), + }; + BufView { + inner, + cursor: self.cursor, + } + } + + /// Unwrap the underlying buffer into a `Vec`, consuming the `BufMutView`. + /// + /// This method panics when called on a `BufMutView` that was created from a + /// `ZeroCopyBuf`. + pub fn unwrap_vec(self) -> Vec { + match self.inner { + BufMutViewInner::ZeroCopy(_) => { + panic!("Cannot unwrap a ZeroCopyBuf backed BufMutView into a Vec"); + } + BufMutViewInner::Vec(vec) => vec, + } + } + + /// Get a mutable reference to an underlying `Vec`. + /// + /// This method panics when called on a `BufMutView` that was created from a + /// `ZeroCopyBuf`. + pub fn get_mut_vec(&mut self) -> &mut Vec { + match &mut self.inner { + BufMutViewInner::ZeroCopy(_) => { + panic!("Cannot unwrap a ZeroCopyBuf backed BufMutView into a Vec"); + } + BufMutViewInner::Vec(vec) => vec, + } + } +} + +impl Deref for BufMutView { + type Target = [u8]; + + fn deref(&self) -> &[u8] { + let buf = match &self.inner { + BufMutViewInner::ZeroCopy(zero_copy) => zero_copy.deref(), + BufMutViewInner::Vec(vec) => vec.deref(), + }; + &buf[self.cursor..] + } +} + +impl DerefMut for BufMutView { + fn deref_mut(&mut self) -> &mut [u8] { + let buf = match &mut self.inner { + BufMutViewInner::ZeroCopy(zero_copy) => zero_copy.deref_mut(), + BufMutViewInner::Vec(vec) => vec.deref_mut(), + }; + &mut buf[self.cursor..] + } +} + +impl AsRef<[u8]> for BufMutView { + fn as_ref(&self) -> &[u8] { + self.deref() + } +} + +impl AsMut<[u8]> for BufMutView { + fn as_mut(&mut self) -> &mut [u8] { + self.deref_mut() + } +} + +impl From for BufMutView { + fn from(buf: ZeroCopyBuf) -> Self { + Self::from_inner(BufMutViewInner::ZeroCopy(buf)) + } +} + +impl From> for BufMutView { + fn from(buf: Vec) -> Self { + Self::from_inner(BufMutViewInner::Vec(buf)) + } +} + +pub enum WriteOutcome { + Partial { nwritten: usize, view: BufView }, + Full { nwritten: usize }, +} + +impl WriteOutcome { + pub fn nwritten(&self) -> usize { + match self { + WriteOutcome::Partial { nwritten, .. } => *nwritten, + WriteOutcome::Full { nwritten } => *nwritten, + } + } +} diff --git a/core/lib.rs b/core/lib.rs index 8043b5e95b..adda980463 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -8,6 +8,7 @@ mod extensions; mod flags; mod gotham_state; mod inspector; +mod io; mod module_specifier; mod modules; mod normalize_path; @@ -58,6 +59,9 @@ pub use crate::inspector::InspectorMsgKind; pub use crate::inspector::InspectorSessionProxy; pub use crate::inspector::JsRuntimeInspector; pub use crate::inspector::LocalInspectorSession; +pub use crate::io::BufMutView; +pub use crate::io::BufView; +pub use crate::io::WriteOutcome; pub use crate::module_specifier::resolve_import; pub use crate::module_specifier::resolve_path; pub use crate::module_specifier::resolve_url; diff --git a/core/ops_builtin.rs b/core/ops_builtin.rs index 7393d4b699..41741bf287 100644 --- a/core/ops_builtin.rs +++ b/core/ops_builtin.rs @@ -2,6 +2,8 @@ use crate::error::format_file_name; use crate::error::type_error; use crate::include_js_files; +use crate::io::BufMutView; +use crate::io::BufView; use crate::ops_metrics::OpMetrics; use crate::resources::ResourceId; use crate::Extension; @@ -166,7 +168,8 @@ async fn op_read( buf: ZeroCopyBuf, ) -> Result { let resource = state.borrow().resource_table.get_any(rid)?; - resource.read_return(buf).await.map(|(n, _)| n as u32) + let view = BufMutView::from(buf); + resource.read_byob(view).await.map(|(n, _)| n as u32) } #[op] @@ -175,18 +178,67 @@ async fn op_read_all( rid: ResourceId, ) -> Result { let resource = state.borrow().resource_table.get_any(rid)?; - let (min, maximum) = resource.size_hint(); - let size = maximum.unwrap_or(min) as usize; - let mut buffer = Vec::with_capacity(size); + // The number of bytes we attempt to grow the buffer by each time it fills + // up and we have more data to read. We start at 64 KB. The grow_len is + // doubled if the nread returned from a single read is equal or greater than + // the grow_len. This allows us to reduce allocations for resources that can + // read large chunks of data at a time. + let mut grow_len: usize = 64 * 1024; + + let (min, maybe_max) = resource.size_hint(); + // Try to determine an optimial starting buffer size for this resource based + // on the size hint. + let initial_size = match (min, maybe_max) { + (min, Some(max)) if min == max => min as usize, + (_min, Some(max)) if (max as usize) < grow_len => max as usize, + (min, _) if (min as usize) < grow_len => grow_len, + (min, _) => min as usize, + }; + + let mut buf = BufMutView::new(initial_size); loop { - let tmp = ZeroCopyBuf::new_temp(vec![0u8; 64 * 1024]); - let (nread, tmp) = resource.clone().read_return(tmp).await?; - if nread == 0 { - return Ok(buffer.into()); + // if the buffer does not have much remaining space, we may have to grow it. + if buf.len() < grow_len { + let vec = buf.get_mut_vec(); + match maybe_max { + Some(max) if vec.len() >= max as usize => { + // no need to resize the vec, because the vec is already large enough + // to accomodate the maximum size of the read data. + } + Some(max) if (max as usize) < vec.len() + grow_len => { + // grow the vec to the maximum size of the read data + vec.resize(max as usize, 0); + } + _ => { + // grow the vec by grow_len + vec.resize(vec.len() + grow_len, 0); + } + } + } + let (n, new_buf) = resource.clone().read_byob(buf).await?; + buf = new_buf; + buf.advance_cursor(n); + if n == 0 { + break; + } + if n >= grow_len { + // we managed to read more or equal data than fits in a single grow_len in + // a single go, so let's attempt to read even more next time. this reduces + // allocations for resources that can read large chunks of data at a time. + grow_len *= 2; } - buffer.extend_from_slice(&tmp[..nread]); } + + let nread = buf.reset_cursor(); + let mut vec = buf.unwrap_vec(); + // If the buffer is larger than the amount of data read, shrink it to the + // amount of data read. + if nread < vec.len() { + vec.truncate(nread); + } + + Ok(ZeroCopyBuf::from(vec)) } #[op] @@ -196,7 +248,9 @@ async fn op_write( buf: ZeroCopyBuf, ) -> Result { let resource = state.borrow().resource_table.get_any(rid)?; - resource.write(buf).await.map(|n| n as u32) + let view = BufView::from(buf); + let resp = resource.write(view).await?; + Ok(resp.nwritten() as u32) } #[op] diff --git a/core/resources.rs b/core/resources.rs index 1a1ba31934..ee9ee689f4 100644 --- a/core/resources.rs +++ b/core/resources.rs @@ -8,7 +8,9 @@ use crate::error::bad_resource_id; use crate::error::not_supported; -use crate::ZeroCopyBuf; +use crate::io::BufMutView; +use crate::io::BufView; +use crate::io::WriteOutcome; use anyhow::Error; use futures::Future; use std::any::type_name; @@ -23,9 +25,51 @@ use std::rc::Rc; /// Returned by resource read/write/shutdown methods pub type AsyncResult = Pin>>>; -/// All objects that can be store in the resource table should implement the -/// `Resource` trait. -/// TODO(@AaronO): investigate avoiding alloc on read/write/shutdown +/// Resources are Rust objects that are attached to a [deno_core::JsRuntime]. +/// They are identified in JS by a numeric ID (the resource ID, or rid). +/// Resources can be created in ops. Resources can also be retrieved in ops by +/// their rid. Resources are not thread-safe - they can only be accessed from +/// the thread that the JsRuntime lives on. +/// +/// Resources are reference counted in Rust. This means that they can be +/// cloned and passed around. When the last reference is dropped, the resource +/// is automatically closed. As long as the resource exists in the resource +/// table, the reference count is at least 1. +/// +/// ### Readable +/// +/// Readable resources are resources that can have data read from. Examples of +/// this are files, sockets, or HTTP streams. +/// +/// Readables can be read from from either JS or Rust. In JS one can use +/// `Deno.core.read()` to read from a single chunk of data from a readable. In +/// Rust one can directly call `read()` or `read_byob()`. The Rust side code is +/// used to implement ops like `op_slice`. +/// +/// A distinction can be made between readables that produce chunks of data +/// themselves (they allocate the chunks), and readables that fill up +/// bring-your-own-buffers (BYOBs). The former is often the case for framed +/// protocols like HTTP, while the latter is often the case for kernel backed +/// resources like files and sockets. +/// +/// All readables must implement `read()`. If resources can support an optimized +/// path for BYOBs, they should also implement `read_byob()`. For kernel backed +/// resources it often makes sense to implement `read_byob()` first, and then +/// implement `read()` as an operation that allocates a new chunk with +/// `len == limit`, then calls `read_byob()`, and then returns a chunk sliced to +/// the number of bytes read. Kernel backed resources can use the +/// [deno_core::impl_readable_byob] macro to implement optimized `read_byob()` +/// and `read()` implementations from a single `Self::read()` method. +/// +/// ### Writable +/// +/// Writable resources are resources that can have data written to. Examples of +/// this are files, sockets, or HTTP streams. +/// +/// Writables can be written to from either JS or Rust. In JS one can use +/// `Deno.core.write()` to write to a single chunk of data to a writable. In +/// Rust one can directly call `write()`. The latter is used to implement ops +/// like `op_slice`. pub trait Resource: Any + 'static { /// Returns a string representation of the resource which is made available /// to JavaScript code through `op_resources`. The default implementation @@ -35,20 +79,86 @@ pub trait Resource: Any + 'static { type_name::().into() } - /// Resources may implement `read_return()` to be a readable stream - fn read_return( + /// Read a single chunk of data from the resource. This operation returns a + /// `BufView` that represents the data that was read. If a zero length buffer + /// is returned, it indicates that the resource has reached EOF. + /// + /// If this method is not implemented, the default implementation will error + /// with a "not supported" error. + /// + /// If a readable can provide an optimized path for BYOBs, it should also + /// implement `read_byob()`. + fn read(self: Rc, limit: usize) -> AsyncResult { + _ = limit; + Box::pin(futures::future::err(not_supported())) + } + + /// Read a single chunk of data from the resource into the provided `BufMutView`. + /// + /// This operation returns the number of bytes read. If zero bytes are read, + /// it indicates that the resource has reached EOF. + /// + /// If this method is not implemented explicitly, the default implementation + /// will call `read()` and then copy the data into the provided buffer. For + /// readable resources that can provide an optimized path for BYOBs, it is + /// strongly recommended to override this method. + fn read_byob( self: Rc, - _buf: ZeroCopyBuf, - ) -> AsyncResult<(usize, ZeroCopyBuf)> { + mut buf: BufMutView, + ) -> AsyncResult<(usize, BufMutView)> { + Box::pin(async move { + let read = self.read(buf.len()).await?; + let nread = read.len(); + buf[..nread].copy_from_slice(&read); + Ok((nread, buf)) + }) + } + + /// Write a single chunk of data to the resource. The operation may not be + /// able to write the entire chunk, in which case it should return the number + /// of bytes written. Additionally it should return the `BufView` that was + /// passed in. + /// + /// If this method is not implemented, the default implementation will error + /// with a "not supported" error. + fn write(self: Rc, buf: BufView) -> AsyncResult { + _ = buf; Box::pin(futures::future::err(not_supported())) } - /// Resources may implement `write()` to be a writable stream - fn write(self: Rc, _buf: ZeroCopyBuf) -> AsyncResult { - Box::pin(futures::future::err(not_supported())) + /// Write an entire chunk of data to the resource. Unlike `write()`, this will + /// ensure the entire chunk is written. If the operation is not able to write + /// the entire chunk, an error is to be returned. + /// + /// By default this method will call `write()` repeatedly until the entire + /// chunk is written. Resources that can write the entire chunk in a single + /// operation using an optimized path should override this method. + fn write_all(self: Rc, view: BufView) -> AsyncResult<()> { + Box::pin(async move { + let mut view = view; + let this = self; + while !view.is_empty() { + let resp = this.clone().write(view).await?; + match resp { + WriteOutcome::Partial { + nwritten, + view: new_view, + } => { + view = new_view; + view.advance_cursor(nwritten); + } + WriteOutcome::Full { .. } => break, + } + } + Ok(()) + }) } - /// Resources may implement `shutdown()` for graceful async shutdowns + /// The shutdown method can be used to asynchronously close the resource. It + /// is not automatically called when the resource is dropped or closed. + /// + /// If this method is not implemented, the default implementation will error + /// with a "not supported" error. fn shutdown(self: Rc) -> AsyncResult<()> { Box::pin(futures::future::err(not_supported())) } @@ -229,3 +339,60 @@ impl ResourceTable { .map(|(&id, resource)| (id, resource.name())) } } + +#[macro_export] +macro_rules! impl_readable_byob { + () => { + fn read(self: Rc, limit: usize) -> AsyncResult<$crate::BufView> { + Box::pin(async move { + let mut vec = vec![0; limit]; + let nread = self.read(&mut vec).await?; + if nread != vec.len() { + vec.truncate(nread); + } + let view = $crate::BufView::from(vec); + Ok(view) + }) + } + + fn read_byob( + self: Rc, + mut buf: $crate::BufMutView, + ) -> AsyncResult<(usize, $crate::BufMutView)> { + Box::pin(async move { + let nread = self.read(buf.as_mut()).await?; + Ok((nread, buf)) + }) + } + }; +} + +#[macro_export] +macro_rules! impl_writable { + (__write) => { + fn write( + self: Rc, + view: $crate::BufView, + ) -> AsyncResult<$crate::WriteOutcome> { + Box::pin(async move { + let nwritten = self.write(&view).await?; + Ok($crate::WriteOutcome::Partial { nwritten, view }) + }) + } + }; + (__write_all) => { + fn write_all(self: Rc, view: $crate::BufView) -> AsyncResult<()> { + Box::pin(async move { + self.write_all(&view).await?; + Ok(()) + }) + } + }; + () => { + $crate::impl_writable!(__write); + }; + (with_all) => { + $crate::impl_writable!(__write); + $crate::impl_writable!(__write_all); + }; +} diff --git a/ext/cache/sqlite.rs b/ext/cache/sqlite.rs index 7e97fb5633..75aa7cc6ee 100644 --- a/ext/cache/sqlite.rs +++ b/ext/cache/sqlite.rs @@ -7,7 +7,6 @@ use deno_core::AsyncRefCell; use deno_core::AsyncResult; use deno_core::ByteString; use deno_core::Resource; -use deno_core::ZeroCopyBuf; use rusqlite::params; use rusqlite::Connection; use rusqlite::OptionalExtension; @@ -347,10 +346,10 @@ pub struct CachePutResource { } impl CachePutResource { - async fn write(self: Rc, data: ZeroCopyBuf) -> Result { + async fn write(self: Rc, data: &[u8]) -> Result { let resource = deno_core::RcRef::map(&self, |r| &r.file); let mut file = resource.borrow_mut().await; - file.write_all(&data).await?; + file.write_all(data).await?; Ok(data.len()) } @@ -374,9 +373,7 @@ impl Resource for CachePutResource { "CachePutResource".into() } - fn write(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { - Box::pin(self.write(buf)) - } + deno_core::impl_writable!(); fn shutdown(self: Rc) -> AsyncResult<()> { Box::pin(self.shutdown()) @@ -394,28 +391,20 @@ impl CacheResponseResource { } } - async fn read( - self: Rc, - mut buf: ZeroCopyBuf, - ) -> Result<(usize, ZeroCopyBuf), AnyError> { + async fn read(self: Rc, data: &mut [u8]) -> Result { let resource = deno_core::RcRef::map(&self, |r| &r.file); let mut file = resource.borrow_mut().await; - let nread = file.read(&mut buf).await?; - Ok((nread, buf)) + let nread = file.read(data).await?; + Ok(nread) } } impl Resource for CacheResponseResource { + deno_core::impl_readable_byob!(); + fn name(&self) -> Cow { "CacheResponseResource".into() } - - fn read_return( - self: Rc, - buf: ZeroCopyBuf, - ) -> AsyncResult<(usize, ZeroCopyBuf)> { - Box::pin(self.read(buf)) - } } pub fn hash(token: &str) -> String { diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index 0adc32343d..b8f784284a 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -5,11 +5,14 @@ mod fs_fetch_handler; use data_url::DataUrl; use deno_core::error::type_error; use deno_core::error::AnyError; +use deno_core::futures::stream::Peekable; use deno_core::futures::Future; use deno_core::futures::Stream; use deno_core::futures::StreamExt; use deno_core::include_js_files; use deno_core::op; +use deno_core::BufView; +use deno_core::WriteOutcome; use deno_core::url::Url; use deno_core::AsyncRefCell; @@ -43,15 +46,14 @@ use serde::Deserialize; use serde::Serialize; use std::borrow::Cow; use std::cell::RefCell; +use std::cmp::min; use std::convert::From; use std::path::Path; use std::path::PathBuf; use std::pin::Pin; use std::rc::Rc; -use tokio::io::AsyncReadExt; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; -use tokio_util::io::StreamReader; // Re-export reqwest and data_url pub use data_url; @@ -252,7 +254,7 @@ where match data { None => { // If no body is passed, we return a writer for streaming the body. - let (tx, rx) = mpsc::channel::>>(1); + let (tx, rx) = mpsc::channel::>(1); // If the size of the body is known, we include a content-length // header explicitly. @@ -401,12 +403,11 @@ pub async fn op_fetch_send( let stream: BytesStream = Box::pin(res.bytes_stream().map(|r| { r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) })); - let stream_reader = StreamReader::new(stream); let rid = state .borrow_mut() .resource_table .add(FetchResponseBodyResource { - reader: AsyncRefCell::new(stream_reader), + reader: AsyncRefCell::new(stream.peekable()), cancel: CancelHandle::default(), size: content_length, }); @@ -446,7 +447,7 @@ impl Resource for FetchCancelHandle { } pub struct FetchRequestBodyResource { - body: AsyncRefCell>>>, + body: AsyncRefCell>>, cancel: CancelHandle, } @@ -455,17 +456,16 @@ impl Resource for FetchRequestBodyResource { "fetchRequestBody".into() } - fn write(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { + fn write(self: Rc, buf: BufView) -> AsyncResult { Box::pin(async move { - let data = buf.to_vec(); - let len = data.len(); + let bytes: bytes::Bytes = buf.into(); + let nwritten = bytes.len(); let body = RcRef::map(&self, |r| &r.body).borrow_mut().await; let cancel = RcRef::map(self, |r| &r.cancel); - body.send(Ok(data)).or_cancel(cancel).await?.map_err(|_| { + body.send(Ok(bytes)).or_cancel(cancel).await?.map_err(|_| { type_error("request body receiver not connected (request closed)") })?; - - Ok(len) + Ok(WriteOutcome::Full { nwritten }) }) } @@ -478,7 +478,7 @@ type BytesStream = Pin> + Unpin>>; struct FetchResponseBodyResource { - reader: AsyncRefCell>, + reader: AsyncRefCell>, cancel: CancelHandle, size: Option, } @@ -488,15 +488,36 @@ impl Resource for FetchResponseBodyResource { "fetchResponseBody".into() } - fn read_return( - self: Rc, - mut buf: ZeroCopyBuf, - ) -> AsyncResult<(usize, ZeroCopyBuf)> { + fn read(self: Rc, limit: usize) -> AsyncResult { Box::pin(async move { - let mut reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await; - let cancel = RcRef::map(self, |r| &r.cancel); - let read = reader.read(&mut buf).try_or_cancel(cancel).await?; - Ok((read, buf)) + let reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await; + + let fut = async move { + let mut reader = Pin::new(reader); + loop { + match reader.as_mut().peek_mut().await { + Some(Ok(chunk)) if !chunk.is_empty() => { + let len = min(limit, chunk.len()); + let chunk = chunk.split_to(len); + break Ok(chunk.into()); + } + // This unwrap is safe because `peek_mut()` returned `Some`, and thus + // currently has a peeked value that can be synchronously returned + // from `next()`. + // + // The future returned from `next()` is always ready, so we can + // safely call `await` on it without creating a race condition. + Some(_) => match reader.as_mut().next().await.unwrap() { + Ok(chunk) => assert!(chunk.is_empty()), + Err(err) => break Err(AnyError::from(err)), + }, + None => break Ok(BufView::empty()), + } + } + }; + + let cancel_handle = RcRef::map(self, |r| &r.cancel); + fut.try_or_cancel(cancel_handle).await }) } diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs index a7bd8b4395..f9ce1c7445 100644 --- a/ext/flash/lib.rs +++ b/ext/flash/lib.rs @@ -253,20 +253,16 @@ async fn op_flash_write_resource( .write_all(b"Transfer-Encoding: chunked\r\n\r\n") .await?; loop { - let vec = vec![0u8; 64 * 1024]; // 64KB - let buf = ZeroCopyBuf::new_temp(vec); - let (nread, buf) = resource.clone().read_return(buf).await?; - if nread == 0 { + let view = resource.clone().read(64 * 1024).await?; // 64KB + if view.is_empty() { stream.write_all(b"0\r\n\r\n").await?; break; } - - let response = &buf[..nread]; // TODO(@littledivy): use vectored writes. stream - .write_all(format!("{:x}\r\n", response.len()).as_bytes()) + .write_all(format!("{:x}\r\n", view.len()).as_bytes()) .await?; - stream.write_all(response).await?; + stream.write_all(&view).await?; stream.write_all(b"\r\n").await?; } resource.close(); diff --git a/ext/http/lib.rs b/ext/http/lib.rs index a8c2810bc7..e71d9fae3b 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -23,6 +23,8 @@ use deno_core::futures::TryFutureExt; use deno_core::include_js_files; use deno_core::op; use deno_core::AsyncRefCell; +use deno_core::AsyncResult; +use deno_core::BufView; use deno_core::ByteString; use deno_core::CancelFuture; use deno_core::CancelHandle; @@ -333,61 +335,58 @@ impl HttpStreamResource { } } -impl HttpStreamResource { - async fn read( - self: Rc, - mut buf: ZeroCopyBuf, - ) -> Result<(usize, ZeroCopyBuf), AnyError> { - let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; - - let body = loop { - match &mut *rd { - HttpRequestReader::Headers(_) => {} - HttpRequestReader::Body(_, body) => break body, - HttpRequestReader::Closed => return Ok((0, buf)), - } - match take(&mut *rd) { - HttpRequestReader::Headers(request) => { - let (parts, body) = request.into_parts(); - *rd = HttpRequestReader::Body(parts.headers, body.peekable()); - } - _ => unreachable!(), - }; - }; - - let fut = async { - let mut body = Pin::new(body); - loop { - match body.as_mut().peek_mut().await { - Some(Ok(chunk)) if !chunk.is_empty() => { - let len = min(buf.len(), chunk.len()); - buf[..len].copy_from_slice(&chunk.split_to(len)); - break Ok((len, buf)); - } - Some(_) => match body.as_mut().next().await.unwrap() { - Ok(chunk) => assert!(chunk.is_empty()), - Err(err) => break Err(AnyError::from(err)), - }, - None => break Ok((0, buf)), - } - } - }; - - let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle); - fut.try_or_cancel(cancel_handle).await - } -} - impl Resource for HttpStreamResource { fn name(&self) -> Cow { "httpStream".into() } - fn read_return( - self: Rc, - _buf: ZeroCopyBuf, - ) -> deno_core::AsyncResult<(usize, ZeroCopyBuf)> { - Box::pin(self.read(_buf)) + fn read(self: Rc, limit: usize) -> AsyncResult { + Box::pin(async move { + let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; + + let body = loop { + match &mut *rd { + HttpRequestReader::Headers(_) => {} + HttpRequestReader::Body(_, body) => break body, + HttpRequestReader::Closed => return Ok(BufView::empty()), + } + match take(&mut *rd) { + HttpRequestReader::Headers(request) => { + let (parts, body) = request.into_parts(); + *rd = HttpRequestReader::Body(parts.headers, body.peekable()); + } + _ => unreachable!(), + }; + }; + + let fut = async { + let mut body = Pin::new(body); + loop { + match body.as_mut().peek_mut().await { + Some(Ok(chunk)) if !chunk.is_empty() => { + let len = min(limit, chunk.len()); + let buf = chunk.split_to(len); + let view = BufView::from(buf); + break Ok(view); + } + // This unwrap is safe because `peek_mut()` returned `Some`, and thus + // currently has a peeked value that can be synchronously returned + // from `next()`. + // + // The future returned from `next()` is always ready, so we can + // safely call `await` on it without creating a race condition. + Some(_) => match body.as_mut().next().await.unwrap() { + Ok(chunk) => assert!(chunk.is_empty()), + Err(err) => break Err(AnyError::from(err)), + }, + None => break Ok(BufView::empty()), + } + } + }; + + let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle); + fut.try_or_cancel(cancel_handle).await + }) } fn close(self: Rc) { @@ -763,16 +762,14 @@ async fn op_http_write_resource( _ => {} }; - let vec = vec![0u8; 64 * 1024]; // 64KB - let buf = ZeroCopyBuf::new_temp(vec); - let (nread, buf) = resource.clone().read_return(buf).await?; - if nread == 0 { + let view = resource.clone().read(64 * 1024).await?; // 64KB + if view.is_empty() { break; } match &mut *wr { HttpResponseWriter::Body(body) => { - if let Err(err) = body.write_all(&buf[..nread]).await { + if let Err(err) = body.write_all(&view).await { assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); // Don't return "broken pipe", that's an implementation detail. // Pull up the failure associated with the transport connection instead. @@ -782,9 +779,8 @@ async fn op_http_write_resource( } } HttpResponseWriter::BodyUncompressed(body) => { - let mut buf = buf.to_temp(); - buf.truncate(nread); - if let Err(err) = body.send_data(Bytes::from(buf)).await { + let bytes = Bytes::from(view); + if let Err(err) = body.send_data(bytes).await { assert!(err.is_closed()); // Pull up the failure associated with the transport connection instead. http_stream.conn.closed().await?; diff --git a/ext/net/io.rs b/ext/net/io.rs index c9587c8518..4c9fbe3d22 100644 --- a/ext/net/io.rs +++ b/ext/net/io.rs @@ -9,7 +9,6 @@ use deno_core::CancelHandle; use deno_core::CancelTryFuture; use deno_core::RcRef; use deno_core::Resource; -use deno_core::ZeroCopyBuf; use socket2::SockRef; use std::borrow::Cow; use std::rc::Rc; @@ -69,22 +68,16 @@ where pub async fn read( self: Rc, - mut buf: ZeroCopyBuf, - ) -> Result<(usize, ZeroCopyBuf), AnyError> { + data: &mut [u8], + ) -> Result { let mut rd = self.rd_borrow_mut().await; - let nread = rd - .read(&mut buf) - .try_or_cancel(self.cancel_handle()) - .await?; - Ok((nread, buf)) + let nread = rd.read(data).try_or_cancel(self.cancel_handle()).await?; + Ok(nread) } - pub async fn write( - self: Rc, - buf: ZeroCopyBuf, - ) -> Result { + pub async fn write(self: Rc, data: &[u8]) -> Result { let mut wr = self.wr_borrow_mut().await; - let nwritten = wr.write(&buf).await?; + let nwritten = wr.write(data).await?; Ok(nwritten) } @@ -99,21 +92,13 @@ pub type TcpStreamResource = FullDuplexResource; impl Resource for TcpStreamResource { + deno_core::impl_readable_byob!(); + deno_core::impl_writable!(); + fn name(&self) -> Cow { "tcpStream".into() } - fn read_return( - self: Rc, - buf: ZeroCopyBuf, - ) -> AsyncResult<(usize, ZeroCopyBuf)> { - Box::pin(self.read(buf)) - } - - fn write(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { - Box::pin(self.write(buf)) - } - fn shutdown(self: Rc) -> AsyncResult<()> { Box::pin(self.shutdown()) } @@ -161,16 +146,10 @@ pub struct UnixStreamResource; #[cfg(not(unix))] impl UnixStreamResource { - pub async fn read( - self: Rc, - _buf: ZeroCopyBuf, - ) -> Result<(usize, ZeroCopyBuf), AnyError> { + fn read(self: Rc, data: &mut [u8]) -> AsyncResult { unreachable!() } - pub async fn write( - self: Rc, - _buf: ZeroCopyBuf, - ) -> Result { + fn write(self: Rc, data: &[u8]) -> AsyncResult { unreachable!() } pub async fn shutdown(self: Rc) -> Result<(), AnyError> { @@ -182,21 +161,13 @@ impl UnixStreamResource { } impl Resource for UnixStreamResource { + deno_core::impl_readable_byob!(); + deno_core::impl_writable!(); + fn name(&self) -> Cow { "unixStream".into() } - fn read_return( - self: Rc, - buf: ZeroCopyBuf, - ) -> AsyncResult<(usize, ZeroCopyBuf)> { - Box::pin(self.read(buf)) - } - - fn write(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { - Box::pin(self.write(buf)) - } - fn shutdown(self: Rc) -> AsyncResult<()> { Box::pin(self.shutdown()) } diff --git a/ext/net/ops_tls.rs b/ext/net/ops_tls.rs index 230f4359eb..a1b48b84e4 100644 --- a/ext/net/ops_tls.rs +++ b/ext/net/ops_tls.rs @@ -38,7 +38,6 @@ use deno_core::OpState; use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; -use deno_core::ZeroCopyBuf; use deno_tls::create_client_config; use deno_tls::load_certs; use deno_tls::load_private_keys; @@ -691,21 +690,18 @@ impl TlsStreamResource { pub async fn read( self: Rc, - mut buf: ZeroCopyBuf, - ) -> Result<(usize, ZeroCopyBuf), AnyError> { + data: &mut [u8], + ) -> Result { let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle); - let nread = rd.read(&mut buf).try_or_cancel(cancel_handle).await?; - Ok((nread, buf)) + let nread = rd.read(data).try_or_cancel(cancel_handle).await?; + Ok(nread) } - pub async fn write( - self: Rc, - buf: ZeroCopyBuf, - ) -> Result { + pub async fn write(self: Rc, data: &[u8]) -> Result { self.handshake().await?; let mut wr = RcRef::map(self, |r| &r.wr).borrow_mut().await; - let nwritten = wr.write(&buf).await?; + let nwritten = wr.write(data).await?; wr.flush().await?; Ok(nwritten) } @@ -736,21 +732,13 @@ impl TlsStreamResource { } impl Resource for TlsStreamResource { + deno_core::impl_readable_byob!(); + deno_core::impl_writable!(); + fn name(&self) -> Cow { "tlsStream".into() } - fn read_return( - self: Rc, - buf: ZeroCopyBuf, - ) -> AsyncResult<(usize, ZeroCopyBuf)> { - Box::pin(self.read(buf)) - } - - fn write(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { - Box::pin(self.write(buf)) - } - fn shutdown(self: Rc) -> AsyncResult<()> { Box::pin(self.shutdown()) } diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs index 18c7fb5e51..8ed6969f9c 100644 --- a/runtime/ops/io.rs +++ b/runtime/ops/io.rs @@ -7,6 +7,8 @@ use deno_core::parking_lot::Mutex; use deno_core::AsyncMutFuture; use deno_core::AsyncRefCell; use deno_core::AsyncResult; +use deno_core::BufMutView; +use deno_core::BufView; use deno_core::CancelHandle; use deno_core::CancelTryFuture; use deno_core::Extension; @@ -202,9 +204,9 @@ where RcRef::map(self, |r| &r.stream).borrow_mut() } - async fn write(self: Rc, buf: ZeroCopyBuf) -> Result { + async fn write(self: Rc, data: &[u8]) -> Result { let mut stream = self.borrow_mut().await; - let nwritten = stream.write(&buf).await?; + let nwritten = stream.write(data).await?; Ok(nwritten) } @@ -250,16 +252,10 @@ where self.cancel_handle.cancel() } - async fn read( - self: Rc, - mut buf: ZeroCopyBuf, - ) -> Result<(usize, ZeroCopyBuf), AnyError> { + async fn read(self: Rc, data: &mut [u8]) -> Result { let mut rd = self.borrow_mut().await; - let nread = rd - .read(&mut buf) - .try_or_cancel(self.cancel_handle()) - .await?; - Ok((nread, buf)) + let nread = rd.read(data).try_or_cancel(self.cancel_handle()).await?; + Ok(nread) } pub fn into_inner(self) -> S { @@ -274,9 +270,7 @@ impl Resource for ChildStdinResource { "childStdin".into() } - fn write(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { - Box::pin(self.write(buf)) - } + deno_core::impl_writable!(); fn shutdown(self: Rc) -> AsyncResult<()> { Box::pin(self.shutdown()) @@ -286,17 +280,12 @@ impl Resource for ChildStdinResource { pub type ChildStdoutResource = ReadOnlyResource; impl Resource for ChildStdoutResource { + deno_core::impl_readable_byob!(); + fn name(&self) -> Cow { "childStdout".into() } - fn read_return( - self: Rc, - buf: ZeroCopyBuf, - ) -> AsyncResult<(usize, ZeroCopyBuf)> { - Box::pin(self.read(buf)) - } - fn close(self: Rc) { self.cancel_read_ops(); } @@ -305,17 +294,12 @@ impl Resource for ChildStdoutResource { pub type ChildStderrResource = ReadOnlyResource; impl Resource for ChildStderrResource { + deno_core::impl_readable_byob!(); + fn name(&self) -> Cow { "childStderr".into() } - fn read_return( - self: Rc, - buf: ZeroCopyBuf, - ) -> AsyncResult<(usize, ZeroCopyBuf)> { - Box::pin(self.read(buf)) - } - fn close(self: Rc) { self.cancel_read_ops(); } @@ -534,25 +518,34 @@ impl StdFileResource { result } - async fn read( + async fn read_byob( self: Rc, - mut buf: ZeroCopyBuf, - ) -> Result<(usize, ZeroCopyBuf), AnyError> { + mut buf: BufMutView, + ) -> Result<(usize, BufMutView), AnyError> { self - .with_inner_blocking_task( - move |inner| -> Result<(usize, ZeroCopyBuf), AnyError> { - Ok((inner.read(&mut buf)?, buf)) - }, - ) + .with_inner_blocking_task(move |inner| { + let nread = inner.read(&mut buf)?; + Ok((nread, buf)) + }) .await } - async fn write(self: Rc, buf: ZeroCopyBuf) -> Result { + async fn write(self: Rc, data: &[u8]) -> Result { + let buf = data.to_owned(); self .with_inner_blocking_task(move |inner| inner.write_and_maybe_flush(&buf)) .await } + async fn write_all(self: Rc, data: &[u8]) -> Result<(), AnyError> { + let buf = data.to_owned(); + self + .with_inner_blocking_task(move |inner| { + inner.write_all_and_maybe_flush(&buf) + }) + .await + } + fn with_resource( state: &mut OpState, rid: ResourceId, @@ -641,17 +634,28 @@ impl Resource for StdFileResource { self.name.as_str().into() } - fn read_return( - self: Rc, - buf: ZeroCopyBuf, - ) -> AsyncResult<(usize, ZeroCopyBuf)> { - Box::pin(self.read(buf)) + fn read(self: Rc, limit: usize) -> AsyncResult { + Box::pin(async move { + let vec = vec![0; limit]; + let buf = BufMutView::from(vec); + let (nread, buf) = self.read_byob(buf).await?; + let mut vec = buf.unwrap_vec(); + if vec.len() != nread { + vec.truncate(nread); + } + Ok(BufView::from(vec)) + }) } - fn write(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { - Box::pin(self.write(buf)) + fn read_byob( + self: Rc, + buf: deno_core::BufMutView, + ) -> AsyncResult<(usize, deno_core::BufMutView)> { + Box::pin(self.read_byob(buf)) } + deno_core::impl_writable!(with_all); + #[cfg(unix)] fn backing_fd(self: Rc) -> Option { use std::os::unix::io::AsRawFd;