1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-21 21:50:00 -05:00

core: introduce resource.read_return (#14331)

This commit is contained in:
Divy Srivastava 2022-04-20 22:09:13 +05:30 committed by GitHub
parent 57a8fc37fc
commit 2612b6f20f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 80 additions and 35 deletions

View file

@ -236,7 +236,7 @@ jobs:
~/.cargo/registry/index ~/.cargo/registry/index
~/.cargo/registry/cache ~/.cargo/registry/cache
~/.cargo/git/db ~/.cargo/git/db
key: 7-cargo-home-${{ matrix.os }}-${{ hashFiles('Cargo.lock') }} key: 8-cargo-home-${{ matrix.os }}-${{ hashFiles('Cargo.lock') }}
# In main branch, always creates fresh cache # In main branch, always creates fresh cache
- name: Cache build output (main) - name: Cache build output (main)
@ -252,7 +252,7 @@ jobs:
!./target/*/*.zip !./target/*/*.zip
!./target/*/*.tar.gz !./target/*/*.tar.gz
key: | key: |
7-cargo-target-${{ matrix.os }}-${{ matrix.profile }}-${{ github.sha }} 8-cargo-target-${{ matrix.os }}-${{ matrix.profile }}-${{ github.sha }}
# Restore cache from the latest 'main' branch build. # Restore cache from the latest 'main' branch build.
- name: Cache build output (PR) - name: Cache build output (PR)
@ -268,7 +268,7 @@ jobs:
!./target/*/*.tar.gz !./target/*/*.tar.gz
key: never_saved key: never_saved
restore-keys: | restore-keys: |
7-cargo-target-${{ matrix.os }}-${{ matrix.profile }}- 8-cargo-target-${{ matrix.os }}-${{ matrix.profile }}-
# Don't save cache after building PRs or branches other than 'main'. # Don't save cache after building PRs or branches other than 'main'.
- name: Skip save cache (PR) - name: Skip save cache (PR)

View file

@ -83,13 +83,18 @@ struct TcpStream {
} }
impl TcpStream { impl TcpStream {
async fn read(self: Rc<Self>, mut buf: ZeroCopyBuf) -> Result<usize, Error> { async fn read(
self: Rc<Self>,
mut buf: ZeroCopyBuf,
) -> Result<(usize, ZeroCopyBuf), Error> {
let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
let cancel = RcRef::map(self, |r| &r.cancel); let cancel = RcRef::map(self, |r| &r.cancel);
rd.read(&mut buf) let nread = rd
.read(&mut buf)
.try_or_cancel(cancel) .try_or_cancel(cancel)
.await .await
.map_err(Error::from) .map_err(Error::from)?;
Ok((nread, buf))
} }
async fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> Result<usize, Error> { async fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> Result<usize, Error> {
@ -99,7 +104,10 @@ impl TcpStream {
} }
impl Resource for TcpStream { impl Resource for TcpStream {
fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> { fn read_return(
self: Rc<Self>,
buf: ZeroCopyBuf,
) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(buf)) Box::pin(self.read(buf))
} }

View file

@ -36,7 +36,17 @@ pub trait Resource: Any + 'static {
} }
/// Resources may implement `read()` to be a readable stream /// Resources may implement `read()` to be a readable stream
fn read(self: Rc<Self>, _buf: ZeroCopyBuf) -> AsyncResult<usize> { fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
Box::pin(async move {
let (nread, _) = self.read_return(buf).await?;
Ok(nread)
})
}
fn read_return(
self: Rc<Self>,
_buf: ZeroCopyBuf,
) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(futures::future::err(not_supported())) Box::pin(futures::future::err(not_supported()))
} }

View file

@ -485,12 +485,15 @@ impl Resource for FetchResponseBodyResource {
"fetchResponseBody".into() "fetchResponseBody".into()
} }
fn read(self: Rc<Self>, mut buf: ZeroCopyBuf) -> AsyncResult<usize> { fn read_return(
self: Rc<Self>,
mut buf: ZeroCopyBuf,
) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(async move { Box::pin(async move {
let mut reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await; let mut reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await;
let cancel = RcRef::map(self, |r| &r.cancel); let cancel = RcRef::map(self, |r| &r.cancel);
let read = reader.read(&mut buf).try_or_cancel(cancel).await?; let read = reader.read(&mut buf).try_or_cancel(cancel).await?;
Ok(read) Ok((read, buf))
}) })
} }

View file

@ -688,16 +688,13 @@ async fn op_http_write_resource(
} }
}; };
let mut vec = vec![0u8; 64 * 1024]; let vec = vec![0u8; 64 * 1024]; // 64KB
let vec_ptr = vec.as_mut_ptr();
let buf = ZeroCopyBuf::new_temp(vec); let buf = ZeroCopyBuf::new_temp(vec);
let nread = resource.clone().read(buf).await?; let (nread, buf) = resource.clone().read_return(buf).await?;
if nread == 0 { if nread == 0 {
break; break;
} }
// SAFETY: ZeroCopyBuf keeps the Vec<u8> alive. let bytes = Bytes::from(buf.to_temp());
let bytes =
Bytes::from_static(unsafe { std::slice::from_raw_parts(vec_ptr, nread) });
match body_tx.send_data(bytes).await { match body_tx.send_data(bytes).await {
Ok(_) => {} Ok(_) => {}
Err(err) => { Err(err) => {

View file

@ -70,13 +70,13 @@ where
pub async fn read( pub async fn read(
self: Rc<Self>, self: Rc<Self>,
mut buf: ZeroCopyBuf, mut buf: ZeroCopyBuf,
) -> Result<usize, AnyError> { ) -> Result<(usize, ZeroCopyBuf), AnyError> {
let mut rd = self.rd_borrow_mut().await; let mut rd = self.rd_borrow_mut().await;
let nread = rd let nread = rd
.read(&mut buf) .read(&mut buf)
.try_or_cancel(self.cancel_handle()) .try_or_cancel(self.cancel_handle())
.await?; .await?;
Ok(nread) Ok((nread, buf))
} }
pub async fn write( pub async fn write(
@ -103,7 +103,10 @@ impl Resource for TcpStreamResource {
"tcpStream".into() "tcpStream".into()
} }
fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> { fn read_return(
self: Rc<Self>,
buf: ZeroCopyBuf,
) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(buf)) Box::pin(self.read(buf))
} }
@ -160,7 +163,7 @@ impl UnixStreamResource {
pub async fn read( pub async fn read(
self: Rc<Self>, self: Rc<Self>,
_buf: ZeroCopyBuf, _buf: ZeroCopyBuf,
) -> Result<usize, AnyError> { ) -> Result<(usize, ZeroCopyBuf), AnyError> {
unreachable!() unreachable!()
} }
pub async fn write( pub async fn write(
@ -182,7 +185,10 @@ impl Resource for UnixStreamResource {
"unixStream".into() "unixStream".into()
} }
fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> { fn read_return(
self: Rc<Self>,
buf: ZeroCopyBuf,
) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(buf)) Box::pin(self.read(buf))
} }

View file

@ -674,11 +674,11 @@ impl TlsStreamResource {
pub async fn read( pub async fn read(
self: Rc<Self>, self: Rc<Self>,
mut buf: ZeroCopyBuf, mut buf: ZeroCopyBuf,
) -> Result<usize, AnyError> { ) -> Result<(usize, ZeroCopyBuf), AnyError> {
let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle); let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle);
let nread = rd.read(&mut buf).try_or_cancel(cancel_handle).await?; let nread = rd.read(&mut buf).try_or_cancel(cancel_handle).await?;
Ok(nread) Ok((nread, buf))
} }
pub async fn write( pub async fn write(
@ -722,7 +722,10 @@ impl Resource for TlsStreamResource {
"tlsStream".into() "tlsStream".into()
} }
fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> { fn read_return(
self: Rc<Self>,
buf: ZeroCopyBuf,
) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(buf)) Box::pin(self.read(buf))
} }

View file

@ -170,13 +170,13 @@ where
async fn read( async fn read(
self: Rc<Self>, self: Rc<Self>,
mut buf: ZeroCopyBuf, mut buf: ZeroCopyBuf,
) -> Result<usize, AnyError> { ) -> Result<(usize, ZeroCopyBuf), AnyError> {
let mut rd = self.borrow_mut().await; let mut rd = self.borrow_mut().await;
let nread = rd let nread = rd
.read(&mut buf) .read(&mut buf)
.try_or_cancel(self.cancel_handle()) .try_or_cancel(self.cancel_handle())
.await?; .await?;
Ok(nread) Ok((nread, buf))
} }
} }
@ -203,7 +203,10 @@ impl Resource for ChildStdoutResource {
"childStdout".into() "childStdout".into()
} }
fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> { fn read_return(
self: Rc<Self>,
buf: ZeroCopyBuf,
) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(buf)) Box::pin(self.read(buf))
} }
@ -219,7 +222,10 @@ impl Resource for ChildStderrResource {
"childStderr".into() "childStderr".into()
} }
fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> { fn read_return(
self: Rc<Self>,
buf: ZeroCopyBuf,
) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(buf)) Box::pin(self.read(buf))
} }
@ -263,16 +269,17 @@ impl StdFileResource {
async fn read( async fn read(
self: Rc<Self>, self: Rc<Self>,
mut buf: ZeroCopyBuf, mut buf: ZeroCopyBuf,
) -> Result<usize, AnyError> { ) -> Result<(usize, ZeroCopyBuf), AnyError> {
if self.fs_file.is_some() { if self.fs_file.is_some() {
let fs_file = self.fs_file.as_ref().unwrap(); let fs_file = self.fs_file.as_ref().unwrap();
let std_file = fs_file.0.as_ref().unwrap().clone(); let std_file = fs_file.0.as_ref().unwrap().clone();
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(
move || -> Result<(usize, ZeroCopyBuf), AnyError> {
let mut std_file = std_file.lock().unwrap(); let mut std_file = std_file.lock().unwrap();
std_file.read(&mut buf) Ok((std_file.read(&mut buf)?, buf))
}) },
)
.await? .await?
.map_err(AnyError::from)
} else { } else {
Err(resource_unavailable()) Err(resource_unavailable())
} }
@ -322,7 +329,10 @@ impl Resource for StdFileResource {
self.name.as_str().into() self.name.as_str().into()
} }
fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> { fn read_return(
self: Rc<Self>,
buf: ZeroCopyBuf,
) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(buf)) Box::pin(self.read(buf))
} }

View file

@ -29,6 +29,14 @@ impl MagicBuffer {
pub fn new_temp(vec: Vec<u8>) -> Self { pub fn new_temp(vec: Vec<u8>) -> Self {
MagicBuffer::Temp(vec) MagicBuffer::Temp(vec)
} }
// TODO(@littledivy): Temporary, this needs a refactor.
pub fn to_temp(self) -> Vec<u8> {
match self {
MagicBuffer::Temp(vec) => vec,
_ => unreachable!(),
}
}
} }
impl Clone for MagicBuffer { impl Clone for MagicBuffer {