0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-03-03 09:31:22 -05:00

fix(kv) run sqlite transactions via spawn_blocking (#19350)

`rusqlite` does not support async operations; with this PR SQLite
operations will run through `spawn_blocking` to ensure that the event
loop does not get blocked.

There is still only a single SQLite connection. So all operations will
do an async wait on the connection. In the future we can add a
connection pool if needed.
This commit is contained in:
Igor Zinkovsky 2023-06-02 11:12:26 -07:00 committed by GitHub
parent 98320ff1f8
commit ce5bf9fb2a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -1,6 +1,7 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::borrow::Cow;
use std::cell::Cell;
use std::cell::RefCell;
use std::marker::PhantomData;
use std::path::Path;
@ -10,6 +11,8 @@ use std::rc::Rc;
use async_trait::async_trait;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::task::spawn_blocking;
use deno_core::AsyncRefCell;
use deno_core::OpState;
use rusqlite::params;
use rusqlite::OpenFlags;
@ -112,11 +115,9 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
state: Rc<RefCell<OpState>>,
path: Option<String>,
) -> Result<Self::DB, AnyError> {
let conn = match (path.as_deref(), &self.default_storage_dir) {
(Some(":memory:"), _) | (None, None) => {
rusqlite::Connection::open_in_memory()?
}
(Some(path), _) => {
// Validate path
if let Some(path) = &path {
if path != ":memory:" {
if path.is_empty() {
return Err(type_error("Filename cannot be empty"));
}
@ -132,44 +133,92 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
permissions.check_read(path, "Deno.openKv")?;
permissions.check_write(path, "Deno.openKv")?;
}
let flags = OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI);
rusqlite::Connection::open_with_flags(path, flags)?
}
(None, Some(path)) => {
std::fs::create_dir_all(path)?;
let path = path.join("kv.sqlite3");
rusqlite::Connection::open(&path)?
}
};
conn.pragma_update(None, "journal_mode", "wal")?;
conn.execute(STATEMENT_CREATE_MIGRATION_TABLE, [])?;
let current_version: usize = conn
.query_row(
"select version from migration_state where k = 0",
[],
|row| row.get(0),
)
.optional()?
.unwrap_or(0);
for (i, migration) in MIGRATIONS.iter().enumerate() {
let version = i + 1;
if version > current_version {
conn.execute_batch(migration)?;
conn.execute(
"replace into migration_state (k, version) values(?, ?)",
[&0, &version],
)?;
}
}
Ok(SqliteDb(RefCell::new(conn)))
let default_storage_dir = self.default_storage_dir.clone();
let conn = spawn_blocking(move || {
let conn = match (path.as_deref(), &default_storage_dir) {
(Some(":memory:"), _) | (None, None) => {
rusqlite::Connection::open_in_memory()?
}
(Some(path), _) => {
let flags =
OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI);
rusqlite::Connection::open_with_flags(path, flags)?
}
(None, Some(path)) => {
std::fs::create_dir_all(path)?;
let path = path.join("kv.sqlite3");
rusqlite::Connection::open(&path)?
}
};
conn.pragma_update(None, "journal_mode", "wal")?;
conn.execute(STATEMENT_CREATE_MIGRATION_TABLE, [])?;
let current_version: usize = conn
.query_row(
"select version from migration_state where k = 0",
[],
|row| row.get(0),
)
.optional()?
.unwrap_or(0);
for (i, migration) in MIGRATIONS.iter().enumerate() {
let version = i + 1;
if version > current_version {
conn.execute_batch(migration)?;
conn.execute(
"replace into migration_state (k, version) values(?, ?)",
[&0, &version],
)?;
}
}
Ok::<_, AnyError>(conn)
})
.await
.unwrap()?;
Ok(SqliteDb(Rc::new(AsyncRefCell::new(Cell::new(Some(conn))))))
}
}
pub struct SqliteDb(RefCell<rusqlite::Connection>);
pub struct SqliteDb(Rc<AsyncRefCell<Cell<Option<rusqlite::Connection>>>>);
impl SqliteDb {
async fn run_tx<F, R>(&self, f: F) -> Result<R, AnyError>
where
F: (FnOnce(rusqlite::Transaction<'_>) -> Result<R, AnyError>)
+ Send
+ 'static,
R: Send + 'static,
{
// Transactions need exclusive access to the connection. Wait until
// we can borrow_mut the connection.
let cell = self.0.borrow_mut().await;
// Take the db out of the cell and run the transaction via spawn_blocking.
let mut db = cell.take().unwrap();
let (result, db) = spawn_blocking(move || {
let result = {
match db.transaction() {
Ok(tx) => f(tx),
Err(e) => Err(e.into()),
}
};
(result, db)
})
.await
.unwrap();
// Put the db back into the cell.
cell.set(Some(db));
result
}
}
#[async_trait(?Send)]
impl Database for SqliteDb {
@ -178,110 +227,126 @@ impl Database for SqliteDb {
requests: Vec<ReadRange>,
_options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError> {
let mut responses = Vec::with_capacity(requests.len());
let mut db = self.0.borrow_mut();
let tx = db.transaction()?;
self
.run_tx(move |tx| {
let mut responses = Vec::with_capacity(requests.len());
for request in requests {
let mut stmt = tx.prepare_cached(if request.reverse {
STATEMENT_KV_RANGE_SCAN_REVERSE
} else {
STATEMENT_KV_RANGE_SCAN
})?;
let entries = stmt
.query_map(
(
request.start.as_slice(),
request.end.as_slice(),
request.limit.get(),
),
|row| {
let key: Vec<u8> = row.get(0)?;
let value: Vec<u8> = row.get(1)?;
let encoding: i64 = row.get(2)?;
for request in requests {
let mut stmt = tx.prepare_cached(if request.reverse {
STATEMENT_KV_RANGE_SCAN_REVERSE
} else {
STATEMENT_KV_RANGE_SCAN
})?;
let entries = stmt
.query_map(
(
request.start.as_slice(),
request.end.as_slice(),
request.limit.get(),
),
|row| {
let key: Vec<u8> = row.get(0)?;
let value: Vec<u8> = row.get(1)?;
let encoding: i64 = row.get(2)?;
let value = decode_value(value, encoding);
let value = decode_value(value, encoding);
let version: i64 = row.get(3)?;
Ok(KvEntry {
key,
value,
versionstamp: version_to_versionstamp(version),
})
},
)?
.collect::<Result<Vec<_>, rusqlite::Error>>()?;
responses.push(ReadRangeOutput { entries });
}
let version: i64 = row.get(3)?;
Ok(KvEntry {
key,
value,
versionstamp: version_to_versionstamp(version),
})
},
)?
.collect::<Result<Vec<_>, rusqlite::Error>>()?;
responses.push(ReadRangeOutput { entries });
}
Ok(responses)
Ok(responses)
})
.await
}
async fn atomic_write(
&self,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError> {
let mut db = self.0.borrow_mut();
let tx = db.transaction()?;
for check in write.checks {
let real_versionstamp = tx
.prepare_cached(STATEMENT_KV_POINT_GET_VERSION_ONLY)?
.query_row([check.key.as_slice()], |row| row.get(0))
.optional()?
.map(version_to_versionstamp);
if real_versionstamp != check.versionstamp {
return Ok(None);
}
}
let version: i64 = tx
.prepare_cached(STATEMENT_INC_AND_GET_DATA_VERSION)?
.query_row([], |row| row.get(0))?;
for mutation in write.mutations {
match mutation.kind {
MutationKind::Set(value) => {
let (value, encoding) = encode_value(&value);
let changed = tx
.prepare_cached(STATEMENT_KV_POINT_SET)?
.execute(params![mutation.key, &value, &encoding, &version])?;
assert_eq!(changed, 1)
self
.run_tx(move |tx| {
for check in write.checks {
let real_versionstamp = tx
.prepare_cached(STATEMENT_KV_POINT_GET_VERSION_ONLY)?
.query_row([check.key.as_slice()], |row| row.get(0))
.optional()?
.map(version_to_versionstamp);
if real_versionstamp != check.versionstamp {
return Ok(None);
}
}
MutationKind::Delete => {
let changed = tx
.prepare_cached(STATEMENT_KV_POINT_DELETE)?
.execute(params![mutation.key])?;
assert!(changed == 0 || changed == 1)
}
MutationKind::Sum(operand) => {
mutate_le64(&tx, &mutation.key, "sum", &operand, version, |a, b| {
a.wrapping_add(b)
})?;
}
MutationKind::Min(operand) => {
mutate_le64(&tx, &mutation.key, "min", &operand, version, |a, b| {
a.min(b)
})?;
}
MutationKind::Max(operand) => {
mutate_le64(&tx, &mutation.key, "max", &operand, version, |a, b| {
a.max(b)
})?;
}
}
}
// TODO(@losfair): enqueues
let version: i64 = tx
.prepare_cached(STATEMENT_INC_AND_GET_DATA_VERSION)?
.query_row([], |row| row.get(0))?;
tx.commit()?;
for mutation in write.mutations {
match mutation.kind {
MutationKind::Set(value) => {
let (value, encoding) = encode_value(&value);
let changed = tx
.prepare_cached(STATEMENT_KV_POINT_SET)?
.execute(params![mutation.key, &value, &encoding, &version])?;
assert_eq!(changed, 1)
}
MutationKind::Delete => {
let changed = tx
.prepare_cached(STATEMENT_KV_POINT_DELETE)?
.execute(params![mutation.key])?;
assert!(changed == 0 || changed == 1)
}
MutationKind::Sum(operand) => {
mutate_le64(
&tx,
&mutation.key,
"sum",
&operand,
version,
|a, b| a.wrapping_add(b),
)?;
}
MutationKind::Min(operand) => {
mutate_le64(
&tx,
&mutation.key,
"min",
&operand,
version,
|a, b| a.min(b),
)?;
}
MutationKind::Max(operand) => {
mutate_le64(
&tx,
&mutation.key,
"max",
&operand,
version,
|a, b| a.max(b),
)?;
}
}
}
let new_vesionstamp = version_to_versionstamp(version);
// TODO(@losfair): enqueues
Ok(Some(CommitResult {
versionstamp: new_vesionstamp,
}))
tx.commit()?;
let new_vesionstamp = version_to_versionstamp(version);
Ok(Some(CommitResult {
versionstamp: new_vesionstamp,
}))
})
.await
}
}