0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-03-03 09:31:22 -05:00

chore(kv) fix and re-enable queue test (#19529)

The callback draining code is no longer needed after #19513.
This commit is contained in:
Igor Zinkovsky 2023-06-17 15:02:32 -07:00 committed by GitHub
parent c8dc6b14ec
commit 0773463de1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 21 additions and 16 deletions

View file

@ -1697,10 +1697,7 @@ Deno.test({
Deno.test({
name: "queue persistence with delay messages",
ignore: true, // flaky
async fn() {
const dispatchedPre = Deno.metrics().opsDispatchedAsync;
const completedPre = Deno.metrics().opsCompletedAsync;
const filename = await Deno.makeTempFile({ prefix: "queue_db" });
try {
await Deno.remove(filename);
@ -1745,14 +1742,6 @@ Deno.test({
db.close();
await listener;
} finally {
// Wait until callbacks are drained before deleting the db.
let dispatched = Deno.metrics().opsDispatchedAsync - dispatchedPre;
let completed = Deno.metrics().opsCompletedAsync - completedPre;
while (dispatched !== completed) {
dispatched = Deno.metrics().opsDispatchedAsync - dispatchedPre;
completed = Deno.metrics().opsCompletedAsync - completedPre;
await sleep(100);
}
try {
await Deno.remove(filename);
} catch {

View file

@ -250,6 +250,7 @@ class Kv {
async listenQueue(
handler: (message: unknown) => Promise<void> | void,
): Promise<void> {
const finishMessageOps = new Map<number, Promise<void>>();
while (!this.#closed) {
// Wait for the next message.
let next: { 0: Uint8Array; 1: number };
@ -282,14 +283,29 @@ class Kv {
} catch (error) {
console.error("Exception in queue handler", error);
} finally {
await core.opAsync(
"op_kv_finish_dequeued_message",
handleId,
success,
);
if (this.#closed) {
core.close(handleId);
} else {
const promise: Promise<void> = core.opAsync(
"op_kv_finish_dequeued_message",
handleId,
success,
);
finishMessageOps.set(handleId, promise);
try {
await promise;
} finally {
finishMessageOps.delete(handleId);
}
}
}
})();
}
for (const promise of finishMessageOps.values()) {
await promise;
}
finishMessageOps.clear();
}
close() {