1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-21 04:52:26 -05:00

Compare commits

...

17 commits

Author SHA1 Message Date
Bartek Iwańczuk
48bbd2b55d
Merge 289a7e0893 into b962b87cfe 2025-01-20 00:16:09 +08:00
Bartek Iwańczuk
289a7e0893
fix 2025-01-18 00:07:51 +01:00
Bartek Iwańczuk
2a9d6fea77
Merge branch 'main' into lsc_cache 2025-01-18 00:07:01 +01:00
David Sherret
2ab5d9ac4a
refactor: move denort to separate crate (#27688)
This slightly degrades the performance of CJS export analysis on
subsequent runs because I changed it to no longer cache in the DENO_DIR
with this PR (denort now properly has no idea about the DENO_DIR). We'll
have to change it to embed this data in the binary and that will also
allow us to get rid of swc in denort (will do that in a follow-up PR).
2025-01-18 00:03:13 +01:00
Leo Kettmeir
6aec36178a
refactor: update deno_core and use more concrete errors (#27620)
waiting for https://github.com/denoland/deno_core/pull/1043

Fixes #27672
2025-01-18 00:02:58 +01:00
Yoshiya Hinosawa
3ae9f0c04c
fix(ext/node): tls.connect regression (#27707)
The TLS start sequence has been broken since #26661 because of the way
how we wrap TCP handle to create TLS handle.

#26661 introduced happy-eyeballs algorithm and some connection could be
dropped because of happy-eyeball attempt timeout. The current
implementation doesn't consider that case and it could start TLS
handshake with timed out TCP connection. That caused #27652 .

This PR fixes it by changing the initialization steps. Now `wrapHandle`
of TLSSocket set up `afterConnectTls` callback in TCP handle, and
`afterConnect` of TCP handle calls it at `connect` event timing if it
exists. This avoids starting TLS session with timed out connection.

closes #27652
2025-01-18 00:02:58 +01:00
Bartek Iwańczuk
b1eaefb3cb
Merge branch 'main' into lsc_cache 2025-01-17 14:44:58 +00:00
Bartek Iwańczuk
6cda8b7284
fix flag 2025-01-17 15:24:45 +01:00
Bartek Iwańczuk
3d41feaf41
wire up a flag 2025-01-17 14:29:59 +01:00
Bartek Iwańczuk
cb1cfb8442
need to add a flag 2025-01-17 14:14:39 +01:00
Bartek Iwańczuk
d62e31af02
Merge branch 'main' into lsc_cache 2025-01-17 13:41:42 +01:00
Bartek Iwańczuk
9b119bcd31
wip 2025-01-16 01:41:41 +01:00
Bartek Iwańczuk
f7f8274881
Merge branch 'main' into lsc_cache 2025-01-16 01:33:26 +01:00
Bartek Iwańczuk
1415c99606
lints now 2025-01-14 15:15:13 +01:00
Bartek Iwańczuk
0df9a78956
Merge branch 'main' into lsc_cache 2025-01-14 15:06:01 +01:00
Bartek Iwańczuk
c144f7bf40
map more errors 2025-01-11 00:44:26 +01:00
Bartek Iwańczuk
88903e14af
feat(ext/cache): support lscache 2025-01-11 00:00:18 +01:00
15 changed files with 739 additions and 117 deletions

11
Cargo.lock generated
View file

@ -1447,14 +1447,25 @@ dependencies = [
name = "deno_cache"
version = "0.118.0"
dependencies = [
"anyhow",
"async-stream",
"async-trait",
"base64 0.21.7",
"bytes",
"chrono",
"deno_core",
"deno_error",
"futures",
"http 1.1.0",
"hyper 1.4.1",
"reqwest",
"rusqlite",
"serde",
"sha2",
"slab",
"thiserror 2.0.3",
"tokio",
"tokio-util",
]
[[package]]

View file

@ -107,6 +107,7 @@ node_resolver = { version = "0.24.0", path = "./resolvers/node" }
aes = "=0.8.3"
anyhow = "1.0.57"
async-stream = "0.3"
async-trait = "0.1.73"
base32 = "=0.5.1"
base64 = "0.21.7"

View file

@ -627,6 +627,7 @@ pub struct Flags {
pub code_cache_enabled: bool,
pub permissions: PermissionFlags,
pub allow_scripts: PackagesAllowedScripts,
pub internal_use_lsc_cache: bool,
}
#[derive(Clone, Debug, Eq, PartialEq, Default, Serialize, Deserialize)]
@ -3764,6 +3765,12 @@ fn runtime_misc_args(app: Command) -> Command {
.arg(seed_arg())
.arg(enable_testing_features_arg())
.arg(strace_ops_arg())
.arg(
Arg::new("internal-use-lsc-cache")
.long("internal-use-lsc-cache")
.action(ArgAction::SetTrue)
.hide(true),
)
}
fn allow_import_arg() -> Arg {
@ -5634,6 +5641,7 @@ fn runtime_args_parse(
allow_scripts_arg_parse(flags, matches)?;
}
location_arg_parse(flags, matches);
flags.internal_use_lsc_cache = matches.get_flag("internal-use-lsc-cache");
v8_flags_arg_parse(flags, matches);
seed_arg_parse(flags, matches);
enable_testing_features_arg_parse(flags, matches);

View file

@ -1662,6 +1662,10 @@ impl CliOptions {
&self.flags.v8_flags
}
pub fn use_lsc_cache(&self) -> bool {
self.flags.internal_use_lsc_cache
}
pub fn code_cache_enabled(&self) -> bool {
self.flags.code_cache_enabled
}

View file

@ -1127,6 +1127,7 @@ impl CliFactory {
pkg_json_resolver,
self.root_cert_store_provider().clone(),
cli_options.resolve_storage_key_resolver(),
cli_options.use_lsc_cache(),
self.sys(),
self.create_lib_main_worker_options()?,
);

View file

@ -196,6 +196,7 @@ struct LibWorkerFactorySharedState<TSys: DenoLibSys> {
root_cert_store_provider: Arc<dyn RootCertStoreProvider>,
shared_array_buffer_store: SharedArrayBufferStore,
storage_key_resolver: StorageKeyResolver,
use_lsc_cache: bool,
sys: TSys,
options: LibMainWorkerOptions,
}
@ -326,6 +327,7 @@ impl<TSys: DenoLibSys> LibWorkerFactorySharedState<TSys> {
worker_type: args.worker_type,
stdio: stdio.clone(),
cache_storage_dir,
use_lsc_cache: shared.use_lsc_cache,
strace_ops: shared.options.strace_ops.clone(),
close_on_idle: args.close_on_idle,
maybe_worker_metadata: args.maybe_worker_metadata,
@ -357,6 +359,7 @@ impl<TSys: DenoLibSys> LibMainWorkerFactory<TSys> {
pkg_json_resolver: Arc<node_resolver::PackageJsonResolver<TSys>>,
root_cert_store_provider: Arc<dyn RootCertStoreProvider>,
storage_key_resolver: StorageKeyResolver,
use_lsc_cache: bool,
sys: TSys,
options: LibMainWorkerOptions,
) -> Self {
@ -376,6 +379,7 @@ impl<TSys: DenoLibSys> LibMainWorkerFactory<TSys> {
root_cert_store_provider,
shared_array_buffer_store: Default::default(),
storage_key_resolver,
use_lsc_cache,
sys,
options,
}),
@ -500,6 +504,7 @@ impl<TSys: DenoLibSys> LibMainWorkerFactory<TSys> {
should_wait_for_inspector_session: shared.options.inspect_wait,
strace_ops: shared.options.strace_ops.clone(),
cache_storage_dir,
use_lsc_cache: shared.use_lsc_cache,
origin_storage_dir,
stdio,
skip_op_registration: shared.options.skip_op_registration,

View file

@ -960,6 +960,7 @@ pub async fn run(
pkg_json_resolver,
root_cert_store_provider,
StorageKeyResolver::empty(),
false,
sys.clone(),
lib_main_worker_options,
);

11
ext/cache/Cargo.toml vendored
View file

@ -14,11 +14,22 @@ description = "Implementation of Cache API for Deno"
path = "lib.rs"
[dependencies]
anyhow.workspace = true
async-stream.workspace = true
async-trait.workspace = true
base64.workspace = true
bytes.workspace = true
chrono.workspace = true
deno_core.workspace = true
deno_error.workspace = true
futures.workspace = true
http.workspace = true
hyper.workspace = true
reqwest.workspace = true
rusqlite.workspace = true
serde.workspace = true
sha2.workspace = true
slab.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-util.workspace = true

250
ext/cache/lib.rs vendored
View file

@ -1,28 +1,57 @@
// Copyright 2018-2025 the Deno authors. MIT license.
use std::borrow::Cow;
use std::cell::RefCell;
use std::path::PathBuf;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
use deno_core::op2;
use deno_core::serde::Deserialize;
use deno_core::serde::Serialize;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
use deno_core::ByteString;
use deno_core::OpState;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_error::JsErrorBox;
use futures::Stream;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
mod lsc_shard;
mod lscache;
mod sqlite;
pub use lsc_shard::CacheShard;
pub use lscache::LscBackend;
pub use sqlite::SqliteBackedCache;
use tokio_util::io::StreamReader;
#[derive(Debug, thiserror::Error, deno_error::JsError)]
pub enum CacheError {
#[class(type)]
#[error("CacheStorage is not available in this context")]
ContextUnsupported,
#[class(type)]
#[error("Cache name cannot be empty")]
EmptyName,
#[class(type)]
#[error("Cache is not available")]
NotAvailable,
#[class(type)]
#[error("Cache not found")]
NotFound,
#[class(type)]
#[error("Cache deletion is not supported")]
DeletionNotSupported,
#[class(type)]
#[error("Content-Encoding is not allowed in response headers")]
ContentEncodingNotAllowed,
#[class(generic)]
#[error(transparent)]
Sqlite(#[from] rusqlite::Error),
@ -38,6 +67,12 @@ pub enum CacheError {
#[class(inherit)]
#[error("{0}")]
Io(#[from] std::io::Error),
#[class(type)]
#[error(transparent)]
InvalidHeaderName(#[from] hyper::header::InvalidHeaderName),
#[class(type)]
#[error(transparent)]
InvalidHeaderValue(#[from] hyper::header::InvalidHeaderValue),
#[class(generic)]
#[error("Failed to create cache storage directory {}", .dir.display())]
CacheStorageDirectory {
@ -45,27 +80,33 @@ pub enum CacheError {
#[source]
source: std::io::Error,
},
#[class(generic)]
#[error("cache {method} request failed: {status}")]
RequestFailed {
method: &'static str,
status: hyper::StatusCode,
},
#[class(generic)]
#[error("{0}")]
Reqwest(#[from] reqwest::Error),
}
#[derive(Clone)]
pub struct CreateCache<C: Cache + 'static>(
pub Arc<dyn Fn() -> Result<C, CacheError>>,
);
pub struct CreateCache(pub Arc<dyn Fn() -> Result<CacheImpl, CacheError>>);
deno_core::extension!(deno_cache,
deps = [ deno_webidl, deno_web, deno_url, deno_fetch ],
parameters=[CA: Cache],
ops = [
op_cache_storage_open<CA>,
op_cache_storage_has<CA>,
op_cache_storage_delete<CA>,
op_cache_put<CA>,
op_cache_match<CA>,
op_cache_delete<CA>,
op_cache_storage_open,
op_cache_storage_has,
op_cache_storage_delete,
op_cache_put,
op_cache_match,
op_cache_delete,
],
esm = [ "01_cache.js" ],
options = {
maybe_create_cache: Option<CreateCache<CA>>,
maybe_create_cache: Option<CreateCache>,
},
state = |state, options| {
if let Some(create_cache) = options.maybe_create_cache {
@ -149,52 +190,160 @@ pub trait Cache: Clone + 'static {
) -> Result<bool, CacheError>;
}
#[derive(Clone)]
pub enum CacheImpl {
Sqlite(SqliteBackedCache),
Lsc(LscBackend),
}
#[async_trait(?Send)]
impl Cache for CacheImpl {
type CacheMatchResourceType = CacheResponseResource;
async fn storage_open(&self, cache_name: String) -> Result<i64, CacheError> {
match self {
Self::Sqlite(cache) => cache.storage_open(cache_name).await,
Self::Lsc(cache) => cache.storage_open(cache_name).await,
}
}
async fn storage_has(&self, cache_name: String) -> Result<bool, CacheError> {
match self {
Self::Sqlite(cache) => cache.storage_has(cache_name).await,
Self::Lsc(cache) => cache.storage_has(cache_name).await,
}
}
async fn storage_delete(
&self,
cache_name: String,
) -> Result<bool, CacheError> {
match self {
Self::Sqlite(cache) => cache.storage_delete(cache_name).await,
Self::Lsc(cache) => cache.storage_delete(cache_name).await,
}
}
async fn put(
&self,
request_response: CachePutRequest,
resource: Option<Rc<dyn Resource>>,
) -> Result<(), CacheError> {
match self {
Self::Sqlite(cache) => cache.put(request_response, resource).await,
Self::Lsc(cache) => cache.put(request_response, resource).await,
}
}
async fn r#match(
&self,
request: CacheMatchRequest,
) -> Result<
Option<(CacheMatchResponseMeta, Option<Self::CacheMatchResourceType>)>,
CacheError,
> {
match self {
Self::Sqlite(cache) => cache.r#match(request).await,
Self::Lsc(cache) => cache.r#match(request).await,
}
}
async fn delete(
&self,
request: CacheDeleteRequest,
) -> Result<bool, CacheError> {
match self {
Self::Sqlite(cache) => cache.delete(request).await,
Self::Lsc(cache) => cache.delete(request).await,
}
}
}
pub enum CacheResponseResource {
Sqlite(AsyncRefCell<tokio::fs::File>),
Lsc(AsyncRefCell<Pin<Box<dyn AsyncRead>>>),
}
impl CacheResponseResource {
fn sqlite(file: tokio::fs::File) -> Self {
Self::Sqlite(AsyncRefCell::new(file))
}
fn lsc(
body: impl Stream<Item = Result<Bytes, std::io::Error>> + 'static,
) -> Self {
Self::Lsc(AsyncRefCell::new(Box::pin(StreamReader::new(body))))
}
async fn read(
self: Rc<Self>,
data: &mut [u8],
) -> Result<usize, std::io::Error> {
let nread = match &*self {
CacheResponseResource::Sqlite(_) => {
let resource = deno_core::RcRef::map(&self, |r| match r {
Self::Sqlite(r) => r,
_ => unreachable!(),
});
let mut file = resource.borrow_mut().await;
file.read(data).await?
}
CacheResponseResource::Lsc(_) => {
let resource = deno_core::RcRef::map(&self, |r| match r {
Self::Lsc(r) => r,
_ => unreachable!(),
});
let mut file = resource.borrow_mut().await;
file.read(data).await?
}
};
Ok(nread)
}
}
impl Resource for CacheResponseResource {
deno_core::impl_readable_byob!();
fn name(&self) -> Cow<str> {
"CacheResponseResource".into()
}
}
#[op2(async)]
#[number]
pub async fn op_cache_storage_open<CA>(
pub async fn op_cache_storage_open(
state: Rc<RefCell<OpState>>,
#[string] cache_name: String,
) -> Result<i64, CacheError>
where
CA: Cache,
{
let cache = get_cache::<CA>(&state)?;
) -> Result<i64, CacheError> {
let cache = get_cache(&state)?;
cache.storage_open(cache_name).await
}
#[op2(async)]
pub async fn op_cache_storage_has<CA>(
pub async fn op_cache_storage_has(
state: Rc<RefCell<OpState>>,
#[string] cache_name: String,
) -> Result<bool, CacheError>
where
CA: Cache,
{
let cache = get_cache::<CA>(&state)?;
) -> Result<bool, CacheError> {
let cache = get_cache(&state)?;
cache.storage_has(cache_name).await
}
#[op2(async)]
pub async fn op_cache_storage_delete<CA>(
pub async fn op_cache_storage_delete(
state: Rc<RefCell<OpState>>,
#[string] cache_name: String,
) -> Result<bool, CacheError>
where
CA: Cache,
{
let cache = get_cache::<CA>(&state)?;
) -> Result<bool, CacheError> {
let cache = get_cache(&state)?;
cache.storage_delete(cache_name).await
}
#[op2(async)]
pub async fn op_cache_put<CA>(
pub async fn op_cache_put(
state: Rc<RefCell<OpState>>,
#[serde] request_response: CachePutRequest,
) -> Result<(), CacheError>
where
CA: Cache,
{
let cache = get_cache::<CA>(&state)?;
) -> Result<(), CacheError> {
let cache = get_cache(&state)?;
let resource = match request_response.response_rid {
Some(rid) => Some(
state
@ -210,14 +359,11 @@ where
#[op2(async)]
#[serde]
pub async fn op_cache_match<CA>(
pub async fn op_cache_match(
state: Rc<RefCell<OpState>>,
#[serde] request: CacheMatchRequest,
) -> Result<Option<CacheMatchResponse>, CacheError>
where
CA: Cache,
{
let cache = get_cache::<CA>(&state)?;
) -> Result<Option<CacheMatchResponse>, CacheError> {
let cache = get_cache(&state)?;
match cache.r#match(request).await? {
Some((meta, None)) => Ok(Some(CacheMatchResponse(meta, None))),
Some((meta, Some(resource))) => {
@ -229,28 +375,24 @@ where
}
#[op2(async)]
pub async fn op_cache_delete<CA>(
pub async fn op_cache_delete(
state: Rc<RefCell<OpState>>,
#[serde] request: CacheDeleteRequest,
) -> Result<bool, CacheError>
where
CA: Cache,
{
let cache = get_cache::<CA>(&state)?;
) -> Result<bool, CacheError> {
let cache = get_cache(&state)?;
cache.delete(request).await
}
pub fn get_cache<CA>(state: &Rc<RefCell<OpState>>) -> Result<CA, CacheError>
where
CA: Cache,
{
pub fn get_cache(
state: &Rc<RefCell<OpState>>,
) -> Result<CacheImpl, CacheError> {
let mut state = state.borrow_mut();
if let Some(cache) = state.try_borrow::<CA>() {
if let Some(cache) = state.try_borrow::<CacheImpl>() {
Ok(cache.clone())
} else if let Some(create_cache) = state.try_borrow::<CreateCache<CA>>() {
} else if let Some(create_cache) = state.try_borrow::<CreateCache>() {
let cache = create_cache.0()?;
state.put(cache);
Ok(state.borrow::<CA>().clone())
Ok(state.borrow::<CacheImpl>().clone())
} else {
Err(CacheError::ContextUnsupported)
}

77
ext/cache/lsc_shard.rs vendored Normal file
View file

@ -0,0 +1,77 @@
// Copyright 2018-2025 the Deno authors. MIT license.
use hyper::header::AUTHORIZATION;
use hyper::HeaderMap;
use hyper::StatusCode;
use crate::CacheError;
pub struct CacheShard {
client: reqwest::Client,
endpoint: String,
token: String,
}
impl CacheShard {
pub fn new(endpoint: String, token: String) -> Self {
let client = reqwest::Client::builder()
.build()
.expect("Failed to build reqwest client");
Self {
client,
endpoint,
token,
}
}
pub async fn get_object(
&self,
object_key: &str,
) -> Result<Option<reqwest::Response>, CacheError> {
let res = self
.client
.get(format!("{}/objects/{}", self.endpoint, object_key))
.header(&AUTHORIZATION, format!("Bearer {}", self.token))
.header("x-ryw", "1")
.send()
.await
.map_err(|e| e.without_url())?;
if res.status().is_success() {
Ok(Some(res))
} else if res.status() == StatusCode::NOT_FOUND {
Ok(None)
} else {
Err(CacheError::RequestFailed {
method: "GET",
status: res.status(),
})
}
}
pub async fn put_object(
&self,
object_key: &str,
headers: HeaderMap,
body: reqwest::Body,
) -> Result<(), CacheError> {
let res = self
.client
.put(format!("{}/objects/{}", self.endpoint, object_key))
.headers(headers)
.header(&AUTHORIZATION, format!("Bearer {}", self.token))
.body(body)
.send()
.await
.map_err(|e| e.without_url())?;
if res.status().is_success() {
Ok(())
} else {
Err(CacheError::RequestFailed {
method: "GET",
status: res.status(),
})
}
}
}

332
ext/cache/lscache.rs vendored Normal file
View file

@ -0,0 +1,332 @@
// Copyright 2018-2025 the Deno authors. MIT license.
use std::cell::RefCell;
use std::rc::Rc;
use async_stream::try_stream;
use base64::Engine;
use bytes::Bytes;
use deno_core::unsync::spawn;
use deno_core::BufMutView;
use deno_core::ByteString;
use deno_core::Resource;
use futures::StreamExt;
use futures::TryStreamExt;
use http::header::VARY;
use http::HeaderMap;
use http::HeaderName;
use http::HeaderValue;
use slab::Slab;
use crate::get_header;
use crate::get_headers_from_vary_header;
use crate::lsc_shard::CacheShard;
use crate::CacheDeleteRequest;
use crate::CacheError;
use crate::CacheMatchRequest;
use crate::CacheMatchResponseMeta;
use crate::CachePutRequest;
use crate::CacheResponseResource;
const REQHDR_PREFIX: &str = "x-lsc-meta-reqhdr-";
#[derive(Clone, Default)]
pub struct LscBackend {
shard: Rc<RefCell<Option<Rc<CacheShard>>>>,
id2name: Rc<RefCell<Slab<String>>>,
}
impl LscBackend {
pub fn set_shard(&self, shard: Rc<CacheShard>) {
*self.shard.borrow_mut() = Some(shard);
}
}
#[allow(clippy::unused_async)]
impl LscBackend {
/// Open a cache storage. Internally, this allocates an id and maps it
/// to the provided cache name.
pub async fn storage_open(
&self,
cache_name: String,
) -> Result<i64, CacheError> {
if cache_name.is_empty() {
return Err(CacheError::EmptyName);
}
let id = self.id2name.borrow_mut().insert(cache_name);
Ok(id as i64)
}
/// Check if a cache with the provided name exists. Always returns `true`.
pub async fn storage_has(
&self,
_cache_name: String,
) -> Result<bool, CacheError> {
Ok(true)
}
/// Delete a cache storage. Not yet implemented.
pub async fn storage_delete(
&self,
_cache_name: String,
) -> Result<bool, CacheError> {
Err(CacheError::DeletionNotSupported)
}
/// Writes an entry to the cache.
pub async fn put(
&self,
request_response: CachePutRequest,
resource: Option<Rc<dyn Resource>>,
) -> Result<(), CacheError> {
let Some(shard) = self.shard.borrow().as_ref().cloned() else {
return Err(CacheError::NotAvailable);
};
let Some(cache_name) = self
.id2name
.borrow_mut()
.get(request_response.cache_id as usize)
.cloned()
else {
return Err(CacheError::NotFound);
};
let object_key = build_cache_object_key(
cache_name.as_bytes(),
request_response.request_url.as_bytes(),
);
let mut headers = HeaderMap::new();
for hdr in &request_response.request_headers {
headers.insert(
HeaderName::from_bytes(
&[REQHDR_PREFIX.as_bytes(), &hdr.0[..]].concat(),
)?,
HeaderValue::from_bytes(&hdr.1[..])?,
);
}
for hdr in &request_response.response_headers {
if hdr.0.starts_with(b"x-lsc-meta-") {
continue;
}
if hdr.0[..] == b"content-encoding"[..] {
return Err(CacheError::ContentEncodingNotAllowed);
}
headers.insert(
HeaderName::from_bytes(&hdr.0[..])?,
HeaderValue::from_bytes(&hdr.1[..])?,
);
}
headers.insert(
HeaderName::from_bytes(b"x-lsc-meta-cached-at")?,
HeaderValue::from_bytes(
chrono::Utc::now()
.to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
.as_bytes(),
)?,
);
let body = try_stream! {
if let Some(resource) = resource {
loop {
let (size, buf) = resource.clone().read_byob(BufMutView::new(64 * 1024)).await.map_err(CacheError::Other)?;
if size == 0 {
break;
}
yield Bytes::copy_from_slice(&buf[..size]);
}
}
};
let (body_tx, body_rx) = futures::channel::mpsc::channel(4);
spawn(body.map(Ok::<Result<_, CacheError>, _>).forward(body_tx));
let body = reqwest::Body::wrap_stream(body_rx);
shard.put_object(&object_key, headers, body).await?;
Ok(())
}
/// Matches a request against the cache.
pub async fn r#match(
&self,
request: CacheMatchRequest,
) -> Result<
Option<(CacheMatchResponseMeta, Option<CacheResponseResource>)>,
CacheError,
> {
let Some(shard) = self.shard.borrow().as_ref().cloned() else {
return Err(CacheError::NotAvailable);
};
let Some(cache_name) = self
.id2name
.borrow()
.get(request.cache_id as usize)
.cloned()
else {
return Err(CacheError::NotFound);
};
let object_key = build_cache_object_key(
cache_name.as_bytes(),
request.request_url.as_bytes(),
);
let Some(res) = shard.get_object(&object_key).await? else {
return Ok(None);
};
// Is this a tombstone?
if res.headers().contains_key("x-lsc-meta-deleted-at") {
return Ok(None);
}
// From https://w3c.github.io/ServiceWorker/#request-matches-cached-item-algorithm
// If there's Vary header in the response, ensure all the
// headers of the cached request match the query request.
if let Some(vary_header) = res.headers().get(&VARY) {
if !vary_header_matches(
vary_header.as_bytes(),
&request.request_headers,
res.headers(),
) {
return Ok(None);
}
}
let mut response_headers: Vec<(ByteString, ByteString)> = res
.headers()
.iter()
.filter_map(|(k, v)| {
if k.as_str().starts_with("x-lsc-meta-") || k.as_str() == "x-ryw" {
None
} else {
Some((k.as_str().into(), v.as_bytes().into()))
}
})
.collect();
if let Some(x) = res
.headers()
.get("x-lsc-meta-cached-at")
.and_then(|x| x.to_str().ok())
{
if let Ok(cached_at) = chrono::DateTime::parse_from_rfc3339(x) {
let age = chrono::Utc::now()
.signed_duration_since(cached_at)
.num_seconds();
if age >= 0 {
response_headers.push(("age".into(), age.to_string().into()));
}
}
}
let meta = CacheMatchResponseMeta {
response_status: res.status().as_u16(),
response_status_text: res
.status()
.canonical_reason()
.unwrap_or("")
.to_string(),
request_headers: res
.headers()
.iter()
.filter_map(|(k, v)| {
let reqhdr_prefix = REQHDR_PREFIX.as_bytes();
if k.as_str().as_bytes().starts_with(reqhdr_prefix) {
Some((
k.as_str().as_bytes()[REQHDR_PREFIX.len()..].into(),
v.as_bytes().into(),
))
} else {
None
}
})
.collect(),
response_headers,
};
let body = CacheResponseResource::lsc(
res
.bytes_stream()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)),
);
Ok(Some((meta, Some(body))))
}
pub async fn delete(
&self,
request: CacheDeleteRequest,
) -> Result<bool, CacheError> {
let Some(shard) = self.shard.borrow().as_ref().cloned() else {
return Err(CacheError::NotAvailable);
};
let Some(cache_name) = self
.id2name
.borrow_mut()
.get(request.cache_id as usize)
.cloned()
else {
return Err(CacheError::NotFound);
};
let object_key = build_cache_object_key(
cache_name.as_bytes(),
request.request_url.as_bytes(),
);
let mut headers = HeaderMap::new();
headers.insert(
HeaderName::from_bytes(b"expires")?,
HeaderValue::from_bytes(b"Thu, 01 Jan 1970 00:00:00 GMT")?,
);
headers.insert(
HeaderName::from_bytes(b"x-lsc-meta-deleted-at")?,
HeaderValue::from_bytes(
chrono::Utc::now()
.to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
.as_bytes(),
)?,
);
shard
.put_object(&object_key, headers, reqwest::Body::from(&[][..]))
.await?;
Ok(true)
}
}
impl deno_core::Resource for LscBackend {
fn name(&self) -> std::borrow::Cow<str> {
"LscBackend".into()
}
}
fn vary_header_matches(
vary_header: &[u8],
query_request_headers: &[(ByteString, ByteString)],
cached_headers: &HeaderMap,
) -> bool {
let vary_header = match std::str::from_utf8(vary_header) {
Ok(vary_header) => vary_header,
Err(_) => return false,
};
let headers = get_headers_from_vary_header(vary_header);
for header in headers {
// Ignoring `accept-encoding` is safe because we refuse to cache responses
// with `content-encoding`
if header == "accept-encoding" {
continue;
}
let lookup_key = format!("{}{}", REQHDR_PREFIX, header);
let query_header = get_header(&header, query_request_headers);
let cached_header = cached_headers.get(&lookup_key);
if query_header.as_ref().map(|x| &x[..])
!= cached_header.as_ref().map(|x| x.as_bytes())
{
return false;
}
}
true
}
fn build_cache_object_key(cache_name: &[u8], request_url: &[u8]) -> String {
format!(
"v1/{}/{}",
base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(cache_name),
base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(request_url),
)
}

65
ext/cache/sqlite.rs vendored
View file

@ -1,5 +1,4 @@
// Copyright 2018-2025 the Deno authors. MIT license.
use std::borrow::Cow;
use std::path::PathBuf;
use std::pin::Pin;
use std::rc::Rc;
@ -7,19 +6,15 @@ use std::sync::Arc;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use async_trait::async_trait;
use deno_core::futures::future::poll_fn;
use deno_core::parking_lot::Mutex;
use deno_core::unsync::spawn_blocking;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
use deno_core::BufMutView;
use deno_core::ByteString;
use deno_core::Resource;
use rusqlite::params;
use rusqlite::Connection;
use rusqlite::OptionalExtension;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
@ -27,12 +22,12 @@ use crate::deserialize_headers;
use crate::get_header;
use crate::serialize_headers;
use crate::vary_header_matches;
use crate::Cache;
use crate::CacheDeleteRequest;
use crate::CacheError;
use crate::CacheMatchRequest;
use crate::CacheMatchResponseMeta;
use crate::CachePutRequest;
use crate::CacheResponseResource;
#[derive(Clone)]
pub struct SqliteBackedCache {
@ -94,14 +89,14 @@ impl SqliteBackedCache {
}
}
#[async_trait(?Send)]
impl Cache for SqliteBackedCache {
type CacheMatchResourceType = CacheResponseResource;
impl SqliteBackedCache {
/// Open a cache storage. Internally, this creates a row in the
/// sqlite db if the cache doesn't exist and returns the internal id
/// of the cache.
async fn storage_open(&self, cache_name: String) -> Result<i64, CacheError> {
pub async fn storage_open(
&self,
cache_name: String,
) -> Result<i64, CacheError> {
let db = self.connection.clone();
let cache_storage_dir = self.cache_storage_dir.clone();
spawn_blocking(move || {
@ -127,7 +122,10 @@ impl Cache for SqliteBackedCache {
/// Check if a cache with the provided name exists.
/// Note: this doesn't check the disk, it only checks the sqlite db.
async fn storage_has(&self, cache_name: String) -> Result<bool, CacheError> {
pub async fn storage_has(
&self,
cache_name: String,
) -> Result<bool, CacheError> {
let db = self.connection.clone();
spawn_blocking(move || {
let db = db.lock();
@ -145,7 +143,7 @@ impl Cache for SqliteBackedCache {
}
/// Delete a cache storage. Internally, this deletes the row in the sqlite db.
async fn storage_delete(
pub async fn storage_delete(
&self,
cache_name: String,
) -> Result<bool, CacheError> {
@ -174,7 +172,7 @@ impl Cache for SqliteBackedCache {
.await?
}
async fn put(
pub async fn put(
&self,
request_response: CachePutRequest,
resource: Option<Rc<dyn Resource>>,
@ -227,7 +225,7 @@ impl Cache for SqliteBackedCache {
Ok(())
}
async fn r#match(
pub async fn r#match(
&self,
request: CacheMatchRequest,
) -> Result<
@ -298,14 +296,17 @@ impl Cache for SqliteBackedCache {
}
Err(err) => return Err(err.into()),
};
Ok(Some((cache_meta, Some(CacheResponseResource::new(file)))))
Ok(Some((
cache_meta,
Some(CacheResponseResource::sqlite(file)),
)))
}
Some((cache_meta, None)) => Ok(Some((cache_meta, None))),
None => Ok(None),
}
}
async fn delete(
pub async fn delete(
&self,
request: CacheDeleteRequest,
) -> Result<bool, CacheError> {
@ -370,36 +371,6 @@ impl deno_core::Resource for SqliteBackedCache {
}
}
pub struct CacheResponseResource {
file: AsyncRefCell<tokio::fs::File>,
}
impl CacheResponseResource {
fn new(file: tokio::fs::File) -> Self {
Self {
file: AsyncRefCell::new(file),
}
}
async fn read(
self: Rc<Self>,
data: &mut [u8],
) -> Result<usize, std::io::Error> {
let resource = deno_core::RcRef::map(&self, |r| &r.file);
let mut file = resource.borrow_mut().await;
let nread = file.read(data).await?;
Ok(nread)
}
}
impl Resource for CacheResponseResource {
deno_core::impl_readable_byob!();
fn name(&self) -> Cow<str> {
"CacheResponseResource".into()
}
}
pub fn hash(token: &str) -> String {
use sha2::Digest;
format!("{:x}", sha2::Sha256::digest(token.as_bytes()))

View file

@ -7,7 +7,6 @@ use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc;
use deno_cache::SqliteBackedCache;
use deno_core::snapshot::*;
use deno_core::v8;
use deno_core::Extension;
@ -283,7 +282,7 @@ pub fn create_runtime_snapshot(
deno_webgpu::deno_webgpu::init_ops_and_esm(),
deno_canvas::deno_canvas::init_ops_and_esm(),
deno_fetch::deno_fetch::init_ops_and_esm::<Permissions>(Default::default()),
deno_cache::deno_cache::init_ops_and_esm::<SqliteBackedCache>(None),
deno_cache::deno_cache::init_ops_and_esm(None),
deno_websocket::deno_websocket::init_ops_and_esm::<Permissions>(
"".to_owned(),
None,

View file

@ -11,6 +11,7 @@ use std::task::Context;
use std::task::Poll;
use deno_broadcast_channel::InMemoryBroadcastChannel;
use deno_cache::CacheImpl;
use deno_cache::CreateCache;
use deno_cache::SqliteBackedCache;
use deno_core::error::CoreError;
@ -376,6 +377,7 @@ pub struct WebWorkerOptions {
pub format_js_error_fn: Option<Arc<FormatJsErrorFn>>,
pub worker_type: WebWorkerType,
pub cache_storage_dir: Option<std::path::PathBuf>,
pub use_lsc_cache: bool,
pub stdio: Stdio,
pub strace_ops: Option<Vec<String>>,
pub close_on_idle: bool,
@ -453,10 +455,39 @@ impl WebWorker {
// Permissions: many ops depend on this
let enable_testing_features = options.bootstrap.enable_testing_features;
let create_cache = options.cache_storage_dir.map(|storage_dir| {
let create_cache_fn = move || SqliteBackedCache::new(storage_dir.clone());
CreateCache(Arc::new(create_cache_fn))
});
fn create_cache_inner(options: &WebWorkerOptions) -> Option<CreateCache> {
if let Some(storage_dir) = &options.cache_storage_dir {
let storage_dir = storage_dir.clone();
let create_cache_fn = move || {
let s = SqliteBackedCache::new(storage_dir.clone())?;
Ok(CacheImpl::Sqlite(s))
};
return Some(CreateCache(Arc::new(create_cache_fn)));
}
if options.use_lsc_cache {
use deno_cache::CacheShard;
let Ok(endpoint) = std::env::var("LSC_ENDPOINT") else {
return None;
};
let Ok(token) = std::env::var("LSC_TOKEN") else {
return None;
};
let shard = Rc::new(CacheShard::new(endpoint, token));
let create_cache_fn = move || {
let x = deno_cache::LscBackend::default();
x.set_shard(shard.clone());
Ok(CacheImpl::Lsc(x))
};
#[allow(clippy::arc_with_non_send_sync)]
return Some(CreateCache(Arc::new(create_cache_fn)));
}
None
}
let create_cache = create_cache_inner(&options);
// NOTE(bartlomieju): ordering is important here, keep it in sync with
// `runtime/worker.rs` and `runtime/snapshot.rs`!
@ -484,9 +515,7 @@ impl WebWorker {
..Default::default()
},
),
deno_cache::deno_cache::init_ops_and_esm::<SqliteBackedCache>(
create_cache,
),
deno_cache::deno_cache::init_ops_and_esm(create_cache),
deno_websocket::deno_websocket::init_ops_and_esm::<PermissionsContainer>(
options.bootstrap.user_agent.clone(),
services.root_cert_store_provider.clone(),

View file

@ -8,6 +8,7 @@ use std::time::Duration;
use std::time::Instant;
use deno_broadcast_channel::InMemoryBroadcastChannel;
use deno_cache::CacheImpl;
use deno_cache::CreateCache;
use deno_cache::SqliteBackedCache;
use deno_core::error::CoreError;
@ -200,6 +201,7 @@ pub struct WorkerOptions {
pub strace_ops: Option<Vec<String>>,
pub cache_storage_dir: Option<std::path::PathBuf>,
pub use_lsc_cache: bool,
pub origin_storage_dir: Option<std::path::PathBuf>,
pub stdio: Stdio,
pub enable_stack_trace_arg_in_ops: bool,
@ -221,6 +223,7 @@ impl Default for WorkerOptions {
format_js_error_fn: Default::default(),
origin_storage_dir: Default::default(),
cache_storage_dir: Default::default(),
use_lsc_cache: false,
extensions: Default::default(),
startup_snapshot: Default::default(),
create_params: Default::default(),
@ -341,6 +344,39 @@ impl MainWorker {
},
);
fn create_cache_inner(options: &WorkerOptions) -> Option<CreateCache> {
if let Some(storage_dir) = &options.cache_storage_dir {
let storage_dir = storage_dir.clone();
let create_cache_fn = move || {
let s = SqliteBackedCache::new(storage_dir.clone())?;
Ok(CacheImpl::Sqlite(s))
};
return Some(CreateCache(Arc::new(create_cache_fn)));
}
if options.use_lsc_cache {
use deno_cache::CacheShard;
let Ok(endpoint) = std::env::var("LSC_ENDPOINT") else {
return None;
};
let Ok(token) = std::env::var("LSC_TOKEN") else {
return None;
};
let shard = Rc::new(CacheShard::new(endpoint, token));
let create_cache_fn = move || {
let x = deno_cache::LscBackend::default();
x.set_shard(shard.clone());
Ok(CacheImpl::Lsc(x))
};
#[allow(clippy::arc_with_non_send_sync)]
return Some(CreateCache(Arc::new(create_cache_fn)));
}
None
}
let create_cache = create_cache_inner(&options);
// Get our op metrics
let (op_summary_metrics, op_metrics_factory_fn) = create_op_metrics(
options.bootstrap.enable_op_summary_metrics,
@ -350,10 +386,6 @@ impl MainWorker {
// Permissions: many ops depend on this
let enable_testing_features = options.bootstrap.enable_testing_features;
let exit_code = ExitCode::default();
let create_cache = options.cache_storage_dir.map(|storage_dir| {
let create_cache_fn = move || SqliteBackedCache::new(storage_dir.clone());
CreateCache(Arc::new(create_cache_fn))
});
// NOTE(bartlomieju): ordering is important here, keep it in sync with
// `runtime/web_worker.rs` and `runtime/snapshot.rs`!
@ -381,9 +413,7 @@ impl MainWorker {
..Default::default()
},
),
deno_cache::deno_cache::init_ops_and_esm::<SqliteBackedCache>(
create_cache,
),
deno_cache::deno_cache::init_ops_and_esm(create_cache),
deno_websocket::deno_websocket::init_ops_and_esm::<PermissionsContainer>(
options.bootstrap.user_agent.clone(),
services.root_cert_store_provider.clone(),