diff --git a/ext/flash/01_http.js b/ext/flash/01_http.js index 17f99ca98e..c7dce421d0 100644 --- a/ext/flash/01_http.js +++ b/ext/flash/01_http.js @@ -365,6 +365,8 @@ } } else { const reader = respBody.getReader(); + const { value, done } = await reader.read(); + // Best case: sends headers + first chunk in a single go. writeFixedResponse( serverId, i, @@ -379,14 +381,21 @@ false, respondFast, ); - while (true) { - const { value, done } = await reader.read(); - await respondChunked( - i, - value, - done, - ); - if (done) break; + await respondChunked( + i, + value, + done, + ); + if (!done) { + while (true) { + const chunk = await reader.read(); + await respondChunked( + i, + chunk.value, + chunk.done, + ); + if (chunk.done) break; + } } } } @@ -591,13 +600,22 @@ }); function respondChunked(token, chunk, shutdown) { - return core.opAsync( - "op_flash_respond_chuncked", + const nwritten = core.ops.op_try_flash_respond_chuncked( serverId, token, - chunk, + chunk ?? new Uint8Array(), shutdown, ); + if (nwritten > 0) { + return core.opAsync( + "op_flash_respond_chuncked", + serverId, + token, + chunk, + shutdown, + nwritten, + ); + } } const fastOp = prepareFastCalls(); diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs index f9ce1c7445..17e3e83178 100644 --- a/ext/flash/lib.rs +++ b/ext/flash/lib.rs @@ -150,6 +150,34 @@ async fn op_flash_respond_async( Ok(()) } +#[op(fast)] +fn op_try_flash_respond_chuncked( + op_state: &mut OpState, + server_id: u32, + token: u32, + response: &[u8], + shutdown: bool, +) -> u32 { + let flash_ctx = op_state.borrow_mut::(); + let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); + let tx = ctx.requests.get(&token).unwrap(); + let sock = tx.socket(); + + // TODO(@littledivy): Use writev when `UnixIoSlice` lands. + // https://github.com/denoland/deno/pull/15629 + let h = format!("{:x}\r\n", response.len()); + let concat = [h.as_bytes(), response, b"\r\n"].concat(); + let expected = sock.try_write(&concat); + if expected != concat.len() { + return expected as u32; + } + if shutdown { + // Best case: We've written everything and the stream is done too. + let _ = ctx.requests.remove(&token).unwrap(); + } + 0 +} + #[op] async fn op_flash_respond_chuncked( op_state: Rc>, @@ -157,6 +185,7 @@ async fn op_flash_respond_chuncked( token: u32, response: Option, shutdown: bool, + nwritten: u32, ) -> Result<(), AnyError> { let mut op_state = op_state.borrow_mut(); let flash_ctx = op_state.borrow_mut::(); @@ -178,17 +207,27 @@ async fn op_flash_respond_chuncked( .with_async_stream(|stream| { Box::pin(async move { use tokio::io::AsyncWriteExt; + // TODO(@littledivy): Use writev when `UnixIoSlice` lands. + // https://github.com/denoland/deno/pull/15629 + macro_rules! write_whats_not_written { + ($e:expr) => { + let e = $e; + let n = nwritten as usize; + if n < e.len() { + stream.write_all(&e[n..]).await?; + } + }; + } if let Some(response) = response { - stream - .write_all(format!("{:x}\r\n", response.len()).as_bytes()) - .await?; - stream.write_all(&response).await?; - stream.write_all(b"\r\n").await?; + let h = format!("{:x}\r\n", response.len()); + write_whats_not_written!(h.as_bytes()); + write_whats_not_written!(&response); + write_whats_not_written!(b"\r\n"); } // The last chunk if shutdown { - stream.write_all(b"0\r\n\r\n").await?; + write_whats_not_written!(b"0\r\n\r\n"); } Ok(()) @@ -1451,6 +1490,7 @@ pub fn init(unstable: bool) -> Extension { op_flash_respond::decl(), op_flash_respond_async::decl(), op_flash_respond_chuncked::decl(), + op_try_flash_respond_chuncked::decl(), op_flash_method::decl(), op_flash_path::decl(), op_flash_headers::decl(),