// Copyright 2018-2025 the Deno authors. MIT license. use std::cell::RefCell; use std::rc::Rc; use async_trait::async_trait; use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::OpState; use denokv_proto::CommitResult; use denokv_proto::ReadRangeOutput; use denokv_proto::WatchStream; use crate::remote::RemoteDbHandlerPermissions; use crate::sqlite::SqliteDbHandler; use crate::sqlite::SqliteDbHandlerPermissions; use crate::AtomicWrite; use crate::Database; use crate::DatabaseHandler; use crate::QueueMessageHandle; use crate::ReadRange; use crate::SnapshotReadOptions; pub struct MultiBackendDbHandler { backends: Vec<(&'static [&'static str], Box)>, } impl MultiBackendDbHandler { pub fn new( backends: Vec<(&'static [&'static str], Box)>, ) -> Self { Self { backends } } pub fn remote_or_sqlite< P: SqliteDbHandlerPermissions + RemoteDbHandlerPermissions + 'static, >( default_storage_dir: Option, versionstamp_rng_seed: Option, http_options: crate::remote::HttpOptions, ) -> Self { Self::new(vec![ ( &["https://", "http://"], Box::new(crate::remote::RemoteDbHandler::

::new(http_options)), ), ( &[""], Box::new(SqliteDbHandler::

::new( default_storage_dir, versionstamp_rng_seed, )), ), ]) } } #[async_trait(?Send)] impl DatabaseHandler for MultiBackendDbHandler { type DB = RcDynamicDb; async fn open( &self, state: Rc>, path: Option, ) -> Result { for (prefixes, handler) in &self.backends { for &prefix in *prefixes { if prefix.is_empty() { return handler.dyn_open(state.clone(), path.clone()).await; } let Some(path) = &path else { continue; }; if path.starts_with(prefix) { return handler.dyn_open(state.clone(), Some(path.clone())).await; } } } Err(type_error(format!( "No backend supports the given path: {:?}", path ))) } } #[async_trait(?Send)] pub trait DynamicDbHandler { async fn dyn_open( &self, state: Rc>, path: Option, ) -> Result; } #[async_trait(?Send)] impl DatabaseHandler for Box { type DB = RcDynamicDb; async fn open( &self, state: Rc>, path: Option, ) -> Result { (**self).dyn_open(state, path).await } } #[async_trait(?Send)] impl DynamicDbHandler for T where T: DatabaseHandler, DB: Database + 'static, { async fn dyn_open( &self, state: Rc>, path: Option, ) -> Result { Ok(RcDynamicDb(Rc::new(self.open(state, path).await?))) } } #[async_trait(?Send)] pub trait DynamicDb { async fn dyn_snapshot_read( &self, requests: Vec, options: SnapshotReadOptions, ) -> Result, AnyError>; async fn dyn_atomic_write( &self, write: AtomicWrite, ) -> Result, AnyError>; async fn dyn_dequeue_next_message( &self, ) -> Result>, AnyError>; fn dyn_watch(&self, keys: Vec>) -> WatchStream; fn dyn_close(&self); } #[derive(Clone)] pub struct RcDynamicDb(Rc); #[async_trait(?Send)] impl Database for RcDynamicDb { type QMH = Box; async fn snapshot_read( &self, requests: Vec, options: SnapshotReadOptions, ) -> Result, AnyError> { (*self.0).dyn_snapshot_read(requests, options).await } async fn atomic_write( &self, write: AtomicWrite, ) -> Result, AnyError> { (*self.0).dyn_atomic_write(write).await } async fn dequeue_next_message( &self, ) -> Result>, AnyError> { (*self.0).dyn_dequeue_next_message().await } fn watch(&self, keys: Vec>) -> WatchStream { (*self.0).dyn_watch(keys) } fn close(&self) { (*self.0).dyn_close() } } #[async_trait(?Send)] impl DynamicDb for T where T: Database, QMH: QueueMessageHandle + 'static, { async fn dyn_snapshot_read( &self, requests: Vec, options: SnapshotReadOptions, ) -> Result, AnyError> { Ok(self.snapshot_read(requests, options).await?) } async fn dyn_atomic_write( &self, write: AtomicWrite, ) -> Result, AnyError> { Ok(self.atomic_write(write).await?) } async fn dyn_dequeue_next_message( &self, ) -> Result>, AnyError> { Ok( self .dequeue_next_message() .await? .map(|x| Box::new(x) as Box), ) } fn dyn_watch(&self, keys: Vec>) -> WatchStream { self.watch(keys) } fn dyn_close(&self) { self.close() } }