mirror of
https://github.com/denoland/deno.git
synced 2025-01-26 17:02:20 -05:00
147 lines
3.6 KiB
Rust
147 lines
3.6 KiB
Rust
use deno_core::error::AnyError;
|
|
use mio::net::TcpStream;
|
|
use std::{
|
|
cell::UnsafeCell,
|
|
future::Future,
|
|
io::{Read, Write},
|
|
pin::Pin,
|
|
sync::{Arc, Mutex},
|
|
};
|
|
use tokio::sync::mpsc;
|
|
|
|
use crate::ParseStatus;
|
|
|
|
type TlsTcpStream = rustls::StreamOwned<rustls::ServerConnection, TcpStream>;
|
|
|
|
pub enum InnerStream {
|
|
Tcp(TcpStream),
|
|
Tls(Box<TlsTcpStream>),
|
|
}
|
|
|
|
pub struct Stream {
|
|
pub inner: InnerStream,
|
|
pub detached: bool,
|
|
pub read_rx: Option<mpsc::Receiver<()>>,
|
|
pub read_tx: Option<mpsc::Sender<()>>,
|
|
pub parse_done: ParseStatus,
|
|
pub buffer: UnsafeCell<Vec<u8>>,
|
|
pub read_lock: Arc<Mutex<()>>,
|
|
}
|
|
|
|
impl Stream {
|
|
pub fn detach_ownership(&mut self) {
|
|
self.detached = true;
|
|
}
|
|
|
|
/// Try to write to the socket.
|
|
#[inline]
|
|
pub fn try_write(&mut self, buf: &[u8]) -> usize {
|
|
let mut nwritten = 0;
|
|
while nwritten < buf.len() {
|
|
match self.write(&buf[nwritten..]) {
|
|
Ok(n) => nwritten += n,
|
|
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
|
|
break;
|
|
}
|
|
Err(e) => {
|
|
log::trace!("Error writing to socket: {}", e);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
nwritten
|
|
}
|
|
|
|
#[inline]
|
|
pub fn shutdown(&mut self) {
|
|
match &mut self.inner {
|
|
InnerStream::Tcp(stream) => {
|
|
// Typically shutdown shouldn't fail.
|
|
let _ = stream.shutdown(std::net::Shutdown::Both);
|
|
}
|
|
InnerStream::Tls(stream) => {
|
|
let _ = stream.sock.shutdown(std::net::Shutdown::Both);
|
|
}
|
|
}
|
|
}
|
|
|
|
pub fn as_std(&mut self) -> std::net::TcpStream {
|
|
#[cfg(unix)]
|
|
let std_stream = {
|
|
use std::os::unix::prelude::AsRawFd;
|
|
use std::os::unix::prelude::FromRawFd;
|
|
let fd = match self.inner {
|
|
InnerStream::Tcp(ref tcp) => tcp.as_raw_fd(),
|
|
_ => todo!(),
|
|
};
|
|
// SAFETY: `fd` is a valid file descriptor.
|
|
unsafe { std::net::TcpStream::from_raw_fd(fd) }
|
|
};
|
|
#[cfg(windows)]
|
|
let std_stream = {
|
|
use std::os::windows::prelude::AsRawSocket;
|
|
use std::os::windows::prelude::FromRawSocket;
|
|
let fd = match self.inner {
|
|
InnerStream::Tcp(ref tcp) => tcp.as_raw_socket(),
|
|
_ => todo!(),
|
|
};
|
|
// SAFETY: `fd` is a valid file descriptor.
|
|
unsafe { std::net::TcpStream::from_raw_socket(fd) }
|
|
};
|
|
std_stream
|
|
}
|
|
|
|
#[inline]
|
|
pub async fn with_async_stream<F, T>(&mut self, f: F) -> Result<T, AnyError>
|
|
where
|
|
F: FnOnce(
|
|
&mut tokio::net::TcpStream,
|
|
) -> Pin<Box<dyn '_ + Future<Output = Result<T, AnyError>>>>,
|
|
{
|
|
let mut async_stream = tokio::net::TcpStream::from_std(self.as_std())?;
|
|
let result = f(&mut async_stream).await?;
|
|
forget_stream(async_stream.into_std()?);
|
|
Ok(result)
|
|
}
|
|
}
|
|
|
|
#[inline]
|
|
pub fn forget_stream(stream: std::net::TcpStream) {
|
|
#[cfg(unix)]
|
|
{
|
|
use std::os::unix::prelude::IntoRawFd;
|
|
let _ = stream.into_raw_fd();
|
|
}
|
|
#[cfg(windows)]
|
|
{
|
|
use std::os::windows::prelude::IntoRawSocket;
|
|
let _ = stream.into_raw_socket();
|
|
}
|
|
}
|
|
|
|
impl Write for Stream {
|
|
#[inline]
|
|
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
|
match self.inner {
|
|
InnerStream::Tcp(ref mut stream) => stream.write(buf),
|
|
InnerStream::Tls(ref mut stream) => stream.write(buf),
|
|
}
|
|
}
|
|
#[inline]
|
|
fn flush(&mut self) -> std::io::Result<()> {
|
|
match self.inner {
|
|
InnerStream::Tcp(ref mut stream) => stream.flush(),
|
|
InnerStream::Tls(ref mut stream) => stream.flush(),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Read for Stream {
|
|
#[inline]
|
|
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
|
match self.inner {
|
|
InnerStream::Tcp(ref mut stream) => stream.read(buf),
|
|
InnerStream::Tls(ref mut stream) => stream.read(buf),
|
|
}
|
|
}
|
|
}
|