mirror of
https://github.com/denoland/deno.git
synced 2025-02-08 07:16:56 -05:00
fix(kv) increase number of allowed mutations in atomic (#20126)
fixes #19741 Impose a limit on the total atomic payload size
This commit is contained in:
parent
5fc403b91a
commit
669b6fd692
2 changed files with 73 additions and 13 deletions
|
@ -1214,6 +1214,28 @@ dbTest("operation size limit", async (db) => {
|
||||||
"too many checks (max 10)",
|
"too many checks (max 10)",
|
||||||
);
|
);
|
||||||
|
|
||||||
|
const validMutateKeys: Deno.KvKey[] = new Array(1000).fill(0).map((
|
||||||
|
_,
|
||||||
|
i,
|
||||||
|
) => ["a", i]);
|
||||||
|
const invalidMutateKeys: Deno.KvKey[] = new Array(1001).fill(0).map((
|
||||||
|
_,
|
||||||
|
i,
|
||||||
|
) => ["a", i]);
|
||||||
|
|
||||||
|
const res4 = await db.atomic()
|
||||||
|
.check(...lastValidKeys.map((key) => ({
|
||||||
|
key,
|
||||||
|
versionstamp: null,
|
||||||
|
})))
|
||||||
|
.mutate(...validMutateKeys.map((key) => ({
|
||||||
|
key,
|
||||||
|
type: "set",
|
||||||
|
value: 1,
|
||||||
|
} satisfies Deno.KvMutation)))
|
||||||
|
.commit();
|
||||||
|
assert(res4);
|
||||||
|
|
||||||
await assertRejects(
|
await assertRejects(
|
||||||
async () => {
|
async () => {
|
||||||
await db.atomic()
|
await db.atomic()
|
||||||
|
@ -1221,7 +1243,7 @@ dbTest("operation size limit", async (db) => {
|
||||||
key,
|
key,
|
||||||
versionstamp: null,
|
versionstamp: null,
|
||||||
})))
|
})))
|
||||||
.mutate(...firstInvalidKeys.map((key) => ({
|
.mutate(...invalidMutateKeys.map((key) => ({
|
||||||
key,
|
key,
|
||||||
type: "set",
|
type: "set",
|
||||||
value: 1,
|
value: 1,
|
||||||
|
@ -1229,7 +1251,35 @@ dbTest("operation size limit", async (db) => {
|
||||||
.commit();
|
.commit();
|
||||||
},
|
},
|
||||||
TypeError,
|
TypeError,
|
||||||
"too many mutations (max 10)",
|
"too many mutations (max 1000)",
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
dbTest("total mutation size limit", async (db) => {
|
||||||
|
const keys: Deno.KvKey[] = new Array(1000).fill(0).map((
|
||||||
|
_,
|
||||||
|
i,
|
||||||
|
) => ["a", i]);
|
||||||
|
|
||||||
|
const atomic = db.atomic();
|
||||||
|
for (const key of keys) {
|
||||||
|
atomic.set(key, "foo");
|
||||||
|
}
|
||||||
|
const res = await atomic.commit();
|
||||||
|
assert(res);
|
||||||
|
|
||||||
|
// Use bigger values to trigger "total mutation size too large" error
|
||||||
|
await assertRejects(
|
||||||
|
async () => {
|
||||||
|
const value = new Array(3000).fill("a").join("");
|
||||||
|
const atomic = db.atomic();
|
||||||
|
for (const key of keys) {
|
||||||
|
atomic.set(key, value);
|
||||||
|
}
|
||||||
|
await atomic.commit();
|
||||||
|
},
|
||||||
|
TypeError,
|
||||||
|
"total mutation size too large (max 819200 bytes)",
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -38,7 +38,8 @@ const MAX_VALUE_SIZE_BYTES: usize = 65536;
|
||||||
const MAX_READ_RANGES: usize = 10;
|
const MAX_READ_RANGES: usize = 10;
|
||||||
const MAX_READ_ENTRIES: usize = 1000;
|
const MAX_READ_ENTRIES: usize = 1000;
|
||||||
const MAX_CHECKS: usize = 10;
|
const MAX_CHECKS: usize = 10;
|
||||||
const MAX_MUTATIONS: usize = 10;
|
const MAX_MUTATIONS: usize = 1000;
|
||||||
|
const MAX_TOTAL_MUTATION_SIZE_BYTES: usize = 819200;
|
||||||
|
|
||||||
struct UnstableChecker {
|
struct UnstableChecker {
|
||||||
pub unstable: bool,
|
pub unstable: bool,
|
||||||
|
@ -638,6 +639,8 @@ where
|
||||||
.collect::<Result<Vec<Enqueue>, AnyError>>()
|
.collect::<Result<Vec<Enqueue>, AnyError>>()
|
||||||
.with_context(|| "invalid enqueue")?;
|
.with_context(|| "invalid enqueue")?;
|
||||||
|
|
||||||
|
let mut total_payload_size = 0usize;
|
||||||
|
|
||||||
for key in checks
|
for key in checks
|
||||||
.iter()
|
.iter()
|
||||||
.map(|c| &c.key)
|
.map(|c| &c.key)
|
||||||
|
@ -647,15 +650,22 @@ where
|
||||||
return Err(type_error("key cannot be empty"));
|
return Err(type_error("key cannot be empty"));
|
||||||
}
|
}
|
||||||
|
|
||||||
check_write_key_size(key)?;
|
total_payload_size += check_write_key_size(key)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
for value in mutations.iter().flat_map(|m| m.kind.value()) {
|
for value in mutations.iter().flat_map(|m| m.kind.value()) {
|
||||||
check_value_size(value)?;
|
total_payload_size += check_value_size(value)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
for enqueue in &enqueues {
|
for enqueue in &enqueues {
|
||||||
check_enqueue_payload_size(&enqueue.payload)?;
|
total_payload_size += check_enqueue_payload_size(&enqueue.payload)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if total_payload_size > MAX_TOTAL_MUTATION_SIZE_BYTES {
|
||||||
|
return Err(type_error(format!(
|
||||||
|
"total mutation size too large (max {} bytes)",
|
||||||
|
MAX_TOTAL_MUTATION_SIZE_BYTES
|
||||||
|
)));
|
||||||
}
|
}
|
||||||
|
|
||||||
let atomic_write = AtomicWrite {
|
let atomic_write = AtomicWrite {
|
||||||
|
@ -694,22 +704,22 @@ fn check_read_key_size(key: &[u8]) -> Result<(), AnyError> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn check_write_key_size(key: &[u8]) -> Result<(), AnyError> {
|
fn check_write_key_size(key: &[u8]) -> Result<usize, AnyError> {
|
||||||
if key.len() > MAX_WRITE_KEY_SIZE_BYTES {
|
if key.len() > MAX_WRITE_KEY_SIZE_BYTES {
|
||||||
Err(type_error(format!(
|
Err(type_error(format!(
|
||||||
"key too large for write (max {} bytes)",
|
"key too large for write (max {} bytes)",
|
||||||
MAX_WRITE_KEY_SIZE_BYTES
|
MAX_WRITE_KEY_SIZE_BYTES
|
||||||
)))
|
)))
|
||||||
} else {
|
} else {
|
||||||
Ok(())
|
Ok(key.len())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn check_value_size(value: &Value) -> Result<(), AnyError> {
|
fn check_value_size(value: &Value) -> Result<usize, AnyError> {
|
||||||
let payload = match value {
|
let payload = match value {
|
||||||
Value::Bytes(x) => x,
|
Value::Bytes(x) => x,
|
||||||
Value::V8(x) => x,
|
Value::V8(x) => x,
|
||||||
Value::U64(_) => return Ok(()),
|
Value::U64(_) => return Ok(8),
|
||||||
};
|
};
|
||||||
|
|
||||||
if payload.len() > MAX_VALUE_SIZE_BYTES {
|
if payload.len() > MAX_VALUE_SIZE_BYTES {
|
||||||
|
@ -718,17 +728,17 @@ fn check_value_size(value: &Value) -> Result<(), AnyError> {
|
||||||
MAX_VALUE_SIZE_BYTES
|
MAX_VALUE_SIZE_BYTES
|
||||||
)))
|
)))
|
||||||
} else {
|
} else {
|
||||||
Ok(())
|
Ok(payload.len())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn check_enqueue_payload_size(payload: &[u8]) -> Result<(), AnyError> {
|
fn check_enqueue_payload_size(payload: &[u8]) -> Result<usize, AnyError> {
|
||||||
if payload.len() > MAX_VALUE_SIZE_BYTES {
|
if payload.len() > MAX_VALUE_SIZE_BYTES {
|
||||||
Err(type_error(format!(
|
Err(type_error(format!(
|
||||||
"enqueue payload too large (max {} bytes)",
|
"enqueue payload too large (max {} bytes)",
|
||||||
MAX_VALUE_SIZE_BYTES
|
MAX_VALUE_SIZE_BYTES
|
||||||
)))
|
)))
|
||||||
} else {
|
} else {
|
||||||
Ok(())
|
Ok(payload.len())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue