RCU和可解除安裝模組¶
[最初發表於 LWN 2007 年 1 月 14 日:http://lwn.net/Articles/217484/]
RCU 更新器有時使用 call_rcu() 來啟動對寬限期經過的非同步等待。此原語接受一個指向位於 RCU 保護的資料結構中的 rcu_head 結構體的指標,以及另一個指向稍後可能被呼叫以釋放該結構體的函式的指標。然後,從 IRQ 上下文中刪除連結串列中的元素 p 的程式碼可能如下
list_del_rcu(p);
call_rcu(&p->rcu, p_callback);
由於 call_rcu() 永遠不會阻塞,因此可以在 IRQ 上下文中安全地使用此程式碼。函式 p_callback() 可以定義如下
static void p_callback(struct rcu_head *rp)
{
struct pstruct *p = container_of(rp, struct pstruct, rcu);
kfree(p);
}
解除安裝使用 call_rcu() 的模組¶
但是,如果 p_callback() 函式是在可解除安裝模組中定義的呢?
如果我們在某些 RCU 回撥掛起時解除安裝模組,則執行這些回撥的 CPU 將在稍後被呼叫時感到非常失望,如 http://lwn.net/images/ns/kernel/rcu-drop.jpg 中生動地描繪的那樣。
我們可以嘗試在模組退出程式碼路徑中放置一個 synchronize_rcu(),但這並不足夠。雖然 synchronize_rcu() 確實會等待寬限期經過,但它不會等待回撥完成。
人們可能會試圖嘗試幾個背靠背的 synchronize_rcu() 呼叫,但這仍然不能保證有效。如果 RCU 回撥負載非常重,那麼可能會推遲某些回撥,以便允許進行其他處理。例如,在即時核心中需要這種延遲,以避免過度的排程延遲。
rcu_barrier()¶
這種情況可以透過 rcu_barrier() 原語來處理。 rcu_barrier() 不是等待寬限期經過,而是等待所有未完成的 RCU 回撥完成。請注意,rcu_barrier() 不 意味著 synchronize_rcu(),特別是,如果沒有 RCU 回撥在任何地方排隊,rcu_barrier() 有權立即返回,而不等待任何事情,更不用說寬限期了。
使用 rcu_barrier() 的虛擬碼如下
阻止釋出任何新的 RCU 回撥。
執行
rcu_barrier()。允許解除安裝模組。
還有一個用於 SRCU 的 srcu_barrier() 函式,當然,您必須將 srcu_barrier() 的風格與 call_srcu() 的風格相匹配。如果您的模組使用多個 srcu_struct 結構,那麼在解除安裝該模組時,它也必須使用多個 srcu_barrier() 呼叫。例如,如果它在 srcu_struct_1 上使用 call_rcu()、call_srcu(),並在 srcu_struct_2 上使用 call_srcu(),那麼在解除安裝時將需要以下三行程式碼
1 rcu_barrier();
2 srcu_barrier(&srcu_struct_1);
3 srcu_barrier(&srcu_struct_2);
如果延遲至關重要,則可以使用工作佇列併發執行這三個函式。
rcutorture 模組的舊版本在其退出函式中使用 rcu_barrier(),如下所示
1 static void
2 rcu_torture_cleanup(void)
3 {
4 int i;
5
6 fullstop = 1;
7 if (shuffler_task != NULL) {
8 VERBOSE_PRINTK_STRING("Stopping rcu_torture_shuffle task");
9 kthread_stop(shuffler_task);
10 }
11 shuffler_task = NULL;
12
13 if (writer_task != NULL) {
14 VERBOSE_PRINTK_STRING("Stopping rcu_torture_writer task");
15 kthread_stop(writer_task);
16 }
17 writer_task = NULL;
18
19 if (reader_tasks != NULL) {
20 for (i = 0; i < nrealreaders; i++) {
21 if (reader_tasks[i] != NULL) {
22 VERBOSE_PRINTK_STRING(
23 "Stopping rcu_torture_reader task");
24 kthread_stop(reader_tasks[i]);
25 }
26 reader_tasks[i] = NULL;
27 }
28 kfree(reader_tasks);
29 reader_tasks = NULL;
30 }
31 rcu_torture_current = NULL;
32
33 if (fakewriter_tasks != NULL) {
34 for (i = 0; i < nfakewriters; i++) {
35 if (fakewriter_tasks[i] != NULL) {
36 VERBOSE_PRINTK_STRING(
37 "Stopping rcu_torture_fakewriter task");
38 kthread_stop(fakewriter_tasks[i]);
39 }
40 fakewriter_tasks[i] = NULL;
41 }
42 kfree(fakewriter_tasks);
43 fakewriter_tasks = NULL;
44 }
45
46 if (stats_task != NULL) {
47 VERBOSE_PRINTK_STRING("Stopping rcu_torture_stats task");
48 kthread_stop(stats_task);
49 }
50 stats_task = NULL;
51
52 /* Wait for all RCU callbacks to fire. */
53 rcu_barrier();
54
55 rcu_torture_stats_print(); /* -After- the stats thread is stopped! */
56
57 if (cur_ops->cleanup != NULL)
58 cur_ops->cleanup();
59 if (atomic_read(&n_rcu_torture_error))
60 rcu_torture_print_module_parms("End of test: FAILURE");
61 else
62 rcu_torture_print_module_parms("End of test: SUCCESS");
63 }
第 6 行設定了一個全域性變數,以防止任何 RCU 回撥重新發布自身。這在大多數情況下是不必要的,因為 RCU 回撥很少包括對 call_rcu() 的呼叫。但是,rcutorture 模組是此規則的一個例外,因此需要設定此全域性變數。
第 7-50 行停止了與 rcutorture 模組關聯的所有核心任務。因此,一旦執行到達第 53 行,將不會發布更多 rcutorture RCU 回撥。第 53 行上的 rcu_barrier() 呼叫會等待任何預先存在的回撥完成。
然後,第 55-62 行列印狀態並執行特定於操作的清理,然後返回,允許完成模組解除安裝操作。
- 快速測驗 #1
還有其他任何情況需要
rcu_barrier()嗎?
您的模組可能存在其他複雜情況。例如,如果您的模組從定時器呼叫 call_rcu(),您需要首先避免釋出新的定時器,取消(或等待)所有已釋出的定時器,然後才能呼叫 rcu_barrier() 以等待任何剩餘的 RCU 回撥完成。
當然,如果您的模組使用 call_rcu(),您需要在解除安裝之前呼叫 rcu_barrier()。同樣,如果您的模組使用 call_srcu(),您需要在解除安裝之前呼叫 srcu_barrier(),並且在同一個 srcu_struct 結構上呼叫。如果您的模組使用 call_rcu() 和 call_srcu(),那麼(如上所述)您需要呼叫 rcu_barrier() 和 srcu_barrier()。
實現 rcu_barrier()¶
Dipankar Sarma 對 rcu_barrier() 的實現利用了以下事實:RCU 回撥一旦在每個 CPU 佇列之一上排隊,就永遠不會重新排序。他的實現將 RCU 回撥排隊在每個 CPU 回撥佇列上,然後等待它們全部開始執行,此時,保證所有較早的 RCU 回撥都已完成。
rcu_barrier() 的原始程式碼大致如下
1 void rcu_barrier(void)
2 {
3 BUG_ON(in_interrupt());
4 /* Take cpucontrol mutex to protect against CPU hotplug */
5 mutex_lock(&rcu_barrier_mutex);
6 init_completion(&rcu_barrier_completion);
7 atomic_set(&rcu_barrier_cpu_count, 1);
8 on_each_cpu(rcu_barrier_func, NULL, 0, 1);
9 if (atomic_dec_and_test(&rcu_barrier_cpu_count))
10 complete(&rcu_barrier_completion);
11 wait_for_completion(&rcu_barrier_completion);
12 mutex_unlock(&rcu_barrier_mutex);
13 }
第 3 行驗證呼叫者是否在程序上下文中,第 5 行和第 12 行使用 rcu_barrier_mutex 來確保一次只有一個 rcu_barrier() 使用全域性完成和計數器,這些計數器在第 6 行和第 7 行初始化。第 8 行使每個 CPU 呼叫 rcu_barrier_func(),如下所示。請注意,on_each_cpu() 的引數列表中的最後一個 “1” 確保對 rcu_barrier_func() 的所有呼叫在 on_each_cpu() 返回之前完成。第 9 行從 rcu_barrier_cpu_count 中刪除初始計數,如果此計數現在為零,則第 10 行完成完成,這會阻止第 11 行阻塞。無論哪種方式,第 11 行都會等待(如果需要)完成。
- 快速測驗 #2
為什麼第 8 行不將 rcu_barrier_cpu_count 初始化為零,從而避免對第 9 行和第 10 行的需求?
此程式碼在 2008 年重寫,之後多次重寫,但這仍然給出了總體思路。
rcu_barrier_func() 在每個 CPU 上執行,它呼叫 call_rcu() 以釋出 RCU 回撥,如下所示
1 static void rcu_barrier_func(void *notused)
2 {
3 int cpu = smp_processor_id();
4 struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
5 struct rcu_head *head;
6
7 head = &rdp->barrier;
8 atomic_inc(&rcu_barrier_cpu_count);
9 call_rcu(head, rcu_barrier_callback);
10 }
第 3 行和第 4 行定位 RCU 的內部每個 CPU rcu_data 結構,該結構包含稍後呼叫 call_rcu() 所需的 struct rcu_head。第 7 行獲取指向此 struct rcu_head 的指標,第 8 行遞增全域性計數器。此計數器稍後將由回撥遞減。然後,第 9 行在當前 CPU 的佇列上註冊 rcu_barrier_callback()。
rcu_barrier_callback() 函式只是原子地遞減 rcu_barrier_cpu_count 變數,並在其達到零時完成完成,如下所示
1 static void rcu_barrier_callback(struct rcu_head *notused)
2 {
3 if (atomic_dec_and_test(&rcu_barrier_cpu_count))
4 complete(&rcu_barrier_completion);
5 }
- 快速測驗 #3
如果 CPU 0 的 rcu_barrier_func() 立即執行(因此將 rcu_barrier_cpu_count 遞增到值 1),但其他 CPU 的 rcu_barrier_func() 呼叫延遲了完整的寬限期會發生什麼?這是否會導致
rcu_barrier()過早返回?
當前的 rcu_barrier() 實現更加複雜,這是由於需要避免干擾空閒 CPU(尤其是在電池供電的系統上)以及需要最小程度地干擾即時系統中的非空閒 CPU。此外,還應用了大量的最佳化。但是,上面的程式碼說明了這些概念。
rcu_barrier() 摘要¶
rcu_barrier() 原語的使用頻率相對較低,因為大多數使用 RCU 的程式碼都在核心核心中,而不是在模組中。但是,如果您從可解除安裝模組中使用 RCU,則需要使用 rcu_barrier(),以便可以安全地解除安裝您的模組。
快速測驗的答案¶
- 快速測驗 #1
還有其他任何情況需要
rcu_barrier()嗎?- 答案
有趣的是,
rcu_barrier()最初不是為模組解除安裝而實現的。 Nikita Danilov 在檔案系統中使用 RCU,這導致了檔案系統解除安裝時的類似情況。 Dipankar Sarma 對rcu_barrier()進行了編碼以響應,以便 Nikita 可以在檔案系統解除安裝過程中呼叫它。很久以後,我自己在實現 rcutorture 時遇到了 RCU 模組解除安裝問題,並發現
rcu_barrier()也解決了這個問題。
- 快速測驗 #2
為什麼第 8 行不將 rcu_barrier_cpu_count 初始化為零,從而避免對第 9 行和第 10 行的需求?
- 答案
假設第 8 行上顯示的 on_each_cpu() 函式被延遲,因此 CPU 0 的 rcu_barrier_func() 執行並且相應的寬限期經過,所有這些都在 CPU 1 的 rcu_barrier_func() 開始執行之前。這將導致 rcu_barrier_cpu_count 遞減到零,因此第 11 行的 wait_for_completion() 將立即返回,而未能等待 CPU 1 的回撥被呼叫。
請注意,當
rcu_barrier()程式碼在 2005 年首次新增時,這不是一個問題。這是因為 on_each_cpu() 停用了搶佔,這充當了 RCU 讀取端臨界區,從而阻止了 CPU 0 的寬限期完成,直到 on_each_cpu() 處理了所有 CPU。但是,隨著 v4.20 左右的 RCU 風格整合,再次排除了這種可能性,因為整合後的 RCU 再次等待程式碼的非搶佔區域。
但是,額外的計數仍然可能是一個好主意。依賴於這些偶然的實現可能會在實現更改時導致以後的意外錯誤。
- 快速測驗 #3
如果 CPU 0 的 rcu_barrier_func() 立即執行(因此將 rcu_barrier_cpu_count 遞增到值 1),但其他 CPU 的 rcu_barrier_func() 呼叫延遲了完整的寬限期會發生什麼?這是否會導致
rcu_barrier()過早返回?- 答案
這不可能發生。原因是 on_each_cpu() 的最後一個引數(等待標誌)設定為 “1”。此標誌透過 smp_call_function() 並進一步傳遞到 smp_call_function_on_cpu(),導致後者旋轉,直到 rcu_barrier_func() 的跨 CPU 呼叫完成。這本身會阻止寬限期在非 CONFIG_PREEMPTION 核心上完成,因為每個 CPU 必須經過上下文切換(或其他靜止狀態)才能完成寬限期。但是,這在 CONFIG_PREEMPTION 核心中沒有用處。
因此,on_each_cpu() 在其呼叫 smp_call_function() 期間以及本地呼叫 rcu_barrier_func() 期間停用搶佔。由於最近的 RCU 實現將停用搶佔的程式碼區域視為 RCU 讀取端臨界區,因此這會阻止寬限期完成。這意味著所有 CPU 都已執行 rcu_barrier_func(),然後第一個 rcu_barrier_callback() 才可能執行,進而防止 rcu_barrier_cpu_count 過早達到零。
但是,如果 on_each_cpu() 曾經決定放棄停用搶佔(由於即時延遲考慮,這很可能發生),那麼將 rcu_barrier_cpu_count 初始化為 1 將會挽救局面。