0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-03-03 17:34:47 -05:00

feat(ext/kv) add backoffSchedule to enqueue (#21474)

Also reduces the time to run `kv_queue_undelivered_test.ts` test from
100 seconds down to 3 seconds.

closes #21437
This commit is contained in:
Igor Zinkovsky 2023-12-12 22:51:23 -08:00 committed by GitHub
parent 0ceae7a490
commit 86769b0d1c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 93 additions and 8 deletions

View file

@ -43,8 +43,9 @@ queueTest("queue with undelivered", async (db) => {
try { try {
await db.enqueue("test", { await db.enqueue("test", {
keysIfUndelivered: [["queue_failed", "a"], ["queue_failed", "b"]], keysIfUndelivered: [["queue_failed", "a"], ["queue_failed", "b"]],
backoffSchedule: [10, 20],
}); });
await sleep(100000); await sleep(3000);
const undelivered = await collect(db.list({ prefix: ["queue_failed"] })); const undelivered = await collect(db.list({ prefix: ["queue_failed"] }));
assertEquals(undelivered.length, 2); assertEquals(undelivered.length, 2);
assertEquals(undelivered[0].key, ["queue_failed", "a"]); assertEquals(undelivered[0].key, ["queue_failed", "a"]);

View file

@ -1600,6 +1600,24 @@ queueTest("queue retries", async (db) => {
assertEquals(4, count); assertEquals(4, count);
}); });
queueTest("queue retries with backoffSchedule", async (db) => {
let count = 0;
const listener = db.listenQueue((_msg) => {
count += 1;
throw new TypeError("dequeue error");
});
try {
await db.enqueue("test", { backoffSchedule: [1] });
await sleep(2000);
} finally {
db.close();
await listener;
}
// There should have been 1 attempt + 1 retry
assertEquals(2, count);
});
queueTest("multiple listenQueues", async (db) => { queueTest("multiple listenQueues", async (db) => {
const numListens = 10; const numListens = 10;
let count = 0; let count = 0;
@ -1876,6 +1894,23 @@ Deno.test({
}, },
}); });
dbTest("invalid backoffSchedule", async (db) => {
await assertRejects(
async () => {
await db.enqueue("foo", { backoffSchedule: [1, 1, 1, 1, 1, 1] });
},
TypeError,
"invalid backoffSchedule",
);
await assertRejects(
async () => {
await db.enqueue("foo", { backoffSchedule: [3600001] });
},
TypeError,
"invalid backoffSchedule",
);
});
dbTest("atomic operation is exposed", (db) => { dbTest("atomic operation is exposed", (db) => {
assert(Deno.AtomicOperation); assert(Deno.AtomicOperation);
const ao = db.atomic(); const ao = db.atomic();

View file

@ -1786,7 +1786,11 @@ declare namespace Deno {
*/ */
enqueue( enqueue(
value: unknown, value: unknown,
options?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] }, options?: {
delay?: number;
keysIfUndelivered?: Deno.KvKey[];
backoffSchedule?: number[];
},
): this; ): this;
/** /**
* Commit the operation to the KV store. Returns a value indicating whether * Commit the operation to the KV store. Returns a value indicating whether
@ -1995,14 +1999,28 @@ declare namespace Deno {
* listener after several attempts. The values are set to the value of * listener after several attempts. The values are set to the value of
* the queued message. * the queued message.
* *
* The `backoffSchedule` option can be used to specify the retry policy for
* failed message delivery. Each element in the array represents the number of
* milliseconds to wait before retrying the delivery. For example,
* `[1000, 5000, 10000]` means that a failed delivery will be retried
* at most 3 times, with 1 second, 5 seconds, and 10 seconds delay
* between each retry.
*
* ```ts * ```ts
* const db = await Deno.openKv(); * const db = await Deno.openKv();
* await db.enqueue("bar", { keysIfUndelivered: [["foo", "bar"]] }); * await db.enqueue("bar", {
* keysIfUndelivered: [["foo", "bar"]],
* backoffSchedule: [1000, 5000, 10000],
* });
* ``` * ```
*/ */
enqueue( enqueue(
value: unknown, value: unknown,
options?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] }, options?: {
delay?: number;
keysIfUndelivered?: Deno.KvKey[];
backoffSchedule?: number[];
},
): Promise<KvCommitResult>; ): Promise<KvCommitResult>;
/** /**

View file

@ -43,6 +43,20 @@ function validateQueueDelay(delay: number) {
} }
} }
const maxQueueBackoffIntervals = 5;
const maxQueueBackoffInterval = 60 * 60 * 1000;
function validateBackoffSchedule(backoffSchedule: number[]) {
if (backoffSchedule.length > maxQueueBackoffIntervals) {
throw new TypeError("invalid backoffSchedule");
}
for (const interval of backoffSchedule) {
if (interval < 0 || interval > maxQueueBackoffInterval || isNaN(interval)) {
throw new TypeError("invalid backoffSchedule");
}
}
}
interface RawKvEntry { interface RawKvEntry {
key: Deno.KvKey; key: Deno.KvKey;
value: RawValue; value: RawValue;
@ -224,18 +238,25 @@ class Kv {
async enqueue( async enqueue(
message: unknown, message: unknown,
opts?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] }, opts?: {
delay?: number;
keysIfUndelivered?: Deno.KvKey[];
backoffSchedule?: number[];
},
) { ) {
if (opts?.delay !== undefined) { if (opts?.delay !== undefined) {
validateQueueDelay(opts?.delay); validateQueueDelay(opts?.delay);
} }
if (opts?.backoffSchedule !== undefined) {
validateBackoffSchedule(opts?.backoffSchedule);
}
const enqueues = [ const enqueues = [
[ [
core.serialize(message, { forStorage: true }), core.serialize(message, { forStorage: true }),
opts?.delay ?? 0, opts?.delay ?? 0,
opts?.keysIfUndelivered ?? [], opts?.keysIfUndelivered ?? [],
null, opts?.backoffSchedule ?? null,
], ],
]; ];
@ -468,16 +489,23 @@ class AtomicOperation {
enqueue( enqueue(
message: unknown, message: unknown,
opts?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] }, opts?: {
delay?: number;
keysIfUndelivered?: Deno.KvKey[];
backoffSchedule?: number[];
},
): this { ): this {
if (opts?.delay !== undefined) { if (opts?.delay !== undefined) {
validateQueueDelay(opts?.delay); validateQueueDelay(opts?.delay);
} }
if (opts?.backoffSchedule !== undefined) {
validateBackoffSchedule(opts?.backoffSchedule);
}
this.#enqueues.push([ this.#enqueues.push([
core.serialize(message, { forStorage: true }), core.serialize(message, { forStorage: true }),
opts?.delay ?? 0, opts?.delay ?? 0,
opts?.keysIfUndelivered ?? [], opts?.keysIfUndelivered ?? [],
null, opts?.backoffSchedule ?? null,
]); ]);
return this; return this;
} }

View file

@ -802,6 +802,9 @@ where
for enqueue in &enqueues { for enqueue in &enqueues {
total_payload_size += check_enqueue_payload_size(&enqueue.payload)?; total_payload_size += check_enqueue_payload_size(&enqueue.payload)?;
if let Some(schedule) = enqueue.backoff_schedule.as_ref() {
total_payload_size += 4 * schedule.len();
}
} }
if total_payload_size > MAX_TOTAL_MUTATION_SIZE_BYTES { if total_payload_size > MAX_TOTAL_MUTATION_SIZE_BYTES {