From 88903e14af0f2588c5b58d3f958200fbfd4919b4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?=
Date: Sat, 11 Jan 2025 00:00:18 +0100
Subject: [PATCH] feat(ext/cache): support lscache

---
 Cargo.lock             |  11 ++
 Cargo.toml             |   1 +
 ext/cache/Cargo.toml   |  11 ++
 ext/cache/lib.rs       |   8 +
 ext/cache/lsc_shard.rs |  78 +++++++++
 ext/cache/lscache.rs   | 378 +++++++++++++++++++++++++++++++++++++++++
 6 files changed, 487 insertions(+)
 create mode 100644 ext/cache/lsc_shard.rs
 create mode 100644 ext/cache/lscache.rs

diff --git a/Cargo.lock b/Cargo.lock
index 1f13898fe0..29ca2adb85 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1444,14 +1444,25 @@ dependencies = [
 name = "deno_cache"
 version = "0.117.0"
 dependencies = [
+ "anyhow",
+ "async-stream",
  "async-trait",
+ "base64 0.21.7",
+ "bytes",
+ "chrono",
  "deno_core",
  "deno_error",
+ "futures",
+ "http 1.1.0",
+ "hyper 1.4.1",
+ "reqwest",
  "rusqlite",
  "serde",
  "sha2",
+ "slab",
  "thiserror 2.0.3",
  "tokio",
+ "tokio-util",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 48abe48305..223a9c98e7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -100,6 +100,7 @@ node_resolver = { version = "0.23.0", path = "./resolvers/node" }
 
 aes = "=0.8.3"
 anyhow = "1.0.57"
+async-stream = "0.3"
 async-trait = "0.1.73"
 base32 = "=0.5.1"
 base64 = "0.21.7"
diff --git a/ext/cache/Cargo.toml b/ext/cache/Cargo.toml
index 18fbe23a23..43568b080f 100644
--- a/ext/cache/Cargo.toml
+++ b/ext/cache/Cargo.toml
@@ -14,11 +14,22 @@ description = "Implementation of Cache API for Deno"
 path = "lib.rs"
 
 [dependencies]
+anyhow.workspace = true
+async-stream.workspace = true
 async-trait.workspace = true
+base64.workspace = true
+bytes.workspace = true
+chrono.workspace = true
 deno_core.workspace = true
 deno_error.workspace = true
+futures.workspace = true
+http.workspace = true
+hyper.workspace = true
+reqwest.workspace = true
 rusqlite.workspace = true
 serde.workspace = true
 sha2.workspace = true
+slab.workspace = true
 thiserror.workspace = true
 tokio.workspace = true
+tokio-util.workspace = true
diff --git a/ext/cache/lib.rs b/ext/cache/lib.rs
index d3bfe23def..b1f6bbb974 100644
--- a/ext/cache/lib.rs
+++ b/ext/cache/lib.rs
@@ -15,7 +15,12 @@ use deno_core::Resource;
 use deno_core::ResourceId;
 use deno_error::JsErrorBox;
 
+mod lsc_shard;
+mod lscache;
 mod sqlite;
+
+pub use lsc_shard::CacheShard;
+pub use lscache::LscBackend;
 pub use sqlite::SqliteBackedCache;
 
 #[derive(Debug, thiserror::Error, deno_error::JsError)]
@@ -23,6 +28,9 @@ pub enum CacheError {
   #[class(type)]
   #[error("CacheStorage is not available in this context")]
   ContextUnsupported,
+  #[class(type)]
+  #[error("Cache name cannot be empty")]
+  EmptyName,
   #[class(generic)]
   #[error(transparent)]
   Sqlite(#[from] rusqlite::Error),
diff --git a/ext/cache/lsc_shard.rs b/ext/cache/lsc_shard.rs
new file mode 100644
index 0000000000..ac63131c74
--- /dev/null
+++ b/ext/cache/lsc_shard.rs
@@ -0,0 +1,78 @@
+// Copyright 2018-2025 the Deno authors. MIT license.
+
+use anyhow::Context;
+use hyper::header::AUTHORIZATION;
+use hyper::HeaderMap;
+use hyper::StatusCode;
+
+pub struct CacheShard {
+  client: reqwest::Client,
+  endpoint: String,
+  token: String,
+}
+
+impl CacheShard {
+  pub fn new(endpoint: String, token: String) -> Self {
+    let client = reqwest::Client::builder()
+      .build()
+      .expect("Failed to build reqwest client");
+    Self {
+      client,
+      endpoint,
+      token,
+    }
+  }
+
+  pub async fn get_object(
+    &self,
+    object_key: &str,
+  ) -> anyhow::Result<Option<reqwest::Response>> {
+    let res = self
+      .client
+      .get(format!("{}/objects/{}", self.endpoint, object_key))
+      .header(&AUTHORIZATION, format!("Bearer {}", self.token))
+      .header("x-ryw", "1")
+      .send()
+      .await
+      .map_err(|e| e.without_url())
+      .with_context(|| "failed to start cache GET request")?;
+
+    if res.status().is_success() {
+      Ok(Some(res))
+    } else if res.status() == StatusCode::NOT_FOUND {
+      Ok(None)
+    } else {
+      Err(anyhow::anyhow!(
+        "cache GET request failed: {}",
+        res.status()
+      ))
+    }
+  }
+
+  pub async fn put_object(
+    &self,
+    object_key: &str,
+    headers: HeaderMap,
+    body: reqwest::Body,
+  ) -> anyhow::Result<()> {
+    let res = self
+      .client
+      .put(format!("{}/objects/{}", self.endpoint, object_key))
+      .headers(headers)
+      .header(&AUTHORIZATION, format!("Bearer {}", self.token))
+      .body(body)
+      .send()
+      .await
+      .map_err(|e| e.without_url())
+      .with_context(|| "failed to start cache PUT request")?;
+
+    if res.status().is_success() {
+      Ok(())
+    } else {
+      Err(anyhow::anyhow!(
+        "cache PUT request failed: {}",
+        res.status()
+      ))
+    }
+  }
+}
diff --git a/ext/cache/lscache.rs b/ext/cache/lscache.rs
new file mode 100644
index 0000000000..1a421a14da
--- /dev/null
+++ b/ext/cache/lscache.rs
@@ -0,0 +1,378 @@
+// Copyright 2018-2025 the Deno authors. MIT license.
+
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::pin::Pin;
+use std::rc::Rc;
+
+use async_stream::try_stream;
+use async_trait::async_trait;
+use base64::Engine;
+use bytes::Bytes;
+use deno_core::unsync::spawn;
+use deno_core::AsyncRefCell;
+use deno_core::AsyncResult;
+use deno_core::BufMutView;
+use deno_core::ByteString;
+use deno_core::Resource;
+use futures::Stream;
+use futures::StreamExt;
+use futures::TryStreamExt;
+use http::header::VARY;
+use http::HeaderMap;
+use http::HeaderName;
+use http::HeaderValue;
+use slab::Slab;
+use tokio::io::AsyncRead;
+use tokio::io::AsyncReadExt;
+use tokio_util::io::StreamReader;
+
+use crate::get_header;
+use crate::get_headers_from_vary_header;
+use crate::lsc_shard::CacheShard;
+use crate::Cache;
+use crate::CacheDeleteRequest;
+use crate::CacheError;
+use crate::CacheMatchRequest;
+use crate::CacheMatchResponseMeta;
+use crate::CachePutRequest;
+
+const REQHDR_PREFIX: &str = "x-lsc-meta-reqhdr-";
+
+#[derive(Clone)]
+pub struct LscBackend {
+  shard: Rc<RefCell<Option<Rc<CacheShard>>>>,
+  id2name: Rc<RefCell<Slab<String>>>,
+}
+
+impl LscBackend {
+  pub fn new() -> Self {
+    Self {
+      shard: Rc::new(RefCell::new(None)),
+      id2name: Rc::new(RefCell::new(Slab::new())),
+    }
+  }
+
+  pub fn set_shard(&self, shard: Rc<CacheShard>) {
+    *self.shard.borrow_mut() = Some(shard);
+  }
+}
+
+#[async_trait(?Send)]
+impl Cache for LscBackend {
+  type CacheMatchResourceType = CacheResponseResource;
+
+  /// Open a cache storage. Internally, this allocates an id and maps it
+  /// to the provided cache name.
+  async fn storage_open(&self, cache_name: String) -> Result<i64, CacheError> {
+    if cache_name.is_empty() {
+      return Err(CacheError::EmptyName);
+    }
+    let id = self.id2name.borrow_mut().insert(cache_name);
+    Ok(id as i64)
+  }
+
+  /// Check if a cache with the provided name exists.
+  /// Always returns `true`.
+  async fn storage_has(&self, _cache_name: String) -> Result<bool, CacheError> {
+    Ok(true)
+  }
+
+  /// Delete a cache storage. Not yet implemented.
+  async fn storage_delete(
+    &self,
+    _cache_name: String,
+  ) -> Result<bool, CacheError> {
+    Err(type_error("Cache deletion is not supported"))
+  }
+
+  /// Writes an entry to the cache.
+  async fn put(
+    &self,
+    request_response: CachePutRequest,
+    resource: Option<Rc<dyn Resource>>,
+  ) -> Result<(), CacheError> {
+    let Some(shard) = self.shard.borrow().as_ref().cloned() else {
+      return Err(type_error("Cache is not available"));
+    };
+
+    let Some(cache_name) = self
+      .id2name
+      .borrow_mut()
+      .get(request_response.cache_id as usize)
+      .cloned()
+    else {
+      return Err(type_error("Cache not found"));
+    };
+    let object_key = build_cache_object_key(
+      cache_name.as_bytes(),
+      request_response.request_url.as_bytes(),
+    );
+    let mut headers = HeaderMap::new();
+    for hdr in &request_response.request_headers {
+      headers.insert(
+        HeaderName::from_bytes(
+          &[REQHDR_PREFIX.as_bytes(), &hdr.0[..]].concat(),
+        )?,
+        HeaderValue::from_bytes(&hdr.1[..])?,
+      );
+    }
+    for hdr in &request_response.response_headers {
+      if hdr.0.starts_with(b"x-lsc-meta-") {
+        continue;
+      }
+      if hdr.0[..] == b"content-encoding"[..] {
+        return Err(type_error(
+          "Content-Encoding is not allowed in response headers",
+        ));
+      }
+      headers.insert(
+        HeaderName::from_bytes(&hdr.0[..])?,
+        HeaderValue::from_bytes(&hdr.1[..])?,
+      );
+    }
+
+    headers.insert(
+      HeaderName::from_bytes(b"x-lsc-meta-cached-at")?,
+      HeaderValue::from_bytes(
+        chrono::Utc::now()
+          .to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
+          .as_bytes(),
+      )?,
+    );
+
+    let body = try_stream! {
+      if let Some(resource) = resource {
+        loop {
+          let (size, buf) = resource.clone().read_byob(BufMutView::new(64 * 1024)).await?;
+          if size == 0 {
+            break;
+          }
+          yield Bytes::copy_from_slice(&buf[..size]);
+        }
+      }
+    };
+    let (body_tx, body_rx) = futures::channel::mpsc::channel(4);
+    spawn(body.map(Ok::<Result<Bytes, CacheError>, _>).forward(body_tx));
+    let body = reqwest::Body::wrap_stream(body_rx);
+    shard.put_object(&object_key, headers, body).await?;
+    Ok(())
+  }
+
+  /// Matches a request against the cache.
+  async fn r#match(
+    &self,
+    request: CacheMatchRequest,
+  ) -> Result<
+    Option<(CacheMatchResponseMeta, Option<Self::CacheMatchResourceType>)>,
+    CacheError,
+  > {
+    let Some(shard) = self.shard.borrow().as_ref().cloned() else {
+      return Err(type_error("Cache is not available"));
+    };
+    let Some(cache_name) = self
+      .id2name
+      .borrow()
+      .get(request.cache_id as usize)
+      .cloned()
+    else {
+      return Err(type_error("Cache not found"));
+    };
+    let object_key = build_cache_object_key(
+      cache_name.as_bytes(),
+      request.request_url.as_bytes(),
+    );
+    let Some(res) = shard.get_object(&object_key).await? else {
+      return Ok(None);
+    };
+
+    // Is this a tombstone?
+    if res.headers().contains_key("x-lsc-meta-deleted-at") {
+      return Ok(None);
+    }
+
+    // From https://w3c.github.io/ServiceWorker/#request-matches-cached-item-algorithm
+    // If there's a Vary header in the response, ensure all the
+    // headers of the cached request match the query request.
+    if let Some(vary_header) = res.headers().get(&VARY) {
+      if !vary_header_matches(
+        vary_header.as_bytes(),
+        &request.request_headers,
+        res.headers(),
+      ) {
+        return Ok(None);
+      }
+    }
+
+    let mut response_headers: Vec<(ByteString, ByteString)> = res
+      .headers()
+      .iter()
+      .filter_map(|(k, v)| {
+        if k.as_str().starts_with("x-lsc-meta-") || k.as_str() == "x-ryw" {
+          None
+        } else {
+          Some((k.as_str().into(), v.as_bytes().into()))
+        }
+      })
+      .collect();
+
+    if let Some(x) = res
+      .headers()
+      .get("x-lsc-meta-cached-at")
+      .and_then(|x| x.to_str().ok())
+    {
+      if let Ok(cached_at) = chrono::DateTime::parse_from_rfc3339(x) {
+        let age = chrono::Utc::now()
+          .signed_duration_since(cached_at)
+          .num_seconds();
+        if age >= 0 {
+          response_headers.push(("age".into(), age.to_string().into()));
+        }
+      }
+    }
+
+    let meta = CacheMatchResponseMeta {
+      response_status: res.status().as_u16(),
+      response_status_text: res
+        .status()
+        .canonical_reason()
+        .unwrap_or("")
+        .to_string(),
+      request_headers: res
+        .headers()
+        .iter()
+        .filter_map(|(k, v)| {
+          let reqhdr_prefix = REQHDR_PREFIX.as_bytes();
+          if k.as_str().as_bytes().starts_with(reqhdr_prefix) {
+            Some((
+              k.as_str().as_bytes()[REQHDR_PREFIX.len()..].into(),
+              v.as_bytes().into(),
+            ))
+          } else {
+            None
+          }
+        })
+        .collect(),
+      response_headers,
+    };
+
+    let body = CacheResponseResource::new(
+      res
+        .bytes_stream()
+        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)),
+    );
+
+    Ok(Some((meta, Some(body))))
+  }
+
+  async fn delete(
+    &self,
+    request: CacheDeleteRequest,
+  ) -> Result<bool, CacheError> {
+    let Some(shard) = self.shard.borrow().as_ref().cloned() else {
+      return Err(type_error("Cache is not available"));
+    };
+
+    let Some(cache_name) = self
+      .id2name
+      .borrow_mut()
+      .get(request.cache_id as usize)
+      .cloned()
+    else {
+      return Err(type_error("Cache not found"));
+    };
+    let object_key = build_cache_object_key(
+      cache_name.as_bytes(),
+      request.request_url.as_bytes(),
+    );
+    let mut headers = HeaderMap::new();
+    headers.insert(
+      HeaderName::from_bytes(b"expires")?,
+      HeaderValue::from_bytes(b"Thu, 01 Jan 1970 00:00:00 GMT")?,
+    );
+    headers.insert(
+      HeaderName::from_bytes(b"x-lsc-meta-deleted-at")?,
+      HeaderValue::from_bytes(
+        chrono::Utc::now()
+          .to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
+          .as_bytes(),
+      )?,
+    );
+    shard
+      .put_object(&object_key, headers, reqwest::Body::from(&[][..]))
+      .await?;
+    Ok(true)
+  }
+}
+impl deno_core::Resource for LscBackend {
+  fn name(&self) -> std::borrow::Cow<str> {
+    "LscBackend".into()
+  }
+}
+
+pub struct CacheResponseResource {
+  body: AsyncRefCell<Pin<Box<dyn AsyncRead>>>,
+}
+
+impl CacheResponseResource {
+  fn new(
+    body: impl Stream<Item = Result<Bytes, std::io::Error>> + 'static,
+  ) -> Self {
+    Self {
+      body: AsyncRefCell::new(Box::pin(StreamReader::new(body))),
+    }
+  }
+
+  async fn read(
+    self: Rc<Self>,
+    data: &mut [u8],
+  ) -> Result<usize, std::io::Error> {
+    let resource = deno_core::RcRef::map(&self, |r| &r.body);
+    let mut body = resource.borrow_mut().await;
+    let nread = body.read(data).await?;
+    Ok(nread)
+  }
+}
+
+impl Resource for CacheResponseResource {
+  deno_core::impl_readable_byob!();
+
+  fn name(&self) -> Cow<str> {
+    "CacheResponseResource".into()
+  }
+}
+
+fn vary_header_matches(
+  vary_header: &[u8],
+  query_request_headers: &[(ByteString, ByteString)],
+  cached_headers: &HeaderMap,
+) -> bool {
+  let vary_header = match std::str::from_utf8(vary_header) {
+    Ok(vary_header) => vary_header,
+    Err(_) => return false,
+  };
+  let headers = get_headers_from_vary_header(vary_header);
+  for header in headers {
+    // Ignoring `accept-encoding` is safe because we refuse to cache responses
+    // with `content-encoding`
+    if header == "accept-encoding" {
+      continue;
+    }
+    let lookup_key = format!("{}{}", REQHDR_PREFIX, header);
+    let query_header = get_header(&header, query_request_headers);
+    let cached_header = cached_headers.get(&lookup_key);
+    if query_header.as_ref().map(|x| &x[..])
+      != cached_header.as_ref().map(|x| x.as_bytes())
+    {
+      return false;
+    }
+  }
+  true
+}
+
+fn build_cache_object_key(cache_name: &[u8], request_url: &[u8]) -> String {
+  format!(
+    "v1/{}/{}",
+    base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(cache_name),
+    base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(request_url),
+  )
+}
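
Note (illustrative, not part of the patch): a minimal sketch of how an embedder might wire up the backend introduced above. Only LscBackend::new, LscBackend::set_shard and CacheShard::new come from this patch; the configuration source and the LSC_ENDPOINT / LSC_TOKEN variable names are assumptions made for the example.

// Hypothetical wiring sketch; the environment variable names are made up.
use std::rc::Rc;

use deno_cache::CacheShard;
use deno_cache::LscBackend;

fn build_lsc_backend() -> LscBackend {
  // Start with a backend that has no shard configured; cache operations
  // other than storage_open/storage_has fail until a shard is set.
  let backend = LscBackend::new();
  // An embedder would supply the shard endpoint and bearer token from its
  // own configuration; std::env is used here purely for illustration.
  if let (Ok(endpoint), Ok(token)) =
    (std::env::var("LSC_ENDPOINT"), std::env::var("LSC_TOKEN"))
  {
    backend.set_shard(Rc::new(CacheShard::new(endpoint, token)));
  }
  backend
}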