0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-03-03 09:31:22 -05:00

Fix concurrent accepts (#2403)

This commit is contained in:
Ryan Dahl 2019-05-23 21:22:52 +03:00 committed by GitHub
parent 2952fb5405
commit 583a646be7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 58 additions and 6 deletions

View file

@ -171,12 +171,49 @@ impl Resource {
}
}
/// Track the current task (for TcpListener resource).
/// Throws an error if another task is already tracked.
pub fn track_task(&mut self) -> Result<(), std::io::Error> {
let mut table = RESOURCE_TABLE.lock().unwrap();
// Only track if is TcpListener.
if let Some(Repr::TcpListener(_, t)) = table.get_mut(&self.rid) {
// Currently, we only allow tracking a single accept task for a listener.
// This might be changed in the future with multiple workers.
// Caveat: TcpListener by itself also only tracks an accept task at a time.
// See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883
if t.is_some() {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Another accept task is ongoing",
));
}
t.replace(futures::task::current());
}
Ok(())
}
/// Stop tracking a task (for TcpListener resource).
/// Happens when the task is done and thus no further tracking is needed.
pub fn untrack_task(&mut self) {
let mut table = RESOURCE_TABLE.lock().unwrap();
// Only untrack if is TcpListener.
if let Some(Repr::TcpListener(_, t)) = table.get_mut(&self.rid) {
if t.is_some() {
t.take();
}
}
}
// close(2) is done by dropping the value. Therefore we just need to remove
// the resource from the RESOURCE_TABLE.
pub fn close(&self) {
let mut table = RESOURCE_TABLE.lock().unwrap();
let r = table.remove(&self.rid);
assert!(r.is_some());
let r = table.remove(&self.rid).unwrap();
// If TcpListener, we must kill all pending accepts!
if let Repr::TcpListener(_, Some(t)) = r {
// Call notify on the tracked task, so that they would error out.
t.notify();
}
}
pub fn shutdown(&mut self, how: Shutdown) -> Result<(), DenoError> {

View file

@ -78,14 +78,31 @@ pub fn accept(r: Resource) -> Accept {
pub struct Accept {
state: AcceptState,
}
impl Future for Accept {
type Item = (TcpStream, SocketAddr);
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let (stream, addr) = match self.state {
AcceptState::Pending(ref mut r) => try_ready!(r.poll_accept()),
// Similar to try_ready!, but also track/untrack accept task
// in TcpListener resource.
// In this way, when the listener is closed, the task can be
// notified to error out (instead of stuck forever).
AcceptState::Pending(ref mut r) => match r.poll_accept() {
Ok(futures::prelude::Async::Ready(t)) => {
r.untrack_task();
t
}
Ok(futures::prelude::Async::NotReady) => {
// Would error out if another accept task is being tracked.
r.track_task()?;
return Ok(futures::prelude::Async::NotReady);
}
Err(e) => {
r.untrack_task();
return Err(e);
}
},
AcceptState::Empty => panic!("poll Accept after it's done"),
};

View file

@ -21,7 +21,6 @@ testPerm({ net: true }, async function netCloseWhileAccept(): Promise<void> {
assertEquals(err.message, "Listener has been closed");
});
/* TODO(ry) Re-enable this test.
testPerm({ net: true }, async function netConcurrentAccept(): Promise<void> {
const listener = Deno.listen("tcp", ":4502");
let acceptErrCount = 0;
@ -42,7 +41,6 @@ testPerm({ net: true }, async function netConcurrentAccept(): Promise<void> {
await [p, p1];
assertEquals(acceptErrCount, 1);
});
*/
testPerm({ net: true }, async function netDialListen(): Promise<void> {
const listener = Deno.listen("tcp", ":4500");