1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-21 13:00:36 -05:00

fix concurrent stream projections

This commit is contained in:
snek 2024-12-22 22:48:31 +01:00
parent 2149b96660
commit a563265316
No known key found for this signature in database

View file

@ -40,6 +40,8 @@ use std::net::SocketAddrV4;
use std::net::SocketAddrV6;
use std::pin::pin;
use std::rc::Rc;
use std::sync::atomic::AtomicI32;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
@ -629,11 +631,19 @@ pub(crate) async fn op_quic_connection_handshake(
zrtt_accepted.await;
}
struct SendStreamResource(AsyncRefCell<quinn::SendStream>);
struct SendStreamResource {
stream: AsyncRefCell<quinn::SendStream>,
stream_id: quinn::StreamId,
priority: AtomicI32,
}
impl SendStreamResource {
fn new(stream: quinn::SendStream) -> Self {
Self(AsyncRefCell::new(stream))
Self {
stream_id: stream.id(),
priority: AtomicI32::new(stream.priority().unwrap_or(0)),
stream: AsyncRefCell::new(stream),
}
}
}
@ -644,8 +654,10 @@ impl Resource for SendStreamResource {
fn write(self: Rc<Self>, view: BufView) -> AsyncResult<WriteOutcome> {
Box::pin(async move {
let mut r = RcRef::map(self, |r| &r.0).borrow_mut().await;
let nwritten = r.write(&view).await?;
let mut stream =
RcRef::map(self.clone(), |r| &r.stream).borrow_mut().await;
stream.set_priority(self.priority.load(Ordering::Relaxed))?;
let nwritten = stream.write(&view).await?;
Ok(WriteOutcome::Partial { nwritten, view })
})
}
@ -653,11 +665,17 @@ impl Resource for SendStreamResource {
fn close(self: Rc<Self>) {}
}
struct RecvStreamResource(AsyncRefCell<quinn::RecvStream>);
struct RecvStreamResource {
stream: AsyncRefCell<quinn::RecvStream>,
stream_id: quinn::StreamId,
}
impl RecvStreamResource {
fn new(stream: quinn::RecvStream) -> Self {
Self(AsyncRefCell::new(stream))
Self {
stream_id: stream.id(),
stream: AsyncRefCell::new(stream),
}
}
}
@ -668,7 +686,7 @@ impl Resource for RecvStreamResource {
fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
Box::pin(async move {
let mut r = RcRef::map(self, |r| &r.0).borrow_mut().await;
let mut r = RcRef::map(self, |r| &r.stream).borrow_mut().await;
let mut data = vec![0; limit];
let nread = r.read(&mut data).await?.unwrap_or(0);
data.truncate(nread);
@ -681,7 +699,7 @@ impl Resource for RecvStreamResource {
mut buf: BufMutView,
) -> AsyncResult<(usize, BufMutView)> {
Box::pin(async move {
let mut r = RcRef::map(self, |r| &r.0).borrow_mut().await;
let mut r = RcRef::map(self, |r| &r.stream).borrow_mut().await;
let nread = r.read(&mut buf).await?.unwrap_or(0);
Ok((nread, buf))
})
@ -689,7 +707,7 @@ impl Resource for RecvStreamResource {
fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
Box::pin(async move {
let mut r = RcRef::map(self, |r| &r.0).borrow_mut().await;
let mut r = RcRef::map(self, |r| &r.stream).borrow_mut().await;
r.stop(quinn::VarInt::from(0u32))?;
Ok(())
})
@ -828,11 +846,7 @@ pub(crate) fn op_quic_send_stream_get_priority(
.borrow()
.resource_table
.get::<SendStreamResource>(rid)?;
let r = RcRef::map(resource, |r| &r.0).try_borrow();
match r {
Some(s) => Ok(s.priority()?),
None => Err(QuicError::BadResource("QuicSendStream")),
}
Ok(resource.priority.load(Ordering::Relaxed))
}
#[op2(fast)]
@ -845,14 +859,8 @@ pub(crate) fn op_quic_send_stream_set_priority(
.borrow()
.resource_table
.get::<SendStreamResource>(rid)?;
let r = RcRef::map(resource, |r| &r.0).try_borrow();
match r {
Some(s) => {
s.set_priority(priority)?;
Ok(())
}
None => Err(QuicError::BadResource("QuicSendStream")),
}
resource.priority.store(priority, Ordering::Relaxed);
Ok(())
}
#[op2(fast)]
@ -865,11 +873,8 @@ pub(crate) fn op_quic_send_stream_get_id(
.borrow()
.resource_table
.get::<SendStreamResource>(rid)?;
let r = RcRef::map(resource, |r| &r.0).try_borrow();
match r {
Some(s) => Ok(quinn::VarInt::from(s.id()).into_inner()),
None => Err(QuicError::BadResource("QuicSendStream")),
}
let stream_id = quinn::VarInt::from(resource.stream_id).into_inner();
Ok(stream_id)
}
#[op2(fast)]
@ -882,9 +887,6 @@ pub(crate) fn op_quic_recv_stream_get_id(
.borrow()
.resource_table
.get::<RecvStreamResource>(rid)?;
let r = RcRef::map(resource, |r| &r.0).try_borrow();
match r {
Some(s) => Ok(quinn::VarInt::from(s.id()).into_inner()),
None => Err(QuicError::BadResource("QuicRecvStream")),
}
let stream_id = quinn::VarInt::from(resource.stream_id).into_inner();
Ok(stream_id)
}