0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-02-12 16:59:32 -05:00

fix(ext/node): send data frame with end_stream flag on _final call (#24147)

This commit is contained in:
Satya Rohith 2024-06-10 20:00:56 +05:30 committed by Nathan Whitaker
parent ef1fa7ff2f
commit 702235b65c
No known key found for this signature in database
2 changed files with 21 additions and 6 deletions

View file

@ -344,6 +344,7 @@ pub async fn op_http2_client_send_data(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
#[smi] stream_rid: ResourceId, #[smi] stream_rid: ResourceId,
#[buffer] data: JsBuffer, #[buffer] data: JsBuffer,
end_of_stream: bool,
) -> Result<(), AnyError> { ) -> Result<(), AnyError> {
let resource = state let resource = state
.borrow() .borrow()
@ -351,8 +352,7 @@ pub async fn op_http2_client_send_data(
.get::<Http2ClientStream>(stream_rid)?; .get::<Http2ClientStream>(stream_rid)?;
let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await; let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await;
// TODO(bartlomieju): handle end of stream stream.send_data(data.to_vec().into(), end_of_stream)?;
stream.send_data(data.to_vec().into(), false)?;
Ok(()) Ok(())
} }

View file

@ -978,7 +978,7 @@ export class ClientHttp2Stream extends Duplex {
return; return;
} }
shutdownWritable(this, cb); shutdownWritable(this, cb, this.#rid);
} }
// TODO(bartlomieju): needs a proper cleanup // TODO(bartlomieju): needs a proper cleanup
@ -1176,15 +1176,30 @@ export class ClientHttp2Stream extends Duplex {
} }
} }
function shutdownWritable(stream, callback) { function shutdownWritable(stream, callback, streamRid) {
debugHttp2(">>> shutdownWritable", callback); debugHttp2(">>> shutdownWritable", callback);
const state = stream[kState]; const state = stream[kState];
if (state.shutdownWritableCalled) { if (state.shutdownWritableCalled) {
debugHttp2(">>> shutdownWritable() already called");
return callback(); return callback();
} }
state.shutdownWritableCalled = true; state.shutdownWritableCalled = true;
onStreamTrailers(stream); if (state.flags & STREAM_FLAGS_HAS_TRAILERS) {
callback(); onStreamTrailers(stream);
callback();
} else {
op_http2_client_send_data(streamRid, new Uint8Array(), true)
.then(() => {
callback();
stream[kMaybeDestroy]();
core.tryClose(streamRid);
})
.catch((e) => {
callback(e);
core.tryClose(streamRid);
stream._destroy(e);
});
}
// TODO(bartlomieju): might have to add "finish" event listener here, // TODO(bartlomieju): might have to add "finish" event listener here,
// check it. // check it.
} }