Ever needed to run a task repeatedly in your Rust application? Maybe it's sending out daily reports, cleaning up old data weekly using a CRON schedule, or checking an external service every few minutes? You might reach for a job scheduler. While building a recent project, I found myself needing exactly this, but the existing Rust landscape didn't quite fit the bill for the specific blend of features I wanted. So, naturally, I decided to build my own: TurnKeeper.
This wasn't just about reinventing the wheel (okay, maybe a little!), but about creating something tailored to the specific needs of modern async Rust applications, particularly those built with Tokio.
The Scheduling Scene in Rust
When you look for job scheduling in Rust, you'll find some great options:
cron crate: Excellent for parsing standard cron strings and figuring out the next time something should run based on that schedule. It's focused and does its job well, but it doesn't handle execution, retries, or state management itself. You still need to build the runner around it; a minimal sketch of what the crate alone gives you follows this list.
tokio-cron-scheduler: Builds upon the cron idea and integrates directly with Tokio. It manages tasks, allows async jobs, and handles standard cron schedules. It's quite powerful and popular for CRON-based needs!
System cron / Task Scheduler: Just use the operating system's scheduler (cron on Linux/macOS, Task Scheduler on Windows) to run your Rust binary periodically. Simple and robust, but it externalizes the scheduling logic and state entirely, and it can get awkward for complex application state or frequently running jobs.
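To make the first point concrete, here's roughly what the cron crate alone gives you: a parsed schedule and an iterator of upcoming fire times, with execution, retries, and state left entirely to you. A minimal sketch, assuming the cron and chrono crates as dependencies:

use chrono::Utc;
use cron::Schedule;
use std::str::FromStr;

fn main() {
    // Fields: second, minute, hour, day-of-month, month, day-of-week.
    let schedule = Schedule::from_str("0 0 3 * * Sun").expect("valid cron expression");
    // The crate computes upcoming fire times; actually running something at
    // those times (and retrying, recording state, etc.) is still your problem.
    for next in schedule.upcoming(Utc).take(3) {
        println!("would fire at {next}");
    }
}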
Why Another Scheduler? The "TurnKeeper" Angle
So, why build TurnKeeper? While the existing tools are great, my requirements pushed towards a slightly different combination of features:
Async First: Deep integration with Tokio was paramount. Jobs themselves needed to be async functions, capable of non-blocking I/O without holding up the scheduler.
Flexible & Programmatic Scheduling: I wanted multiple ways to define schedules within Rust code, offering type safety and composability. This meant supporting not just CRON strings, but also simple FixedIntervals ("every 5 minutes"), specific WeekdayTimes combinations ("Monday 9:00 AM UTC"), precise Once executions, and even Never for manually triggered jobs.
Integrated Resilience: If a job fails (e.g., a network hiccup), I didn't want it to fail silently. I needed built-in, configurable retries handled by the scheduler, with either exponential backoff (wait longer after repeated failures) or a fixed delay; a sketch of the backoff idea follows this list. Graceful handling of panic!s within jobs was also crucial.
Observability: I needed to know what was going on. How many jobs are scheduled? How many failed? How long do they take? Built-in metrics (get_metrics_snapshot) and the ability to query job status (get_job_details, including its defined Schedule) were essential.
Controlled Concurrency: Limit the number of jobs running simultaneously (max_workers) to avoid overwhelming resources.
Lifecycle Management: The ability to cancel a job lineage (cancel_job), update a job's configuration such as its schedule or retries (update_job, which requires the HandleBased queue), and trigger a job on demand (trigger_job_now) were key requirements.
Choice of Backend: Provide an option between a standard library BinaryHeap backend and a HandleBased backend (using the priority-queue crate) that enables proactive cancellation and job updates.
(Optional) Ergonomics: A job_fn! macro to simplify defining the async job logic.
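To pin down what I mean by the retry behaviour, here's the general shape of the delay calculation. This is an illustrative sketch with made-up base and cap values, not TurnKeeper's exact policy:

use std::time::Duration;

// Delay before retry `attempt` (1-based): the configured fixed delay if one
// was given, otherwise exponential backoff capped at a maximum. The 1s base
// and 5-minute cap here are hypothetical values chosen for the example.
fn retry_delay(attempt: u32, fixed_delay: Option<Duration>) -> Duration {
    match fixed_delay {
        Some(delay) => delay,
        None => {
            let base = Duration::from_secs(1);
            let factor = 2u32.saturating_pow(attempt.saturating_sub(1));
            let backoff = base.checked_mul(factor).unwrap_or(Duration::MAX);
            backoff.min(Duration::from_secs(300))
        }
    }
}

fn main() {
    println!("{:?}", retry_delay(1, None));                           // 1s
    println!("{:?}", retry_delay(4, None));                           // 8s
    println!("{:?}", retry_delay(4, Some(Duration::from_secs(15))));  // 15s
}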
While tokio-cron-scheduler covers much of the async/cron ground well, the desire for these multiple programmatic schedule types, integrated/configurable retries, specific observability features, and lifecycle management (updates, triggers, cancellation options) led me down the path of creating TurnKeeper.
How TurnKeeper Works (The Gist)
TurnKeeper isn't overly complicated conceptually. It revolves around a few key ideas:
Coordinator + Workers: A central async task (the "Coordinator") manages the schedule and state, while a pool of async "Worker" tasks actually executes the jobs, so long-running jobs never block the coordinator. A stripped-down sketch of this shape follows this list.
Priority Queue Abstraction: The Coordinator keeps track of upcoming jobs using an internal priority queue abstraction (supporting BinaryHeap or the priority-queue crate's PriorityQueue based on configuration), ordered by their next scheduled run time.
Job Definitions: You define a job with:
A name (for logging/identification).
A Schedule enum variant specifying how it recurs (e.g., Schedule::Cron, Schedule::FixedInterval, Schedule::WeekdayTimes, Schedule::Once, Schedule::Never).
A maximum retry count (MaxRetries).
An optional fixed retry delay (std::time::Duration); if none is given, exponential backoff is used.
An async function/closure to execute (your actual task logic, matching BoxedExecFn).
Communication: Tasks communicate using efficient async channels (tokio::mpsc, async_channel, tokio::sync::watch). The Coordinator dispatches job instance IDs to idle workers. Workers report success (true), failure (false), or panic back to the Coordinator, which then handles rescheduling or state updates.
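Here is a deliberately simplified sketch of that coordinator/worker shape: a min-heap keyed by next run time, one channel for dispatch, another for results, and panics reported as failures. It illustrates the idea under those assumptions using plain tokio primitives; it is not TurnKeeper's actual internals:

use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Coordinator -> worker: the id of a job instance that is due to run.
    let (dispatch_tx, mut dispatch_rx) = mpsc::channel::<u64>(16);
    // Worker -> coordinator: (job id, succeeded?) so it can reschedule or retry.
    let (result_tx, mut result_rx) = mpsc::channel::<(u64, bool)>(16);

    // A single worker task; a real pool would spawn `max_workers` of these.
    tokio::spawn(async move {
        while let Some(id) = dispatch_rx.recv().await {
            // Run the job in its own task so a panic doesn't take the worker
            // down; a panicking job is simply reported as a failure.
            let outcome = tokio::spawn(async move {
                // The user's async job logic would run here; we fake a result.
                id % 2 == 0
            })
            .await;
            let succeeded = outcome.unwrap_or(false);
            let _ = result_tx.send((id, succeeded)).await;
        }
    });

    // The coordinator's queue: a min-heap ordered by next scheduled run time.
    let mut queue: BinaryHeap<Reverse<(Instant, u64)>> = BinaryHeap::new();
    let now = Instant::now();
    queue.push(Reverse((now + Duration::from_millis(50), 1)));
    queue.push(Reverse((now + Duration::from_millis(10), 2)));

    while let Some(Reverse((run_at, id))) = queue.pop() {
        // Sleep until the soonest job is due, then hand it to a worker.
        tokio::time::sleep_until(run_at.into()).await;
        dispatch_tx.send(id).await.expect("worker alive");
        // Simplified: wait for the outcome inline. A real coordinator keeps
        // dispatching to idle workers and folds results in as they arrive,
        // rescheduling on success or applying the retry policy on failure.
        if let Some((finished, ok)) = result_rx.recv().await {
            println!("job {finished} finished, success = {ok}");
        }
    }
}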
A Taste of the API
Using TurnKeeper feels something like this:
use turnkeeper::{
TurnKeeper,
job::{RecurringJobRequest, Schedule, MaxRetries}, // Import needed types
job_fn, // Use the macro!
scheduler::PriorityQueueType
};
use chrono::{Duration as ChronoDuration, NaiveTime, Utc, Weekday};
use std::time::Duration as StdDuration;
use uuid::Uuid; // Used for RecurringJobId
async fn run() -> Result<(), Box<dyn std::error::Error>> {
// 1. Build the scheduler
let scheduler = TurnKeeper::builder()
.max_workers(4)
// .priority_queue(PriorityQueueType::HandleBased) // Default, needed for update_job
.build()?;
// 2. Define what the job does using the job_fn! macro
let my_async_job_logic = job_fn! {
// No setup block needed here
{ // Main async logic block
println!("Doing important work...");
tokio::time::sleep(StdDuration::from_millis(10)).await;
// ... await database calls, network requests etc. ...
let success = rand::random::<f64>() > 0.1; // Simulate occasional failure
if !success { println!("Job failed this time!"); }
success // Return true/false
}
};
// 3. Define various job requests
let req_weekday = RecurringJobRequest::from_week_day(
"SendReport",
vec![(Weekday::Mon, NaiveTime::from_hms_opt(9, 0, 0).unwrap())], // Monday 9 AM UTC
3 as MaxRetries // 3 retries on failure
);
// Cron requires the `cron_schedule` feature
#[cfg(feature = "cron_schedule")]
let req_cron = RecurringJobRequest::from_cron(
"CleanupTask",
"0 0 3 * * SUN", // Every Sunday at 3:00 AM UTC
1
);
let mut req_interval = RecurringJobRequest::from_interval(
"CheckAPI",
StdDuration::from_secs(60), // Every 60 seconds
5
);
// Optionally set a specific start time for the interval job
req_interval.with_initial_run_time(Utc::now() + ChronoDuration::seconds(5));
let req_once = RecurringJobRequest::from_once(
"DeployNotification",
Utc::now() + ChronoDuration::minutes(1), // Run once in 1 minute
0
);
// Example with fixed retry delay
let req_fixed_retry = RecurringJobRequest::with_fixed_retry_delay(
"FixedRetryServiceCheck",
Schedule::FixedInterval(StdDuration::from_secs(300)), // Every 5 mins
4,
StdDuration::from_secs(15) // Retry after 15s on failure
);
// 4. Add the jobs to the scheduler
let job_id_report = scheduler.add_job_async(req_weekday, my_async_job_logic.clone()).await?; // Clone logic if reused
println!("Report job {} submitted!", job_id_report);
#[cfg(feature = "cron_schedule")] {
let job_id_cleanup = scheduler.add_job_async(req_cron, job_fn!({ println!("Cleaning..."); true })).await?;
println!("Cleanup job {} submitted!", job_id_cleanup);
}
let job_id_api = scheduler.add_job_async(req_interval, job_fn!({ println!("Checking API..."); true })).await?;
println!("API Check job {} submitted!", job_id_api);
let job_id_notify = scheduler.add_job_async(req_once, job_fn!({ println!("Notifying deploy!"); true })).await?;
println!("Notify job {} submitted!", job_id_notify);
// 5. Later... maybe update the API check interval? (Requires HandleBased PQ)
// let new_api_schedule = Schedule::FixedInterval(StdDuration::from_secs(120));
// scheduler.update_job(job_id_api, Some(new_api_schedule), None).await?; // Change interval to 2 mins
// println!("Updated API Check interval!");
// 6. Manually trigger the report job now?
// scheduler.trigger_job_now(job_id_report).await?;
// println!("Manually triggered report job.");
// 7. Query metrics or job status
// let metrics = scheduler.get_metrics_snapshot().await?;
// let details = scheduler.get_job_details(job_id_api).await?;
// println!("Current API Check schedule: {:?}", details.schedule); // Access the Schedule enum
// 8. Shutdown when done
// scheduler.shutdown_graceful(Some(StdDuration::from_secs(10))).await?;
Ok(())
}
The Journey & What's Next
Building TurnKeeper was a great exercise in async Rust, involving Arc, Mutex/RwLock, various channel types, task management, and careful state handling. Integrating different scheduling logic (especially Cron via the cron crate) and supporting features like job updates and triggers while maintaining a consistent internal flow were key challenges. Implementing the HandleBased queue option and the associated proactive cancellation/update logic added another layer of complexity but unlocked significant features.
Is it perfect? Definitely not yet! There's always room for more robust testing across edge cases and for performance optimizations (especially around lock contention under high load, though RwLock helps), and features like persistent job storage across application restarts could be added down the line. But it now solves the initial scheduling problem with the flexibility and features I originally envisioned.
Get Involved
If you're interested in async job scheduling in Rust, check out the existing crates first – they might be exactly what you need. But if you find yourself wanting multiple programmatic scheduling options, integrated retries with fixed/backoff delays, job updates/triggers, and built-in observability like I did, maybe TurnKeeper could be helpful.
Crates.io: https://crates.io/crates/turnkeeper
Docs.rs: https://docs.rs/turnkeeper