diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs index 63be1281b4..80d230ab15 100644 --- a/ext/kv/sqlite.rs +++ b/ext/kv/sqlite.rs @@ -1,6 +1,7 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. use std::borrow::Cow; +use std::cell::Cell; use std::cell::RefCell; use std::marker::PhantomData; use std::path::Path; @@ -10,6 +11,8 @@ use std::rc::Rc; use async_trait::async_trait; use deno_core::error::type_error; use deno_core::error::AnyError; +use deno_core::task::spawn_blocking; +use deno_core::AsyncRefCell; use deno_core::OpState; use rusqlite::params; use rusqlite::OpenFlags; @@ -112,11 +115,9 @@ impl DatabaseHandler for SqliteDbHandler

{ state: Rc>, path: Option, ) -> Result { - let conn = match (path.as_deref(), &self.default_storage_dir) { - (Some(":memory:"), _) | (None, None) => { - rusqlite::Connection::open_in_memory()? - } - (Some(path), _) => { + // Validate path + if let Some(path) = &path { + if path != ":memory:" { if path.is_empty() { return Err(type_error("Filename cannot be empty")); } @@ -132,44 +133,92 @@ impl DatabaseHandler for SqliteDbHandler

{ permissions.check_read(path, "Deno.openKv")?; permissions.check_write(path, "Deno.openKv")?; } - let flags = OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI); - rusqlite::Connection::open_with_flags(path, flags)? - } - (None, Some(path)) => { - std::fs::create_dir_all(path)?; - let path = path.join("kv.sqlite3"); - rusqlite::Connection::open(&path)? - } - }; - - conn.pragma_update(None, "journal_mode", "wal")?; - conn.execute(STATEMENT_CREATE_MIGRATION_TABLE, [])?; - - let current_version: usize = conn - .query_row( - "select version from migration_state where k = 0", - [], - |row| row.get(0), - ) - .optional()? - .unwrap_or(0); - - for (i, migration) in MIGRATIONS.iter().enumerate() { - let version = i + 1; - if version > current_version { - conn.execute_batch(migration)?; - conn.execute( - "replace into migration_state (k, version) values(?, ?)", - [&0, &version], - )?; } } - Ok(SqliteDb(RefCell::new(conn))) + let default_storage_dir = self.default_storage_dir.clone(); + let conn = spawn_blocking(move || { + let conn = match (path.as_deref(), &default_storage_dir) { + (Some(":memory:"), _) | (None, None) => { + rusqlite::Connection::open_in_memory()? + } + (Some(path), _) => { + let flags = + OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI); + rusqlite::Connection::open_with_flags(path, flags)? + } + (None, Some(path)) => { + std::fs::create_dir_all(path)?; + let path = path.join("kv.sqlite3"); + rusqlite::Connection::open(&path)? + } + }; + + conn.pragma_update(None, "journal_mode", "wal")?; + conn.execute(STATEMENT_CREATE_MIGRATION_TABLE, [])?; + + let current_version: usize = conn + .query_row( + "select version from migration_state where k = 0", + [], + |row| row.get(0), + ) + .optional()? + .unwrap_or(0); + + for (i, migration) in MIGRATIONS.iter().enumerate() { + let version = i + 1; + if version > current_version { + conn.execute_batch(migration)?; + conn.execute( + "replace into migration_state (k, version) values(?, ?)", + [&0, &version], + )?; + } + } + + Ok::<_, AnyError>(conn) + }) + .await + .unwrap()?; + + Ok(SqliteDb(Rc::new(AsyncRefCell::new(Cell::new(Some(conn)))))) } } -pub struct SqliteDb(RefCell); +pub struct SqliteDb(Rc>>>); + +impl SqliteDb { + async fn run_tx(&self, f: F) -> Result + where + F: (FnOnce(rusqlite::Transaction<'_>) -> Result) + + Send + + 'static, + R: Send + 'static, + { + // Transactions need exclusive access to the connection. Wait until + // we can borrow_mut the connection. + let cell = self.0.borrow_mut().await; + + // Take the db out of the cell and run the transaction via spawn_blocking. + let mut db = cell.take().unwrap(); + let (result, db) = spawn_blocking(move || { + let result = { + match db.transaction() { + Ok(tx) => f(tx), + Err(e) => Err(e.into()), + } + }; + (result, db) + }) + .await + .unwrap(); + + // Put the db back into the cell. + cell.set(Some(db)); + result + } +} #[async_trait(?Send)] impl Database for SqliteDb { @@ -178,110 +227,126 @@ impl Database for SqliteDb { requests: Vec, _options: SnapshotReadOptions, ) -> Result, AnyError> { - let mut responses = Vec::with_capacity(requests.len()); - let mut db = self.0.borrow_mut(); - let tx = db.transaction()?; + self + .run_tx(move |tx| { + let mut responses = Vec::with_capacity(requests.len()); + for request in requests { + let mut stmt = tx.prepare_cached(if request.reverse { + STATEMENT_KV_RANGE_SCAN_REVERSE + } else { + STATEMENT_KV_RANGE_SCAN + })?; + let entries = stmt + .query_map( + ( + request.start.as_slice(), + request.end.as_slice(), + request.limit.get(), + ), + |row| { + let key: Vec = row.get(0)?; + let value: Vec = row.get(1)?; + let encoding: i64 = row.get(2)?; - for request in requests { - let mut stmt = tx.prepare_cached(if request.reverse { - STATEMENT_KV_RANGE_SCAN_REVERSE - } else { - STATEMENT_KV_RANGE_SCAN - })?; - let entries = stmt - .query_map( - ( - request.start.as_slice(), - request.end.as_slice(), - request.limit.get(), - ), - |row| { - let key: Vec = row.get(0)?; - let value: Vec = row.get(1)?; - let encoding: i64 = row.get(2)?; + let value = decode_value(value, encoding); - let value = decode_value(value, encoding); + let version: i64 = row.get(3)?; + Ok(KvEntry { + key, + value, + versionstamp: version_to_versionstamp(version), + }) + }, + )? + .collect::, rusqlite::Error>>()?; + responses.push(ReadRangeOutput { entries }); + } - let version: i64 = row.get(3)?; - Ok(KvEntry { - key, - value, - versionstamp: version_to_versionstamp(version), - }) - }, - )? - .collect::, rusqlite::Error>>()?; - responses.push(ReadRangeOutput { entries }); - } - - Ok(responses) + Ok(responses) + }) + .await } async fn atomic_write( &self, write: AtomicWrite, ) -> Result, AnyError> { - let mut db = self.0.borrow_mut(); - - let tx = db.transaction()?; - - for check in write.checks { - let real_versionstamp = tx - .prepare_cached(STATEMENT_KV_POINT_GET_VERSION_ONLY)? - .query_row([check.key.as_slice()], |row| row.get(0)) - .optional()? - .map(version_to_versionstamp); - if real_versionstamp != check.versionstamp { - return Ok(None); - } - } - - let version: i64 = tx - .prepare_cached(STATEMENT_INC_AND_GET_DATA_VERSION)? - .query_row([], |row| row.get(0))?; - - for mutation in write.mutations { - match mutation.kind { - MutationKind::Set(value) => { - let (value, encoding) = encode_value(&value); - let changed = tx - .prepare_cached(STATEMENT_KV_POINT_SET)? - .execute(params![mutation.key, &value, &encoding, &version])?; - assert_eq!(changed, 1) + self + .run_tx(move |tx| { + for check in write.checks { + let real_versionstamp = tx + .prepare_cached(STATEMENT_KV_POINT_GET_VERSION_ONLY)? + .query_row([check.key.as_slice()], |row| row.get(0)) + .optional()? + .map(version_to_versionstamp); + if real_versionstamp != check.versionstamp { + return Ok(None); + } } - MutationKind::Delete => { - let changed = tx - .prepare_cached(STATEMENT_KV_POINT_DELETE)? - .execute(params![mutation.key])?; - assert!(changed == 0 || changed == 1) - } - MutationKind::Sum(operand) => { - mutate_le64(&tx, &mutation.key, "sum", &operand, version, |a, b| { - a.wrapping_add(b) - })?; - } - MutationKind::Min(operand) => { - mutate_le64(&tx, &mutation.key, "min", &operand, version, |a, b| { - a.min(b) - })?; - } - MutationKind::Max(operand) => { - mutate_le64(&tx, &mutation.key, "max", &operand, version, |a, b| { - a.max(b) - })?; - } - } - } - // TODO(@losfair): enqueues + let version: i64 = tx + .prepare_cached(STATEMENT_INC_AND_GET_DATA_VERSION)? + .query_row([], |row| row.get(0))?; - tx.commit()?; + for mutation in write.mutations { + match mutation.kind { + MutationKind::Set(value) => { + let (value, encoding) = encode_value(&value); + let changed = tx + .prepare_cached(STATEMENT_KV_POINT_SET)? + .execute(params![mutation.key, &value, &encoding, &version])?; + assert_eq!(changed, 1) + } + MutationKind::Delete => { + let changed = tx + .prepare_cached(STATEMENT_KV_POINT_DELETE)? + .execute(params![mutation.key])?; + assert!(changed == 0 || changed == 1) + } + MutationKind::Sum(operand) => { + mutate_le64( + &tx, + &mutation.key, + "sum", + &operand, + version, + |a, b| a.wrapping_add(b), + )?; + } + MutationKind::Min(operand) => { + mutate_le64( + &tx, + &mutation.key, + "min", + &operand, + version, + |a, b| a.min(b), + )?; + } + MutationKind::Max(operand) => { + mutate_le64( + &tx, + &mutation.key, + "max", + &operand, + version, + |a, b| a.max(b), + )?; + } + } + } - let new_vesionstamp = version_to_versionstamp(version); + // TODO(@losfair): enqueues - Ok(Some(CommitResult { - versionstamp: new_vesionstamp, - })) + tx.commit()?; + + let new_vesionstamp = version_to_versionstamp(version); + + Ok(Some(CommitResult { + versionstamp: new_vesionstamp, + })) + }) + .await } }