From 669b6fd69248b4c0db9423b5d9df3f8fbeca4cb4 Mon Sep 17 00:00:00 2001
From: Igor Zinkovsky <igor@deno.com>
Date: Sat, 26 Aug 2023 18:26:09 -0700
Subject: [PATCH] fix(kv): increase number of allowed mutations in atomic
 (#20126)

fixes #19741

This raises the number of mutations allowed in a single atomic operation from 10 to 1000, and imposes a new limit on the total payload size of an atomic write: 819200 bytes (800 KiB). Keys (from both checks and mutations), mutation values, and enqueue payloads all count toward the total.
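
For illustration (not part of the code change below), a minimal sketch of how the new limits surface through the `Deno.Kv` API. It assumes a Deno build containing this patch, run with the `--unstable` flag, and uses an in-memory database purely for demonstration:

```ts
// Sketch only: demonstrates the new atomic-operation limits.
const kv = await Deno.openKv(":memory:");

// Up to 1000 mutations per atomic operation are now accepted (previously 10).
const atomic = kv.atomic();
for (let i = 0; i < 1000; i++) {
  atomic.set(["a", i], "foo");
}
const res = await atomic.commit();
console.log(res.ok); // true

// Exceeding the new total-payload cap rejects with a TypeError:
// "total mutation size too large (max 819200 bytes)".
// (A 1001st mutation would instead reject with "too many mutations (max 1000)".)
try {
  const big = kv.atomic();
  const value = "a".repeat(3000);
  for (let i = 0; i < 1000; i++) {
    big.set(["a", i], value); // roughly 3 MB of values, well over 800 KiB
  }
  await big.commit();
} catch (e) {
  console.log((e as TypeError).message);
}

kv.close();
```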
---
 cli/tests/unit/kv_test.ts | 54 +++++++++++++++++++++++++++++++++++++--
 ext/kv/lib.rs             | 32 +++++++++++++++--------
 2 files changed, 73 insertions(+), 13 deletions(-)

diff --git a/cli/tests/unit/kv_test.ts b/cli/tests/unit/kv_test.ts
index acda9a0e2e..de764cf805 100644
--- a/cli/tests/unit/kv_test.ts
+++ b/cli/tests/unit/kv_test.ts
@@ -1214,6 +1214,28 @@ dbTest("operation size limit", async (db) => {
     "too many checks (max 10)",
   );
 
+  const validMutateKeys: Deno.KvKey[] = new Array(1000).fill(0).map((
+    _,
+    i,
+  ) => ["a", i]);
+  const invalidMutateKeys: Deno.KvKey[] = new Array(1001).fill(0).map((
+    _,
+    i,
+  ) => ["a", i]);
+
+  const res4 = await db.atomic()
+    .check(...lastValidKeys.map((key) => ({
+      key,
+      versionstamp: null,
+    })))
+    .mutate(...validMutateKeys.map((key) => ({
+      key,
+      type: "set",
+      value: 1,
+    } satisfies Deno.KvMutation)))
+    .commit();
+  assert(res4);
+
   await assertRejects(
     async () => {
       await db.atomic()
@@ -1221,7 +1243,7 @@ dbTest("operation size limit", async (db) => {
           key,
           versionstamp: null,
         })))
-        .mutate(...firstInvalidKeys.map((key) => ({
+        .mutate(...invalidMutateKeys.map((key) => ({
           key,
           type: "set",
           value: 1,
@@ -1229,7 +1251,35 @@ dbTest("operation size limit", async (db) => {
         .commit();
     },
     TypeError,
-    "too many mutations (max 10)",
+    "too many mutations (max 1000)",
+  );
+});
+
+dbTest("total mutation size limit", async (db) => {
+  const keys: Deno.KvKey[] = new Array(1000).fill(0).map((
+    _,
+    i,
+  ) => ["a", i]);
+
+  const atomic = db.atomic();
+  for (const key of keys) {
+    atomic.set(key, "foo");
+  }
+  const res = await atomic.commit();
+  assert(res);
+
+  // Use bigger values to trigger "total mutation size too large" error
+  await assertRejects(
+    async () => {
+      const value = new Array(3000).fill("a").join("");
+      const atomic = db.atomic();
+      for (const key of keys) {
+        atomic.set(key, value);
+      }
+      await atomic.commit();
+    },
+    TypeError,
+    "total mutation size too large (max 819200 bytes)",
   );
 });
 
diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs
index f226b11ae7..ab78fe4c34 100644
--- a/ext/kv/lib.rs
+++ b/ext/kv/lib.rs
@@ -38,7 +38,8 @@ const MAX_VALUE_SIZE_BYTES: usize = 65536;
 const MAX_READ_RANGES: usize = 10;
 const MAX_READ_ENTRIES: usize = 1000;
 const MAX_CHECKS: usize = 10;
-const MAX_MUTATIONS: usize = 10;
+const MAX_MUTATIONS: usize = 1000;
+const MAX_TOTAL_MUTATION_SIZE_BYTES: usize = 819200;
 
 struct UnstableChecker {
   pub unstable: bool,
@@ -638,6 +639,8 @@ where
     .collect::<Result<Vec<Enqueue>, AnyError>>()
     .with_context(|| "invalid enqueue")?;
 
+  let mut total_payload_size = 0usize;
+
   for key in checks
     .iter()
     .map(|c| &c.key)
@@ -647,15 +650,22 @@ where
       return Err(type_error("key cannot be empty"));
     }
 
-    check_write_key_size(key)?;
+    total_payload_size += check_write_key_size(key)?;
   }
 
   for value in mutations.iter().flat_map(|m| m.kind.value()) {
-    check_value_size(value)?;
+    total_payload_size += check_value_size(value)?;
   }
 
   for enqueue in &enqueues {
-    check_enqueue_payload_size(&enqueue.payload)?;
+    total_payload_size += check_enqueue_payload_size(&enqueue.payload)?;
+  }
+
+  if total_payload_size > MAX_TOTAL_MUTATION_SIZE_BYTES {
+    return Err(type_error(format!(
+      "total mutation size too large (max {} bytes)",
+      MAX_TOTAL_MUTATION_SIZE_BYTES
+    )));
   }
 
   let atomic_write = AtomicWrite {
@@ -694,22 +704,22 @@ fn check_read_key_size(key: &[u8]) -> Result<(), AnyError> {
   }
 }
 
-fn check_write_key_size(key: &[u8]) -> Result<(), AnyError> {
+fn check_write_key_size(key: &[u8]) -> Result<usize, AnyError> {
   if key.len() > MAX_WRITE_KEY_SIZE_BYTES {
     Err(type_error(format!(
       "key too large for write (max {} bytes)",
       MAX_WRITE_KEY_SIZE_BYTES
     )))
   } else {
-    Ok(())
+    Ok(key.len())
   }
 }
 
-fn check_value_size(value: &Value) -> Result<(), AnyError> {
+fn check_value_size(value: &Value) -> Result<usize, AnyError> {
   let payload = match value {
     Value::Bytes(x) => x,
     Value::V8(x) => x,
-    Value::U64(_) => return Ok(()),
+    Value::U64(_) => return Ok(8),
   };
 
   if payload.len() > MAX_VALUE_SIZE_BYTES {
@@ -718,17 +728,17 @@ fn check_value_size(value: &Value) -> Result<(), AnyError> {
       MAX_VALUE_SIZE_BYTES
     )))
   } else {
-    Ok(())
+    Ok(payload.len())
   }
 }
 
-fn check_enqueue_payload_size(payload: &[u8]) -> Result<(), AnyError> {
+fn check_enqueue_payload_size(payload: &[u8]) -> Result<usize, AnyError> {
   if payload.len() > MAX_VALUE_SIZE_BYTES {
     Err(type_error(format!(
       "enqueue payload too large (max {} bytes)",
       MAX_VALUE_SIZE_BYTES
     )))
   } else {
-    Ok(())
+    Ok(payload.len())
   }
 }