Skip to content

Commit dea249d

Browse files
committed
Add a minimal background queueing framework
Note: This is intended to possibly be extracted into a library, so the docs are written as if this were its own library. Cargo specific code (besides the use of `CargoResult`) will go in its own module for the same reason. This adds an MVP background queueing system intended to be used in place of the "try to commit 20 times" loop in `git.rs`. This is a fairly simple queue, that is intended to be "the easiest thing that fits our needs, with the least operational impact". There's a few reasons I've opted to go with our own queuing system here, rather than an existing solution like Faktory or Beanstalkd. - We'd have to write the majority of this code ourselves no matter what. - Client libraries for beanstalkd don't deal with the actual job logic, only `storage.rs` would get replaced - The only client for faktory that exists made some really odd API choices. Faktory also hasn't seen a lot of use in the wild yet. - I want to limit the number of services we have to manage. We have extremely limited ops bandwidth today, and every new part of the stack we have to manage is a huge cost. Right now we only have our server and PG. I'd like to keep it that way for as long as possible. This system takes advantage of the `SKIP LOCKED` feature in PostgreSQL 9.5 to handle all of the hard stuff for us. We use PG's row locking to treat a row as "currently being processed", which means we don't have to worry about returning it to the queue if the power goes out on one of our workers. This queue is intended only for jobs with "at least once" semantics. That means the entire job has to be idempotent. If the entire job completes successfully, but the power goes out before we commit the transaction, we will run the whole thing again. The code today also makes a few additional assumptions based on our current needs. We expect all jobs to complete successfully the first time, and the most likely reason a job would fail is due to an incident happening at GitHub, hence the extremely high retry timeout. I'm also assuming that all jobs will eventually complete, and that any job failing N (likely 5) times is an event that should page whoever is on call. (Paging is not part of this PR). Finally, it's unlikely that this queue will be appropriate for high thoughput use cases, since it requires one PG connection per worker (a real connection, adding pg bouncer wouldn't help here). Right now our only background work that happens is something that comes in on average every 5 minutes, but if we start moving more code to be run here we may want to revisit this in the future.
1 parent 74d533a commit dea249d

File tree

10 files changed

+412
-6
lines changed

10 files changed

+412
-6
lines changed

Cargo.lock

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
DROP TABLE background_jobs;
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
CREATE TABLE background_jobs (
2+
id BIGSERIAL PRIMARY KEY,
3+
job_type TEXT NOT NULL,
4+
data JSONB NOT NULL,
5+
retries INTEGER NOT NULL DEFAULT 0,
6+
last_retry TIMESTAMP NOT NULL DEFAULT '1970-01-01',
7+
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
8+
);

src/background/job.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
use diesel::PgConnection;
2+
use serde::{Serialize, de::DeserializeOwned};
3+
4+
use super::storage;
5+
use util::CargoResult;
6+
7+
/// A background job, meant to be run asynchronously.
8+
pub trait Job: Serialize + DeserializeOwned {
9+
/// The environment this job is run with. This is a struct you define,
10+
/// which should encapsulate things like database connection pools, any
11+
/// configuration, and any other static data or shared resources.
12+
type Environment;
13+
14+
/// The key to use for storing this job, and looking it up later.
15+
///
16+
/// Typically this is the name of your struct in `snake_case`
17+
const JOB_TYPE: &'static str;
18+
19+
/// Enqueue this job to be run at some point in the future.
20+
fn enqueue(self, conn: &PgConnection) -> CargoResult<()> {
21+
storage::enqueue_job(conn, self)
22+
}
23+
24+
/// The logic involved in actually performing this job.
25+
fn perform(self, env: &Self::Environment) -> CargoResult<()>;
26+
}

src/background/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
mod job;
2+
mod registry;
3+
mod runner;
4+
mod storage;
5+
6+
pub use self::job::*;
7+
pub use self::registry::Registry;
8+
pub use self::runner::*;

src/background/registry.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
use serde_json;
2+
use std::collections::HashMap;
3+
4+
use super::Job;
5+
use util::CargoResult;
6+
7+
#[doc(hidden)]
8+
pub type PerformFn<Env> = Box<dyn Fn(serde_json::Value, &Env) -> CargoResult<()>>;
9+
10+
#[derive(Default)]
11+
#[allow(missing_debug_implementations)] // Can't derive debug
12+
/// A registry of background jobs, used to map job types to concrege perform
13+
/// functions at runtime.
14+
pub struct Registry<Env> {
15+
job_types: HashMap<&'static str, PerformFn<Env>>,
16+
}
17+
18+
impl<Env> Registry<Env> {
19+
/// Create a new, empty registry
20+
pub fn new() -> Self {
21+
Registry {
22+
job_types: Default::default(),
23+
}
24+
}
25+
26+
/// Get the perform function for a given job type
27+
pub fn get(&self, job_type: &str) -> Option<&PerformFn<Env>> {
28+
self.job_types.get(job_type)
29+
}
30+
31+
/// Register a new background job. This will override any existing
32+
/// registries with the same `JOB_TYPE`, if one exists.
33+
pub fn register<T: Job<Environment = Env>>(&mut self) {
34+
self.job_types.insert(T::JOB_TYPE, Box::new(|data, env| {
35+
let data = serde_json::from_value(data)?;
36+
T::perform(data, env)
37+
}));
38+
}
39+
}

src/background/runner.rs

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
#![allow(dead_code)]
2+
use diesel::prelude::*;
3+
use std::panic::{catch_unwind, UnwindSafe};
4+
5+
use super::storage;
6+
use util::errors::*;
7+
8+
fn get_single_job<F>(conn: &PgConnection, f: F) -> CargoResult<()>
9+
where
10+
F: FnOnce(storage::BackgroundJob) -> CargoResult<()> + UnwindSafe,
11+
{
12+
conn.transaction::<_, Box<dyn CargoError>, _>(|| {
13+
let job = storage::find_next_unlocked_job(conn)?;
14+
let job_id = job.id;
15+
16+
let result = catch_unwind(|| f(job))
17+
.map_err(|_| internal("job panicked"))
18+
.and_then(|r| r);
19+
20+
if result.is_ok() {
21+
storage::delete_successful_job(conn, job_id)?;
22+
} else {
23+
storage::update_failed_job(conn, job_id);
24+
}
25+
Ok(())
26+
})
27+
}
28+
29+
#[cfg(test)]
30+
mod tests {
31+
use diesel::prelude::*;
32+
33+
use schema::background_jobs::dsl::*;
34+
use std::sync::{Mutex, MutexGuard, Barrier, Arc};
35+
use std::panic::AssertUnwindSafe;
36+
use std::thread;
37+
use super::*;
38+
39+
#[test]
40+
fn jobs_are_locked_when_fetched() {
41+
let _guard = TestGuard::lock();
42+
43+
let conn = connection();
44+
let first_job_id = create_dummy_job(&conn).id;
45+
let second_job_id = create_dummy_job(&conn).id;
46+
let fetch_barrier = Arc::new(AssertUnwindSafe(Barrier::new(2)));
47+
let fetch_barrier2 = fetch_barrier.clone();
48+
let return_barrier = Arc::new(AssertUnwindSafe(Barrier::new(2)));
49+
let return_barrier2 = return_barrier.clone();
50+
51+
let t1 = thread::spawn(move || {
52+
let _ = get_single_job(&connection(), |job| {
53+
fetch_barrier.0.wait(); // Tell thread 2 it can lock its job
54+
assert_eq!(first_job_id, job.id);
55+
return_barrier.0.wait(); // Wait for thread 2 to lock its job
56+
Ok(())
57+
});
58+
});
59+
60+
let t2 = thread::spawn(move || {
61+
fetch_barrier2.0.wait(); // Wait until thread 1 locks its job
62+
get_single_job(&connection(), |job| {
63+
assert_eq!(second_job_id, job.id);
64+
return_barrier2.0.wait(); // Tell thread 1 it can unlock its job
65+
Ok(())
66+
})
67+
.unwrap();
68+
});
69+
70+
t1.join().unwrap();
71+
t2.join().unwrap();
72+
}
73+
74+
#[test]
75+
fn jobs_are_deleted_when_successfully_run() {
76+
let _guard = TestGuard::lock();
77+
78+
let conn = connection();
79+
create_dummy_job(&conn);
80+
81+
get_single_job(&conn, |_| {
82+
Ok(())
83+
}).unwrap();
84+
85+
let remaining_jobs = background_jobs.count()
86+
.get_result(&conn);
87+
assert_eq!(Ok(0), remaining_jobs);
88+
}
89+
90+
#[test]
91+
fn failed_jobs_do_not_release_lock_before_updating_retry_time() {
92+
let _guard = TestGuard::lock();
93+
create_dummy_job(&connection());
94+
let barrier = Arc::new(AssertUnwindSafe(Barrier::new(2)));
95+
let barrier2 = barrier.clone();
96+
97+
let t1 = thread::spawn(move || {
98+
let _ = get_single_job(&connection(), |_| {
99+
barrier.0.wait();
100+
// error so the job goes back into the queue
101+
Err(human("nope"))
102+
});
103+
});
104+
105+
let t2 = thread::spawn(move || {
106+
let conn = connection();
107+
// Wait for the first thread to acquire the lock
108+
barrier2.0.wait();
109+
// We are intentionally not using `get_single_job` here.
110+
// `SKIP LOCKED` is intentionally omitted here, so we block until
111+
// the lock on the first job is released.
112+
// If there is any point where the row is unlocked, but the retry
113+
// count is not updated, we will get a row here.
114+
let available_jobs = background_jobs
115+
.select(id)
116+
.filter(retries.eq(0))
117+
.for_update()
118+
.load::<i64>(&conn)
119+
.unwrap();
120+
assert_eq!(0, available_jobs.len());
121+
122+
// Sanity check to make sure the job actually is there
123+
let total_jobs_including_failed = background_jobs
124+
.select(id)
125+
.for_update()
126+
.load::<i64>(&conn)
127+
.unwrap();
128+
assert_eq!(1, total_jobs_including_failed.len());
129+
});
130+
131+
t1.join().unwrap();
132+
t2.join().unwrap();
133+
}
134+
135+
#[test]
136+
fn panicking_in_jobs_updates_retry_counter() {
137+
let _guard = TestGuard::lock();
138+
let conn = connection();
139+
let job_id = create_dummy_job(&conn).id;
140+
141+
let t1 = thread::spawn(move || {
142+
let _ = get_single_job(&connection(), |_| {
143+
panic!()
144+
});
145+
});
146+
147+
let _ = t1.join();
148+
149+
let tries = background_jobs
150+
.find(job_id)
151+
.select(retries)
152+
.for_update()
153+
.first::<i32>(&conn)
154+
.unwrap();
155+
assert_eq!(1, tries);
156+
}
157+
158+
159+
lazy_static! {
160+
// Since these tests deal with behavior concerning multiple connections
161+
// running concurrently, they have to run outside of a transaction.
162+
// Therefore we can't run more than one at a time.
163+
//
164+
// Rather than forcing the whole suite to be run with `--test-threads 1`,
165+
// we just lock these tests instead.
166+
static ref TEST_MUTEX: Mutex<()> = Mutex::new(());
167+
}
168+
169+
struct TestGuard<'a>(MutexGuard<'a, ()>);
170+
171+
impl<'a> TestGuard<'a> {
172+
fn lock() -> Self {
173+
TestGuard(TEST_MUTEX.lock().unwrap())
174+
}
175+
}
176+
177+
impl<'a> Drop for TestGuard<'a> {
178+
fn drop(&mut self) {
179+
::diesel::sql_query("TRUNCATE TABLE background_jobs")
180+
.execute(&connection())
181+
.unwrap();
182+
}
183+
}
184+
185+
fn connection() -> PgConnection {
186+
use dotenv;
187+
188+
let database_url =
189+
dotenv::var("TEST_DATABASE_URL").expect("TEST_DATABASE_URL must be set to run tests");
190+
PgConnection::establish(&database_url).unwrap()
191+
}
192+
193+
fn create_dummy_job(conn: &PgConnection) -> storage::BackgroundJob {
194+
::diesel::insert_into(background_jobs)
195+
.values((job_type.eq("Foo"), data.eq(json!(null))))
196+
.returning((id, job_type, data))
197+
.get_result(conn)
198+
.unwrap()
199+
}
200+
}

0 commit comments

Comments
 (0)