Simple Scalable Unbounded Queue
Created on 2022-11-22T12:36:19-06:00
Oliver Giersch and Jörg Nolte published a paper [..] called "Fast and Portable Concurrent FIFO Queues With Deterministic Memory Reclamation" [ref]. In it, they note that fetch_add (atomic "fetch and add", or FAA) scales better than looping with [..] "compare and swap" (CAS).
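Here is a minimal Rust sketch of the two ways to claim the next index from a shared counter. The function names are hypothetical and not taken from the paper. `fetch_add` completes in a single atomic read-modify-write no matter how many threads race on it. The CAS loop, by contrast, must retry whenever another thread changes the counter between its load and its `compare_exchange_weak`. The harness only checks that both variants hand out every index exactly once. It is not a benchmark and does not by itself demonstrate the scaling difference.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Claim the next index with one fetch_add (FAA): a single read-modify-write
/// that always succeeds, however many threads are racing on `tail`.
pub fn reserve_faa(tail: &AtomicUsize) -> usize {
    tail.fetch_add(1, Ordering::Relaxed)
}

/// Claim the next index with a compare-and-swap (CAS) loop: a thread that
/// loses the race has to retry with the value it just observed.
pub fn reserve_cas(tail: &AtomicUsize) -> usize {
    let mut cur = tail.load(Ordering::Relaxed);
    loop {
        match tail.compare_exchange_weak(cur, cur + 1, Ordering::Relaxed, Ordering::Relaxed) {
            Ok(_) => return cur,
            Err(actual) => cur = actual, // another thread moved the counter; retry
        }
    }
}

/// Spawn `threads` threads that each claim `per_thread` indices with `reserve`,
/// then return every claimed index in sorted order.
pub fn reserve_all(reserve: fn(&AtomicUsize) -> usize, threads: usize, per_thread: usize) -> Vec<usize> {
    let counter = AtomicUsize::new(0);
    let tail = &counter;
    let mut all: Vec<usize> = thread::scope(|s| {
        // Spawn every thread before joining any, so they actually contend.
        let handles: Vec<_> = (0..threads)
            .map(|_| s.spawn(move || (0..per_thread).map(|_| reserve(tail)).collect::<Vec<usize>>()))
            .collect();
        handles.into_iter().flat_map(|h| h.join().unwrap()).collect()
    });
    all.sort_unstable();
    all
}
```

For example, `reserve_all(reserve_faa, 4, 1000)` should return `0..4000` in order, and so should the CAS variant. The difference between them only shows up in how much retrying happens under contention.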
Use FAA on the producer to get the current buffer while also atomically reserving an index into it. If the reserved index is outside the buffer's bounds, install a new buffer into the producer. It's an ideal starting point for a concurrent queue, but it has a few edge cases that need to be addressed.
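One FAA can return both the buffer and the index because the pseudocode keeps them in a single producer word, going through `encode`/`decode`. The following minimal Rust sketch shows that packing. It assumes, as the `@align(4096)` on `Buffer` suggests, that buffer addresses have their low 12 bits clear so those bits can carry the index. `encode` and `decode` mirror the pseudocode's names. `reserve` and `INDEX_MASK` are hypothetical, and the stand-in `Buffer` holds plain `u64`s instead of real slots.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Stand-in buffer. `align(4096)` guarantees the low 12 bits of its address are zero.
#[repr(align(4096))]
pub struct Buffer {
    pub slots: [u64; 64],
}

/// The low 12 bits of the packed word hold the slot index.
pub const INDEX_MASK: usize = 4096 - 1;

/// Pack a buffer pointer and a slot index into one word.
pub fn encode(buf: *const Buffer, idx: usize) -> usize {
    assert!(idx <= INDEX_MASK, "index would spill into the pointer bits");
    buf as usize | idx
}

/// Split a packed word back into (buffer pointer, slot index).
pub fn decode(word: usize) -> (*const Buffer, usize) {
    ((word & !INDEX_MASK) as *const Buffer, word & INDEX_MASK)
}

/// One fetch_add both reads the current buffer and reserves the next index in it.
pub fn reserve(producer: &AtomicUsize) -> (*const Buffer, usize) {
    decode(producer.fetch_add(1, Ordering::Acquire))
}
```

The FAA bumps the whole word, so this sketch presumably depends on the index staying below 4096 until a new buffer is installed, just like the packing it imitates. It does not guard against the index carrying into the pointer bits.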
type Slot(T):
    value: Uninit(T)
    ready: Atomic(bool)

    read() -> ?T:
        if not LOAD(&ready, Acquire): return null
        return READ(&value)

    write(v: T):
        WRITE(&value, v)
        STORE(&ready, true, Release)
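The `Slot` protocol maps fairly directly onto Rust atomics. This sketch relies on the queue's own guarantees: the FAA gives each slot exactly one writer, and only the single consumer reads it. The Release store on `ready` publishes the value, and the Acquire load in `read` ensures the reader sees it. Because correctness depends on that contract, `write` and `read` are marked `unsafe` here.

```rust
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicBool, Ordering};

/// One queue slot: a possibly-uninitialized value plus a flag that publishes it.
pub struct Slot<T> {
    value: UnsafeCell<MaybeUninit<T>>,
    ready: AtomicBool,
}

// Sharing is sound only under the one-writer/one-reader contract below,
// with `ready` (Release/Acquire) ordering every access to `value`.
unsafe impl<T: Send> Sync for Slot<T> {}

impl<T> Slot<T> {
    pub fn new() -> Self {
        Slot { value: UnsafeCell::new(MaybeUninit::uninit()), ready: AtomicBool::new(false) }
    }

    /// Safety: call at most once per slot (the FAA reservation makes the writer unique).
    pub unsafe fn write(&self, v: T) {
        unsafe { (*self.value.get()).write(v); }
        // Release: the value write above happens-before any Acquire load that sees `true`.
        self.ready.store(true, Ordering::Release);
    }

    /// Safety: only the single consumer calls this, and stops once it receives `Some`.
    pub unsafe fn read(&self) -> Option<T> {
        // Acquire pairs with the Release store in `write`.
        if !self.ready.load(Ordering::Acquire) {
            return None;
        }
        Some(unsafe { (*self.value.get()).assume_init_read() })
    }
}
```

A value that is written but never read is not dropped by this sketch.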
@align(4096)
type Buffer(T):
    slots: [buffer_size]Slot(T)
    next: Atomic(?*Buffer(T))
    pending: Atomic(isize)

    // basic refcount stuff
    unref(count: isize):
        p = ADD(&pending, count, Release)
        if (p + count != 0) return

        FENCE(Acquire)
        free(this)
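The `pending` counter is allowed to go negative. Reading `push` and `pop`, a full buffer receives one positive contribution: the overshoot count added by the producer that installs the next buffer. It also receives a -1 from every other producer that overshot it and a -1 from the consumer when it moves past it. These contributions sum to zero. Because the only positive term arrives in one piece, the running total cannot land on zero before the last contribution. The Rust sketch below replays contributions in different orders to show that only the final one frees the buffer. The names `Pending` and `freeing_call` are hypothetical.

```rust
use std::sync::atomic::{fence, AtomicIsize, Ordering};

/// Per-buffer `pending` counter. The running total may dip below zero; only
/// the call that brings it to exactly zero is told to free the buffer.
pub struct Pending(AtomicIsize);

impl Pending {
    pub fn new() -> Self {
        Pending(AtomicIsize::new(0))
    }

    /// Mirrors `unref(count)`: returns true if the caller should free the buffer.
    pub fn unref(&self, count: isize) -> bool {
        // Release: publish this thread's earlier accesses to the buffer.
        let old = self.0.fetch_add(count, Ordering::Release);
        if old + count != 0 {
            return false;
        }
        // Acquire: observe every other thread's accesses before freeing.
        fence(Ordering::Acquire);
        true
    }
}

/// Apply `contributions` in order and report which call (if any) would free.
pub fn freeing_call(contributions: &[isize]) -> Option<usize> {
    let pending = Pending::new();
    let mut freer = None;
    for (i, &c) in contributions.iter().enumerate() {
        if pending.unref(c) {
            freer = Some(i);
        }
    }
    freer
}
```

For example, with three overshooting producers the installer contributes +3, the two losers -1 each, and the consumer -1. Whatever order those four calls arrive in, only the last one returns true.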
type Queue(T):
    // tagged words: Buffer pointer | slot index (low bits are free thanks to @align(4096))
    producer: Atomic(?*Buffer(T))
    consumer: Atomic(?*Buffer(T))

    push(value: T):
        cached_buf = null
        defer if (cached_buf != null) free(cached_buf)

        loop:
            // fast path
            (buf, idx) = decode(ADD(&producer, 1, Acquire))
            if (buf != null) and (idx < buffer_size):
                return buf.slots[idx].write(value)

            // find where to register & link next buffer
            prev_link = if (buf != null) &buf.next else &consumer
            next = LOAD(prev_link, Acquire)

            if (next == null):
                // cache the malloc
                if (cached_buf == null) cached_buf = malloc(Buffer(T))
                next = cached_buf

                match CAS(prev_link, null, next, Release, Acquire):
                    Ok(_): cached_buf = null // registered, so don't free it
                    Err(updated): next = updated

            p = LOAD(&producer, Relaxed)
            loop:
                // decode on every attempt, since a failed CAS below refreshes p
                (cur_buf, cur_idx) = decode(p)

                // retry FAA if another producer already installed a buffer
                if (buf != cur_buf):
                    if (buf != null) buf.unref(-1)
                    break

                // install new buffer + reserve slot 0 in it
                if Err(updated) = CAS(&producer, p, encode(next, 1), Release, Relaxed):
                    p = updated
                    continue

                // cur_idx - buffer_size producers (this one included) overshot buf;
                // every other one, plus the consumer, later calls buf.unref(-1).
                // A null buf (first push) has no refcount to settle.
                if (buf != null) buf.unref(cur_idx - buffer_size)
                return next.slots[0].write(value)
    pop() -> ?T:
        (buf, idx) = decode(LOAD(&consumer, Acquire))
        if (buf == null): return null

        if (idx == buffer_size):
            next = LOAD(&buf.next, Acquire)
            if (next == null): return null

            buf.unref(-1)
            (buf, idx) = (next, 0)
            STORE(&consumer, encode(buf, idx), Unordered)

        value = buf.slots[idx].read()
        if (value != null):
            STORE(&consumer, encode(buf, idx + 1), Unordered)
        return value
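The amount `cur_idx - buffer_size` that `push` passes to `unref` should equal the number of producers whose FAA landed past the end of `buf`, taking `cur_idx` as the index in the producer word at the moment the installing CAS succeeds. Suppose a buffer is installed as `encode(buf, 1)`, with slot 0 already taken, and then sees M fetch_adds. Those return indices 1 through M, so the overshooting ones number M - buffer_size + 1. The word's index is then M + 1, which makes that count exactly cur_idx - buffer_size. The hypothetical, single-threaded Rust helper below replays the FAAs and checks the count against the formula. It is not part of the queue.

```rust
/// Hypothetical check: replay `faas` fetch_adds on a buffer installed as
/// encode(buf, 1), i.e. with slot 0 already taken by the installer.
/// Returns (overshoots counted one by one, cur_idx - buffer_size).
pub fn overshoot_count(buffer_size: usize, faas: usize) -> (usize, usize) {
    let mut idx = 1; // index left in the producer word by the installing CAS
    let mut overshoots = 0;
    for _ in 0..faas {
        let reserved = idx; // FAA hands back the old index...
        idx += 1; // ...and leaves the word one higher
        if reserved >= buffer_size {
            overshoots += 1; // this producer fell off the end of the buffer
        }
    }
    // `idx` is now the cur_idx the next installer would decode.
    // saturating_sub: with no overshoot yet, nobody installs and the count is 0.
    (overshoots, idx.saturating_sub(buffer_size))
}
```

For instance, with `buffer_size = 4` and six fetch_adds the returned indices are 1 through 6. Indices 4, 5, and 6 overshoot, and the word ends at 7, so both sides give 3.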