diff --git a/Cargo.lock b/Cargo.lock index 7a6108b2e7..23ddcb6772 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1447,14 +1447,25 @@ dependencies = [ name = "deno_cache" version = "0.118.0" dependencies = [ + "anyhow", + "async-stream", "async-trait", + "base64 0.21.7", + "bytes", + "chrono", "deno_core", "deno_error", + "futures", + "http 1.1.0", + "hyper 1.4.1", + "reqwest", "rusqlite", "serde", "sha2", + "slab", "thiserror 2.0.3", "tokio", + "tokio-util", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 4ee2abe993..62e87eb791 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -107,6 +107,7 @@ node_resolver = { version = "0.24.0", path = "./resolvers/node" } aes = "=0.8.3" anyhow = "1.0.57" +async-stream = "0.3" async-trait = "0.1.73" base32 = "=0.5.1" base64 = "0.21.7" diff --git a/ext/cache/Cargo.toml b/ext/cache/Cargo.toml index 4d15c8861b..3ddd30fb69 100644 --- a/ext/cache/Cargo.toml +++ b/ext/cache/Cargo.toml @@ -14,11 +14,22 @@ description = "Implementation of Cache API for Deno" path = "lib.rs" [dependencies] +anyhow.workspace = true +async-stream.workspace = true async-trait.workspace = true +base64.workspace = true +bytes.workspace = true +chrono.workspace = true deno_core.workspace = true deno_error.workspace = true +futures.workspace = true +http.workspace = true +hyper.workspace = true +reqwest.workspace = true rusqlite.workspace = true serde.workspace = true sha2.workspace = true +slab.workspace = true thiserror.workspace = true tokio.workspace = true +tokio-util.workspace = true diff --git a/ext/cache/lib.rs b/ext/cache/lib.rs index d3bfe23def..1a825c2ed8 100644 --- a/ext/cache/lib.rs +++ b/ext/cache/lib.rs @@ -1,28 +1,57 @@ // Copyright 2018-2025 the Deno authors. MIT license. 
+use std::borrow::Cow; use std::cell::RefCell; use std::path::PathBuf; +use std::pin::Pin; use std::rc::Rc; use std::sync::Arc; use async_trait::async_trait; +use bytes::Bytes; use deno_core::op2; use deno_core::serde::Deserialize; use deno_core::serde::Serialize; +use deno_core::AsyncRefCell; +use deno_core::AsyncResult; use deno_core::ByteString; use deno_core::OpState; use deno_core::Resource; use deno_core::ResourceId; use deno_error::JsErrorBox; +use futures::Stream; +use tokio::io::AsyncRead; +use tokio::io::AsyncReadExt; +mod lsc_shard; +mod lscache; mod sqlite; + +pub use lsc_shard::CacheShard; +pub use lscache::LscBackend; pub use sqlite::SqliteBackedCache; +use tokio_util::io::StreamReader; #[derive(Debug, thiserror::Error, deno_error::JsError)] pub enum CacheError { #[class(type)] #[error("CacheStorage is not available in this context")] ContextUnsupported, + #[class(type)] + #[error("Cache name cannot be empty")] + EmptyName, + #[class(type)] + #[error("Cache is not available")] + NotAvailable, + #[class(type)] + #[error("Cache not found")] + NotFound, + #[class(type)] + #[error("Cache deletion is not supported")] + DeletionNotSupported, + #[class(type)] + #[error("Content-Encoding is not allowed in response headers")] + ContentEncodingNotAllowed, #[class(generic)] #[error(transparent)] Sqlite(#[from] rusqlite::Error), @@ -38,6 +67,12 @@ pub enum CacheError { #[class(inherit)] #[error("{0}")] Io(#[from] std::io::Error), + #[class(type)] + #[error(transparent)] + InvalidHeaderName(#[from] hyper::header::InvalidHeaderName), + #[class(type)] + #[error(transparent)] + InvalidHeaderValue(#[from] hyper::header::InvalidHeaderValue), #[class(generic)] #[error("Failed to create cache storage directory {}", .dir.display())] CacheStorageDirectory { @@ -45,27 +80,33 @@ pub enum CacheError { #[source] source: std::io::Error, }, + #[class(generic)] + #[error("cache {method} request failed: {status}")] + RequestFailed { + method: &'static str, + status: 
hyper::StatusCode, + }, + #[class(generic)] + #[error("{0}")] + Reqwest(#[from] reqwest::Error), } #[derive(Clone)] -pub struct CreateCache( - pub Arc Result>, -); +pub struct CreateCache(pub Arc Result>); deno_core::extension!(deno_cache, deps = [ deno_webidl, deno_web, deno_url, deno_fetch ], - parameters=[CA: Cache], ops = [ - op_cache_storage_open, - op_cache_storage_has, - op_cache_storage_delete, - op_cache_put, - op_cache_match, - op_cache_delete, + op_cache_storage_open, + op_cache_storage_has, + op_cache_storage_delete, + op_cache_put, + op_cache_match, + op_cache_delete, ], esm = [ "01_cache.js" ], options = { - maybe_create_cache: Option>, + maybe_create_cache: Option, }, state = |state, options| { if let Some(create_cache) = options.maybe_create_cache { @@ -149,52 +190,160 @@ pub trait Cache: Clone + 'static { ) -> Result; } +#[derive(Clone)] +pub enum CacheImpl { + Sqlite(SqliteBackedCache), + Lsc(LscBackend), +} + +#[async_trait(?Send)] +impl Cache for CacheImpl { + type CacheMatchResourceType = CacheResponseResource; + + async fn storage_open(&self, cache_name: String) -> Result { + match self { + Self::Sqlite(cache) => cache.storage_open(cache_name).await, + Self::Lsc(cache) => cache.storage_open(cache_name).await, + } + } + + async fn storage_has(&self, cache_name: String) -> Result { + match self { + Self::Sqlite(cache) => cache.storage_has(cache_name).await, + Self::Lsc(cache) => cache.storage_has(cache_name).await, + } + } + + async fn storage_delete( + &self, + cache_name: String, + ) -> Result { + match self { + Self::Sqlite(cache) => cache.storage_delete(cache_name).await, + Self::Lsc(cache) => cache.storage_delete(cache_name).await, + } + } + + async fn put( + &self, + request_response: CachePutRequest, + resource: Option>, + ) -> Result<(), CacheError> { + match self { + Self::Sqlite(cache) => cache.put(request_response, resource).await, + Self::Lsc(cache) => cache.put(request_response, resource).await, + } + } + + async fn r#match( + 
&self, + request: CacheMatchRequest, + ) -> Result< + Option<(CacheMatchResponseMeta, Option)>, + CacheError, + > { + match self { + Self::Sqlite(cache) => cache.r#match(request).await, + Self::Lsc(cache) => cache.r#match(request).await, + } + } + + async fn delete( + &self, + request: CacheDeleteRequest, + ) -> Result { + match self { + Self::Sqlite(cache) => cache.delete(request).await, + Self::Lsc(cache) => cache.delete(request).await, + } + } +} + +pub enum CacheResponseResource { + Sqlite(AsyncRefCell), + Lsc(AsyncRefCell>>), +} + +impl CacheResponseResource { + fn sqlite(file: tokio::fs::File) -> Self { + Self::Sqlite(AsyncRefCell::new(file)) + } + + fn lsc( + body: impl Stream> + 'static, + ) -> Self { + Self::Lsc(AsyncRefCell::new(Box::pin(StreamReader::new(body)))) + } + + async fn read( + self: Rc, + data: &mut [u8], + ) -> Result { + let nread = match &*self { + CacheResponseResource::Sqlite(_) => { + let resource = deno_core::RcRef::map(&self, |r| match r { + Self::Sqlite(r) => r, + _ => unreachable!(), + }); + let mut file = resource.borrow_mut().await; + file.read(data).await? + } + CacheResponseResource::Lsc(_) => { + let resource = deno_core::RcRef::map(&self, |r| match r { + Self::Lsc(r) => r, + _ => unreachable!(), + }); + let mut file = resource.borrow_mut().await; + file.read(data).await? 
+ } + }; + + Ok(nread) + } +} + +impl Resource for CacheResponseResource { + deno_core::impl_readable_byob!(); + + fn name(&self) -> Cow { + "CacheResponseResource".into() + } +} + #[op2(async)] #[number] -pub async fn op_cache_storage_open( +pub async fn op_cache_storage_open( state: Rc>, #[string] cache_name: String, -) -> Result -where - CA: Cache, -{ - let cache = get_cache::(&state)?; +) -> Result { + let cache = get_cache(&state)?; cache.storage_open(cache_name).await } #[op2(async)] -pub async fn op_cache_storage_has( +pub async fn op_cache_storage_has( state: Rc>, #[string] cache_name: String, -) -> Result -where - CA: Cache, -{ - let cache = get_cache::(&state)?; +) -> Result { + let cache = get_cache(&state)?; cache.storage_has(cache_name).await } #[op2(async)] -pub async fn op_cache_storage_delete( +pub async fn op_cache_storage_delete( state: Rc>, #[string] cache_name: String, -) -> Result -where - CA: Cache, -{ - let cache = get_cache::(&state)?; +) -> Result { + let cache = get_cache(&state)?; cache.storage_delete(cache_name).await } #[op2(async)] -pub async fn op_cache_put( +pub async fn op_cache_put( state: Rc>, #[serde] request_response: CachePutRequest, -) -> Result<(), CacheError> -where - CA: Cache, -{ - let cache = get_cache::(&state)?; +) -> Result<(), CacheError> { + let cache = get_cache(&state)?; let resource = match request_response.response_rid { Some(rid) => Some( state @@ -210,14 +359,11 @@ where #[op2(async)] #[serde] -pub async fn op_cache_match( +pub async fn op_cache_match( state: Rc>, #[serde] request: CacheMatchRequest, -) -> Result, CacheError> -where - CA: Cache, -{ - let cache = get_cache::(&state)?; +) -> Result, CacheError> { + let cache = get_cache(&state)?; match cache.r#match(request).await? 
{ Some((meta, None)) => Ok(Some(CacheMatchResponse(meta, None))), Some((meta, Some(resource))) => { @@ -229,28 +375,24 @@ where } #[op2(async)] -pub async fn op_cache_delete( +pub async fn op_cache_delete( state: Rc>, #[serde] request: CacheDeleteRequest, -) -> Result -where - CA: Cache, -{ - let cache = get_cache::(&state)?; +) -> Result { + let cache = get_cache(&state)?; cache.delete(request).await } -pub fn get_cache(state: &Rc>) -> Result -where - CA: Cache, -{ +pub fn get_cache( + state: &Rc>, +) -> Result { let mut state = state.borrow_mut(); - if let Some(cache) = state.try_borrow::() { + if let Some(cache) = state.try_borrow::() { Ok(cache.clone()) - } else if let Some(create_cache) = state.try_borrow::>() { + } else if let Some(create_cache) = state.try_borrow::() { let cache = create_cache.0()?; state.put(cache); - Ok(state.borrow::().clone()) + Ok(state.borrow::().clone()) } else { Err(CacheError::ContextUnsupported) } diff --git a/ext/cache/lsc_shard.rs b/ext/cache/lsc_shard.rs new file mode 100644 index 0000000000..7a3df607a0 --- /dev/null +++ b/ext/cache/lsc_shard.rs @@ -0,0 +1,77 @@ +// Copyright 2018-2025 the Deno authors. MIT license. 
+ +use hyper::header::AUTHORIZATION; +use hyper::HeaderMap; +use hyper::StatusCode; + +use crate::CacheError; + +pub struct CacheShard { + client: reqwest::Client, + endpoint: String, + token: String, +} + +impl CacheShard { + pub fn new(endpoint: String, token: String) -> Self { + let client = reqwest::Client::builder() + .build() + .expect("Failed to build reqwest client"); + Self { + client, + endpoint, + token, + } + } + + pub async fn get_object( + &self, + object_key: &str, + ) -> Result, CacheError> { + let res = self + .client + .get(format!("{}/objects/{}", self.endpoint, object_key)) + .header(&AUTHORIZATION, format!("Bearer {}", self.token)) + .header("x-ryw", "1") + .send() + .await + .map_err(|e| e.without_url())?; + + if res.status().is_success() { + Ok(Some(res)) + } else if res.status() == StatusCode::NOT_FOUND { + Ok(None) + } else { + Err(CacheError::RequestFailed { + method: "GET", + status: res.status(), + }) + } + } + + pub async fn put_object( + &self, + object_key: &str, + headers: HeaderMap, + body: reqwest::Body, + ) -> Result<(), CacheError> { + let res = self + .client + .put(format!("{}/objects/{}", self.endpoint, object_key)) + .headers(headers) + .header(&AUTHORIZATION, format!("Bearer {}", self.token)) + .body(body) + .send() + .await + .map_err(|e| e.without_url())?; + + if res.status().is_success() { + Ok(()) + } else { + Err(CacheError::RequestFailed { + method: "PUT", + status: res.status(), + }) + } + } +} diff --git a/ext/cache/lscache.rs new file mode 100644 index 0000000000..c15ece95f5 --- /dev/null +++ b/ext/cache/lscache.rs @@ -0,0 +1,332 @@ +// Copyright 2018-2025 the Deno authors. MIT license.
+ +use std::cell::RefCell; +use std::rc::Rc; + +use async_stream::try_stream; +use base64::Engine; +use bytes::Bytes; +use deno_core::unsync::spawn; +use deno_core::BufMutView; +use deno_core::ByteString; +use deno_core::Resource; +use futures::StreamExt; +use futures::TryStreamExt; +use http::header::VARY; +use http::HeaderMap; +use http::HeaderName; +use http::HeaderValue; +use slab::Slab; + +use crate::get_header; +use crate::get_headers_from_vary_header; +use crate::lsc_shard::CacheShard; +use crate::CacheDeleteRequest; +use crate::CacheError; +use crate::CacheMatchRequest; +use crate::CacheMatchResponseMeta; +use crate::CachePutRequest; +use crate::CacheResponseResource; + +const REQHDR_PREFIX: &str = "x-lsc-meta-reqhdr-"; + +#[derive(Clone, Default)] +pub struct LscBackend { + shard: Rc>>>, + id2name: Rc>>, +} + +impl LscBackend { + pub fn set_shard(&self, shard: Rc) { + *self.shard.borrow_mut() = Some(shard); + } +} + +#[allow(clippy::unused_async)] +impl LscBackend { + /// Open a cache storage. Internally, this allocates an id and maps it + /// to the provided cache name. + pub async fn storage_open( + &self, + cache_name: String, + ) -> Result { + if cache_name.is_empty() { + return Err(CacheError::EmptyName); + } + let id = self.id2name.borrow_mut().insert(cache_name); + Ok(id as i64) + } + + /// Check if a cache with the provided name exists. Always returns `true`. + pub async fn storage_has( + &self, + _cache_name: String, + ) -> Result { + Ok(true) + } + + /// Delete a cache storage. Not yet implemented. + pub async fn storage_delete( + &self, + _cache_name: String, + ) -> Result { + Err(CacheError::DeletionNotSupported) + } + + /// Writes an entry to the cache. 
+ pub async fn put( + &self, + request_response: CachePutRequest, + resource: Option>, + ) -> Result<(), CacheError> { + let Some(shard) = self.shard.borrow().as_ref().cloned() else { + return Err(CacheError::NotAvailable); + }; + + let Some(cache_name) = self + .id2name + .borrow_mut() + .get(request_response.cache_id as usize) + .cloned() + else { + return Err(CacheError::NotFound); + }; + let object_key = build_cache_object_key( + cache_name.as_bytes(), + request_response.request_url.as_bytes(), + ); + let mut headers = HeaderMap::new(); + for hdr in &request_response.request_headers { + headers.insert( + HeaderName::from_bytes( + &[REQHDR_PREFIX.as_bytes(), &hdr.0[..]].concat(), + )?, + HeaderValue::from_bytes(&hdr.1[..])?, + ); + } + for hdr in &request_response.response_headers { + if hdr.0.starts_with(b"x-lsc-meta-") { + continue; + } + if hdr.0[..] == b"content-encoding"[..] { + return Err(CacheError::ContentEncodingNotAllowed); + } + headers.insert( + HeaderName::from_bytes(&hdr.0[..])?, + HeaderValue::from_bytes(&hdr.1[..])?, + ); + } + + headers.insert( + HeaderName::from_bytes(b"x-lsc-meta-cached-at")?, + HeaderValue::from_bytes( + chrono::Utc::now() + .to_rfc3339_opts(chrono::SecondsFormat::Secs, true) + .as_bytes(), + )?, + ); + + let body = try_stream! { + if let Some(resource) = resource { + loop { + let (size, buf) = resource.clone().read_byob(BufMutView::new(64 * 1024)).await.map_err(CacheError::Other)?; + if size == 0 { + break; + } + yield Bytes::copy_from_slice(&buf[..size]); + } + } + }; + let (body_tx, body_rx) = futures::channel::mpsc::channel(4); + spawn(body.map(Ok::, _>).forward(body_tx)); + let body = reqwest::Body::wrap_stream(body_rx); + shard.put_object(&object_key, headers, body).await?; + Ok(()) + } + + /// Matches a request against the cache. 
+ pub async fn r#match( + &self, + request: CacheMatchRequest, + ) -> Result< + Option<(CacheMatchResponseMeta, Option)>, + CacheError, + > { + let Some(shard) = self.shard.borrow().as_ref().cloned() else { + return Err(CacheError::NotAvailable); + }; + let Some(cache_name) = self + .id2name + .borrow() + .get(request.cache_id as usize) + .cloned() + else { + return Err(CacheError::NotFound); + }; + let object_key = build_cache_object_key( + cache_name.as_bytes(), + request.request_url.as_bytes(), + ); + let Some(res) = shard.get_object(&object_key).await? else { + return Ok(None); + }; + + // Is this a tombstone? + if res.headers().contains_key("x-lsc-meta-deleted-at") { + return Ok(None); + } + + // From https://w3c.github.io/ServiceWorker/#request-matches-cached-item-algorithm + // If there's Vary header in the response, ensure all the + // headers of the cached request match the query request. + if let Some(vary_header) = res.headers().get(&VARY) { + if !vary_header_matches( + vary_header.as_bytes(), + &request.request_headers, + res.headers(), + ) { + return Ok(None); + } + } + + let mut response_headers: Vec<(ByteString, ByteString)> = res + .headers() + .iter() + .filter_map(|(k, v)| { + if k.as_str().starts_with("x-lsc-meta-") || k.as_str() == "x-ryw" { + None + } else { + Some((k.as_str().into(), v.as_bytes().into())) + } + }) + .collect(); + + if let Some(x) = res + .headers() + .get("x-lsc-meta-cached-at") + .and_then(|x| x.to_str().ok()) + { + if let Ok(cached_at) = chrono::DateTime::parse_from_rfc3339(x) { + let age = chrono::Utc::now() + .signed_duration_since(cached_at) + .num_seconds(); + if age >= 0 { + response_headers.push(("age".into(), age.to_string().into())); + } + } + } + + let meta = CacheMatchResponseMeta { + response_status: res.status().as_u16(), + response_status_text: res + .status() + .canonical_reason() + .unwrap_or("") + .to_string(), + request_headers: res + .headers() + .iter() + .filter_map(|(k, v)| { + let reqhdr_prefix = 
REQHDR_PREFIX.as_bytes(); + if k.as_str().as_bytes().starts_with(reqhdr_prefix) { + Some(( + k.as_str().as_bytes()[REQHDR_PREFIX.len()..].into(), + v.as_bytes().into(), + )) + } else { + None + } + }) + .collect(), + response_headers, + }; + + let body = CacheResponseResource::lsc( + res + .bytes_stream() + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)), + ); + + Ok(Some((meta, Some(body)))) + } + + pub async fn delete( + &self, + request: CacheDeleteRequest, + ) -> Result { + let Some(shard) = self.shard.borrow().as_ref().cloned() else { + return Err(CacheError::NotAvailable); + }; + + let Some(cache_name) = self + .id2name + .borrow_mut() + .get(request.cache_id as usize) + .cloned() + else { + return Err(CacheError::NotFound); + }; + let object_key = build_cache_object_key( + cache_name.as_bytes(), + request.request_url.as_bytes(), + ); + let mut headers = HeaderMap::new(); + headers.insert( + HeaderName::from_bytes(b"expires")?, + HeaderValue::from_bytes(b"Thu, 01 Jan 1970 00:00:00 GMT")?, + ); + headers.insert( + HeaderName::from_bytes(b"x-lsc-meta-deleted-at")?, + HeaderValue::from_bytes( + chrono::Utc::now() + .to_rfc3339_opts(chrono::SecondsFormat::Secs, true) + .as_bytes(), + )?, + ); + shard + .put_object(&object_key, headers, reqwest::Body::from(&[][..])) + .await?; + Ok(true) + } +} +impl deno_core::Resource for LscBackend { + fn name(&self) -> std::borrow::Cow { + "LscBackend".into() + } +} + +fn vary_header_matches( + vary_header: &[u8], + query_request_headers: &[(ByteString, ByteString)], + cached_headers: &HeaderMap, +) -> bool { + let vary_header = match std::str::from_utf8(vary_header) { + Ok(vary_header) => vary_header, + Err(_) => return false, + }; + let headers = get_headers_from_vary_header(vary_header); + for header in headers { + // Ignoring `accept-encoding` is safe because we refuse to cache responses + // with `content-encoding` + if header == "accept-encoding" { + continue; + } + let lookup_key = format!("{}{}", 
REQHDR_PREFIX, header); + let query_header = get_header(&header, query_request_headers); + let cached_header = cached_headers.get(&lookup_key); + if query_header.as_ref().map(|x| &x[..]) + != cached_header.as_ref().map(|x| x.as_bytes()) + { + return false; + } + } + true +} + +fn build_cache_object_key(cache_name: &[u8], request_url: &[u8]) -> String { + format!( + "v1/{}/{}", + base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(cache_name), + base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(request_url), + ) +} diff --git a/ext/cache/sqlite.rs b/ext/cache/sqlite.rs index 6587a52bac..37463316cf 100644 --- a/ext/cache/sqlite.rs +++ b/ext/cache/sqlite.rs @@ -1,5 +1,4 @@ // Copyright 2018-2025 the Deno authors. MIT license. -use std::borrow::Cow; use std::path::PathBuf; use std::pin::Pin; use std::rc::Rc; @@ -7,19 +6,15 @@ use std::sync::Arc; use std::time::SystemTime; use std::time::UNIX_EPOCH; -use async_trait::async_trait; use deno_core::futures::future::poll_fn; use deno_core::parking_lot::Mutex; use deno_core::unsync::spawn_blocking; -use deno_core::AsyncRefCell; -use deno_core::AsyncResult; use deno_core::BufMutView; use deno_core::ByteString; use deno_core::Resource; use rusqlite::params; use rusqlite::Connection; use rusqlite::OptionalExtension; -use tokio::io::AsyncReadExt; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; @@ -27,12 +22,12 @@ use crate::deserialize_headers; use crate::get_header; use crate::serialize_headers; use crate::vary_header_matches; -use crate::Cache; use crate::CacheDeleteRequest; use crate::CacheError; use crate::CacheMatchRequest; use crate::CacheMatchResponseMeta; use crate::CachePutRequest; +use crate::CacheResponseResource; #[derive(Clone)] pub struct SqliteBackedCache { @@ -94,14 +89,14 @@ impl SqliteBackedCache { } } -#[async_trait(?Send)] -impl Cache for SqliteBackedCache { - type CacheMatchResourceType = CacheResponseResource; - +impl SqliteBackedCache { /// Open a cache storage. 
Internally, this creates a row in the /// sqlite db if the cache doesn't exist and returns the internal id /// of the cache. - async fn storage_open(&self, cache_name: String) -> Result { + pub async fn storage_open( + &self, + cache_name: String, + ) -> Result { let db = self.connection.clone(); let cache_storage_dir = self.cache_storage_dir.clone(); spawn_blocking(move || { @@ -127,7 +122,10 @@ impl Cache for SqliteBackedCache { /// Check if a cache with the provided name exists. /// Note: this doesn't check the disk, it only checks the sqlite db. - async fn storage_has(&self, cache_name: String) -> Result { + pub async fn storage_has( + &self, + cache_name: String, + ) -> Result { let db = self.connection.clone(); spawn_blocking(move || { let db = db.lock(); @@ -145,7 +143,7 @@ impl Cache for SqliteBackedCache { } /// Delete a cache storage. Internally, this deletes the row in the sqlite db. - async fn storage_delete( + pub async fn storage_delete( &self, cache_name: String, ) -> Result { @@ -174,7 +172,7 @@ impl Cache for SqliteBackedCache { .await? 
} - async fn put( + pub async fn put( &self, request_response: CachePutRequest, resource: Option>, @@ -227,7 +225,7 @@ impl Cache for SqliteBackedCache { Ok(()) } - async fn r#match( + pub async fn r#match( &self, request: CacheMatchRequest, ) -> Result< @@ -298,14 +296,17 @@ impl Cache for SqliteBackedCache { } Err(err) => return Err(err.into()), }; - Ok(Some((cache_meta, Some(CacheResponseResource::new(file))))) + Ok(Some(( + cache_meta, + Some(CacheResponseResource::sqlite(file)), + ))) } Some((cache_meta, None)) => Ok(Some((cache_meta, None))), None => Ok(None), } } - async fn delete( + pub async fn delete( &self, request: CacheDeleteRequest, ) -> Result { @@ -370,36 +371,6 @@ impl deno_core::Resource for SqliteBackedCache { } } -pub struct CacheResponseResource { - file: AsyncRefCell, -} - -impl CacheResponseResource { - fn new(file: tokio::fs::File) -> Self { - Self { - file: AsyncRefCell::new(file), - } - } - - async fn read( - self: Rc, - data: &mut [u8], - ) -> Result { - let resource = deno_core::RcRef::map(&self, |r| &r.file); - let mut file = resource.borrow_mut().await; - let nread = file.read(data).await?; - Ok(nread) - } -} - -impl Resource for CacheResponseResource { - deno_core::impl_readable_byob!(); - - fn name(&self) -> Cow { - "CacheResponseResource".into() - } -} - pub fn hash(token: &str) -> String { use sha2::Digest; format!("{:x}", sha2::Sha256::digest(token.as_bytes())) diff --git a/runtime/snapshot.rs b/runtime/snapshot.rs index a2f0322763..bd6d09b4af 100644 --- a/runtime/snapshot.rs +++ b/runtime/snapshot.rs @@ -10,7 +10,6 @@ use std::sync::Arc; use deno_ast::MediaType; use deno_ast::ParseParams; use deno_ast::SourceMapOption; -use deno_cache::SqliteBackedCache; use deno_core::snapshot::*; use deno_core::v8; use deno_core::Extension; @@ -289,7 +288,7 @@ pub fn create_runtime_snapshot( deno_webgpu::deno_webgpu::init_ops_and_esm(), deno_canvas::deno_canvas::init_ops_and_esm(), deno_fetch::deno_fetch::init_ops_and_esm::(Default::default()), 
- deno_cache::deno_cache::init_ops_and_esm::(None), + deno_cache::deno_cache::init_ops_and_esm(None), deno_websocket::deno_websocket::init_ops_and_esm::( "".to_owned(), None, diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index d7cf85bab9..3cae4e127f 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -11,6 +11,7 @@ use std::task::Context; use std::task::Poll; use deno_broadcast_channel::InMemoryBroadcastChannel; +use deno_cache::CacheImpl; use deno_cache::CreateCache; use deno_cache::SqliteBackedCache; use deno_core::error::CoreError; @@ -452,10 +453,40 @@ impl WebWorker { // Permissions: many ops depend on this let enable_testing_features = options.bootstrap.enable_testing_features; - let create_cache = options.cache_storage_dir.map(|storage_dir| { - let create_cache_fn = move || SqliteBackedCache::new(storage_dir.clone()); - CreateCache(Arc::new(create_cache_fn)) - }); + + fn create_cache_inner(options: &WebWorkerOptions) -> Option { + if let Ok(var) = std::env::var("DENO_CACHE_LSC_ENDPOINT") { + let elems: Vec<_> = var.split(",").collect(); + if elems.len() == 2 { + let endpoint = elems[0]; + let token = elems[1]; + use deno_cache::CacheShard; + + let shard = + Rc::new(CacheShard::new(endpoint.to_string(), token.to_string())); + let create_cache_fn = move || { + let x = deno_cache::LscBackend::default(); + x.set_shard(shard.clone()); + + Ok(CacheImpl::Lsc(x)) + }; + #[allow(clippy::arc_with_non_send_sync)] + return Some(CreateCache(Arc::new(create_cache_fn))); + } + } + + if let Some(storage_dir) = &options.cache_storage_dir { + let storage_dir = storage_dir.clone(); + let create_cache_fn = move || { + let s = SqliteBackedCache::new(storage_dir.clone())?; + Ok(CacheImpl::Sqlite(s)) + }; + return Some(CreateCache(Arc::new(create_cache_fn))); + } + + None + } + let create_cache = create_cache_inner(&options); // NOTE(bartlomieju): ordering is important here, keep it in sync with // `runtime/worker.rs` and `runtime/snapshot.rs`! 
@@ -483,9 +514,7 @@ impl WebWorker { ..Default::default() }, ), - deno_cache::deno_cache::init_ops_and_esm::( - create_cache, - ), + deno_cache::deno_cache::init_ops_and_esm(create_cache), deno_websocket::deno_websocket::init_ops_and_esm::( options.bootstrap.user_agent.clone(), services.root_cert_store_provider.clone(), diff --git a/runtime/worker.rs b/runtime/worker.rs index 270c8b5392..d6f720073e 100644 --- a/runtime/worker.rs +++ b/runtime/worker.rs @@ -8,6 +8,7 @@ use std::time::Duration; use std::time::Instant; use deno_broadcast_channel::InMemoryBroadcastChannel; +use deno_cache::CacheImpl; use deno_cache::CreateCache; use deno_cache::SqliteBackedCache; use deno_core::error::CoreError; @@ -340,6 +341,40 @@ impl MainWorker { }, ); + fn create_cache_inner(options: &WorkerOptions) -> Option { + if let Ok(var) = std::env::var("DENO_CACHE_LSC_ENDPOINT") { + let elems: Vec<_> = var.split(",").collect(); + if elems.len() == 2 { + let endpoint = elems[0]; + let token = elems[1]; + use deno_cache::CacheShard; + + let shard = + Rc::new(CacheShard::new(endpoint.to_string(), token.to_string())); + let create_cache_fn = move || { + let x = deno_cache::LscBackend::default(); + x.set_shard(shard.clone()); + + Ok(CacheImpl::Lsc(x)) + }; + #[allow(clippy::arc_with_non_send_sync)] + return Some(CreateCache(Arc::new(create_cache_fn))); + } + } + + if let Some(storage_dir) = &options.cache_storage_dir { + let storage_dir = storage_dir.clone(); + let create_cache_fn = move || { + let s = SqliteBackedCache::new(storage_dir.clone())?; + Ok(CacheImpl::Sqlite(s)) + }; + return Some(CreateCache(Arc::new(create_cache_fn))); + } + + None + } + let create_cache = create_cache_inner(&options); + // Get our op metrics let (op_summary_metrics, op_metrics_factory_fn) = create_op_metrics( options.bootstrap.enable_op_summary_metrics, @@ -349,10 +384,6 @@ impl MainWorker { // Permissions: many ops depend on this let enable_testing_features = options.bootstrap.enable_testing_features; let 
exit_code = ExitCode::default(); - let create_cache = options.cache_storage_dir.map(|storage_dir| { - let create_cache_fn = move || SqliteBackedCache::new(storage_dir.clone()); - CreateCache(Arc::new(create_cache_fn)) - }); // NOTE(bartlomieju): ordering is important here, keep it in sync with // `runtime/web_worker.rs` and `runtime/snapshot.rs`! @@ -380,9 +411,7 @@ impl MainWorker { ..Default::default() }, ), - deno_cache::deno_cache::init_ops_and_esm::( - create_cache, - ), + deno_cache::deno_cache::init_ops_and_esm(create_cache), deno_websocket::deno_websocket::init_ops_and_esm::( options.bootstrap.user_agent.clone(), services.root_cert_store_provider.clone(),