使用 RCU 保護主要用於讀取的連結串列¶
RCU 最常見的用途之一是保護主要用於讀取的連結串列(list.h 中的 struct list_head)。這種方法的一個主要優點是所有必需的記憶體順序都由列表宏提供。本文件描述了幾個基於列表的 RCU 用例。
當持有 rcu_read_lock() 迭代列表時,寫入器可能會修改列表。讀取器保證看到在獲取 rcu_read_lock() 之前新增到列表中的所有元素,並且在釋放 rcu_read_unlock() 時仍然在列表上。可能會看到或可能不會看到新增到列表或從列表中刪除的元素。如果寫入器呼叫 list_replace_rcu(),則讀取器可能會看到舊元素或新元素;他們不會同時看到兩者,也不會兩者都看不到。
示例 1:主要用於讀取的列表:延遲銷燬¶
核心中 RCU 列表的一個廣泛使用的用例是在系統中無鎖迭代所有程序。task_struct::tasks 表示連結所有程序的列表節點。列表可以與任何列表新增或刪除並行遍歷。
列表的遍歷是使用 for_each_process() 完成的,它由 2 個宏定義
#define next_task(p) \
list_entry_rcu((p)->tasks.next, struct task_struct, tasks)
#define for_each_process(p) \
for (p = &init_task ; (p = next_task(p)) != &init_task ; )
遍歷所有程序列表的程式碼通常如下所示
rcu_read_lock();
for_each_process(p) {
/* Do something with p */
}
rcu_read_unlock();
從任務列表中刪除程序的簡化和高度內聯的程式碼是
void release_task(struct task_struct *p)
{
write_lock(&tasklist_lock);
list_del_rcu(&p->tasks);
write_unlock(&tasklist_lock);
call_rcu(&p->rcu, delayed_put_task_struct);
}
當程序退出時,release_task() 透過 tasklist_lock 寫入器鎖保護下的 __exit_signal() 和 __unhash_process() 呼叫 list_del_rcu(&p->tasks)。list_del_rcu() 呼叫從所有任務的列表中刪除該任務。tasklist_lock 阻止併發的列表新增/刪除損壞列表。使用 for_each_process() 的讀取器不受 tasklist_lock 的保護。為了防止讀取器注意到列表指標中的更改,只有在一個或多個寬限期過去後才會釋放 task_struct 物件,藉助 call_rcu(),它透過 put_task_struct_rcu_user() 呼叫。這種延遲銷燬確保了遍歷列表的任何讀取器都將看到有效的 p->tasks.next 指標,並且刪除/釋放可以與列表的遍歷並行發生。這種模式也稱為**存在鎖**,因為 RCU 在所有現有讀取器完成後才呼叫 delayed_put_task_struct() 回撥函式,這保證了有問題的 task_struct 物件將在所有可能具有對該物件引用的 RCU 讀取器完成之後才存在。
示例 2:在鎖外採取讀取端操作:沒有就地更新¶
一些讀取器-寫入器鎖定用例在持有讀取端鎖的同時計算一個值,但在釋放該鎖後繼續使用該值。這些用例通常是轉換為 RCU 的好候選者。一個突出的例子涉及網路資料包路由。因為資料包路由資料跟蹤計算機外部裝置的狀態,所以有時會包含過時的資料。因此,一旦計算出路由,就沒有必要在資料包傳輸期間保持路由表靜態。畢竟,你可以隨意保持路由表靜態,但這不會阻止外部 Internet 發生變化,而外部 Internet 的狀態才是真正重要的。此外,路由條目通常是新增或刪除,而不是就地修改。這是一個光速有限和原子非零大小的罕見例子,實際上有助於使同步更輕量級。
這種型別的 RCU 用例的一個簡單示例可以在系統呼叫審計支援中找到。例如,audit_filter_task() 的讀取器-寫入器鎖定實現可能如下所示
static enum audit_state audit_filter_task(struct task_struct *tsk, char **key)
{
struct audit_entry *e;
enum audit_state state;
read_lock(&auditsc_lock);
/* Note: audit_filter_mutex held by caller. */
list_for_each_entry(e, &audit_tsklist, list) {
if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
if (state == AUDIT_STATE_RECORD)
*key = kstrdup(e->rule.filterkey, GFP_ATOMIC);
read_unlock(&auditsc_lock);
return state;
}
}
read_unlock(&auditsc_lock);
return AUDIT_BUILD_CONTEXT;
}
這裡列表在鎖下搜尋,但在返回相應的值之前釋放鎖。在這個值被執行時,列表很可能已經被修改了。這是有道理的,因為如果你正在關閉審計,那麼審計一些額外的系統呼叫是可以的。
這意味著 RCU 可以很容易地應用於讀取端,如下所示
static enum audit_state audit_filter_task(struct task_struct *tsk, char **key)
{
struct audit_entry *e;
enum audit_state state;
rcu_read_lock();
/* Note: audit_filter_mutex held by caller. */
list_for_each_entry_rcu(e, &audit_tsklist, list) {
if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
if (state == AUDIT_STATE_RECORD)
*key = kstrdup(e->rule.filterkey, GFP_ATOMIC);
rcu_read_unlock();
return state;
}
}
rcu_read_unlock();
return AUDIT_BUILD_CONTEXT;
}
read_lock() 和 read_unlock() 呼叫分別變為 rcu_read_lock() 和 rcu_read_unlock(),並且 list_for_each_entry() 變為 list_for_each_entry_rcu()。_rcu() 列表遍歷原語新增 READ_ONCE() 和診斷檢查,以防止在 RCU 讀取端臨界區之外的錯誤使用。
更新端的更改也很簡單。讀取器-寫入器鎖可以如下所示用於 audit_del_rule() 和 audit_add_rule() 的簡化版本中的刪除和插入
static inline int audit_del_rule(struct audit_rule *rule,
struct list_head *list)
{
struct audit_entry *e;
write_lock(&auditsc_lock);
list_for_each_entry(e, list, list) {
if (!audit_compare_rule(rule, &e->rule)) {
list_del(&e->list);
write_unlock(&auditsc_lock);
return 0;
}
}
write_unlock(&auditsc_lock);
return -EFAULT; /* No matching rule */
}
static inline int audit_add_rule(struct audit_entry *entry,
struct list_head *list)
{
write_lock(&auditsc_lock);
if (entry->rule.flags & AUDIT_PREPEND) {
entry->rule.flags &= ~AUDIT_PREPEND;
list_add(&entry->list, list);
} else {
list_add_tail(&entry->list, list);
}
write_unlock(&auditsc_lock);
return 0;
}
以下是這兩個函式的 RCU 等效項
static inline int audit_del_rule(struct audit_rule *rule,
struct list_head *list)
{
struct audit_entry *e;
/* No need to use the _rcu iterator here, since this is the only
* deletion routine. */
list_for_each_entry(e, list, list) {
if (!audit_compare_rule(rule, &e->rule)) {
list_del_rcu(&e->list);
call_rcu(&e->rcu, audit_free_rule);
return 0;
}
}
return -EFAULT; /* No matching rule */
}
static inline int audit_add_rule(struct audit_entry *entry,
struct list_head *list)
{
if (entry->rule.flags & AUDIT_PREPEND) {
entry->rule.flags &= ~AUDIT_PREPEND;
list_add_rcu(&entry->list, list);
} else {
list_add_tail_rcu(&entry->list, list);
}
return 0;
}
通常,write_lock() 和 write_unlock() 將被 spin_lock() 和 spin_unlock() 替換。但在這種情況下,所有呼叫者都持有 audit_filter_mutex,因此不需要額外的鎖定。因此可以消除 auditsc_lock,因為使用 RCU 消除了寫入器排除讀取器的需要。
list_del()、list_add() 和 list_add_tail() 原語已被 list_del_rcu()、list_add_rcu() 和 list_add_tail_rcu() 替換。_rcu() 列表操作原語添加了弱序 CPU 上所需的記憶體屏障。list_del_rcu() 原語省略了指標中毒除錯輔助程式碼,否則會導致併發讀取器失敗。
因此,當讀取器可以容忍過時的資料,並且條目被新增或刪除,而沒有就地修改時,使用 RCU 非常容易!
示例 3:處理就地更新¶
系統呼叫審計程式碼不會就地更新審計規則。但是,如果它這樣做了,那麼這樣做的讀取器-寫入器鎖定程式碼可能如下所示(假設只更新了 field_count,否則,將需要填充新增的欄位)
static inline int audit_upd_rule(struct audit_rule *rule,
struct list_head *list,
__u32 newaction,
__u32 newfield_count)
{
struct audit_entry *e;
struct audit_entry *ne;
write_lock(&auditsc_lock);
/* Note: audit_filter_mutex held by caller. */
list_for_each_entry(e, list, list) {
if (!audit_compare_rule(rule, &e->rule)) {
e->rule.action = newaction;
e->rule.field_count = newfield_count;
write_unlock(&auditsc_lock);
return 0;
}
}
write_unlock(&auditsc_lock);
return -EFAULT; /* No matching rule */
}
RCU 版本建立一個副本,更新副本,然後用新更新的條目替換舊條目。這個動作序列,允許併發讀取,同時建立一個副本以執行更新,這就是 RCU (讀取-複製更新) 名稱的由來。
audit_upd_rule() 的 RCU 版本如下
static inline int audit_upd_rule(struct audit_rule *rule,
struct list_head *list,
__u32 newaction,
__u32 newfield_count)
{
struct audit_entry *e;
struct audit_entry *ne;
list_for_each_entry(e, list, list) {
if (!audit_compare_rule(rule, &e->rule)) {
ne = kmalloc(sizeof(*entry), GFP_ATOMIC);
if (ne == NULL)
return -ENOMEM;
audit_copy_rule(&ne->rule, &e->rule);
ne->rule.action = newaction;
ne->rule.field_count = newfield_count;
list_replace_rcu(&e->list, &ne->list);
call_rcu(&e->rcu, audit_free_rule);
return 0;
}
}
return -EFAULT; /* No matching rule */
}
同樣,這假設呼叫者持有 audit_filter_mutex。通常,在這種程式碼中,寫入器鎖會變成自旋鎖。
update_lsm_rule() 做的事情非常相似,對於那些希望檢視真正的 Linux 核心程式碼的人來說。
這種模式的另一個用途可以在 openswitch 驅動程式的 連線跟蹤表 程式碼中的 ct_limit_set() 中找到。該表儲存連線跟蹤條目,並且對最大條目數有限制。每個區域都有一個這樣的表,因此每個區域都有一個 限制。這些區域透過使用 RCU 管理的 hlist 的雜湊表對映到它們的限制。當設定新的限制時,將分配一個新的限制物件,並呼叫 ct_limit_set() 以使用 list_replace_rcu() 替換舊的限制物件。然後使用 kfree_rcu() 在寬限期後釋放舊的限制物件。
示例 4:消除過時的資料¶
上面的審計示例容忍過時的資料,就像大多數跟蹤外部狀態的演算法一樣。畢竟,考慮到從外部狀態更改到 Linux 意識到更改之間存在延遲,因此如前所述,少量額外的 RCU 引起的過時通常不是問題。
但是,在許多示例中不能容忍過時的資料。Linux 核心中的一個例子是 System V IPC(參見 ipc/shm.c 中的 shm_lock() 函式)。此程式碼在每個條目的自旋鎖下檢查 deleted 標誌,如果設定了 deleted 標誌,則假裝該條目不存在。為了使其有用,搜尋函式必須返回持有每個條目的自旋鎖,正如 shm_lock() 實際上所做的那樣。
- 快速測驗
為了使 deleted 標誌技術有用,為什麼有必要在從搜尋函式返回時持有每個條目的鎖?
如果系統呼叫審計模組需要拒絕過時的資料,一種方法是在 audit_entry 結構中新增一個 deleted 標誌和一個 lock 自旋鎖,並按如下方式修改 audit_filter_task()
static struct audit_entry *audit_filter_task(struct task_struct *tsk, char **key)
{
struct audit_entry *e;
enum audit_state state;
rcu_read_lock();
list_for_each_entry_rcu(e, &audit_tsklist, list) {
if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
spin_lock(&e->lock);
if (e->deleted) {
spin_unlock(&e->lock);
rcu_read_unlock();
return NULL;
}
rcu_read_unlock();
if (state == AUDIT_STATE_RECORD)
*key = kstrdup(e->rule.filterkey, GFP_ATOMIC);
/* As long as e->lock is held, e is valid and
* its value is not stale */
return e;
}
}
rcu_read_unlock();
return NULL;
}
audit_del_rule() 函式需要在自旋鎖下設定 deleted 標誌,如下所示
static inline int audit_del_rule(struct audit_rule *rule,
struct list_head *list)
{
struct audit_entry *e;
/* No need to use the _rcu iterator here, since this
* is the only deletion routine. */
list_for_each_entry(e, list, list) {
if (!audit_compare_rule(rule, &e->rule)) {
spin_lock(&e->lock);
list_del_rcu(&e->list);
e->deleted = 1;
spin_unlock(&e->lock);
call_rcu(&e->rcu, audit_free_rule);
return 0;
}
}
return -EFAULT; /* No matching rule */
}
這也假設呼叫者持有 audit_filter_mutex。
請注意,此示例假設條目僅新增和刪除。需要額外的機制來正確處理 audit_upd_rule() 執行的就地更新。例如,audit_upd_rule() 需要在執行 list_replace_rcu() 時持有舊的 audit_entry 及其替換項的鎖。
示例 5:跳過過時的物件¶
對於某些用例,可以透過在讀取端列表遍歷期間跳過過時的物件來提高讀取器效能,其中過時的物件是指將在一個或多個寬限期後刪除和銷燬的物件。一個這樣的例子可以在 timerfd 子系統中找到。當重新程式設計 CLOCK_REALTIME 時鐘(例如,由於設定系統時間)時,所有已程式設計的依賴於該時鐘的 timerfds 都會被觸發,並且等待它們的程序會在其計劃到期之前被喚醒。為了方便這一點,當在 timerfd_setup_cancel() 中設定時,所有這些計時器都會被新增到 RCU 管理的 cancel_list 中
static void timerfd_setup_cancel(struct timerfd_ctx *ctx, int flags)
{
spin_lock(&ctx->cancel_lock);
if ((ctx->clockid == CLOCK_REALTIME ||
ctx->clockid == CLOCK_REALTIME_ALARM) &&
(flags & TFD_TIMER_ABSTIME) && (flags & TFD_TIMER_CANCEL_ON_SET)) {
if (!ctx->might_cancel) {
ctx->might_cancel = true;
spin_lock(&cancel_lock);
list_add_rcu(&ctx->clist, &cancel_list);
spin_unlock(&cancel_lock);
}
} else {
__timerfd_remove_cancel(ctx);
}
spin_unlock(&ctx->cancel_lock);
}
當釋放 timerfd(關閉 fd)時,timerfd 物件的 might_cancel 標誌會被清除,物件從 cancel_list 中刪除並銷燬,如 timerfd_release() 的這個簡化和內聯版本所示
int timerfd_release(struct inode *inode, struct file *file)
{
struct timerfd_ctx *ctx = file->private_data;
spin_lock(&ctx->cancel_lock);
if (ctx->might_cancel) {
ctx->might_cancel = false;
spin_lock(&cancel_lock);
list_del_rcu(&ctx->clist);
spin_unlock(&cancel_lock);
}
spin_unlock(&ctx->cancel_lock);
if (isalarm(ctx))
alarm_cancel(&ctx->t.alarm);
else
hrtimer_cancel(&ctx->t.tmr);
kfree_rcu(ctx, rcu);
return 0;
}
如果設定了 CLOCK_REALTIME 時鐘,例如透過時間伺服器,則 hrtimer 框架會呼叫 timerfd_clock_was_set(),它會遍歷 cancel_list 並喚醒等待 timerfd 的程序。在迭代 cancel_list 時,會查詢 might_cancel 標誌以跳過過時的物件
void timerfd_clock_was_set(void)
{
ktime_t moffs = ktime_mono_to_real(0);
struct timerfd_ctx *ctx;
unsigned long flags;
rcu_read_lock();
list_for_each_entry_rcu(ctx, &cancel_list, clist) {
if (!ctx->might_cancel)
continue;
spin_lock_irqsave(&ctx->wqh.lock, flags);
if (ctx->moffs != moffs) {
ctx->moffs = KTIME_MAX;
ctx->ticks++;
wake_up_locked_poll(&ctx->wqh, EPOLLIN);
}
spin_unlock_irqrestore(&ctx->wqh.lock, flags);
}
rcu_read_unlock();
}
關鍵點是,因為 RCU 保護的 cancel_list 遍歷與物件新增和刪除同時發生,所以有時遍歷可以訪問已從列表中刪除的物件。在這個例子中,使用一個標誌來跳過這些物件。
總結¶
主要用於讀取的基於列表的資料結構,可以容忍過時的資料最適合使用 RCU。最簡單的情況是從資料結構中新增或刪除條目(或原子地就地修改),但是非原子就地修改可以透過製作副本,更新副本,然後用副本替換原始副本來處理。如果不能容忍過時的資料,那麼可以使用 deleted 標誌與每個條目的自旋鎖結合使用,以便允許搜尋函式拒絕新刪除的資料。
- 快速測驗的答案
為了使 deleted 標誌技術有用,為什麼有必要在從搜尋函式返回時持有每個條目的鎖?
如果搜尋函式在返回之前釋放每個條目的鎖,那麼無論如何,呼叫者都將處理過時的資料。如果處理過時的資料確實可以,那麼你不需要 deleted 標誌。如果處理過時的資料確實是一個問題,那麼你需要跨越所有使用返回值的程式碼持有每個條目的鎖。