From 0f305c866bb30938de1b001c5b14da8af43c0ab0 Mon Sep 17 00:00:00 2001 From: Divy Srivastava Date: Tue, 22 Nov 2022 02:53:58 -0800 Subject: [PATCH] Reland "perf(ext/flash): optimize response streaming" (#16660) --- ext/flash/01_http.js | 51 ++++++++++++++++++++++++++++++++------- ext/flash/lib.rs | 57 +++++++++++++++++++++++++++++++++++++++----- 2 files changed, 94 insertions(+), 14 deletions(-) diff --git a/ext/flash/01_http.js b/ext/flash/01_http.js index 7a6b9bc47e..2b0caff493 100644 --- a/ext/flash/01_http.js +++ b/ext/flash/01_http.js @@ -243,6 +243,7 @@ i, respondFast, respondChunked, + tryRespondChunked, ) { // there might've been an HTTP upgrade. if (resp === undefined) { @@ -371,6 +372,9 @@ } } else { const reader = respBody.getReader(); + + // Best case: sends headers + first chunk in a single go. + const { value, done } = await reader.read(); writeFixedResponse( serverId, i, @@ -385,14 +389,23 @@ false, respondFast, ); - while (true) { - const { value, done } = await reader.read(); - await respondChunked( - i, - value, - done, - ); - if (done) break; + + await tryRespondChunked( + i, + value, + done, + ); + + if (!done) { + while (true) { + const chunk = await reader.read(); + await respondChunked( + i, + chunk.value, + chunk.done, + ); + if (chunk.done) break; + } } } } @@ -572,6 +585,7 @@ i, respondFast, respondChunked, + tryRespondChunked, ), ), onError, @@ -589,6 +603,7 @@ i, respondFast, respondChunked, + tryRespondChunked, ) ).catch(onError); continue; @@ -607,6 +622,7 @@ i, respondFast, respondChunked, + tryRespondChunked, ); } @@ -623,6 +639,25 @@ once: true, }); + function tryRespondChunked(token, chunk, shutdown) { + const nwritten = core.ops.op_try_flash_respond_chuncked( + serverId, + token, + chunk ?? new Uint8Array(), + shutdown, + ); + if (nwritten > 0) { + return core.opAsync( + "op_flash_respond_chuncked", + serverId, + token, + chunk, + shutdown, + nwritten, + ); + } + } + function respondChunked(token, chunk, shutdown) { return core.opAsync( "op_flash_respond_chuncked", diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs index b077b8d219..d08cdbcdc5 100644 --- a/ext/flash/lib.rs +++ b/ext/flash/lib.rs @@ -106,6 +106,39 @@ fn op_flash_respond( flash_respond(ctx, token, shutdown, &response) } +#[op(fast)] +fn op_try_flash_respond_chuncked( + op_state: &mut OpState, + server_id: u32, + token: u32, + response: &[u8], + shutdown: bool, +) -> u32 { + let flash_ctx = op_state.borrow_mut::(); + let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); + let tx = ctx.requests.get(&token).unwrap(); + let sock = tx.socket(); + + // TODO(@littledivy): Use writev when `UnixIoSlice` lands. + // https://github.com/denoland/deno/pull/15629 + let h = format!("{:x}\r\n", response.len()); + + let concat = [h.as_bytes(), response, b"\r\n"].concat(); + let expected = sock.try_write(&concat); + if expected != concat.len() { + if expected > 2 { + return expected as u32; + } + return expected as u32; + } + + if shutdown { + // Best case: We've written everything and the stream is done too. + let _ = ctx.requests.remove(&token).unwrap(); + } + 0 +} + #[op] async fn op_flash_respond_async( state: Rc>, @@ -157,6 +190,7 @@ async fn op_flash_respond_chuncked( token: u32, response: Option, shutdown: bool, + nwritten: u32, ) -> Result<(), AnyError> { let mut op_state = op_state.borrow_mut(); let flash_ctx = op_state.borrow_mut::(); @@ -178,17 +212,27 @@ async fn op_flash_respond_chuncked( .with_async_stream(|stream| { Box::pin(async move { use tokio::io::AsyncWriteExt; + // TODO(@littledivy): Use writev when `UnixIoSlice` lands. + // https://github.com/denoland/deno/pull/15629 + macro_rules! write_whats_not_written { + ($e:expr) => { + let e = $e; + let n = nwritten as usize; + if n < e.len() { + stream.write_all(&e[n..]).await?; + } + }; + } if let Some(response) = response { - stream - .write_all(format!("{:x}\r\n", response.len()).as_bytes()) - .await?; - stream.write_all(&response).await?; - stream.write_all(b"\r\n").await?; + let h = format!("{:x}\r\n", response.len()); + write_whats_not_written!(h.as_bytes()); + write_whats_not_written!(&response); + write_whats_not_written!(b"\r\n"); } // The last chunk if shutdown { - stream.write_all(b"0\r\n\r\n").await?; + write_whats_not_written!(b"0\r\n\r\n"); } Ok(()) @@ -1485,6 +1529,7 @@ pub fn init(unstable: bool) -> Extension { op_flash_close_server::decl(), op_flash_make_request::decl(), op_flash_write_resource::decl(), + op_try_flash_respond_chuncked::decl(), ]) .state(move |op_state| { op_state.put(Unstable(unstable));