|
6 | 6 | import sys
|
7 | 7 | sys.path.append("..")
|
8 | 8 |
|
9 |
| -import os |
| 9 | +import string |
| 10 | +from typing import TextIO |
| 11 | + |
10 | 12 | from qiling import Qiling
|
| 13 | +from qiling.os.posix import stat |
| 14 | + |
| 15 | +ROOTFS = r"rootfs/x86_linux" |
11 | 16 |
|
12 |
| -class MyPipe(): |
| 17 | +class MyPipe(TextIO): |
13 | 18 | def __init__(self):
|
14 |
| - self.buf = b'' |
| 19 | + self.buf = bytearray() |
15 | 20 |
|
16 |
| - def write(self, s): |
17 |
| - self.buf += s |
| 21 | + def write(self, s: bytes): |
| 22 | + self.buf.extend(s) |
18 | 23 |
|
19 |
| - def read(self, size): |
20 |
| - if size <= len(self.buf): |
21 |
| - ret = self.buf[: size] |
22 |
| - self.buf = self.buf[size:] |
23 |
| - else: |
24 |
| - ret = self.buf |
25 |
| - self.buf = '' |
26 |
| - return ret |
| 24 | + def read(self, size: int) -> bytes: |
| 25 | + ret = self.buf[:size] |
| 26 | + self.buf = self.buf[size:] |
| 27 | + |
| 28 | + return bytes(ret) |
27 | 29 |
|
28 |
| - def fileno(self): |
29 |
| - return 0 |
| 30 | + def fileno(self) -> int: |
| 31 | + return sys.stdin.fileno() |
30 | 32 |
|
31 |
| - def show(self): |
32 |
| - pass |
| 33 | + def fstat(self): |
| 34 | + return stat.Fstat(self.fileno()) |
33 | 35 |
|
34 |
| - def clear(self): |
35 |
| - pass |
| 36 | +class Solver: |
| 37 | + def __init__(self, invalid: bytes): |
| 38 | + # create a silent qiling instance |
| 39 | + self.ql = Qiling([rf"{ROOTFS}/bin/crackme_linux"], ROOTFS, |
| 40 | + console=False, # thwart qiling logger output |
| 41 | + stdin=MyPipe(), # take over the input to the program using a fake stdin |
| 42 | + stdout=sys.stdout) # thwart program output |
36 | 43 |
|
37 |
| - def flush(self): |
38 |
| - pass |
| 44 | + # execute program until it reaches the 'main' function |
| 45 | + self.ql.run(end=0x0804851b) |
39 | 46 |
|
40 |
| - def close(self): |
41 |
| - self.outpipe.close() |
| 47 | + # record replay starting and ending points. |
| 48 | + # |
| 49 | + # since the emulation halted upon entering 'main', its return address is there on |
| 50 | + # the stack. we use it to limit the emulation till function returns |
| 51 | + self.replay_starts = self.ql.reg.arch_pc |
| 52 | + self.replay_ends = self.ql.stack_read(0) |
42 | 53 |
|
43 |
| - def fstat(self): |
44 |
| - return os.fstat(sys.stdin.fileno()) |
| 54 | + # instead of restarting the whole program every time a new flag character is guessed, |
| 55 | + # we will restore its state to the latest point possible, fast-forwarding a good |
| 56 | + # amount of start-up code that is not affected by the input. |
| 57 | + # |
| 58 | + # here we save the state just when 'main' is about to be called so we could use it |
| 59 | + # to jumpstart the initialization part and get to 'main' immediately |
| 60 | + self.jumpstart = self.ql.save() or {} |
| 61 | + |
| 62 | + # calibrate the replay instruction count by running the code with an invalid input |
| 63 | + # first. the instruction count returned from the calibration process will be then |
| 64 | + # used as a baseline for consequent replays |
| 65 | + self.best_icount = self.__run(invalid) |
| 66 | + |
| 67 | + def __run(self, input: bytes) -> int: |
| 68 | + icount = [0] |
45 | 69 |
|
46 |
| -def instruction_count(ql: Qiling, address: int, size: int, user_data): |
47 |
| - user_data[0] += 1 |
| 70 | + def __count_instructions(ql: Qiling, address: int, size: int): |
| 71 | + icount[0] += 1 |
48 | 72 |
|
49 |
| -def my__llseek(ql, *args, **kw): |
50 |
| - pass |
| 73 | + # set a hook to fire up every time an instruction is about to execute |
| 74 | + hobj = self.ql.hook_code(__count_instructions) |
51 | 75 |
|
52 |
| -def run_one_round(payload: bytes): |
53 |
| - stdin = MyPipe() |
| 76 | + # feed stdin with input |
| 77 | + self.ql.stdin.write(input + b'\n') |
54 | 78 |
|
55 |
| - ql = Qiling(["rootfs/x86_linux/bin/crackme_linux"], "rootfs/x86_linux", |
56 |
| - console=False, # thwart qiling logger output |
57 |
| - stdin=stdin, # take over the input to the program |
58 |
| - stdout=sys.stdout) # thwart program output |
| 79 | + # resume emulation till function returns |
| 80 | + self.ql.run(begin=self.replay_starts, end=self.replay_ends) |
59 | 81 |
|
60 |
| - ins_count = [0] |
61 |
| - ql.hook_code(instruction_count, ins_count) |
62 |
| - ql.set_syscall("_llseek", my__llseek) |
| 82 | + hobj.remove() |
63 | 83 |
|
64 |
| - stdin.write(payload + b'\n') |
65 |
| - ql.run() |
| 84 | + return icount[0] |
66 | 85 |
|
67 |
| - del stdin |
68 |
| - del ql |
| 86 | + def replay(self, input: bytes) -> bool: |
| 87 | + """Restore state and replay with a new input. |
69 | 88 |
|
70 |
| - return ins_count[0] |
| 89 | + Returns an indication to execution progress: `True` if a progress |
| 90 | + was made, `False` otherwise |
| 91 | + """ |
71 | 92 |
|
72 |
| -def solve(): |
| 93 | + # restore program's state back to the starting point |
| 94 | + self.ql.restore(self.jumpstart) |
| 95 | + |
| 96 | + # resume emulation and count emulated instructions |
| 97 | + curr_icount = self.__run(input) |
| 98 | + |
| 99 | + # the larger part of the input is correct, the more instructions are expected to be executed. this is true |
| 100 | + # for traditional loop-based validations like strcmp or memcmp which bails as soon as a mismatch is found: |
| 101 | + # more correct characters mean more loop iterations - thus more executed instructions. |
| 102 | + # |
| 103 | + # if we got a higher instruction count, it means we made a progress in the right direction |
| 104 | + if curr_icount > self.best_icount: |
| 105 | + self.best_icount = curr_icount |
| 106 | + |
| 107 | + return True |
| 108 | + |
| 109 | + return False |
| 110 | + |
| 111 | +def main(): |
73 | 112 | idx_list = (1, 4, 2, 0, 3)
|
74 | 113 | flag = [0] * len(idx_list)
|
75 | 114 |
|
76 |
| - prev_ic = run_one_round(bytes(flag)) |
| 115 | + solver = Solver(bytes(flag)) |
| 116 | + |
77 | 117 | for idx in idx_list:
|
78 | 118 |
|
79 | 119 | # bruteforce all possible flag characters
|
80 |
| - for ch in '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~ ': |
| 120 | + for ch in string.printable: |
81 | 121 | flag[idx] = ord(ch)
|
82 | 122 |
|
83 |
| - print(f'\rguessing char at {idx}: {ch}... ', end='', flush=True) |
84 |
| - ic = run_one_round(bytes(flag)) |
| 123 | + print(f'\rGuessing... [{"".join(chr(ch) if ch else "_" for ch in flag)}]', end='', file=sys.stderr, flush=True) |
85 | 124 |
|
86 |
| - if ic > prev_ic: |
87 |
| - print(f'ok') |
88 |
| - prev_ic = ic |
| 125 | + if solver.replay(bytes(flag)): |
89 | 126 | break
|
| 127 | + |
90 | 128 | else:
|
91 |
| - print(f'no match found') |
| 129 | + print(f'No match found') |
92 | 130 |
|
93 |
| - print(f'flag: "{"".join(chr(ch) for ch in flag)}"') |
| 131 | + print(f'\nFlag found!') |
94 | 132 |
|
95 | 133 | if __name__ == "__main__":
|
96 |
| - solve() |
| 134 | + main() |
0 commit comments