背景
Linux 内核使用 XArray(eXtensible Array)作为 Linux IO 栈 的页面缓存(page cache)容器实现。
本文会介绍 XArray 的具体实现,以及 XArray 在 page cache 层的应用。
WIP 警告
WORK IN PROGRESS
最近事情特别多,本来一两天写完的文章要拖两三周……(一天加一行注释太搞笑了)
读者看不明白这篇文章很正常,很多概念还没写清楚。先挂几天,实在更新不过来就删掉合适再重发。
基本介绍
仅从 外部 API 来看,XArray 可视为自动扩容的无限大的数组。
而从作者的表态来看,XArray 目标是作为一个「万能」的数据结构,除了替代明确的 IDR 或者 page cache 场景的基数树,还可以部分替代内核中的链表、红黑树和哈希表,其定位类似于朱棣数组。
NOTES:
- 其实是朱迪数组(Judy Array)🙃
- XArray 的场景适用于小数据集(比链表更省结点空间),或者是相对密集的大数据集(比哈希表更有局部性且无冲突)。
- XArray 还满足了内核特定的需求,比如提供 RCU-free 特性、大页 page cache 性能优化和筛选 mask 的快速遍历等。
- XArray 并不是通用类型的键值容器:键(index)肯定是 unsigned long 类型,值(entry)虽然在字面上是 void* 类型,但是有严格的要求,详见后续文章说明。
- 作者表态归表态,实际替代哈希表存疑。内核中没有 commit 是这么做的……
- XArray 事实上也算是新瓶装旧酒。只看原理,它就是基数树。
- 本文基于 Linux 内核版本 v6.15。
调试建议
XArray 支持用户态调试,可以在 Linux 源码根目录下执行:
cd ./tools/testing/radix-tree/
make
gdb ./xarray
总之随意使用调试器即可。
注意有部分宏定义可能不同于内核态版本,而且内存分配也有细节差异。
(调试的时候实在整无语了……当然改成内核态就更不好调了)
数据结构
anchor
/**
* struct xarray - The anchor of the XArray.
* @xa_lock: Lock that protects the contents of the XArray.
*
* To use the xarray, define it statically or embed it in your data structure.
* It is a very small data structure, so it does not usually make sense to
* allocate it separately and keep a pointer to it in your data structure.
*
* You may use the xa_lock to protect your own data structures as well.
*/
/*
* If all of the entries in the array are NULL, @xa_head is a NULL pointer.
* If the only non-NULL entry in the array is at index 0, @xa_head is that
* entry. If any other entry in the array is non-NULL, @xa_head points
* to an @xa_node.
*/
struct xarray {
spinlock_t xa_lock;
/* private: The rest of the data structure is not to be used directly. */
gfp_t xa_flags;
void __rcu * xa_head;
};
struct xarray
是提供的用户外部的核心数据结构,只有三样东西:
xa_lock
:保护并发安全的自旋锁。注意 XArray 还会结合使用 RCU 机制(见 load 操作)。xa_flags
:携带 GFP 标记(get free page),其他细节见 xarray.h 头文件。xa_head
:指向 NULL、entry 或者xa_node
对象。entry 可理解为键值对当中的值。
entry
/**
* xa_mk_value() - Create an XArray entry from an integer.
* @v: Value to store in XArray.
*
* Context: Any context.
* Return: An entry suitable for storing in the XArray.
*/
static inline void *xa_mk_value(unsigned long v)
{
WARN_ON((long)v < 0);
return (void *)((v << 1) | 1);
}
/**
* xa_to_value() - Get value stored in an XArray entry.
* @entry: XArray entry.
*
* Context: Any context.
* Return: The value stored in the XArray entry.
*/
static inline unsigned long xa_to_value(const void *entry)
{
return (unsigned long)entry >> 1;
}
/**
* xa_is_value() - Determine if an entry is a value.
* @entry: XArray entry.
*
* Context: Any context.
* Return: True if the entry is a value, false if it is a pointer.
*/
static inline bool xa_is_value(const void *entry)
{
return (unsigned long)entry & 1;
}
补充一下 entry 的说明,虽然 entry 是 void* 类型,实质上用户在 entry 只可存放:
- 特定指针。要求是经由内核分配的指针(比如 kmalloc())。或者任意四字节对齐过的指针。
- 特定整数。要求是
[0, LONG_MAX)
范围内的整数,注意这是 long 的半侧。
// 见:https://elixir.bootlin.com/linux/v6.15/source/include/linux/xarray.h#L27
// 注意这些 internal entry 的数值是要经过转换的,比如 257 (Zero entry) 实际是 257<<2 | 0b10
/*
* The bottom two bits of the entry determine how the XArray interprets
* the contents:
*
* 00: Pointer entry
* 10: Internal entry
* x1: Value entry or tagged pointer
*
* Attempting to store internal entries in the XArray is a bug.
*
* Most internal entries are pointers to the next node in the tree.
* The following internal entries have a special meaning:
*
* 0-62: Sibling entries
* 256: Retry entry
* 257: Zero entry
*
* Errors are also represented as internal entries, but use the negative
* space (-4094 to -2). They're never stored in the slots array; only
* returned by the normal API.
*/
NOTES:
- 除了用户 entry,XArray 还具有内部 entry:node、sibling、retry 和 zero。
- 并且 index(键)与 entry 也不一定是一对一的关系,见 multi-index entry 说明。
node
/*
* @count is the count of every non-NULL element in the ->slots array
* whether that is a value entry, a retry entry, a user pointer,
* a sibling entry or a pointer to the next level of the tree.
* @nr_values is the count of every element in ->slots which is
* either a value entry or a sibling of a value entry.
*/
struct xa_node {
unsigned char shift; /* Bits remaining in each slot */
unsigned char offset; /* Slot offset in parent */
unsigned char count; /* Total entry count */
unsigned char nr_values; /* Value entry count */
struct xa_node __rcu *parent; /* NULL at top of tree */
struct xarray *array; /* The array we belong to */
union {
struct list_head private_list; /* For tree user */
struct rcu_head rcu_head; /* Used when freeing node */
};
void __rcu *slots[XA_CHUNK_SIZE];
union {
unsigned long tags[XA_MAX_MARKS][XA_MARK_LONGS];
unsigned long marks[XA_MAX_MARKS][XA_MARK_LONGS];
};
};
使用 crash> struct xa_node
解析宏:
- XA_CHUNK_SIZE = 64
- XA_MAX_MARKS = 3
- XA_MARK_LONGS = 1
shift
、offset
、parent
和 slots
用于维护遍历寻址:
shift
:对键的右移偏移量。当shift == 0
时不会有指向下一层的结点(node entry)。offset
:自身在父结点中的slots[]
下标。parent
:指向父结点。slots[64]
:存放 entry 的容器。所以第一层最多能容纳 64 个结点,第二层 4096,以此类推……
// 见:https://elixir.bootlin.com/linux/v6.15/source/include/linux/fs.h#L531
/* XArray tags, for tagging dirty and writeback pages in the pagecache. */
#define PAGECACHE_TAG_DIRTY XA_MARK_0
#define PAGECACHE_TAG_WRITEBACK XA_MARK_1
#define PAGECACHE_TAG_TOWRITE XA_MARK_2
marks
用于支持 xa_for_each_marked() 接口。具体到 page cache 的用途,像「只遍历脏页」的操作就可以轻松实现。
state
/*
* The xa_state is opaque to its users. It contains various different pieces
* of state involved in the current operation on the XArray. It should be
* declared on the stack and passed between the various internal routines.
* The various elements in it should not be accessed directly, but only
* through the provided accessor functions. The below documentation is for
* the benefit of those working on the code, not for users of the XArray.
*
* @xa_node usually points to the xa_node containing the slot we're operating
* on (and @xa_offset is the offset in the slots array). If there is a
* single entry in the array at index 0, there are no allocated xa_nodes to
* point to, and so we store %NULL in @xa_node. @xa_node is set to
* the value %XAS_RESTART if the xa_state is not walked to the correct
* position in the tree of nodes for this operation. If an error occurs
* during an operation, it is set to an %XAS_ERROR value. If we run off the
* end of the allocated nodes, it is set to %XAS_BOUNDS.
*/
struct xa_state {
struct xarray *xa;
unsigned long xa_index;
unsigned char xa_shift;
unsigned char xa_sibs;
unsigned char xa_offset;
unsigned char xa_pad; /* Helps gcc generate better code */
struct xa_node *xa_node;
struct xa_node *xa_alloc;
xa_update_node_t xa_update;
struct list_lru *xa_lru;
};
XA state 可以视为一个贯穿 XArray 操作的状态机,用以保留(快速恢复)必要的上下文信息,不必让每个子操作都从根重新遍历处理。
/**
* XA_STATE() - Declare an XArray operation state.
* @name: Name of this operation state (usually xas).
* @array: Array to operate on.
* @index: Initial index of interest.
*
* Declare and initialise an xa_state on the stack.
*/
#define XA_STATE(name, array, index) \
struct xa_state name = __XA_STATE(array, index, 0, 0)
#define __XA_STATE(array, index, shift, sibs) { \
.xa = array, \
.xa_index = index, \
.xa_shift = shift, \
.xa_sibs = sibs, \
.xa_offset = 0, \
.xa_pad = 0, \
.xa_node = XAS_RESTART, \
.xa_alloc = NULL, \
.xa_update = NULL, \
.xa_lru = NULL, \
}
在常规的 XArray 读写操作中,用户不需要手动声明 XA state 对象。
但是我们后面要看实现,所以得知道一些。XA state 在初始化时,其字段除了声明的 array、index 以及特殊标记的 xa_node
(见注释)以外,均为 0 或者 NULL。注意 xa_state->xa_offset
和 xa_node->offset
含义不同,还是建议亲自看注释。
store 操作
接下来直接看源码。和过往文章一样,是按照深度优先的顺序展开代码。
我们从 store 操作的门面 xa_store()
开始。
xa store
/**
* xa_store() - Store this entry in the XArray.
* @xa: XArray.
* @index: Index into array.
* @entry: New entry.
* @gfp: Memory allocation flags.
*
* After this function returns, loads from this index will return @entry.
* Storing into an existing multi-index entry updates the entry of every index.
* The marks associated with @index are unaffected unless @entry is %NULL.
*
* Context: Any context. Takes and releases the xa_lock.
* May sleep if the @gfp flags permit.
* Return: The old entry at this index on success, xa_err(-EINVAL) if @entry
* cannot be stored in an XArray, or xa_err(-ENOMEM) if memory allocation
* failed.
*/
void *xa_store(struct xarray *xa, unsigned long index, void *entry, gfp_t gfp)
{
void *curr;
// 获取自旋锁
xa_lock(xa);
// 在内部可能多次释放-获取锁
curr = __xa_store(xa, index, entry, gfp);
// 释放自旋锁
xa_unlock(xa);
return curr;
}
/**
* __xa_store() - Store this entry in the XArray.
* @xa: XArray.
* @index: Index into array.
* @entry: New entry.
* @gfp: Memory allocation flags.
*
* You must already be holding the xa_lock when calling this function.
* It will drop the lock if needed to allocate memory, and then reacquire
* it afterwards.
*
* Context: Any context. Expects xa_lock to be held on entry. May
* release and reacquire xa_lock if @gfp flags permit.
* Return: The old entry at this index or xa_err() if an error happened.
*/
void *__xa_store(struct xarray *xa, unsigned long index, void *entry, gfp_t gfp)
{
XA_STATE(xas, xa, index);
void *curr;
// 是否误用 advanced API
if (WARN_ON_ONCE(xa_is_advanced(entry)))
return XA_ERROR(-EINVAL);
// 是否改用 zero entry
// 空 entry 在 alloc 特性下转换为 zero entry
if (xa_track_free(xa) && !entry)
entry = XA_ZERO_ENTRY;
do {
// 此时 index 和 xa 信息由 xas 上下文维护而无需传递参数
curr = xas_store(&xas, entry);
if (xa_track_free(xa))
xas_clear_mark(&xas, XA_FREE_MARK);
// xas nomem 流程:用于处理上下文中出现 ENOMEM 的情况
// 会释放锁,调用 kmem cache 接口分配内存到 xas->xa_alloc,再重新获取锁
//
// 非 ENOMEM 情况会只返回 false(顺便销毁 xas 上下文,表示这是最后一步)
} while (__xas_nomem(&xas, gfp));
return xas_result(&xas, xa_zero_to_null(curr));
}
store 操作的门面流程很简单:
- 持锁。
- 创建 xas 上下文。
- 在 xas 上下文处理核心的 xas store 流程。
- 销毁 xas 上下文。
- 解锁并返回结果。
xas store
/**
* xas_store() - Store this entry in the XArray.
* @xas: XArray operation state.
* @entry: New entry.
*
* If @xas is operating on a multi-index entry, the entry returned by this
* function is essentially meaningless (it may be an internal entry or it
* may be %NULL, even if there are non-NULL entries at some of the indices
* covered by the range). This is not a problem for any current users,
* and can be changed if needed.
*
* Return: The old entry at this index.
*/
void *xas_store(struct xa_state *xas, void *entry)
{
struct xa_node *node;
void __rcu **slot = &xas->xa->xa_head;
unsigned int offset, max;
int count = 0;
int values = 0;
// 两个迭代用的 entry 指针
void *first, *next;
bool value = xa_is_value(entry);
// 确定 first
if (entry) {
// 表示是否能直接存放在根结点中
bool allow_root = !xa_is_node(entry) && !xa_is_zero(entry);
// xas create 流程,
// - 签名已有的注释:
// 如果 slot 已存在,则返回对应的 entry
// 如果 slot 新分配,则返回 NULL
// - 补充:
// 如果首次调用 index == 0,不会引起分配且返回 NULL
first = xas_create(xas, allow_root);
} else {
first = xas_load(xas);
}
// 需要重试或者存在错误
if (xas_invalid(xas))
// 返回旧的 entry
return first;
// 这是 xas create 流程提供的 xa_node 结点
node = xas->xa_node;
// xa_sibs 为 multi-order entry 特性相关,暂时省略
if (node && (xas->xa_shift < node->shift))
xas->xa_sibs = 0;
// 内容重复,不需处理
if ((first == entry) && !xas->xa_sibs)
return first;
// 后续 for 迭代过程只看 next(slot++)
next = first;
// 用于寻址 slots[offset]
// 在 xas create (-> xas descend) 流程时会更新该字段
offset = xas->xa_offset;
max = xas->xa_offset + xas->xa_sibs;
if (node) {
slot = &node->slots[offset];
if (xas->xa_sibs)
xas_squash_marks(xas);
}
// 指针 NULL 会进入该分支
// 数值 0 不会进入
if (!entry)
// 初始化三个 mark,见下方注释
xas_init_marks(xas);
for (;;) {
/*
* Must clear the marks before setting the entry to NULL,
* otherwise xas_for_each_marked may find a NULL entry and
* stop early. rcu_assign_pointer contains a release barrier
* so the mark clearing will appear to happen before the
* entry is set to NULL.
*/
// store 核心操作
// slot 在此前有两条选择:
// - 如果存在 node,则优先选择该结点(xas create 流程提供,存放在 xas->xa_node)
// - 如果没有 node,则选择 xas->xa->xa_head
// (xas create 流程会预先抹除原有的 xas->xa_node == XAS_RESTART 标记)
rcu_assign_pointer(*slot, entry);
if (xa_is_node(next) && (!node || node->shift))
xas_free_nodes(xas, xa_to_node(next));
if (!node)
break;
count += !next - !entry;
values += !xa_is_value(first) - !value;
if (entry) {
// 常规 entry 在这里就会 break
if (offset == max)
break;
if (!xa_is_sibling(entry))
entry = xa_mk_sibling(xas->xa_offset);
} else {
if (offset == XA_CHUNK_MASK)
break;
}
next = xa_entry_locked(xas->xa, node, ++offset);
if (!xa_is_sibling(next)) {
if (!entry && (offset > max))
break;
first = next;
}
slot++;
}
update_node(xas, node, count, values);
return first;
}
xas store 流程基本隐藏了具体的分配过程,本体只需完成 *slot = entry
的关键存储操作。在 xas store 具体赋值 slot 前,XArray 甚至可以是一棵完全空内存(无动态内存分配)的树,需要靠 xas create 流程创建具体的 slot 内存空间。
xas create
/*
* xas_create() - Create a slot to store an entry in.
* @xas: XArray operation state.
* @allow_root: %true if we can store the entry in the root directly
*
* Most users will not need to call this function directly, as it is called
* by xas_store(). It is useful for doing conditional store operations
* (see the xa_cmpxchg() implementation for an example).
*
* Return: If the slot already existed, returns the contents of this slot.
* If the slot was newly created, returns %NULL. If it failed to create the
* slot, returns %NULL and indicates the error in @xas.
*/
static void *xas_create(struct xa_state *xas, bool allow_root)
{
struct xarray *xa = xas->xa;
void *entry;
void __rcu **slot;
struct xa_node *node = xas->xa_node;
int shift;
unsigned int order = xas->xa_shift;
// 有多条路径区分场景,总之都是为了确定 shift、entry 和 slot 变量
// 表示当前 xa_node 是 XAS_BOUNDS 或者 XAS_RESTART(初始值)
// 这两个标记见 xas 注释说明
if (xas_top(node)) {
// 无锁获取 xa->xa_head,尝试以 head 作为单独的 entry 容器(从而免去真正的 slot)
entry = xa_head_locked(xa);
// xa_node 从 XAS_RESTART(初始值)改动为 NULL
xas->xa_node = NULL;
// 是否 xa flags 携带 XA_FLAGS_ZERO_BUSY
if (!entry && xa_zero_busy(xa))
entry = XA_ZERO_ENTRY;
// xas expand 流程:提供 node 结点
// 表示不断添加结点直至满足 xas->xa_index 需求
// 该流程在首次调用(非 multi-index)时,由于 xa_head 为空,必直接返回 shift
shift = xas_expand(xas, entry);
if (shift < 0)
return NULL;
if (!shift && !allow_root)
shift = XA_CHUNK_SHIFT;
// 如果是首次调用,其实整个流程会直接退出,对应函数末尾的 return entry
entry = xa_head_locked(xa);
// slot 初步定位到 xa_head,后续将在该函数内为最终定位的 slot 赋值 entry
slot = &xa->xa_head;
} else if (xas_error(xas)) {
return NULL;
// xas 上下文快速恢复,不用重新计算了
} else if (node) {
unsigned int offset = xas->xa_offset;
shift = node->shift;
entry = xa_entry_locked(xa, node, offset);
slot = &node->slots[offset];
} else {
shift = 0;
entry = xa_head_locked(xa);
slot = &xa->xa_head;
}
while (shift > order) {
shift -= XA_CHUNK_SHIFT;
if (!entry) {
// xas alloc 流程:
// 分配一个 struct xa_node 结点,或者从已缓存的 xas->xa_alloc 字段获取并清空
// 此时该分配的结点:
// - parent 对应于 xas->xa_node
// - offset 对应于 xas->xa_offset(如果存在 parent)
// - shift 对应于 shift 参数
node = xas_alloc(xas, shift);
// 可能会存在 ENOMEM 的情况,具体错误记录于 xas 当中
// 这种情况会在上面 xa store 门面的 nomem 流程处理掉并(重试)恢复现场
if (!node)
break;
if (xa_track_free(xa))
node_mark_all(node, XA_FREE_MARK);
// 初始化某个 slot,指向分配好的 node
rcu_assign_pointer(*slot, xa_mk_node(node));
} else if (xa_is_node(entry)) {
node = xa_to_node(entry);
// 命中用户 entry
} else {
break;
}
// xas descend 流程:
// - 向下遍历(一层)
// - 根据 node 更新 xas->xa_node 和 xas->xa_offset
// - 同时寻址对应的 entry
// 该函数也用于 xa load 流程,不会修改树的结构
entry = xas_descend(xas, node);
slot = &node->slots[xas->xa_offset];
}
return entry;
}
xas create 流程本身是为了开辟 slot 所需的内存空间,但是需要注意 XArray 本身的层级是需要 xas exapnd 流程来提供基底。
xas expand
/*
* xas_expand adds nodes to the head of the tree until it has reached
* sufficient height to be able to contain @xas->xa_index
*/
static int xas_expand(struct xa_state *xas, void *head)
{
struct xarray *xa = xas->xa;
struct xa_node *node = NULL;
unsigned int shift = 0;
// 非 CONFIG_XARRAY_MULTI 的情况下,max 等同于 xa_index
unsigned long max = xas_max(xas);
// 尚未真正构建过结点
// 进入该路径,无论如何都会立刻返回
//(因为没有寻址能力)
if (!head) {
// 小对象优化:
// 对于 index == 0 && !CONFIG_XARRAY_MULTI 的情况
if (max == 0)
// 会直接返回 shift = 0
// 后续分配单个用户 entry 无需动态内存分配
return 0;
// 而其他情况,无论如何都会导致返回的 shift 大于等于 XA_CHUNK_SHIFT
// 从而引起动态内存分配
while ((max >> shift) >= XA_CHUNK_SIZE)
shift += XA_CHUNK_SHIFT;
return shift + XA_CHUNK_SHIFT;
} else if (xa_is_node(head)) {
node = xa_to_node(head);
shift = node->shift + XA_CHUNK_SHIFT;
}
xas->xa_node = NULL;
// 就是注释说的,处理这个 head 能寻址的最大范围
while (max > max_index(head)) {
xa_mark_t mark = 0;
XA_NODE_BUG_ON(node, shift > BITS_PER_LONG);
// xas alloc 流程前面提到了
// 这里 node 将会在后续成为新的根结点
node = xas_alloc(xas, shift);
if (!node)
return -ENOMEM;
node->count = 1;
if (xa_is_value(head))
node->nr_values = 1;
// 此前的小对象优化潜在 entry,需要搬移过来
RCU_INIT_POINTER(node->slots[0], head);
/* Propagate the aggregated mark info to the new child */
// 这些都是 mark 特性相关的
for (;;) {
if (xa_track_free(xa) && mark == XA_FREE_MARK) {
node_mark_all(node, XA_FREE_MARK);
if (!xa_marked(xa, XA_FREE_MARK)) {
node_clear_mark(node, 0, XA_FREE_MARK);
xa_mark_set(xa, XA_FREE_MARK);
}
} else if (xa_marked(xa, mark)) {
node_set_mark(node, 0, mark);
}
if (mark == XA_MARK_MAX)
break;
mark_inc(mark);
}
/*
* Now that the new node is fully initialised, we can add
* it to the tree
*/
// node 结点完成初始化
if (xa_is_node(head)) {
// 此前已经被指向 slots[0]
xa_to_node(head)->offset = 0;
// 原先的根节点「下降」一层
rcu_assign_pointer(xa_to_node(head)->parent, node);
}
// 分配的 node 成为新的 xa_head(根节点)
// 并且将用于下一次循环
head = xa_mk_node(node);
// 此时 xa_head 不再是 NULL
rcu_assign_pointer(xa->xa_head, head);
// 这个只是给内核用户响应 xas->xa_update 回调的,略
xas_update(xas, node);
// 处理下一层
shift += XA_CHUNK_SHIFT;
}
// 记录 node 到 xas 上下文
xas->xa_node = node;
return shift;
}
xas expand 流程就是看 xas->xa_index
的寻址能力通过 xas alloc 流程按需分配出 XArray 基本层级,并不会完全展开。而在前面 xas create 流程会再次 xas alloc 来分配内存到具体位置。
xas descend
static __always_inline void *xas_descend(struct xa_state *xas,
struct xa_node *node)
{
// 返回 (index >> node->shift) & XA_CHUNK_MASK
unsigned int offset = get_offset(xas->xa_index, node);
// 既然知道 offset 了,那就定位 entry = slots[offset]
void *entry = xa_entry(xas->xa, node, offset);
xas->xa_node = node;
while (xa_is_sibling(entry)) {
offset = xa_to_sibling(entry);
entry = xa_entry(xas->xa, node, offset);
if (node->shift && xa_is_node(entry))
entry = XA_RETRY_ENTRY;
}
xas->xa_offset = offset;
return entry;
}
xas descend 流程非常简单,按照 shift 切割的方式走一层。如果是某种形式的 while() xas_descend,那基本等同于 load 操作。
xas alloc
static void *xas_alloc(struct xa_state *xas, unsigned int shift)
{
struct xa_node *parent = xas->xa_node;
struct xa_node *node = xas->xa_alloc;
if (xas_invalid(xas))
return NULL;
// 如果 xa_alloc 已经缓存好一个结点,直接拿走
if (node) {
xas->xa_alloc = NULL;
// 否则向 kmem cache 要,这里就不是 XArray 的范围了
} else {
// 注意整个流程下来我们是持锁的
// 为了避免睡下去,这里 gfp 并没有使用/继承用户提供的 GFP 标记
gfp_t gfp = GFP_NOWAIT | __GFP_NOWARN;
if (xas->xa->xa_flags & XA_FLAGS_ACCOUNT)
gfp |= __GFP_ACCOUNT;
node = kmem_cache_alloc_lru(radix_tree_node_cachep, xas->xa_lru, gfp);
// 失败就标记 ENOMEM,然后在合适的时机会有 xas nomem 流程挽救
if (!node) {
xas_set_err(xas, -ENOMEM);
return NULL;
}
}
if (parent) {
node->offset = xas->xa_offset;
parent->count++;
XA_NODE_BUG_ON(node, parent->count > XA_CHUNK_SIZE);
xas_update(xas, parent);
}
XA_NODE_BUG_ON(node, shift > BITS_PER_LONG);
XA_NODE_BUG_ON(node, !list_empty(&node->private_list));
node->shift = shift;
node->count = 0;
node->nr_values = 0;
RCU_INIT_POINTER(node->parent, xas->xa_node);
node->array = xas->xa;
return node;
}
xas alloc 流程只是一个简单 kmem cache 分配封装。注意整个流程下来我们是持锁的,为了避免睡下去,这里 gfp 并没有使用/继承用户提供的 GFP 标记。分配成功不一定意味着当前要动态内存分配(可能此前就已经做好了),分配失败也不一定意味着流程结束(合适时机有 xas nomem 流程挽救,并且 xas 自动机提供快速恢复路径)。
xas nomem
/*
* __xas_nomem() - Drop locks and allocate memory if needed.
* @xas: XArray operation state.
* @gfp: Memory allocation flags.
*
* Internal variant of xas_nomem().
*
* Return: true if memory was needed, and was successfully allocated.
*/
static bool __xas_nomem(struct xa_state *xas, gfp_t gfp)
__must_hold(xas->xa->xa_lock)
{
// 区分 spinlock 类型:常规、bh 还是 irq
unsigned int lock_type = xa_lock_type(xas->xa);
if (xas->xa_node != XA_ERROR(-ENOMEM)) {
xas_destroy(xas);
return false;
}
if (xas->xa->xa_flags & XA_FLAGS_ACCOUNT)
gfp |= __GFP_ACCOUNT;
// 分配内存到 xas->xa_alloc,并重新持锁
// xas->xa_alloc 的内存将会在后续 xas_alloc() 中获取
// NOTE: 在 store 操作中,是 xas_store() => xas_create() => xas_alloc() 获取分配到的内存
if (gfpflags_allow_blocking(gfp)) {
xas_unlock_type(xas, lock_type);
xas->xa_alloc = kmem_cache_alloc_lru(radix_tree_node_cachep, xas->xa_lru, gfp);
xas_lock_type(xas, lock_type);
} else {
xas->xa_alloc = kmem_cache_alloc_lru(radix_tree_node_cachep, xas->xa_lru, gfp);
}
if (!xas->xa_alloc)
return false;
xas->xa_alloc->parent = NULL;
XA_NODE_BUG_ON(xas->xa_alloc, !list_empty(&xas->xa_alloc->private_list));
xas->xa_node = XAS_RESTART;
return true;
}
load 操作
如果简单了解过 store 操作要做的事情,那么 load 就相当简单。
xa load
/**
* xa_load() - Load an entry from an XArray.
* @xa: XArray.
* @index: index into array.
*
* Context: Any context. Takes and releases the RCU lock.
* Return: The entry at @index in @xa.
*/
void *xa_load(struct xarray *xa, unsigned long index)
{
XA_STATE(xas, xa, index);
void *entry;
rcu_read_lock();
// 通常走一遍 xas load 流程即可
do {
entry = xa_zero_to_null(xas_load(&xas));
// 使用 advanced API 的话,可能会返回内部结点
// 因此会通过判断 retry 结点来重启 xas 上下文,从头遍历
} while (xas_retry(&xas, entry));
rcu_read_unlock();
return entry;
}
xa load 门面流程保证不会改动内部的基数树结构,因此只使用 RCU read lock。
xas load
/**
* xas_load() - Load an entry from the XArray (advanced).
* @xas: XArray operation state.
*
* Usually walks the @xas to the appropriate state to load the entry
* stored at xa_index. However, it will do nothing and return %NULL if
* @xas is in an error state. xas_load() will never expand the tree.
*
* If the xa_state is set up to operate on a multi-index entry, xas_load()
* may return %NULL or an internal entry, even if there are entries
* present within the range specified by @xas.
*
* Context: Any context. The caller should hold the xa_lock or the RCU lock.
* Return: Usually an entry in the XArray, but see description for exceptions.
*/
void *xas_load(struct xa_state *xas)
{
void *entry = xas_start(xas);
while (xa_is_node(entry)) {
struct xa_node *node = xa_to_node(entry);
// 溢出,可能会返回内部结点,见注释
if (xas->xa_shift > node->shift)
break;
entry = xas_descend(xas, node);
// 叶子
if (node->shift == 0)
break;
}
return entry;
}
xas load 流程只是循环的 xas descend 流程。前面已经提到过,就是按照 shift 切割分层。
WIP
WORK IN PROGRESS!
References
XArray – kernel.org
XArray v9
XArray: One Data Structure To Rule Them All
zswap: replace RB tree with xarray – LWN.net
The XArray data structure – LWN.net
xarray – Kernel Exploring
xarray: Define struct xa_node
Maple Tree Work
mm: list_lru: replace linear array with xarray
xarray: Change definition of sibling entries
xarray: use kmem_cache_alloc_lru to allocate xa_node
page cache: Add and replace pages using the XArray