mirror of
https://github.com/denoland/deno.git
synced 2025-01-22 06:09:25 -05:00
43b6390629
Right now an error in a request body stream causes an uncatchable global promise rejection. This PR fixes this to instead propagate the error correctly into the promise returned from `fetch`. It additionally fixes errored readable stream bodies being treated as successfully completed bodies by Rust.
87 lines
2.4 KiB
Rust
87 lines
2.4 KiB
Rust
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
|
|
|
|
use std::pin::Pin;
|
|
use std::task::Context;
|
|
use std::task::Poll;
|
|
|
|
use deno_core::futures::Stream;
|
|
use tokio::sync::mpsc;
|
|
|
|
/// [MpscByteStream] is a stream of bytes that is backed by a mpsc channel. It is
|
|
/// used to bridge between the fetch task and the HTTP body stream. The stream
|
|
/// has the special property that it errors if the channel is closed before an
|
|
/// explicit EOF is sent (in the form of a [None] value on the sender).
|
|
pub struct MpscByteStream {
|
|
receiver: mpsc::Receiver<Option<bytes::Bytes>>,
|
|
shutdown: bool,
|
|
}
|
|
|
|
impl MpscByteStream {
|
|
pub fn new() -> (Self, mpsc::Sender<Option<bytes::Bytes>>) {
|
|
let (sender, receiver) = mpsc::channel::<Option<bytes::Bytes>>(1);
|
|
let this = Self {
|
|
receiver,
|
|
shutdown: false,
|
|
};
|
|
(this, sender)
|
|
}
|
|
}
|
|
|
|
impl Stream for MpscByteStream {
|
|
type Item = Result<bytes::Bytes, std::io::Error>;
|
|
|
|
fn poll_next(
|
|
mut self: Pin<&mut Self>,
|
|
cx: &mut Context<'_>,
|
|
) -> Poll<Option<Self::Item>> {
|
|
let val = std::task::ready!(self.receiver.poll_recv(cx));
|
|
match val {
|
|
None if self.shutdown => Poll::Ready(None),
|
|
None => Poll::Ready(Some(Err(std::io::Error::new(
|
|
std::io::ErrorKind::UnexpectedEof,
|
|
"channel closed",
|
|
)))),
|
|
Some(None) => {
|
|
self.shutdown = true;
|
|
Poll::Ready(None)
|
|
}
|
|
Some(Some(val)) => Poll::Ready(Some(Ok(val))),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use bytes::Bytes;
|
|
use deno_core::futures::StreamExt;
|
|
|
|
#[tokio::test]
|
|
async fn success() {
|
|
let (mut stream, sender) = MpscByteStream::new();
|
|
|
|
sender.send(Some(Bytes::from("hello"))).await.unwrap();
|
|
assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
|
|
|
|
sender.send(Some(Bytes::from("world"))).await.unwrap();
|
|
assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world"));
|
|
|
|
sender.send(None).await.unwrap();
|
|
drop(sender);
|
|
assert!(stream.next().await.is_none());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn error() {
|
|
let (mut stream, sender) = MpscByteStream::new();
|
|
|
|
sender.send(Some(Bytes::from("hello"))).await.unwrap();
|
|
assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
|
|
|
|
drop(sender);
|
|
assert_eq!(
|
|
stream.next().await.unwrap().unwrap_err().kind(),
|
|
std::io::ErrorKind::UnexpectedEof
|
|
);
|
|
}
|
|
}
|