使用 RCU hlist_nulls 保護列表和物件

本節描述瞭如何使用 hlist_nulls 來保護主要為只讀的連結串列和物件,這些物件使用 SLAB_TYPESAFE_BY_RCU 分配。

請閱讀使用 RCU 保護主要為只讀的連結串列中的基礎知識。

使用 'nulls'

使用特殊的標記(稱為“nulls”)是解決以下問題的便捷方法。

如果沒有“nulls”,則典型的 RCU 連結串列管理使用 SLAB_TYPESAFE_BY_RCU kmem_cache 分配的物件,可以使用以下演算法。以下示例假設“obj”是指向此類物件的指標,其型別如下。

struct object {
  struct hlist_node obj_node;
  atomic_t refcnt;
  unsigned int key;
};

1) 查詢演算法

begin:
rcu_read_lock();
obj = lockless_lookup(key);
if (obj) {
  if (!try_get_ref(obj)) { // might fail for free objects
    rcu_read_unlock();
    goto begin;
  }
  /*
  * Because a writer could delete object, and a writer could
  * reuse these object before the RCU grace period, we
  * must check key after getting the reference on object
  */
  if (obj->key != key) { // not the object we expected
    put_ref(obj);
    rcu_read_unlock();
    goto begin;
  }
}
rcu_read_unlock();

請注意,lockless_lookup(key) 不能使用傳統的 hlist_for_each_entry_rcu(),而應使用帶有附加記憶體屏障 (smp_rmb()) 的版本

lockless_lookup(key)
{
  struct hlist_node *node, *next;
  for (pos = rcu_dereference((head)->first);
       pos && ({ next = pos->next; smp_rmb(); prefetch(next); 1; }) &&
       ({ obj = hlist_entry(pos, typeof(*obj), obj_node); 1; });
       pos = rcu_dereference(next))
    if (obj->key == key)
      return obj;
  return NULL;
}

並請注意,傳統的 hlist_for_each_entry_rcu() 缺少此 smp_rmb()

struct hlist_node *node;
for (pos = rcu_dereference((head)->first);
     pos && ({ prefetch(pos->next); 1; }) &&
     ({ obj = hlist_entry(pos, typeof(*obj), obj_node); 1; });
     pos = rcu_dereference(pos->next))
  if (obj->key == key)
    return obj;
return NULL;

引用 Corey Minyard

"If the object is moved from one list to another list in-between the
time the hash is calculated and the next field is accessed, and the
object has moved to the end of a new list, the traversal will not
complete properly on the list it should have, since the object will
be on the end of the new list and there's not a way to tell it's on a
new list and restart the list traversal. I think that this can be
solved by pre-fetching the "next" field (with proper barriers) before
checking the key."

2) 插入演算法

我們需要確保讀者無法讀取新的 'obj->obj_node.next' 值和 'obj->key' 的先前值。否則,可以從一個鏈中刪除一個專案,並將其插入到另一個鏈中。如果新鏈在移動之前是空的,則“next”指標為 NULL,並且無鎖讀者無法檢測到它錯過了原始鏈中的以下專案。

/*
 * Please note that new inserts are done at the head of list,
 * not in the middle or end.
 */
obj = kmem_cache_alloc(...);
lock_chain(); // typically a spin_lock()
obj->key = key;
atomic_set_release(&obj->refcnt, 1); // key before refcnt
hlist_add_head_rcu(&obj->obj_node, list);
unlock_chain(); // typically a spin_unlock()

3) 刪除演算法

這裡沒什麼特別的,我們可以使用標準的 RCU hlist 刪除。但由於 SLAB_TYPESAFE_BY_RCU,請注意,刪除的物件可以非常非常快地重複使用(在 RCU 寬限期結束之前)

if (put_last_reference_on(obj) {
  lock_chain(); // typically a spin_lock()
  hlist_del_init_rcu(&obj->obj_node);
  unlock_chain(); // typically a spin_unlock()
  kmem_cache_free(cachep, obj);
}

避免額外的 smp_rmb()

使用 hlist_nulls,我們可以避免在 lockless_lookup() 中使用額外的 smp_rmb()。

例如,如果我們選擇將槽號儲存為雜湊表的每個槽的“nulls”列表結束標記,則可以檢測到競爭(一些寫入者執行了刪除和/或將物件移動到另一個鏈),檢查最終的“nulls”值,如果查詢遇到了鏈的末尾。如果最終的“nulls”值不是槽號,那麼我們必須重新開始從頭查詢。如果物件被移動到同一個鏈中,那麼讀者並不關心:它可能會偶爾再次掃描列表而沒有害處。

請注意,使用 hlist_nulls 意味著“struct object”的“obj_node”欄位的型別變為“struct hlist_nulls_node”。

1) 查詢演算法

head = &table[slot];
begin:
rcu_read_lock();
hlist_nulls_for_each_entry_rcu(obj, node, head, obj_node) {
  if (obj->key == key) {
    if (!try_get_ref(obj)) { // might fail for free objects
      rcu_read_unlock();
      goto begin;
    }
    if (obj->key != key) { // not the object we expected
      put_ref(obj);
      rcu_read_unlock();
      goto begin;
    }
    goto out;
  }
}

// If the nulls value we got at the end of this lookup is
// not the expected one, we must restart lookup.
// We probably met an item that was moved to another chain.
if (get_nulls_value(node) != slot) {
  put_ref(obj);
  rcu_read_unlock();
  goto begin;
}
obj = NULL;

out:
rcu_read_unlock();

2) 插入演算法

與上面相同,但使用 hlist_nulls_add_head_rcu() 而不是 hlist_add_head_rcu()

/*
 * Please note that new inserts are done at the head of list,
 * not in the middle or end.
 */
obj = kmem_cache_alloc(cachep);
lock_chain(); // typically a spin_lock()
obj->key = key;
atomic_set_release(&obj->refcnt, 1); // key before refcnt
/*
 * insert obj in RCU way (readers might be traversing chain)
 */
hlist_nulls_add_head_rcu(&obj->obj_node, list);
unlock_chain(); // typically a spin_unlock()