-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcompression.py
More file actions
156 lines (125 loc) · 6.6 KB
/
compression.py
File metadata and controls
156 lines (125 loc) · 6.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
from itertools import pairwise
from bitarray import bitarray
from os import makedirs
from logging_configuration import compression_logger as logger
from coding_tables import ENCODING_TABLE
def calculate_differences(values: list[int] | tuple[int, ...]) -> list[int]:
"""
Calculate differences between consecutive values.
The first value is kept as-is, subsequent values are differences (b - a).
Returns empty list if input is empty.
"""
if not values:
logger.debug("[calculate_differences] Empty values.")
return []
if any(0 < value > 255 for value in values):
logger.error(f"[calculate_differences] Negative values: {values}")
raise ValueError(f"Negative values: {values}")
logger.info("[calculate_differences] Calculating differences...")
return [values[0], *(b - a for a, b in pairwise(values))]
def encode_repetition(n: int) -> bitarray:
"""
Encode a repetition count (1–8) as a 5-bit bitarray with prefix "01".
Args:
n: Number of repetitions (1 to 8 inclusive)
Returns:
bitarray with format "01" + 3-bit representation of (n-1)
"""
if not 1 <= n <= 8:
logger.error(f"[encode_repetition] Invalid number of repetitions: {n}")
raise ValueError("n must be between 1 and 8 inclusive.")
repetition_bits = format(n - 1, "03b") # Format n-1 as a 3-bit binary string (e.g., n=1 -> "000", n=8 -> "111")
logger.debug(f"[encode_repetition] Repetition bits: {repetition_bits}")
bit_string = "01" + repetition_bits # Prefix "01" followed by the 3-bit count
logger.debug(f"[encode_repetition] Bit string: {bit_string}")
return bitarray(bit_string, endian="big")
def absolute_encoding(x: int) -> bitarray:
"""
Encode an integer larger than 30, and less than -30 using 9 bits; first bit is for a sign,
and the rest is for 8-bit number.
Args:
x: Integer with |x| > 30 and |x| <= 255
Returns:
bitarray with format "10" + sign_bit + 8-bit magnitude
"""
if -30 <= x <= 30 or x == 0:
logger.error(f"[absolute_encoding] Invalid number: {x}")
raise ValueError("absolute_encoding expects |x| > 30.")
absolute_x = abs(x)
if absolute_x > 0xFF:
logger.error(f"[absolute_encoding] Number is not supported: {x}")
raise ValueError("Magnitude exceeds 8-bit capacity (max 255).")
bit_string = "10" + ("1" if x < 0 else "0") + format(absolute_x, "08b")
logger.debug(f"[absolute_encoding] Bit string: {bit_string}")
return bitarray(bit_string, endian="big")
def compression_of_sequence(values: list[int] | tuple[int, ...]) -> bytes:
"""
Compress a sequence of integers using differential encoding with variable-length codes.
Encoding scheme:
- First value: 8-bit unsigned (0-255)
- Small differences (|x| <= 30): variable-length encoding from ENCODING_TABLE
- Large differences (|x| > 30): 9-bit absolute encoding ("10" + sign + 8-bit magnitude)
- Zero sequences: repetition encoding ("01" + 3-bit count, max 8 zeros per group)
- Terminator: "11" at the end
Args:
values: Sequence of integers (typically in range [0, 255] for original values)
Returns:
Compressed byte sequence
Raises:
ValueError: If input is empty, first value out of range [0, 255],
or any difference exceeds [-255, 255]
"""
if not values:
logger.error("[calculate_differences] Cannot index empty list.")
raise ValueError("values list must be non-empty.")
values_differences = calculate_differences(values)
logger.info(f"[compression_of_sequence] Values differences: {values_differences}")
invalid_differences = [difference for difference in values_differences[1:] if -255 > difference > 255]
if invalid_differences:
logger.error(f"[compression_of_sequence] Invalid differences found: {invalid_differences}")
raise ValueError(f"All differences must be in range [-255, 255]. Invalid: {invalid_differences}")
first_element_encoded = format(values_differences[0], "08b")
compression_sequence = bitarray(first_element_encoded, endian="big")
logger.debug(f"[compression_of_sequence] Initial compression sequence: {compression_sequence}")
encode_table = None
i, values_size = 1, len(values_differences)
while i < values_size:
element = values_differences[i]
logger.debug(f"[compression_of_sequence] Index order: {i}")
absolute_value = abs(element)
if element != 0:
if absolute_value <= 30:
if encode_table is None:
encode_table = ENCODING_TABLE
logger.debug(f"\n[compression_of_sequence] Encoded table:\n{encode_table}")
logger.debug(f"\n[compression_of_sequence] Size of encoded table: {len(encode_table)}")
encoded_element = encode_table[element + 30]
logger.info(f"[compression_of_sequence] Element encoded using table: {element} -> {encoded_element}")
else:
encoded_element = absolute_encoding(element)
logger.info(
f"[compression_of_sequence] Element encoded using absolute encoding: {element} -> {encoded_element}.")
compression_sequence.extend(encoded_element)
i += 1
else: # element == 0
repetition = 1
while repetition <= 8 and (i + 1) < values_size and values_differences[i + 1] == 0:
logger.debug(f"[compression_of_sequence] Comparing zero: {values_differences[i + 1]}")
repetition += 1
i += 1
logger.info(f"[compression_of_sequence] Zero repetition detected: {repetition}")
repetition_encoding = encode_repetition(repetition)
logger.debug("[compression_of_sequence] Repetition encoded.")
compression_sequence.extend(repetition_encoding)
i += 1
logger.debug("\n")
terminator_bits = bitarray("11", endian="big")
logger.info(f"[compression_of_sequence] Appended terminator bits: {terminator_bits}")
compression_sequence.extend(terminator_bits)
logger.info(f"[compression_of_sequence] Returning compressed sequence: {compression_sequence}")
# Write compressed data to file
makedirs("Data", exist_ok=True)
with open("Data/compressed.txt", "wb") as f:
f.write(compression_sequence.tobytes())
logger.info("[compression_of_sequence] Compressed data written to Data/compressed.txt")
return compression_sequence.tobytes()