迴圈緩衝區

作者:

David Howells <dhowells@redhat.com>

作者:

Paul E. McKenney <paulmck@linux.ibm.com>

Linux 提供了許多可用於實現迴圈緩衝的功能。這些功能分為兩類:

  1. 用於確定 2 的冪大小緩衝區資訊的便利函式。

  2. 當緩衝區中物件的生產者和消費者不想共享鎖時使用的記憶體屏障。

要使用這些功能,如以下所述,只需要一個生產者和一個消費者。可以透過序列化來處理多個生產者,也可以透過序列化來處理多個消費者。

什麼是迴圈緩衝區?

首先,什麼是迴圈緩衝區?迴圈緩衝區是一個固定大小、有限容量的緩衝區,其中包含兩個索引:

  1. 一個“頭部”索引 — 生產者向緩衝區中插入項的位置。

  2. 一個“尾部”索引 — 消費者在緩衝區中找到下一個項的位置。

通常,當尾部指標等於頭部指標時,緩衝區為空;當頭部指標比尾部指標小 1 時,緩衝區為滿。

當新增項時,頭部索引遞增;當移除項時,尾部索引遞增。尾部索引不應越過頭部索引,並且當它們到達緩衝區末尾時,兩個索引都應迴繞到 0,從而允許無限量的資料流過緩衝區。

通常,所有項都將具有相同的單位大小,但這並非使用以下技術的嚴格要求。如果緩衝區中要包含多個項或可變大小的項,索引可以增加 1 以上,前提是兩個索引都不能超越對方。但是,實現者必須小心,因為大小超過一個單位的區域可能會在緩衝區末尾迴繞並被分成兩個段。

測量 2 的冪緩衝區

計算任意大小迴圈緩衝區的佔用率或剩餘容量通常是一個緩慢的操作,需要使用模(除法)指令。但是,如果緩衝區的大小是 2 的冪,則可以使用更快的位與指令代替。

Linux 提供了一組用於處理 2 的冪迴圈緩衝區的宏。這些宏可以透過以下方式使用:

#include <linux/circ_buf.h>

這些宏是:

  1. 測量緩衝區的剩餘容量

    CIRC_SPACE(head_index, tail_index, buffer_size);
    

    這會返回緩衝區中剩餘的空間量[1],可用於插入項。

  2. 測量緩衝區中最大連續即時空間

    CIRC_SPACE_TO_END(head_index, tail_index, buffer_size);
    

    這會返回緩衝區中剩餘的連續空間量[1],可用於立即插入項而無需迴繞到緩衝區開頭。

  3. 測量緩衝區的佔用率

    CIRC_CNT(head_index, tail_index, buffer_size);
    

    這會返回當前佔用緩衝區[2]的項數。

  4. 測量緩衝區的非迴繞佔用率

    CIRC_CNT_TO_END(head_index, tail_index, buffer_size);
    

    這會返回可從緩衝區中提取的連續項數[2],而無需迴繞到緩衝區開頭。

每個宏名義上將返回一個介於 0 和 buffer_size-1 之間的值,但是:

  1. CIRC_SPACE*() 旨在用於生產者。對於生產者,它們將返回一個下限,因為生產者控制頭部索引,但消費者可能仍在另一個 CPU 上消耗緩衝區並移動尾部索引。

    對消費者而言,它將顯示一個上限,因為生產者可能正在忙於消耗空間。

  2. CIRC_CNT*() 旨在用於消費者。對於消費者,它們將返回一個下限,因為消費者控制尾部索引,但生產者可能仍在另一個 CPU 上填充緩衝區並移動頭部索引。

    對於生產者,它將顯示一個上限,因為消費者可能正在忙於清空緩衝區。

  3. 對於第三方而言,生產者和消費者對索引的寫入可見順序無法保證,因為它們是獨立的,並且可能在不同的 CPU 上進行——因此,在這種情況下,結果僅僅是一個猜測,甚至可能是負數。

在迴圈緩衝區中使用記憶體屏障

透過在迴圈緩衝區中結合使用記憶體屏障,您可以避免以下情況:

  1. 使用單個鎖來管理對緩衝區兩端的訪問,從而允許緩衝區同時被填充和清空;以及

  2. 使用原子計數器操作。

這有兩個方面:填充緩衝區的生產者和清空緩衝區的消費者。在任何給定時間,只能有一個物件填充緩衝區,也只能有一個物件清空緩衝區,但這兩方可以同時操作。

生產者

生產者將大致如下所示:

spin_lock(&producer_lock);

unsigned long head = buffer->head;
/* The spin_unlock() and next spin_lock() provide needed ordering. */
unsigned long tail = READ_ONCE(buffer->tail);

if (CIRC_SPACE(head, tail, buffer->size) >= 1) {
        /* insert one item into the buffer */
        struct item *item = buffer[head];

        produce_item(item);

        smp_store_release(buffer->head,
                          (head + 1) & (buffer->size - 1));

        /* wake_up() will make sure that the head is committed before
         * waking anyone up */
        wake_up(consumer);
}

spin_unlock(&producer_lock);

這將指示 CPU,新項的內容必須在頭部索引使其對消費者可用之前寫入,然後指示 CPU,修改後的頭部索引必須在喚醒消費者之前寫入。

請注意,wake_up() 不保證任何型別的屏障,除非有實際的東西被喚醒。因此,我們不能依賴它來保證排序。然而,陣列中總是會留有一個空元素。因此,生產者必須生產兩個元素,才能有可能損壞當前正在被消費者讀取的元素。因此,消費者連續呼叫之間的解鎖-加鎖對,提供了消費者騰空給定元素所指示的索引讀取與生產者向同一元素寫入之間所需的排序。

消費者

消費者將大致如下所示:

spin_lock(&consumer_lock);

/* Read index before reading contents at that index. */
unsigned long head = smp_load_acquire(buffer->head);
unsigned long tail = buffer->tail;

if (CIRC_CNT(head, tail, buffer->size) >= 1) {

        /* extract one item from the buffer */
        struct item *item = buffer[tail];

        consume_item(item);

        /* Finish reading descriptor before incrementing tail. */
        smp_store_release(buffer->tail,
                          (tail + 1) & (buffer->size - 1));
}

spin_unlock(&consumer_lock);

這將指示 CPU 確保在讀取新項之前索引是最新的,然後它將確保 CPU 在寫入新的尾部指標(這將擦除該項)之前已完成項的讀取。

請注意使用 READ_ONCE() 和 smp_load_acquire() 來讀取對方索引。這可以防止編譯器丟棄並重新載入其快取值。如果您能確保對方索引只使用一次,則這並非嚴格需要。smp_load_acquire() 還會強制 CPU 對後續記憶體引用進行排序。類似地,smp_store_release() 在兩種演算法中都用於寫入執行緒的索引。這記錄了我們正在寫入可以併發讀取的內容這一事實,防止編譯器撕裂儲存,並強制對以前的訪問進行排序。

進一步閱讀

另請參閱 Documentation/memory-barriers.txt,瞭解 Linux 記憶體屏障功能的描述。