1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-21 04:52:26 -05:00

chore: update ext/kv to use denokv_* crates (#20986)

This commit updates the ext/kv module to use the denokv_* crates for
the protocol and the sqlite backend. This also fixes a couple of bugs in
the sqlite backend, and updates versionstamps to be updated less
linearly.
This commit is contained in:
Luca Casonato 2023-10-31 12:13:57 +01:00 committed by GitHub
parent 092555c611
commit 2d9298f5f5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 560 additions and 2934 deletions

72
Cargo.lock generated
View file

@ -1412,8 +1412,13 @@ dependencies = [
"base64 0.21.4",
"chrono",
"deno_core",
"deno_fetch",
"deno_node",
"deno_tls",
"deno_unsync 0.1.1",
"denokv_proto",
"denokv_remote",
"denokv_sqlite",
"hex",
"log",
"num-bigint",
@ -1776,6 +1781,64 @@ dependencies = [
"serde",
]
[[package]]
name = "denokv_proto"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8952fb8c38c1dcd796d49b00030afb74aa184160ae86817b72a32a994c8e16f0"
dependencies = [
"anyhow",
"async-trait",
"chrono",
"num-bigint",
"prost",
"prost-build",
"serde",
"uuid 1.5.0",
]
[[package]]
name = "denokv_remote"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edfc8447324d783b01e215bd5040ff9149c34d9715c7b7b5080dd648ebf1148a"
dependencies = [
"anyhow",
"async-trait",
"bytes",
"chrono",
"denokv_proto",
"log",
"prost",
"rand 0.8.5",
"reqwest",
"serde",
"serde_json",
"tokio",
"url",
"uuid 1.5.0",
]
[[package]]
name = "denokv_sqlite"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ec76b691ff069f14e56e3e053c2b2163540b27e4b60179f2b120064a7e4960d"
dependencies = [
"anyhow",
"async-trait",
"chrono",
"denokv_proto",
"futures",
"log",
"num-bigint",
"rand 0.8.5",
"rusqlite",
"serde_json",
"tokio",
"uuid 1.5.0",
]
[[package]]
name = "der"
version = "0.6.1"
@ -5621,6 +5684,7 @@ dependencies = [
"base64 0.21.4",
"bytes",
"console_static_text",
"denokv_proto",
"fastwebsockets",
"flate2",
"futures",
@ -5743,9 +5807,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.32.0"
version = "1.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9"
checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653"
dependencies = [
"backtrace",
"bytes",
@ -5773,9 +5837,9 @@ dependencies = [
[[package]]
name = "tokio-metrics"
version = "0.3.0"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4b2fc67d5dec41db679b9b052eb572269616926040b7831e32c8a152df77b84"
checksum = "eace09241d62c98b7eeb1107d4c5c64ca3bd7da92e8c218c153ab3a78f9be112"
dependencies = [
"futures-util",
"pin-project-lite",

View file

@ -48,6 +48,10 @@ test_util = { path = "./test_util" }
deno_lockfile = "0.17.2"
deno_media_type = { version = "0.1.1", features = ["module_specifier"] }
denokv_proto = "0.2.1"
denokv_sqlite = "0.2.1"
denokv_remote = "0.2.3"
# exts
deno_broadcast_channel = { version = "0.115.0", path = "./ext/broadcast_channel" }
deno_cache = { version = "0.53.0", path = "./ext/cache" }

View file

@ -381,7 +381,7 @@ fn create_cli_snapshot(snapshot_path: PathBuf) -> CreateSnapshotOutput {
deno_net::deno_net::init_ops::<PermissionsContainer>(None, None),
deno_tls::deno_tls::init_ops(),
deno_kv::deno_kv::init_ops(SqliteDbHandler::<PermissionsContainer>::new(
None,
None, None,
)),
deno_napi::deno_napi::init_ops::<PermissionsContainer>(),
deno_http::deno_http::init_ops::<DefaultHttpPropertyExtractor>(),

View file

@ -55,9 +55,7 @@ function dbTest(name: string, fn: (db: Deno.Kv) => Promise<void> | void) {
// https://github.com/denoland/deno/issues/18363
ignore: Deno.build.os === "darwin" && isCI,
async fn() {
const db: Deno.Kv = await Deno.openKv(
":memory:",
);
const db: Deno.Kv = await Deno.openKv(":memory:");
try {
await fn(db);
} finally {
@ -73,14 +71,14 @@ function queueTest(name: string, fn: (db: Deno.Kv) => Promise<void>) {
// https://github.com/denoland/deno/issues/18363
ignore: Deno.build.os === "darwin" && isCI,
async fn() {
const db: Deno.Kv = await Deno.openKv(
":memory:",
);
const db: Deno.Kv = await Deno.openKv(":memory:");
await fn(db);
},
});
}
const ZERO_VERSIONSTAMP = "00000000000000000000";
dbTest("basic read-write-delete and versionstamps", async (db) => {
const result1 = await db.get(["a"]);
assertEquals(result1.key, ["a"]);
@ -89,17 +87,19 @@ dbTest("basic read-write-delete and versionstamps", async (db) => {
const setRes = await db.set(["a"], "b");
assert(setRes.ok);
assertEquals(setRes.versionstamp, "00000000000000010000");
assert(setRes.versionstamp > ZERO_VERSIONSTAMP);
const result2 = await db.get(["a"]);
assertEquals(result2.key, ["a"]);
assertEquals(result2.value, "b");
assertEquals(result2.versionstamp, "00000000000000010000");
assertEquals(result2.versionstamp, setRes.versionstamp);
await db.set(["a"], "c");
const setRes2 = await db.set(["a"], "c");
assert(setRes2.ok);
assert(setRes2.versionstamp > setRes.versionstamp);
const result3 = await db.get(["a"]);
assertEquals(result3.key, ["a"]);
assertEquals(result3.value, "c");
assertEquals(result3.versionstamp, "00000000000000020000");
assertEquals(result3.versionstamp, setRes2.versionstamp);
await db.delete(["a"]);
const result4 = await db.get(["a"]);
@ -230,17 +230,18 @@ dbTest("compare and mutate", async (db) => {
await db.set(["t"], "1");
const currentValue = await db.get(["t"]);
assertEquals(currentValue.versionstamp, "00000000000000010000");
assert(currentValue.versionstamp);
assert(currentValue.versionstamp > ZERO_VERSIONSTAMP);
let res = await db.atomic()
.check({ key: ["t"], versionstamp: currentValue.versionstamp })
.set(currentValue.key, "2")
.commit();
assert(res.ok);
assertEquals(res.versionstamp, "00000000000000020000");
assert(res.versionstamp > currentValue.versionstamp);
const newValue = await db.get(["t"]);
assertEquals(newValue.versionstamp, "00000000000000020000");
assertEquals(newValue.versionstamp, res.versionstamp);
assertEquals(newValue.value, "2");
res = await db.atomic()
@ -250,7 +251,7 @@ dbTest("compare and mutate", async (db) => {
assert(!res.ok);
const newValue2 = await db.get(["t"]);
assertEquals(newValue2.versionstamp, "00000000000000020000");
assertEquals(newValue2.versionstamp, newValue.versionstamp);
assertEquals(newValue2.value, "2");
});
@ -260,9 +261,10 @@ dbTest("compare and mutate not exists", async (db) => {
.set(["t"], "1")
.commit();
assert(res.ok);
assert(res.versionstamp > ZERO_VERSIONSTAMP);
const newValue = await db.get(["t"]);
assertEquals(newValue.versionstamp, "00000000000000010000");
assertEquals(newValue.versionstamp, res.versionstamp);
assertEquals(newValue.value, "1");
res = await db.atomic()
@ -303,13 +305,17 @@ dbTest("atomic mutation helper (max)", async (db) => {
});
dbTest("compare multiple and mutate", async (db) => {
await db.set(["t1"], "1");
await db.set(["t2"], "2");
const setRes1 = await db.set(["t1"], "1");
const setRes2 = await db.set(["t2"], "2");
assert(setRes1.ok);
assert(setRes1.versionstamp > ZERO_VERSIONSTAMP);
assert(setRes2.ok);
assert(setRes2.versionstamp > ZERO_VERSIONSTAMP);
const currentValue1 = await db.get(["t1"]);
assertEquals(currentValue1.versionstamp, "00000000000000010000");
assertEquals(currentValue1.versionstamp, setRes1.versionstamp);
const currentValue2 = await db.get(["t2"]);
assertEquals(currentValue2.versionstamp, "00000000000000020000");
assertEquals(currentValue2.versionstamp, setRes2.versionstamp);
const res = await db.atomic()
.check({ key: ["t1"], versionstamp: currentValue1.versionstamp })
@ -318,12 +324,13 @@ dbTest("compare multiple and mutate", async (db) => {
.set(currentValue2.key, "4")
.commit();
assert(res.ok);
assert(res.versionstamp > setRes2.versionstamp);
const newValue1 = await db.get(["t1"]);
assertEquals(newValue1.versionstamp, "00000000000000030000");
assertEquals(newValue1.versionstamp, res.versionstamp);
assertEquals(newValue1.value, "3");
const newValue2 = await db.get(["t2"]);
assertEquals(newValue2.versionstamp, "00000000000000030000");
assertEquals(newValue2.versionstamp, res.versionstamp);
assertEquals(newValue2.value, "4");
// just one of the two checks failed
@ -336,10 +343,10 @@ dbTest("compare multiple and mutate", async (db) => {
assert(!res2.ok);
const newValue3 = await db.get(["t1"]);
assertEquals(newValue3.versionstamp, "00000000000000030000");
assertEquals(newValue3.versionstamp, res.versionstamp);
assertEquals(newValue3.value, "3");
const newValue4 = await db.get(["t2"]);
assertEquals(newValue4.versionstamp, "00000000000000030000");
assertEquals(newValue4.versionstamp, res.versionstamp);
assertEquals(newValue4.value, "4");
});
@ -635,8 +642,8 @@ async function collect<T>(
return entries;
}
async function setupData(db: Deno.Kv) {
await db.atomic()
async function setupData(db: Deno.Kv): Promise<string> {
const res = await db.atomic()
.set(["a"], -1)
.set(["a", "a"], 0)
.set(["a", "b"], 1)
@ -646,27 +653,29 @@ async function setupData(db: Deno.Kv) {
.set(["b"], 99)
.set(["b", "a"], 100)
.commit();
assert(res.ok);
return res.versionstamp;
}
dbTest("get many", async (db) => {
await setupData(db);
const versionstamp = await setupData(db);
const entries = await db.getMany([["b", "a"], ["a"], ["c"]]);
assertEquals(entries, [
{ key: ["b", "a"], value: 100, versionstamp: "00000000000000010000" },
{ key: ["a"], value: -1, versionstamp: "00000000000000010000" },
{ key: ["b", "a"], value: 100, versionstamp },
{ key: ["a"], value: -1, versionstamp },
{ key: ["c"], value: null, versionstamp: null },
]);
});
dbTest("list prefix", async (db) => {
await setupData(db);
const versionstamp = await setupData(db);
const entries = await collect(db.list({ prefix: ["a"] }));
assertEquals(entries, [
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
{ key: ["a", "a"], value: 0, versionstamp },
{ key: ["a", "b"], value: 1, versionstamp },
{ key: ["a", "c"], value: 2, versionstamp },
{ key: ["a", "d"], value: 3, versionstamp },
{ key: ["a", "e"], value: 4, versionstamp },
]);
});
@ -680,12 +689,12 @@ dbTest("list prefix empty", async (db) => {
});
dbTest("list prefix with start", async (db) => {
await setupData(db);
const versionstamp = await setupData(db);
const entries = await collect(db.list({ prefix: ["a"], start: ["a", "c"] }));
assertEquals(entries, [
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp },
{ key: ["a", "d"], value: 3, versionstamp },
{ key: ["a", "e"], value: 4, versionstamp },
]);
});
@ -696,11 +705,11 @@ dbTest("list prefix with start empty", async (db) => {
});
dbTest("list prefix with end", async (db) => {
await setupData(db);
const versionstamp = await setupData(db);
const entries = await collect(db.list({ prefix: ["a"], end: ["a", "c"] }));
assertEquals(entries, [
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "a"], value: 0, versionstamp },
{ key: ["a", "b"], value: 1, versionstamp },
]);
});
@ -711,35 +720,34 @@ dbTest("list prefix with end empty", async (db) => {
});
dbTest("list prefix with empty prefix", async (db) => {
await db.set(["a"], 1);
const res = await db.set(["a"], 1);
const entries = await collect(db.list({ prefix: [] }));
assertEquals(entries, [
{ key: ["a"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a"], value: 1, versionstamp: res.versionstamp },
]);
});
dbTest("list prefix reverse", async (db) => {
await setupData(db);
const versionstamp = await setupData(db);
const entries = await collect(db.list({ prefix: ["a"] }, { reverse: true }));
assertEquals(entries, [
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
{ key: ["a", "e"], value: 4, versionstamp },
{ key: ["a", "d"], value: 3, versionstamp },
{ key: ["a", "c"], value: 2, versionstamp },
{ key: ["a", "b"], value: 1, versionstamp },
{ key: ["a", "a"], value: 0, versionstamp },
]);
});
dbTest("list prefix reverse with start", async (db) => {
await setupData(db);
const versionstamp = await setupData(db);
const entries = await collect(
db.list({ prefix: ["a"], start: ["a", "c"] }, { reverse: true }),
);
assertEquals(entries, [
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a", "e"], value: 4, versionstamp },
{ key: ["a", "d"], value: 3, versionstamp },
{ key: ["a", "c"], value: 2, versionstamp },
]);
});
@ -752,13 +760,13 @@ dbTest("list prefix reverse with start empty", async (db) => {
});
dbTest("list prefix reverse with end", async (db) => {
await setupData(db);
const versionstamp = await setupData(db);
const entries = await collect(
db.list({ prefix: ["a"], end: ["a", "c"] }, { reverse: true }),
);
assertEquals(entries, [
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp },
{ key: ["a", "a"], value: 0, versionstamp },
]);
});
@ -771,83 +779,82 @@ dbTest("list prefix reverse with end empty", async (db) => {
});
dbTest("list prefix limit", async (db) => {
await setupData(db);
const versionstamp = await setupData(db);
const entries = await collect(db.list({ prefix: ["a"] }, { limit: 2 }));
assertEquals(entries, [
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "a"], value: 0, versionstamp },
{ key: ["a", "b"], value: 1, versionstamp },
]);
});
dbTest("list prefix limit reverse", async (db) => {
await setupData(db);
const versionstamp = await setupData(db);
const entries = await collect(
db.list({ prefix: ["a"] }, { limit: 2, reverse: true }),
);
assertEquals(entries, [
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
{ key: ["a", "e"], value: 4, versionstamp },
{ key: ["a", "d"], value: 3, versionstamp },
]);
});
dbTest("list prefix with small batch size", async (db) => {
await setupData(db);
const versionstamp = await setupData(db);
const entries = await collect(db.list({ prefix: ["a"] }, { batchSize: 2 }));
assertEquals(entries, [
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
{ key: ["a", "a"], value: 0, versionstamp },
{ key: ["a", "b"], value: 1, versionstamp },
{ key: ["a", "c"], value: 2, versionstamp },
{ key: ["a", "d"], value: 3, versionstamp },
{ key: ["a", "e"], value: 4, versionstamp },
]);
});
dbTest("list prefix with small batch size reverse", async (db) => {
await setupData(db);
const versionstamp = await setupData(db);
const entries = await collect(
db.list({ prefix: ["a"] }, { batchSize: 2, reverse: true }),
);
assertEquals(entries, [
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
{ key: ["a", "e"], value: 4, versionstamp },
{ key: ["a", "d"], value: 3, versionstamp },
{ key: ["a", "c"], value: 2, versionstamp },
{ key: ["a", "b"], value: 1, versionstamp },
{ key: ["a", "a"], value: 0, versionstamp },
]);
});
dbTest("list prefix with small batch size and limit", async (db) => {
await setupData(db);
const versionstamp = await setupData(db);
const entries = await collect(
db.list({ prefix: ["a"] }, { batchSize: 2, limit: 3 }),
);
assertEquals(entries, [
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a", "a"], value: 0, versionstamp },
{ key: ["a", "b"], value: 1, versionstamp },
{ key: ["a", "c"], value: 2, versionstamp },
]);
});
dbTest("list prefix with small batch size and limit reverse", async (db) => {
await setupData(db);
const versionstamp = await setupData(db);
const entries = await collect(
db.list({ prefix: ["a"] }, { batchSize: 2, limit: 3, reverse: true }),
);
assertEquals(entries, [
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a", "e"], value: 4, versionstamp },
{ key: ["a", "d"], value: 3, versionstamp },
{ key: ["a", "c"], value: 2, versionstamp },
]);
});
dbTest("list prefix with manual cursor", async (db) => {
await setupData(db);
const versionstamp = await setupData(db);
const iterator = db.list({ prefix: ["a"] }, { limit: 2 });
const values = await collect(iterator);
assertEquals(values, [
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "a"], value: 0, versionstamp },
{ key: ["a", "b"], value: 1, versionstamp },
]);
const cursor = iterator.cursor;
@ -856,20 +863,20 @@ dbTest("list prefix with manual cursor", async (db) => {
const iterator2 = db.list({ prefix: ["a"] }, { cursor });
const values2 = await collect(iterator2);
assertEquals(values2, [
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp },
{ key: ["a", "d"], value: 3, versionstamp },
{ key: ["a", "e"], value: 4, versionstamp },
]);
});
dbTest("list prefix with manual cursor reverse", async (db) => {
await setupData(db);
const versionstamp = await setupData(db);
const iterator = db.list({ prefix: ["a"] }, { limit: 2, reverse: true });
const values = await collect(iterator);
assertEquals(values, [
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
{ key: ["a", "e"], value: 4, versionstamp },
{ key: ["a", "d"], value: 3, versionstamp },
]);
const cursor = iterator.cursor;
@ -878,57 +885,57 @@ dbTest("list prefix with manual cursor reverse", async (db) => {
const iterator2 = db.list({ prefix: ["a"] }, { cursor, reverse: true });
const values2 = await collect(iterator2);
assertEquals(values2, [
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp },
{ key: ["a", "b"], value: 1, versionstamp },
{ key: ["a", "a"], value: 0, versionstamp },
]);
});
dbTest("list range", async (db) => {
await setupData(db);
const versionstamp = await setupData(db);
const entries = await collect(
db.list({ start: ["a", "a"], end: ["a", "z"] }),
);
assertEquals(entries, [
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
{ key: ["a", "a"], value: 0, versionstamp },
{ key: ["a", "b"], value: 1, versionstamp },
{ key: ["a", "c"], value: 2, versionstamp },
{ key: ["a", "d"], value: 3, versionstamp },
{ key: ["a", "e"], value: 4, versionstamp },
]);
});
dbTest("list range reverse", async (db) => {
await setupData(db);
const versionstamp = await setupData(db);
const entries = await collect(
db.list({ start: ["a", "a"], end: ["a", "z"] }, { reverse: true }),
);
assertEquals(entries, [
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
{ key: ["a", "e"], value: 4, versionstamp },
{ key: ["a", "d"], value: 3, versionstamp },
{ key: ["a", "c"], value: 2, versionstamp },
{ key: ["a", "b"], value: 1, versionstamp },
{ key: ["a", "a"], value: 0, versionstamp },
]);
});
dbTest("list range with limit", async (db) => {
await setupData(db);
const versionstamp = await setupData(db);
const entries = await collect(
db.list({ start: ["a", "a"], end: ["a", "z"] }, { limit: 3 }),
);
assertEquals(entries, [
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a", "a"], value: 0, versionstamp },
{ key: ["a", "b"], value: 1, versionstamp },
{ key: ["a", "c"], value: 2, versionstamp },
]);
});
dbTest("list range with limit reverse", async (db) => {
await setupData(db);
const versionstamp = await setupData(db);
const entries = await collect(
db.list({ start: ["a", "a"], end: ["a", "z"] }, {
@ -937,46 +944,46 @@ dbTest("list range with limit reverse", async (db) => {
}),
);
assertEquals(entries, [
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a", "e"], value: 4, versionstamp },
{ key: ["a", "d"], value: 3, versionstamp },
{ key: ["a", "c"], value: 2, versionstamp },
]);
});
dbTest("list range nesting", async (db) => {
await setupData(db);
const versionstamp = await setupData(db);
const entries = await collect(db.list({ start: ["a"], end: ["a", "d"] }));
assertEquals(entries, [
{ key: ["a"], value: -1, versionstamp: "00000000000000010000" },
{ key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a"], value: -1, versionstamp },
{ key: ["a", "a"], value: 0, versionstamp },
{ key: ["a", "b"], value: 1, versionstamp },
{ key: ["a", "c"], value: 2, versionstamp },
]);
});
dbTest("list range short", async (db) => {
await setupData(db);
const versionstamp = await setupData(db);
const entries = await collect(
db.list({ start: ["a", "b"], end: ["a", "d"] }),
);
assertEquals(entries, [
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp },
{ key: ["a", "c"], value: 2, versionstamp },
]);
});
dbTest("list range with manual cursor", async (db) => {
await setupData(db);
const versionstamp = await setupData(db);
const iterator = db.list({ start: ["a", "b"], end: ["a", "z"] }, {
limit: 2,
});
const entries = await collect(iterator);
assertEquals(entries, [
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp },
{ key: ["a", "c"], value: 2, versionstamp },
]);
const cursor = iterator.cursor;
@ -985,13 +992,13 @@ dbTest("list range with manual cursor", async (db) => {
});
const entries2 = await collect(iterator2);
assertEquals(entries2, [
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp },
{ key: ["a", "e"], value: 4, versionstamp },
]);
});
dbTest("list range with manual cursor reverse", async (db) => {
await setupData(db);
const versionstamp = await setupData(db);
const iterator = db.list({ start: ["a", "b"], end: ["a", "z"] }, {
limit: 2,
@ -999,8 +1006,8 @@ dbTest("list range with manual cursor reverse", async (db) => {
});
const entries = await collect(iterator);
assertEquals(entries, [
{ key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
{ key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
{ key: ["a", "e"], value: 4, versionstamp },
{ key: ["a", "d"], value: 3, versionstamp },
]);
const cursor = iterator.cursor;
@ -1010,8 +1017,8 @@ dbTest("list range with manual cursor reverse", async (db) => {
});
const entries2 = await collect(iterator2);
assertEquals(entries2, [
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
{ key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
{ key: ["a", "c"], value: 2, versionstamp },
{ key: ["a", "b"], value: 1, versionstamp },
]);
});
@ -1110,12 +1117,12 @@ dbTest("key size limit", async (db) => {
const lastValidKey = new Uint8Array(2046).fill(1);
const firstInvalidKey = new Uint8Array(2047).fill(1);
await db.set([lastValidKey], 1);
const res = await db.set([lastValidKey], 1);
assertEquals(await db.get([lastValidKey]), {
key: [lastValidKey],
value: 1,
versionstamp: "00000000000000010000",
versionstamp: res.versionstamp,
});
await assertRejects(
@ -1135,11 +1142,11 @@ dbTest("value size limit", async (db) => {
const lastValidValue = new Uint8Array(65536);
const firstInvalidValue = new Uint8Array(65537);
await db.set(["a"], lastValidValue);
const res = await db.set(["a"], lastValidValue);
assertEquals(await db.get(["a"]), {
key: ["a"],
value: lastValidValue,
versionstamp: "00000000000000010000",
versionstamp: res.versionstamp,
});
await assertRejects(
@ -1415,21 +1422,17 @@ for (const { name, value } of VALUE_CASES) {
queueTest(`listenQueue and enqueue ${name}`, async (db) => {
const numEnqueues = 10;
let count = 0;
const promises: Deferred<void>[] = [];
const dequeuedMessages: unknown[] = [];
const promises: Deferred<unknown>[] = [];
const listeners: Promise<void>[] = [];
listeners.push(db.listenQueue((msg) => {
dequeuedMessages.push(msg);
promises[count++].resolve();
listeners.push(db.listenQueue((msg: unknown) => {
promises[count++].resolve(msg);
}));
try {
for (let i = 0; i < numEnqueues; i++) {
promises.push(deferred());
await db.enqueue(value);
}
for (let i = 0; i < numEnqueues; i++) {
await promises[i];
}
const dequeuedMessages = await Promise.all(promises);
for (let i = 0; i < numEnqueues; i++) {
assertEquals(dequeuedMessages[i], value);
}
@ -1445,7 +1448,7 @@ for (const { name, value } of VALUE_CASES) {
queueTest("queue mixed types", async (db) => {
let promise: Deferred<void>;
let dequeuedMessage: unknown = null;
const listener = db.listenQueue((msg) => {
const listener = db.listenQueue((msg: unknown) => {
dequeuedMessage = msg;
promise.resolve();
});
@ -2066,25 +2069,16 @@ Deno.test({
const db = await Deno.openKv(
"http://localhost:4545/kv_remote_authorize_invalid_format",
);
let ok = false;
try {
await db.set(["some-key"], 1);
} catch (e) {
if (
e.name === "TypeError" &&
e.message.startsWith("Metadata error: Failed to decode metadata: ")
) {
ok = true;
} else {
throw e;
}
} finally {
db.close();
}
if (!ok) {
throw new Error("did not get expected error");
}
await assertRejects(
async () => {
await db.set(["some-key"], 1);
},
Error,
"Failed to parse metadata: ",
);
db.close();
},
});
@ -2094,24 +2088,15 @@ Deno.test({
const db = await Deno.openKv(
"http://localhost:4545/kv_remote_authorize_invalid_version",
);
let ok = false;
try {
await db.set(["some-key"], 1);
} catch (e) {
if (
e.name === "TypeError" &&
e.message === "Metadata error: Unsupported metadata version: 2"
) {
ok = true;
} else {
throw e;
}
} finally {
db.close();
}
if (!ok) {
throw new Error("did not get expected error");
}
await assertRejects(
async () => {
await db.set(["some-key"], 1);
},
Error,
"Failed to parse metadata: unsupported metadata version: 1000",
);
db.close();
},
});

View file

@ -19,8 +19,13 @@ async-trait.workspace = true
base64.workspace = true
chrono.workspace = true
deno_core.workspace = true
deno_fetch.workspace = true
deno_node.workspace = true
deno_tls.workspace = true
deno_unsync = "0.1.1"
denokv_proto.workspace = true
denokv_remote.workspace = true
denokv_sqlite.workspace = true
hex.workspace = true
log.workspace = true
num-bigint.workspace = true

View file

@ -8,74 +8,19 @@ please read the [manual](https://deno.land/manual/runtime/kv).
Deno KV has a pluggable storage interface that supports multiple backends:
- SQLite - backed by a local SQLite database. This backend is suitable for
development and is the default when running locally.
development and is the default when running locally. It is implemented in the
[denokv_sqlite crate](https://github.com/denoland/denokv/blob/main/sqlite).
- Remote - backed by a remote service that implements the
[KV Connect](#kv-connect) protocol, for example
[Deno Deploy](https://deno.com/deploy).
Additional backends can be added by implementing the `DatabaseHandler` trait.
Additional backends can be added by implementing the `Database` trait.
## KV Connect
The KV Connect protocol has separate control and data planes to maximize
throughput and minimize latency. _Metadata Exchange_ and _Data Path_ are the two
sub-protocols that are used when talking to a KV Connect-compatible service.
### Metadata Exchange
To connect to a KV Connect service, the user provides an HTTP or HTTPS URL to
`Deno.openKv`. A background task is then spawned to periodically make HTTP POST
requests to the provided URL to refresh database metadata.
The HTTP `Authorization` header is included and have the format
`Bearer <access-token>`. The `<access-token>` is a static token issued by the
service provider. For Deno Deploy, this is the personal access token generated
from the dashboard. You can specify the access token with the environment
variable `DENO_KV_ACCESS_TOKEN`.
Request body is currently unused. The response is a JSON message that satisfies
the [JSON Schema](https://json-schema.org/) definition in
`cli/schemas/kv-metadata-exchange-response.v1.json`.
Semantics of the response fields:
- `version`: Protocol version. The only supported value is `1`.
- `databaseId`: UUID of the database.
- `endpoints`: Data plane endpoints that can serve requests to the database,
along with their consistency levels.
- `token`: An ephemeral authentication token that must be included in all
requests to the data plane. This value is an opaque string and the client
should not depend on its format.
- `expiresAt`: The time at which the token expires. Encoded as an ISO 8601
string.
### Data Path
After the first metadata exchange has completed, the client can talk to the data
plane endpoints listed in the `endpoints` field using a Protobuf-over-HTTP
protocol called the _Data Path_. The Protobuf messages are defined in
`proto/datapath.proto`.
Two sub-endpoints are available under a data plane endpoint URL:
- `POST /snapshot_read`: Used for read operations: `kv.get()` and
`kv.getMany()`.
- **Request type**: `SnapshotRead`
- **Response type**: `SnapshotReadOutput`
- `POST /atomic_write`: Used for write operations: `kv.set()` and
`kv.atomic().commit()`.
- **Request type**: `AtomicWrite`
- **Response type**: `AtomicWriteOutput`
An HTTP `Authorization` header in the format `Bearer <ephemeral-token>` must be
included in all requests to the data plane. The value of `<ephemeral-token>` is
the `token` field from the metadata exchange response.
### Error handling
All non-client errors (i.e. network errors and HTTP 5xx status codes) are
handled by retrying the request. Randomized exponential backoff is applied to
each retry.
Client errors cannot be recovered by retrying. A JavaScript exception is
generated for each of those errors.
The KV Connect protocol allows the Deno CLI to communicate with a remote KV
database. The
[specification for the protocol](https://github.com/denoland/denokv/blob/main/proto/kv-connect.md),
and the
[protobuf definitions](https://github.com/denoland/denokv/blob/main/proto/schema/datapath.proto)
can be found in the `denokv` repository, under the `proto` directory.

View file

@ -1,19 +0,0 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::env;
use std::io;
use std::path::PathBuf;
fn main() -> io::Result<()> {
println!("cargo:rerun-if-changed=./proto");
let descriptor_path =
PathBuf::from(env::var("OUT_DIR").unwrap()).join("proto_descriptor.bin");
prost_build::Config::new()
.file_descriptor_set_path(&descriptor_path)
.compile_well_known_types()
.compile_protos(&["proto/datapath.proto"], &["proto/"])?;
Ok(())
}

View file

@ -1,543 +0,0 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// Ported from https://github.com/foundationdb-rs/foundationdb-rs/blob/main/foundationdb/src/tuple/pack.rs
use crate::Key;
use crate::KeyPart;
//const NIL: u8 = 0x00;
const BYTES: u8 = 0x01;
const STRING: u8 = 0x02;
//const NESTED: u8 = 0x05;
const NEGINTSTART: u8 = 0x0b;
const INTZERO: u8 = 0x14;
const POSINTEND: u8 = 0x1d;
//const FLOAT: u8 = 0x20;
const DOUBLE: u8 = 0x21;
const FALSE: u8 = 0x26;
const TRUE: u8 = 0x27;
const ESCAPE: u8 = 0xff;
const CANONICAL_NAN_POS: u64 = 0x7ff8000000000000u64;
const CANONICAL_NAN_NEG: u64 = 0xfff8000000000000u64;
pub fn canonicalize_f64(n: f64) -> f64 {
if n.is_nan() {
if n.is_sign_negative() {
f64::from_bits(CANONICAL_NAN_NEG)
} else {
f64::from_bits(CANONICAL_NAN_POS)
}
} else {
n
}
}
pub fn encode_key(key: &Key) -> std::io::Result<Vec<u8>> {
let mut output: Vec<u8> = vec![];
for part in &key.0 {
match part {
KeyPart::String(key) => {
output.push(STRING);
escape_raw_bytes_into(&mut output, key.as_bytes());
output.push(0);
}
KeyPart::Int(key) => {
bigint::encode_into(&mut output, key)?;
}
KeyPart::Float(key) => {
double::encode_into(&mut output, *key);
}
KeyPart::Bytes(key) => {
output.push(BYTES);
escape_raw_bytes_into(&mut output, key);
output.push(0);
}
KeyPart::False => {
output.push(FALSE);
}
KeyPart::True => {
output.push(TRUE);
}
}
}
Ok(output)
}
pub fn decode_key(mut bytes: &[u8]) -> std::io::Result<Key> {
let mut key = Key(vec![]);
while !bytes.is_empty() {
let tag = bytes[0];
bytes = &bytes[1..];
let next_bytes = match tag {
self::STRING => {
let (next_bytes, data) = parse_slice(bytes)?;
let data = String::from_utf8(data).map_err(|_| {
std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid utf8")
})?;
key.0.push(KeyPart::String(data));
next_bytes
}
self::NEGINTSTART..=self::POSINTEND => {
let (next_bytes, data) = bigint::decode_from(bytes, tag)?;
key.0.push(KeyPart::Int(data));
next_bytes
}
self::DOUBLE => {
let (next_bytes, data) = double::decode_from(bytes)?;
key.0.push(KeyPart::Float(data));
next_bytes
}
self::BYTES => {
let (next_bytes, data) = parse_slice(bytes)?;
key.0.push(KeyPart::Bytes(data));
next_bytes
}
self::FALSE => {
key.0.push(KeyPart::False);
bytes
}
self::TRUE => {
key.0.push(KeyPart::True);
bytes
}
_ => {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"invalid tag",
))
}
};
bytes = next_bytes;
}
Ok(key)
}
fn escape_raw_bytes_into(out: &mut Vec<u8>, x: &[u8]) {
for &b in x {
out.push(b);
if b == 0 {
out.push(ESCAPE);
}
}
}
mod bigint {
use num_bigint::BigInt;
use num_bigint::Sign;
use super::parse_byte;
use super::parse_bytes;
const MAX_SZ: usize = 8;
// Ported from https://github.com/foundationdb-rs/foundationdb-rs/blob/7415e116d5d96c2630976058de28e439eed7e809/foundationdb/src/tuple/pack.rs#L575
pub fn encode_into(out: &mut Vec<u8>, key: &BigInt) -> std::io::Result<()> {
if key.sign() == Sign::NoSign {
out.push(super::INTZERO);
return Ok(());
}
let (sign, mut bytes) = key.to_bytes_be();
let n = bytes.len();
match sign {
Sign::Minus => {
if n <= MAX_SZ {
out.push(super::INTZERO - n as u8);
} else {
out.extend_from_slice(&[super::NEGINTSTART, bigint_n(n)? ^ 0xff]);
}
invert(&mut bytes);
out.extend_from_slice(&bytes);
}
Sign::NoSign => unreachable!(),
Sign::Plus => {
if n <= MAX_SZ {
out.push(super::INTZERO + n as u8);
} else {
out.extend_from_slice(&[super::POSINTEND, bigint_n(n)?]);
}
out.extend_from_slice(&bytes);
}
}
Ok(())
}
pub fn decode_from(
input: &[u8],
tag: u8,
) -> std::io::Result<(&[u8], BigInt)> {
if super::INTZERO <= tag && tag <= super::INTZERO + MAX_SZ as u8 {
let n = (tag - super::INTZERO) as usize;
let (input, bytes) = parse_bytes(input, n)?;
Ok((input, BigInt::from_bytes_be(Sign::Plus, bytes)))
} else if super::INTZERO - MAX_SZ as u8 <= tag && tag < super::INTZERO {
let n = (super::INTZERO - tag) as usize;
let (input, bytes) = parse_bytes(input, n)?;
Ok((input, BigInt::from_bytes_be(Sign::Minus, &inverted(bytes))))
} else if tag == super::NEGINTSTART {
let (input, raw_length) = parse_byte(input)?;
let n = usize::from(raw_length ^ 0xff);
let (input, bytes) = parse_bytes(input, n)?;
Ok((input, BigInt::from_bytes_be(Sign::Minus, &inverted(bytes))))
} else if tag == super::POSINTEND {
let (input, raw_length) = parse_byte(input)?;
let n: usize = usize::from(raw_length);
let (input, bytes) = parse_bytes(input, n)?;
Ok((input, BigInt::from_bytes_be(Sign::Plus, bytes)))
} else {
Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("unknown bigint tag: {}", tag),
))
}
}
fn invert(bytes: &mut [u8]) {
// The ones' complement of a binary number is defined as the value
// obtained by inverting all the bits in the binary representation
// of the number (swapping 0s for 1s and vice versa).
for byte in bytes.iter_mut() {
*byte = !*byte;
}
}
fn inverted(bytes: &[u8]) -> Vec<u8> {
// The ones' complement of a binary number is defined as the value
// obtained by inverting all the bits in the binary representation
// of the number (swapping 0s for 1s and vice versa).
bytes.iter().map(|byte| !*byte).collect()
}
fn bigint_n(n: usize) -> std::io::Result<u8> {
u8::try_from(n).map_err(|_| {
std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"BigUint requires more than 255 bytes to be represented",
)
})
}
}
mod double {
macro_rules! sign_bit {
($type:ident) => {
(1 << (std::mem::size_of::<$type>() * 8 - 1))
};
}
fn f64_to_ux_be_bytes(f: f64) -> [u8; 8] {
let u = if f.is_sign_negative() {
f.to_bits() ^ ::std::u64::MAX
} else {
f.to_bits() ^ sign_bit!(u64)
};
u.to_be_bytes()
}
pub fn encode_into(out: &mut Vec<u8>, x: f64) {
out.push(super::DOUBLE);
out.extend_from_slice(&f64_to_ux_be_bytes(super::canonicalize_f64(x)));
}
pub fn decode_from(input: &[u8]) -> std::io::Result<(&[u8], f64)> {
let (input, bytes) = super::parse_bytes(input, 8)?;
let mut arr = [0u8; 8];
arr.copy_from_slice(bytes);
let u = u64::from_be_bytes(arr);
Ok((
input,
f64::from_bits(if (u & sign_bit!(u64)) == 0 {
u ^ ::std::u64::MAX
} else {
u ^ sign_bit!(u64)
}),
))
}
}
#[inline]
fn parse_bytes(input: &[u8], num: usize) -> std::io::Result<(&[u8], &[u8])> {
if input.len() < num {
Err(std::io::ErrorKind::UnexpectedEof.into())
} else {
Ok((&input[num..], &input[..num]))
}
}
#[inline]
fn parse_byte(input: &[u8]) -> std::io::Result<(&[u8], u8)> {
if input.is_empty() {
Err(std::io::ErrorKind::UnexpectedEof.into())
} else {
Ok((&input[1..], input[0]))
}
}
fn parse_slice(input: &[u8]) -> std::io::Result<(&[u8], Vec<u8>)> {
let mut output: Vec<u8> = Vec::new();
let mut i = 0usize;
while i < input.len() {
let byte = input[i];
i += 1;
if byte == 0 {
if input.get(i).copied() == Some(ESCAPE) {
output.push(0);
i += 1;
continue;
} else {
return Ok((&input[i..], output));
}
}
output.push(byte);
}
Err(std::io::ErrorKind::UnexpectedEof.into())
}
#[cfg(test)]
mod tests {
use num_bigint::BigInt;
use std::cmp::Ordering;
use crate::Key;
use crate::KeyPart;
use super::decode_key;
use super::encode_key;
fn roundtrip(key: Key) {
let bytes = encode_key(&key).unwrap();
let decoded = decode_key(&bytes).unwrap();
assert_eq!(&key, &decoded);
assert_eq!(format!("{:?}", key), format!("{:?}", decoded));
}
fn check_order(a: Key, b: Key, expected: Ordering) {
let a_bytes = encode_key(&a).unwrap();
let b_bytes = encode_key(&b).unwrap();
assert_eq!(a.cmp(&b), expected);
assert_eq!(a_bytes.cmp(&b_bytes), expected);
}
fn check_bijection(key: Key, serialized: &[u8]) {
let bytes = encode_key(&key).unwrap();
assert_eq!(&bytes[..], serialized);
let decoded = decode_key(serialized).unwrap();
assert_eq!(&key, &decoded);
}
#[test]
fn simple_roundtrip() {
roundtrip(Key(vec![
KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00]),
KeyPart::String("foo".to_string()),
KeyPart::Float(-f64::NAN),
KeyPart::Float(-f64::INFINITY),
KeyPart::Float(-42.1),
KeyPart::Float(-0.0),
KeyPart::Float(0.0),
KeyPart::Float(42.1),
KeyPart::Float(f64::INFINITY),
KeyPart::Float(f64::NAN),
KeyPart::Int(BigInt::from(-10000)),
KeyPart::Int(BigInt::from(-1)),
KeyPart::Int(BigInt::from(0)),
KeyPart::Int(BigInt::from(1)),
KeyPart::Int(BigInt::from(10000)),
KeyPart::False,
KeyPart::True,
]));
}
#[test]
#[rustfmt::skip]
fn order_bytes() {
check_order(
Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]),
Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]),
Ordering::Equal,
);
check_order(
Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]),
Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x01])]),
Ordering::Less,
);
check_order(
Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x01])]),
Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]),
Ordering::Greater,
);
check_order(
Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]),
Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00, 0x00])]),
Ordering::Less,
);
check_order(
Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00, 0x00])]),
Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]),
Ordering::Greater,
);
}
#[test]
#[rustfmt::skip]
fn order_tags() {
check_order(
Key(vec![KeyPart::Bytes(vec![])]),
Key(vec![KeyPart::String("".into())]),
Ordering::Less,
);
check_order(
Key(vec![KeyPart::String("".into())]),
Key(vec![KeyPart::Int(BigInt::from(0))]),
Ordering::Less,
);
check_order(
Key(vec![KeyPart::Int(BigInt::from(0))]),
Key(vec![KeyPart::Float(0.0)]),
Ordering::Less,
);
check_order(
Key(vec![KeyPart::Float(0.0)]),
Key(vec![KeyPart::False]),
Ordering::Less,
);
check_order(
Key(vec![KeyPart::False]),
Key(vec![KeyPart::True]),
Ordering::Less,
);
check_order(
Key(vec![KeyPart::True]),
Key(vec![KeyPart::Bytes(vec![])]),
Ordering::Greater,
);
}
#[test]
#[rustfmt::skip]
fn order_floats() {
check_order(
Key(vec![KeyPart::Float(-f64::NAN)]),
Key(vec![KeyPart::Float(-f64::INFINITY)]),
Ordering::Less,
);
check_order(
Key(vec![KeyPart::Float(-f64::INFINITY)]),
Key(vec![KeyPart::Float(-10.0)]),
Ordering::Less,
);
check_order(
Key(vec![KeyPart::Float(-10.0)]),
Key(vec![KeyPart::Float(-0.0)]),
Ordering::Less,
);
check_order(
Key(vec![KeyPart::Float(-0.0)]),
Key(vec![KeyPart::Float(0.0)]),
Ordering::Less,
);
check_order(
Key(vec![KeyPart::Float(0.0)]),
Key(vec![KeyPart::Float(10.0)]),
Ordering::Less,
);
check_order(
Key(vec![KeyPart::Float(10.0)]),
Key(vec![KeyPart::Float(f64::INFINITY)]),
Ordering::Less,
);
check_order(
Key(vec![KeyPart::Float(f64::INFINITY)]),
Key(vec![KeyPart::Float(f64::NAN)]),
Ordering::Less,
);
}
#[test]
#[rustfmt::skip]
fn order_ints() {
check_order(
Key(vec![KeyPart::Int(BigInt::from(-10000))]),
Key(vec![KeyPart::Int(BigInt::from(-100))]),
Ordering::Less,
);
check_order(
Key(vec![KeyPart::Int(BigInt::from(-100))]),
Key(vec![KeyPart::Int(BigInt::from(-1))]),
Ordering::Less,
);
check_order(
Key(vec![KeyPart::Int(BigInt::from(-1))]),
Key(vec![KeyPart::Int(BigInt::from(0))]),
Ordering::Less,
);
check_order(
Key(vec![KeyPart::Int(BigInt::from(0))]),
Key(vec![KeyPart::Int(BigInt::from(1))]),
Ordering::Less,
);
check_order(
Key(vec![KeyPart::Int(BigInt::from(1))]),
Key(vec![KeyPart::Int(BigInt::from(100))]),
Ordering::Less,
);
check_order(
Key(vec![KeyPart::Int(BigInt::from(100))]),
Key(vec![KeyPart::Int(BigInt::from(10000))]),
Ordering::Less,
);
}
#[test]
#[rustfmt::skip]
fn float_canonicalization() {
let key1 = Key(vec![KeyPart::Float(f64::from_bits(0x7ff8000000000001))]);
let key2 = Key(vec![KeyPart::Float(f64::from_bits(0x7ff8000000000002))]);
assert_eq!(key1, key2);
assert_eq!(encode_key(&key1).unwrap(), encode_key(&key2).unwrap());
}
#[test]
#[rustfmt::skip]
fn explicit_bijection() {
// string
check_bijection(
Key(vec![KeyPart::String("hello".into())]),
&[0x02, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00],
);
// zero byte escape
check_bijection(
Key(vec![KeyPart::Bytes(vec![0x01, 0x02, 0x00, 0x07, 0x08])]),
&[0x01, 0x01, 0x02, 0x00, 0xff, 0x07, 0x08, 0x00],
);
// array
check_bijection(
Key(vec![
KeyPart::String("hello".into()),
KeyPart::Bytes(vec![0x01, 0x02, 0x00, 0x07, 0x08]),
]),
&[
0x02, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00, /* string */
0x01, 0x01, 0x02, 0x00, 0xff, 0x07, 0x08, 0x00, /* bytes */
],
);
}
}

View file

@ -7,17 +7,17 @@ use crate::remote::RemoteDbHandlerPermissions;
use crate::sqlite::SqliteDbHandler;
use crate::sqlite::SqliteDbHandlerPermissions;
use crate::AtomicWrite;
use crate::CommitResult;
use crate::Database;
use crate::DatabaseHandler;
use crate::QueueMessageHandle;
use crate::ReadRange;
use crate::ReadRangeOutput;
use crate::SnapshotReadOptions;
use async_trait::async_trait;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::OpState;
use denokv_proto::CommitResult;
use denokv_proto::ReadRangeOutput;
pub struct MultiBackendDbHandler {
backends: Vec<(&'static [&'static str], Box<dyn DynamicDbHandler>)>,
@ -34,15 +34,20 @@ impl MultiBackendDbHandler {
P: SqliteDbHandlerPermissions + RemoteDbHandlerPermissions + 'static,
>(
default_storage_dir: Option<std::path::PathBuf>,
versionstamp_rng_seed: Option<u64>,
http_options: crate::remote::HttpOptions,
) -> Self {
Self::new(vec![
(
&["https://", "http://"],
Box::new(crate::remote::RemoteDbHandler::<P>::new()),
Box::new(crate::remote::RemoteDbHandler::<P>::new(http_options)),
),
(
&[""],
Box::new(SqliteDbHandler::<P>::new(default_storage_dir)),
Box::new(SqliteDbHandler::<P>::new(
default_storage_dir,
versionstamp_rng_seed,
)),
),
])
}
@ -118,20 +123,17 @@ where
pub trait DynamicDb {
async fn dyn_snapshot_read(
&self,
state: Rc<RefCell<OpState>>,
requests: Vec<ReadRange>,
options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError>;
async fn dyn_atomic_write(
&self,
state: Rc<RefCell<OpState>>,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError>;
async fn dyn_dequeue_next_message(
&self,
state: Rc<RefCell<OpState>>,
) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError>;
fn dyn_close(&self);
@ -143,26 +145,23 @@ impl Database for Box<dyn DynamicDb> {
async fn snapshot_read(
&self,
state: Rc<RefCell<OpState>>,
requests: Vec<ReadRange>,
options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError> {
(**self).dyn_snapshot_read(state, requests, options).await
(**self).dyn_snapshot_read(requests, options).await
}
async fn atomic_write(
&self,
state: Rc<RefCell<OpState>>,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError> {
(**self).dyn_atomic_write(state, write).await
(**self).dyn_atomic_write(write).await
}
async fn dequeue_next_message(
&self,
state: Rc<RefCell<OpState>>,
) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError> {
(**self).dyn_dequeue_next_message(state).await
(**self).dyn_dequeue_next_message().await
}
fn close(&self) {
@ -178,28 +177,25 @@ where
{
async fn dyn_snapshot_read(
&self,
state: Rc<RefCell<OpState>>,
requests: Vec<ReadRange>,
options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError> {
Ok(self.snapshot_read(state, requests, options).await?)
Ok(self.snapshot_read(requests, options).await?)
}
async fn dyn_atomic_write(
&self,
state: Rc<RefCell<OpState>>,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError> {
Ok(self.atomic_write(state, write).await?)
Ok(self.atomic_write(write).await?)
}
async fn dyn_dequeue_next_message(
&self,
state: Rc<RefCell<OpState>>,
) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError> {
Ok(
self
.dequeue_next_message(state)
.dequeue_next_message()
.await?
.map(|x| Box::new(x) as Box<dyn QueueMessageHandle>),
)
@ -209,13 +205,3 @@ where
self.close()
}
}
#[async_trait(?Send)]
impl QueueMessageHandle for Box<dyn QueueMessageHandle> {
async fn take_payload(&mut self) -> Result<Vec<u8>, AnyError> {
(**self).take_payload().await
}
async fn finish(&self, success: bool) -> Result<(), AnyError> {
(**self).finish(success).await
}
}

View file

@ -1,16 +1,12 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::cell::RefCell;
use std::cmp::Ordering;
use std::num::NonZeroU32;
use std::rc::Rc;
use async_trait::async_trait;
use deno_core::error::AnyError;
use deno_core::OpState;
use num_bigint::BigInt;
use crate::codec::canonicalize_f64;
use denokv_proto::Database;
#[async_trait(?Send)]
pub trait DatabaseHandler {
@ -22,312 +18,3 @@ pub trait DatabaseHandler {
path: Option<String>,
) -> Result<Self::DB, AnyError>;
}
#[async_trait(?Send)]
pub trait Database {
type QMH: QueueMessageHandle + 'static;
async fn snapshot_read(
&self,
state: Rc<RefCell<OpState>>,
requests: Vec<ReadRange>,
options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError>;
async fn atomic_write(
&self,
state: Rc<RefCell<OpState>>,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError>;
async fn dequeue_next_message(
&self,
state: Rc<RefCell<OpState>>,
) -> Result<Option<Self::QMH>, AnyError>;
fn close(&self);
}
#[async_trait(?Send)]
pub trait QueueMessageHandle {
async fn take_payload(&mut self) -> Result<Vec<u8>, AnyError>;
async fn finish(&self, success: bool) -> Result<(), AnyError>;
}
/// Options for a snapshot read.
pub struct SnapshotReadOptions {
pub consistency: Consistency,
}
/// The consistency of a read.
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum Consistency {
Strong,
Eventual,
}
/// A key is for a KV pair. It is a vector of KeyParts.
///
/// The ordering of the keys is defined by the ordering of the KeyParts. The
/// first KeyPart is the most significant, and the last KeyPart is the least
/// significant.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug)]
pub struct Key(pub Vec<KeyPart>);
/// A key part is single item in a key. It can be a boolean, a double float, a
/// variable precision signed integer, a UTF-8 string, or an arbitrary byte
/// array.
///
/// The ordering of a KeyPart is dependent on the type of the KeyPart.
///
/// Between different types, the ordering is as follows: arbitrary byte array <
/// UTF-8 string < variable precision signed integer < double float < false < true.
///
/// Within a type, the ordering is as follows:
/// - For a **boolean**, false is less than true.
/// - For a **double float**, the ordering must follow -NaN < -Infinity < -100.0 < -1.0 < -0.5 < -0.0 < 0.0 < 0.5 < 1.0 < 100.0 < Infinity < NaN.
/// - For a **variable precision signed integer**, the ordering must follow mathematical ordering.
/// - For a **UTF-8 string**, the ordering must follow the UTF-8 byte ordering.
/// - For an **arbitrary byte array**, the ordering must follow the byte ordering.
///
/// This means that the key part `1.0` is less than the key part `2.0`, but is
/// greater than the key part `0n`, because `1.0` is a double float and `0n`
/// is a variable precision signed integer, and the ordering types obviously has
/// precedence over the ordering within a type.
#[derive(Clone, Debug)]
pub enum KeyPart {
Bytes(Vec<u8>),
String(String),
Int(BigInt),
Float(f64),
False,
True,
}
impl KeyPart {
fn tag_ordering(&self) -> u8 {
match self {
KeyPart::Bytes(_) => 0,
KeyPart::String(_) => 1,
KeyPart::Int(_) => 2,
KeyPart::Float(_) => 3,
KeyPart::False => 4,
KeyPart::True => 5,
}
}
}
impl Eq for KeyPart {}
impl PartialEq for KeyPart {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == Ordering::Equal
}
}
impl Ord for KeyPart {
fn cmp(&self, other: &Self) -> Ordering {
match (self, other) {
(KeyPart::Bytes(b1), KeyPart::Bytes(b2)) => b1.cmp(b2),
(KeyPart::String(s1), KeyPart::String(s2)) => {
s1.as_bytes().cmp(s2.as_bytes())
}
(KeyPart::Int(i1), KeyPart::Int(i2)) => i1.cmp(i2),
(KeyPart::Float(f1), KeyPart::Float(f2)) => {
canonicalize_f64(*f1).total_cmp(&canonicalize_f64(*f2))
}
_ => self.tag_ordering().cmp(&other.tag_ordering()),
}
}
}
impl PartialOrd for KeyPart {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
/// A request to read a range of keys from the database. If `end` is `None`,
/// then the range is from `start` shall also be used as the end of the range.
///
/// The range is inclusive of the start and exclusive of the end. The start may
/// not be greater than the end.
///
/// The range is limited to `limit` number of entries.
pub struct ReadRange {
pub start: Vec<u8>,
pub end: Vec<u8>,
pub limit: NonZeroU32,
pub reverse: bool,
}
/// A response to a `ReadRange` request.
pub struct ReadRangeOutput {
pub entries: Vec<KvEntry>,
}
/// A versionstamp is a 10 byte array that is used to represent the version of
/// a key in the database.
type Versionstamp = [u8; 10];
/// A key-value entry with a versionstamp.
pub struct KvEntry {
pub key: Vec<u8>,
pub value: Value,
pub versionstamp: Versionstamp,
}
/// A serialized value for a KV pair as stored in the database. All values
/// **can** be serialized into the V8 representation, but not all values are.
///
/// The V8 representation is an opaque byte array that is only meaningful to
/// the V8 engine. It is guaranteed to be backwards compatible. Because this
/// representation is opaque, it is not possible to inspect or modify the value
/// without deserializing it.
///
/// The inability to inspect or modify the value without deserializing it means
/// that these values can not be quickly modified when performing atomic
/// read-modify-write operations on the database (because the database may not
/// have the ability to deserialize the V8 value into a modifiable value).
///
/// Because of this constraint, there are more specialized representations for
/// certain types of values that can be used in atomic read-modify-write
/// operations. These specialized representations are:
///
/// - **Bytes**: an arbitrary byte array.
/// - **U64**: a 64-bit unsigned integer.
pub enum Value {
V8(Vec<u8>),
Bytes(Vec<u8>),
U64(u64),
}
/// A request to perform an atomic check-modify-write operation on the database.
///
/// The operation is performed atomically, meaning that the operation will
/// either succeed or fail. If the operation fails, then the database will be
/// left in the same state as before the operation was attempted. If the
/// operation succeeds, then the database will be left in a new state.
///
/// The operation is performed by first checking the database for the current
/// state of the keys, defined by the `checks` field. If the current state of
/// the keys does not match the expected state, then the operation fails. If
/// the current state of the keys matches the expected state, then the
/// mutations are applied to the database.
///
/// All checks and mutations are performed atomically.
///
/// The mutations are performed in the order that they are specified in the
/// `mutations` field. The order of checks is not specified, and is also not
/// important because this ordering is un-observable.
pub struct AtomicWrite {
pub checks: Vec<KvCheck>,
pub mutations: Vec<KvMutation>,
pub enqueues: Vec<Enqueue>,
}
/// A request to perform a check on a key in the database. The check is not
/// performed on the value of the key, but rather on the versionstamp of the
/// key.
pub struct KvCheck {
pub key: Vec<u8>,
pub versionstamp: Option<Versionstamp>,
}
/// A request to perform a mutation on a key in the database. The mutation is
/// performed on the value of the key.
///
/// The type of mutation is specified by the `kind` field. The action performed
/// by each mutation kind is specified in the docs for [MutationKind].
pub struct KvMutation {
pub key: Vec<u8>,
pub kind: MutationKind,
pub expire_at: Option<u64>,
}
/// A request to enqueue a message to the database. This message is delivered
/// to a listener of the queue at least once.
///
/// ## Retry
///
/// When the delivery of a message fails, it is retried for a finite number
/// of times. Each retry happens after a backoff period. The backoff periods
/// are specified by the `backoff_schedule` field in milliseconds. If
/// unspecified, the default backoff schedule of the platform (CLI or Deploy)
/// is used.
///
/// If all retry attempts failed, the message is written to the KV under all
/// keys specified in `keys_if_undelivered`.
pub struct Enqueue {
pub payload: Vec<u8>,
pub delay_ms: u64,
pub keys_if_undelivered: Vec<Vec<u8>>,
pub backoff_schedule: Option<Vec<u32>>,
}
/// The type of mutation to perform on a key in the database.
///
/// ## Set
///
/// The set mutation sets the value of the key to the specified value. It
/// discards the previous value of the key, if any.
///
/// This operand supports all [Value] types.
///
/// ## Delete
///
/// The delete mutation deletes the value of the key.
///
/// ## Sum
///
/// The sum mutation adds the specified value to the existing value of the key.
///
/// This operand supports only value types [Value::U64]. The existing value in
/// the database must match the type of the value specified in the mutation. If
/// the key does not exist in the database, then the value specified in the
/// mutation is used as the new value of the key.
///
/// ## Min
///
/// The min mutation sets the value of the key to the minimum of the existing
/// value of the key and the specified value.
///
/// This operand supports only value types [Value::U64]. The existing value in
/// the database must match the type of the value specified in the mutation. If
/// the key does not exist in the database, then the value specified in the
/// mutation is used as the new value of the key.
///
/// ## Max
///
/// The max mutation sets the value of the key to the maximum of the existing
/// value of the key and the specified value.
///
/// This operand supports only value types [Value::U64]. The existing value in
/// the database must match the type of the value specified in the mutation. If
/// the key does not exist in the database, then the value specified in the
/// mutation is used as the new value of the key.
pub enum MutationKind {
Set(Value),
Delete,
Sum(Value),
Min(Value),
Max(Value),
}
impl MutationKind {
pub fn value(&self) -> Option<&Value> {
match self {
MutationKind::Set(value) => Some(value),
MutationKind::Sum(value) => Some(value),
MutationKind::Min(value) => Some(value),
MutationKind::Max(value) => Some(value),
MutationKind::Delete => None,
}
}
}
/// The result of a successful commit of an atomic write operation.
pub struct CommitResult {
/// The new versionstamp of the data that was committed.
pub versionstamp: Versionstamp,
}

View file

@ -1,9 +1,7 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
pub mod codec;
pub mod dynamic;
mod interface;
mod proto;
pub mod remote;
pub mod sqlite;
mod time;
@ -12,11 +10,12 @@ use std::borrow::Cow;
use std::cell::RefCell;
use std::num::NonZeroU32;
use std::rc::Rc;
use std::time::Duration;
use base64::prelude::BASE64_URL_SAFE;
use base64::Engine;
use codec::decode_key;
use codec::encode_key;
use chrono::DateTime;
use chrono::Utc;
use deno_core::anyhow::Context;
use deno_core::error::get_custom_error_class;
use deno_core::error::type_error;
@ -30,8 +29,26 @@ use deno_core::OpState;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ToJsBuffer;
use denokv_proto::decode_key;
use denokv_proto::encode_key;
use denokv_proto::AtomicWrite;
use denokv_proto::Check;
use denokv_proto::Consistency;
use denokv_proto::Database;
use denokv_proto::Enqueue;
use denokv_proto::Key;
use denokv_proto::KeyPart;
use denokv_proto::KvEntry;
use denokv_proto::KvValue;
use denokv_proto::Mutation;
use denokv_proto::MutationKind;
use denokv_proto::QueueMessageHandle;
use denokv_proto::ReadRange;
use denokv_proto::SnapshotReadOptions;
use log::debug;
use serde::Deserialize;
use serde::Serialize;
use time::utc_now;
pub use crate::interface::*;
@ -110,30 +127,26 @@ where
type KvKey = Vec<AnyValue>;
impl From<AnyValue> for KeyPart {
fn from(value: AnyValue) -> Self {
match value {
AnyValue::Bool(false) => KeyPart::False,
AnyValue::Bool(true) => KeyPart::True,
AnyValue::Number(n) => KeyPart::Float(n),
AnyValue::BigInt(n) => KeyPart::Int(n),
AnyValue::String(s) => KeyPart::String(s),
AnyValue::V8Buffer(buf) => KeyPart::Bytes(buf.to_vec()),
AnyValue::RustBuffer(_) => unreachable!(),
}
fn key_part_from_v8(value: AnyValue) -> KeyPart {
match value {
AnyValue::Bool(false) => KeyPart::False,
AnyValue::Bool(true) => KeyPart::True,
AnyValue::Number(n) => KeyPart::Float(n),
AnyValue::BigInt(n) => KeyPart::Int(n),
AnyValue::String(s) => KeyPart::String(s),
AnyValue::V8Buffer(buf) => KeyPart::Bytes(buf.to_vec()),
AnyValue::RustBuffer(_) => unreachable!(),
}
}
impl From<KeyPart> for AnyValue {
fn from(value: KeyPart) -> Self {
match value {
KeyPart::False => AnyValue::Bool(false),
KeyPart::True => AnyValue::Bool(true),
KeyPart::Float(n) => AnyValue::Number(n),
KeyPart::Int(n) => AnyValue::BigInt(n),
KeyPart::String(s) => AnyValue::String(s),
KeyPart::Bytes(buf) => AnyValue::RustBuffer(buf.into()),
}
fn key_part_to_v8(value: KeyPart) -> AnyValue {
match value {
KeyPart::False => AnyValue::Bool(false),
KeyPart::True => AnyValue::Bool(true),
KeyPart::Float(n) => AnyValue::Number(n),
KeyPart::Int(n) => AnyValue::BigInt(n),
KeyPart::String(s) => AnyValue::String(s),
KeyPart::Bytes(buf) => AnyValue::RustBuffer(buf.into()),
}
}
@ -153,25 +166,25 @@ enum ToV8Value {
U64(BigInt),
}
impl TryFrom<FromV8Value> for Value {
impl TryFrom<FromV8Value> for KvValue {
type Error = AnyError;
fn try_from(value: FromV8Value) -> Result<Self, AnyError> {
Ok(match value {
FromV8Value::V8(buf) => Value::V8(buf.to_vec()),
FromV8Value::Bytes(buf) => Value::Bytes(buf.to_vec()),
FromV8Value::V8(buf) => KvValue::V8(buf.to_vec()),
FromV8Value::Bytes(buf) => KvValue::Bytes(buf.to_vec()),
FromV8Value::U64(n) => {
Value::U64(num_bigint::BigInt::from(n).try_into()?)
KvValue::U64(num_bigint::BigInt::from(n).try_into()?)
}
})
}
}
impl From<Value> for ToV8Value {
fn from(value: Value) -> Self {
impl From<KvValue> for ToV8Value {
fn from(value: KvValue) -> Self {
match value {
Value::V8(buf) => ToV8Value::V8(buf.into()),
Value::Bytes(buf) => ToV8Value::Bytes(buf.into()),
Value::U64(n) => ToV8Value::U64(num_bigint::BigInt::from(n).into()),
KvValue::V8(buf) => ToV8Value::V8(buf.into()),
KvValue::Bytes(buf) => ToV8Value::Bytes(buf.into()),
KvValue::U64(n) => ToV8Value::U64(num_bigint::BigInt::from(n).into()),
}
}
}
@ -190,7 +203,7 @@ impl TryFrom<KvEntry> for ToV8KvEntry {
key: decode_key(&entry.key)?
.0
.into_iter()
.map(Into::into)
.map(key_part_to_v8)
.collect(),
value: entry.value.into(),
versionstamp: hex::encode(entry.versionstamp).into(),
@ -282,8 +295,7 @@ where
let opts = SnapshotReadOptions {
consistency: consistency.into(),
};
let output_ranges =
db.snapshot_read(state.clone(), read_ranges, opts).await?;
let output_ranges = db.snapshot_read(read_ranges, opts).await?;
let output_ranges = output_ranges
.into_iter()
.map(|x| {
@ -302,7 +314,7 @@ struct QueueMessageResource<QPH: QueueMessageHandle + 'static> {
impl<QMH: QueueMessageHandle + 'static> Resource for QueueMessageResource<QMH> {
fn name(&self) -> Cow<str> {
"queue_message".into()
"queueMessage".into()
}
}
@ -331,7 +343,7 @@ where
resource.db.clone()
};
let Some(mut handle) = db.dequeue_next_message(state.clone()).await? else {
let Some(mut handle) = db.dequeue_next_message().await? else {
return Ok(None);
};
let payload = handle.take_payload().await?.into();
@ -361,81 +373,81 @@ where
.map_err(|_| type_error("Queue message not found"))?
.handle
};
handle.finish(success).await
// if we fail to finish the message, there is not much we can do and the
// message will be retried anyway, so we just ignore the error
if let Err(err) = handle.finish(success).await {
debug!("Failed to finish dequeued message: {}", err);
};
Ok(())
}
type V8KvCheck = (KvKey, Option<ByteString>);
impl TryFrom<V8KvCheck> for KvCheck {
type Error = AnyError;
fn try_from(value: V8KvCheck) -> Result<Self, AnyError> {
let versionstamp = match value.1 {
Some(data) => {
let mut out = [0u8; 10];
hex::decode_to_slice(data, &mut out)
.map_err(|_| type_error("invalid versionstamp"))?;
Some(out)
}
None => None,
};
Ok(KvCheck {
key: encode_v8_key(value.0)?,
versionstamp,
})
}
fn check_from_v8(value: V8KvCheck) -> Result<Check, AnyError> {
let versionstamp = match value.1 {
Some(data) => {
let mut out = [0u8; 10];
hex::decode_to_slice(data, &mut out)
.map_err(|_| type_error("invalid versionstamp"))?;
Some(out)
}
None => None,
};
Ok(Check {
key: encode_v8_key(value.0)?,
versionstamp,
})
}
type V8KvMutation = (KvKey, String, Option<FromV8Value>, Option<u64>);
impl TryFrom<(V8KvMutation, u64)> for KvMutation {
type Error = AnyError;
fn try_from(
(value, current_timstamp): (V8KvMutation, u64),
) -> Result<Self, AnyError> {
let key = encode_v8_key(value.0)?;
let kind = match (value.1.as_str(), value.2) {
("set", Some(value)) => MutationKind::Set(value.try_into()?),
("delete", None) => MutationKind::Delete,
("sum", Some(value)) => MutationKind::Sum(value.try_into()?),
("min", Some(value)) => MutationKind::Min(value.try_into()?),
("max", Some(value)) => MutationKind::Max(value.try_into()?),
(op, Some(_)) => {
return Err(type_error(format!("invalid mutation '{op}' with value")))
}
(op, None) => {
return Err(type_error(format!(
"invalid mutation '{op}' without value"
)))
}
};
Ok(KvMutation {
key,
kind,
expire_at: value.3.map(|expire_in| current_timstamp + expire_in),
})
}
fn mutation_from_v8(
(value, current_timstamp): (V8KvMutation, DateTime<Utc>),
) -> Result<Mutation, AnyError> {
let key = encode_v8_key(value.0)?;
let kind = match (value.1.as_str(), value.2) {
("set", Some(value)) => MutationKind::Set(value.try_into()?),
("delete", None) => MutationKind::Delete,
("sum", Some(value)) => MutationKind::Sum(value.try_into()?),
("min", Some(value)) => MutationKind::Min(value.try_into()?),
("max", Some(value)) => MutationKind::Max(value.try_into()?),
(op, Some(_)) => {
return Err(type_error(format!("invalid mutation '{op}' with value")))
}
(op, None) => {
return Err(type_error(format!("invalid mutation '{op}' without value")))
}
};
Ok(Mutation {
key,
kind,
expire_at: value
.3
.map(|expire_in| current_timstamp + Duration::from_millis(expire_in)),
})
}
type V8Enqueue = (JsBuffer, u64, Vec<KvKey>, Option<Vec<u32>>);
impl TryFrom<V8Enqueue> for Enqueue {
type Error = AnyError;
fn try_from(value: V8Enqueue) -> Result<Self, AnyError> {
Ok(Enqueue {
payload: value.0.to_vec(),
delay_ms: value.1,
keys_if_undelivered: value
.2
.into_iter()
.map(encode_v8_key)
.collect::<std::io::Result<_>>()?,
backoff_schedule: value.3,
})
}
fn enqueue_from_v8(
value: V8Enqueue,
current_timestamp: DateTime<Utc>,
) -> Result<Enqueue, AnyError> {
Ok(Enqueue {
payload: value.0.to_vec(),
deadline: current_timestamp
+ chrono::Duration::milliseconds(value.1 as i64),
keys_if_undelivered: value
.2
.into_iter()
.map(encode_v8_key)
.collect::<std::io::Result<_>>()?,
backoff_schedule: value.3,
})
}
fn encode_v8_key(key: KvKey) -> Result<Vec<u8>, std::io::Error> {
encode_key(&Key(key.into_iter().map(From::from).collect()))
encode_key(&Key(key.into_iter().map(key_part_from_v8).collect()))
}
enum RawSelector {
@ -610,7 +622,7 @@ async fn op_kv_atomic_write<DBH>(
where
DBH: DatabaseHandler + 'static,
{
let current_timestamp = time::utc_now().timestamp_millis() as u64;
let current_timestamp = utc_now();
let db = {
let state = state.borrow();
let resource =
@ -631,17 +643,17 @@ where
let checks = checks
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<KvCheck>, AnyError>>()
.map(check_from_v8)
.collect::<Result<Vec<Check>, AnyError>>()
.with_context(|| "invalid check")?;
let mutations = mutations
.into_iter()
.map(|mutation| TryFrom::try_from((mutation, current_timestamp)))
.collect::<Result<Vec<KvMutation>, AnyError>>()
.map(|mutation| mutation_from_v8((mutation, current_timestamp)))
.collect::<Result<Vec<Mutation>, AnyError>>()
.with_context(|| "invalid mutation")?;
let enqueues = enqueues
.into_iter()
.map(TryInto::try_into)
.map(|e| enqueue_from_v8(e, current_timestamp))
.collect::<Result<Vec<Enqueue>, AnyError>>()
.with_context(|| "invalid enqueue")?;
@ -690,7 +702,7 @@ where
enqueues,
};
let result = db.atomic_write(state.clone(), atomic_write).await?;
let result = db.atomic_write(atomic_write).await?;
Ok(result.map(|res| hex::encode(res.versionstamp)))
}
@ -732,11 +744,11 @@ fn check_write_key_size(key: &[u8]) -> Result<usize, AnyError> {
}
}
fn check_value_size(value: &Value) -> Result<usize, AnyError> {
fn check_value_size(value: &KvValue) -> Result<usize, AnyError> {
let payload = match value {
Value::Bytes(x) => x,
Value::V8(x) => x,
Value::U64(_) => return Ok(8),
KvValue::Bytes(x) => x,
KvValue::V8(x) => x,
KvValue::U64(_) => return Ok(8),
};
if payload.len() > MAX_VALUE_SIZE_BYTES {

View file

@ -1,97 +0,0 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
syntax = "proto3";
package datapath;
message SnapshotRead {
repeated ReadRange ranges = 1;
}
message SnapshotReadOutput {
repeated ReadRangeOutput ranges = 1;
bool read_disabled = 2;
repeated string regions_if_read_disabled = 3;
bool read_is_strongly_consistent = 4;
string primary_if_not_strongly_consistent = 5;
}
message ReadRange {
bytes start = 1;
bytes end = 2;
int32 limit = 3;
bool reverse = 4;
}
message ReadRangeOutput {
repeated KvEntry values = 1;
}
message AtomicWrite {
repeated KvCheck kv_checks = 1;
repeated KvMutation kv_mutations = 2;
repeated Enqueue enqueues = 3;
}
message AtomicWriteOutput {
AtomicWriteStatus status = 1;
bytes versionstamp = 2;
string primary_if_write_disabled = 3;
}
message KvCheck {
bytes key = 1;
bytes versionstamp = 2; // 10-byte raw versionstamp
}
message KvMutation {
bytes key = 1;
KvValue value = 2;
KvMutationType mutation_type = 3;
int64 expire_at_ms = 4;
}
message KvValue {
bytes data = 1;
KvValueEncoding encoding = 2;
}
message KvEntry {
bytes key = 1;
bytes value = 2;
KvValueEncoding encoding = 3;
bytes versionstamp = 4;
}
enum KvMutationType {
M_UNSPECIFIED = 0;
M_SET = 1;
M_CLEAR = 2;
M_SUM = 3;
M_MAX = 4;
M_MIN = 5;
}
enum KvValueEncoding {
VE_UNSPECIFIED = 0;
VE_V8 = 1;
VE_LE64 = 2;
VE_BYTES = 3;
}
enum AtomicWriteStatus {
AW_UNSPECIFIED = 0;
AW_SUCCESS = 1;
AW_CHECK_FAILURE = 2;
AW_UNSUPPORTED_WRITE = 3;
AW_USAGE_LIMIT_EXCEEDED = 4;
AW_WRITE_DISABLED = 5;
AW_QUEUE_BACKLOG_LIMIT_EXCEEDED = 6;
}
message Enqueue {
bytes payload = 1;
int64 deadline_ms = 2;
repeated bytes kv_keys_if_undelivered = 3;
repeated uint32 backoff_schedule = 4;
}

View file

@ -1,7 +0,0 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// Generated code, disable lints
#[allow(clippy::all, non_snake_case)]
pub mod datapath {
include!(concat!(env!("OUT_DIR"), "/datapath.rs"));
}

View file

@ -1,43 +1,42 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::cell::RefCell;
use std::fmt;
use std::io::Write;
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;
use crate::proto::datapath as pb;
use crate::AtomicWrite;
use crate::CommitResult;
use crate::Database;
use crate::DatabaseHandler;
use crate::KvEntry;
use crate::MutationKind;
use crate::QueueMessageHandle;
use crate::ReadRange;
use crate::ReadRangeOutput;
use crate::SnapshotReadOptions;
use anyhow::Context;
use async_trait::async_trait;
use chrono::DateTime;
use chrono::Utc;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::futures::TryFutureExt;
use deno_core::unsync::JoinHandle;
use deno_core::OpState;
use prost::Message;
use rand::Rng;
use serde::Deserialize;
use termcolor::Ansi;
use termcolor::Color;
use termcolor::ColorSpec;
use termcolor::WriteColor;
use tokio::sync::watch;
use deno_fetch::create_http_client;
use deno_fetch::CreateHttpClientOptions;
use deno_tls::rustls::RootCertStore;
use deno_tls::Proxy;
use deno_tls::RootCertStoreProvider;
use denokv_remote::MetadataEndpoint;
use denokv_remote::Remote;
use url::Url;
use uuid::Uuid;
#[derive(Clone)]
pub struct HttpOptions {
pub user_agent: String,
pub root_cert_store_provider: Option<Arc<dyn RootCertStoreProvider>>,
pub proxy: Option<Proxy>,
pub unsafely_ignore_certificate_errors: Option<Vec<String>>,
pub client_cert_chain_and_key: Option<(String, String)>,
}
impl HttpOptions {
pub fn root_cert_store(&self) -> Result<Option<RootCertStore>, AnyError> {
Ok(match &self.root_cert_store_provider {
Some(provider) => Some(provider.get_or_try_init()?.clone()),
None => None,
})
}
}
pub trait RemoteDbHandlerPermissions {
fn check_env(&mut self, var: &str) -> Result<(), AnyError>;
@ -49,50 +48,39 @@ pub trait RemoteDbHandlerPermissions {
}
pub struct RemoteDbHandler<P: RemoteDbHandlerPermissions + 'static> {
http_options: HttpOptions,
_p: std::marker::PhantomData<P>,
}
impl<P: RemoteDbHandlerPermissions> RemoteDbHandler<P> {
pub fn new() -> Self {
Self { _p: PhantomData }
pub fn new(http_options: HttpOptions) -> Self {
Self {
http_options,
_p: PhantomData,
}
}
}
impl<P: RemoteDbHandlerPermissions> Default for RemoteDbHandler<P> {
fn default() -> Self {
Self::new()
pub struct PermissionChecker<P: RemoteDbHandlerPermissions> {
state: Rc<RefCell<OpState>>,
_permissions: PhantomData<P>,
}
impl<P: RemoteDbHandlerPermissions + 'static> denokv_remote::RemotePermissions
for PermissionChecker<P>
{
fn check_net_url(&self, url: &Url) -> Result<(), anyhow::Error> {
let mut state = self.state.borrow_mut();
let permissions = state.borrow_mut::<P>();
permissions.check_net_url(url, "Deno.openKv")
}
}
#[derive(Deserialize)]
struct VersionInfo {
version: u64,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
#[allow(dead_code)]
struct DatabaseMetadata {
version: u64,
database_id: Uuid,
endpoints: Vec<EndpointInfo>,
token: String,
expires_at: DateTime<Utc>,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct EndpointInfo {
pub url: String,
// Using `String` instead of an enum, so that parsing doesn't
// break if more consistency levels are added.
pub consistency: String,
}
#[async_trait(?Send)]
impl<P: RemoteDbHandlerPermissions> DatabaseHandler for RemoteDbHandler<P> {
type DB = RemoteDb<P>;
impl<P: RemoteDbHandlerPermissions + 'static> DatabaseHandler
for RemoteDbHandler<P>
{
type DB = Remote<PermissionChecker<P>>;
async fn open(
&self,
@ -122,470 +110,36 @@ impl<P: RemoteDbHandlerPermissions> DatabaseHandler for RemoteDbHandler<P> {
"Missing DENO_KV_ACCESS_TOKEN environment variable. Please set it to your access token from https://dash.deno.com/account."
})?;
let refresher = MetadataRefresher::new(url, access_token);
let db = RemoteDb {
client: reqwest::Client::new(),
refresher,
_p: PhantomData,
};
Ok(db)
}
}
pub struct RemoteDb<P: RemoteDbHandlerPermissions + 'static> {
client: reqwest::Client,
refresher: MetadataRefresher,
_p: std::marker::PhantomData<P>,
}
pub struct DummyQueueMessageHandle {}
#[async_trait(?Send)]
impl QueueMessageHandle for DummyQueueMessageHandle {
async fn take_payload(&mut self) -> Result<Vec<u8>, AnyError> {
unimplemented!()
}
async fn finish(&self, _success: bool) -> Result<(), AnyError> {
unimplemented!()
}
}
#[async_trait(?Send)]
impl<P: RemoteDbHandlerPermissions> Database for RemoteDb<P> {
type QMH = DummyQueueMessageHandle;
async fn snapshot_read(
&self,
state: Rc<RefCell<OpState>>,
requests: Vec<ReadRange>,
_options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError> {
let req = pb::SnapshotRead {
ranges: requests
.into_iter()
.map(|r| pb::ReadRange {
start: r.start,
end: r.end,
limit: r.limit.get() as _,
reverse: r.reverse,
})
.collect(),
let metadata_endpoint = MetadataEndpoint {
url: parsed_url.clone(),
access_token: access_token.clone(),
};
let res: pb::SnapshotReadOutput = call_remote::<P, _, _>(
&state,
&self.refresher,
&self.client,
"snapshot_read",
&req,
)
.await?;
let options = &self.http_options;
let client = create_http_client(
&options.user_agent,
CreateHttpClientOptions {
root_cert_store: options.root_cert_store()?,
ca_certs: vec![],
proxy: options.proxy.clone(),
unsafely_ignore_certificate_errors: options
.unsafely_ignore_certificate_errors
.clone(),
client_cert_chain_and_key: options.client_cert_chain_and_key.clone(),
pool_max_idle_per_host: None,
pool_idle_timeout: None,
http1: true,
http2: true,
},
)?;
if res.read_disabled {
return Err(type_error("Reads are disabled for this database."));
}
let out = res
.ranges
.into_iter()
.map(|r| {
Ok(ReadRangeOutput {
entries: r
.values
.into_iter()
.map(|e| {
let encoding = e.encoding();
Ok(KvEntry {
key: e.key,
value: decode_value(e.value, encoding)?,
versionstamp: <[u8; 10]>::try_from(&e.versionstamp[..])?,
})
})
.collect::<Result<_, AnyError>>()?,
})
})
.collect::<Result<Vec<_>, AnyError>>()?;
Ok(out)
}
async fn atomic_write(
&self,
state: Rc<RefCell<OpState>>,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError> {
if !write.enqueues.is_empty() {
return Err(type_error("Enqueue operations are not supported yet."));
}
let req = pb::AtomicWrite {
kv_checks: write
.checks
.into_iter()
.map(|x| {
Ok(pb::KvCheck {
key: x.key,
versionstamp: x.versionstamp.unwrap_or([0u8; 10]).to_vec(),
})
})
.collect::<anyhow::Result<_>>()?,
kv_mutations: write.mutations.into_iter().map(encode_mutation).collect(),
enqueues: vec![],
let permissions = PermissionChecker {
state: state.clone(),
_permissions: PhantomData,
};
let res: pb::AtomicWriteOutput = call_remote::<P, _, _>(
&state,
&self.refresher,
&self.client,
"atomic_write",
&req,
)
.await?;
match res.status() {
pb::AtomicWriteStatus::AwSuccess => Ok(Some(CommitResult {
versionstamp: if res.versionstamp.is_empty() {
Default::default()
} else {
res.versionstamp[..].try_into()?
},
})),
pb::AtomicWriteStatus::AwCheckFailure => Ok(None),
pb::AtomicWriteStatus::AwUnsupportedWrite => {
Err(type_error("Unsupported write"))
}
pb::AtomicWriteStatus::AwUsageLimitExceeded => {
Err(type_error("The database usage limit has been exceeded."))
}
pb::AtomicWriteStatus::AwWriteDisabled => {
// TODO: Auto retry
Err(type_error("Writes are disabled for this database."))
}
pb::AtomicWriteStatus::AwUnspecified => {
Err(type_error("Unspecified error"))
}
pb::AtomicWriteStatus::AwQueueBacklogLimitExceeded => {
Err(type_error("Queue backlog limit exceeded"))
}
}
}
let remote = Remote::new(client, permissions, metadata_endpoint);
async fn dequeue_next_message(
&self,
_state: Rc<RefCell<OpState>>,
) -> Result<Option<Self::QMH>, AnyError> {
let msg = "Deno.Kv.listenQueue is not supported for remote KV databases";
eprintln!("{}", yellow(msg));
deno_core::futures::future::pending().await
}
fn close(&self) {}
}
fn yellow<S: AsRef<str>>(s: S) -> impl fmt::Display {
if std::env::var_os("NO_COLOR").is_some() {
return String::from(s.as_ref());
}
let mut style_spec = ColorSpec::new();
style_spec.set_fg(Some(Color::Yellow));
let mut v = Vec::new();
let mut ansi_writer = Ansi::new(&mut v);
ansi_writer.set_color(&style_spec).unwrap();
ansi_writer.write_all(s.as_ref().as_bytes()).unwrap();
ansi_writer.reset().unwrap();
String::from_utf8_lossy(&v).into_owned()
}
fn decode_value(
value: Vec<u8>,
encoding: pb::KvValueEncoding,
) -> anyhow::Result<crate::Value> {
match encoding {
pb::KvValueEncoding::VeV8 => Ok(crate::Value::V8(value)),
pb::KvValueEncoding::VeBytes => Ok(crate::Value::Bytes(value)),
pb::KvValueEncoding::VeLe64 => Ok(crate::Value::U64(u64::from_le_bytes(
<[u8; 8]>::try_from(&value[..])?,
))),
pb::KvValueEncoding::VeUnspecified => {
Err(anyhow::anyhow!("Unspecified value encoding, cannot decode"))
}
}
}
fn encode_value(value: crate::Value) -> pb::KvValue {
match value {
crate::Value::V8(data) => pb::KvValue {
data,
encoding: pb::KvValueEncoding::VeV8 as _,
},
crate::Value::Bytes(data) => pb::KvValue {
data,
encoding: pb::KvValueEncoding::VeBytes as _,
},
crate::Value::U64(x) => pb::KvValue {
data: x.to_le_bytes().to_vec(),
encoding: pb::KvValueEncoding::VeLe64 as _,
},
}
}
fn encode_mutation(m: crate::KvMutation) -> pb::KvMutation {
let key = m.key;
let expire_at_ms =
m.expire_at.and_then(|x| i64::try_from(x).ok()).unwrap_or(0);
match m.kind {
MutationKind::Set(x) => pb::KvMutation {
key,
value: Some(encode_value(x)),
mutation_type: pb::KvMutationType::MSet as _,
expire_at_ms,
},
MutationKind::Delete => pb::KvMutation {
key,
value: Some(encode_value(crate::Value::Bytes(vec![]))),
mutation_type: pb::KvMutationType::MClear as _,
expire_at_ms,
},
MutationKind::Max(x) => pb::KvMutation {
key,
value: Some(encode_value(x)),
mutation_type: pb::KvMutationType::MMax as _,
expire_at_ms,
},
MutationKind::Min(x) => pb::KvMutation {
key,
value: Some(encode_value(x)),
mutation_type: pb::KvMutationType::MMin as _,
expire_at_ms,
},
MutationKind::Sum(x) => pb::KvMutation {
key,
value: Some(encode_value(x)),
mutation_type: pb::KvMutationType::MSum as _,
expire_at_ms,
},
}
}
#[derive(Clone)]
enum MetadataState {
Ready(Arc<DatabaseMetadata>),
Invalid(String),
Pending,
}
struct MetadataRefresher {
metadata_rx: watch::Receiver<MetadataState>,
handle: JoinHandle<()>,
}
impl MetadataRefresher {
pub fn new(url: String, access_token: String) -> Self {
let (tx, rx) = watch::channel(MetadataState::Pending);
let handle =
deno_core::unsync::spawn(metadata_refresh_task(url, access_token, tx));
Self {
handle,
metadata_rx: rx,
}
}
}
impl Drop for MetadataRefresher {
fn drop(&mut self) {
self.handle.abort();
}
}
async fn metadata_refresh_task(
metadata_url: String,
access_token: String,
tx: watch::Sender<MetadataState>,
) {
let client = reqwest::Client::new();
loop {
let mut attempt = 0u64;
let metadata = loop {
match fetch_metadata(&client, &metadata_url, &access_token).await {
Ok(Ok(x)) => break x,
Ok(Err(e)) => {
if tx.send(MetadataState::Invalid(e)).is_err() {
return;
}
}
Err(e) => {
log::error!("Failed to fetch database metadata: {}", e);
}
}
randomized_exponential_backoff(Duration::from_secs(5), attempt).await;
attempt += 1;
};
let ms_until_expire = u64::try_from(
metadata
.expires_at
.timestamp_millis()
.saturating_sub(crate::time::utc_now().timestamp_millis()),
)
.unwrap_or_default();
// Refresh 10 minutes before expiry
// In case of buggy clocks, don't refresh more than once per minute
let interval = Duration::from_millis(ms_until_expire)
.saturating_sub(Duration::from_secs(600))
.max(Duration::from_secs(60));
if tx.send(MetadataState::Ready(Arc::new(metadata))).is_err() {
return;
}
tokio::time::sleep(interval).await;
}
}
async fn fetch_metadata(
client: &reqwest::Client,
metadata_url: &str,
access_token: &str,
) -> anyhow::Result<Result<DatabaseMetadata, String>> {
let res = client
.post(metadata_url)
.header("authorization", format!("Bearer {}", access_token))
.send()
.await?;
if !res.status().is_success() {
if res.status().is_client_error() {
return Ok(Err(format!(
"Client error while fetching metadata: {:?} {}",
res.status(),
res.text().await?
)));
} else {
anyhow::bail!(
"remote returned error: {:?} {}",
res.status(),
res.text().await?
);
}
}
let res = res.bytes().await?;
let version_info: VersionInfo = match serde_json::from_slice(&res) {
Ok(x) => x,
Err(e) => return Ok(Err(format!("Failed to decode version info: {}", e))),
};
if version_info.version > 1 {
return Ok(Err(format!(
"Unsupported metadata version: {}",
version_info.version
)));
}
Ok(
serde_json::from_slice(&res)
.map_err(|e| format!("Failed to decode metadata: {}", e)),
)
}
async fn randomized_exponential_backoff(base: Duration, attempt: u64) {
let attempt = attempt.min(12);
let delay = base.as_millis() as u64 + (2 << attempt);
let delay = delay + rand::thread_rng().gen_range(0..(delay / 2) + 1);
tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
}
async fn call_remote<
P: RemoteDbHandlerPermissions + 'static,
T: Message,
R: Message + Default,
>(
state: &RefCell<OpState>,
refresher: &MetadataRefresher,
client: &reqwest::Client,
method: &str,
req: &T,
) -> anyhow::Result<R> {
let mut attempt = 0u64;
let res = loop {
let mut metadata_rx = refresher.metadata_rx.clone();
let metadata = loop {
match &*metadata_rx.borrow() {
MetadataState::Pending => {}
MetadataState::Ready(x) => break x.clone(),
MetadataState::Invalid(e) => {
return Err(type_error(format!("Metadata error: {}", e)))
}
}
// `unwrap()` never fails because `tx` is owned by the task held by `refresher`.
metadata_rx.changed().await.unwrap();
};
let Some(sc_endpoint) = metadata
.endpoints
.iter()
.find(|x| x.consistency == "strong")
else {
return Err(type_error(
"No strong consistency endpoint is available for this database",
));
};
let full_url = format!("{}/{}", sc_endpoint.url, method);
{
let parsed_url = Url::parse(&full_url)?;
let mut state = state.borrow_mut();
let permissions = state.borrow_mut::<P>();
permissions.check_net_url(&parsed_url, "Deno.Kv")?;
}
let res = client
.post(&full_url)
.header("x-transaction-domain-id", metadata.database_id.to_string())
.header("authorization", format!("Bearer {}", metadata.token))
.body(req.encode_to_vec())
.send()
.map_err(anyhow::Error::from)
.and_then(|x| async move {
if x.status().is_success() {
Ok(Ok(x.bytes().await?))
} else if x.status().is_client_error() {
Ok(Err((x.status(), x.text().await?)))
} else {
Err(anyhow::anyhow!(
"server error ({:?}): {}",
x.status(),
x.text().await?
))
}
})
.await;
match res {
Ok(x) => break x,
Err(e) => {
log::error!("retryable error in {}: {}", method, e);
randomized_exponential_backoff(Duration::from_millis(0), attempt).await;
attempt += 1;
}
}
};
let res = match res {
Ok(x) => x,
Err((status, message)) => {
return Err(type_error(format!(
"client error in {} (status {:?}): {}",
method, status, message
)))
}
};
match R::decode(&*res) {
Ok(x) => Ok(x),
Err(e) => Err(type_error(format!(
"failed to decode response from {}: {}",
method, e
))),
Ok(remote)
}
}

File diff suppressed because it is too large Load diff

View file

@ -222,7 +222,7 @@ mod startup_snapshot {
deno_tls::deno_tls::init_ops_and_esm(),
deno_kv::deno_kv::init_ops_and_esm(deno_kv::sqlite::SqliteDbHandler::<
Permissions,
>::new(None)),
>::new(None, None)),
deno_napi::deno_napi::init_ops_and_esm::<Permissions>(),
deno_http::deno_http::init_ops_and_esm::<DefaultHttpPropertyExtractor>(),
deno_io::deno_io::init_ops_and_esm(Default::default()),

View file

@ -211,6 +211,10 @@ pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> {
e.downcast_ref::<url::ParseError>()
.map(get_url_parse_error_class)
})
.or_else(|| {
e.downcast_ref::<deno_kv::sqlite::TypeError>()
.map(|_| "TypeError")
})
.or_else(|| {
#[cfg(unix)]
let maybe_get_nix_error_class =

View file

@ -436,7 +436,19 @@ impl WebWorker {
),
deno_tls::deno_tls::init_ops_and_esm(),
deno_kv::deno_kv::init_ops_and_esm(
MultiBackendDbHandler::remote_or_sqlite::<PermissionsContainer>(None),
MultiBackendDbHandler::remote_or_sqlite::<PermissionsContainer>(
None,
options.seed,
deno_kv::remote::HttpOptions {
user_agent: options.bootstrap.user_agent.clone(),
root_cert_store_provider: options.root_cert_store_provider.clone(),
unsafely_ignore_certificate_errors: options
.unsafely_ignore_certificate_errors
.clone(),
client_cert_chain_and_key: None,
proxy: None,
},
),
),
deno_napi::deno_napi::init_ops_and_esm::<PermissionsContainer>(),
deno_http::deno_http::init_ops_and_esm::<DefaultHttpPropertyExtractor>(),

View file

@ -261,6 +261,16 @@ impl MainWorker {
deno_kv::deno_kv::init_ops_and_esm(
MultiBackendDbHandler::remote_or_sqlite::<PermissionsContainer>(
options.origin_storage_dir.clone(),
options.seed,
deno_kv::remote::HttpOptions {
user_agent: options.bootstrap.user_agent.clone(),
root_cert_store_provider: options.root_cert_store_provider.clone(),
unsafely_ignore_certificate_errors: options
.unsafely_ignore_certificate_errors
.clone(),
client_cert_chain_and_key: None,
proxy: None,
},
),
),
deno_napi::deno_napi::init_ops_and_esm::<PermissionsContainer>(),

View file

@ -19,6 +19,7 @@ async-stream = "0.3.3"
base64.workspace = true
bytes.workspace = true
console_static_text.workspace = true
denokv_proto.workspace = true
fastwebsockets = { workspace = true, features = ["upgrade"] }
flate2.workspace = true
futures.workspace = true

View file

@ -1,22 +0,0 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::env;
use std::io;
use std::path::PathBuf;
fn main() -> io::Result<()> {
println!("cargo:rerun-if-changed=../ext/kv/proto");
let descriptor_path =
PathBuf::from(env::var("OUT_DIR").unwrap()).join("proto_descriptor.bin");
prost_build::Config::new()
.file_descriptor_set_path(&descriptor_path)
.compile_well_known_types()
.compile_protos(
&["../ext/kv/proto/datapath.proto"],
&["../ext/kv/proto/"],
)?;
Ok(())
}

View file

@ -1,7 +0,0 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// Generated code, disable lints
#[allow(clippy::all, non_snake_case)]
pub mod datapath {
include!(concat!(env!("OUT_DIR"), "/datapath.rs"));
}

View file

@ -4,6 +4,12 @@
use anyhow::anyhow;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use denokv_proto::datapath::AtomicWrite;
use denokv_proto::datapath::AtomicWriteOutput;
use denokv_proto::datapath::AtomicWriteStatus;
use denokv_proto::datapath::ReadRangeOutput;
use denokv_proto::datapath::SnapshotRead;
use denokv_proto::datapath::SnapshotReadOutput;
use futures::Future;
use futures::FutureExt;
use futures::Stream;
@ -18,12 +24,6 @@ use hyper::Body;
use hyper::Request;
use hyper::Response;
use hyper::StatusCode;
use kv_remote::datapath::AtomicWrite;
use kv_remote::datapath::AtomicWriteOutput;
use kv_remote::datapath::AtomicWriteStatus;
use kv_remote::datapath::ReadRangeOutput;
use kv_remote::datapath::SnapshotRead;
use kv_remote::datapath::SnapshotReadOutput;
use npm::CUSTOM_NPM_PACKAGE_CACHE;
use once_cell::sync::Lazy;
use pretty_assertions::assert_eq;
@ -70,7 +70,6 @@ pub mod assertions;
mod builders;
pub mod factory;
mod fs;
mod kv_remote;
pub mod lsp;
mod npm;
pub mod pty;
@ -1206,7 +1205,7 @@ async fn main_server(
.header("content-type", "application/json")
.body(Body::from(
serde_json::json!({
"version": 2,
"version": 1000,
"databaseId": KV_DATABASE_ID,
"endpoints": [
{
@ -1268,9 +1267,7 @@ async fn main_server(
.map(|_| ReadRangeOutput { values: vec![] })
.collect(),
read_disabled: false,
regions_if_read_disabled: vec![],
read_is_strongly_consistent: true,
primary_if_not_strongly_consistent: "".into(),
}
.encode_to_vec(),
))
@ -1311,7 +1308,7 @@ async fn main_server(
AtomicWriteOutput {
status: AtomicWriteStatus::AwSuccess.into(),
versionstamp: vec![0u8; 10],
primary_if_write_disabled: "".into(),
failed_checks: vec![],
}
.encode_to_vec(),
))