diff --git a/ext/http/01_http.js b/ext/http/01_http.js index 8bed444ca1..a312cf60ed 100644 --- a/ext/http/01_http.js +++ b/ext/http/01_http.js @@ -137,8 +137,10 @@ class HttpConn { return null; } - const { 0: streamRid, 1: method, 2: url } = nextRequest; - SetPrototypeAdd(this.#managedResources, streamRid); + const { 0: readStreamRid, 1: writeStreamRid, 2: method, 3: url } = + nextRequest; + SetPrototypeAdd(this.#managedResources, readStreamRid); + SetPrototypeAdd(this.#managedResources, writeStreamRid); /** @type {ReadableStream | undefined} */ let body = null; @@ -146,17 +148,16 @@ class HttpConn { // It will be closed automatically once the request has been handled and // the response has been sent. if (method !== "GET" && method !== "HEAD") { - body = readableStreamForRid(streamRid, false); + body = readableStreamForRid(readStreamRid, false); } const innerRequest = newInnerRequest( method, url, - () => op_http_headers(streamRid), + () => op_http_headers(readStreamRid), body !== null ? new InnerBody(body) : null, false, ); - innerRequest[streamRid] = streamRid; const abortController = new AbortController(); const request = fromInnerRequest( innerRequest, @@ -167,7 +168,8 @@ class HttpConn { const respondWith = createRespondWith( this, - streamRid, + readStreamRid, + writeStreamRid, abortController, ); @@ -178,10 +180,10 @@ class HttpConn { close() { if (!this.#closed) { this.#closed = true; - core.close(this.#rid); + core.tryClose(this.#rid); for (const rid of new SafeSetIterator(this.#managedResources)) { SetPrototypeDelete(this.#managedResources, rid); - core.close(rid); + core.tryClose(rid); } } } @@ -209,7 +211,8 @@ class HttpConn { function createRespondWith( httpConn, - streamRid, + readStreamRid, + writeStreamRid, abortController, ) { return async function respondWith(resp) { @@ -270,7 +273,7 @@ function createRespondWith( ); try { await op_http_write_headers( - streamRid, + writeStreamRid, innerResp.status ?? 200, innerResp.headerList, isStreamingResponseBody ? null : respBody, @@ -310,7 +313,7 @@ function createRespondWith( reader = respBody.getReader(); // Acquire JS lock. try { await op_http_write_resource( - streamRid, + writeStreamRid, resourceBacking.rid, ); if (resourceBacking.autoClose) core.tryClose(resourceBacking.rid); @@ -340,7 +343,7 @@ function createRespondWith( break; } try { - await op_http_write(streamRid, value); + await op_http_write(writeStreamRid, value); } catch (error) { const connError = httpConn[connErrorSymbol]; if ( @@ -359,7 +362,7 @@ function createRespondWith( if (success) { try { - await op_http_shutdown(streamRid); + await op_http_shutdown(writeStreamRid); } catch (error) { await reader.cancel(error); throw error; @@ -370,7 +373,7 @@ function createRespondWith( const ws = resp[_ws]; if (ws) { const wsRid = await op_http_upgrade_websocket( - streamRid, + readStreamRid, ); ws[_rid] = wsRid; ws[_protocol] = resp.headers.get("sec-websocket-protocol"); @@ -395,8 +398,11 @@ function createRespondWith( abortController.abort(error); throw error; } finally { - if (deleteManagedResource(httpConn, streamRid)) { - core.close(streamRid); + if (deleteManagedResource(httpConn, readStreamRid)) { + core.tryClose(readStreamRid); + } + if (deleteManagedResource(httpConn, writeStreamRid)) { + core.tryClose(writeStreamRid); } } }; diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 129aaac477..df31b9c445 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -209,7 +209,15 @@ impl HttpConnResource { // Accepts a new incoming HTTP request. async fn accept( self: &Rc, - ) -> Result, AnyError> { + ) -> Result< + Option<( + HttpStreamReadResource, + HttpStreamWriteResource, + String, + String, + )>, + AnyError, + > { let fut = async { let (request_tx, request_rx) = oneshot::channel(); let (response_tx, response_rx) = oneshot::channel(); @@ -218,7 +226,6 @@ impl HttpConnResource { self.acceptors_tx.unbounded_send(acceptor).ok()?; let request = request_rx.await.ok()?; - let accept_encoding = { let encodings = fly_accept_encoding::encodings_iter_http_02(request.headers()) @@ -234,9 +241,10 @@ impl HttpConnResource { let method = request.method().to_string(); let url = req_url(&request, self.scheme, &self.addr); - let stream = - HttpStreamResource::new(self, request, response_tx, accept_encoding); - Some((stream, method, url)) + let read_stream = HttpStreamReadResource::new(self, request); + let write_stream = + HttpStreamWriteResource::new(self, response_tx, accept_encoding); + Some((read_stream, write_stream, method, url)) }; async { @@ -348,38 +356,34 @@ impl HttpAcceptor { } } -/// A resource representing a single HTTP request/response stream. -pub struct HttpStreamResource { - conn: Rc, +pub struct HttpStreamReadResource { + _conn: Rc, pub rd: AsyncRefCell, - wr: AsyncRefCell, - accept_encoding: Encoding, cancel_handle: CancelHandle, size: SizeHint, } -impl HttpStreamResource { - fn new( - conn: &Rc, - request: Request, - response_tx: oneshot::Sender>, - accept_encoding: Encoding, - ) -> Self { +pub struct HttpStreamWriteResource { + conn: Rc, + wr: AsyncRefCell, + accept_encoding: Encoding, +} + +impl HttpStreamReadResource { + fn new(conn: &Rc, request: Request) -> Self { let size = request.body().size_hint(); Self { - conn: conn.clone(), + _conn: conn.clone(), rd: HttpRequestReader::Headers(request).into(), - wr: HttpResponseWriter::Headers(response_tx).into(), - accept_encoding, size, cancel_handle: CancelHandle::new(), } } } -impl Resource for HttpStreamResource { +impl Resource for HttpStreamReadResource { fn name(&self) -> Cow { - "httpStream".into() + "httpReadStream".into() } fn read(self: Rc, limit: usize) -> AsyncResult { @@ -440,6 +444,26 @@ impl Resource for HttpStreamResource { } } +impl HttpStreamWriteResource { + fn new( + conn: &Rc, + response_tx: oneshot::Sender>, + accept_encoding: Encoding, + ) -> Self { + Self { + conn: conn.clone(), + wr: HttpResponseWriter::Headers(response_tx).into(), + accept_encoding, + } + } +} + +impl Resource for HttpStreamWriteResource { + fn name(&self) -> Cow { + "httpWriteStream".into() + } +} + /// The read half of an HTTP stream. pub enum HttpRequestReader { Headers(Request), @@ -504,7 +528,9 @@ impl Drop for BodyUncompressedSender { #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct NextRequestResponse( - // stream_rid: + // read_stream_rid: + ResourceId, + // write_stream_rid: ResourceId, // method: // This is a String rather than a ByteString because reqwest will only return @@ -523,10 +549,17 @@ async fn op_http_accept( let conn = state.borrow().resource_table.get::(rid)?; match conn.accept().await { - Ok(Some((stream, method, url))) => { - let stream_rid = - state.borrow_mut().resource_table.add_rc(Rc::new(stream)); - let r = NextRequestResponse(stream_rid, method, url); + Ok(Some((read_stream, write_stream, method, url))) => { + let read_stream_rid = state + .borrow_mut() + .resource_table + .add_rc(Rc::new(read_stream)); + let write_stream_rid = state + .borrow_mut() + .resource_table + .add_rc(Rc::new(write_stream)); + let r = + NextRequestResponse(read_stream_rid, write_stream_rid, method, url); Ok(Some(r)) } Ok(None) => Ok(None), @@ -628,7 +661,7 @@ async fn op_http_write_headers( let stream = state .borrow_mut() .resource_table - .get::(rid)?; + .get::(rid)?; // Track supported encoding let encoding = stream.accept_encoding; @@ -693,7 +726,7 @@ fn op_http_headers( state: &mut OpState, #[smi] rid: u32, ) -> Result, AnyError> { - let stream = state.resource_table.get::(rid)?; + let stream = state.resource_table.get::(rid)?; let rd = RcRef::map(&stream, |r| &r.rd) .try_borrow() .ok_or_else(|| http_error("already in use"))?; @@ -849,7 +882,7 @@ async fn op_http_write_resource( let http_stream = state .borrow() .resource_table - .get::(rid)?; + .get::(rid)?; let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await; let resource = state.borrow().resource_table.get_any(stream)?; loop { @@ -908,7 +941,7 @@ async fn op_http_write( let stream = state .borrow() .resource_table - .get::(rid)?; + .get::(rid)?; let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; match &mut *wr { @@ -960,7 +993,7 @@ async fn op_http_shutdown( let stream = state .borrow() .resource_table - .get::(rid)?; + .get::(rid)?; let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; let wr = take(&mut *wr); match wr { @@ -1008,7 +1041,7 @@ async fn op_http_upgrade_websocket( let stream = state .borrow_mut() .resource_table - .get::(rid)?; + .get::(rid)?; let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await; let request = match &mut *rd { diff --git a/tests/unit/http_test.ts b/tests/unit/http_test.ts index 108c43e40c..607f2fc6e4 100644 --- a/tests/unit/http_test.ts +++ b/tests/unit/http_test.ts @@ -2668,6 +2668,61 @@ Deno.test( }, ); +Deno.test("proxy with fetch", async () => { + const listener = Deno.listen({ port: listenPort }); + const deferred = Promise.withResolvers(); + + const server = Deno.serve({ port: listenPort + 1 }, (_req) => { + return new Response("Hello world"); + }); + + let httpConn: Deno.HttpConn; + async function handleHttp(conn: Deno.Conn) { + httpConn = Deno.serveHttp(conn); + for await (const e of httpConn) { + await e.respondWith(serve(e.request)); + break; + } + } + + async function serve(req: Request) { + return await fetch(`http://localhost:${listenPort + 1}/`, req); + } + + const originServer = (async () => { + for await (const conn of listener) { + handleHttp(conn); + break; + } + })(); + + const proxiedRequest = (async () => { + const conn = await Deno.connect({ port: listenPort }); + const payload = new TextEncoder().encode( + "POST /api/sessions HTTP/1.1\x0d\x0aConnection: keep-alive\x0d\x0aContent-Length: 2\x0d\x0a\x0d\x0a{}", + ); + const n = await conn.write(payload); + assertEquals(n, 76); + const buf = new Uint8Array(1000); + const nread = await conn.read(buf); + assertEquals(nread, 150); + const respText = new TextDecoder().decode(buf); + assert(respText.includes("HTTP/1.1 200 OK")); + assert(respText.includes("content-type: text/plain;charset=UTF-8")); + assert(respText.includes("vary: Accept-Encoding")); + assert(respText.includes("content-length: 11")); + assert(respText.includes("Hello world")); + conn.close(); + deferred.resolve(); + })(); + await proxiedRequest; + await originServer; + await deferred.promise; + await server.shutdown(); + await server.finished; + httpConn!.close(); +}); + function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader { // Based on https://tools.ietf.org/html/rfc2616#section-19.4.6 const tp = new TextProtoReader(r);