背景

Linux 内核使用 XArray(eXtensible Array)作为 Linux IO 栈 的页面缓存(page cache)容器实现。

本文会介绍 XArray 的具体实现,以及 XArray 在 page cache 层的应用。

WIP 警告

WORK IN PROGRESS

最近事情特别多,本来一两天写完的文章要拖两三周……(一天加一行注释太搞笑了)

读者看不明白这篇文章很正常,很多概念还没写清楚。先挂几天,实在更新不过来就删掉合适再重发。

基本介绍

仅从 外部 API 来看,XArray 可视为自动扩容的无限大的数组。

而从作者的表态来看,XArray 目标是作为一个「万能」的数据结构,除了替代明确的 IDR 或者 page cache 场景的基数树,还可以部分替代内核中的链表红黑树和哈希表,其定位类似于朱棣数组

NOTES:

  • 其实是朱迪数组(Judy Array)🙃
  • XArray 的场景适用于小数据集(比链表更省结点空间),或者是相对密集的大数据集(比哈希表更有局部性且无冲突)。
  • XArray 还满足了内核特定的需求,比如提供 RCU-free 特性、大页 page cache 性能优化和筛选 mask 的快速遍历等。
  • XArray 并不是通用类型的键值容器:键(index)肯定是 unsigned long 类型,值(entry)虽然在字面上是 void* 类型,但是有严格的要求,详见后续文章说明。
  • 作者表态归表态,实际替代哈希表存疑。内核中没有 commit 是这么做的……
  • XArray 事实上也算是新瓶旧酒。只看原理,它就是基数树。
  • 本文基于 Linux 内核版本 v6.15。

调试建议

XArray 支持用户态调试,可以在 Linux 源码根目录下执行:

cd ./tools/testing/radix-tree/
make
gdb ./xarray

总之随意使用调试器即可。

注意有部分宏定义可能不同于内核态版本,而且内存分配也有细节差异。

(调试的时候实在整无语了……当然改成内核态就更不好调了)

数据结构

anchor

/**
 * struct xarray - The anchor of the XArray.
 * @xa_lock: Lock that protects the contents of the XArray.
 *
 * To use the xarray, define it statically or embed it in your data structure.
 * It is a very small data structure, so it does not usually make sense to
 * allocate it separately and keep a pointer to it in your data structure.
 *
 * You may use the xa_lock to protect your own data structures as well.
 */
/*
 * If all of the entries in the array are NULL, @xa_head is a NULL pointer.
 * If the only non-NULL entry in the array is at index 0, @xa_head is that
 * entry.  If any other entry in the array is non-NULL, @xa_head points
 * to an @xa_node.
 */
struct xarray {
        spinlock_t      xa_lock;
/* private: The rest of the data structure is not to be used directly. */
        gfp_t           xa_flags;
        void __rcu *    xa_head;
};

struct xarray 是提供的用户外部的核心数据结构,只有三样东西:

  • xa_lock:保护并发安全的自旋锁。注意 XArray 还会结合使用 RCU 机制(见 load 操作)。
  • xa_flags:携带 GFP 标记(get free page),其他细节见 xarray.h 头文件
  • xa_head:指向 NULL、entry 或者 xa_node 对象。entry 可理解为键值对当中的值。

entry

/**
 * xa_mk_value() - Create an XArray entry from an integer.
 * @v: Value to store in XArray.
 *
 * Context: Any context.
 * Return: An entry suitable for storing in the XArray.
 */
static inline void *xa_mk_value(unsigned long v)
{
        WARN_ON((long)v < 0);
        return (void *)((v << 1) | 1);
}

/**
 * xa_to_value() - Get value stored in an XArray entry.
 * @entry: XArray entry.
 *
 * Context: Any context.
 * Return: The value stored in the XArray entry.
 */
static inline unsigned long xa_to_value(const void *entry)
{
        return (unsigned long)entry >> 1;
}

/**
 * xa_is_value() - Determine if an entry is a value.
 * @entry: XArray entry.
 *
 * Context: Any context.
 * Return: True if the entry is a value, false if it is a pointer.
 */
static inline bool xa_is_value(const void *entry)
{
        return (unsigned long)entry & 1;
}

补充一下 entry 的说明,虽然 entry 是 void* 类型,实质上用户在 entry 只可存放:

  • 特定指针。要求是经由内核分配的指针(比如 kmalloc())。或者任意四字节对齐过的指针。
  • 特定整数。要求是 [0, LONG_MAX) 范围内的整数,注意这是 long 的半侧。
// 见:https://elixir.bootlin.com/linux/v6.15/source/include/linux/xarray.h#L27
// 注意这些 internal entry 的数值是要经过转换的,比如 257 (Zero entry) 实际是 257<<2 | 0b10
/*
 * The bottom two bits of the entry determine how the XArray interprets
 * the contents:
 *
 * 00: Pointer entry
 * 10: Internal entry
 * x1: Value entry or tagged pointer
 *
 * Attempting to store internal entries in the XArray is a bug.
 *
 * Most internal entries are pointers to the next node in the tree.
 * The following internal entries have a special meaning:
 *
 * 0-62: Sibling entries
 * 256: Retry entry
 * 257: Zero entry
 *
 * Errors are also represented as internal entries, but use the negative
 * space (-4094 to -2).  They're never stored in the slots array; only
 * returned by the normal API.
 */

NOTES:

  • 除了用户 entry,XArray 还具有内部 entry:node、sibling、retry 和 zero。
  • 并且 index(键)与 entry 也不一定是一对一的关系,见 multi-index entry 说明。

node

/*
 * @count is the count of every non-NULL element in the ->slots array
 * whether that is a value entry, a retry entry, a user pointer,
 * a sibling entry or a pointer to the next level of the tree.
 * @nr_values is the count of every element in ->slots which is
 * either a value entry or a sibling of a value entry.
 */
struct xa_node {
        unsigned char   shift;          /* Bits remaining in each slot */
        unsigned char   offset;         /* Slot offset in parent */
        unsigned char   count;          /* Total entry count */
        unsigned char   nr_values;      /* Value entry count */
        struct xa_node __rcu *parent;   /* NULL at top of tree */
        struct xarray   *array;         /* The array we belong to */
        union {
                struct list_head private_list;  /* For tree user */
                struct rcu_head rcu_head;       /* Used when freeing node */
        };
        void __rcu      *slots[XA_CHUNK_SIZE];
        union {
                unsigned long   tags[XA_MAX_MARKS][XA_MARK_LONGS];
                unsigned long   marks[XA_MAX_MARKS][XA_MARK_LONGS];
        };
};

使用 crash> struct xa_node 解析宏:

  • XA_CHUNK_SIZE = 64
  • XA_MAX_MARKS = 3
  • XA_MARK_LONGS = 1

shiftoffsetparentslots 用于维护遍历寻址:

  • shift:对键的右移偏移量。当 shift == 0 时不会有指向下一层的结点(node entry)。
  • offset:自身在父结点中的 slots[] 下标。
  • parent:指向父结点。
  • slots[64]:存放 entry 的容器。所以第一层最多能容纳 64 个结点,第二层 4096,以此类推……
// 见:https://elixir.bootlin.com/linux/v6.15/source/include/linux/fs.h#L531

/* XArray tags, for tagging dirty and writeback pages in the pagecache. */
#define PAGECACHE_TAG_DIRTY     XA_MARK_0
#define PAGECACHE_TAG_WRITEBACK XA_MARK_1
#define PAGECACHE_TAG_TOWRITE   XA_MARK_2

marks 用于支持 xa_for_each_marked() 接口。具体到 page cache 的用途,像「只遍历脏页」的操作就可以轻松实现。

state

/*
 * The xa_state is opaque to its users.  It contains various different pieces
 * of state involved in the current operation on the XArray.  It should be
 * declared on the stack and passed between the various internal routines.
 * The various elements in it should not be accessed directly, but only
 * through the provided accessor functions.  The below documentation is for
 * the benefit of those working on the code, not for users of the XArray.
 *
 * @xa_node usually points to the xa_node containing the slot we're operating
 * on (and @xa_offset is the offset in the slots array).  If there is a
 * single entry in the array at index 0, there are no allocated xa_nodes to
 * point to, and so we store %NULL in @xa_node.  @xa_node is set to
 * the value %XAS_RESTART if the xa_state is not walked to the correct
 * position in the tree of nodes for this operation.  If an error occurs
 * during an operation, it is set to an %XAS_ERROR value.  If we run off the
 * end of the allocated nodes, it is set to %XAS_BOUNDS.
 */
struct xa_state {
        struct xarray *xa;
        unsigned long xa_index;
        unsigned char xa_shift;
        unsigned char xa_sibs;
        unsigned char xa_offset;
        unsigned char xa_pad;           /* Helps gcc generate better code */
        struct xa_node *xa_node;
        struct xa_node *xa_alloc;
        xa_update_node_t xa_update;
        struct list_lru *xa_lru;
};

XA state 可以视为一个贯穿 XArray 操作的状态机,用以保留(快速恢复)必要的上下文信息,不必让每个子操作都从根重新遍历处理。

/**
 * XA_STATE() - Declare an XArray operation state.
 * @name: Name of this operation state (usually xas).
 * @array: Array to operate on.
 * @index: Initial index of interest.
 *
 * Declare and initialise an xa_state on the stack.
 */
#define XA_STATE(name, array, index)                            \
        struct xa_state name = __XA_STATE(array, index, 0, 0)

#define __XA_STATE(array, index, shift, sibs)  {        \
        .xa = array,                                    \
        .xa_index = index,                              \
        .xa_shift = shift,                              \
        .xa_sibs = sibs,                                \
        .xa_offset = 0,                                 \
        .xa_pad = 0,                                    \
        .xa_node = XAS_RESTART,                         \
        .xa_alloc = NULL,                               \
        .xa_update = NULL,                              \
        .xa_lru = NULL,                                 \
}

在常规的 XArray 读写操作中,用户不需要手动声明 XA state 对象。

但是我们后面要看实现,所以得知道一些。XA state 在初始化时,其字段除了声明的 array、index 以及特殊标记的 xa_node(见注释)以外,均为 0 或者 NULL。注意 xa_state->xa_offsetxa_node->offset 含义不同,还是建议亲自看注释。

store 操作

接下来直接看源码。和过往文章一样,是按照深度优先的顺序展开代码。

我们从 store 操作的门面 xa_store() 开始。

xa store

/**
 * xa_store() - Store this entry in the XArray.
 * @xa: XArray.
 * @index: Index into array.
 * @entry: New entry.
 * @gfp: Memory allocation flags.
 *
 * After this function returns, loads from this index will return @entry.
 * Storing into an existing multi-index entry updates the entry of every index.
 * The marks associated with @index are unaffected unless @entry is %NULL.
 *
 * Context: Any context.  Takes and releases the xa_lock.
 * May sleep if the @gfp flags permit.
 * Return: The old entry at this index on success, xa_err(-EINVAL) if @entry
 * cannot be stored in an XArray, or xa_err(-ENOMEM) if memory allocation
 * failed.
 */
void *xa_store(struct xarray *xa, unsigned long index, void *entry, gfp_t gfp)
{
        void *curr;

        // 获取自旋锁
        xa_lock(xa);
        // 在内部可能多次释放-获取锁
        curr = __xa_store(xa, index, entry, gfp);
        // 释放自旋锁
        xa_unlock(xa);

        return curr;
}

/**
 * __xa_store() - Store this entry in the XArray.
 * @xa: XArray.
 * @index: Index into array.
 * @entry: New entry.
 * @gfp: Memory allocation flags.
 *
 * You must already be holding the xa_lock when calling this function.
 * It will drop the lock if needed to allocate memory, and then reacquire
 * it afterwards.
 *
 * Context: Any context.  Expects xa_lock to be held on entry.  May
 * release and reacquire xa_lock if @gfp flags permit.
 * Return: The old entry at this index or xa_err() if an error happened.
 */
void *__xa_store(struct xarray *xa, unsigned long index, void *entry, gfp_t gfp)
{
        XA_STATE(xas, xa, index);
        void *curr;

        // 是否误用 advanced API
        if (WARN_ON_ONCE(xa_is_advanced(entry)))
                return XA_ERROR(-EINVAL);
        // 是否改用 zero entry
        // 空 entry 在 alloc 特性下转换为 zero entry
        if (xa_track_free(xa) && !entry)
                entry = XA_ZERO_ENTRY;

        do {
                // 此时 index 和 xa 信息由 xas 上下文维护而无需传递参数
                curr = xas_store(&xas, entry);
                if (xa_track_free(xa))
                        xas_clear_mark(&xas, XA_FREE_MARK);
        // xas nomem 流程:用于处理上下文中出现 ENOMEM 的情况
        // 会释放锁,调用 kmem cache 接口分配内存到 xas->xa_alloc,再重新获取锁
        //
        // 非 ENOMEM 情况会只返回 false(顺便销毁 xas 上下文,表示这是最后一步)
        } while (__xas_nomem(&xas, gfp));

        return xas_result(&xas, xa_zero_to_null(curr));
}

store 操作的门面流程很简单:

  • 持锁。
  • 创建 xas 上下文。
  • 在 xas 上下文处理核心的 xas store 流程。
  • 销毁 xas 上下文。
  • 解锁并返回结果。

xas store

/**
 * xas_store() - Store this entry in the XArray.
 * @xas: XArray operation state.
 * @entry: New entry.
 *
 * If @xas is operating on a multi-index entry, the entry returned by this
 * function is essentially meaningless (it may be an internal entry or it
 * may be %NULL, even if there are non-NULL entries at some of the indices
 * covered by the range).  This is not a problem for any current users,
 * and can be changed if needed.
 *
 * Return: The old entry at this index.
 */
void *xas_store(struct xa_state *xas, void *entry)
{
        struct xa_node *node;
        void __rcu **slot = &xas->xa->xa_head;
        unsigned int offset, max;
        int count = 0;
        int values = 0;
        // 两个迭代用的 entry 指针
        void *first, *next;
        bool value = xa_is_value(entry);

        // 确定 first

        if (entry) {
                // 表示是否能直接存放在根结点中
                bool allow_root = !xa_is_node(entry) && !xa_is_zero(entry);
                // xas create 流程,
                // - 签名已有的注释:
                //   如果 slot 已存在,则返回对应的 entry
                //   如果 slot 新分配,则返回 NULL
                // - 补充:
                //   如果首次调用 index == 0,不会引起分配且返回 NULL
                first = xas_create(xas, allow_root);
        } else {
                first = xas_load(xas);
        }

        // 需要重试或者存在错误
        if (xas_invalid(xas))
                // 返回旧的 entry
                return first;
        // 这是 xas create 流程提供的 xa_node 结点
        node = xas->xa_node;
        // xa_sibs 为 multi-order entry 特性相关,暂时省略
        if (node && (xas->xa_shift < node->shift))
                xas->xa_sibs = 0;
        // 内容重复,不需处理
        if ((first == entry) && !xas->xa_sibs)
                return first;

        // 后续 for 迭代过程只看 next(slot++)
        next = first;
        // 用于寻址 slots[offset]
        // 在 xas create (-> xas descend) 流程时会更新该字段
        offset = xas->xa_offset;
        max = xas->xa_offset + xas->xa_sibs;
        if (node) {
                slot = &node->slots[offset];
                if (xas->xa_sibs)
                        xas_squash_marks(xas);
        }
        // 指针 NULL 会进入该分支
        // 数值 0 不会进入
        if (!entry)
                // 初始化三个 mark,见下方注释
                xas_init_marks(xas);

        for (;;) {
                /*
                 * Must clear the marks before setting the entry to NULL,
                 * otherwise xas_for_each_marked may find a NULL entry and
                 * stop early.  rcu_assign_pointer contains a release barrier
                 * so the mark clearing will appear to happen before the
                 * entry is set to NULL.
                 */
                // store 核心操作
                // slot 在此前有两条选择:
                // - 如果存在 node,则优先选择该结点(xas create 流程提供,存放在 xas->xa_node)
                // - 如果没有 node,则选择 xas->xa->xa_head
                //  (xas create 流程会预先抹除原有的 xas->xa_node == XAS_RESTART 标记)
                rcu_assign_pointer(*slot, entry);
                if (xa_is_node(next) && (!node || node->shift))
                        xas_free_nodes(xas, xa_to_node(next));
                if (!node)
                        break;
                count += !next - !entry;
                values += !xa_is_value(first) - !value;
                if (entry) {
                        // 常规 entry 在这里就会 break
                        if (offset == max)
                                break;
                        if (!xa_is_sibling(entry))
                                entry = xa_mk_sibling(xas->xa_offset);
                } else {
                        if (offset == XA_CHUNK_MASK)
                                break;
                }
                next = xa_entry_locked(xas->xa, node, ++offset);
                if (!xa_is_sibling(next)) {
                        if (!entry && (offset > max))
                                break;
                        first = next;
                }
                slot++;
        }

        update_node(xas, node, count, values);
        return first;
}

xas store 流程基本隐藏了具体的分配过程,本体只需完成 *slot = entry 的关键存储操作。在 xas store 具体赋值 slot 前,XArray 甚至可以是一棵完全空内存(无动态内存分配)的树,需要靠 xas create 流程创建具体的 slot 内存空间。

xas create

/*
 * xas_create() - Create a slot to store an entry in.
 * @xas: XArray operation state.
 * @allow_root: %true if we can store the entry in the root directly
 *
 * Most users will not need to call this function directly, as it is called
 * by xas_store().  It is useful for doing conditional store operations
 * (see the xa_cmpxchg() implementation for an example).
 *
 * Return: If the slot already existed, returns the contents of this slot.
 * If the slot was newly created, returns %NULL.  If it failed to create the
 * slot, returns %NULL and indicates the error in @xas.
 */
static void *xas_create(struct xa_state *xas, bool allow_root)
{
        struct xarray *xa = xas->xa;
        void *entry;
        void __rcu **slot;
        struct xa_node *node = xas->xa_node;
        int shift;
        unsigned int order = xas->xa_shift;

        // 有多条路径区分场景,总之都是为了确定 shift、entry 和 slot 变量

        // 表示当前 xa_node 是 XAS_BOUNDS 或者 XAS_RESTART(初始值)
        // 这两个标记见 xas 注释说明
        if (xas_top(node)) { 
                // 无锁获取 xa->xa_head,尝试以 head 作为单独的 entry 容器(从而免去真正的 slot)
                entry = xa_head_locked(xa);
                // xa_node 从 XAS_RESTART(初始值)改动为 NULL
                xas->xa_node = NULL;
                // 是否 xa flags 携带 XA_FLAGS_ZERO_BUSY
                if (!entry && xa_zero_busy(xa))
                        entry = XA_ZERO_ENTRY;
                // xas expand 流程:提供 node 结点
                // 表示不断添加结点直至满足 xas->xa_index 需求
                // 该流程在首次调用(非 multi-index)时,由于 xa_head 为空,必直接返回 shift
                shift = xas_expand(xas, entry);
                if (shift < 0)
                        return NULL;
                if (!shift && !allow_root)
                        shift = XA_CHUNK_SHIFT;
                // 如果是首次调用,其实整个流程会直接退出,对应函数末尾的 return entry
                entry = xa_head_locked(xa);
                // slot 初步定位到 xa_head,后续将在该函数内为最终定位的 slot 赋值 entry
                slot = &xa->xa_head;
        } else if (xas_error(xas)) {
                return NULL;
        // xas 上下文快速恢复,不用重新计算了
        } else if (node) {
                unsigned int offset = xas->xa_offset;

                shift = node->shift;
                entry = xa_entry_locked(xa, node, offset);
                slot = &node->slots[offset];
        } else {
                shift = 0;
                entry = xa_head_locked(xa);
                slot = &xa->xa_head;
        }

        while (shift > order) {
                shift -= XA_CHUNK_SHIFT;
                if (!entry) {
                        // xas alloc 流程:
                        // 分配一个 struct xa_node 结点,或者从已缓存的 xas->xa_alloc 字段获取并清空
                        // 此时该分配的结点:
                        // - parent 对应于 xas->xa_node
                        // - offset 对应于 xas->xa_offset(如果存在 parent)
                        // - shift 对应于 shift 参数
                        node = xas_alloc(xas, shift);
                        // 可能会存在 ENOMEM 的情况,具体错误记录于 xas 当中
                        // 这种情况会在上面 xa store 门面的 nomem 流程处理掉并(重试)恢复现场
                        if (!node)
                                break;
                        if (xa_track_free(xa))
                                node_mark_all(node, XA_FREE_MARK);
                        // 初始化某个 slot,指向分配好的 node
                        rcu_assign_pointer(*slot, xa_mk_node(node));
                } else if (xa_is_node(entry)) {
                        node = xa_to_node(entry);
                // 命中用户 entry
                } else {
                        break;
                }
                // xas descend 流程:
                // - 向下遍历(一层)
                // - 根据 node 更新 xas->xa_node 和 xas->xa_offset
                // - 同时寻址对应的 entry
                // 该函数也用于 xa load 流程,不会修改树的结构
                entry = xas_descend(xas, node);
                slot = &node->slots[xas->xa_offset];
        }

        return entry;
}

xas create 流程本身是为了开辟 slot 所需的内存空间,但是需要注意 XArray 本身的层级是需要 xas exapnd 流程来提供基底。

xas expand

/*
 * xas_expand adds nodes to the head of the tree until it has reached
 * sufficient height to be able to contain @xas->xa_index
 */
static int xas_expand(struct xa_state *xas, void *head)
{
        struct xarray *xa = xas->xa;
        struct xa_node *node = NULL;
        unsigned int shift = 0;
        // 非 CONFIG_XARRAY_MULTI 的情况下,max 等同于 xa_index
        unsigned long max = xas_max(xas);

        // 尚未真正构建过结点
        // 进入该路径,无论如何都会立刻返回
        //(因为没有寻址能力)
        if (!head) {
                // 小对象优化:
                // 对于 index == 0 && !CONFIG_XARRAY_MULTI 的情况
                if (max == 0)
                        // 会直接返回 shift = 0
                        // 后续分配单个用户 entry 无需动态内存分配
                        return 0;
                // 而其他情况,无论如何都会导致返回的 shift 大于等于 XA_CHUNK_SHIFT
                // 从而引起动态内存分配
                while ((max >> shift) >= XA_CHUNK_SIZE)
                        shift += XA_CHUNK_SHIFT;
                return shift + XA_CHUNK_SHIFT;
        } else if (xa_is_node(head)) {
                node = xa_to_node(head);
                shift = node->shift + XA_CHUNK_SHIFT;
        }
        xas->xa_node = NULL;

        // 就是注释说的,处理这个 head 能寻址的最大范围
        while (max > max_index(head)) {
                xa_mark_t mark = 0;

                XA_NODE_BUG_ON(node, shift > BITS_PER_LONG);
                // xas alloc 流程前面提到了
                // 这里 node 将会在后续成为新的根结点
                node = xas_alloc(xas, shift);
                if (!node)
                        return -ENOMEM;

                node->count = 1;
                if (xa_is_value(head))
                        node->nr_values = 1;
                // 此前的小对象优化潜在 entry,需要搬移过来
                RCU_INIT_POINTER(node->slots[0], head);

                /* Propagate the aggregated mark info to the new child */
                // 这些都是 mark 特性相关的
                for (;;) {
                        if (xa_track_free(xa) && mark == XA_FREE_MARK) {
                                node_mark_all(node, XA_FREE_MARK);
                                if (!xa_marked(xa, XA_FREE_MARK)) {
                                        node_clear_mark(node, 0, XA_FREE_MARK);
                                        xa_mark_set(xa, XA_FREE_MARK);
                                }
                        } else if (xa_marked(xa, mark)) {
                                node_set_mark(node, 0, mark);
                        }
                        if (mark == XA_MARK_MAX)
                                break;
                        mark_inc(mark);
                }

                /*
                 * Now that the new node is fully initialised, we can add
                 * it to the tree
                 */
                // node 结点完成初始化
                if (xa_is_node(head)) {
                        // 此前已经被指向 slots[0]
                        xa_to_node(head)->offset = 0;
                        // 原先的根节点「下降」一层
                        rcu_assign_pointer(xa_to_node(head)->parent, node);
                }
                // 分配的 node 成为新的 xa_head(根节点)
                // 并且将用于下一次循环
                head = xa_mk_node(node);
                // 此时 xa_head 不再是 NULL
                rcu_assign_pointer(xa->xa_head, head);
                // 这个只是给内核用户响应 xas->xa_update 回调的,略
                xas_update(xas, node);

                // 处理下一层
                shift += XA_CHUNK_SHIFT;
        }

        // 记录 node 到 xas 上下文
        xas->xa_node = node;
        return shift;
}

xas expand 流程就是看 xas->xa_index 的寻址能力通过 xas alloc 流程按需分配出 XArray 基本层级,并不会完全展开。而在前面 xas create 流程会再次 xas alloc 来分配内存到具体位置。

xas descend

static __always_inline void *xas_descend(struct xa_state *xas,
                                        struct xa_node *node)
{
        // 返回 (index >> node->shift) & XA_CHUNK_MASK
        unsigned int offset = get_offset(xas->xa_index, node);
        // 既然知道 offset 了,那就定位 entry = slots[offset]
        void *entry = xa_entry(xas->xa, node, offset);

        xas->xa_node = node;
        while (xa_is_sibling(entry)) {
                offset = xa_to_sibling(entry);
                entry = xa_entry(xas->xa, node, offset);
                if (node->shift && xa_is_node(entry))
                        entry = XA_RETRY_ENTRY;
        }

        xas->xa_offset = offset;
        return entry;
}

xas descend 流程非常简单,按照 shift 切割的方式走一层。如果是某种形式的 while() xas_descend,那基本等同于 load 操作。

xas alloc

static void *xas_alloc(struct xa_state *xas, unsigned int shift)
{
        struct xa_node *parent = xas->xa_node;
        struct xa_node *node = xas->xa_alloc;

        if (xas_invalid(xas))
                return NULL;

        // 如果 xa_alloc 已经缓存好一个结点,直接拿走
        if (node) {
                xas->xa_alloc = NULL;
        // 否则向 kmem cache 要,这里就不是 XArray 的范围了
        } else {
                // 注意整个流程下来我们是持锁的
                // 为了避免睡下去,这里 gfp 并没有使用/继承用户提供的 GFP 标记
                gfp_t gfp = GFP_NOWAIT | __GFP_NOWARN;

                if (xas->xa->xa_flags & XA_FLAGS_ACCOUNT)
                        gfp |= __GFP_ACCOUNT;

                node = kmem_cache_alloc_lru(radix_tree_node_cachep, xas->xa_lru, gfp);
                // 失败就标记 ENOMEM,然后在合适的时机会有 xas nomem 流程挽救
                if (!node) {
                        xas_set_err(xas, -ENOMEM);
                        return NULL;
                }
        }

        if (parent) {
                node->offset = xas->xa_offset;
                parent->count++;
                XA_NODE_BUG_ON(node, parent->count > XA_CHUNK_SIZE);
                xas_update(xas, parent);
        }
        XA_NODE_BUG_ON(node, shift > BITS_PER_LONG);
        XA_NODE_BUG_ON(node, !list_empty(&node->private_list));
        node->shift = shift;
        node->count = 0;
        node->nr_values = 0;
        RCU_INIT_POINTER(node->parent, xas->xa_node);
        node->array = xas->xa;

        return node;
}

xas alloc 流程只是一个简单 kmem cache 分配封装。注意整个流程下来我们是持锁的,为了避免睡下去,这里 gfp 并没有使用/继承用户提供的 GFP 标记。分配成功不一定意味着当前要动态内存分配(可能此前就已经做好了),分配失败也不一定意味着流程结束(合适时机有 xas nomem 流程挽救,并且 xas 自动机提供快速恢复路径)。

xas nomem

/*
 * __xas_nomem() - Drop locks and allocate memory if needed.
 * @xas: XArray operation state.
 * @gfp: Memory allocation flags.
 *
 * Internal variant of xas_nomem().
 *
 * Return: true if memory was needed, and was successfully allocated.
 */
static bool __xas_nomem(struct xa_state *xas, gfp_t gfp)
        __must_hold(xas->xa->xa_lock)
{
        // 区分 spinlock 类型:常规、bh 还是 irq
        unsigned int lock_type = xa_lock_type(xas->xa);

        if (xas->xa_node != XA_ERROR(-ENOMEM)) {
                xas_destroy(xas);
                return false;
        }
        if (xas->xa->xa_flags & XA_FLAGS_ACCOUNT)
                gfp |= __GFP_ACCOUNT;
        // 分配内存到 xas->xa_alloc,并重新持锁
        // xas->xa_alloc 的内存将会在后续 xas_alloc() 中获取
        // NOTE: 在 store 操作中,是 xas_store() => xas_create() => xas_alloc() 获取分配到的内存
        if (gfpflags_allow_blocking(gfp)) {
                xas_unlock_type(xas, lock_type);
                xas->xa_alloc = kmem_cache_alloc_lru(radix_tree_node_cachep, xas->xa_lru, gfp);
                xas_lock_type(xas, lock_type);
        } else {
                xas->xa_alloc = kmem_cache_alloc_lru(radix_tree_node_cachep, xas->xa_lru, gfp);
        }
        if (!xas->xa_alloc)
                return false;
        xas->xa_alloc->parent = NULL;
        XA_NODE_BUG_ON(xas->xa_alloc, !list_empty(&xas->xa_alloc->private_list));
        xas->xa_node = XAS_RESTART;
        return true;
}

load 操作

如果简单了解过 store 操作要做的事情,那么 load 就相当简单。

xa load

/**
 * xa_load() - Load an entry from an XArray.
 * @xa: XArray.
 * @index: index into array.
 *
 * Context: Any context.  Takes and releases the RCU lock.
 * Return: The entry at @index in @xa.
 */
void *xa_load(struct xarray *xa, unsigned long index)
{
        XA_STATE(xas, xa, index);
        void *entry;

        rcu_read_lock();
        // 通常走一遍 xas load 流程即可
        do {
                entry = xa_zero_to_null(xas_load(&xas));
        // 使用 advanced API 的话,可能会返回内部结点
        // 因此会通过判断 retry 结点来重启 xas 上下文,从头遍历
        } while (xas_retry(&xas, entry));
        rcu_read_unlock();

        return entry;
}

xa load 门面流程保证不会改动内部的基数树结构,因此只使用 RCU read lock。

xas load

/**
 * xas_load() - Load an entry from the XArray (advanced).
 * @xas: XArray operation state.
 *
 * Usually walks the @xas to the appropriate state to load the entry
 * stored at xa_index.  However, it will do nothing and return %NULL if
 * @xas is in an error state.  xas_load() will never expand the tree.
 *
 * If the xa_state is set up to operate on a multi-index entry, xas_load()
 * may return %NULL or an internal entry, even if there are entries
 * present within the range specified by @xas.
 *
 * Context: Any context.  The caller should hold the xa_lock or the RCU lock.
 * Return: Usually an entry in the XArray, but see description for exceptions.
 */
void *xas_load(struct xa_state *xas)
{
        void *entry = xas_start(xas);

        while (xa_is_node(entry)) {
                struct xa_node *node = xa_to_node(entry);

                // 溢出,可能会返回内部结点,见注释
                if (xas->xa_shift > node->shift)
                        break;
                entry = xas_descend(xas, node);
                // 叶子
                if (node->shift == 0)
                        break;
        }
        return entry;
}

xas load 流程只是循环的 xas descend 流程。前面已经提到过,就是按照 shift 切割分层。

WIP

WORK IN PROGRESS!

References

XArray – kernel.org
XArray v9
XArray: One Data Structure To Rule Them All
zswap: replace RB tree with xarray – LWN.net
The XArray data structure – LWN.net
xarray – Kernel Exploring
xarray: Define struct xa_node
Maple Tree Work
mm: list_lru: replace linear array with xarray
xarray: Change definition of sibling entries
xarray: use kmem_cache_alloc_lru to allocate xa_node
page cache: Add and replace pages using the XArray