mirror of
https://github.com/denoland/deno.git
synced 2025-03-03 17:34:47 -05:00
fix(ext/node): implement DatabaseSync#applyChangeset()
(#27967)
https://nodejs.org/api/sqlite.html#databaseapplychangesetchangeset-options ```js const sourceDb = new DatabaseSync(':memory:'); const targetDb = new DatabaseSync(':memory:'); sourceDb.exec('CREATE TABLE data(key INTEGER PRIMARY KEY, value TEXT)'); targetDb.exec('CREATE TABLE data(key INTEGER PRIMARY KEY, value TEXT)'); const session = sourceDb.createSession(); const insert = sourceDb.prepare('INSERT INTO data (key, value) VALUES (?, ?)'); insert.run(1, 'hello'); insert.run(2, 'world'); const changeset = session.changeset(); targetDb.applyChangeset(changeset); // Now that the changeset has been applied, targetDb contains the same data as sourceDb. ```
This commit is contained in:
parent
bc8554878e
commit
ece384c094
3 changed files with 160 additions and 0 deletions
|
@ -2,11 +2,16 @@
|
|||
|
||||
use std::cell::Cell;
|
||||
use std::cell::RefCell;
|
||||
use std::ffi::c_char;
|
||||
use std::ffi::c_void;
|
||||
use std::ffi::CStr;
|
||||
use std::ffi::CString;
|
||||
use std::ptr::null;
|
||||
use std::rc::Rc;
|
||||
|
||||
use deno_core::op2;
|
||||
use deno_core::serde_v8;
|
||||
use deno_core::v8;
|
||||
use deno_core::GarbageCollected;
|
||||
use deno_core::OpState;
|
||||
use deno_permissions::PermissionsContainer;
|
||||
|
@ -41,6 +46,13 @@ impl Default for DatabaseSyncOptions {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct ApplyChangesetOptions<'a> {
|
||||
filter: Option<serde_v8::Value<'a>>,
|
||||
on_conflict: Option<serde_v8::Value<'a>>,
|
||||
}
|
||||
|
||||
pub struct DatabaseSync {
|
||||
conn: Rc<RefCell<Option<rusqlite::Connection>>>,
|
||||
options: DatabaseSyncOptions,
|
||||
|
@ -197,6 +209,119 @@ impl DatabaseSync {
|
|||
})
|
||||
}
|
||||
|
||||
// Applies a changeset to the database.
|
||||
//
|
||||
// This method is a wrapper around `sqlite3changeset_apply()`.
|
||||
#[reentrant]
|
||||
fn apply_changeset<'a>(
|
||||
&self,
|
||||
scope: &mut v8::HandleScope<'a>,
|
||||
#[buffer] changeset: &[u8],
|
||||
#[serde] options: Option<ApplyChangesetOptions<'a>>,
|
||||
) -> Result<bool, SqliteError> {
|
||||
struct HandlerCtx<'a, 'b> {
|
||||
scope: &'a mut v8::HandleScope<'b>,
|
||||
confict: Option<v8::Local<'b, v8::Function>>,
|
||||
filter: Option<v8::Local<'b, v8::Function>>,
|
||||
}
|
||||
|
||||
// Conflict handler callback for `sqlite3changeset_apply()`.
|
||||
unsafe extern "C" fn conflict_handler(
|
||||
p_ctx: *mut c_void,
|
||||
e_conflict: i32,
|
||||
_: *mut libsqlite3_sys::sqlite3_changeset_iter,
|
||||
) -> i32 {
|
||||
let ctx = &mut *(p_ctx as *mut HandlerCtx);
|
||||
|
||||
if let Some(conflict) = &mut ctx.confict {
|
||||
let recv = v8::undefined(ctx.scope).into();
|
||||
let args = [v8::Integer::new(ctx.scope, e_conflict).into()];
|
||||
|
||||
let ret = conflict.call(ctx.scope, recv, &args).unwrap();
|
||||
return ret
|
||||
.int32_value(ctx.scope)
|
||||
.unwrap_or(libsqlite3_sys::SQLITE_CHANGESET_ABORT);
|
||||
}
|
||||
|
||||
libsqlite3_sys::SQLITE_CHANGESET_ABORT
|
||||
}
|
||||
|
||||
// Filter handler callback for `sqlite3changeset_apply()`.
|
||||
unsafe extern "C" fn filter_handler(
|
||||
p_ctx: *mut c_void,
|
||||
z_tab: *const c_char,
|
||||
) -> i32 {
|
||||
let ctx = &mut *(p_ctx as *mut HandlerCtx);
|
||||
|
||||
if let Some(filter) = &mut ctx.filter {
|
||||
let tab = CStr::from_ptr(z_tab).to_str().unwrap();
|
||||
|
||||
let recv = v8::undefined(ctx.scope).into();
|
||||
let args = [v8::String::new(ctx.scope, tab).unwrap().into()];
|
||||
|
||||
let ret = filter.call(ctx.scope, recv, &args).unwrap();
|
||||
return ret.boolean_value(ctx.scope) as i32;
|
||||
}
|
||||
|
||||
1
|
||||
}
|
||||
|
||||
let db = self.conn.borrow();
|
||||
let db = db.as_ref().ok_or(SqliteError::AlreadyClosed)?;
|
||||
|
||||
// It is safe to use scope in the handlers because they are never
|
||||
// called after the call to `sqlite3changeset_apply()`.
|
||||
let mut ctx = HandlerCtx {
|
||||
scope,
|
||||
confict: None,
|
||||
filter: None,
|
||||
};
|
||||
|
||||
if let Some(options) = options {
|
||||
if let Some(filter) = options.filter {
|
||||
let filter_cb: v8::Local<v8::Function> = filter
|
||||
.v8_value
|
||||
.try_into()
|
||||
.map_err(|_| SqliteError::InvalidCallback("filter"))?;
|
||||
ctx.filter = Some(filter_cb);
|
||||
}
|
||||
|
||||
if let Some(on_conflict) = options.on_conflict {
|
||||
let on_conflict_cb: v8::Local<v8::Function> = on_conflict
|
||||
.v8_value
|
||||
.try_into()
|
||||
.map_err(|_| SqliteError::InvalidCallback("onConflict"))?;
|
||||
ctx.confict = Some(on_conflict_cb);
|
||||
}
|
||||
}
|
||||
|
||||
// SAFETY: lifetime of the connection is guaranteed by reference
|
||||
// counting.
|
||||
let raw_handle = unsafe { db.handle() };
|
||||
|
||||
// SAFETY: `changeset` points to a valid memory location and its
|
||||
// length is correct. `ctx` is stack allocated and its lifetime is
|
||||
// longer than the call to `sqlite3changeset_apply()`.
|
||||
unsafe {
|
||||
let r = libsqlite3_sys::sqlite3changeset_apply(
|
||||
raw_handle,
|
||||
changeset.len() as i32,
|
||||
changeset.as_ptr() as *mut _,
|
||||
Some(filter_handler),
|
||||
Some(conflict_handler),
|
||||
&mut ctx as *mut _ as *mut c_void,
|
||||
);
|
||||
|
||||
if r == libsqlite3_sys::SQLITE_OK {
|
||||
return Ok(true);
|
||||
} else if r == libsqlite3_sys::SQLITE_ABORT {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
Err(SqliteError::ChangesetApplyFailed)
|
||||
}
|
||||
}
|
||||
|
||||
// Creates and attaches a session to the database.
|
||||
//
|
||||
// This method is a wrapper around `sqlite3session_create()` and
|
||||
|
|
|
@ -58,4 +58,10 @@ pub enum SqliteError {
|
|||
#[class(range)]
|
||||
#[error("The value of column {0} is too large to be represented as a JavaScript number: {1}")]
|
||||
NumberTooLarge(i32, i64),
|
||||
#[class(generic)]
|
||||
#[error("Failed to apply changeset")]
|
||||
ChangesetApplyFailed,
|
||||
#[class(type)]
|
||||
#[error("Invalid callback: {0}")]
|
||||
InvalidCallback(&'static str),
|
||||
}
|
||||
|
|
|
@ -152,3 +152,32 @@ Deno.test({
|
|||
}
|
||||
},
|
||||
});
|
||||
|
||||
Deno.test("[node/sqlite] applyChangeset across databases", () => {
|
||||
const sourceDb = new DatabaseSync(":memory:");
|
||||
const targetDb = new DatabaseSync(":memory:");
|
||||
|
||||
sourceDb.exec("CREATE TABLE data(key INTEGER PRIMARY KEY, value TEXT)");
|
||||
targetDb.exec("CREATE TABLE data(key INTEGER PRIMARY KEY, value TEXT)");
|
||||
|
||||
const session = sourceDb.createSession();
|
||||
|
||||
const insert = sourceDb.prepare(
|
||||
"INSERT INTO data (key, value) VALUES (?, ?)",
|
||||
);
|
||||
insert.run(1, "hello");
|
||||
insert.run(2, "world");
|
||||
|
||||
const changeset = session.changeset();
|
||||
targetDb.applyChangeset(changeset, {
|
||||
filter(e) {
|
||||
return e === "data";
|
||||
},
|
||||
});
|
||||
|
||||
const stmt = targetDb.prepare("SELECT * FROM data");
|
||||
assertEquals(stmt.all(), [
|
||||
{ key: 1, value: "hello", __proto__: null },
|
||||
{ key: 2, value: "world", __proto__: null },
|
||||
]);
|
||||
});
|
||||
|
|
Loading…
Add table
Reference in a new issue