背景

Linux 内核使用 XArray(eXtensible Array)作为 Linux IO 栈 的页面缓存(page cache)容器实现。

本文会介绍 XArray 的具体实现,以及 XArray 在 page cache 层的应用。

基本介绍

仅从 外部 API 来看,XArray 可视为自动扩容的无限大的数组。

而从作者的表态来看,XArray 目标是作为一个「万能」的数据结构,除了替代明确的 IDR 或者 page cache 场景的基数树,还可以部分替代内核中的链表红黑树和哈希表,其定位类似于朱棣数组

NOTES:

  • 其实是朱迪数组(Judy Array)🙃
  • XArray 的场景适用于小数据集(比链表更省结点空间),或者是相对密集的大数据集(比哈希表更有局部性且无冲突)。
  • XArray 还满足了内核特定的需求,比如提供 RCU-free 特性、大页 page cache 性能优化和筛选 mask 的快速遍历等。
  • XArray 并不是通用类型的键值容器:键(index)肯定是 unsigned long 类型,值(entry)虽然在字面上是 void* 类型,但是有严格的要求,详见后续文章说明。
  • 作者表态归表态,实际替代哈希表存疑。内核中没有 commit 是这么做的……
  • XArray 事实上也算是新瓶旧酒。只看原理,它就是基数树。
  • 本文基于 Linux 内核版本 v6.15。

调试建议

XArray 支持用户态调试,可以在 Linux 源码根目录下执行:

cd ./tools/testing/radix-tree/
make
gdb ./xarray

总之随意使用调试器即可。

注意有部分宏定义可能不同于内核态版本,而且内存分配也有细节差异。

(调试的时候实在整无语了……当然改成内核态就更不好调了)

数据结构

anchor

/**
 * struct xarray - The anchor of the XArray.
 * @xa_lock: Lock that protects the contents of the XArray.
 *
 * To use the xarray, define it statically or embed it in your data structure.
 * It is a very small data structure, so it does not usually make sense to
 * allocate it separately and keep a pointer to it in your data structure.
 *
 * You may use the xa_lock to protect your own data structures as well.
 */
/*
 * If all of the entries in the array are NULL, @xa_head is a NULL pointer.
 * If the only non-NULL entry in the array is at index 0, @xa_head is that
 * entry.  If any other entry in the array is non-NULL, @xa_head points
 * to an @xa_node.
 */
struct xarray {
        spinlock_t      xa_lock;
/* private: The rest of the data structure is not to be used directly. */
        gfp_t           xa_flags;
        void __rcu *    xa_head;
};

struct xarray 是提供的用户外部的核心数据结构,只有三样东西:

  • xa_lock:写操作的自旋锁。注意 XArray 读操作使用 RCU 机制,因此并发读写不会互斥。
  • xa_flags:携带 GFP 标记(get free page),其他细节见 xarray.h 头文件
  • xa_head:指向 NULL、entry 或者 xa_node 对象。entry 可理解为键值对当中的值。

entry

/**
 * xa_mk_value() - Create an XArray entry from an integer.
 * @v: Value to store in XArray.
 *
 * Context: Any context.
 * Return: An entry suitable for storing in the XArray.
 */
static inline void *xa_mk_value(unsigned long v)
{
        WARN_ON((long)v < 0);
        return (void *)((v << 1) | 1);
}

/**
 * xa_to_value() - Get value stored in an XArray entry.
 * @entry: XArray entry.
 *
 * Context: Any context.
 * Return: The value stored in the XArray entry.
 */
static inline unsigned long xa_to_value(const void *entry)
{
        return (unsigned long)entry >> 1;
}

/**
 * xa_is_value() - Determine if an entry is a value.
 * @entry: XArray entry.
 *
 * Context: Any context.
 * Return: True if the entry is a value, false if it is a pointer.
 */
static inline bool xa_is_value(const void *entry)
{
        return (unsigned long)entry & 1;
}

补充一下 entry 的说明,虽然 entry 是 void* 类型,实质上用户在 entry 只可存放:

  • 特定指针。要求是经由内核分配的指针(比如 kmalloc())。或者任意四字节对齐过的指针。
  • 特定整数。要求是 [0, LONG_MAX) 范围内的整数,注意这是 long 的半侧。
// 见:https://elixir.bootlin.com/linux/v6.15/source/include/linux/xarray.h#L27
// 注意这些 internal entry 的数值是要经过转换的,比如 257 (Zero entry) 实际是 257<<2 | 0b10
/*
 * The bottom two bits of the entry determine how the XArray interprets
 * the contents:
 *
 * 00: Pointer entry
 * 10: Internal entry
 * x1: Value entry or tagged pointer
 *
 * Attempting to store internal entries in the XArray is a bug.
 *
 * Most internal entries are pointers to the next node in the tree.
 * The following internal entries have a special meaning:
 *
 * 0-62: Sibling entries
 * 256: Retry entry
 * 257: Zero entry
 *
 * Errors are also represented as internal entries, but use the negative
 * space (-4094 to -2).  They're never stored in the slots array; only
 * returned by the normal API.
 */

NOTES:

  • 除了用户 entry,XArray 还具有内部 entry:node、sibling、retry 和 zero。
  • 并且 index(键)与 entry 也不一定是一对一的关系,见 multi-index entry 说明。

node

/*
 * @count is the count of every non-NULL element in the ->slots array
 * whether that is a value entry, a retry entry, a user pointer,
 * a sibling entry or a pointer to the next level of the tree.
 * @nr_values is the count of every element in ->slots which is
 * either a value entry or a sibling of a value entry.
 */
struct xa_node {
        unsigned char   shift;          /* Bits remaining in each slot */
        unsigned char   offset;         /* Slot offset in parent */
        unsigned char   count;          /* Total entry count */
        unsigned char   nr_values;      /* Value entry count */
        struct xa_node __rcu *parent;   /* NULL at top of tree */
        struct xarray   *array;         /* The array we belong to */
        union {
                struct list_head private_list;  /* For tree user */
                struct rcu_head rcu_head;       /* Used when freeing node */
        };
        void __rcu      *slots[XA_CHUNK_SIZE];
        union {
                unsigned long   tags[XA_MAX_MARKS][XA_MARK_LONGS];
                unsigned long   marks[XA_MAX_MARKS][XA_MARK_LONGS];
        };
};

使用 crash> struct xa_node 解析宏:

  • XA_CHUNK_SIZE = 64
  • XA_MAX_MARKS = 3
  • XA_MARK_LONGS = 1

shiftoffsetparentslots 用于维护遍历寻址:

  • shift:对键的右移偏移量。当 shift == 0 时不会有指向下一层的结点(node entry)。
  • offset:自身在父结点中的 slots[] 下标。
  • parent:指向父结点。
  • slots[64]:存放 entry 的容器。所以第一层最多能容纳 64 个结点,第二层 4096,以此类推……
// 见:https://elixir.bootlin.com/linux/v6.15/source/include/linux/fs.h#L531

/* XArray tags, for tagging dirty and writeback pages in the pagecache. */
#define PAGECACHE_TAG_DIRTY     XA_MARK_0
#define PAGECACHE_TAG_WRITEBACK XA_MARK_1
#define PAGECACHE_TAG_TOWRITE   XA_MARK_2

marks 用于支持 xa[s]_for_each_marked() 接口。具体到 page cache 的用途,像「只遍历脏页」的操作就可以轻松实现。

state

/*
 * The xa_state is opaque to its users.  It contains various different pieces
 * of state involved in the current operation on the XArray.  It should be
 * declared on the stack and passed between the various internal routines.
 * The various elements in it should not be accessed directly, but only
 * through the provided accessor functions.  The below documentation is for
 * the benefit of those working on the code, not for users of the XArray.
 *
 * @xa_node usually points to the xa_node containing the slot we're operating
 * on (and @xa_offset is the offset in the slots array).  If there is a
 * single entry in the array at index 0, there are no allocated xa_nodes to
 * point to, and so we store %NULL in @xa_node.  @xa_node is set to
 * the value %XAS_RESTART if the xa_state is not walked to the correct
 * position in the tree of nodes for this operation.  If an error occurs
 * during an operation, it is set to an %XAS_ERROR value.  If we run off the
 * end of the allocated nodes, it is set to %XAS_BOUNDS.
 */
struct xa_state {
        struct xarray *xa;
        unsigned long xa_index;
        unsigned char xa_shift;
        unsigned char xa_sibs;
        unsigned char xa_offset;
        unsigned char xa_pad;           /* Helps gcc generate better code */
        struct xa_node *xa_node;
        struct xa_node *xa_alloc;
        xa_update_node_t xa_update;
        struct list_lru *xa_lru;
};

XA state 可以视为一个贯穿 XArray 操作的状态机,用以保留(快速恢复)必要的上下文信息,不必让每个子操作都从根重新遍历处理。

/**
 * XA_STATE() - Declare an XArray operation state.
 * @name: Name of this operation state (usually xas).
 * @array: Array to operate on.
 * @index: Initial index of interest.
 *
 * Declare and initialise an xa_state on the stack.
 */
#define XA_STATE(name, array, index)                            \
        struct xa_state name = __XA_STATE(array, index, 0, 0)

#define __XA_STATE(array, index, shift, sibs)  {        \
        .xa = array,                                    \
        .xa_index = index,                              \
        .xa_shift = shift,                              \
        .xa_sibs = sibs,                                \
        .xa_offset = 0,                                 \
        .xa_pad = 0,                                    \
        .xa_node = XAS_RESTART,                         \
        .xa_alloc = NULL,                               \
        .xa_update = NULL,                              \
        .xa_lru = NULL,                                 \
}

在常规的 XArray 读写操作中,用户不需要手动声明 XA state 对象。

但是我们后面要看实现,所以得知道一些。XA state 在初始化时,其字段除了声明的 array、index 以及特殊标记的 xa_node(见注释)以外,均为 0 或者 NULL。注意 xa_state->xa_offsetxa_node->offset 含义不同,还是建议亲自看注释。

NOTE: xa_shiftxa_sibs 只在 multi-index entry 特性中使用。

store 操作

接下来直接看源码。和过往文章一样,是按照深度优先的顺序展开代码。

我们从 store 操作的门面 xa_store() 开始。

xa store

/**
 * xa_store() - Store this entry in the XArray.
 * @xa: XArray.
 * @index: Index into array.
 * @entry: New entry.
 * @gfp: Memory allocation flags.
 *
 * After this function returns, loads from this index will return @entry.
 * Storing into an existing multi-index entry updates the entry of every index.
 * The marks associated with @index are unaffected unless @entry is %NULL.
 *
 * Context: Any context.  Takes and releases the xa_lock.
 * May sleep if the @gfp flags permit.
 * Return: The old entry at this index on success, xa_err(-EINVAL) if @entry
 * cannot be stored in an XArray, or xa_err(-ENOMEM) if memory allocation
 * failed.
 */
void *xa_store(struct xarray *xa, unsigned long index, void *entry, gfp_t gfp)
{
        void *curr;

        // 获取自旋锁
        xa_lock(xa);
        // 在内部可能多次释放-获取锁
        curr = __xa_store(xa, index, entry, gfp);
        // 释放自旋锁
        xa_unlock(xa);

        return curr;
}

/**
 * __xa_store() - Store this entry in the XArray.
 * @xa: XArray.
 * @index: Index into array.
 * @entry: New entry.
 * @gfp: Memory allocation flags.
 *
 * You must already be holding the xa_lock when calling this function.
 * It will drop the lock if needed to allocate memory, and then reacquire
 * it afterwards.
 *
 * Context: Any context.  Expects xa_lock to be held on entry.  May
 * release and reacquire xa_lock if @gfp flags permit.
 * Return: The old entry at this index or xa_err() if an error happened.
 */
void *__xa_store(struct xarray *xa, unsigned long index, void *entry, gfp_t gfp)
{
        XA_STATE(xas, xa, index);
        void *curr;

        // 是否误用 advanced API
        if (WARN_ON_ONCE(xa_is_advanced(entry)))
                return XA_ERROR(-EINVAL);
        // 是否改用 zero entry
        // 空 entry 在 alloc 特性下转换为 zero entry,避免 NULL 歧义
        if (xa_track_free(xa) && !entry)
                entry = XA_ZERO_ENTRY;

        do {
                // 此时 index 和 xa 信息由 xas 上下文维护而无需传递参数
                curr = xas_store(&xas, entry);
                if (xa_track_free(xa))
                        xas_clear_mark(&xas, XA_FREE_MARK);
        // xas nomem 流程:用于处理上下文中出现 ENOMEM 的情况
        // 会释放锁,调用 kmem cache 接口分配内存到 xas->xa_alloc,再重新获取锁
        //
        // 非 ENOMEM 情况会只返回 false(顺便销毁 xas 上下文,表示这是最后一步)
        } while (__xas_nomem(&xas, gfp));

        return xas_result(&xas, xa_zero_to_null(curr));
}

store 操作的门面流程很简单:

  • 持锁。
  • 创建 xas 上下文。
  • 在 xas 上下文处理核心的 xas store 流程。
  • 销毁 xas 上下文。
  • 解锁并返回结果。

xas store

/**
 * xas_store() - Store this entry in the XArray.
 * @xas: XArray operation state.
 * @entry: New entry.
 *
 * If @xas is operating on a multi-index entry, the entry returned by this
 * function is essentially meaningless (it may be an internal entry or it
 * may be %NULL, even if there are non-NULL entries at some of the indices
 * covered by the range).  This is not a problem for any current users,
 * and can be changed if needed.
 *
 * Return: The old entry at this index.
 */
void *xas_store(struct xa_state *xas, void *entry)
{
        struct xa_node *node;
        void __rcu **slot = &xas->xa->xa_head;
        unsigned int offset, max;
        int count = 0;
        int values = 0;
        // 两个迭代用的 entry 指针
        void *first, *next;
        // 仅用于统计
        bool value = xa_is_value(entry);

        // 确定 first

        // 一般情况
        if (entry) {
                // 表示是否能直接存放在根结点中
                bool allow_root = !xa_is_node(entry) && !xa_is_zero(entry);
                // xas create 流程,
                // - 签名已有的注释:
                //   如果 slot 已存在,则返回对应的 entry
                //   如果 slot 新分配,则返回 NULL
                // - 补充:
                //   如果首次调用 index == 0,不会引起分配且返回 NULL
                first = xas_create(xas, allow_root);
        // 应该仅有存入 NULL 的场景走该路径
        } else {
                // 肯定不会引起分配,见后续 load 操作
                first = xas_load(xas);
        }

        // 需要重试或者存在错误
        if (xas_invalid(xas))
                // 返回旧的 entry
                return first;
        // 这是 xas create 流程提供的 xa_node 结点
        node = xas->xa_node;
        // xa_sibs 为 multi-index entry 特性相关
        // 这里指的是 node 管辖的范围足够大以至于不需要 sibs(TODO:具体场景?)
        // 感兴趣可以先看 xas_set_order() 函数
        if (node && (xas->xa_shift < node->shift))
                xas->xa_sibs = 0;
        // 内容重复,不需处理
        if ((first == entry) && !xas->xa_sibs)
                return first;

        // 后续 for 迭代过程只看 next(slot++)
        next = first;
        // 用于寻址 slots[offset]
        // 在 xas create (-> xas descend) 流程时会更新该字段
        offset = xas->xa_offset;
        max = xas->xa_offset + xas->xa_sibs;
        if (node) {
                slot = &node->slots[offset];
                if (xas->xa_sibs)
                        xas_squash_marks(xas);
        }
        // 指针 NULL 会进入该分支
        // 数值 0 不会进入
        if (!entry)
                // 初始化三个 mark,见下方注释
                xas_init_marks(xas);

        for (;;) {
                /*
                 * Must clear the marks before setting the entry to NULL,
                 * otherwise xas_for_each_marked may find a NULL entry and
                 * stop early.  rcu_assign_pointer contains a release barrier
                 * so the mark clearing will appear to happen before the
                 * entry is set to NULL.
                 */
                // store 核心操作
                // slot 在此前有两条选择:
                // - 如果存在 node,则优先选择该结点(xas create 流程提供,存放在 xas->xa_node)
                // - 如果没有 node,则选择 xas->xa->xa_head
                //  (xas create 流程会预先抹除原有的 xas->xa_node == XAS_RESTART 标记)
                rcu_assign_pointer(*slot, entry);
                if (xa_is_node(next) && (!node || node->shift))
                        xas_free_nodes(xas, xa_to_node(next));
                if (!node)
                        break;
                count += !next - !entry;
                values += !xa_is_value(first) - !value;
                if (entry) {
                        // 常规 entry 在这里就会 break
                        if (offset == max)
                                break;
                        if (!xa_is_sibling(entry))
                                entry = xa_mk_sibling(xas->xa_offset);
                } else {
                        if (offset == XA_CHUNK_MASK)
                                break;
                }
                next = xa_entry_locked(xas->xa, node, ++offset);
                if (!xa_is_sibling(next)) {
                        if (!entry && (offset > max))
                                break;
                        first = next;
                }
                slot++;
        }

        // 这里更新统计信息,比如 count;且会在合适的时机删掉 xa_node
        //(TODO:可能是不断存入 NULL 导致?因为 erase 操作其实就是 xa_store(..., NULL, ...))
        update_node(xas, node, count, values);
        return first;
}

xas store 流程基本隐藏了具体的分配过程,本体只需完成 *slot = entry 的关键存储操作。在 xas store 具体赋值 slot 前,XArray 甚至可以是一棵完全空内存(无动态内存分配)的树,需要靠 xas create 流程创建具体的 slot 内存空间。

xas create

/*
 * xas_create() - Create a slot to store an entry in.
 * @xas: XArray operation state.
 * @allow_root: %true if we can store the entry in the root directly
 *
 * Most users will not need to call this function directly, as it is called
 * by xas_store().  It is useful for doing conditional store operations
 * (see the xa_cmpxchg() implementation for an example).
 *
 * Return: If the slot already existed, returns the contents of this slot.
 * If the slot was newly created, returns %NULL.  If it failed to create the
 * slot, returns %NULL and indicates the error in @xas.
 */
static void *xas_create(struct xa_state *xas, bool allow_root)
{
        struct xarray *xa = xas->xa;
        void *entry;
        void __rcu **slot;
        struct xa_node *node = xas->xa_node;
        int shift;
        unsigned int order = xas->xa_shift;

        // 有多条路径区分场景,总之都是为了确定 shift、entry 和 slot 变量

        // 表示当前 xa_node 是 XAS_BOUNDS 或者 XAS_RESTART(初始值)
        // 这两个标记见 xas 注释说明
        if (xas_top(node)) { 
                // 无锁获取 xa->xa_head,尝试以 head 作为单独的 entry 容器(从而免去真正的 slot)
                entry = xa_head_locked(xa);
                // xa_node 从 XAS_RESTART(初始值)改动为 NULL
                xas->xa_node = NULL;
                // 是否 xa flags 携带 XA_FLAGS_ZERO_BUSY
                if (!entry && xa_zero_busy(xa))
                        entry = XA_ZERO_ENTRY;
                // xas expand 流程:提供 node 结点
                // 表示不断添加结点直至满足 xas->xa_index 需求
                // 该流程在首次调用(非 multi-index)时,由于 xa_head 为空,必直接返回(最大的)shift
                // shift 用于确定后续分配结点的 ->shift 字段(减去 XA_CHUNK_SHIFT)
                shift = xas_expand(xas, entry);
                if (shift < 0)
                        return NULL;
                if (!shift && !allow_root)
                        shift = XA_CHUNK_SHIFT;
                // 如果是首次调用,其实整个流程会直接退出,对应函数末尾的 return entry
                entry = xa_head_locked(xa);
                // slot 初步定位到 xa_head,后续将在该函数内为最终定位的 slot 赋值 entry
                slot = &xa->xa_head;
        } else if (xas_error(xas)) {
                return NULL;
        // xas 上下文快速恢复,不用重新计算了
        } else if (node) {
                unsigned int offset = xas->xa_offset;

                shift = node->shift;
                entry = xa_entry_locked(xa, node, offset);
                slot = &node->slots[offset];
        } else {
                shift = 0;
                entry = xa_head_locked(xa);
                slot = &xa->xa_head;
        }

        while (shift > order) {
                shift -= XA_CHUNK_SHIFT;
                if (!entry) {
                        // xas alloc 流程:
                        // 分配一个 struct xa_node 结点,或者从已缓存的 xas->xa_alloc 字段获取并清空
                        // 此时该分配的结点:
                        // - parent 对应于 xas->xa_node
                        // - offset 对应于 xas->xa_offset(如果存在 parent)
                        // - shift 对应于 shift 参数
                        node = xas_alloc(xas, shift);
                        // 可能会存在 ENOMEM 的情况,具体错误记录于 xas 当中
                        // 这种情况会在上面 xa store 门面的 nomem 流程处理掉并(重试)恢复现场
                        if (!node)
                                break;
                        if (xa_track_free(xa))
                                node_mark_all(node, XA_FREE_MARK);
                        // 初始化某个 slot,指向分配好的 node
                        rcu_assign_pointer(*slot, xa_mk_node(node));
                } else if (xa_is_node(entry)) {
                        node = xa_to_node(entry);
                // 命中用户 entry
                } else {
                        break;
                }
                // xas descend 流程:
                // - 向下遍历(一层)
                // - 根据 node 更新 xas->xa_node 和 xas->xa_offset
                // - 同时寻址对应的 entry
                // 该函数也用于 xa load 流程,不会修改树的结构
                entry = xas_descend(xas, node);
                slot = &node->slots[xas->xa_offset];
        }

        return entry;
}

xas create 流程本身是为了开辟 slot 所需的内存空间,但是需要注意 XArray 本身的层级是需要先从 xas exapnd 流程提供基底(后续再回到 xas create 流程按需分配具体定位的 slot 所需的内存空间)。

补充一下 XA_CHUNK_SHIFT = 6,可以通过 make lib/xarray.i 解析得到。

xas expand

/*
 * xas_expand adds nodes to the head of the tree until it has reached
 * sufficient height to be able to contain @xas->xa_index
 */
static int xas_expand(struct xa_state *xas, void *head)
{
        struct xarray *xa = xas->xa;
        struct xa_node *node = NULL;
        unsigned int shift = 0;
        // 非 CONFIG_XARRAY_MULTI 的情况下,max 等同于 xa_index
        unsigned long max = xas_max(xas);

        // 尚未真正构建过结点
        // 进入该路径,无论如何都会立刻返回
        //(因为没有寻址能力)
        if (!head) {
                // 小对象优化:
                // 对于 index == 0 && !CONFIG_XARRAY_MULTI 的情况
                if (max == 0)
                        // 会直接返回 shift = 0
                        // 后续分配单个用户 entry 无需动态内存分配
                        return 0;
                // 而其他情况,无论如何都会导致返回的 shift 大于等于 XA_CHUNK_SHIFT
                // 从而引起动态内存分配
                while ((max >> shift) >= XA_CHUNK_SIZE)
                        // 每次多一层
                        shift += XA_CHUNK_SHIFT;
                return shift + XA_CHUNK_SHIFT;
        } else if (xa_is_node(head)) {
                node = xa_to_node(head);
                shift = node->shift + XA_CHUNK_SHIFT;
        }
        xas->xa_node = NULL;

        // 就是注释说的,处理这个 head 能寻址的最大范围
        while (max > max_index(head)) {
                xa_mark_t mark = 0;

                XA_NODE_BUG_ON(node, shift > BITS_PER_LONG);
                // xas alloc 流程前面提到了
                // 这里 node 将会在后续成为新的根结点
                node = xas_alloc(xas, shift);
                if (!node)
                        return -ENOMEM;

                node->count = 1;
                if (xa_is_value(head))
                        node->nr_values = 1;
                // 此前的小对象优化潜在 entry,需要搬移过来
                RCU_INIT_POINTER(node->slots[0], head);

                /* Propagate the aggregated mark info to the new child */
                // 这些都是 mark 特性相关的
                for (;;) {
                        if (xa_track_free(xa) && mark == XA_FREE_MARK) {
                                node_mark_all(node, XA_FREE_MARK);
                                if (!xa_marked(xa, XA_FREE_MARK)) {
                                        node_clear_mark(node, 0, XA_FREE_MARK);
                                        xa_mark_set(xa, XA_FREE_MARK);
                                }
                        } else if (xa_marked(xa, mark)) {
                                node_set_mark(node, 0, mark);
                        }
                        if (mark == XA_MARK_MAX)
                                break;
                        mark_inc(mark);
                }

                /*
                 * Now that the new node is fully initialised, we can add
                 * it to the tree
                 */
                // node 结点完成初始化
                if (xa_is_node(head)) {
                        // 此前已经被指向 slots[0]
                        xa_to_node(head)->offset = 0;
                        // 原先的根节点「下降」一层
                        rcu_assign_pointer(xa_to_node(head)->parent, node);
                }
                // 分配的 node 成为新的 xa_head(根节点)
                // 并且将用于下一次循环
                head = xa_mk_node(node);
                // 此时 xa_head 不再是 NULL
                rcu_assign_pointer(xa->xa_head, head);
                // 这个只是给内核用户响应 xas->xa_update 回调的,略
                xas_update(xas, node);

                // 处理下一层
                //(所以整个 XArray 基底就是全部走 [0] 进行快速倍增)
                shift += XA_CHUNK_SHIFT;
        }

        // 记录 node 到 xas 上下文,表示正在操作的结点
        xas->xa_node = node;
        return shift;
}

xas expand 流程就是看 xas->xa_index 的寻址能力通过 xas alloc 流程按需分配出 XArray 基本层级,并不会完全展开。而在前面 xas create 流程会再次 xas alloc 来分配内存到具体位置。

xas descend

static __always_inline void *xas_descend(struct xa_state *xas,
                                        struct xa_node *node)
{
        // 返回 (index >> node->shift) & XA_CHUNK_MASK
        unsigned int offset = get_offset(xas->xa_index, node);
        // 既然知道 offset 了,那就定位 entry = slots[offset]
        void *entry = xa_entry(xas->xa, node, offset);

        xas->xa_node = node;
        // 这个留到后续 multi-index entry 再解释
        while (xa_is_sibling(entry)) {
                offset = xa_to_sibling(entry);
                entry = xa_entry(xas->xa, node, offset);
                // 推测是读侧在 RCU 无阻塞读时,外部的写侧引起了删除之类的操作,结点关系改动
                // 为了避免后续错误遍历,需要设为 retry entry 表示已经检测出冲突
                if (node->shift && xa_is_node(entry))
                        entry = XA_RETRY_ENTRY;
        }

        xas->xa_offset = offset;
        return entry;
}

xas descend 流程非常简单,按照 shift 切割的方式走一层。如果是某种形式的 while() xas_descend,那基本等同于 load 操作。

xas alloc

static void *xas_alloc(struct xa_state *xas, unsigned int shift)
{
        struct xa_node *parent = xas->xa_node;
        struct xa_node *node = xas->xa_alloc;

        if (xas_invalid(xas))
                return NULL;

        // 如果 xa_alloc 已经缓存好一个结点,直接拿走
        if (node) {
                xas->xa_alloc = NULL;
        // 否则向 kmem cache 要,这里就不是 XArray 的范围了
        } else {
                // 注意整个流程下来我们是持锁的
                // 为了避免睡下去,这里 gfp 并没有使用/继承用户提供的 GFP 标记
                gfp_t gfp = GFP_NOWAIT | __GFP_NOWARN;

                if (xas->xa->xa_flags & XA_FLAGS_ACCOUNT)
                        gfp |= __GFP_ACCOUNT;

                node = kmem_cache_alloc_lru(radix_tree_node_cachep, xas->xa_lru, gfp);
                // 失败就标记 ENOMEM,然后在合适的时机会有 xas nomem 流程挽救
                if (!node) {
                        xas_set_err(xas, -ENOMEM);
                        return NULL;
                }
        }

        if (parent) {
                node->offset = xas->xa_offset;
                parent->count++;
                XA_NODE_BUG_ON(node, parent->count > XA_CHUNK_SIZE);
                xas_update(xas, parent);
        }
        XA_NODE_BUG_ON(node, shift > BITS_PER_LONG);
        XA_NODE_BUG_ON(node, !list_empty(&node->private_list));
        node->shift = shift;
        node->count = 0;
        node->nr_values = 0;
        RCU_INIT_POINTER(node->parent, xas->xa_node);
        node->array = xas->xa;

        return node;
}

xas alloc 流程只是一个简单 kmem cache 分配封装。注意整个流程下来我们是持锁的,为了避免睡下去,这里 gfp 并没有使用/继承用户提供的 GFP 标记。分配成功不一定意味着当前要动态内存分配(可能此前就已经做好了),分配失败也不一定意味着流程结束(合适时机有 xas nomem 流程挽救,并且 xas 自动机提供快速恢复路径)。

xas nomem

/*
 * __xas_nomem() - Drop locks and allocate memory if needed.
 * @xas: XArray operation state.
 * @gfp: Memory allocation flags.
 *
 * Internal variant of xas_nomem().
 *
 * Return: true if memory was needed, and was successfully allocated.
 */
static bool __xas_nomem(struct xa_state *xas, gfp_t gfp)
        __must_hold(xas->xa->xa_lock)
{
        // 区分 spinlock 类型:常规、bh 还是 irq
        unsigned int lock_type = xa_lock_type(xas->xa);

        if (xas->xa_node != XA_ERROR(-ENOMEM)) {
                xas_destroy(xas);
                return false;
        }
        if (xas->xa->xa_flags & XA_FLAGS_ACCOUNT)
                gfp |= __GFP_ACCOUNT;
        // 分配内存到 xas->xa_alloc,并重新持锁
        // xas->xa_alloc 的内存将会在后续 xas_alloc() 中获取
        // NOTE: 在 store 操作中,是 xas_store() => xas_create() => xas_alloc() 获取分配到的内存
        if (gfpflags_allow_blocking(gfp)) {
                xas_unlock_type(xas, lock_type);
                xas->xa_alloc = kmem_cache_alloc_lru(radix_tree_node_cachep, xas->xa_lru, gfp);
                xas_lock_type(xas, lock_type);
        } else {
                xas->xa_alloc = kmem_cache_alloc_lru(radix_tree_node_cachep, xas->xa_lru, gfp);
        }
        if (!xas->xa_alloc)
                return false;
        xas->xa_alloc->parent = NULL;
        XA_NODE_BUG_ON(xas->xa_alloc, !list_empty(&xas->xa_alloc->private_list));
        xas->xa_node = XAS_RESTART;
        return true;
}

load 操作

如果简单了解过 store 操作要做的事情,那么 load 就相当简单。

xa load

/**
 * xa_load() - Load an entry from an XArray.
 * @xa: XArray.
 * @index: index into array.
 *
 * Context: Any context.  Takes and releases the RCU lock.
 * Return: The entry at @index in @xa.
 */
void *xa_load(struct xarray *xa, unsigned long index)
{
        XA_STATE(xas, xa, index);
        void *entry;

        rcu_read_lock();
        // 通常走一遍 xas load 流程即可
        do {
                entry = xa_zero_to_null(xas_load(&xas));
        // 使用 advanced API 的话,可能会返回内部结点
        // 因此会通过判断 retry 结点来重启 xas 上下文,从头遍历
        } while (xas_retry(&xas, entry));
        rcu_read_unlock();

        return entry;
}

xa load 门面流程保证不会改动内部的基数树结构,因此只使用 RCU read lock。也就是说,XArray 的 load 操作保证不会因并发存在 store 操作而互斥。

xas load

/**
 * xas_load() - Load an entry from the XArray (advanced).
 * @xas: XArray operation state.
 *
 * Usually walks the @xas to the appropriate state to load the entry
 * stored at xa_index.  However, it will do nothing and return %NULL if
 * @xas is in an error state.  xas_load() will never expand the tree.
 *
 * If the xa_state is set up to operate on a multi-index entry, xas_load()
 * may return %NULL or an internal entry, even if there are entries
 * present within the range specified by @xas.
 *
 * Context: Any context.  The caller should hold the xa_lock or the RCU lock.
 * Return: Usually an entry in the XArray, but see description for exceptions.
 */
void *xas_load(struct xa_state *xas)
{
        // start walking,可以认为从 xa head 开始
        //(有一点复杂的小优化,因为可以直接从 xas node 中间开始,先略过)
        void *entry = xas_start(xas);

        while (xa_is_node(entry)) {
                struct xa_node *node = xa_to_node(entry);

                // 溢出,可能会返回内部结点,见注释
                if (xas->xa_shift > node->shift)
                        break;
                entry = xas_descend(xas, node);
                // 叶子
                if (node->shift == 0)
                        break;
        }
        return entry;
}

xas load 流程只是循环的 xas descend 流程。前面已经提到过,就是按照 shift 切割分层。

阶段小总结

这里以三次 store 操作 {0, 998}, {61, 244}, {127, 353} 为例进行上述代码的总结。注意括号内表示 {index, entry},而 entry 需要经过 xa_mk_value 转换,比如 998 转换后为 1997,详见 entry 小节的函数实现。

{0, 998}

struct xarray {
        xa_head = 0x7cd;
};

这是插入 {0, 998} 后 dump 出来的内存布局。xa_head 在 count = 1 && index = 0 的特殊情况下可以直接内联存放用户 entry。

其中 0x7cd(1997)是对应于转换前的 998 数值。

{61, 244}

struct xarray {
        xa_head = 0xffff88800093548a;
};

第二次插入 {61, 244} 后,xa_head 将作为 node entry 去使用。XArray 怎么去识别结点就看后两位(xa_is_internal 是其中之一的条件)。0xa = 0b1010,因此不再是第一种内联情况。

其中 xa_head 解析得出的 node 地址为 0xffff888000935488,因为 0x8 = 0b1000。后续为了避免繁琐说明,就全部把数值换算好了。

xa_head = 0xffff888000935488(为方便阅读,全都转义好了)

&node[0-63] = 0xffff888000935488
        slots[0]:index[0]    = 998
        slots[1]:index[61]   = 244

第二次插入完成后的内存布局如上所示。此前内联于 xa_head 的 entry 将被移动到分配好的结点并插入到 slots[0];由于最大的 index 为 61 不超过 1<<6 = 64 的范围,所以 XArray 此时只有一层。

{127, 353}

head = 0xffff888000935b60

&node[0-4095] = 0xffff888000935b60
        slots[0]:&node[0-63]   = 0xffff888000935488(还是原来的地址)
                slots[0]:index[0]    = 998
                slots[1]:index[61]   = 244
        slots[1]:&node[64-127] = 0xffff888000935918
                slots[63]:index[127] = 353

也就是:
             [0-4095]
            /       \
        [0-63]       [64-127]
       /     \           |
{0, 998}  {61, 244}  {127, 353}

第三次操作。XArray 会为了满足最大寻址范围(127 > 64)而扩展树结构。显然旧有的结点是会保留(作为子树)而非全部摧毁重来的。

如果是插入 {0, 998}, {61, 244}, {4096, 353}……

XArray 将会是这样:

                [0-262143]
               /          \
        [0-4095]      [4096-8191]
           |               |
       [0-63]        [4096-4159]
      /     \              |
{0, 998}  {61, 244}    {4096, 353}
如果是插入 {0, 998}, {1, 244}, {2, 353}……

为了避免误会成二叉树(怎么前面例子全是刚好两个叉),多构造一个示例:

            [0-63]
         /    |    \
{0, 998}  {1, 244}  {2, 353}

简单结论

至此,可以有三个简单的结论:

  • XArray 允许内联存放一个 entry。作者提到有些场景其实是大概率只有一个元素:典型的 SCSI 目标设备通常只连接一个设备
  • XArray 是按需分层的。虽然按照 index 是从高到低每隔 6 位切割一层,但是像 61 这种显然只需一层,而不必按照 unsigned long 固定切割 ⌈64/6⌉ 次。这也是动态记录 shift 的原因之一。
  • XArray 所有的用户 entry 都在叶子层,而且是同一层。层级深度取决于最大的 index。

感兴趣的读者可以自己测试一下(内核模块)。

mark 特性

struct xa_node {
        /* ... */
        union {
                unsigned long   tags[XA_MAX_MARKS][XA_MARK_LONGS];
                unsigned long   marks[XA_MAX_MARKS][XA_MARK_LONGS];
        };
};

XArray 具有 mark 标记特性。简单来说就是可以在 entry 上打标记(至多三个),然后 for_each 迭代操作可以高效筛选出有特定标记的 entry。出于历史原因也有 tag 别名。

xa set mark

/**
 * xa_set_mark() - Set this mark on this entry.
 * @xa: XArray.
 * @index: Index of entry.
 * @mark: Mark number.
 *
 * Attempting to set a mark on a %NULL entry does not succeed.
 *
 * Context: Process context.  Takes and releases the xa_lock.
 */
void xa_set_mark(struct xarray *xa, unsigned long index, xa_mark_t mark)
{
        xa_lock(xa);
        __xa_set_mark(xa, index, mark);
        xa_unlock(xa);
}

/**
 * __xa_set_mark() - Set this mark on this entry while locked.
 * @xa: XArray.
 * @index: Index of entry.
 * @mark: Mark number.
 *
 * Attempting to set a mark on a %NULL entry does not succeed.
 *
 * Context: Any context.  Expects xa_lock to be held on entry.
 */
void __xa_set_mark(struct xarray *xa, unsigned long index, xa_mark_t mark)
{
        XA_STATE(xas, xa, index);
        void *entry = xas_load(&xas);

        if (entry)
                xas_set_mark(&xas, mark);
}

mark 标记操作的门面流程非常简单,找到对应 index 的 entry,进一步执行 xas set mark 流程。

xas set mark

/**
 * xas_set_mark() - Sets the mark on this entry and its parents.
 * @xas: XArray operation state.
 * @mark: Mark number.
 *
 * Sets the specified mark on this entry, and walks up the tree setting it
 * on all the ancestor entries.  Does nothing if @xas has not been walked to
 * an entry, or is in an error state.
 */
void xas_set_mark(const struct xa_state *xas, xa_mark_t mark)
{
        struct xa_node *node = xas->xa_node;
        unsigned int offset = xas->xa_offset;

        if (xas_invalid(xas))
                return;

        // 向上传播 mark 标记,直到顶层 parent 为 NULL
        while (node) {
                // 如果已经存在,那就截断传播流程
                // 整个传播过程将会有 node->marks[mark] |= 1<<offset
                if (node_set_mark(node, offset, mark))
                        return;
                // 往上一层,更新 offset 和 node
                offset = node->offset;
                node = xa_parent_locked(xas->xa, node);
        }

        // 这里是判断 xas->xa->xa_flags 字段是否包含 mark
        //(可能是考虑内联的情况下没有 node,见 xa_get_mark() 函数)
        if (!xa_marked(xas->xa, mark))
                xa_mark_set(xas->xa, mark);
}

xas set mark 流程也是一个非常简单的 pushup 传播标记过程。从 index 对应的 node 及其任意祖先均会标记到 mark(到对应的 ->offset bit)。

不难想象,迭代操作将可以快速过滤掉不含有对应 mark 操作的整棵子树。(不过具体的内部 find 操作是很复杂的……)

除此以外,get mark 操作也可以根据传播的性质立刻排除不存在 mark 的场景。并且 XArray 的用户 entry 均处于叶子层,也就是说这种向上传播的操作不会存在互相「污染」的情况。get mark 的正确性也要依赖于这种树形结构。

NOTE: mark 是有可能向下传播的,此前 store 操作的代码可以看到,alloc 特性是会往子结点传播 XA_FREE_MARK 标记。大概意思推测是主动为用户提供 index 时可以快速用位图运算求出。

multi-index entry 特性

multi-index entry 具有 order 性质,表示该 entry 将占用 1<<order 个下标。

xas set order

/**
 * xas_set_order() - Set up XArray operation state for a multislot entry.
 * @xas: XArray operation state.
 * @index: Target of the operation.
 * @order: Entry occupies 2^@order indices.
 */
static inline void xas_set_order(struct xa_state *xas, unsigned long index,
                                        unsigned int order)
{
#ifdef CONFIG_XARRAY_MULTI
        xas->xa_index = order < BITS_PER_LONG ? (index >> order) << order : 0;
        xas->xa_shift = order - (order % XA_CHUNK_SHIFT);
        xas->xa_sibs = (1 << (order % XA_CHUNK_SHIFT)) - 1;
        xas->xa_node = XAS_RESTART;
#else
        BUG_ON(order > 0);
        xas_set(xas, index);
#endif
}

xas set order 是 store 操作前初始化 order 的流程,字面含义非常抽象:

  • (index >> order) << order:表示 order 对齐,去除 1<<order 以下的低位 1。
  • order - (order % XA_CHUNK_SHIFT):表示 XA_CHUNK_SHIFT 的最大整数倍(<= order)。
  • 1 << (order % XA_CHUNK_SHIFT):表示 1<<order 在 XA_CHUNK_SHIFT 切割意义下的余数。
  • XAS_RESTART:表示初始状态,此前保存过的上下文作废。

NOTE: sibs 表示 siblings(兄弟)。可以简单认为整个 multi-index entry 由 canonical entry 加上若干 non-canonical entries 组成,兄弟就是后者。

具体的含义可以通过一个简单的测试程序来了解,它会输出 [xa_index, xa_shift, xa_sibs] 的结果。

test: index=[0, 18), order=3
[0, 0, 7]
[0, 0, 7]
[0, 0, 7]
[0, 0, 7]
[0, 0, 7]
[0, 0, 7]
[0, 0, 7]
[0, 0, 7]
[8, 0, 7]
[8, 0, 7]
[8, 0, 7]
[8, 0, 7]
[8, 0, 7]
[8, 0, 7]
[8, 0, 7]
[8, 0, 7]
[16, 0, 7]
[16, 0, 7]

比如 index=[0, 18), order=3 的场景。可以看出 xa state 当中的 index 总是对齐 order 的(只有 0,8,16);并且由于对齐意义下 slots[] 总能覆盖到所有 index(因为 order 足够小),所以兄弟数目 sibs 就是 1<<order 减去自身。

test: index=[56, 67), order=7
[0, 6, 1]
[0, 6, 1]
[0, 6, 1]
[0, 6, 1]
[0, 6, 1]
[0, 6, 1]
[0, 6, 1]
[0, 6, 1]
[0, 6, 1]
[0, 6, 1]
[0, 6, 1]

test: index=[56, 67), order=8
[0, 6, 3]
[0, 6, 3]
[0, 6, 3]
[0, 6, 3]
[0, 6, 3]
[0, 6, 3]
[0, 6, 3]
[0, 6, 3]
[0, 6, 3]
[0, 6, 3]
[0, 6, 3]

当 order 过大(> 6)导致无论如何对齐都会切割时,兄弟是需要计算完整的 slots[](或者叶子结点)个数。比如 order = 7,需要找 2 个叶子结点来完整覆盖 1<<7 个 index;当 order = 8,需要找 4 个叶子结点(4*64 = 1<<8)。

test: index=[56, 67), order=6
[0, 6, 0]
[0, 6, 0]
[0, 6, 0]
[0, 6, 0]
[0, 6, 0]
[0, 6, 0]
[0, 6, 0]
[0, 6, 0]
[64, 6, 0]
[64, 6, 0]
[64, 6, 0]

像是 order=6 这种场景,恰好一个 entry 占据整个 slots[],所以是不存在兄弟的。

但是仅靠这些解释不了 shift 是啥东西,还有大页性能优化的秘密。

test: index=[56, 67), order=11
[0, 6, 31]
[0, 6, 31]
[0, 6, 31]
[0, 6, 31]
[0, 6, 31]
[0, 6, 31]
[0, 6, 31]
[0, 6, 31]
[0, 6, 31]
[0, 6, 31]
[0, 6, 31]

test: index=[56, 67), order=12
[0, 12, 0]
[0, 12, 0]
[0, 12, 0]
[0, 12, 0]
[0, 12, 0]
[0, 12, 0]
[0, 12, 0]
[0, 12, 0]
[0, 12, 0]
[0, 12, 0]
[0, 12, 0]

为什么我们在 order < 6 时讨论的兄弟是一个(普通)entry 大小,而在 order = [6, 11] 时又成了整个 slots[] 或叶子(即 64 个 entry)的大小?因为它的大小由 shift 控制:在 order 增长至 12 时,shift 也首次增长到 12,此时的兄弟(以及 canonical entry)能更高阶地表示一棵包含 1<<12 个 entry 的子树,从而进一步减少树操作的遍历层数(最明显的是 xas create 流程)。更高阶的 order 也是如此类推。也推荐读者看下 xas_size() 函数的计算,很好明白 shift 和 sibs 的关系。

xas store 回顾

void *xas_store(struct xa_state *xas, void *entry)
{
        // ...

        offset = xas->xa_offset;
        max = xas->xa_offset + xas->xa_sibs;

        // ...

        for (;;) {
                // canonical entry 就是用户提供的 entry 值
                rcu_assign_pointer(*slot, entry);

                // ...

                if (entry) {
                        // 从最初的 offset 到最终的 max,其实只会遍历 1+sibs 次数
                        if (offset == max)
                                break;
                        // 而 sibling entry 将全部指向此前的 canonical entry
                        //(下一次迭代 rcu assign 时赋值)
                        if (!xa_is_sibling(entry))
                                entry = xa_mk_sibling(xas->xa_offset);
                } else {
                        if (offset == XA_CHUNK_MASK)
                                break;
                }

                // ...

                slot++;
        }
}

static void *xas_create(struct xa_state *xas, bool allow_root)
{
        int shift;
        unsigned int order = xas->xa_shift;

        // ...

        // 只需要走到处理 order 就行了,免除了继续往下层遍历的开销
        while (shift > order) {
                shift -= XA_CHUNK_SHIFT;
                if (!entry) {
                        node = xas_alloc(xas, shift);
                        if (!node)
                                break;
                        if (xa_track_free(xa))
                                node_mark_all(node, XA_FREE_MARK);
                        rcu_assign_pointer(*slot, xa_mk_node(node));
                } else if (xa_is_node(entry)) {
                        node = xa_to_node(entry);
                } else {
                        break;
                }
                entry = xas_descend(xas, node);
                slot = &node->slots[xas->xa_offset];
        }

        return entry;
}

static __always_inline void *xas_descend(struct xa_state *xas,
                                        struct xa_node *node)
{

        // ...

        while (xa_is_sibling(entry)) {
                // sibling entry 的数值内部有指向 canonical entry 的偏移量/下标
                offset = xa_to_sibling(entry);
                // xas descend 流程还自带 entry 修正功能,防止误指向 sibling entry
                entry = xa_entry(xas->xa, node, offset);
                if (node->shift && xa_is_node(entry))
                        entry = XA_RETRY_ENTRY;
        }

        xas->xa_offset = offset;
        return entry;
}

简单回顾此前的 xas store 流程。就是遍历开销更少了,具体看注释吧。

page cache 层应用

这里整理了一些 page cache 层的 XArray 应用。都是比较简单的,太复杂的看不过来……

无锁页缓存协议

/*
 * Lockless page cache protocol:
 * On the lookup side:
 * 1. Load the folio from i_pages
 * 2. Increment the refcount if it's not zero
 * 3. If the folio is not found by xas_reload(), put the refcount and retry
 *
 * On the removal side:
 * A. Freeze the page (by zeroing the refcount if nobody else has a reference)
 * B. Remove the page from i_pages
 * C. Return the page to the page allocator
 *
 * This means that any page may have its reference count temporarily
 * increased by a speculative page cache (or GUP-fast) lookup as it can
 * be allocated by another user before the RCU grace period expires.
 * Because the refcount temporarily acquired here may end up being the
 * last refcount on the page, any page allocation must be freeable by
 * folio_put().
 */

/*
 * filemap_get_entry - Get a page cache entry.
 * @mapping: the address_space to search
 * @index: The page cache index.
 *
 * Looks up the page cache entry at @mapping & @index.  If it is a folio,
 * it is returned with an increased refcount.  If it is a shadow entry
 * of a previously evicted folio, or a swap entry from shmem/tmpfs,
 * it is returned without further action.
 *
 * Return: The folio, swap or shadow entry, %NULL if nothing is found.
 */
void *filemap_get_entry(struct address_space *mapping, pgoff_t index)
{
        XA_STATE(xas, &mapping->i_pages, index);
        struct folio *folio;

        rcu_read_lock();
repeat:
        xas_reset(&xas);
        folio = xas_load(&xas);
        if (xas_retry(&xas, folio))
                goto repeat;
        /*
         * A shadow entry of a recently evicted page, or a swap entry from
         * shmem/tmpfs.  Return it without attempting to raise page count.
         */
        if (!folio || xa_is_value(folio))
                goto out;

        // 尝试增加 folio->page->_refcount 引用计数
        if (!folio_try_get(folio))
                goto repeat;

        // xas reload 不会重新遍历,就简单基于当前上下文的 node 和对应 offset 直接访问 entry
        // https://elixir.bootlin.com/linux/v6.15/source/include/linux/xarray.h#L1608
        if (unlikely(folio != xas_reload(&xas))) {
                folio_put(folio);
                goto repeat;
        }
out:
        rcu_read_unlock();

        return folio;
}

既然 XArray 用于存放 page/folio,而且 XArray 的获取(load)操作是 RCU 无锁实现的,那么从 XArray 获取页操作也有必要同样是无锁实现的。但是 page 有自身的引用计数,那么 xas load + get page 两步就有竞态条件问题,于是有了一套无锁 lookup 协议。这里至少要避免移除页时 put 为 0 了还会被另外一端增加计数的问题:先使用 try get(表示 add 1 unless refcnt=0),再 reload 检测一致性(表示 try get 的时候确实仍然是 XArray 当中未被改动的 entry)。

这份代码不会存在 try get 时 refcnt=0 且 page 已被释放解引用失败的问题,也不会存在 xas reload 对比时潜在 ABA 问题。这些问题 RCU 机制都能避免。

直接回收

当内存不足时,直接回收会将文件页回写到文件本身,也就是扣掉 page cache 来节省内存。这里会间接使用 XArray 的标记查找功能。

UPDATE: 目前所有文件系统已经没有 a_ops->writepage 实现了,pageout() 实质上应该没有用途,还没找到 commit 说明。

fsync 写回

const struct file_operations f2fs_file_operations = {
        // ...
        .fsync          = f2fs_sync_file,
        // ...
};

const struct address_space_operations f2fs_dblock_aops = {
        // ....
        .writepages     = f2fs_write_data_pages,
        // ...
};

xas_find_marked(xas, max, mark)
find_get_entry(&xas, end, tag)
filemap_get_folios_tag
f2fs_write_cache_pages // 这里选择需要的 mark
__f2fs_write_data_pages
f2fs_write_data_pages
mapping->a_ops->writepages
do_writepages
filemap_fdatawrite_wbc
__filemap_fdatawrite_range
file_write_and_wait_range // write out & wait on a file range
f2fs_do_sync_file
f2fs_sync_file

static int f2fs_write_cache_pages(struct address_space *mapping, ...)
{
        // ...
        // 选择需要的 mark/tag
        if (wbc->sync_mode == WB_SYNC_ALL || wbc->tagged_writepages)
                tag = PAGECACHE_TAG_TOWRITE;
        else
                tag = PAGECACHE_TAG_DIRTY;
        // ...
        while (!done && !retry && (index <= end)) {
                nr_pages = 0;
                nr_folios = filemap_get_folios_tag(mapping, &index, end,
                                tag, &fbatch);
                if (nr_folios == 0) {
                        if (nr_pages)
                                goto write;
                        break;
                }
                // ...
        }
        // ...
}

换一个 writepages 示例,比如 fsync 时也需要写回脏页。以 F2FS 为例,filemap_get_folios_tag 可以使用 XArray 的 mark 标记特性(filemap 将 XA_MARK 封装为内部定义的 PAGECACHE_TAG)来加速查找脏页。

大页存储

noinline int __filemap_add_folio(struct address_space *mapping,
                struct folio *folio, pgoff_t index, gfp_t gfp, void **shadowp)
{
        // 结合 folio order 和 multi-index entry
        XA_STATE_ORDER(xas, &mapping->i_pages, index, folio_order(folio));
        // ...
        folio_ref_add(folio, nr);
        folio->mapping = mapping;
        folio->index = xas.xa_index;
        // ...
        xas_store(&xas, folio);
        // ...
}

__filemap_add_folio 通常封装在获取 page cache 的过程中,比如 grab cache 等函数。如果上层失败了就意味着没有 page cache,需要走 IO 拿到数据并封装回 folio 再插到 XArray 里面作为后续的 page cache 去使用。不考虑复杂优化的话,这个 xas store 操作就是直接关联 folio order 到 multi-index entry 的 order。

总结

goto #基本介绍;

References

XArray – kernel.org
XArray: One Data Structure To Rule Them All
zswap: replace RB tree with xarray – LWN.net
The XArray data structure – LWN.net
xarray – Kernel Exploring
xarray: Define struct xa_node
Maple Tree Work
mm: list_lru: replace linear array with xarray
xarray: Change definition of sibling entries
xarray: use kmem_cache_alloc_lru to allocate xa_node
page cache: Add and replace pages using the XArray