使用 RCU 保護主要用於讀取的連結串列

RCU 最常見的用途之一是保護主要用於讀取的連結串列(list.h 中的 struct list_head)。這種方法的一個主要優點是所有必需的記憶體順序都由列表宏提供。本文件描述了幾個基於列表的 RCU 用例。

當持有 rcu_read_lock() 迭代列表時,寫入器可能會修改列表。讀取器保證看到在獲取 rcu_read_lock() 之前新增到列表中的所有元素,並且在釋放 rcu_read_unlock() 時仍然在列表上。可能會看到或可能不會看到新增到列表或從列表中刪除的元素。如果寫入器呼叫 list_replace_rcu(),則讀取器可能會看到舊元素或新元素;他們不會同時看到兩者,也不會兩者都看不到。

示例 1:主要用於讀取的列表:延遲銷燬

核心中 RCU 列表的一個廣泛使用的用例是在系統中無鎖迭代所有程序。task_struct::tasks 表示連結所有程序的列表節點。列表可以與任何列表新增或刪除並行遍歷。

列表的遍歷是使用 for_each_process() 完成的,它由 2 個宏定義

#define next_task(p) \
        list_entry_rcu((p)->tasks.next, struct task_struct, tasks)

#define for_each_process(p) \
        for (p = &init_task ; (p = next_task(p)) != &init_task ; )

遍歷所有程序列表的程式碼通常如下所示

rcu_read_lock();
for_each_process(p) {
        /* Do something with p */
}
rcu_read_unlock();

從任務列表中刪除程序的簡化和高度內聯的程式碼是

void release_task(struct task_struct *p)
{
        write_lock(&tasklist_lock);
        list_del_rcu(&p->tasks);
        write_unlock(&tasklist_lock);
        call_rcu(&p->rcu, delayed_put_task_struct);
}

當程序退出時,release_task() 透過 tasklist_lock 寫入器鎖保護下的 __exit_signal() 和 __unhash_process() 呼叫 list_del_rcu(&p->tasks)list_del_rcu() 呼叫從所有任務的列表中刪除該任務。tasklist_lock 阻止併發的列表新增/刪除損壞列表。使用 for_each_process() 的讀取器不受 tasklist_lock 的保護。為了防止讀取器注意到列表指標中的更改,只有在一個或多個寬限期過去後才會釋放 task_struct 物件,藉助 call_rcu(),它透過 put_task_struct_rcu_user() 呼叫。這種延遲銷燬確保了遍歷列表的任何讀取器都將看到有效的 p->tasks.next 指標,並且刪除/釋放可以與列表的遍歷並行發生。這種模式也稱為**存在鎖**,因為 RCU 在所有現有讀取器完成後才呼叫 delayed_put_task_struct() 回撥函式,這保證了有問題的 task_struct 物件將在所有可能具有對該物件引用的 RCU 讀取器完成之後才存在。

示例 2:在鎖外採取讀取端操作:沒有就地更新

一些讀取器-寫入器鎖定用例在持有讀取端鎖的同時計算一個值,但在釋放該鎖後繼續使用該值。這些用例通常是轉換為 RCU 的好候選者。一個突出的例子涉及網路資料包路由。因為資料包路由資料跟蹤計算機外部裝置的狀態,所以有時會包含過時的資料。因此,一旦計算出路由,就沒有必要在資料包傳輸期間保持路由表靜態。畢竟,你可以隨意保持路由表靜態,但這不會阻止外部 Internet 發生變化,而外部 Internet 的狀態才是真正重要的。此外,路由條目通常是新增或刪除,而不是就地修改。這是一個光速有限和原子非零大小的罕見例子,實際上有助於使同步更輕量級。

這種型別的 RCU 用例的一個簡單示例可以在系統呼叫審計支援中找到。例如,audit_filter_task() 的讀取器-寫入器鎖定實現可能如下所示

static enum audit_state audit_filter_task(struct task_struct *tsk, char **key)
{
        struct audit_entry *e;
        enum audit_state   state;

        read_lock(&auditsc_lock);
        /* Note: audit_filter_mutex held by caller. */
        list_for_each_entry(e, &audit_tsklist, list) {
                if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
                        if (state == AUDIT_STATE_RECORD)
                                *key = kstrdup(e->rule.filterkey, GFP_ATOMIC);
                        read_unlock(&auditsc_lock);
                        return state;
                }
        }
        read_unlock(&auditsc_lock);
        return AUDIT_BUILD_CONTEXT;
}

這裡列表在鎖下搜尋,但在返回相應的值之前釋放鎖。在這個值被執行時,列表很可能已經被修改了。這是有道理的,因為如果你正在關閉審計,那麼審計一些額外的系統呼叫是可以的。

這意味著 RCU 可以很容易地應用於讀取端,如下所示

static enum audit_state audit_filter_task(struct task_struct *tsk, char **key)
{
        struct audit_entry *e;
        enum audit_state   state;

        rcu_read_lock();
        /* Note: audit_filter_mutex held by caller. */
        list_for_each_entry_rcu(e, &audit_tsklist, list) {
                if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
                        if (state == AUDIT_STATE_RECORD)
                                *key = kstrdup(e->rule.filterkey, GFP_ATOMIC);
                        rcu_read_unlock();
                        return state;
                }
        }
        rcu_read_unlock();
        return AUDIT_BUILD_CONTEXT;
}

read_lock() 和 read_unlock() 呼叫分別變為 rcu_read_lock()rcu_read_unlock(),並且 list_for_each_entry() 變為 list_for_each_entry_rcu()_rcu() 列表遍歷原語新增 READ_ONCE() 和診斷檢查,以防止在 RCU 讀取端臨界區之外的錯誤使用。

更新端的更改也很簡單。讀取器-寫入器鎖可以如下所示用於 audit_del_rule() 和 audit_add_rule() 的簡化版本中的刪除和插入

static inline int audit_del_rule(struct audit_rule *rule,
                                 struct list_head *list)
{
        struct audit_entry *e;

        write_lock(&auditsc_lock);
        list_for_each_entry(e, list, list) {
                if (!audit_compare_rule(rule, &e->rule)) {
                        list_del(&e->list);
                        write_unlock(&auditsc_lock);
                        return 0;
                }
        }
        write_unlock(&auditsc_lock);
        return -EFAULT;         /* No matching rule */
}

static inline int audit_add_rule(struct audit_entry *entry,
                                 struct list_head *list)
{
        write_lock(&auditsc_lock);
        if (entry->rule.flags & AUDIT_PREPEND) {
                entry->rule.flags &= ~AUDIT_PREPEND;
                list_add(&entry->list, list);
        } else {
                list_add_tail(&entry->list, list);
        }
        write_unlock(&auditsc_lock);
        return 0;
}

以下是這兩個函式的 RCU 等效項

static inline int audit_del_rule(struct audit_rule *rule,
                                 struct list_head *list)
{
        struct audit_entry *e;

        /* No need to use the _rcu iterator here, since this is the only
         * deletion routine. */
        list_for_each_entry(e, list, list) {
                if (!audit_compare_rule(rule, &e->rule)) {
                        list_del_rcu(&e->list);
                        call_rcu(&e->rcu, audit_free_rule);
                        return 0;
                }
        }
        return -EFAULT;         /* No matching rule */
}

static inline int audit_add_rule(struct audit_entry *entry,
                                 struct list_head *list)
{
        if (entry->rule.flags & AUDIT_PREPEND) {
                entry->rule.flags &= ~AUDIT_PREPEND;
                list_add_rcu(&entry->list, list);
        } else {
                list_add_tail_rcu(&entry->list, list);
        }
        return 0;
}

通常,write_lock() 和 write_unlock() 將被 spin_lock() 和 spin_unlock() 替換。但在這種情況下,所有呼叫者都持有 audit_filter_mutex,因此不需要額外的鎖定。因此可以消除 auditsc_lock,因為使用 RCU 消除了寫入器排除讀取器的需要。

list_del()list_add()list_add_tail() 原語已被 list_del_rcu()list_add_rcu()list_add_tail_rcu() 替換。_rcu() 列表操作原語添加了弱序 CPU 上所需的記憶體屏障。list_del_rcu() 原語省略了指標中毒除錯輔助程式碼,否則會導致併發讀取器失敗。

因此,當讀取器可以容忍過時的資料,並且條目被新增或刪除,而沒有就地修改時,使用 RCU 非常容易!

示例 3:處理就地更新

系統呼叫審計程式碼不會就地更新審計規則。但是,如果它這樣做了,那麼這樣做的讀取器-寫入器鎖定程式碼可能如下所示(假設只更新了 field_count,否則,將需要填充新增的欄位)

static inline int audit_upd_rule(struct audit_rule *rule,
                                 struct list_head *list,
                                 __u32 newaction,
                                 __u32 newfield_count)
{
        struct audit_entry *e;
        struct audit_entry *ne;

        write_lock(&auditsc_lock);
        /* Note: audit_filter_mutex held by caller. */
        list_for_each_entry(e, list, list) {
                if (!audit_compare_rule(rule, &e->rule)) {
                        e->rule.action = newaction;
                        e->rule.field_count = newfield_count;
                        write_unlock(&auditsc_lock);
                        return 0;
                }
        }
        write_unlock(&auditsc_lock);
        return -EFAULT;         /* No matching rule */
}

RCU 版本建立一個副本,更新副本,然後用新更新的條目替換舊條目。這個動作序列,允許併發讀取,同時建立一個副本以執行更新,這就是 RCU (讀取-複製更新) 名稱的由來。

audit_upd_rule() 的 RCU 版本如下

static inline int audit_upd_rule(struct audit_rule *rule,
                                 struct list_head *list,
                                 __u32 newaction,
                                 __u32 newfield_count)
{
        struct audit_entry *e;
        struct audit_entry *ne;

        list_for_each_entry(e, list, list) {
                if (!audit_compare_rule(rule, &e->rule)) {
                        ne = kmalloc(sizeof(*entry), GFP_ATOMIC);
                        if (ne == NULL)
                                return -ENOMEM;
                        audit_copy_rule(&ne->rule, &e->rule);
                        ne->rule.action = newaction;
                        ne->rule.field_count = newfield_count;
                        list_replace_rcu(&e->list, &ne->list);
                        call_rcu(&e->rcu, audit_free_rule);
                        return 0;
                }
        }
        return -EFAULT;         /* No matching rule */
}

同樣,這假設呼叫者持有 audit_filter_mutex。通常,在這種程式碼中,寫入器鎖會變成自旋鎖。

update_lsm_rule() 做的事情非常相似,對於那些希望檢視真正的 Linux 核心程式碼的人來說。

這種模式的另一個用途可以在 openswitch 驅動程式的 連線跟蹤表 程式碼中的 ct_limit_set() 中找到。該表儲存連線跟蹤條目,並且對最大條目數有限制。每個區域都有一個這樣的表,因此每個區域都有一個 限制。這些區域透過使用 RCU 管理的 hlist 的雜湊表對映到它們的限制。當設定新的限制時,將分配一個新的限制物件,並呼叫 ct_limit_set() 以使用 list_replace_rcu() 替換舊的限制物件。然後使用 kfree_rcu() 在寬限期後釋放舊的限制物件。

示例 4:消除過時的資料

上面的審計示例容忍過時的資料,就像大多數跟蹤外部狀態的演算法一樣。畢竟,考慮到從外部狀態更改到 Linux 意識到更改之間存在延遲,因此如前所述,少量額外的 RCU 引起的過時通常不是問題。

但是,在許多示例中不能容忍過時的資料。Linux 核心中的一個例子是 System V IPC(參見 ipc/shm.c 中的 shm_lock() 函式)。此程式碼在每個條目的自旋鎖下檢查 deleted 標誌,如果設定了 deleted 標誌,則假裝該條目不存在。為了使其有用,搜尋函式必須返回持有每個條目的自旋鎖,正如 shm_lock() 實際上所做的那樣。

快速測驗

為了使 deleted 標誌技術有用,為什麼有必要在從搜尋函式返回時持有每個條目的鎖?

快速測驗的答案

如果系統呼叫審計模組需要拒絕過時的資料,一種方法是在 audit_entry 結構中新增一個 deleted 標誌和一個 lock 自旋鎖,並按如下方式修改 audit_filter_task()

static struct audit_entry *audit_filter_task(struct task_struct *tsk, char **key)
{
        struct audit_entry *e;
        enum audit_state   state;

        rcu_read_lock();
        list_for_each_entry_rcu(e, &audit_tsklist, list) {
                if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
                        spin_lock(&e->lock);
                        if (e->deleted) {
                                spin_unlock(&e->lock);
                                rcu_read_unlock();
                                return NULL;
                        }
                        rcu_read_unlock();
                        if (state == AUDIT_STATE_RECORD)
                                *key = kstrdup(e->rule.filterkey, GFP_ATOMIC);
                        /* As long as e->lock is held, e is valid and
                         * its value is not stale */
                        return e;
                }
        }
        rcu_read_unlock();
        return NULL;
}

audit_del_rule() 函式需要在自旋鎖下設定 deleted 標誌,如下所示

static inline int audit_del_rule(struct audit_rule *rule,
                                 struct list_head *list)
{
        struct audit_entry *e;

        /* No need to use the _rcu iterator here, since this
         * is the only deletion routine. */
        list_for_each_entry(e, list, list) {
                if (!audit_compare_rule(rule, &e->rule)) {
                        spin_lock(&e->lock);
                        list_del_rcu(&e->list);
                        e->deleted = 1;
                        spin_unlock(&e->lock);
                        call_rcu(&e->rcu, audit_free_rule);
                        return 0;
                }
        }
        return -EFAULT;         /* No matching rule */
}

這也假設呼叫者持有 audit_filter_mutex

請注意,此示例假設條目僅新增和刪除。需要額外的機制來正確處理 audit_upd_rule() 執行的就地更新。例如,audit_upd_rule() 需要在執行 list_replace_rcu() 時持有舊的 audit_entry 及其替換項的鎖。

示例 5:跳過過時的物件

對於某些用例,可以透過在讀取端列表遍歷期間跳過過時的物件來提高讀取器效能,其中過時的物件是指將在一個或多個寬限期後刪除和銷燬的物件。一個這樣的例子可以在 timerfd 子系統中找到。當重新程式設計 CLOCK_REALTIME 時鐘(例如,由於設定系統時間)時,所有已程式設計的依賴於該時鐘的 timerfds 都會被觸發,並且等待它們的程序會在其計劃到期之前被喚醒。為了方便這一點,當在 timerfd_setup_cancel() 中設定時,所有這些計時器都會被新增到 RCU 管理的 cancel_list

static void timerfd_setup_cancel(struct timerfd_ctx *ctx, int flags)
{
        spin_lock(&ctx->cancel_lock);
        if ((ctx->clockid == CLOCK_REALTIME ||
             ctx->clockid == CLOCK_REALTIME_ALARM) &&
            (flags & TFD_TIMER_ABSTIME) && (flags & TFD_TIMER_CANCEL_ON_SET)) {
                if (!ctx->might_cancel) {
                        ctx->might_cancel = true;
                        spin_lock(&cancel_lock);
                        list_add_rcu(&ctx->clist, &cancel_list);
                        spin_unlock(&cancel_lock);
                }
        } else {
                __timerfd_remove_cancel(ctx);
        }
        spin_unlock(&ctx->cancel_lock);
}

當釋放 timerfd(關閉 fd)時,timerfd 物件的 might_cancel 標誌會被清除,物件從 cancel_list 中刪除並銷燬,如 timerfd_release() 的這個簡化和內聯版本所示

int timerfd_release(struct inode *inode, struct file *file)
{
        struct timerfd_ctx *ctx = file->private_data;

        spin_lock(&ctx->cancel_lock);
        if (ctx->might_cancel) {
                ctx->might_cancel = false;
                spin_lock(&cancel_lock);
                list_del_rcu(&ctx->clist);
                spin_unlock(&cancel_lock);
        }
        spin_unlock(&ctx->cancel_lock);

        if (isalarm(ctx))
                alarm_cancel(&ctx->t.alarm);
        else
                hrtimer_cancel(&ctx->t.tmr);
        kfree_rcu(ctx, rcu);
        return 0;
}

如果設定了 CLOCK_REALTIME 時鐘,例如透過時間伺服器,則 hrtimer 框架會呼叫 timerfd_clock_was_set(),它會遍歷 cancel_list 並喚醒等待 timerfd 的程序。在迭代 cancel_list 時,會查詢 might_cancel 標誌以跳過過時的物件

void timerfd_clock_was_set(void)
{
        ktime_t moffs = ktime_mono_to_real(0);
        struct timerfd_ctx *ctx;
        unsigned long flags;

        rcu_read_lock();
        list_for_each_entry_rcu(ctx, &cancel_list, clist) {
                if (!ctx->might_cancel)
                        continue;
                spin_lock_irqsave(&ctx->wqh.lock, flags);
                if (ctx->moffs != moffs) {
                        ctx->moffs = KTIME_MAX;
                        ctx->ticks++;
                        wake_up_locked_poll(&ctx->wqh, EPOLLIN);
                }
                spin_unlock_irqrestore(&ctx->wqh.lock, flags);
        }
        rcu_read_unlock();
}

關鍵點是,因為 RCU 保護的 cancel_list 遍歷與物件新增和刪除同時發生,所以有時遍歷可以訪問已從列表中刪除的物件。在這個例子中,使用一個標誌來跳過這些物件。

總結

主要用於讀取的基於列表的資料結構,可以容忍過時的資料最適合使用 RCU。最簡單的情況是從資料結構中新增或刪除條目(或原子地就地修改),但是非原子就地修改可以透過製作副本,更新副本,然後用副本替換原始副本來處理。如果不能容忍過時的資料,那麼可以使用 deleted 標誌與每個條目的自旋鎖結合使用,以便允許搜尋函式拒絕新刪除的資料。

快速測驗的答案

為了使 deleted 標誌技術有用,為什麼有必要在從搜尋函式返回時持有每個條目的鎖?

如果搜尋函式在返回之前釋放每個條目的鎖,那麼無論如何,呼叫者都將處理過時的資料。如果處理過時的資料確實可以,那麼你不需要 deleted 標誌。如果處理過時的資料確實是一個問題,那麼你需要跨越所有使用返回值的程式碼持有每個條目的鎖。

返回快速測驗