RCU和可解除安裝模組

[最初發表於 LWN 2007 年 1 月 14 日:http://lwn.net/Articles/217484/]

RCU 更新器有時使用 call_rcu() 來啟動對寬限期經過的非同步等待。此原語接受一個指向位於 RCU 保護的資料結構中的 rcu_head 結構體的指標,以及另一個指向稍後可能被呼叫以釋放該結構體的函式的指標。然後,從 IRQ 上下文中刪除連結串列中的元素 p 的程式碼可能如下

list_del_rcu(p);
call_rcu(&p->rcu, p_callback);

由於 call_rcu() 永遠不會阻塞,因此可以在 IRQ 上下文中安全地使用此程式碼。函式 p_callback() 可以定義如下

static void p_callback(struct rcu_head *rp)
{
        struct pstruct *p = container_of(rp, struct pstruct, rcu);

        kfree(p);
}

解除安裝使用 call_rcu() 的模組

但是,如果 p_callback() 函式是在可解除安裝模組中定義的呢?

如果我們在某些 RCU 回撥掛起時解除安裝模組,則執行這些回撥的 CPU 將在稍後被呼叫時感到非常失望,如 http://lwn.net/images/ns/kernel/rcu-drop.jpg 中生動地描繪的那樣。

我們可以嘗試在模組退出程式碼路徑中放置一個 synchronize_rcu(),但這並不足夠。雖然 synchronize_rcu() 確實會等待寬限期經過,但它不會等待回撥完成。

人們可能會試圖嘗試幾個背靠背的 synchronize_rcu() 呼叫,但這仍然不能保證有效。如果 RCU 回撥負載非常重,那麼可能會推遲某些回撥,以便允許進行其他處理。例如,在即時核心中需要這種延遲,以避免過度的排程延遲。

rcu_barrier()

這種情況可以透過 rcu_barrier() 原語來處理。 rcu_barrier() 不是等待寬限期經過,而是等待所有未完成的 RCU 回撥完成。請注意,rcu_barrier() 意味著 synchronize_rcu(),特別是,如果沒有 RCU 回撥在任何地方排隊,rcu_barrier() 有權立即返回,而不等待任何事情,更不用說寬限期了。

使用 rcu_barrier() 的虛擬碼如下

  1. 阻止釋出任何新的 RCU 回撥。

  2. 執行 rcu_barrier()

  3. 允許解除安裝模組。

還有一個用於 SRCU 的 srcu_barrier() 函式,當然,您必須將 srcu_barrier() 的風格與 call_srcu() 的風格相匹配。如果您的模組使用多個 srcu_struct 結構,那麼在解除安裝該模組時,它也必須使用多個 srcu_barrier() 呼叫。例如,如果它在 srcu_struct_1 上使用 call_rcu()call_srcu(),並在 srcu_struct_2 上使用 call_srcu(),那麼在解除安裝時將需要以下三行程式碼

1  rcu_barrier();
2  srcu_barrier(&srcu_struct_1);
3  srcu_barrier(&srcu_struct_2);

如果延遲至關重要,則可以使用工作佇列併發執行這三個函式。

rcutorture 模組的舊版本在其退出函式中使用 rcu_barrier(),如下所示

 1  static void
 2  rcu_torture_cleanup(void)
 3  {
 4    int i;
 5
 6    fullstop = 1;
 7    if (shuffler_task != NULL) {
 8      VERBOSE_PRINTK_STRING("Stopping rcu_torture_shuffle task");
 9      kthread_stop(shuffler_task);
10    }
11    shuffler_task = NULL;
12
13    if (writer_task != NULL) {
14      VERBOSE_PRINTK_STRING("Stopping rcu_torture_writer task");
15      kthread_stop(writer_task);
16    }
17    writer_task = NULL;
18
19    if (reader_tasks != NULL) {
20      for (i = 0; i < nrealreaders; i++) {
21        if (reader_tasks[i] != NULL) {
22          VERBOSE_PRINTK_STRING(
23            "Stopping rcu_torture_reader task");
24          kthread_stop(reader_tasks[i]);
25        }
26        reader_tasks[i] = NULL;
27      }
28      kfree(reader_tasks);
29      reader_tasks = NULL;
30    }
31    rcu_torture_current = NULL;
32
33    if (fakewriter_tasks != NULL) {
34      for (i = 0; i < nfakewriters; i++) {
35        if (fakewriter_tasks[i] != NULL) {
36          VERBOSE_PRINTK_STRING(
37            "Stopping rcu_torture_fakewriter task");
38          kthread_stop(fakewriter_tasks[i]);
39        }
40        fakewriter_tasks[i] = NULL;
41      }
42      kfree(fakewriter_tasks);
43      fakewriter_tasks = NULL;
44    }
45
46    if (stats_task != NULL) {
47      VERBOSE_PRINTK_STRING("Stopping rcu_torture_stats task");
48      kthread_stop(stats_task);
49    }
50    stats_task = NULL;
51
52    /* Wait for all RCU callbacks to fire. */
53    rcu_barrier();
54
55    rcu_torture_stats_print(); /* -After- the stats thread is stopped! */
56
57    if (cur_ops->cleanup != NULL)
58      cur_ops->cleanup();
59    if (atomic_read(&n_rcu_torture_error))
60      rcu_torture_print_module_parms("End of test: FAILURE");
61    else
62      rcu_torture_print_module_parms("End of test: SUCCESS");
63  }

第 6 行設定了一個全域性變數,以防止任何 RCU 回撥重新發布自身。這在大多數情況下是不必要的,因為 RCU 回撥很少包括對 call_rcu() 的呼叫。但是,rcutorture 模組是此規則的一個例外,因此需要設定此全域性變數。

第 7-50 行停止了與 rcutorture 模組關聯的所有核心任務。因此,一旦執行到達第 53 行,將不會發布更多 rcutorture RCU 回撥。第 53 行上的 rcu_barrier() 呼叫會等待任何預先存在的回撥完成。

然後,第 55-62 行列印狀態並執行特定於操作的清理,然後返回,允許完成模組解除安裝操作。

快速測驗 #1

還有其他任何情況需要 rcu_barrier() 嗎?

快速測驗 #1 的答案

您的模組可能存在其他複雜情況。例如,如果您的模組從定時器呼叫 call_rcu(),您需要首先避免釋出新的定時器,取消(或等待)所有已釋出的定時器,然後才能呼叫 rcu_barrier() 以等待任何剩餘的 RCU 回撥完成。

當然,如果您的模組使用 call_rcu(),您需要在解除安裝之前呼叫 rcu_barrier()。同樣,如果您的模組使用 call_srcu(),您需要在解除安裝之前呼叫 srcu_barrier(),並且在同一個 srcu_struct 結構上呼叫。如果您的模組使用 call_rcu() call_srcu(),那麼(如上所述)您需要呼叫 rcu_barrier() srcu_barrier()

實現 rcu_barrier()

Dipankar Sarma 對 rcu_barrier() 的實現利用了以下事實:RCU 回撥一旦在每個 CPU 佇列之一上排隊,就永遠不會重新排序。他的實現將 RCU 回撥排隊在每個 CPU 回撥佇列上,然後等待它們全部開始執行,此時,保證所有較早的 RCU 回撥都已完成。

rcu_barrier() 的原始程式碼大致如下

 1  void rcu_barrier(void)
 2  {
 3    BUG_ON(in_interrupt());
 4    /* Take cpucontrol mutex to protect against CPU hotplug */
 5    mutex_lock(&rcu_barrier_mutex);
 6    init_completion(&rcu_barrier_completion);
 7    atomic_set(&rcu_barrier_cpu_count, 1);
 8    on_each_cpu(rcu_barrier_func, NULL, 0, 1);
 9    if (atomic_dec_and_test(&rcu_barrier_cpu_count))
10      complete(&rcu_barrier_completion);
11    wait_for_completion(&rcu_barrier_completion);
12    mutex_unlock(&rcu_barrier_mutex);
13  }

第 3 行驗證呼叫者是否在程序上下文中,第 5 行和第 12 行使用 rcu_barrier_mutex 來確保一次只有一個 rcu_barrier() 使用全域性完成和計數器,這些計數器在第 6 行和第 7 行初始化。第 8 行使每個 CPU 呼叫 rcu_barrier_func(),如下所示。請注意,on_each_cpu() 的引數列表中的最後一個 “1” 確保對 rcu_barrier_func() 的所有呼叫在 on_each_cpu() 返回之前完成。第 9 行從 rcu_barrier_cpu_count 中刪除初始計數,如果此計數現在為零,則第 10 行完成完成,這會阻止第 11 行阻塞。無論哪種方式,第 11 行都會等待(如果需要)完成。

快速測驗 #2

為什麼第 8 行不將 rcu_barrier_cpu_count 初始化為零,從而避免對第 9 行和第 10 行的需求?

快速測驗 #2 的答案

此程式碼在 2008 年重寫,之後多次重寫,但這仍然給出了總體思路。

rcu_barrier_func() 在每個 CPU 上執行,它呼叫 call_rcu() 以釋出 RCU 回撥,如下所示

 1  static void rcu_barrier_func(void *notused)
 2  {
 3    int cpu = smp_processor_id();
 4    struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
 5    struct rcu_head *head;
 6
 7    head = &rdp->barrier;
 8    atomic_inc(&rcu_barrier_cpu_count);
 9    call_rcu(head, rcu_barrier_callback);
10  }

第 3 行和第 4 行定位 RCU 的內部每個 CPU rcu_data 結構,該結構包含稍後呼叫 call_rcu() 所需的 struct rcu_head。第 7 行獲取指向此 struct rcu_head 的指標,第 8 行遞增全域性計數器。此計數器稍後將由回撥遞減。然後,第 9 行在當前 CPU 的佇列上註冊 rcu_barrier_callback()。

rcu_barrier_callback() 函式只是原子地遞減 rcu_barrier_cpu_count 變數,並在其達到零時完成完成,如下所示

1  static void rcu_barrier_callback(struct rcu_head *notused)
2  {
3    if (atomic_dec_and_test(&rcu_barrier_cpu_count))
4      complete(&rcu_barrier_completion);
5  }
快速測驗 #3

如果 CPU 0 的 rcu_barrier_func() 立即執行(因此將 rcu_barrier_cpu_count 遞增到值 1),但其他 CPU 的 rcu_barrier_func() 呼叫延遲了完整的寬限期會發生什麼?這是否會導致 rcu_barrier() 過早返回?

快速測驗 #3 的答案

當前的 rcu_barrier() 實現更加複雜,這是由於需要避免干擾空閒 CPU(尤其是在電池供電的系統上)以及需要最小程度地干擾即時系統中的非空閒 CPU。此外,還應用了大量的最佳化。但是,上面的程式碼說明了這些概念。

rcu_barrier() 摘要

rcu_barrier() 原語的使用頻率相對較低,因為大多數使用 RCU 的程式碼都在核心核心中,而不是在模組中。但是,如果您從可解除安裝模組中使用 RCU,則需要使用 rcu_barrier(),以便可以安全地解除安裝您的模組。

快速測驗的答案

快速測驗 #1

還有其他任何情況需要 rcu_barrier() 嗎?

答案

有趣的是,rcu_barrier() 最初不是為模組解除安裝而實現的。 Nikita Danilov 在檔案系統中使用 RCU,這導致了檔案系統解除安裝時的類似情況。 Dipankar Sarma 對 rcu_barrier() 進行了編碼以響應,以便 Nikita 可以在檔案系統解除安裝過程中呼叫它。

很久以後,我自己在實現 rcutorture 時遇到了 RCU 模組解除安裝問題,並發現 rcu_barrier() 也解決了這個問題。

返回快速測驗 #1

快速測驗 #2

為什麼第 8 行不將 rcu_barrier_cpu_count 初始化為零,從而避免對第 9 行和第 10 行的需求?

答案

假設第 8 行上顯示的 on_each_cpu() 函式被延遲,因此 CPU 0 的 rcu_barrier_func() 執行並且相應的寬限期經過,所有這些都在 CPU 1 的 rcu_barrier_func() 開始執行之前。這將導致 rcu_barrier_cpu_count 遞減到零,因此第 11 行的 wait_for_completion() 將立即返回,而未能等待 CPU 1 的回撥被呼叫。

請注意,當 rcu_barrier() 程式碼在 2005 年首次新增時,這不是一個問題。這是因為 on_each_cpu() 停用了搶佔,這充當了 RCU 讀取端臨界區,從而阻止了 CPU 0 的寬限期完成,直到 on_each_cpu() 處理了所有 CPU。

但是,隨著 v4.20 左右的 RCU 風格整合,再次排除了這種可能性,因為整合後的 RCU 再次等待程式碼的非搶佔區域。

但是,額外的計數仍然可能是一個好主意。依賴於這些偶然的實現可能會在實現更改時導致以後的意外錯誤。

返回快速測驗 #2

快速測驗 #3

如果 CPU 0 的 rcu_barrier_func() 立即執行(因此將 rcu_barrier_cpu_count 遞增到值 1),但其他 CPU 的 rcu_barrier_func() 呼叫延遲了完整的寬限期會發生什麼?這是否會導致 rcu_barrier() 過早返回?

答案

這不可能發生。原因是 on_each_cpu() 的最後一個引數(等待標誌)設定為 “1”。此標誌透過 smp_call_function() 並進一步傳遞到 smp_call_function_on_cpu(),導致後者旋轉,直到 rcu_barrier_func() 的跨 CPU 呼叫完成。這本身會阻止寬限期在非 CONFIG_PREEMPTION 核心上完成,因為每個 CPU 必須經過上下文切換(或其他靜止狀態)才能完成寬限期。但是,這在 CONFIG_PREEMPTION 核心中沒有用處。

因此,on_each_cpu() 在其呼叫 smp_call_function() 期間以及本地呼叫 rcu_barrier_func() 期間停用搶佔。由於最近的 RCU 實現將停用搶佔的程式碼區域視為 RCU 讀取端臨界區,因此這會阻止寬限期完成。這意味著所有 CPU 都已執行 rcu_barrier_func(),然後第一個 rcu_barrier_callback() 才可能執行,進而防止 rcu_barrier_cpu_count 過早達到零。

但是,如果 on_each_cpu() 曾經決定放棄停用搶佔(由於即時延遲考慮,這很可能發生),那麼將 rcu_barrier_cpu_count 初始化為 1 將會挽救局面。

返回快速測驗 #3