mirror of
https://github.com/denoland/deno.git
synced 2025-02-01 20:25:12 -05:00
fix(ext/http): truncate read bytes when streaming bodies (#14389)
stream shutdown wasn't happening correctly (moved it to call op_http_shutdown) & extra zeroed bytes were being sent for when body length not a multiple of 64*1024
This commit is contained in:
parent
c0e3b6096d
commit
609c359dd4
3 changed files with 35 additions and 20 deletions
|
@ -854,6 +854,32 @@ Deno.test({ permissions: { net: true } }, async function httpServerPanic() {
|
||||||
listener.close();
|
listener.close();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
Deno.test(
|
||||||
|
{ permissions: { net: true, write: true, read: true } },
|
||||||
|
async function httpServerCorrectSizeResponse() {
|
||||||
|
const tmpFile = await Deno.makeTempFile();
|
||||||
|
const file = await Deno.open(tmpFile, { write: true, read: true });
|
||||||
|
await file.write(new Uint8Array(70 * 1024).fill(1)); // 70kb sent in 64kb + 6kb chunks
|
||||||
|
file.close();
|
||||||
|
const promise = (async () => {
|
||||||
|
const listener = Deno.listen({ port: 4503 });
|
||||||
|
const conn = await listener.accept();
|
||||||
|
const httpConn = Deno.serveHttp(conn);
|
||||||
|
const ev = await httpConn.nextRequest();
|
||||||
|
const { respondWith } = ev!;
|
||||||
|
const f = await Deno.open(tmpFile, { read: true });
|
||||||
|
await respondWith(new Response(f.readable, { status: 200 }));
|
||||||
|
httpConn.close();
|
||||||
|
listener.close();
|
||||||
|
f.close();
|
||||||
|
})();
|
||||||
|
const resp = await fetch("http://127.0.0.1:4503/");
|
||||||
|
const body = await resp.arrayBuffer();
|
||||||
|
assertEquals(body.byteLength, 70 * 1024);
|
||||||
|
await promise;
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
Deno.test(
|
Deno.test(
|
||||||
{ permissions: { net: true, write: true, read: true } },
|
{ permissions: { net: true, write: true, read: true } },
|
||||||
async function httpServerClosedStream() {
|
async function httpServerClosedStream() {
|
||||||
|
|
|
@ -318,6 +318,7 @@
|
||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await core.opAsync("op_http_shutdown", streamRid);
|
await core.opAsync("op_http_shutdown", streamRid);
|
||||||
|
@ -326,7 +327,6 @@
|
||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
const deferred = request[_deferred];
|
const deferred = request[_deferred];
|
||||||
if (deferred) {
|
if (deferred) {
|
||||||
|
|
|
@ -729,7 +729,9 @@ async fn op_http_write_resource(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
HttpResponseWriter::BodyUncompressed(body) => {
|
HttpResponseWriter::BodyUncompressed(body) => {
|
||||||
if let Err(err) = body.send_data(Bytes::from(buf.to_temp())).await {
|
let mut buf = buf.to_temp();
|
||||||
|
buf.truncate(nread);
|
||||||
|
if let Err(err) = body.send_data(Bytes::from(buf)).await {
|
||||||
assert!(err.is_closed());
|
assert!(err.is_closed());
|
||||||
// Pull up the failure associated with the transport connection instead.
|
// Pull up the failure associated with the transport connection instead.
|
||||||
http_stream.conn.closed().await?;
|
http_stream.conn.closed().await?;
|
||||||
|
@ -740,19 +742,6 @@ async fn op_http_write_resource(
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
let wr = take(&mut *wr);
|
|
||||||
if let HttpResponseWriter::Body(mut body_writer) = wr {
|
|
||||||
match body_writer.shutdown().await {
|
|
||||||
Ok(_) => {}
|
|
||||||
Err(err) => {
|
|
||||||
assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
|
|
||||||
// Don't return "broken pipe", that's an implementation detail.
|
|
||||||
// Pull up the failure associated with the transport connection instead.
|
|
||||||
http_stream.conn.closed().await?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue