Implementing a job queue in Rust
I recently finished my first rust project - a command line utility called “bandwhich” that displays network utilization information. As a newcomer to rust, this project offered quite some challenges for me. This post is a write up of one of them, going into detail on the parts that I personally found most difficult to understand. I hope it will be the first of many. If you’d like to check out the utility itself and browse its source, you can find it here
I try to always be very open to new ideas and better or different ways of doing things. If you have any thoughts, please do drop me a line! :)
In this first post, I’d like to talk about implementing a job queue to resolve IPs into their hostnames by querying a remote DNS server.
A little bit about “bandwhich”
“bandwhich”’s purpose is to help keep track of and debug issues involving network traffic. Like a network sniffer, it listens on a given network interface and records traffic. It then continuously displays information (eg. bandwidth) by process, connection and remote network address in a terminal UI.
Why do we need a queue?
When “bandwhich” displays information about a network address, it would ideally like to display an “easier on the eyes” hostname rather than an IP address (eg. “dns.google” instead of “8.8.8.8”). To do this, it uses libc’s getnameinfo function. Unfortunately though, this function blocks the current thread. Waiting for it to return before displaying information would be a bad user experience.
For this reason, we elect to have a best effort resolution. Whenever we encounter a previously unknown IP address, we place it in a queue and have a different thread read it and resolve it. We display the raw IP address until it is resolved, and its hostname once we know it.
We’ll be concentrating on implementing the queue itself. The resolution and recording of the host addresses are out of scope for this post.
How does the queue work?
The queue consists of several components that work together to achieve the functionality we desire.
The API
-
The “publish” function: used to give our queue a list of ips that need to be resolved.
-
The “subscribe” function: asks the queue to give it an ip address to resolve, or to block its thread otherwise.
-
The “cleanup” function: called when our program ends, so that the subscribe function knows it can stop asking for more work.
VecDeque
The VecDeque is a data structure from the std library. Like a normal Vec, it’s a growable list on the heap memory, and unlike Vec, it’s double-ended. Meaning that adding/removing elements from its beginning or end is a relatively cheap operation. For more information, take a look at the official docs
In our dns queue, its role is to keep track of the ip addresses we need to resolve. We push an address onto its end when we want it to be resolved, and pull an address from its beginning when we want to resolve it.
Our queue is represented as an Option<VecDeque>
to make it easier for the subscribe function to know if it should be waiting for more work. As long as the Option is a Some(VecDeque)
, even if the queue happens to be empty at the time, the subscribe function should keep blocking its thread. When it is a None
, the subscribe function should stop listening (read: return a None itself).
Condvar
The condvar (conditional variable) is our alarm clock. It allows us to block a thread in one place and wake it up in another: the official docs
Mutex
The mutex is what we use to wrap data in order to make sure different threads can read to and write from it safely. It will only let one thread access it at a time.
In our case, we use it to protect the Option<VecDeque>
. Here are the official docs
The code
We want to start with a struct that has a jobs queue and a conditional variable.
pub struct DnsQueue {
jobs: Mutex<Option<VecDeque<Ipv4Addr>>>,
cvar: Condvar,
}
impl DnsQueue {
pub fn new() -> Self {
DnsQueue {
jobs: Mutex::new(Some(VecDeque::new())),
cvar: Condvar::new(),
}
}
}
Our jobs are a Mutex wrapping an Option<VecDeque>
of Ipv4Addr
. As mentioned earlier, we represent our queue as an Option so that we could let our subscriber function know it can stop listening by switching the Some(VecDeque)
for a None
value.
Next, our publish function - called here “resolve_ips” to convey more information in the code using our queue:
pub fn resolve_ips(&self, unresolved_ips: Vec<Ipv4Addr>) {
let mut jobs = self.jobs.lock().unwrap();
if let Some(queue) = jobs.as_mut() {
queue.extend(unresolved_ips);
self.cvar.notify_all();
}
}
Here, we receive a list of IPs, and then:
- acquire an exclusive Mutex lock on the queue
- push the IPs onto the back of the
VecDeque
- send a notification to all subscribers through our
Condvar
, letting them know there’s some work ready for them - drop the mutex lock implicitly as the function goes out of scope
Now, our subscribe function. Here, we call it “wait_for_job”:
pub fn wait_for_job(&self) -> Option<Ipv4Addr> {
let mut jobs = self.jobs.lock().unwrap();
loop {
match jobs.as_mut()?.pop_front() {
Some(job) => return Some(job),
None => {
jobs = self.cvar.wait(jobs).unwrap()
}
}
}
}
This function:
- checks if the
Option<VecDeque>
contains a VecDeque or a None. If it contains a None, it returns it here:jobs.as_mut()?
(as_mut
will return a None if that’s what theOption
contains). - Then, if there’s something in the queue, it returns it. Otherwise, it sleeps.
We sleep by moving our MutexGuard (jobs) into the Condvar and reassigning it to the Condvar’s return value. The Condvar unlocks the guard, thus freeing it for writing. When it returns, we repeat this process, so that if it returned prematurely, we’ll simply sleep again.
We have to re-check our queue every time we wake up, because at times the wait
function will return without actually being woken up, for more info check out the last paragraph in the docs
Finally, our cleanup function, called “end” in our case:
pub fn end(&self) {
let mut jobs = self.jobs.lock().unwrap();
*jobs = None;
self.cvar.notify_all();
}
The end function:
- acquires a lock on the
Mutex
- changes the
Option
inside theMutex
toNone
- sends a change notification through the Condvar
Putting it all together
use ::std::collections::VecDeque;
use ::std::net::Ipv4Addr;
use ::std::sync::{Condvar, Mutex};
pub struct DnsQueue {
jobs: Mutex<Option<VecDeque<Ipv4Addr>>>,
cvar: Condvar,
}
impl DnsQueue {
pub fn new() -> Self {
DnsQueue {
jobs: Mutex::new(Some(VecDeque::new())),
cvar: Condvar::new(),
}
}
}
impl DnsQueue {
pub fn resolve_ips(&self, unresolved_ips: Vec<Ipv4Addr>) {
let mut jobs = self.jobs.lock().unwrap();
if let Some(queue) = jobs.as_mut() {
queue.extend(unresolved_ips);
self.cvar.notify_all();
}
}
pub fn wait_for_job(&self) -> Option<Ipv4Addr> {
let mut jobs = self.jobs.lock().unwrap();
loop {
match jobs.as_mut()?.pop_front() {
Some(job) => return Some(job),
None => {
jobs = self.cvar.wait(jobs).unwrap()
}
}
}
}
pub fn end(&self) {
let mut jobs = self.jobs.lock().unwrap();
*jobs = None;
self.cvar.notify_all();
}
}
An example usage of our queue might be something like:
fn main () {
let dns_queue = Arc::new(DnsQueue::new());
let dns_handler = thread::spawn({
let dns_queue = dns_queue.clone();
move || {
while let Some(ip) = dns_queue.wait_for_job() {
// do something with ip here
}
// we get here once we receive a None from the queue
}
});
let some_other_thread_handler = thread::spawn({
let dns_queue = dns_queue.clone();
move || {
// get `unresolved_ips: Vec<Ipv4Addr>` somehow...
dns_queue.resolve_ips(unresolved_ips);
// ...
dns_queue.end();
}
});
some_other_thread_handler.join().unwrap();
dns_handler.join().unwrap();
}
Finally
It’s worth noting that our implementation uses a dns resolution queue of Ipv4Addr
. This is the specific case of “bandwhich”, but this queue could also be used with a Generic T
type instead of an Ipv4Addr
for other uses.
Credits
Thanks Mara Bos for the helpful comments.
Some diagram icons made by Those Icons and Smash Icons from https://www.flaticon.com