-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdecompression.py
More file actions
121 lines (98 loc) · 5.27 KB
/
decompression.py
File metadata and controls
121 lines (98 loc) · 5.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
from bitarray import bitarray, frozenbitarray
from bitarray.util import ba2int
from os import makedirs
from logging_configuration import decompression_logger as logger
from coding_tables import DECODING_TABLE
def decompression_of_sequence(compressed_sequence: bytes) -> list[int]:
    """
    Decompress a byte sequence back to the original list of integers.

    The stream is read as big-endian bits. After the first value, each token
    yields one or more differences, which are cumulatively summed onto the
    first value to rebuild the sequence.

    Decoding format:
    - First value: 8-bit unsigned (0-255)
    - Small differences: "00" prefix + (N-2 as 2 bits) + (index as N bits),
      resolved through DECODING_TABLE using the whole token as the key
    - Zero repetition: "01" prefix + 3-bit count (1-8 zeros)
    - Large differences: "10" prefix + sign bit + 8-bit magnitude
    - Terminator: "11" (required); any bits after it are ignored

    Side effect: the decompressed list is also written (as its ``str()``
    representation) to ``Data/decompressed.txt``; the directory is created
    if it does not exist.

    Args:
        compressed_sequence: Compressed byte sequence

    Returns:
        List of integers representing the decompressed sequence

    Raises:
        ValueError: If input is empty or too short, a token is truncated or
            cannot be decoded, or the terminator is missing
    """
    if not compressed_sequence:
        logger.error("[decompression] Empty compressed sequence.")
        raise ValueError("Compressed sequence cannot be empty.")

    compressed_bits = bitarray(endian="big")
    compressed_bits.frombytes(compressed_sequence)
    logger.info(f"[decompression] Compressed sequence: {compressed_bits}")

    sequence_size = len(compressed_bits)

    # Validate minimum length (at least 8 bits for first value + 2 bits for terminator)
    if sequence_size < 10:
        logger.error(f"[decompression] Input too short: {sequence_size} bits")
        raise ValueError(f"Compressed sequence too short: need at least 10 bits, got {sequence_size}")

    def _require_bits(start: int, width: int, what: str) -> None:
        # Slicing past the end of a bitarray silently returns a shorter slice
        # and indexing raises IndexError, so truncation is checked explicitly
        # to report corrupted input as ValueError instead of misdecoding it.
        if start + width > sequence_size:
            available = max(sequence_size - start, 0)
            logger.error(
                f"[decompression] Truncated {what} at position={start}: "
                f"need {width} bits, {available} available"
            )
            raise ValueError(f"Truncated {what} at position {start}")

    # Read first element (8 bits)
    first_value = ba2int(compressed_bits[:8])
    logger.info(f"[decompression] First element (8-bit): {first_value}")

    differences = [first_value]
    i = 8
    terminator_found = False

    while i < sequence_size:
        token_start = i
        logger.debug(f"\n[decompression] Position: {i}")
        prefix = compressed_bits[i:i + 2]
        logger.debug(f"[decompression] Prefix: {prefix}")
        i += 2
        token = prefix.to01()

        if token == "00":
            logger.debug("[decompression] Token '00' (small difference).")
            _require_bits(i, 2, "small-difference length field")
            N = ba2int(compressed_bits[i:i + 2]) + 2
            logger.debug(f"[decompression] Number of bits: {N}")
            i += 2
            _require_bits(i, N, "small-difference index")
            # Decode from table using full encoding (prefix + N-2 bits + N bits)
            encoding_bits = compressed_bits[token_start:i + N]
            logger.debug(f"[decompression] Decoding: {encoding_bits}")
            try:
                decoded_difference = DECODING_TABLE[frozenbitarray(encoding_bits)]
            except KeyError as e:
                logger.error(f"[decompression] Table lookup failed: {e}")
                raise ValueError(f"Failed to decode difference at position {token_start}") from e
            differences.append(decoded_difference)
            logger.info(f"[decompression] Decoded difference: {decoded_difference}")
            i += N

        elif token == "01":
            logger.debug("[decompression] Token '01' (zero repetition).")
            _require_bits(i, 3, "zero-repetition count")
            repetition_count = ba2int(compressed_bits[i:i + 3]) + 1  # n-1 encoded, so add 1
            logger.debug(f"[decompression] Repetition count: {repetition_count}")
            # Repetition encoding is only for zeros
            differences.extend([0] * repetition_count)
            logger.info(f"[decompression] Added {repetition_count} zeros.")
            i += 3

        elif token == "10":
            logger.debug("[decompression] Token '10' (absolute encoding)")
            _require_bits(i, 9, "large-difference payload")
            sign_bit = compressed_bits[i]
            magnitude = ba2int(compressed_bits[i + 1:i + 9])
            # A set sign bit marks a negative difference.
            decoded_difference = -magnitude if sign_bit else magnitude
            logger.info(f"[decompression] Decoded difference: {decoded_difference}")
            differences.append(decoded_difference)
            i += 9

        elif token == "11":
            logger.info("[decompression] Terminator '11' encountered.")
            terminator_found = True
            break

        else:
            # Only reachable when fewer than 2 bits remained for the prefix.
            logger.error(f"[decompression] Unknown prefix at position={token_start}: {prefix}")
            raise ValueError(f"Unknown prefix bits: {token}")

    # Validate that terminator was found
    if not terminator_found:
        logger.error("[decompression] Terminator '11' not found in compressed sequence.")
        raise ValueError("Compressed sequence must end with terminator '11'")

    # Convert differences back to original sequence:
    # differences[0] is the first value, differences[1:] are actual differences
    original_sequence = [differences[0]]
    running_value = differences[0]
    for difference in differences[1:]:
        running_value += difference
        original_sequence.append(running_value)
    logger.info(f"[decompression] Sequence decompressed. Output length: {len(original_sequence)}")

    # Write decompressed data to file
    makedirs("Data", exist_ok=True)
    with open("Data/decompressed.txt", "w") as f:
        f.write(str(original_sequence))
    logger.info("[decompression] Decompressed data written to Data/decompressed.txt")

    return original_sequence