Simple Scalable Unbounded Queue
Created on 2022-11-22T12:36:19-06:00
Oliver Giersch and Jörg Nolte published a paper [..] called "Fast and Portable Concurrent FIFO Queues With Deterministic Memory Reclamation" [ref]. In it, they note that fetch_add (atomic "fetch and add", or FAA) scales better than looping with [..] "compare and swap" (CAS).
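To make the contrast concrete, here is a minimal Rust sketch of reserving a slot index both ways. The function names are hypothetical and not taken from the paper. It only shows the shape of each approach and checks that both hand out every index exactly once; it does not reproduce the paper's measurements. The difference that matters for scaling is that `fetch_add` always completes in a single atomic read-modify-write, while a CAS that loses a race has to reload and retry, so retries pile up as contention grows.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Reserve the next index with one fetch_add (FAA). It cannot fail, so each
/// caller finishes in a single atomic operation however many threads race.
fn reserve_faa(counter: &AtomicUsize) -> usize {
    counter.fetch_add(1, Ordering::Relaxed)
}

/// Reserve the next index with a compare-and-swap (CAS) loop. Under contention
/// the CAS can fail, forcing the caller to retry with the freshly observed value.
fn reserve_cas(counter: &AtomicUsize) -> usize {
    let mut current = counter.load(Ordering::Relaxed);
    loop {
        match counter.compare_exchange_weak(current, current + 1, Ordering::Relaxed, Ordering::Relaxed) {
            Ok(_) => return current,
            Err(actual) => current = actual, // lost the race: retry
        }
    }
}

/// Hypothetical harness: `threads` workers each reserve `per_thread` indices
/// using `reserve`; returns every reserved index, sorted.
fn reserve_all(reserve: fn(&AtomicUsize) -> usize, threads: usize, per_thread: usize) -> Vec<usize> {
    let counter = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || (0..per_thread).map(|_| reserve(&counter)).collect::<Vec<usize>>())
        })
        .collect();
    let mut all: Vec<usize> = handles
        .into_iter()
        .flat_map(|h| h.join().expect("worker panicked"))
        .collect();
    all.sort_unstable();
    all
}
```

Calling `reserve_all(reserve_faa, 4, 1000)` or `reserve_all(reserve_cas, 4, 1000)` yields the indices 0 through 3999 with no gaps or duplicates either way; only the amount of retrying differs.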
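Use FAA on the producer to read the current buffer and atomically reserve an index into it. A single FAA can do both because the producer is one word holding the buffer pointer and the index together (the 4096-byte alignment of Buffer leaves the pointer's low bits free for the index). If the reserved index is outside the buffer's bounds, install a new buffer as the producer's current one. This is an ideal starting point for a concurrent queue, but it has a few edge cases that need to be addressed.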
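A minimal Rust sketch of that packing, assuming a hypothetical `BUFFER_SIZE` of 256 (any value below 4096 works, so an index fits in the 12 low bits that 4096-byte alignment leaves zero). `encode`, `decode`, and `try_reserve` are illustrative names that mirror the pseudocode, not an existing API. The sketch also assumes fewer than `4096 - BUFFER_SIZE` producers overshoot a buffer before a new one is installed; past that, the extra increments would carry into the pointer bits.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical slot count; must stay below 4096 so an index fits in the low bits.
const BUFFER_SIZE: usize = 256;
/// 4096-byte alignment guarantees the low 12 bits of a buffer address are zero.
const INDEX_MASK: usize = 4096 - 1;

/// Stand-in for Buffer(T); only its alignment matters for this sketch.
#[repr(align(4096))]
struct Buffer {
    _slots: [u64; BUFFER_SIZE],
}

/// Pack a buffer pointer and an index into one word.
fn encode(buf: *const Buffer, idx: usize) -> usize {
    debug_assert_eq!(buf as usize & INDEX_MASK, 0, "buffer must be 4096-aligned");
    debug_assert!(idx <= INDEX_MASK, "index must fit in the low bits");
    (buf as usize) | idx
}

/// Split a packed word back into (buffer pointer, index).
fn decode(word: usize) -> (*const Buffer, usize) {
    ((word & !INDEX_MASK) as *const Buffer, word & INDEX_MASK)
}

/// The producer's first step: one FAA both reads the current buffer and
/// reserves an index in it. `None` means there is no buffer yet or the index
/// is out of bounds, i.e. the slow path that links and installs a new buffer.
fn try_reserve(producer: &AtomicUsize) -> Option<(*const Buffer, usize)> {
    let (buf, idx) = decode(producer.fetch_add(1, Ordering::Acquire));
    if !buf.is_null() && idx < BUFFER_SIZE {
        Some((buf, idx))
    } else {
        None
    }
}
```

Starting from `encode(buf, BUFFER_SIZE - 2)`, two calls to `try_reserve` return the last two slots of `buf` and the third returns `None`, which is where `push` leaves its fast path.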
    type Slot(T):
        value: Uninit(T)
        ready: Atomic(bool)

        read() -> ?T:
            if not LOAD(&ready, Acquire):
                return null
            return READ(&value)

        write(v: T):
            WRITE(&value, v)
            STORE(&ready, true, Release)

    @align(4096)
    type Buffer(T):
        slots: [buffer_size]Slot(T)
        next: Atomic(?*Buffer(T))
        pending: Atomic(isize)

        // basic refcount stuff
        unref(count: isize):
            p = ADD(&pending, count, Release)
            if (p + count != 0)
                return
            FENCE(Acquire)
            free(this)

    type Queue(T):
        producer: Atomic(?*Buffer(T)) // (buffer, index) packed with encode/decode
        consumer: Atomic(?*Buffer(T)) // (buffer, index) packed with encode/decode

        push(value: T):
            cached_buf = null
            defer if (cached_buf != null) free(cached_buf)

            loop:
                // fast path
                (buf, idx) = decode(ADD(&producer, 1, Acquire))
                if (buf != null) and (idx < buffer_size):
                    return buf.slots[idx].write(value)

                // find where to register & link next buffer
                prev_link = if (buf != null) &buf.next else &consumer
                next = LOAD(prev_link, Acquire)

                if (next == null):
                    // cache the malloc
                    if (cached_buf == null) cached_buf = malloc(Buffer(T))
                    next = cached_buf

                    match CAS(prev_link, null, next, Release, Acquire):
                        Ok(_): cached_buf = null // registered, so don't free it
                        Err(updated): next = updated

                p = LOAD(&producer, Relaxed)
                loop:
                    // decode on every attempt: a failed CAS below refreshes p
                    (cur_buf, cur_idx) = decode(p)

                    // retry FAA if failed to install
                    if (buf != cur_buf):
                        if (buf != null) buf.unref(-1)
                        break

                    // install new buffer + reserve slot 0 in it
                    if Err(updated) = CAS(&producer, p, encode(next, 1), Release, Relaxed):
                        p = updated
                        continue

                    // (cur_idx - buffer_size) producers overshot buf, this one included.
                    // The others each unref(-1); this producer's share stands in for the
                    // consumer, which unrefs(-1) once it moves past buf.
                    if (buf != null) buf.unref(cur_idx - buffer_size)
                    return next.slots[0].write(value)

        pop() -> ?T:
            (buf, idx) = decode(LOAD(&consumer, Acquire))
            if (buf == null):
                return null

            if (idx == buffer_size):
                next = LOAD(&buf.next, Acquire)
                if (next == null):
                    return null
                buf.unref(-1)
                (buf, idx) = (next, 0)
                STORE(&consumer, encode(buf, idx), Unordered)

            value = buf.slots[idx].read()
            if (value != null):
                STORE(&consumer, encode(buf, idx + 1), Unordered)
            return value
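The `Slot` handshake maps directly onto Rust's `AtomicBool` with Release/Acquire ordering. This is a sketch under assumptions taken from the queue's design: each slot is written by exactly one producer (the one whose FAA reserved its index) and read at most once, by the single consumer. Method names follow the pseudocode; making them `unsafe` to carry that contract is this sketch's own choice.

```rust
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicBool, Ordering};

/// Rust rendering of the pseudocode's Slot(T). A value that is written but
/// never read is leaked, since this sketch has no Drop impl.
pub struct Slot<T> {
    value: UnsafeCell<MaybeUninit<T>>,
    ready: AtomicBool,
}

// SAFETY: access to `value` is ordered by the `ready` flag under the
// one-writer / one-reader contract documented on `write` and `read`.
unsafe impl<T: Send> Sync for Slot<T> {}

impl<T> Slot<T> {
    pub const fn new() -> Self {
        Slot {
            value: UnsafeCell::new(MaybeUninit::uninit()),
            ready: AtomicBool::new(false),
        }
    }

    /// Producer side: store the value, then publish it with a Release store.
    ///
    /// # Safety
    /// Call at most once per slot, from the producer that reserved it.
    pub unsafe fn write(&self, v: T) {
        unsafe { (*self.value.get()).write(v) };
        self.ready.store(true, Ordering::Release);
    }

    /// Consumer side: an Acquire load that observes `true` also observes the
    /// value written before the matching Release store.
    ///
    /// # Safety
    /// Call only from the single consumer, and never again after `Some`.
    pub unsafe fn read(&self) -> Option<T> {
        if !self.ready.load(Ordering::Acquire) {
            return None;
        }
        Some(unsafe { (*self.value.get()).assume_init_read() })
    }
}
```

A reader that polls `read()` gets `None` until the writer's Release store lands, and then the fully written value.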