In this post I explain the ABA problem and present a solution to it for pointers.
The ABA problem arises when writing lock-free data structures. It is generally associated with pointers, but it can also happen with integers. If you have ever tried to write a lock-free stack, you may have faced the following situation:
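The stack initially contains A, B (A is on top).

1. Thread 1 calls `pop`.
2. Thread 1 loads `A = top.load()`.
3. Thread 1 checks whether `A == null`; it is not.
4. Thread 1 reads `B = (*A).next` and is then suspended.
5. Thread 2 pops A.
6. Thread 2 pushes C, so the stack is C, B.
7. Thread 2 pushes A back (or a new node that got the same pointer as A), so the stack is A, C, B.
8. Thread 1 resumes, calls `top.compare_and_swap(A, B)` and succeeds: top is now B.
9. C is leaked.

Do you see the problem? Thread 1 had read A, then read B, and its CAS on A succeeded although the stack had already changed. That CAS should never have succeeded. Solutions to the ABA problem range from attaching a "version" tag to the pointer to keeping lists of hazard pointers, which record the pointers a thread is reading or deleting.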
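To make the interleaving above concrete, here is a small single-threaded simulation of it. It is only a sketch under an assumption: nodes live in an array and "pointers" are array indices, so "pushing A back" stands in for a new allocation landing at A's old address. Thread 2's steps are simply applied between Thread 1's reads and its CAS.

```rust
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};

// Hypothetical model: nodes are slots in an array and "pointers" are indices.
const NULL: usize = usize::MAX;
const A: usize = 0;
const B: usize = 1;
const C: usize = 2;

/// Replays the ABA interleaving deterministically.
/// Returns (whether Thread 1's CAS succeeded, final top).
fn simulate_aba() -> (bool, usize) {
    let top = AtomicUsize::new(A);
    let mut next = [B, NULL, NULL]; // stack: A -> B

    // Thread 1 begins pop: loads top and its successor, then is suspended.
    let a = top.load(SeqCst);
    assert_ne!(a, NULL);
    let b = next[a];

    // Thread 2: pop A (stack: B).
    top.store(next[A], SeqCst);
    // Thread 2: push C (stack: C -> B).
    next[C] = top.load(SeqCst);
    top.store(C, SeqCst);
    // Thread 2: push a node with A's "address" again (stack: A -> C -> B).
    next[A] = top.load(SeqCst);
    top.store(A, SeqCst);

    // Thread 1 resumes: top equals A again, so the CAS succeeds...
    let succeeded = top.compare_exchange(a, b, SeqCst, SeqCst).is_ok();
    // ...and top now points to B, skipping C entirely.
    (succeeded, top.load(SeqCst))
}
```

`simulate_aba()` returns `(true, B)`: the CAS succeeds and, starting from B, C is no longer reachable.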
Another problem related to ABA is that a thread could read garbage (memory that was already freed) in a similar lock-free queue, for instance. The root of the problem seems to be freeing an allocation while another thread is still using it. What if we had a way of warning other threads that they cannot free pointers right now? That is what hazard pointers do. However, most hazard pointer implementations aren't even lock-free. Even Ticki, a great Rust blogger, has used a mutex to implement hazard pointers. Also, hazard pointers require some sort of mini garbage collection.
One thing that makes hazard pointers hard to implement is that they are pointer-specific. So the next step I took was to generalize the hazardness of one hazardous pointer to all pointers. This is the idea of the incinerator: defer all deletions until it is safe to delete. To do this, let's put a lock-free container that does not suffer from ABA in a global variable, plus a global boolean flag indicating whether deletion is safe, right? Wrong.
We need a way of allowing multiple threads to say "not safe to delete". Also, implementing a lock-free container that does not suffer from ABA is hard and may be expensive. To solve these issues, I used an unsigned integer as a global counter of "threads running sensitive code" and a thread-local vector of garbage. That is the incinerator: a thread-local vector of garbage and a global counter.
NOTE: Although I said vector, it does not have to be a vector; any similar list structure is enough.
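```rust
// Imports used by the snippets of the global incinerator.
use std::cell::Cell;
use std::mem::transmute;
use std::process::abort;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicUsize, Ordering::*};

// Number of threads currently running sensitive code.
static PAUSED_COUNT: AtomicUsize = AtomicUsize::new(0);

thread_local! {
    // Garbage this thread removed from shared context but has not freed yet.
    static LOCAL_DELETION: GarbageList = GarbageList::new();
}

// A type-erased pointer plus the function that knows how to destroy it.
struct Garbage {
    ptr: NonNull<u8>,
    dropper: unsafe fn(NonNull<u8>),
}

struct GarbageList {
    inner: Cell<Vec<Garbage>>,
}
```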
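Where does a suitable `dropper` come from? A minimal sketch, assuming the garbage was allocated with `Box`: a generic function, instantiated for each `T`, can serve as the destructor that `add` (defined below) expects. The names `drop_boxed`, `erased_drop`, `Tracked` and `erase_and_drop` are hypothetical. The `erased_drop` variant shows that the type erasure can also be done without `transmute`, by casting the pointer back to `T` inside a monomorphized function.

```rust
use std::ptr::NonNull;
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical helper: frees a pointer that came from a `Box`.
// Its type matches the `dropper` parameter of `add<T>`.
unsafe fn drop_boxed<T>(ptr: NonNull<T>) {
    unsafe { drop(Box::from_raw(ptr.as_ptr())) };
}

// Hypothetical alternative: a dropper that already takes `NonNull<u8>` and
// casts back to `T` itself, so no `transmute` of function pointers is needed.
unsafe fn erased_drop<T>(ptr: NonNull<u8>) {
    unsafe { drop_boxed(ptr.cast::<T>()) };
}

// Demo type that counts how many times it was dropped.
static DROPS: AtomicUsize = AtomicUsize::new(0);

struct Tracked;

impl Drop for Tracked {
    fn drop(&mut self) {
        DROPS.fetch_add(1, Ordering::SeqCst);
    }
}

/// Erases a boxed value into the (pointer, dropper) pair a `Garbage` holds,
/// then runs the dropper, as `GarbageList::delete` would.
fn erase_and_drop() -> usize {
    let ptr = NonNull::from(Box::leak(Box::new(Tracked)));
    let erased: (NonNull<u8>, unsafe fn(NonNull<u8>)) = (ptr.cast(), erased_drop::<Tracked>);
    unsafe { (erased.1)(erased.0) };
    DROPS.load(Ordering::SeqCst)
}
```

Calling `erase_and_drop()` once runs `Tracked`'s destructor exactly once, through the type-erased pointer.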
In order to delete garbage, a thread first removes the pointer from the concurrent/shared context. Next, it puts the pointer and a destructor function on its thread-local queue. Then, the thread checks the value of the global counter. If the counter is 0, the thread removes everything from its queue and runs the destructors one by one, passing each its respective pointer as argument. If the counter is not 0, we simply leave the queue there. When the thread exits, the local queue is cleaned up and all destructors are run. At thread exit we have to spin (waiting until the counter drops to 0) for safety reasons, but this does not affect normal program execution.
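```rust
/// Safety: `ptr` must already be unreachable from shared context, and
/// `dropper` must be a valid way of destroying it.
pub unsafe fn add<T>(ptr: NonNull<T>, dropper: unsafe fn(NonNull<T>)) {
    LOCAL_DELETION.with(|queue| {
        queue.add(Garbage {
            // Erase the type: keep only the address...
            ptr: ptr.cast::<u8>(),
            // ...and the dropper, relying on `NonNull<T>` and `NonNull<u8>`
            // being passed the same way for sized `T`.
            dropper: transmute(dropper),
        });
        // Nobody is running sensitive code: safe to delete everything.
        if PAUSED_COUNT.load(Acquire) == 0 {
            queue.delete();
        }
    })
}

impl GarbageList {
    fn new() -> Self {
        Self { inner: Cell::new(Vec::new()) }
    }

    fn add(&self, garbage: Garbage) {
        // `Cell` gives no `&mut` access: take the vector out, push, put it back.
        let mut vec = self.inner.replace(Vec::new());
        vec.push(garbage);
        self.inner.set(vec);
    }

    fn delete(&self) {
        let mut vec = self.inner.replace(Vec::new());
        while let Some(garbage) = vec.pop() {
            unsafe {
                (garbage.dropper)(garbage.ptr);
            }
        }
    }
}

impl Drop for GarbageList {
    fn drop(&mut self) {
        // Thread exit: wait until no thread is running sensitive code.
        while PAUSED_COUNT.load(Acquire) != 0 {
            std::hint::spin_loop();
        }
        self.delete();
    }
}
```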
The trick is to check the global counter after removing the pointer from the shared context and before actually deleting it. Since the garbage list is thread-local, there is no problem if another thread starts executing sensitive code during deletion: all the garbage was already in the list (and out of the shared context), and no one can push new garbage onto this thread's queue except the thread itself, which is busy deleting garbage.
To execute sensitive code, a thread first increments the global counter. Then, it runs its sensitive code. After that, it decrements the global counter. The whole cycle of loading and compare-and-swapping is considered sensitive code. In the case of the stack, we would increment the counter and then load the top, and we would only decrement the counter after the compare-and-swap. Note that returning a `Pause` would allow the thread to keep or store the pause arbitrarily, which looks bad since this incinerator is global; that is why `pause` takes a closure instead.
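```rust
/// Runs `exec` as sensitive code: no garbage is deleted while it runs.
pub fn pause<F, T>(exec: F) -> T
where
    F: FnOnce() -> T,
{
    let paused = Pause::new();
    let res = exec();
    drop(paused);
    res
}

// Guard: the counter is decremented on drop, even if `exec` panics.
struct Pause;

impl Pause {
    fn new() -> Self {
        // `fetch_add` returns the previous value: if it was `usize::MAX`,
        // the counter has wrapped around to 0, so we abort.
        if PAUSED_COUNT.fetch_add(1, Acquire) == usize::MAX {
            abort();
        }
        Pause
    }
}

impl Drop for Pause {
    fn drop(&mut self) {
        PAUSED_COUNT.fetch_sub(1, Release);
    }
}
```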
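Putting the pieces together for the stack case, here is a self-contained sketch of a Treiber stack whose `pop` runs its load/compare-and-swap loop as sensitive code and hands the unlinked node to a deferred-deletion list. It is a simplified re-implementation for illustration, not the code of the `lockfree` crate: the garbage list holds raw pointers of one concrete `Node` type instead of type-erased `Garbage`, every atomic uses `SeqCst` to keep the reasoning simple, and garbage still pending when a thread exits is leaked instead of spinning. The names `Node`, `retire` and `Stack` are hypothetical.

```rust
use std::cell::RefCell;
use std::ptr;
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering::SeqCst};

// Number of threads currently running sensitive code.
static PAUSED_COUNT: AtomicUsize = AtomicUsize::new(0);

struct Node {
    value: u64,
    next: *mut Node,
}

thread_local! {
    // Nodes this thread unlinked but could not free yet.
    static GARBAGE: RefCell<Vec<*mut Node>> = RefCell::new(Vec::new());
}

// Guard that ends the sensitive section, even if the closure panics.
struct Pause;

impl Drop for Pause {
    fn drop(&mut self) {
        PAUSED_COUNT.fetch_sub(1, SeqCst);
    }
}

fn pause<R>(exec: impl FnOnce() -> R) -> R {
    PAUSED_COUNT.fetch_add(1, SeqCst);
    let _guard = Pause;
    exec()
}

// Defers freeing `node`; frees every pending node if nobody is paused.
fn retire(node: *mut Node) {
    GARBAGE.with(|garbage| {
        let mut garbage = garbage.borrow_mut();
        garbage.push(node);
        if PAUSED_COUNT.load(SeqCst) == 0 {
            for pending in garbage.drain(..) {
                // `pending` is unlinked and no paused thread can still hold it.
                unsafe { drop(Box::from_raw(pending)) };
            }
        }
    });
}

pub struct Stack {
    top: AtomicPtr<Node>,
}

impl Stack {
    pub fn new() -> Self {
        Stack { top: AtomicPtr::new(ptr::null_mut()) }
    }

    pub fn push(&self, value: u64) {
        let node = Box::into_raw(Box::new(Node { value, next: ptr::null_mut() }));
        loop {
            let top = self.top.load(SeqCst);
            unsafe { (*node).next = top };
            if self.top.compare_exchange(top, node, SeqCst, SeqCst).is_ok() {
                return;
            }
        }
    }

    pub fn pop(&self) -> Option<u64> {
        // Load, read `next` and CAS all happen while paused, so `top` cannot
        // be freed (and its address cannot be reused) under our feet.
        let node = pause(|| loop {
            let top = self.top.load(SeqCst);
            if top.is_null() {
                return None;
            }
            let next = unsafe { (*top).next };
            if self.top.compare_exchange(top, next, SeqCst, SeqCst).is_ok() {
                return Some(top);
            }
        })?;
        // The node is ours now: read it before handing it to `retire`.
        let value = unsafe { (*node).value };
        retire(node);
        Some(value)
    }
}

impl Drop for Stack {
    fn drop(&mut self) {
        while self.pop().is_some() {}
    }
}
```

In the ABA scenario, Thread 2 can no longer get a fresh node at A's address while Thread 1 is inside `pop`, because A is only freed once the counter is observed to be 0.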
This method is not perfect, though. If a thread gets suspended for a long time while executing sensitive code, deletions will be suspended for a long time too. It does not freeze the application, but it may increase memory usage. Also, if a thread never goes back to its garbage list to retry the deletion, the pending garbage could wait a long time to be deleted. To soften the impact, one could do a few things.
The first of them, letting a thread retry the deletion of its pending garbage on demand, would look like this:
```rust
/// Tries to delete this thread's pending garbage. Returns whether it did.
pub fn try_force() -> bool {
    LOCAL_DELETION.with(|queue| {
        let success = PAUSED_COUNT.load(Acquire) == 0;
        if success {
            queue.delete();
        }
        success
    })
}
```
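To see the deferral and `try_force` in action, here is a small single-threaded sketch. It re-implements the global incinerator in simplified form so it runs on its own: pending destructors are stored as boxed closures (`Box<dyn FnOnce()>`) instead of a pointer plus a `transmute`d dropper, and `defer`, `free` and `demo` are hypothetical names, not part of the `lockfree` crate. Another thread's pause is simulated by bumping the counter by hand.

```rust
use std::cell::RefCell;
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};

static PAUSED_COUNT: AtomicUsize = AtomicUsize::new(0);

thread_local! {
    // Pending destructors of this thread (simplified: boxed closures).
    static PENDING: RefCell<Vec<Box<dyn FnOnce()>>> = RefCell::new(Vec::new());
    // Demo log of what has been "freed" so far.
    static FREED: RefCell<Vec<&'static str>> = RefCell::new(Vec::new());
}

fn run_pending() {
    // Take the list first, so a destructor may itself defer more garbage.
    let pending = PENDING.with(|p| std::mem::take(&mut *p.borrow_mut()));
    for destructor in pending {
        destructor();
    }
}

// Hypothetical counterpart of `add`: defer `f`, run everything if unpaused.
fn defer(f: impl FnOnce() + 'static) {
    PENDING.with(|p| p.borrow_mut().push(Box::new(f)));
    if PAUSED_COUNT.load(SeqCst) == 0 {
        run_pending();
    }
}

// Counterpart of `try_force`.
fn try_force() -> bool {
    let success = PAUSED_COUNT.load(SeqCst) == 0;
    if success {
        run_pending();
    }
    success
}

fn free(name: &'static str) {
    FREED.with(|f| f.borrow_mut().push(name));
}

/// Returns what had been freed after each step.
fn demo() -> Vec<Vec<&'static str>> {
    let snapshot = || FREED.with(|f| f.borrow().clone());
    let mut steps = Vec::new();

    // Pretend another thread is running sensitive code.
    PAUSED_COUNT.fetch_add(1, SeqCst);
    defer(|| free("a"));
    steps.push(snapshot()); // deferred: nothing freed yet
    assert!(!try_force()); // still paused, so forcing fails
    PAUSED_COUNT.fetch_sub(1, SeqCst);

    // No pauses anymore, but nothing retries the deletion by itself.
    steps.push(snapshot());
    assert!(try_force());
    steps.push(snapshot()); // now "a" has been freed
    steps
}
```

The middle step is exactly the weakness described above: once the pause ends, the garbage stays pending until this thread adds more garbage or calls `try_force`.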
Another problem with having a global incinerator is possible unsoundness. Imagine that `T` does not have a `'static` lifetime. Deferring its destructor might then run it after the references it holds have started dangling. While consumers of the API accept this risk (`add` is `unsafe`), it still looks bad. In the original post, I presented this incinerator as the final version. However, I have since developed a per-object incinerator in the `lockfree` crate (which also looks more memory-efficient).
First, let's put the essence of the incinerator in a struct: a thread-local garbage list and a counter of "threads running sensitive code".
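```rust
pub struct Incinerator<T> {
    // Number of active pauses on this incinerator.
    counter: AtomicUsize,
    // One garbage list per thread (`ThreadLocal` comes from the lockfree crate).
    tls_list: ThreadLocal<GarbageList<T>>,
}
```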
Don't worry about the `ThreadLocal` implementation. I have implemented it in the `lockfree` crate, and I promise I will write a post about it. One day. Now let's take a closer look at the garbage list:
```rust
struct GarbageList<T> {
    list: Cell<Vec<T>>,
}
```
Note that this incinerator is specific to one type (unlike the global version, which has to cast every pointer to `u8`). This also fixes our soundness hole: since `T` is part of the incinerator's type, the incinerator, and every deferred value it holds, cannot outlive the borrows inside `T`. The idea is that we defer the execution of `T`'s destructor by saving the value in the list. If, for the sake of efficiency, we want to allow many types, sum types (e.g. `enum MyGarbage`) should be used. Let me define some helper methods:
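```rust
impl<T> GarbageList<T> {
    fn new() -> Self {
        Self { list: Cell::new(Vec::new()) }
    }

    fn add(&self, val: T) {
        // Take the vector out of the `Cell`, push, and put it back.
        let mut list = self.list.replace(Vec::new());
        list.push(val);
        self.list.set(list);
    }

    // Dropping the old vector runs the destructor of every pending `T`.
    fn clear(&self) {
        drop(self.list.take());
    }
}
```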
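For instance, a structure that needs to defer the destruction of two kinds of values could use a single sum type as its `T`. A sketch, where `MyGarbage`, its variants, `Node` and `demo` are hypothetical, and `GarbageList` is copied from above (plus a `len` helper for the demo) so the block compiles on its own:

```rust
use std::cell::Cell;

struct GarbageList<T> {
    list: Cell<Vec<T>>,
}

impl<T> GarbageList<T> {
    fn new() -> Self {
        Self { list: Cell::new(Vec::new()) }
    }

    fn add(&self, val: T) {
        let mut list = self.list.replace(Vec::new());
        list.push(val);
        self.list.set(list);
    }

    fn clear(&self) {
        drop(self.list.take());
    }

    // Demo-only helper: number of pending values.
    fn len(&self) -> usize {
        let list = self.list.take();
        let len = list.len();
        self.list.set(list);
        len
    }
}

// Hypothetical node type of some lock-free structure.
#[allow(dead_code)]
struct Node {
    value: u32,
}

// One garbage type covering every kind of value the structure retires.
#[allow(dead_code)]
enum MyGarbage {
    Node(Box<Node>),
    Buffer(Vec<u8>),
}

fn demo() -> (usize, usize) {
    let list = GarbageList::new();
    list.add(MyGarbage::Node(Box::new(Node { value: 7 })));
    list.add(MyGarbage::Buffer(vec![0; 64]));
    let before = list.len();
    list.clear(); // drops the Box<Node> and the Vec<u8> alike
    (before, list.len())
}
```

The enum keeps a single, statically typed list per incinerator while still accepting several kinds of garbage.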
A very important component is the incinerator's pause:
```rust
/// While a `Pause` is alive, nothing in the incinerator is dropped.
pub struct Pause<'incin, T>
where
    T: 'incin,
{
    incin: &'incin Incinerator<T>,
}
```
This type is public. Since the incinerator is not global anymore, it is OK to let pauses be held, stored, whatever, for arbitrarily long (instead of wrapping a pause around a closure). Let's implement pauses:
```rust
impl<T> Incinerator<T> {
    pub fn pause(&self) -> Pause<'_, T> {
        loop {
            let init = self.counter.load(Acquire);
            if init == usize::MAX {
                panic!("Too many pauses");
            }
            // Only increment if nobody changed the counter meanwhile, so it
            // can never overflow.
            if self
                .counter
                .compare_exchange(init, init + 1, Release, Relaxed)
                .is_ok()
            {
                break Pause { incin: self };
            }
        }
    }
}

impl<'incin, T> Drop for Pause<'incin, T> {
    fn drop(&mut self) {
        if self.incin.counter.fetch_sub(1, AcqRel) == 1 {
            // If the previous value was 1, this means now it is 0 and... we can
            // delete our local list.
            self.incin.tls_list.with(GarbageList::clear);
        }
    }
}
```
And finally, we can write the `add` and `try_clear` methods:
```rust
impl<T> Incinerator<T> {
    pub fn add(&self, val: T) {
        if self.counter.load(Acquire) == 0 {
            // Safe to drop it all. Note that we check the counter after the
            // resource was removed from shared context. Since we use Thread
            // Local Storage, nobody can add something to the list meanwhile
            // besides us.
            self.tls_list.with(GarbageList::clear);
            drop(val);
        } else {
            // Not safe to drop. We have to save the value in the garbage list.
            self.tls_list.with_init(GarbageList::new, |list| list.add(val));
        }
    }

    pub fn try_clear(&self) -> bool {
        if self.counter.load(Acquire) == 0 {
            // It is only safe to drop if there are no active pauses. Remember
            // nobody can add something to this specific list besides us because
            // it is thread local.
            self.tls_list.with(GarbageList::clear);
            true
        } else {
            false
        }
    }
}
```
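Since `ThreadLocal` lives in the `lockfree` crate, the per-object code above does not compile on its own. To experiment with the design, here is a self-contained model in which, as an assumption for illustration only, a `Mutex<HashMap<ThreadId, Vec<T>>>` stands in for `ThreadLocal<GarbageList<T>>`. The mutex makes this model not lock-free, so it only mirrors the pause/add/try_clear logic; `new`, `clear_local` and `pending` are hypothetical helpers of the model.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering::*};
use std::sync::Mutex;
use std::thread::{self, ThreadId};

pub struct Incinerator<T> {
    counter: AtomicUsize,
    // Stand-in for `ThreadLocal<GarbageList<T>>` (NOT lock-free).
    lists: Mutex<HashMap<ThreadId, Vec<T>>>,
}

pub struct Pause<'incin, T> {
    incin: &'incin Incinerator<T>,
}

impl<T> Incinerator<T> {
    pub fn new() -> Self {
        Self { counter: AtomicUsize::new(0), lists: Mutex::new(HashMap::new()) }
    }

    // Drops the current thread's pending garbage.
    fn clear_local(&self) {
        let taken = self.lists.lock().unwrap().remove(&thread::current().id());
        drop(taken); // destructors run after the lock is released
    }

    pub fn pause(&self) -> Pause<'_, T> {
        // Increment unless that would overflow.
        if self.counter.fetch_update(AcqRel, Acquire, |n| n.checked_add(1)).is_err() {
            panic!("Too many pauses");
        }
        Pause { incin: self }
    }

    pub fn add(&self, val: T) {
        if self.counter.load(Acquire) == 0 {
            self.clear_local();
            drop(val);
        } else {
            let mut lists = self.lists.lock().unwrap();
            lists.entry(thread::current().id()).or_default().push(val);
        }
    }

    pub fn try_clear(&self) -> bool {
        let ok = self.counter.load(Acquire) == 0;
        if ok {
            self.clear_local();
        }
        ok
    }

    // Demo helper: how many values the current thread has pending.
    pub fn pending(&self) -> usize {
        let lists = self.lists.lock().unwrap();
        lists.get(&thread::current().id()).map_or(0, Vec::len)
    }
}

impl<T> Drop for Pause<'_, T> {
    fn drop(&mut self) {
        // The last pause to end clears this thread's list.
        if self.incin.counter.fetch_sub(1, AcqRel) == 1 {
            self.incin.clear_local();
        }
    }
}
```

Because pauses are plain values, a caller can hold one across several operations: values added meanwhile stay pending, and dropping the last pause clears the current thread's list.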
The incinerator is available in my Rust crate `lockfree` on crates.io. You can check the source code of the incinerator and of the lock-free data structures on GitLab.