diff --git a/cli/tests/unit/fetch_test.ts b/cli/tests/unit/fetch_test.ts index ed384dd4fc..f11174df2f 100644 --- a/cli/tests/unit/fetch_test.ts +++ b/cli/tests/unit/fetch_test.ts @@ -1291,3 +1291,30 @@ unitTest( client.close(); }, ); + +unitTest( + { perms: { net: true } }, + async function fetchAbortWhileUploadStreaming(): Promise { + const abortController = new AbortController(); + try { + await fetch( + "http://localhost:5552/echo_server", + { + method: "POST", + body: new ReadableStream({ + pull(controller) { + abortController.abort(); + controller.enqueue(new Uint8Array([1, 2, 3, 4])); + }, + }), + signal: abortController.signal, + }, + ); + fail("Fetch didn't reject."); + } catch (error) { + assert(error instanceof DOMException); + assertEquals(error.name, "AbortError"); + assertEquals(error.message, "Ongoing fetch was aborted."); + } + }, +); diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js index b81e669474..663d7c61c3 100644 --- a/ext/fetch/26_fetch.js +++ b/ext/fetch/26_fetch.js @@ -40,6 +40,11 @@ TypedArrayPrototypeSubarray, TypeError, Uint8Array, + WeakMap, + WeakMapPrototypeDelete, + WeakMapPrototypeGet, + WeakMapPrototypeHas, + WeakMapPrototypeSet, } = window.__bootstrap.primordials; const REQUEST_BODY_HEADER_NAMES = [ @@ -49,6 +54,8 @@ "content-type", ]; + const requestBodyReaders = new WeakMap(); + /** * @param {{ method: string, url: string, headers: [string, string][], clientRid: number | null, hasBody: boolean }} args * @param {Uint8Array | null} body @@ -193,6 +200,7 @@ reqBody = req.body.stream; } else { const reader = req.body.stream.getReader(); + WeakMapPrototypeSet(requestBodyReaders, req, reader); const r1 = await reader.read(); if (r1.done) { reqBody = new Uint8Array(0); @@ -201,6 +209,7 @@ const r2 = await reader.read(); if (!r2.done) throw new TypeError("Unreachable"); } + WeakMapPrototypeDelete(requestBodyReaders, req); } } else { req.body.streamOrStatic.consumed = true; @@ -232,6 +241,7 @@ throw new TypeError("Unreachable"); } const reader = reqBody.getReader(); + WeakMapPrototypeSet(requestBodyReaders, req, reader); (async () => { while (true) { const { value, done } = await PromisePrototypeCatch( @@ -260,6 +270,7 @@ break; } } + WeakMapPrototypeDelete(requestBodyReaders, req); core.tryClose(requestBodyRid); })(); } @@ -473,7 +484,13 @@ function abortFetch(request, responseObject) { const error = new DOMException("Ongoing fetch was aborted.", "AbortError"); - if (request.body !== null) request.body.cancel(error); + if (request.body !== null) { + if (WeakMapPrototypeHas(requestBodyReaders, request)) { + WeakMapPrototypeGet(requestBodyReaders, request).cancel(error); + } else { + request.body.cancel(error); + } + } if (responseObject !== null) { const response = toInnerResponse(responseObject); if (response.body !== null) response.body.error(error);