This PR adds unstable `Deno.cron` API to trigger execution of cron jobs.

* State: All cron state is in memory. Cron jobs are scheduled according
to the cron schedule expression and the current time. No state is
persisted to disk.
* Time zone: Cron expressions specify time in UTC.
* Overlapping executions: not permitted. If the next scheduled execution
time occurs while the same cron job is still executing, the scheduled
execution is skipped.
* Retries: failed jobs are automatically retried until they succeed or
until retry threshold is reached. Retry policy can be optionally
specified using `options.backoffSchedule`.
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::cell::OnceCell;
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::env;
use std::rc::Rc;
use std::rc::Weak;
use std::sync::Arc;
use async_trait::async_trait;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::futures;
use deno_core::futures::FutureExt;
use deno_unsync::spawn;
use deno_unsync::JoinHandle;
use tokio::sync::mpsc;
use tokio::sync::mpsc::WeakSender;
use tokio::sync::OwnedSemaphorePermit;
use tokio::sync::Semaphore;
use crate::CronHandle;
use crate::CronHandler;
use crate::CronSpec;
const MAX_CRONS: usize = 100;
const MAX_BACKOFF_MS: u32 = 60 * 60 * 1_000; // 1 hour
const MAX_BACKOFF_COUNT: usize = 5;
const DEFAULT_BACKOFF_SCHEDULE: [u32; 5] = [100, 1_000, 5_000, 30_000, 60_000];
pub struct LocalCronHandler {
cron_schedule_tx: OnceCell<mpsc::Sender<(String, bool)>>,
concurrency_limiter: Arc<Semaphore>,
cron_loop_join_handle: OnceCell<JoinHandle<()>>,
runtime_state: Rc<RefCell<RuntimeState>>,
struct RuntimeState {
crons: HashMap<String, Cron>,
scheduled_deadlines: BTreeMap<u64, Vec<String>>,
struct Cron {
spec: CronSpec,
next_tx: mpsc::WeakSender<()>,
current_execution_retries: u32,
impl Cron {
fn backoff_schedule(&self) -> &[u32] {
impl Default for LocalCronHandler {
fn default() -> Self {
impl LocalCronHandler {
pub fn new() -> Self {
Self {
cron_schedule_tx: OnceCell::new(),
concurrency_limiter: Arc::new(Semaphore::new(DISPATCH_CONCURRENCY_LIMIT)),
cron_loop_join_handle: OnceCell::new(),
runtime_state: Rc::new(RefCell::new(RuntimeState {
crons: HashMap::new(),
scheduled_deadlines: BTreeMap::new(),
async fn cron_loop(
runtime_state: Rc<RefCell<RuntimeState>>,
mut cron_schedule_rx: mpsc::Receiver<(String, bool)>,
) -> Result<(), AnyError> {
loop {
let earliest_deadline = runtime_state
let sleep_fut = if let Some(earliest_deadline) = earliest_deadline {
let now = crate::time::utc_now().timestamp_millis() as u64;
if let Some(delta) = earliest_deadline.checked_sub(now) {
} else {
} else {
let cron_to_schedule = tokio::select! {
_ = sleep_fut => None,
x = cron_schedule_rx.recv() => {
if x.is_none() {
return Ok(());
// Schedule next execution of the cron if needed.
if let Some((name, prev_success)) = cron_to_schedule {
let mut runtime_state = runtime_state.borrow_mut();
if let Some(cron) = runtime_state.crons.get_mut(&name) {
let backoff_schedule = cron.backoff_schedule();
let next_deadline = if !prev_success
&& cron.current_execution_retries < backoff_schedule.len() as u32
let backoff_ms =
backoff_schedule[cron.current_execution_retries as usize];
let now = crate::time::utc_now().timestamp_millis() as u64;
cron.current_execution_retries += 1;
now + backoff_ms as u64
} else {
let next_ts = compute_next_deadline(&cron.spec.cron_schedule)?;
cron.current_execution_retries = 0;
// Dispatch ready to execute crons.
let crons_to_execute = {
let mut runtime_state = runtime_state.borrow_mut();
for (_, tx) in crons_to_execute {
if let Some(tx) = tx.upgrade() {
let _ = tx.send(()).await;
impl RuntimeState {
fn get_ready_crons(
&mut self,
) -> Result<Vec<(String, WeakSender<()>)>, AnyError> {
let now = crate::time::utc_now().timestamp_millis() as u64;
let ready = {
let to_remove = self
.map(|(ts, _)| *ts)
.flat_map(|ts| {
.map(move |name| (*ts, name.clone()))
.map(|(_, name)| {
(name.clone(), self.crons.get(&name).unwrap().next_tx.clone())
impl CronHandler for LocalCronHandler {
type EH = CronExecutionHandle;
fn create(&self, spec: CronSpec) -> Result<Self::EH, AnyError> {
// Ensure that the cron loop is started.
self.cron_loop_join_handle.get_or_init(|| {
let (cron_schedule_tx, cron_schedule_rx) =
mpsc::channel::<(String, bool)>(1);
let runtime_state = self.runtime_state.clone();
spawn(async move {
LocalCronHandler::cron_loop(runtime_state, cron_schedule_rx)
let mut runtime_state = self.runtime_state.borrow_mut();
if runtime_state.crons.len() > MAX_CRONS {
return Err(type_error("Too many crons"));
if runtime_state.crons.contains_key(&spec.name) {
return Err(type_error("Cron with this name already exists"));
// Validate schedule expression.
.map_err(|_| type_error("Invalid cron schedule"))?;
// Validate backoff_schedule.
if let Some(backoff_schedule) = &spec.backoff_schedule {
let (next_tx, next_rx) = mpsc::channel::<()>(1);
let cron = Cron {
spec: spec.clone(),
next_tx: next_tx.downgrade(),
current_execution_retries: 0,
runtime_state.crons.insert(spec.name.clone(), cron);
Ok(CronExecutionHandle {
name: spec.name.clone(),
cron_schedule_tx: self.cron_schedule_tx.get().unwrap().clone(),
concurrency_limiter: self.concurrency_limiter.clone(),
runtime_state: Rc::downgrade(&self.runtime_state),
inner: RefCell::new(Inner {
next_rx: Some(next_rx),
shutdown_tx: Some(next_tx),
permit: None,
pub struct CronExecutionHandle {
name: String,
runtime_state: Weak<RefCell<RuntimeState>>,
cron_schedule_tx: mpsc::Sender<(String, bool)>,
concurrency_limiter: Arc<Semaphore>,
inner: RefCell<Inner>,
struct Inner {
next_rx: Option<mpsc::Receiver<()>>,
shutdown_tx: Option<mpsc::Sender<()>>,
permit: Option<OwnedSemaphorePermit>,
impl CronHandle for CronExecutionHandle {
async fn next(&self, prev_success: bool) -> Result<bool, AnyError> {
if self
.send((self.name.clone(), prev_success))
return Ok(false);
let Some(mut next_rx) = self.inner.borrow_mut().next_rx.take() else {
return Ok(false);
if next_rx.recv().await.is_none() {
return Ok(false);
let permit = self.concurrency_limiter.clone().acquire_owned().await?;
let mut inner = self.inner.borrow_mut();
inner.next_rx = Some(next_rx);
inner.permit = Some(permit);
fn close(&self) {
if let Some(tx) = self.inner.borrow_mut().shutdown_tx.take() {
if let Some(runtime_state) = self.runtime_state.upgrade() {
let mut runtime_state = runtime_state.borrow_mut();
fn compute_next_deadline(cron_expression: &str) -> Result<u64, AnyError> {
let now = crate::time::utc_now();
if let Ok(test_schedule) = env::var("DENO_CRON_TEST_SCHEDULE_OFFSET") {
if let Ok(offset) = test_schedule.parse::<u64>() {
return Ok(now.timestamp_millis() as u64 + offset);
let cron = cron_expression
.map_err(|_| anyhow::anyhow!("invalid cron expression"))?;
let Some(next_deadline) = cron.next_after(now) else {
return Err(anyhow::anyhow!("invalid cron expression"));
Ok(next_deadline.timestamp_millis() as u64)
fn validate_backoff_schedule(
backoff_schedule: &Vec<u32>,
) -> Result<(), AnyError> {
if backoff_schedule.len() > MAX_BACKOFF_COUNT {
return Err(type_error("Invalid backoff schedule"));
if backoff_schedule.iter().any(|s| *s > MAX_BACKOFF_MS) {
return Err(type_error("Invalid backoff schedule"));
mod tests {
use super::*;
fn test_compute_next_deadline() {
let now = crate::time::utc_now().timestamp_millis() as u64;
assert!(compute_next_deadline("*/1 * * * *").unwrap() > now);
assert!(compute_next_deadline("* * * * *").unwrap() > now);
assert!(compute_next_deadline("* * * * * *").is_err());
assert!(compute_next_deadline("* * *").is_err());