0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-03-04 01:44:26 -05:00

feat(unstable): append commit versionstamp to key (#21556)

This commit is contained in:
Heyang Zhou 2023-12-14 00:58:20 +08:00 committed by GitHub
parent 76a6ea5775
commit 10ab8c1ef1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 97 additions and 35 deletions

View file

@ -2216,3 +2216,35 @@ dbTest("key watch", async (db) => {
await work; await work;
await reader.cancel(); await reader.cancel();
}); });
dbTest("set with key versionstamp suffix", async (db) => {
const result1 = await Array.fromAsync(db.list({ prefix: ["a"] }));
assertEquals(result1, []);
const setRes1 = await db.set(["a", db.commitVersionstamp()], "b");
assert(setRes1.ok);
assert(setRes1.versionstamp > ZERO_VERSIONSTAMP);
const result2 = await Array.fromAsync(db.list({ prefix: ["a"] }));
assertEquals(result2.length, 1);
assertEquals(result2[0].key[1], setRes1.versionstamp);
assertEquals(result2[0].value, "b");
assertEquals(result2[0].versionstamp, setRes1.versionstamp);
const setRes2 = await db.atomic().set(["a", db.commitVersionstamp()], "c")
.commit();
assert(setRes2.ok);
assert(setRes2.versionstamp > setRes1.versionstamp);
const result3 = await Array.fromAsync(db.list({ prefix: ["a"] }));
assertEquals(result3.length, 2);
assertEquals(result3[1].key[1], setRes2.versionstamp);
assertEquals(result3[1].value, "c");
assertEquals(result3[1].versionstamp, setRes2.versionstamp);
await assertRejects(
async () => await db.set(["a", db.commitVersionstamp(), "a"], "x"),
TypeError,
"expected string, number, bigint, ArrayBufferView, boolean",
);
});

View file

@ -1488,7 +1488,13 @@ declare namespace Deno {
* *
* @category KV * @category KV
*/ */
export type KvKeyPart = Uint8Array | string | number | bigint | boolean; export type KvKeyPart =
| Uint8Array
| string
| number
| bigint
| boolean
| symbol;
/** **UNSTABLE**: New API, yet to be vetted. /** **UNSTABLE**: New API, yet to be vetted.
* *
@ -2099,6 +2105,14 @@ declare namespace Deno {
*/ */
close(): void; close(): void;
/**
* Get a symbol that represents the versionstamp of the current atomic
* operation. This symbol can be used as the last part of a key in
* `.set()`, both directly on the `Kv` object and on an `AtomicOperation`
* object created from this `Kv` instance.
*/
commitVersionstamp(): symbol;
[Symbol.dispose](): void; [Symbol.dispose](): void;
} }

View file

@ -75,6 +75,7 @@ type RawValue = {
}; };
const kvSymbol = Symbol("KvRid"); const kvSymbol = Symbol("KvRid");
const commitVersionstampSymbol = Symbol("KvCommitVersionstamp");
class Kv { class Kv {
#rid: number; #rid: number;
@ -94,6 +95,10 @@ class Kv {
return new AtomicOperation(this.#rid); return new AtomicOperation(this.#rid);
} }
commitVersionstamp(): symbol {
return commitVersionstampSymbol;
}
async get(key: Deno.KvKey, opts?: { consistency?: Deno.KvConsistencyLevel }) { async get(key: Deno.KvKey, opts?: { consistency?: Deno.KvConsistencyLevel }) {
const [entries]: [RawKvEntry[]] = await core.opAsync( const [entries]: [RawKvEntry[]] = await core.opAsync(
"op_kv_snapshot_read", "op_kv_snapshot_read",
@ -148,18 +153,10 @@ class Kv {
} }
async set(key: Deno.KvKey, value: unknown, options?: { expireIn?: number }) { async set(key: Deno.KvKey, value: unknown, options?: { expireIn?: number }) {
value = serializeValue(value); const versionstamp = await doAtomicWriteInPlace(
const checks: Deno.AtomicCheck[] = [];
const mutations = [
[key, "set", value, options?.expireIn],
];
const versionstamp = await core.opAsync(
"op_kv_atomic_write",
this.#rid, this.#rid,
checks, [],
mutations, [[key, "set", serializeValue(value), options?.expireIn]],
[], [],
); );
if (versionstamp === null) throw new TypeError("Failed to set value"); if (versionstamp === null) throw new TypeError("Failed to set value");
@ -167,16 +164,10 @@ class Kv {
} }
async delete(key: Deno.KvKey) { async delete(key: Deno.KvKey) {
const checks: Deno.AtomicCheck[] = []; const result = await doAtomicWriteInPlace(
const mutations = [
[key, "delete", null, undefined],
];
const result = await core.opAsync(
"op_kv_atomic_write",
this.#rid, this.#rid,
checks, [],
mutations, [[key, "delete", null, undefined]],
[], [],
); );
if (!result) throw new TypeError("Failed to set value"); if (!result) throw new TypeError("Failed to set value");
@ -251,21 +242,18 @@ class Kv {
validateBackoffSchedule(opts?.backoffSchedule); validateBackoffSchedule(opts?.backoffSchedule);
} }
const enqueues = [ const versionstamp = await doAtomicWriteInPlace(
this.#rid,
[],
[],
[
[ [
core.serialize(message, { forStorage: true }), core.serialize(message, { forStorage: true }),
opts?.delay ?? 0, opts?.delay ?? 0,
opts?.keysIfUndelivered ?? [], opts?.keysIfUndelivered ?? [],
opts?.backoffSchedule ?? null, opts?.backoffSchedule ?? null,
], ],
]; ],
const versionstamp = await core.opAsync(
"op_kv_atomic_write",
this.#rid,
[],
[],
enqueues,
); );
if (versionstamp === null) throw new TypeError("Failed to enqueue value"); if (versionstamp === null) throw new TypeError("Failed to enqueue value");
return { ok: true, versionstamp }; return { ok: true, versionstamp };
@ -511,8 +499,7 @@ class AtomicOperation {
} }
async commit(): Promise<Deno.KvCommitResult | Deno.KvCommitError> { async commit(): Promise<Deno.KvCommitResult | Deno.KvCommitError> {
const versionstamp = await core.opAsync( const versionstamp = await doAtomicWriteInPlace(
"op_kv_atomic_write",
this.#rid, this.#rid,
this.#checks, this.#checks,
this.#mutations, this.#mutations,
@ -764,4 +751,30 @@ class KvListIterator extends AsyncIterator
} }
} }
async function doAtomicWriteInPlace(
rid: number,
checks: [Deno.KvKey, string | null][],
mutations: [Deno.KvKey, string, RawValue | null, number | undefined][],
enqueues: [Uint8Array, number, Deno.KvKey[], number[] | null][],
): Promise<string | null> {
for (const m of mutations) {
const key = m[0];
if (
key.length && m[1] === "set" &&
key[key.length - 1] === commitVersionstampSymbol
) {
m[0] = key.slice(0, key.length - 1);
m[1] = "setSuffixVersionstampedKey";
}
}
return await core.opAsync(
"op_kv_atomic_write",
rid,
checks,
mutations,
enqueues,
);
}
export { AtomicOperation, Kv, KvListIterator, KvU64, openKv }; export { AtomicOperation, Kv, KvListIterator, KvU64, openKv };

View file

@ -530,6 +530,9 @@ fn mutation_from_v8(
("sum", Some(value)) => MutationKind::Sum(value.try_into()?), ("sum", Some(value)) => MutationKind::Sum(value.try_into()?),
("min", Some(value)) => MutationKind::Min(value.try_into()?), ("min", Some(value)) => MutationKind::Min(value.try_into()?),
("max", Some(value)) => MutationKind::Max(value.try_into()?), ("max", Some(value)) => MutationKind::Max(value.try_into()?),
("setSuffixVersionstampedKey", Some(value)) => {
MutationKind::SetSuffixVersionstampedKey(value.try_into()?)
}
(op, Some(_)) => { (op, Some(_)) => {
return Err(type_error(format!("invalid mutation '{op}' with value"))) return Err(type_error(format!("invalid mutation '{op}' with value")))
} }