Simple Scalable Unbounded Queue

Created on 2022-11-22T12:36:19-06:00

This card pertains to a resource available on the internet.

Oliver Giersch and Jörg Nolte published a paper [..] called "Fast and Portable Concurrent FIFO Queues With Deterministic Memory Reclamation" [ref]. In it, they note that fetch_add (atomic "fetch and add", or FAA) scales better than looping with [..] "compare and swap" (CAS).

Use FAA on the producer to get the current buffer while also reserving an index into it atomically. If the reserved index is outside the buffer's bounds, install a new buffer on the producer. It's an ideal starting point for a concurrent queue, but it has a few edge cases that need to be addressed.
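
A minimal Rust sketch of that reservation step, assuming the buffer pointer and the slot index share one machine word: since `Buffer` is aligned to 4096 bytes, the low 12 bits of its address are free to hold the index. The helper names `encode`, `decode`, and `reserve` mirror the pseudocode, but their bodies are assumptions because the source does not define them, and addresses are modeled as plain `usize` values rather than real pointers.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Assumed to match the `@align(4096)` on `Buffer`: the low 12 bits of a
/// buffer address are always zero, so they can carry the slot index.
const BUFFER_ALIGN: usize = 4096;
const INDEX_MASK: usize = BUFFER_ALIGN - 1;

/// Pack a buffer address and a slot index into one word (assumed layout).
fn encode(buf_addr: usize, idx: usize) -> usize {
    debug_assert_eq!(buf_addr & INDEX_MASK, 0, "buffer must be 4096-aligned");
    debug_assert!(idx <= INDEX_MASK, "index must fit in the low bits");
    buf_addr | idx
}

/// Split a packed word back into (buffer address, slot index).
fn decode(word: usize) -> (usize, usize) {
    (word & !INDEX_MASK, word & INDEX_MASK)
}

/// A single fetch_add returns the current buffer and reserves an index in it.
/// The caller still has to check the index against the buffer size.
fn reserve(producer: &AtomicUsize) -> (usize, usize) {
    decode(producer.fetch_add(1, Ordering::Acquire))
}
```

Because the index lives in the low bits, every FAA past the end of the buffer still bumps it, so under this packing `buffer_size` plus the number of producers that can overshoot at once would have to stay below 4096. The edge cases mentioned above are handled in the full pseudocode that follows.
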
type Slot(T):
    value: Uninit(T)
    ready: Atomic(bool)

    read() -> ?T:
        if not LOAD(&ready, Acquire): return null
        return READ(&value)

    write(v: T):
        WRITE(&value, v)
        STORE(&ready, true, Release)

@align(4096)
type Buffer(T):
    slots: [buffer_size]Slot(T)
    next: Atomic(?*Buffer(T))
    pending: Atomic(isize)

    // basic refcount stuff
    unref(count: isize):
        p = ADD(&pending, count, Release)
        if (p + count != 0) return

        FENCE(Acquire)
        free(this)

type Queue(T):
    producer: Atomic(?*Buffer(T))
    consumer: Atomic(?*Buffer(T))

    push(value: T):
        cached_buf = null
        defer if (cached_buf != null) free(cached_buf)

        loop:
            // fast path
            (buf, idx) = decode(ADD(&producer, 1, Acquire))
            if (buf != null) and (idx < buffer_size):
                return buf.slots[idx].write(value)

            // find where to register & link next buffer
            prev_link = if (buf != null) &buf.next else &consumer
            next = LOAD(prev_link, Acquire)

            if (next == null):
                // cache the malloc
                if (cached_buf == null) cached_buf = malloc(Buffer(T))
                next = cached_buf

                match CAS(prev_link, null, next, Release, Acquire):
                    Ok(_): cached_buf = null // registered, so don't free it
                    Err(updated): next = updated

            p = LOAD(&producer, Relaxed)
            (cur_buf, cur_idx) = decode(p)
            loop:
                // retry FAA if failed to install
                if (buf != cur_buf):
                    if (buf != null) buf.unref(-1)
                    break

                // install new buffer + reserve slot 0 in it
                if Err(updated) = CAS(&producer, p, encode(next, 1), Release, Relaxed):
                    p = updated
                    continue

                (old_buf, inc) = (buf, cur_idx - buffer_size)
                if (buf == null):
                    (old_buf, inc) = (next, 1) // account for consumer

                old_buf.unref(inc)
                return next.slots[0].write(value)

    pop() -> ?T:
        (buf, idx) = decode(LOAD(&consumer, Acquire))
        if (buf == null): return null

        if (idx == buffer_size):
            next = LOAD(&buf.next, Acquire)
            if (next == null): return null

            buf.unref(-1)
            (buf, idx) = (next, 0)
            STORE(&consumer, encode(buf, idx), Unordered)

        value = buf.slots[idx].read()
        if (value != null):
            STORE(&consumer, encode(buf, idx + 1), Unordered)
        return value
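
The `pending` counter in `Buffer` can be modeled in isolation. The Rust sketch below is one reading of the listing, not a definitive implementation: the producer that installs the next buffer adds the number of producers that overshot the old one (`cur_idx - buffer_size`, itself included), every producer that lost the install race subtracts 1, and the consumer subtracts 1 when it moves on. Whoever brings the total back to zero frees the buffer. The type name `Pending` is hypothetical, and returning a `bool` instead of calling `free` is a simplification.

```rust
use std::sync::atomic::{fence, AtomicIsize, Ordering};

/// Models only `Buffer.pending`; the slots and the `next` link are omitted.
struct Pending(AtomicIsize);

impl Pending {
    fn new() -> Self {
        Pending(AtomicIsize::new(0))
    }

    /// Adds `count` to the running total. Returns true when the total
    /// reaches zero, which is where the pseudocode calls `free(this)`.
    fn unref(&self, count: isize) -> bool {
        let prev = self.0.fetch_add(count, Ordering::Release);
        if prev + count != 0 {
            return false;
        }
        // Synchronize with the Release adds of the other participants
        // before the buffer's memory is reclaimed.
        fence(Ordering::Acquire);
        true
    }
}
```

With three overshooting producers, for example, the calls `unref(-1)`, `unref(3)`, `unref(-1)`, `unref(-1)` can arrive in any order, and only the last one returns true.
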