2023-08-22 13:56:00 +08:00
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::cell::RefCell;
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;
use crate::proto::datapath as pb;
use crate::AtomicWrite;
use crate::CommitResult;
use crate::Database;
use crate::DatabaseHandler;
use crate::KvEntry;
use crate::MutationKind;
use crate::QueueMessageHandle;
use crate::ReadRange;
use crate::ReadRangeOutput;
use crate::SnapshotReadOptions;
use anyhow::Context;
use async_trait::async_trait;
use chrono::DateTime;
use chrono::Utc;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::futures::TryFutureExt;
2023-08-23 17:03:05 -06:00
use deno_core::unsync::JoinHandle;
2023-08-22 13:56:00 +08:00
use deno_core::OpState;
use prost::Message;
use rand::Rng;
use serde::Deserialize;
use tokio::sync::watch;
use url::Url;
use uuid::Uuid;
/// Permission checks used by the remote database handler.
///
/// The implementing type `P` is fetched from `OpState` (via
/// `state.borrow_mut::<P>()` elsewhere in this file) before a URL is contacted.
pub trait RemoteDbHandlerPermissions {
/// Checks whether the environment variable named `var` may be accessed.
/// NOTE(review): presumably returns `Err` when access is denied — confirm
/// against the implementors.
fn check_env(&mut self, var: &str) -> Result<(), AnyError>;
/// Checks whether a network request to `url` may be made.
///
/// `api_name` names the API on whose behalf the check is made; this file
/// passes "Deno.openKv" and "Deno.Kv".
fn check_net_url(
&mut self,
url: &Url,
api_name: &str,
) -> Result<(), AnyError>;
/// Database handler for remote databases, generic over the permission type `P`.
///
/// The only visible field is a `PhantomData` marker, so the handler carries
/// `P` at the type level without storing a value of it.
pub struct RemoteDbHandler<P: RemoteDbHandlerPermissions + 'static> {
// Zero-sized marker that ties the handler to `P`.
_p: std::marker::PhantomData<P>,
impl<P: RemoteDbHandlerPermissions> RemoteDbHandler<P> {
/// Creates a handler; the only thing initialised is the `PhantomData` marker.
pub fn new() -> Self {
Self { _p: PhantomData }
// `Default` implementation for `RemoteDbHandler<P>`.
// NOTE(review): no body is visible for `default` here; presumably it
// delegates to `RemoteDbHandler::new()` — confirm.
impl<P: RemoteDbHandlerPermissions> Default for RemoteDbHandler<P> {
fn default() -> Self {
/// Minimal view of a metadata document that exposes only its `version`.
///
/// `fetch_metadata` decodes this type first and rejects any version greater
/// than 1 before it attempts to decode the full metadata.
struct VersionInfo {
// Version number of the metadata document.
version: u64,
/// Metadata describing a remote database, as returned by the metadata URL.
///
/// Field names are mapped to camelCase on the wire by the `serde` attribute
/// below (for example `database_id` is read from `databaseId`).
#[serde(rename_all = "camelCase")]
struct DatabaseMetadata {
// Version number of the metadata document.
version: u64,
// Sent by `call_remote` as the `x-transaction-domain-id` request header.
database_id: Uuid,
// Candidate endpoints; `call_remote` looks for one whose `consistency`
// is "strong".
endpoints: Vec<EndpointInfo>,
// Sent by `call_remote` as the bearer token in the `authorization` header.
token: String,
// NOTE(review): presumably the expiry time of `token`, used to schedule the
// next metadata refresh — confirm in `metadata_refresh_task`.
expires_at: DateTime<Utc>,
/// One endpoint entry from the database metadata.
#[serde(rename_all = "camelCase")]
pub struct EndpointInfo {
/// Endpoint URL; `call_remote` appends `/<method>` to it to form the
/// request URL.
pub url: String,
// Using `String` instead of an enum, so that parsing doesn't
// break if more consistency levels are added.
/// Consistency level name; the only value this file compares against
/// is "strong".
pub consistency: String,
// `DatabaseHandler` implementation that opens a `RemoteDb` from a URL.
impl<P: RemoteDbHandlerPermissions> DatabaseHandler for RemoteDbHandler<P> {
type DB = RemoteDb<P>;
/// Opens a remote database.
///
/// `path` must hold the database URL. A missing or unparsable URL produces
/// a type error, and the net permission check on the parsed URL must pass.
/// The access token is read from the environment variable named by
/// `ENV_VAR_NAME`.
async fn open(
state: Rc<RefCell<OpState>>,
path: Option<String>,
) -> Result<Self::DB, AnyError> {
// A remote database cannot be opened without a URL.
let Some(url) = path else {
return Err(type_error("Missing database url"));
let Ok(parsed_url) = Url::parse(&url) else {
return Err(type_error(format!("Invalid database url: {}", url)));
// Ask the permission object stored in `OpState` whether this URL may be
// contacted on behalf of "Deno.openKv".
let mut state = state.borrow_mut();
let permissions = state.borrow_mut::<P>();
permissions.check_net_url(&parsed_url, "Deno.openKv")?;
// Read the access token from the environment; `with_context` attaches a
// user-facing hint to the error when the variable cannot be read.
let access_token = std::env::var(ENV_VAR_NAME)
.with_context(|| {
"Missing DENO_KV_ACCESS_TOKEN environment variable. Please set it to your access token from https://dash.deno.com/account."
// The refresher receives the original URL string and the token;
// `MetadataRefresher::new` spawns the metadata refresh task with them.
let refresher = MetadataRefresher::new(url, access_token);
let db = RemoteDb {
client: reqwest::Client::new(),
_p: PhantomData,
/// Handle to an opened remote database, generic over the permission type `P`.
pub struct RemoteDb<P: RemoteDbHandlerPermissions + 'static> {
// HTTP client created when the database is opened.
client: reqwest::Client,
// Owns the background metadata refresh task and the receiver for its state.
refresher: MetadataRefresher,
// Zero-sized marker that ties the database to `P`.
_p: std::marker::PhantomData<P>,
/// Field-less placeholder type used as `RemoteDb`'s queue message handle
/// (`type QMH`).
pub struct DummyQueueMessageHandle {}
// `QueueMessageHandle` implementation for the placeholder handle type.
// NOTE(review): neither method body is visible here. `atomic_write` rejects
// enqueue operations as unsupported, so these are presumably stubs — confirm.
impl QueueMessageHandle for DummyQueueMessageHandle {
async fn take_payload(&mut self) -> Result<Vec<u8>, AnyError> {
async fn finish(&self, _success: bool) -> Result<(), AnyError> {
// `Database` implementation that forwards reads and writes to the remote
// service through `call_remote`.
impl<P: RemoteDbHandlerPermissions> Database for RemoteDb<P> {
// Queue message handles use the placeholder type.
type QMH = DummyQueueMessageHandle;
/// Reads the requested ranges with a single `pb::SnapshotRead` request.
///
/// Returns a type error when the response has `read_disabled` set, and
/// propagates any error raised while converting returned entries.
async fn snapshot_read(
state: Rc<RefCell<OpState>>,
requests: Vec<ReadRange>,
_options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError> {
// Translate each requested range into its protobuf form.
let req = pb::SnapshotRead {
ranges: requests
.map(|r| pb::ReadRange {
start: r.start,
end: r.end,
// `limit.get()` is cast to the integer type of the protobuf field.
limit: r.limit.get() as _,
reverse: r.reverse,
let res: pb::SnapshotReadOutput = call_remote::<P, _, _>(
// Disabled reads are reported through a field of the response.
if res.read_disabled {
return Err(type_error("Reads are disabled for this database."));
// Convert the response back into crate types; collecting into `Result`
// stops at the first conversion error.
let out = res
.map(|r| {
Ok(ReadRangeOutput {
entries: r
.map(|e| {
let encoding = e.encoding();
Ok(KvEntry {
key: e.key,
value: decode_value(e.value, encoding)?,
// Fails unless the versionstamp is exactly 10 bytes long.
versionstamp: <[u8; 10]>::try_from(&e.versionstamp[..])?,
.collect::<Result<_, AnyError>>()?,
.collect::<Result<Vec<_>, AnyError>>()?;
/// Submits an atomic write as a single `pb::AtomicWrite` request.
///
/// Writes that contain enqueue operations are rejected up front. The
/// service status is then mapped to the result: success yields
/// `Ok(Some(..))`, a failed check yields `Ok(None)`, and every other status
/// yields a type error.
async fn atomic_write(
state: Rc<RefCell<OpState>>,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError> {
if !write.enqueues.is_empty() {
return Err(type_error("Enqueue operations are not supported yet."));
let req = pb::AtomicWrite {
kv_checks: write
.map(|x| {
Ok(pb::KvCheck {
key: x.key,
// A check without a versionstamp is sent as ten zero bytes.
versionstamp: x.versionstamp.unwrap_or([0u8; 10]).to_vec(),
kv_mutations: write
.map(|x| encode_mutation(x.key, x.kind))
// Always empty: writes with enqueues were rejected above.
enqueues: vec![],
let res: pb::AtomicWriteOutput = call_remote::<P, _, _>(
// Map the status reported by the service onto the return value.
match res.status() {
pb::AtomicWriteStatus::AwSuccess => Ok(Some(CommitResult {
versionstamp: if res.versionstamp.is_empty() {
} else {
// A failed check is not an error: the caller gets `None`.
pb::AtomicWriteStatus::AwCheckFailure => Ok(None),
pb::AtomicWriteStatus::AwUnsupportedWrite => {
Err(type_error("Unsupported write"))
pb::AtomicWriteStatus::AwUsageLimitExceeded => {
Err(type_error("The database usage limit has been exceeded."))
pb::AtomicWriteStatus::AwWriteDisabled => {
// TODO: Auto retry
Err(type_error("Writes are disabled for this database."))
pb::AtomicWriteStatus::AwUnspecified => {
Err(type_error("Unspecified error"))
pb::AtomicWriteStatus::AwQueueBacklogLimitExceeded => {
Err(type_error("Queue backlog limit exceeded"))
// NOTE(review): no body is visible for `dequeue_next_message`; since
// enqueues are rejected in `atomic_write`, it is presumably a stub — confirm.
async fn dequeue_next_message(
_state: Rc<RefCell<OpState>>,
) -> Result<Self::QMH, AnyError> {
/// No-op: the body is empty.
fn close(&self) {}
/// Converts raw bytes received from the service into a `crate::Value`,
/// according to the protobuf `encoding` tag.
///
/// Returns an error when the encoding is `VeUnspecified`, or when a `VeLe64`
/// payload is not exactly 8 bytes long.
fn decode_value(
value: Vec<u8>,
encoding: pb::KvValueEncoding,
) -> anyhow::Result<crate::Value> {
match encoding {
// `VeV8` and `VeBytes` payloads are passed through unchanged.
pb::KvValueEncoding::VeV8 => Ok(crate::Value::V8(value)),
pb::KvValueEncoding::VeBytes => Ok(crate::Value::Bytes(value)),
// `VeLe64` is a little-endian u64; the slice-to-array conversion fails
// unless the payload is exactly 8 bytes.
pb::KvValueEncoding::VeLe64 => Ok(crate::Value::U64(u64::from_le_bytes(
<[u8; 8]>::try_from(&value[..])?,
pb::KvValueEncoding::VeUnspecified => {
Err(anyhow::anyhow!("Unspecified value encoding, cannot decode"))
/// Converts a `crate::Value` into its protobuf representation, tagging it
/// with the matching `pb::KvValueEncoding` (the reverse of `decode_value`).
fn encode_value(value: crate::Value) -> pb::KvValue {
match value {
crate::Value::V8(data) => pb::KvValue {
encoding: pb::KvValueEncoding::VeV8 as _,
crate::Value::Bytes(data) => pb::KvValue {
encoding: pb::KvValueEncoding::VeBytes as _,
// Integers are serialised as 8 little-endian bytes.
crate::Value::U64(x) => pb::KvValue {
data: x.to_le_bytes().to_vec(),
encoding: pb::KvValueEncoding::VeLe64 as _,
/// Builds the protobuf mutation for `key` from a `MutationKind`.
///
/// Each kind maps to one `pb::KvMutationType`: `Set` to `MSet`, `Delete` to
/// `MClear`, `Max` to `MMax`, `Min` to `MMin` and `Sum` to `MSum`.
fn encode_mutation(key: Vec<u8>, mutation: MutationKind) -> pb::KvMutation {
match mutation {
MutationKind::Set(x) => pb::KvMutation {
value: Some(encode_value(x)),
mutation_type: pb::KvMutationType::MSet as _,
// `Delete` has no operand, so an empty byte value is sent with `MClear`.
MutationKind::Delete => pb::KvMutation {
value: Some(encode_value(crate::Value::Bytes(vec![]))),
mutation_type: pb::KvMutationType::MClear as _,
MutationKind::Max(x) => pb::KvMutation {
value: Some(encode_value(x)),
mutation_type: pb::KvMutationType::MMax as _,
MutationKind::Min(x) => pb::KvMutation {
value: Some(encode_value(x)),
mutation_type: pb::KvMutationType::MMin as _,
MutationKind::Sum(x) => pb::KvMutation {
value: Some(encode_value(x)),
mutation_type: pb::KvMutationType::MSum as _,
/// State published by the metadata refresh task over the `watch` channel.
///
/// Usages in this file show three variants: `Pending` (the channel's initial
/// value), `Ready` (wraps an `Arc` of the fetched metadata) and `Invalid`
/// (carries an error message string).
enum MetadataState {
/// Pairs the background metadata refresh task with the receiving end of the
/// channel on which that task publishes its state.
struct MetadataRefresher {
// Receiver for the latest `MetadataState`; `call_remote` clones it.
metadata_rx: watch::Receiver<MetadataState>,
// Handle of the spawned refresh task.
handle: JoinHandle<()>,
impl MetadataRefresher {
/// Creates the state channel, initially `MetadataState::Pending`, and
/// spawns `metadata_refresh_task` with `url`, `access_token` and the
/// sending half of the channel.
pub fn new(url: String, access_token: String) -> Self {
let (tx, rx) = watch::channel(MetadataState::Pending);
// The sender moves into the task; only the receiver is kept here.
let handle =
2023-08-23 17:03:05 -06:00
deno_core::unsync::spawn(metadata_refresh_task(url, access_token, tx));
2023-08-22 13:56:00 +08:00
Self {
metadata_rx: rx,
// `Drop` implementation for `MetadataRefresher`.
// NOTE(review): no body is visible for `drop` here; presumably it stops the
// background task held in `handle` — confirm.
impl Drop for MetadataRefresher {
fn drop(&mut self) {
/// Background task that repeatedly fetches database metadata and publishes
/// the outcome on `tx`.
///
/// Outcomes of `fetch_metadata` are handled as follows:
/// - `Ok(Ok(metadata))` is published as `MetadataState::Ready`.
/// - `Ok(Err(message))` is published as `MetadataState::Invalid`.
/// - `Err(e)` is logged and the fetch is retried after a randomized backoff.
async fn metadata_refresh_task(
metadata_url: String,
access_token: String,
tx: watch::Sender<MetadataState>,
) {
// A single HTTP client is reused for every fetch.
let client = reqwest::Client::new();
loop {
// The retry counter starts from zero on each pass of the outer loop.
let mut attempt = 0u64;
// Inner loop: keep fetching until a fetch yields metadata.
let metadata = loop {
match fetch_metadata(&client, &metadata_url, &access_token).await {
Ok(Ok(x)) => break x,
Ok(Err(e)) => {
// `send` on a watch channel fails only when every receiver is gone.
if tx.send(MetadataState::Invalid(e)).is_err() {
Err(e) => {
log::error!("Failed to fetch database metadata: {}", e);
// Backoff with a 5 second base; the delay grows with `attempt`.
randomized_exponential_backoff(Duration::from_secs(5), attempt).await;
attempt += 1;
let ms_until_expire = u64::try_from(
// Refresh 10 minutes before expiry
// In case of buggy clocks, don't refresh more than once per minute
let interval = Duration::from_millis(ms_until_expire)
// Publish the fresh metadata behind an `Arc` so readers can clone it cheaply.
if tx.send(MetadataState::Ready(Arc::new(metadata))).is_err() {
/// Fetches the metadata document and decodes it.
///
/// The nested result separates two failure classes, which the caller
/// (`metadata_refresh_task`) treats differently:
/// - outer `Err`: logged by the caller and retried after a backoff;
/// - inner `Err(String)`: published by the caller as `MetadataState::Invalid`.
///
/// Client-error responses, an undecodable version or body, and a version
/// greater than 1 are all reported through the inner `Err`.
async fn fetch_metadata(
client: &reqwest::Client,
metadata_url: &str,
access_token: &str,
) -> anyhow::Result<Result<DatabaseMetadata, String>> {
// The access token is sent as a bearer token.
let res = client
.header("authorization", format!("Bearer {}", access_token))
if !res.status().is_success() {
// Client errors are returned as the inner `Err`.
// NOTE(review): the other branch presumably yields the outer `Err` so that
// the caller retries — confirm.
if res.status().is_client_error() {
return Ok(Err(format!(
"Client error while fetching metadata: {:?} {}",
} else {
"remote returned error: {:?} {}",
let res = res.bytes().await?;
// Decode only the version first, so an unsupported version is reported
// with its own message before the full document is decoded.
let version_info: VersionInfo = match serde_json::from_slice(&res) {
Ok(x) => x,
Err(e) => return Ok(Err(format!("Failed to decode version info: {}", e))),
// Versions above 1 are rejected.
if version_info.version > 1 {
return Ok(Err(format!(
"Unsupported metadata version: {}",
.map_err(|e| format!("Failed to decode metadata: {}", e)),
/// Computes a randomized, exponentially growing delay in milliseconds from
/// `base` and `attempt`.
///
/// `attempt` is capped at 12, so the exponential part is at most
/// `2 << 12` = 8192 ms. A random amount between 0 and half of the delay
/// (inclusive) is then added.
/// NOTE(review): the statement that waits for the computed delay is not
/// visible here — presumably an async sleep; confirm.
async fn randomized_exponential_backoff(base: Duration, attempt: u64) {
// Cap the exponent so the shift stays bounded.
let attempt = attempt.min(12);
// Base delay in milliseconds plus `2 << attempt`.
let delay = base.as_millis() as u64 + (2 << attempt);
// The half-open range `0..(delay / 2) + 1` makes `delay / 2` the largest
// possible jitter.
let delay = delay + rand::thread_rng().gen_range(0..(delay / 2) + 1);
/// Sends the protobuf message `req` to `method` on the database's strongly
/// consistent endpoint and decodes the response body as `R`.
///
/// Type parameters: `P` is the permission type looked up in `state`, `T` the
/// request message type, `R` the response message type.
///
/// Errors logged as "retryable" are retried after a randomized backoff.
/// Invalid metadata, a missing "strong" endpoint, a failed permission check,
/// a client-error response, or an undecodable response body end the call
/// with an error.
async fn call_remote<
P: RemoteDbHandlerPermissions + 'static,
T: Message,
R: Message + Default,
state: &RefCell<OpState>,
refresher: &MetadataRefresher,
client: &reqwest::Client,
method: &str,
req: &T,
) -> anyhow::Result<R> {
let mut attempt = 0u64;
// Each pass of this loop is one request attempt; it ends when the match on
// `res` further down breaks out with an `Ok` value.
let res = loop {
// Read the latest metadata state from a clone of the refresher's receiver.
let mut metadata_rx = refresher.metadata_rx.clone();
let metadata = loop {
match &*metadata_rx.borrow() {
// NOTE(review): `Pending` falls through; presumably the code then waits
// for the channel to change before looping again — confirm.
MetadataState::Pending => {}
MetadataState::Ready(x) => break x.clone(),
MetadataState::Invalid(e) => {
return Err(type_error(format!("Metadata error: {}", e)))
// `unwrap()` never fails because `tx` is owned by the task held by `refresher`.
2023-08-27 12:04:12 +08:00
// Only an endpoint whose consistency is "strong" is used.
let Some(sc_endpoint) = metadata
.find(|x| x.consistency == "strong")
else {
return Err(type_error(
"No strong consistency endpoint is available for this database",
2023-08-22 13:56:00 +08:00
// The method name is appended to the endpoint URL after a `/`.
let full_url = format!("{}/{}", sc_endpoint.url, method);
let parsed_url = Url::parse(&full_url)?;
// Check network permission for the concrete endpoint URL, which was not
// known when the database was opened.
let mut state = state.borrow_mut();
let permissions = state.borrow_mut::<P>();
permissions.check_net_url(&parsed_url, "Deno.Kv")?;
// The request carries the database id and the metadata token as headers.
let res = client
.header("x-transaction-domain-id", metadata.database_id.to_string())
.header("authorization", format!("Bearer {}", metadata.token))
.and_then(|x| async move {
if x.status().is_success() {
} else if x.status().is_client_error() {
// Client errors become the inner `Err`, carrying the status and body text.
Ok(Err((x.status(), x.text().await?)))
} else {
"server error ({:?}): {}",
match res {
Ok(x) => break x,
Err(e) => {
log::error!("retryable error in {}: {}", method, e);
// Zero base delay: the wait comes only from the exponential and random parts.
randomized_exponential_backoff(Duration::from_millis(0), attempt).await;
attempt += 1;
// A client error reported by the service is returned to the caller
// as a type error.
let res = match res {
Ok(x) => x,
Err((status, message)) => {
return Err(type_error(format!(
"client error in {} (status {:?}): {}",
method, status, message
// Decode the response body as `R`.
match R::decode(&*res) {
Ok(x) => Ok(x),
Err(e) => Err(type_error(format!(
"failed to decode response from {}: {}",
method, e