不可靠的鎖定指南¶
- 作者:
Rusty Russell
簡介¶
歡迎來到 Rusty 關於核心鎖定問題的非常不可靠的指南。本文件描述了 Linux Kernel 2.6 中的鎖定系統。
隨著 HyperThreading 的廣泛應用以及 Linux Kernel 中的搶佔,每個在核心上進行駭客攻擊的人都需要了解 SMP 的併發和鎖定的基本原理。
併發問題¶
(如果您知道什麼是競爭條件,請跳過此部分)。
在一個普通的程式中,您可以像這樣增加計數器
very_important_count++;
這是他們期望發生的事情
例項 1 |
例項 2 |
|---|---|
讀取 very_important_count (5) |
|
加 1 (6) |
|
寫入 very_important_count (6) |
|
讀取 very_important_count (6) |
|
加 1 (7) |
|
寫入 very_important_count (7) |
這可能發生的事情
例項 1 |
例項 2 |
|---|---|
讀取 very_important_count (5) |
|
讀取 very_important_count (5) |
|
加 1 (6) |
|
加 1 (6) |
|
寫入 very_important_count (6) |
|
寫入 very_important_count (6) |
競爭條件和臨界區¶
這種重疊,即結果取決於多個任務的相對時序,稱為競爭條件。 包含併發問題的程式碼段稱為臨界區。 尤其是在 Linux 開始在 SMP 機器上執行之後,它們成為核心設計和實現中的主要問題之一。
搶佔可能具有相同的效果,即使只有一個 CPU:透過在臨界區搶佔一個任務,我們也會遇到完全相同的競爭條件。 在這種情況下,搶佔的執行緒可能會自己執行臨界區。
解決方案是識別何時發生這些同時訪問,並使用鎖來確保任何時候只有一個例項可以進入臨界區。 Linux 核心中有許多友好的原語可以幫助您做到這一點。 然後是一些不友好的原語,但我會假裝它們不存在。
Linux 核心中的鎖定¶
如果我可以給您關於鎖定的一條建議:**保持簡單**。
不情願引入新的鎖。
核心鎖的兩種主要型別:自旋鎖和互斥鎖¶
有兩種主要的核心鎖型別。 基本型別是自旋鎖 (include/asm/spinlock.h),它是一種非常簡單的單持有者鎖:如果您無法獲得自旋鎖,您會一直嘗試(旋轉)直到可以。 自旋鎖非常小且快速,可以在任何地方使用。
第二種型別是互斥鎖 (include/linux/mutex.h):它類似於自旋鎖,但您可以阻塞持有互斥鎖。 如果您無法鎖定互斥鎖,您的任務將暫停自身,並在互斥鎖釋放時喚醒。 這意味著 CPU 可以在您等待時執行其他操作。 在許多情況下,您根本無法休眠(請參閱哪些函式可以從中斷中安全呼叫?),因此必須改用自旋鎖。
這兩種型別的鎖都不是遞迴的:請參閱死鎖:簡單和高階。
鎖和單處理器核心¶
對於沒有CONFIG_SMP且沒有CONFIG_PREEMPT編譯的核心,自旋鎖根本不存在。 這是一個極好的設計決策:當沒有其他人可以同時執行時,沒有理由擁有鎖。
如果核心在沒有CONFIG_SMP的情況下編譯,但設定了CONFIG_PREEMPT,那麼自旋鎖只是停用搶佔,這足以防止任何競爭。 對於大多數目的,我們可以將搶佔視為等同於 SMP,而不必單獨擔心它。
您應該始終使用啟用的CONFIG_SMP和CONFIG_PREEMPT測試您的鎖定程式碼,即使您沒有 SMP 測試盒,因為它仍然會捕獲某些型別的鎖定錯誤。
互斥鎖仍然存在,因為它們是使用者上下文之間同步所必需的,我們將在下面看到。
僅在使用者上下文中鎖定¶
如果您有一個僅從使用者上下文訪問的資料結構,那麼您可以使用一個簡單的互斥鎖 (include/linux/mutex.h) 來保護它。 這是最簡單的情況:您初始化互斥鎖。 然後您可以呼叫mutex_lock_interruptible()來獲取互斥鎖,並呼叫mutex_unlock()來釋放它。 還有一個mutex_lock(),應該避免使用它,因為如果在收到訊號時它不會返回。
示例:net/netfilter/nf_sockopt.c允許註冊新的 setsockopt() 和 getsockopt() 呼叫,使用 nf_register_sockopt()。 註冊和登出僅在模組載入和解除安裝時完成(以及啟動時,沒有併發),並且僅在未知 setsockopt() 或 getsockopt() 系統呼叫時才查詢註冊列表。 nf_sockopt_mutex非常適合保護它,特別是因為 setsockopt 和 getsockopt 呼叫很可能會休眠。
使用者上下文和軟中斷之間的鎖定¶
如果軟中斷與使用者上下文共享資料,您會遇到兩個問題。 首先,當前使用者上下文可能會被軟中斷中斷,其次,可能會從另一個 CPU 進入臨界區。 這就是 spin_lock_bh() (include/linux/spinlock.h) 的用途。 它停用該 CPU 上的軟中斷,然後獲取鎖。 spin_unlock_bh() 執行相反的操作。 (“_bh”字尾是對“Bottom Halves”的歷史引用,即軟體中斷的舊名稱。 在一個完美的世界中,它應該真正被稱為 spin_lock_softirq()’)。
請注意,您也可以在此處使用 spin_lock_irq() 或 spin_lock_irqsave(),它們也會停止硬體中斷:請參閱硬中斷上下文。
這對於 UP 也很有效:自旋鎖消失了,並且此宏只是變成了 local_bh_disable() (include/linux/interrupt.h),它可以保護您免受執行的軟中斷。
使用者上下文和 Tasklet 之間的鎖定¶
這與上面完全相同,因為 tasklet 實際上是從軟中斷執行的。
使用者上下文和定時器之間的鎖定¶
這與上面也完全相同,因為定時器實際上是從軟中斷執行的。 從鎖定的角度來看,tasklet 和定時器是相同的。
Tasklet/定時器之間的鎖定¶
有時 tasklet 或定時器可能想要與其他 tasklet 或定時器共享資料。
同一個 Tasklet/定時器¶
由於 tasklet 永遠不會在兩個 CPU 上同時執行,因此即使在 SMP 上,您也不需要擔心您的 tasklet 是可重入的(一次執行兩次)。
不同的 Tasklet/定時器¶
如果另一個 tasklet/定時器想要與您的 tasklet 或定時器共享資料,您都需要使用 spin_lock() 和 spin_unlock() 呼叫。 在這裡,spin_lock_bh() 是不必要的,因為您已經在 tasklet 中,並且不會在同一 CPU 上執行。
軟中斷之間的鎖定¶
通常,軟中斷可能想要與其自身或 tasklet/定時器共享資料。
同一個軟中斷¶
同一個軟中斷可以在其他 CPU 上執行:您可以使用每個 CPU 陣列(請參閱每個 CPU 資料)以獲得更好的效能。 如果您要使用軟中斷,您可能關心可擴充套件的效能,足以證明額外的複雜性是合理的。
您需要使用 spin_lock() 和 spin_unlock() 來共享資料。
不同的軟中斷¶
您需要使用 spin_lock() 和 spin_unlock() 來共享資料,無論是定時器、tasklet、不同的軟中斷還是相同或另一個軟中斷:它們中的任何一個都可能在不同的 CPU 上執行。
硬中斷上下文¶
硬體中斷通常與 tasklet 或軟中斷通訊。 通常,這涉及將工作放入佇列中,軟中斷會將其取出。
硬中斷和軟中斷/Tasklet 之間的鎖定¶
如果硬體中斷處理程式與軟中斷共享資料,您有兩個問題需要考慮。 首先,軟中斷處理可能會被硬體中斷中斷,其次,臨界區可能會被另一個 CPU 上的硬體中斷進入。 這就是 spin_lock_irq() 的用途。 它被定義為停用該 cpu 上的中斷,然後獲取鎖。 spin_unlock_irq() 執行相反的操作。
中斷處理程式不需要使用 spin_lock_irq(),因為軟中斷在中斷處理程式執行時無法執行:它可以 使用 spin_lock(),這稍微快一些。 唯一的例外是不同的硬體中斷處理程式使用相同的鎖:spin_lock_irq() 會阻止它中斷我們。
這對於 UP 也很有效:自旋鎖消失了,並且此宏只是變成了 local_irq_disable() (include/asm/smp.h),它可以保護您免受執行的軟中斷/tasklet/BH。
spin_lock_irqsave() (include/linux/spinlock.h) 是一種變體,它將中斷是開啟還是關閉的狀態儲存在標誌字中,該標誌字傳遞給 spin_unlock_irqrestore()。 這意味著可以在硬中斷處理程式(中斷已關閉)和軟中斷(需要停用中斷)中使用相同的程式碼。
請注意,軟中斷(以及 tasklet 和定時器)是在從硬體中斷返回時執行的,因此 spin_lock_irq() 也會停止這些中斷。 從這個意義上說,spin_lock_irqsave() 是最通用和最強大的鎖定功能。
兩個硬中斷處理程式之間的鎖定¶
在兩個 IRQ 處理程式之間共享資料的情況很少見,但如果您這樣做,則應使用 spin_lock_irqsave():是否在 irq 處理程式本身中停用所有中斷取決於具體的架構。
鎖定速查表¶
Pete Zaitcev 給出了以下總結
如果您處於程序上下文中(任何系統呼叫)並且想要阻止其他程序,請使用互斥鎖。 您可以獲取互斥鎖並休眠 (
copy_from_user()或kmalloc(x,GFP_KERNEL))。否則(== 資料可以在中斷中被觸控),使用 spin_lock_irqsave() 和 spin_unlock_irqrestore()。
避免持有自旋鎖超過 5 行程式碼,並且不要跨任何函式呼叫(除了像 readb() 這樣的訪問器)。
最低要求表¶
下表列出了各種上下文之間**最低**鎖定要求。 在某些情況下,同一上下文一次只能在一個 CPU 上執行,因此該上下文不需要鎖定(例如,特定的執行緒一次只能在一個 CPU 上執行,但如果它需要與其他執行緒共享資料,則需要鎖定)。
請記住上面的建議:您始終可以使用 spin_lock_irqsave(),它是所有其他自旋鎖原語的超集。
. |
IRQ 處理程式 A |
IRQ 處理程式 B |
軟中斷 A |
軟中斷 B |
Tasklet A |
Tasklet B |
定時器 A |
定時器 B |
使用者上下文 A |
使用者上下文 B |
|---|---|---|---|---|---|---|---|---|---|---|
IRQ 處理程式 A |
無 |
|||||||||
IRQ 處理程式 B |
SLIS |
無 |
||||||||
軟中斷 A |
SLI |
SLI |
SL |
|||||||
軟中斷 B |
SLI |
SLI |
SL |
SL |
||||||
Tasklet A |
SLI |
SLI |
SL |
SL |
無 |
|||||
Tasklet B |
SLI |
SLI |
SL |
SL |
SL |
無 |
||||
定時器 A |
SLI |
SLI |
SL |
SL |
SL |
SL |
無 |
|||
定時器 B |
SLI |
SLI |
SL |
SL |
SL |
SL |
SL |
無 |
||
使用者上下文 A |
SLI |
SLI |
SLBH |
SLBH |
SLBH |
SLBH |
SLBH |
SLBH |
無 |
|
使用者上下文 B |
SLI |
SLI |
SLBH |
SLBH |
SLBH |
SLBH |
SLBH |
SLBH |
MLI |
無 |
表:鎖定要求表
SLIS |
spin_lock_irqsave |
SLI |
spin_lock_irq |
SL |
spin_lock |
SLBH |
spin_lock_bh |
MLI |
mutex_lock_interruptible |
表:鎖定要求表的圖例
trylock 函式¶
有些函式嘗試只獲取一次鎖,並立即返回一個值,指示獲取鎖的成功或失敗。 如果當其他執行緒持有鎖時,您不需要訪問受鎖保護的資料,則可以使用它們。 如果您隨後需要訪問受鎖保護的資料,您應該稍後獲取鎖。
spin_trylock() 不會旋轉,但如果在第一次嘗試時獲取了自旋鎖,則返回非零值,否則返回 0。 此函式可以在所有上下文中像 spin_lock() 一樣使用:您必須停用可能會中斷您並獲取自旋鎖的上下文。
mutex_trylock()不會暫停您的任務,但如果它可以在第一次嘗試時鎖定互斥鎖,則返回非零值,否則返回 0。 儘管不休眠,但此函式不能在硬體或軟體中斷上下文中安全使用。
常見例子¶
讓我們逐步完成一個簡單的示例:數字到名稱對映的快取。 快取會跟蹤每個物件的使用頻率,並在填滿時,丟棄使用最少的物件。
全部在使用者上下文中¶
對於我們的第一個示例,我們假設所有操作都在使用者上下文中(即來自系統呼叫),因此我們可以休眠。 這意味著我們可以使用互斥鎖來保護快取和其中的所有物件。 這是程式碼
#include <linux/list.h>
#include <linux/slab.h>
#include <linux/string.h>
#include <linux/mutex.h>
#include <asm/errno.h>
struct object
{
struct list_head list;
int id;
char name[32];
int popularity;
};
/* Protects the cache, cache_num, and the objects within it */
static DEFINE_MUTEX(cache_lock);
static LIST_HEAD(cache);
static unsigned int cache_num = 0;
#define MAX_CACHE_SIZE 10
/* Must be holding cache_lock */
static struct object *__cache_find(int id)
{
struct object *i;
list_for_each_entry(i, &cache, list)
if (i->id == id) {
i->popularity++;
return i;
}
return NULL;
}
/* Must be holding cache_lock */
static void __cache_delete(struct object *obj)
{
BUG_ON(!obj);
list_del(&obj->list);
kfree(obj);
cache_num--;
}
/* Must be holding cache_lock */
static void __cache_add(struct object *obj)
{
list_add(&obj->list, &cache);
if (++cache_num > MAX_CACHE_SIZE) {
struct object *i, *outcast = NULL;
list_for_each_entry(i, &cache, list) {
if (!outcast || i->popularity < outcast->popularity)
outcast = i;
}
__cache_delete(outcast);
}
}
int cache_add(int id, const char *name)
{
struct object *obj;
if ((obj = kmalloc(sizeof(*obj), GFP_KERNEL)) == NULL)
return -ENOMEM;
strscpy(obj->name, name, sizeof(obj->name));
obj->id = id;
obj->popularity = 0;
mutex_lock(&cache_lock);
__cache_add(obj);
mutex_unlock(&cache_lock);
return 0;
}
void cache_delete(int id)
{
mutex_lock(&cache_lock);
__cache_delete(__cache_find(id));
mutex_unlock(&cache_lock);
}
int cache_find(int id, char *name)
{
struct object *obj;
int ret = -ENOENT;
mutex_lock(&cache_lock);
obj = __cache_find(id);
if (obj) {
ret = 0;
strcpy(name, obj->name);
}
mutex_unlock(&cache_lock);
return ret;
}
請注意,當我們新增、刪除或查詢快取時,我們始終確保擁有 cache_lock:快取基礎設施本身和物件的內容都受到鎖的保護。 在這種情況下,這很容易,因為我們複製使用者的資料,並且從不讓他們直接訪問物件。
這裡有一個輕微(且常見)的最佳化:在 cache_add() 中,我們在獲取鎖之前設定物件的欄位。 這是安全的,因為在我們將其放入快取之前,沒有人可以訪問它。
從中斷上下文中訪問¶
現在考慮可以從中斷上下文呼叫 cache_find() 的情況:硬體中斷或軟中斷。 一個例子是刪除快取中物件的定時器。
更改如下所示,採用標準補丁格式:-是刪除的行,+是新增的行。
--- cache.c.usercontext 2003-12-09 13:58:54.000000000 +1100
+++ cache.c.interrupt 2003-12-09 14:07:49.000000000 +1100
@@ -12,7 +12,7 @@
int popularity;
};
-static DEFINE_MUTEX(cache_lock);
+static DEFINE_SPINLOCK(cache_lock);
static LIST_HEAD(cache);
static unsigned int cache_num = 0;
#define MAX_CACHE_SIZE 10
@@ -55,6 +55,7 @@
int cache_add(int id, const char *name)
{
struct object *obj;
+ unsigned long flags;
if ((obj = kmalloc(sizeof(*obj), GFP_KERNEL)) == NULL)
return -ENOMEM;
@@ -63,30 +64,33 @@
obj->id = id;
obj->popularity = 0;
- mutex_lock(&cache_lock);
+ spin_lock_irqsave(&cache_lock, flags);
__cache_add(obj);
- mutex_unlock(&cache_lock);
+ spin_unlock_irqrestore(&cache_lock, flags);
return 0;
}
void cache_delete(int id)
{
- mutex_lock(&cache_lock);
+ unsigned long flags;
+
+ spin_lock_irqsave(&cache_lock, flags);
__cache_delete(__cache_find(id));
- mutex_unlock(&cache_lock);
+ spin_unlock_irqrestore(&cache_lock, flags);
}
int cache_find(int id, char *name)
{
struct object *obj;
int ret = -ENOENT;
+ unsigned long flags;
- mutex_lock(&cache_lock);
+ spin_lock_irqsave(&cache_lock, flags);
obj = __cache_find(id);
if (obj) {
ret = 0;
strcpy(name, obj->name);
}
- mutex_unlock(&cache_lock);
+ spin_unlock_irqrestore(&cache_lock, flags);
return ret;
}
請注意,如果中斷已開啟,spin_lock_irqsave() 將關閉中斷,否則不執行任何操作(如果我們已經在中斷處理程式中),因此這些函式可以從任何上下文中安全呼叫。
不幸的是,cache_add() 使用 GFP_KERNEL 標誌呼叫kmalloc(),這僅在使用者上下文中合法。 我假設 cache_add() 仍然僅在使用者上下文中被呼叫,否則這應該成為 cache_add() 的一個引數。
在此檔案外部公開物件¶
如果我們的物件包含更多資訊,則複製資訊的進出可能不足:程式碼的其他部分可能希望保留指向這些物件的指標,例如,而不是每次都查詢 id。 這會產生兩個問題。
第一個問題是我們使用cache_lock來保護物件:我們需要使其成為非靜態的,以便程式碼的其餘部分可以使用它。 這使得鎖定更加棘手,因為它不再都在一個地方。
第二個問題是生存期問題:如果另一個結構保留指向物件的指標,則它大概希望該指標保持有效。 不幸的是,這僅在您持有鎖時才能保證,否則有人可能會呼叫 cache_delete(),甚至更糟糕的是,新增另一個物件,重新使用相同的地址。
由於只有一個鎖,您不能永遠持有它:沒有人可以完成任何工作。
解決此問題的方法是使用引用計數:每個擁有指向物件的指標的人都會在第一次獲取物件時增加它,並在完成物件後刪除引用計數。 無論誰將其刪除為零都知道它未被使用,並且實際上可以刪除它。
這是程式碼
--- cache.c.interrupt 2003-12-09 14:25:43.000000000 +1100
+++ cache.c.refcnt 2003-12-09 14:33:05.000000000 +1100
@@ -7,6 +7,7 @@
struct object
{
struct list_head list;
+ unsigned int refcnt;
int id;
char name[32];
int popularity;
@@ -17,6 +18,35 @@
static unsigned int cache_num = 0;
#define MAX_CACHE_SIZE 10
+static void __object_put(struct object *obj)
+{
+ if (--obj->refcnt == 0)
+ kfree(obj);
+}
+
+static void __object_get(struct object *obj)
+{
+ obj->refcnt++;
+}
+
+void object_put(struct object *obj)
+{
+ unsigned long flags;
+
+ spin_lock_irqsave(&cache_lock, flags);
+ __object_put(obj);
+ spin_unlock_irqrestore(&cache_lock, flags);
+}
+
+void object_get(struct object *obj)
+{
+ unsigned long flags;
+
+ spin_lock_irqsave(&cache_lock, flags);
+ __object_get(obj);
+ spin_unlock_irqrestore(&cache_lock, flags);
+}
+
/* Must be holding cache_lock */
static struct object *__cache_find(int id)
{
@@ -35,6 +65,7 @@
{
BUG_ON(!obj);
list_del(&obj->list);
+ __object_put(obj);
cache_num--;
}
@@ -63,6 +94,7 @@
strscpy(obj->name, name, sizeof(obj->name));
obj->id = id;
obj->popularity = 0;
+ obj->refcnt = 1; /* The cache holds a reference */
spin_lock_irqsave(&cache_lock, flags);
__cache_add(obj);
@@ -79,18 +111,15 @@
spin_unlock_irqrestore(&cache_lock, flags);
}
-int cache_find(int id, char *name)
+struct object *cache_find(int id)
{
struct object *obj;
- int ret = -ENOENT;
unsigned long flags;
spin_lock_irqsave(&cache_lock, flags);
obj = __cache_find(id);
- if (obj) {
- ret = 0;
- strcpy(name, obj->name);
- }
+ if (obj)
+ __object_get(obj);
spin_unlock_irqrestore(&cache_lock, flags);
- return ret;
+ return obj;
}
我們將引用計數封裝在標準“get”和“put”函式中。 現在我們可以從 cache_find() 返回物件本身,這具有使用者現在可以休眠持有該物件的優勢(例如,將 copy_to_user() 命名到使用者空間)。
另一點要注意的是,我說應該為每個指向物件的指標持有引用:因此,首次插入快取時,引用計數為 1。 在某些版本中,框架不持有引用計數,但它們更復雜。
使用原子操作進行引用計數¶
實際上,atomic_t通常用於 refcnt。 在include/asm/atomic.h中定義了許多原子操作:保證從系統中所有 CPU 原子地看到這些操作,因此不需要鎖。 在這種情況下,它比使用自旋鎖更簡單,儘管對於任何非平凡的事情,使用自旋鎖更清晰。 atomic_inc()和atomic_dec_and_test()用於代替標準遞增和遞減運算子,並且該鎖不再用於保護引用計數本身。
--- cache.c.refcnt 2003-12-09 15:00:35.000000000 +1100
+++ cache.c.refcnt-atomic 2003-12-11 15:49:42.000000000 +1100
@@ -7,7 +7,7 @@
struct object
{
struct list_head list;
- unsigned int refcnt;
+ atomic_t refcnt;
int id;
char name[32];
int popularity;
@@ -18,33 +18,15 @@
static unsigned int cache_num = 0;
#define MAX_CACHE_SIZE 10
-static void __object_put(struct object *obj)
-{
- if (--obj->refcnt == 0)
- kfree(obj);
-}
-
-static void __object_get(struct object *obj)
-{
- obj->refcnt++;
-}
-
void object_put(struct object *obj)
{
- unsigned long flags;
-
- spin_lock_irqsave(&cache_lock, flags);
- __object_put(obj);
- spin_unlock_irqrestore(&cache_lock, flags);
+ if (atomic_dec_and_test(&obj->refcnt))
+ kfree(obj);
}
void object_get(struct object *obj)
{
- unsigned long flags;
-
- spin_lock_irqsave(&cache_lock, flags);
- __object_get(obj);
- spin_unlock_irqrestore(&cache_lock, flags);
+ atomic_inc(&obj->refcnt);
}
/* Must be holding cache_lock */
@@ -65,7 +47,7 @@
{
BUG_ON(!obj);
list_del(&obj->list);
- __object_put(obj);
+ object_put(obj);
cache_num--;
}
@@ -94,7 +76,7 @@
strscpy(obj->name, name, sizeof(obj->name));
obj->id = id;
obj->popularity = 0;
- obj->refcnt = 1; /* The cache holds a reference */
+ atomic_set(&obj->refcnt, 1); /* The cache holds a reference */
spin_lock_irqsave(&cache_lock, flags);
__cache_add(obj);
@@ -119,7 +101,7 @@
spin_lock_irqsave(&cache_lock, flags);
obj = __cache_find(id);
if (obj)
- __object_get(obj);
+ object_get(obj);
spin_unlock_irqrestore(&cache_lock, flags);
return obj;
}
保護物件本身¶
在這些示例中,我們假設一旦建立物件,它們(引用計數除外)永遠不會更改。 如果我們想允許更改名稱,則有三種可能性
您可以使
cache_lock成為非靜態的,並告訴人們在更改任何物件中的名稱之前獲取該鎖。您可以提供一個 cache_obj_rename(),它獲取此鎖併為呼叫者更改名稱,並告訴每個人使用該函式。
您可以使
cache_lock僅保護快取本身,並使用另一個鎖來保護名稱。
從理論上講,您可以為每個物件的每個欄位製作鎖,使鎖的粒度儘可能細。 實際上,最常見的變體是
一個鎖,用於保護基礎設施(本示例中的
cache列表)和所有物件。 這就是我們迄今為止所做的。一個鎖,用於保護基礎設施(包括物件內的列表指標),以及物件內的一個鎖,用於保護該物件的其餘部分。
多個鎖,用於保護基礎設施(例如,每個雜湊鏈一個鎖),可能具有單獨的每個物件鎖。
這是“每個物件鎖”的實現
--- cache.c.refcnt-atomic 2003-12-11 15:50:54.000000000 +1100
+++ cache.c.perobjectlock 2003-12-11 17:15:03.000000000 +1100
@@ -6,11 +6,17 @@
struct object
{
+ /* These two protected by cache_lock. */
struct list_head list;
+ int popularity;
+
atomic_t refcnt;
+
+ /* Doesn't change once created. */
int id;
+
+ spinlock_t lock; /* Protects the name */
char name[32];
- int popularity;
};
static DEFINE_SPINLOCK(cache_lock);
@@ -77,6 +84,7 @@
obj->id = id;
obj->popularity = 0;
atomic_set(&obj->refcnt, 1); /* The cache holds a reference */
+ spin_lock_init(&obj->lock);
spin_lock_irqsave(&cache_lock, flags);
__cache_add(obj);
請注意,我決定受歡迎程度計數應由cache_lock而不是每個物件鎖來保護:這是因為它(像物件內的struct list_head一樣)在邏輯上是基礎設施的一部分。 這樣,在尋找最不受歡迎的物件時,我不需要在 __cache_add() 中獲取每個物件的鎖。
我還決定 id 成員是不可更改的,因此我不需要在 __cache_find() 中獲取每個物件鎖來檢查 id:物件鎖僅由想要讀取或寫入 name 欄位的呼叫者使用。
另請注意,我添加了一條註釋,描述了哪些資料受哪些鎖保護。 這非常重要,因為它描述了程式碼的執行時行為,並且很難僅透過閱讀獲得。 正如 Alan Cox 所說,“鎖定資料,而不是程式碼”。
常見問題¶
死鎖:簡單和高階¶
有一個編碼錯誤,其中一段程式碼嘗試獲取自旋鎖兩次:它將永遠旋轉,等待鎖被釋放(自旋鎖、讀寫鎖和互斥鎖在 Linux 中不是遞迴的)。 這很容易診斷:不是一個熬夜五晚與毛茸茸的程式碼兔子交談的那種問題。
對於稍微複雜的情況,想象一下您有一個由軟中斷和使用者上下文共享的區域。 如果您使用 spin_lock() 呼叫來保護它,則使用者上下文可能會在持有鎖時被軟中斷中斷,然後軟中斷將永遠旋轉,嘗試獲取相同的鎖。
這兩種情況都稱為死鎖,如上所示,即使使用單個 CPU 也會發生(儘管在 UP 編譯中不會發生,因為自旋鎖在CONFIG_SMP=n 的核心編譯中消失了。 在第二個示例中,您仍然會得到資料損壞)。
這種完全鎖定很容易診斷:在 SMP 盒子上,看門狗定時器或使用設定的DEBUG_SPINLOCK編譯(include/linux/spinlock.h)會在發生時立即顯示這一點。
一個更復雜的問題是所謂的“致命擁抱”,涉及兩個或多個鎖。 假設您有一個雜湊表:表中的每個條目都是一個自旋鎖,以及一個雜湊物件的鏈。 在軟中斷處理程式中,有時您想將物件從雜湊中的一個位置更改為另一個位置:您獲取舊雜湊鏈的自旋鎖和新雜湊鏈的自旋鎖,然後從舊雜湊鏈中刪除物件,並將其插入到新雜湊鏈中。
這裡有兩個問題。首先,如果你的程式碼嘗試將物件移動到同一條鏈上,它會因為嘗試兩次鎖定它而導致死鎖。其次,如果另一個 CPU 上的同一 softirq 試圖以相反的方向移動另一個物件,則可能發生以下情況:
CPU 1 |
CPU 2 |
|---|---|
獲取鎖 A -> 成功 |
獲取鎖 B -> 成功 |
獲取鎖 B -> 旋轉等待 |
獲取鎖 A -> 旋轉等待 |
表格:後果
這兩個 CPU 將永遠旋轉等待,等待對方釋放它們的鎖。它看起來、聞起來和感覺起來就像崩潰一樣。
防止死鎖¶
教科書會告訴你,如果你總是以相同的順序鎖定,你永遠不會遇到這種死鎖。實踐會告訴你,這種方法無法擴充套件:當我建立一個新鎖時,我對核心的理解還不足以弄清楚它應該放在 5000 個鎖層次結構中的哪個位置。
最好的鎖是被封裝的:它們永遠不會在標頭檔案中暴露,並且永遠不會在同一檔案之外呼叫非平凡函式時持有它們。你可以閱讀這段程式碼,發現它永遠不會死鎖,因為它永遠不會在持有鎖的時候嘗試獲取另一個鎖。使用你程式碼的人甚至不需要知道你正在使用鎖。
這裡一個經典的問題是當你提供回撥或鉤子時:如果在持有鎖的情況下呼叫這些回撥或鉤子,你可能會面臨簡單的死鎖或致命的擁抱(誰知道回撥會做什麼?)。
過度防止死鎖¶
死鎖是有問題的,但不如資料損壞那麼糟糕。獲取讀鎖、搜尋列表、找不到想要的內容、釋放讀鎖、獲取寫鎖並插入物件的程式碼存在競爭條件。
競速定時器:核心的消遣¶
定時器可能會產生其自身特殊的競爭問題。考慮一個物件集合(列表、雜湊等),其中每個物件都有一個定時器,該定時器應該銷燬它。
如果你想銷燬整個集合(比如在模組移除時),你可能會這樣做:
/* THIS CODE BAD BAD BAD BAD: IF IT WAS ANY WORSE IT WOULD USE
HUNGARIAN NOTATION */
spin_lock_bh(&list_lock);
while (list) {
struct foo *next = list->next;
timer_delete(&list->timer);
kfree(list);
list = next;
}
spin_unlock_bh(&list_lock);
遲早,這會在 SMP 上崩潰,因為定時器可能已經在 spin_lock_bh() 之前觸發了,並且它只有在 spin_unlock_bh() 之後才能獲得鎖,然後嘗試釋放元素(該元素已經被釋放了!)。
可以透過檢查 timer_delete() 的結果來避免這種情況:如果它返回 1,則表示定時器已被刪除。 如果返回 0,則表示(在這種情況下)它當前正在執行,所以我們可以這樣做:
retry:
spin_lock_bh(&list_lock);
while (list) {
struct foo *next = list->next;
if (!timer_delete(&list->timer)) {
/* Give timer a chance to delete this */
spin_unlock_bh(&list_lock);
goto retry;
}
kfree(list);
list = next;
}
spin_unlock_bh(&list_lock);
另一個常見的問題是刪除會自行重啟的定時器(透過在其定時器函式的末尾呼叫 add_timer())。因為這是一個相當常見的容易發生競爭的情況,所以你應該使用 timer_delete_sync() (include/linux/timer.h) 來處理這種情況。
在釋放定時器之前,應該呼叫 timer_shutdown() 或 timer_shutdown_sync(),以防止它被重新啟用。 任何後續嘗試重新啟用定時器的操作都將被核心程式碼靜默忽略。
鎖的速度¶
在考慮執行鎖定的程式碼的速度時,主要有三個方面需要注意。首先是併發性:當其他人持有鎖時,有多少東西會處於等待狀態。其次是實際獲取和釋放未競爭鎖所花費的時間。第三是使用更少或更智慧的鎖。我假設鎖的使用頻率相當高:否則,你不會關心效率。
併發性取決於鎖通常被持有的時間:你應該在需要的時間內持有鎖,但不要更長。在快取示例中,我們始終在未持有鎖的情況下建立物件,然後僅在我們準備將其插入列表時才獲取鎖。
獲取時間取決於鎖操作對流水線造成的損害程度(流水線停頓)以及此 CPU 是上一個獲取鎖的 CPU 的可能性(即,對於此 CPU 來說,鎖是否是快取熱的):在具有更多 CPU 的機器上,這種可能性迅速下降。考慮一個 700MHz 的 Intel Pentium III:一條指令大約需要 0.7ns,一個原子遞增大約需要 58ns,一個在此 CPU 上快取熱的鎖需要 160ns,而從另一個 CPU 傳輸快取行則需要額外 170 到 360ns。(這些數字來自 Paul McKenney 的 Linux Journal RCU 文章)。
這兩個目標是衝突的:透過將鎖分成幾部分(例如在我們最終的按物件鎖示例中),可以縮短持有鎖的時間,但這會增加鎖的獲取次數,並且結果通常比擁有單個鎖更慢。這是提倡鎖定簡單性的另一個原因。
第三個問題將在下面討論:有一些方法可以減少需要完成的鎖定量。
讀/寫鎖變體¶
自旋鎖和互斥鎖都有讀/寫變體:rwlock_t 和 struct rw_semaphore。 這些將使用者分為兩類:讀者和寫者。如果你只是讀取資料,你可以獲得一個讀鎖,但是要寫入資料,你需要寫鎖。 許多人可以持有讀鎖,但是寫者必須是唯一的持有者。
如果你的程式碼沿著讀者/寫者的界限整齊地劃分(就像我們的快取程式碼一樣),並且鎖被讀者持有很長時間,那麼使用這些鎖可能會有所幫助。 但是,它們比普通鎖稍微慢一些,因此在實踐中,rwlock_t 通常不值得使用。
避免鎖:讀複製更新¶
有一種稱為讀複製更新的特殊的讀/寫鎖定方法。 使用 RCU,讀者可以完全避免獲取鎖:因為我們希望我們的快取被讀取的頻率高於更新的頻率(否則快取就是浪費時間),所以它是這種最佳化的候選者。
我們如何擺脫讀鎖? 擺脫讀鎖意味著寫者可能會在讀者不知情的情況下更改列表。 這實際上非常簡單:如果寫者非常小心地新增元素,我們可以在新增元素時讀取連結串列。 例如,將 new 新增到名為 list 的單鏈表中
new->next = list->next;
wmb();
list->next = new;
wmb() 是一個寫記憶體屏障。 它確保第一個操作(設定新元素的 next 指標)已完成並且將被所有 CPU 看到,然後才能執行第二個操作(將新元素放入列表中)。 這很重要,因為現代編譯器和現代 CPU 都可以重新排序指令,除非另有說明:我們希望讀者要麼根本看不到新元素,要麼看到 next 指標正確指向列表其餘部分的新元素。
幸運的是,有一個函式可以為標準 struct list_head 列表執行此操作:list_add_rcu() (include/linux/list.h)。
從列表中刪除元素甚至更簡單:我們將指向舊元素的指標替換為指向其後繼元素的指標,讀者要麼看到它,要麼跳過它。
list->next = old->next;
有一個 list_del_rcu() (include/linux/list.h) 可以執行此操作(正常版本會毒化舊物件,這不是我們想要的)。
讀者也必須小心:某些 CPU 可以透過 next 指標提前開始讀取下一個元素的內容,但是當 next 指標在其不知情的情況下更改時,他們不會意識到預取的內容是錯誤的。 再次,有一個 list_for_each_entry_rcu() (include/linux/list.h) 來幫助你。 當然,寫者可以使用 list_for_each_entry(),因為不可能有兩個同時寫者。
我們最終的困境是:我們什麼時候才能真正銷燬被刪除的元素? 請記住,讀者可能現在正在列表中逐步執行此元素:如果我們釋放此元素並且 next 指標更改,則讀者將跳入垃圾資料並崩潰。 我們需要等到我們知道在刪除元素時遍歷列表的所有讀者都已完成。 我們使用 call_rcu() 註冊一個回撥,該回調將在所有先前存在的讀者完成後實際銷燬該物件。 另外,可以使用 synchronize_rcu() 阻塞,直到所有先前存在的讀者都完成。
但是讀複製更新如何知道讀者何時完成? 方法是這樣的:首先,讀者始終在 rcu_read_lock()/rcu_read_unlock() 對中遍歷列表:這些只是停用搶佔,因此讀者在讀取列表時不會進入休眠狀態。
然後 RCU 等待直到每個其他 CPU 至少休眠一次:由於讀者無法休眠,我們知道在刪除期間遍歷列表的任何讀者都已完成,並且回撥已觸發。 真正的讀複製更新程式碼比這稍微最佳化,但這是基本思想。
--- cache.c.perobjectlock 2003-12-11 17:15:03.000000000 +1100
+++ cache.c.rcupdate 2003-12-11 17:55:14.000000000 +1100
@@ -1,15 +1,18 @@
#include <linux/list.h>
#include <linux/slab.h>
#include <linux/string.h>
+#include <linux/rcupdate.h>
#include <linux/mutex.h>
#include <asm/errno.h>
struct object
{
- /* These two protected by cache_lock. */
+ /* This is protected by RCU */
struct list_head list;
int popularity;
+ struct rcu_head rcu;
+
atomic_t refcnt;
/* Doesn't change once created. */
@@ -40,7 +43,7 @@
{
struct object *i;
- list_for_each_entry(i, &cache, list) {
+ list_for_each_entry_rcu(i, &cache, list) {
if (i->id == id) {
i->popularity++;
return i;
@@ -49,19 +52,25 @@
return NULL;
}
+/* Final discard done once we know no readers are looking. */
+static void cache_delete_rcu(void *arg)
+{
+ object_put(arg);
+}
+
/* Must be holding cache_lock */
static void __cache_delete(struct object *obj)
{
BUG_ON(!obj);
- list_del(&obj->list);
- object_put(obj);
+ list_del_rcu(&obj->list);
cache_num--;
+ call_rcu(&obj->rcu, cache_delete_rcu);
}
/* Must be holding cache_lock */
static void __cache_add(struct object *obj)
{
- list_add(&obj->list, &cache);
+ list_add_rcu(&obj->list, &cache);
if (++cache_num > MAX_CACHE_SIZE) {
struct object *i, *outcast = NULL;
list_for_each_entry(i, &cache, list) {
@@ -104,12 +114,11 @@
struct object *cache_find(int id)
{
struct object *obj;
- unsigned long flags;
- spin_lock_irqsave(&cache_lock, flags);
+ rcu_read_lock();
obj = __cache_find(id);
if (obj)
object_get(obj);
- spin_unlock_irqrestore(&cache_lock, flags);
+ rcu_read_unlock();
return obj;
}
請注意,讀者將更改 __cache_find() 中的 popularity 成員,現在它不持有鎖。 一種解決方案是將其設為 atomic_t,但是對於此用法,我們實際上並不關心競爭:近似結果就足夠了,所以我沒有更改它。
結果是 cache_find() 不需要與任何其他函式同步,因此在 SMP 上幾乎與在 UP 上一樣快。
這裡還有一種可能的進一步最佳化:記住我們最初的快取程式碼,其中沒有引用計數,並且呼叫者在使用物件時只需持有鎖? 這仍然是可能的:如果你持有鎖,則沒有人可以刪除該物件,因此你不需要獲取和釋放引用計數。
現在,因為 RCU 中的“讀鎖”只是停用搶佔,所以在呼叫 cache_find() 和 object_put() 之間始終停用搶佔的呼叫者實際上不需要獲取和釋放引用計數:我們可以透過使 __cache_find() 非靜態來暴露它,並且此類呼叫者可以簡單地呼叫該函式。
這裡的好處是引用計數未被寫入:物件沒有以任何方式更改,由於快取,這在 SMP 機器上要快得多。
每個 CPU 的資料¶
另一種用於避免鎖定的技術是為每個 CPU 複製資訊。 例如,如果你想保持對常見條件的計數,則可以使用自旋鎖和單個計數器。 簡單又好。
如果這太慢了(通常不是,但是如果你有一臺非常大的機器可以進行測試並且可以證明確實如此),則可以改為為每個 CPU 使用一個計數器,這樣它們都不需要獨佔鎖。 請參閱 DEFINE_PER_CPU()、get_cpu_var() 和 put_cpu_var() (include/linux/percpu.h)。
對於簡單的每個 CPU 計數器,特別有用的是 local_t 型別以及 cpu_local_inc() 和相關函式,它們在某些體系結構上比簡單程式碼更有效 (include/asm/local.h)。
請注意,在不引入更多鎖的情況下,沒有簡單、可靠的方法來獲取此類計數器的確切值。 對於某些用途,這不是問題。
主要由 IRQ 處理程式使用的資料¶
如果始終從同一個 IRQ 處理程式中訪問資料,則根本不需要鎖:核心已經保證 irq 處理程式不會在多個 CPU 上同時執行。
Manfred Spraul 指出,即使很少在使用者上下文中或 softirqs/tasklets 中訪問資料,你仍然可以這樣做。 irq 處理程式不使用鎖,所有其他訪問都按以下方式完成:
mutex_lock(&lock);
disable_irq(irq);
...
enable_irq(irq);
mutex_unlock(&lock);
disable_irq() 阻止 irq 處理程式執行(如果它當前在其他 CPU 上執行,則等待它完成)。 自旋鎖阻止任何其他訪問同時發生。 當然,這比只調用 spin_lock_irq() 慢,因此只有在這種型別的訪問極少發生時才有意義。
哪些函式可以安全地從中斷中呼叫?¶
核心中的許多函式會休眠(即直接或間接呼叫 schedule()):你永遠不能在持有自旋鎖或停用搶佔的情況下呼叫它們。 這也意味著你需要在使用者上下文中:從中斷中呼叫它們是非法的。
一些會休眠的函式¶
下面列出了最常見的函式,但你通常需要閱讀程式碼才能找出其他呼叫是否安全。 如果其他所有呼叫者都可以休眠,那麼你可能也需要能夠休眠。 特別是,註冊和登出函式通常期望從使用者上下文中呼叫,並且可以休眠。
訪問使用者空間
copy_from_user()
copy_to_user()
kmalloc(GP_KERNEL) <kmalloc>`
mutex_lock_interruptible()和mutex_lock()有一個
mutex_trylock()不會休眠。 儘管如此,它不能在中斷上下文中使用,因為它的實現對於這種情況是不安全的。mutex_unlock()也永遠不會休眠。 它也不能在中斷上下文中使用,因為互斥鎖必須由獲取它的同一任務釋放。
一些不會休眠的函式¶
某些函式可以安全地從任何上下文呼叫,或者持有幾乎任何鎖。
互斥鎖 API 參考¶
-
mutex_init¶
mutex_init (mutex)
初始化互斥鎖
引數
mutex要初始化的互斥鎖
描述
將互斥鎖初始化為未鎖定狀態。
不允許初始化已鎖定的互斥鎖。
-
mutex_init_with_key¶
mutex_init_with_key (mutex, key)
使用給定的 lockdep 鍵初始化互斥鎖
引數
mutex要初始化的互斥鎖
key要與互斥鎖關聯的 lockdep 鍵
描述
將互斥鎖初始化為未鎖定狀態。
不允許初始化已鎖定的互斥鎖。
-
bool mutex_is_locked(struct mutex *lock)¶
互斥鎖是否已鎖定
引數
struct mutex *lock要查詢的互斥鎖
描述
如果互斥鎖已鎖定,則返回 true,如果未鎖定,則返回 false。
-
void mutex_lock(struct mutex *lock)¶
獲取互斥鎖
引數
struct mutex *lock要獲取的互斥鎖
描述
為此任務獨佔鎖定互斥鎖。 如果互斥鎖當前不可用,它將休眠直到可以獲取它。
互斥鎖稍後必須由獲取它的同一任務釋放。 不允許遞迴鎖定。 任務在未先解鎖互斥鎖的情況下不得退出。 此外,互斥鎖所在的核心記憶體不得在互斥鎖仍鎖定的情況下釋放。 互斥鎖必須先初始化(或靜態定義),然後才能鎖定。 不允許 memset() 將互斥鎖設定為 0。
(CONFIG_DEBUG_MUTEXES .config 選項會啟用除錯檢查,該檢查將強制執行限制,並且還將進行死鎖除錯)
此函式類似於(但不等效於)down()。
-
void mutex_unlock(struct mutex *lock)¶
釋放互斥鎖
引數
struct mutex *lock要釋放的互斥鎖
描述
解鎖先前由此任務鎖定的互斥鎖。
此函式不得在中斷上下文中使用。 不允許解鎖未鎖定的互斥鎖。
呼叫者必須確保互斥鎖保持活動狀態,直到此函式返回 - mutex_unlock() 不能直接用於釋放物件,以便另一個併發任務可以釋放它。 在這方面,互斥鎖與自旋鎖和引用計數不同。
此函式類似於(但不等效於)up()。
-
void ww_mutex_unlock(struct ww_mutex *lock)¶
釋放 w/w 互斥鎖
引數
struct ww_mutex *lock要釋放的互斥鎖
描述
解鎖先前由此任務使用任何 ww_mutex_lock* 函式(無論是否帶有獲取上下文)鎖定的互斥鎖。 禁止在釋放獲取上下文後釋放鎖。
此函式不得在中斷上下文中使用。 不允許解鎖未鎖定的互斥鎖。
-
int ww_mutex_trylock(struct ww_mutex *ww, struct ww_acquire_ctx *ww_ctx)¶
嘗試獲取帶有可選獲取上下文的 w/w 互斥鎖
引數
struct ww_mutex *ww要鎖定的互斥鎖
struct ww_acquire_ctx *ww_ctx可選的 w/w 獲取上下文
描述
嘗試使用可選的獲取上下文鎖定互斥鎖; 不可能進行死鎖檢測。 如果互斥鎖已成功獲取,則返回 1,否則返回 0。
與 ww_mutex_lock 不同,不執行死鎖處理。 但是,如果指定了 ctx,則可能會在對 ww_mutex_trylock 的呼叫中發生 -EALREADY 處理。
使用此函式獲取的互斥鎖必須使用 ww_mutex_unlock 釋放。
-
int mutex_lock_interruptible(struct mutex *lock)¶
獲取互斥鎖,可被訊號中斷。
引數
struct mutex *lock要獲取的互斥鎖。
描述
像 mutex_lock() 一樣鎖定互斥鎖。 如果在程序休眠時傳遞訊號,此函式將返回而不獲取互斥鎖。
上下文
程序上下文。
返回
如果成功獲取鎖,則返回 0,如果收到訊號,則返回 -EINTR。
-
int mutex_lock_killable(struct mutex *lock)¶
獲取互斥鎖,可被致命訊號中斷。
引數
struct mutex *lock要獲取的互斥鎖。
描述
像 mutex_lock() 一樣鎖定互斥鎖。 如果在程序休眠時傳遞會使當前程序致命的訊號,此函式將返回而不獲取互斥鎖。
上下文
程序上下文。
返回
0:如果成功獲取鎖;-EINTR:如果收到致命訊號。
-
void mutex_lock_io(struct mutex *lock)¶
獲取互斥鎖並將程序標記為等待 I/O
-
int mutex_trylock(struct mutex *lock)¶
嘗試獲取互斥鎖,無需等待
引數
struct mutex *lock要獲取的互斥鎖
描述
嘗試以原子方式獲取互斥鎖。如果成功獲取互斥鎖,則返回 1,如果存在爭用,則返回 0。
此函式不得在中斷上下文中使用。互斥鎖必須由獲取它的同一任務釋放。
注意
此函式遵循 spin_trylock() 約定,因此它與 down_trylock() 的返回值相反!將訊號量使用者轉換為互斥鎖時,請注意這一點。
-
int atomic_dec_and_mutex_lock(atomic_t *cnt, struct mutex *lock)¶
如果遞減到 0,則返回持有互斥鎖
引數
atomic_t *cnt我們要遞減的原子變數
struct mutex *lock如果我們遞減到 0,則返回持有的互斥鎖
描述
如果我們遞減到 0,則返回 true 並持有鎖,否則返回 false
Futex API 參考¶
-
void futex_hash_get(struct futex_hash_bucket *hb)¶
獲取本地雜湊的額外引用。
引數
struct futex_hash_bucket *hb指向私有本地雜湊的指標。
描述
獲取已獲得的雜湊桶的額外引用。呼叫者必須已經擁有一個引用。
-
struct futex_hash_bucket *__futex_hash(union futex_key *key, struct futex_private_hash *fph)¶
返回雜湊桶
引數
union futex_key *key指向為其計算雜湊的 futex 鍵的指標
struct futex_private_hash *fph指向私有雜湊(如果已知)的指標
描述
我們對從 get_futex_key 返回的鍵進行雜湊(見下文),並返回相應的雜湊桶。如果 FUTEX 是 PROCESS_PRIVATE,則返回每個程序的雜湊桶(來自私有雜湊),如果存在。否則,返回來自全域性雜湊的雜湊桶。
-
struct hrtimer_sleeper *futex_setup_timer(ktime_t *time, struct hrtimer_sleeper *timeout, int flags, u64 range_ns)¶
設定睡眠 hrtimer。
引數
ktime_t *time指向給定超時值的指標
struct hrtimer_sleeper *timeout要設定的 hrtimer_sleeper 結構
int flagsfutex 標誌
u64 range_ns可選範圍,單位為納秒
返回
- 已初始化的 hrtimer_sleeper 結構,如果沒有超時,則為 NULL
給出值
-
int get_futex_key(u32 __user *uaddr, unsigned int flags, union futex_key *key, enum futex_access rw)¶
獲取作為 futex 鍵的引數
引數
u32 __user *uaddrfutex 的虛擬地址
unsigned int flagsFLAGS_*
union futex_key *key結果儲存的地址。
enum futex_access rw對映需要讀/寫(值:FUTEX_READ,FUTEX_WRITE)
返回
負錯誤程式碼或 0
描述
關鍵字儲存在 key 中,如果成功。
對於共享對映(當 fshared 時),鍵是
( inode->i_sequence, 對映中的頁面偏移量,頁面中的偏移量 )
[ 另請參見 get_inode_sequence_number() ]
對於私有對映(或當 !fshared 時),鍵是
( current->mm, 地址, 0 )
這允許(在適用的情況下,跨程序)識別 futex,而無需在 FUTEX_WAIT 期間保持頁面固定。
lock_page() 可能會休眠,呼叫者不應持有自旋鎖。
-
int fault_in_user_writeable(u32 __user *uaddr)¶
調入使用者地址並驗證 RW 訪問
引數
u32 __user *uaddr指向故障使用者空間地址的指標
描述
慢速路徑,用於修復我們在原子寫入訪問 uaddr 時發生的故障。
我們沒有對使用者地址進行非破壞性寫入的通用實現。我們知道我們在停用原子頁面錯誤的部分中發生了故障,因此我們也可以透過立即呼叫 get_user_pages() 來避免 #PF 開銷。
-
struct futex_q *futex_top_waiter(struct futex_hash_bucket *hb, union futex_key *key)¶
返回 futex 上優先順序最高的等待者
引數
struct futex_hash_bucket *hbfutex_q 所在的雜湊桶
union futex_key *keyfutex 鍵(用於將其與其他 futex futex_q 區分開)
描述
必須在持有 hb 鎖的情況下呼叫。
-
void wait_for_owner_exiting(int ret, struct task_struct *exiting)¶
阻塞直到所有者退出
引數
int ret所有者當前的 futex 鎖狀態
struct task_struct *exiting指向正在退出的任務的指標
描述
呼叫者必須持有 exiting 上的引用計數。
引數
struct futex_q *q要取消排隊的 futex_q
描述
q->lock_ptr 不得為 NULL,並且必須由呼叫者持有。
引數
struct futex_q *q要取消排隊的 futex_q
描述
q->lock_ptr 不得由呼叫者持有。對 futex_unqueue() 的呼叫必須與之前對 futex_queue() 的一次呼叫配對。
返回
1 - 如果 futex_q 仍在排隊中(並且我們取消了它的排隊);
0 - 如果 futex_q 已被喚醒執行緒刪除
-
void futex_exit_recursive(struct task_struct *tsk)¶
將任務的 futex 狀態設定為 FUTEX_STATE_DEAD
引數
struct task_struct *tsk要在其上設定狀態的任務
描述
以無鎖方式設定任務的 futex 退出狀態。當任務正在退出時,futex 等待者程式碼會觀察到該狀態,並迴圈直到任務實際完成 futex 清理。最壞的情況是,等待者執行透過等待迴圈,直到狀態變得可見。
這是從 make_task_dead() 中的遞迴故障處理路徑呼叫的。
這是盡力而為的。要麼 futex 退出程式碼已經執行,要麼沒有執行。如果在 futex 上設定了 OWNER_DIED 位,則等待者可以接管它。如果不是,則問題被推回使用者空間。如果 futex 退出程式碼尚未執行,則已經排隊的等待者可能會永遠阻塞,但對此無能為力。
-
struct futex_q¶
雜湊的 futex 佇列條目,每個等待任務一個
定義:
struct futex_q {
struct plist_node list;
struct task_struct *task;
spinlock_t *lock_ptr;
futex_wake_fn *wake;
void *wake_data;
union futex_key key;
struct futex_pi_state *pi_state;
struct rt_mutex_waiter *rt_waiter;
union futex_key *requeue_pi_key;
u32 bitset;
atomic_t requeue_state;
bool drop_hb_ref;
#ifdef CONFIG_PREEMPT_RT;
struct rcuwait requeue_wait;
#endif;
};
成員
list等待此 futex 的任務的優先順序排序列表
task等待 futex 的任務
lock_ptr雜湊桶鎖
wake此佇列的喚醒處理程式
wake_data與喚醒處理程式關聯的資料
keyfutex 雜湊到的鍵
pi_state可選的優先順序繼承狀態
rt_waiter用於 requeue_pi 的 rt_waiter 儲存
requeue_pi_keyrequeue_pi 目標 futex 鍵
bitset用於可選的位掩碼喚醒的位集
requeue_statefutex_requeue_pi() 的狀態欄位
drop_hb_ref如果為 true,則等待者應刪除額外的雜湊桶引用
requeue_waitfutex_requeue_pi() 的 RCU 等待(僅 RT)
描述
我們使用此雜湊等待佇列,而不是普通的 wait_queue_entry_t,因此我們只能喚醒相關的等待佇列(雜湊佇列可能是共享的)。
futex_q 具有喚醒狀態,就像任務具有 TASK_RUNNING 一樣。當 plist_node_empty(q->list) || q->lock_ptr == 0 時,它被認為是喚醒的。喚醒的順序始終是使第一個條件為真,然後是第二個條件。
PI futex 通常在透過 rt_mutex 程式碼從雜湊列表中刪除之前被喚醒。請參閱 futex_unqueue_pi()。
-
int futex_match(union futex_key *key1, union futex_key *key2)¶
檢查兩個 futex 鍵是否相等
引數
union futex_key *key1指向 key1 的指標
union futex_key *key2指向 key2 的指標
描述
如果兩個 futex_key 相等,則返回 1,否則返回 0。
-
void futex_queue(struct futex_q *q, struct futex_hash_bucket *hb, struct task_struct *task)¶
在 futex_hash_bucket 上對 futex_q 進行排隊
引數
struct futex_q *q要排隊的 futex_q
struct futex_hash_bucket *hb目標雜湊桶
struct task_struct *task為此 futex 排隊的任務
描述
hb->lock 必須由呼叫者持有,並在此處釋放。對 futex_queue() 的呼叫通常與對 futex_unqueue() 的一次呼叫配對。例外情況包括 PI 相關操作,這些操作可能使用 futex_unqueue_pi(),或者如果在喚醒過程中完成取消排隊,並且取消排隊狀態隱含在已喚醒任務的狀態中,則不使用任何操作(有關示例,請參閱 futex_wait_requeue_pi())。
請注意,對於 futex 的非同步使用,task 可以為 NULL。
-
struct futex_vector¶
futex_waitv() 的輔助結構
定義:
struct futex_vector {
struct futex_waitv w;
struct futex_q q;
};
成員
w使用者空間提供的資料
q核心端資料
描述
用於構建包含 futex_waitv() 所需的所有資料的陣列的結構
-
int futex_lock_pi_atomic(u32 __user *uaddr, struct futex_hash_bucket *hb, union futex_key *key, struct futex_pi_state **ps, struct task_struct *task, struct task_struct **exiting, int set_waiters)¶
獲取支援 PI 的 futex 所需的原子操作
引數
u32 __user *uaddrPI futex 的使用者地址
struct futex_hash_bucket *hbPI futex 雜湊桶
union futex_key *key與 uaddr 和 hb 關聯的 futex 鍵
struct futex_pi_state **pspi_state 指標,用於儲存查詢結果
struct task_struct *task執行原子鎖操作的任務。除了 requeue pi 的情況外,通常是 "current"。
struct task_struct **exiting用於儲存正在退出的所有者任務的任務指標的指標
int set_waiters強制設定 FUTEX_WAITERS 位 (1) 或不設定 (0)
返回
0 - 準備好等待;
1 - 獲取了鎖;
<0 - 錯誤
描述
hb->lock 必須由呼叫者持有。
只有當返回值是 -EBUSY 時,才設定 **exiting**。如果設定了,則在返回時持有退出任務的引用計數,呼叫者需要在等待退出完成後釋放該引用計數。
引數
u32 __user *uaddrfutex 的使用者地址
struct futex_q *qfutex_q (包含 pi_state 和對 rt_mutex 的訪問)
int locked嘗試獲取 rt_mutex 是否成功 (1) 或不成功 (0)
描述
在嘗試鎖定 rt_mutex 之後,呼叫此函式來清理 pi_state 所有者,並處理可能允許我們獲取鎖的競爭條件。必須在持有 hb 鎖的情況下呼叫。
返回
1 - 成功,獲取了鎖;
0 - 成功,未獲取鎖;
<0 - 發生錯誤 (-EFAULT)
-
void requeue_futex(struct futex_q *q, struct futex_hash_bucket *hb1, struct futex_hash_bucket *hb2, union futex_key *key2)¶
將 futex_q 從一個 hb 重新排隊到另一個 hb
引數
struct futex_q *q要重新排隊的 futex_q
struct futex_hash_bucket *hb1源 hash_bucket
struct futex_hash_bucket *hb2目標 hash_bucket
union futex_key *key2重新排隊的 futex_q 的新鍵
-
void requeue_pi_wake_futex(struct futex_q *q, union futex_key *key, struct futex_hash_bucket *hb)¶
喚醒在重新排隊期間獲取鎖的任務
引數
struct futex_q *qfutex_q
union futex_key *key重新排隊目標 futex 的鍵
struct futex_hash_bucket *hb重新排隊目標 futex 的 hash_bucket
描述
在 futex_requeue 期間,當 requeue_pi=1 時,如果目標 futex 沒有競爭或透過鎖竊取,則可以獲取該目標 futex。
將 **q**::key 設定為重新排隊目標 futex 鍵,以便等待者可以檢測到在正確 futex 上的喚醒。
從雜湊桶中取消排隊 **q**。
將 **q**::rt_waiter 設定為 NULL,以便喚醒的任務可以檢測到原子鎖的獲取。
將 q->lock_ptr 設定為重新排隊目標 hb->lock,以防等待者必須修復 PI 狀態。
完成重新排隊狀態,以便等待者可以取得進展。在此之後,如果不需要修復 PI 狀態,等待者任務可以立即從系統呼叫返回。
喚醒等待者任務。
必須在同時持有 q->lock_ptr 和 hb->lock 的情況下呼叫。
-
int futex_proxy_trylock_atomic(u32 __user *pifutex, struct futex_hash_bucket *hb1, struct futex_hash_bucket *hb2, union futex_key *key1, union futex_key *key2, struct futex_pi_state **ps, struct task_struct **exiting, int set_waiters)¶
嘗試為頂層等待者進行原子鎖操作
引數
u32 __user *pifutex要操作的 futex 的使用者地址
struct futex_hash_bucket *hb1來源 futex 雜湊桶,必須由呼叫者鎖定
struct futex_hash_bucket *hb2目標 futex 雜湊桶,必須由呼叫者鎖定
union futex_key *key1來源 futex 鍵
union futex_key *key2目標 futex 鍵
struct futex_pi_state **ps用於儲存 pi_state 指標的地址
struct task_struct **exiting用於儲存正在退出的所有者任務的任務指標的指標
int set_waiters強制設定 FUTEX_WAITERS 位 (1) 或不設定 (0)
描述
如果可以原子地完成,則嘗試代表頂層等待者獲取鎖。 如果成功,則喚醒頂層等待者。 如果呼叫者指定了 set_waiters,則指示 futex_lock_pi_atomic() 強制設定 FUTEX_WAITERS 位。 hb1 和 hb2 必須由呼叫者持有。
只有當返回值是 -EBUSY 時,才設定 **exiting**。如果設定了,則在返回時持有退出任務的引用計數,呼叫者需要在等待退出完成後釋放該引用計數。
返回
0 - 原子獲取鎖失敗;
>0 - 獲取了鎖,返回值是頂層等待者的 vpid
<0 - 錯誤
-
int futex_requeue(u32 __user *uaddr1, unsigned int flags1, u32 __user *uaddr2, unsigned int flags2, int nr_wake, int nr_requeue, u32 *cmpval, int requeue_pi)¶
將等待者從 uaddr1 重新排隊到 uaddr2
引數
u32 __user *uaddr1源 futex 使用者地址
unsigned int flags1futex 標誌 (FLAGS_SHARED, 等等)
u32 __user *uaddr2目標 futex 使用者地址
unsigned int flags2futex 標誌 (FLAGS_SHARED, 等等)
int nr_wake要喚醒的等待者數量(對於 requeue_pi 必須為 1)
int nr_requeue要重新排隊的等待者數量 (0-INT_MAX)
u32 *cmpval**uaddr1** 期望的值 (或
NULL)int requeue_pi如果我們試圖從非 PI futex 重新排隊到 PI futex(不支援 PI 到 PI 的重新排隊)
描述
將 uaddr1 上的等待者重新排隊到 uaddr2。在 requeue_pi 的情況下,嘗試代表頂層等待者原子地獲取 uaddr2。
返回
>=0 - 成功時,重新排隊或喚醒的任務數;
<0 - 出錯
-
int handle_early_requeue_pi_wakeup(struct futex_hash_bucket *hb, struct futex_q *q, struct hrtimer_sleeper *timeout)¶
處理初始 futex 上的提前喚醒
引數
struct futex_hash_bucket *hbfutex_q 最初排隊的 hash_bucket
struct futex_q *q在等待重新排隊時喚醒的 futex_q
struct hrtimer_sleeper *timeout與等待關聯的超時(如果沒有,則為 NULL)
描述
確定提前喚醒的原因。
返回
-EWOULDBLOCK 或 -ETIMEDOUT 或 -ERESTARTNOINTR
-
int futex_wait_requeue_pi(u32 __user *uaddr, unsigned int flags, u32 val, ktime_t *abs_time, u32 bitset, u32 __user *uaddr2)¶
在 uaddr 上等待並獲取 uaddr2
引數
u32 __user *uaddr我們最初等待的 futex(非 PI)
unsigned int flagsfutex 標誌(FLAGS_SHARED、FLAGS_CLOCKRT 等),它們必須是相同的型別,不能從私有重新排隊到共享等等。
u32 valuaddr 的期望值
ktime_t *abs_time絕對超時
u32 bitset由使用者空間設定的 32 位喚醒位集,預設為全部
u32 __user *uaddr2在返回使用者空間之前我們將要獲取的 PI futex
描述
呼叫者將在 uaddr 上等待,並由 futex_requeue() 重新排隊到 uaddr2,uaddr2 必須是 PI 感知的且與 uaddr 不同。 普通喚醒將在 uaddr2 上喚醒,並在返回使用者空間之前完成 rt_mutex 的獲取。 這確保了 rt_mutex 在有等待者時保持所有者; 如果沒有所有者,PI 邏輯將不知道要提升/降低哪個任務(如果需要)。
當我們在排隊時呼叫 futex_wait_queue() 中的 schedule,並透過以下方式返回:1) 在 futex_requeue() 進行原子鎖獲取後在 uaddr2 上喚醒 2) 重新排隊後在 uaddr2 上喚醒 3) 訊號 4) 超時
如果 3,則清理並返回 -ERESTARTNOINTR。
如果 2,我們可能會阻止嘗試獲取 rt_mutex 並透過以下方式返回:5) 成功鎖定 6) 訊號 7) 超時 8) 其他鎖定獲取失敗
如果 6,則返回 -EWOULDBLOCK(重新啟動系統呼叫會執行相同的操作)。
如果 4 或 7,我們清理並返回 -ETIMEDOUT。
返回
0 - 成功;
<0 - 出錯
-
void futex_do_wait(struct futex_q *q, struct hrtimer_sleeper *timeout)¶
等待喚醒、超時或訊號
引數
struct futex_q *q要在其上排隊的 futex_q
struct hrtimer_sleeper *timeout準備好的 hrtimer_sleeper,或者如果沒有超時則為 null
-
int futex_unqueue_multiple(struct futex_vector *v, int count)¶
從它們的雜湊桶中移除各種 futex
引數
struct futex_vector *v要取消排隊的 futex 列表
int count列表中的 futex 數量
描述
用於取消排隊 futex 列表的輔助函式。 這不會失敗。
返回
>=0 - 最後一個被喚醒的 futex 的索引;
- -1
沒有 futex 被喚醒
-
int futex_wait_multiple_setup(struct futex_vector *vs, int count, int *woken)¶
準備等待和排隊多個 futex
引數
struct futex_vector *vs要等待的 futex 列表
int count列表的大小
int *woken最後一個喚醒的 futex 的索引(如果有)。 用於通知呼叫者,它可以將此索引返回給使用者空間(返回引數)
描述
在一個步驟中準備多個 futex 並將它們排隊。 如果 futex 列表無效或者任何 futex 已經被喚醒,這可能會失敗。 成功後,任務準備好進行可中斷睡眠。
返回
1 - 其中一個 futex 被另一個執行緒喚醒
0 - 成功
<0 - -EFAULT, -EWOULDBLOCK 或 -EINVAL
-
void futex_sleep_multiple(struct futex_vector *vs, unsigned int count, struct hrtimer_sleeper *to)¶
檢查睡眠條件並睡眠
引數
struct futex_vector *vs要等待的互斥鎖列表
unsigned int countvs 的長度
struct hrtimer_sleeper *to超時
描述
當且僅當超時未過期且列表中的互斥鎖均未被喚醒時,才進入睡眠狀態。
-
int futex_wait_multiple(struct futex_vector *vs, unsigned int count, struct hrtimer_sleeper *to)¶
準備等待併入隊多個互斥鎖
引數
struct futex_vector *vs要等待的互斥鎖列表
unsigned int count物件的數量
struct hrtimer_sleeper *to放棄並返回使用者空間之前的超時時間
描述
FUTEX_WAIT_MULTIPLE 互斥鎖操作的入口點,此函式在一個互斥鎖組上睡眠,並在第一個互斥鎖被喚醒時或超時時間過後返回。
返回
>=0 - 喚醒的互斥鎖的提示
<0 - 出錯
-
int futex_wait_setup(u32 __user *uaddr, u32 val, unsigned int flags, struct futex_q *q, union futex_key *key2, struct task_struct *task)¶
準備等待一個互斥鎖
引數
u32 __user *uaddr互斥鎖使用者空間地址
u32 val期望值
unsigned int flagsfutex 標誌 (FLAGS_SHARED, 等等)
struct futex_q *q關聯的 futex_q
union futex_key *key2如果用於 requeue PI,則為第二個 futex_key
struct task_struct *task為此 futex 排隊的任務
描述
設定 futex_q 並定位 hash_bucket。 獲取互斥鎖值並將其與期望值進行比較。 在內部處理原子錯誤。 成功時保持 hb 鎖,失敗時解鎖返回。
返回
0 - uaddr 包含 val 並且 hb 已被鎖定;
- <0 - 出現錯誤,並且 hb 已解鎖。 可能的原因:uaddr 無法
被讀取,不包含期望值或未正確對齊。
進一步閱讀¶
Documentation/locking/spinlocks.rst:Linus Torvalds 在核心原始碼中的自旋鎖教程。現代架構的 Unix 系統:核心程式設計師的對稱多處理和快取
Curt Schimmel 對核心級別鎖定的非常好的介紹(不是為 Linux 編寫的,但幾乎所有內容都適用)。 這本書很貴,但對於理解 SMP 鎖定來說,每一分錢都值得。 [ISBN: 0201633388]
感謝¶
感謝 Telsa Gwynne 進行 DocBooking、整理和新增樣式。
感謝 Martin Pool、Philipp Rumpf、Stephen Rothwell、Paul Mackerras、Ruedi Aschwanden、Alan Cox、Manfred Spraul、Tim Waugh、Pete Zaitcev、James Morris、Robert Love、Paul McKenney、John Ashby 進行校對、更正、批評和評論。
感謝該聯盟對本文件沒有任何影響。
詞彙表¶
- 搶佔
在 2.5 之前,或當
CONFIG_PREEMPT未設定時,核心中使用者上下文中的程序不會相互搶佔(即,除非您放棄 CPU,否則您將擁有它,除非發生中斷)。 隨著 2.5.4 中添加了CONFIG_PREEMPT,這種情況發生了變化:當處於使用者上下文時,優先順序更高的任務可以“切入”:自旋鎖已更改為停用搶佔,即使在 UP 上也是如此。- bh
Bottom Half:由於歷史原因,其中包含“_bh”的函式現在通常指任何軟體中斷,例如 spin_lock_bh() 會阻止當前 CPU 上的任何軟體中斷。 Bottom halves 已被棄用,最終將被 tasklets 替換。 任何時候只有一個 bottom half 將執行。
- 硬體中斷 / 硬體 IRQ
硬體中斷請求。 in_hardirq() 在硬體中斷處理程式中返回 true。
- 中斷上下文
不是使用者上下文:正在處理硬體 irq 或軟體 irq。 由返回 true 的 in_interrupt() 宏指示。
- SMP
對稱多處理器:為多 CPU 機器編譯的核心。 (
CONFIG_SMP=y)。- 軟體中斷 / softirq
軟體中斷處理程式。 in_hardirq() 返回 false; in_softirq() 返回 true。 Tasklets 和 softirqs 都屬於“軟體中斷”類別。
嚴格來說,softirq 是最多 32 個列舉的軟體中斷之一,可以一次在多個 CPU 上執行。 有時也用於指 tasklets(即所有軟體中斷)。
- tasklet
一個可以動態註冊的軟體中斷,保證一次只能在一個 CPU 上執行。
- timer
一個可以動態註冊的軟體中斷,在給定的時間(或接近給定的時間)執行。 執行時,它就像一個 tasklet(事實上,它們是從
TIMER_SOFTIRQ呼叫的)。- UP
單處理器:非 SMP。 (
CONFIG_SMP=n)。- 使用者上下文
核心代表特定程序(即系統呼叫或陷阱)或核心執行緒執行。 您可以使用
current宏來判斷哪個程序。) 不要與使用者空間混淆。 可以被軟體或硬體中斷中斷。- 使用者空間
一個在其自己的程式碼中在核心外部執行的程序。