Skip to content

Commit 3f25630

Browse files
committed
Fix lz4 decompression for protocol v3 and v4
1 parent 2585646 commit 3f25630

File tree

6 files changed

+88
-6
lines changed

6 files changed

+88
-6
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
## v1.2.0
2+
3+
### Fixed
4+
* Fixed lz4 decompression for protocol v3 and v4
5+
16
## v1.1.0
27

38
This release introduces the ability to override the default pool of allowed protocol versions (v5, v4, v3) via the new `allowedProtocolVersions` property in `ConnectionOptions`. This is a low-level feature intended for advanced use cases and should not be used by most users. The default behavior, which attempts to negotiate the highest supported version (v5 > v4 > v3), is recommended for the majority of situations.

src/Connection.php

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1106,13 +1106,18 @@ private function configureStartupOptions(Response\Supported $supportedReponse):
11061106
$protocolVersion = ProtocolVersion::getHighestSupportedVersion($versionsSupportedByServer, $this->options->allowedProtocolVersions);
11071107
if ($protocolVersion === null) {
11081108

1109+
$versionsSupportedByServerInOptionFormat = array_map(
1110+
fn (ProtocolVersion $v) => $v->inOptionFormat(),
1111+
$versionsSupportedByServer
1112+
);
1113+
11091114
$allowedProtocolVersionsInOptionFormat = array_map(
11101115
fn (ProtocolVersion $v) => $v->inOptionFormat(),
11111116
$this->options->allowedProtocolVersions
11121117
);
11131118

11141119
throw new ConnectionException('Server does not support a compatible protocol version.', ExceptionCode::CONNECTION_SERVER_PROTOCOL_UNSUPPORTED->value, [
1115-
'proocol_versions_supported_by_server' => $serverOptions['PROTOCOL_VERSIONS'] ?? null,
1120+
'proocol_versions_supported_by_server' => $versionsSupportedByServerInOptionFormat,
11161121
'proocol_versions_supported_by_client' => ProtocolVersion::CASES_IN_OPTION_FORMAT,
11171122
'proocol_versions_allowed_by_connection_options' => $allowedProtocolVersionsInOptionFormat,
11181123
]);

src/Connection/ConnectionOptions.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,15 @@
44

55
namespace Cassandra\Connection;
66

7+
use Cassandra\Exception\ConnectionException;
8+
use Cassandra\Exception\ExceptionCode;
79
use Cassandra\Protocol\ProtocolVersion;
810
use Cassandra\ReleaseConstants;
911

1012
final class ConnectionOptions {
13+
/**
14+
* @throws \Cassandra\Exception\ConnectionException
15+
*/
1116
public function __construct(
1217
public readonly bool $enableCompression = false,
1318
public readonly bool $throwOnOverload = false,
@@ -19,6 +24,16 @@ public function __construct(
1924
public readonly ProtocolVersion $initialProtocolVersion = ProtocolVersion::V3,
2025
) {
2126

27+
if (!in_array($this->initialProtocolVersion, $this->allowedProtocolVersions, true)) {
28+
throw new ConnectionException(
29+
'The initial protocol version must be one of the allowed protocol versions.',
30+
ExceptionCode::CONNECTION_INITIAL_PROTOCOL_VERSION_NOT_IN_ALLOWED_VERSIONS->value,
31+
[
32+
'initialProtocolVersion' => $this->initialProtocolVersion,
33+
'allowedProtocolVersions' => $this->allowedProtocolVersions,
34+
]
35+
);
36+
}
2237
}
2338

2439
/**

src/Connection/ResponseReader.php

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,28 @@ public function readResponse(Node $node, ProtocolVersion $version, bool $waitFor
6363
&& $header->length > 0
6464
&& $header->flags & Flag::COMPRESSION
6565
) {
66-
$this->lz4Decompressor->setInput($body);
66+
$uncompressedLength = unpack('N', substr($body, 0, 4));
67+
if ($uncompressedLength === false) {
68+
throw new ConnectionException(
69+
'Cannot read uncompressed length from compressed frame',
70+
ExceptionCode::CONNECTION_CANNOT_READ_DECOMPRESSED_FRAME_LENGTH->value,
71+
[]
72+
);
73+
}
74+
75+
$this->lz4Decompressor->setInput(substr($body, 4));
6776
$body = $this->lz4Decompressor->decompressBlock();
77+
78+
if ($uncompressedLength[1] !== strlen($body)) {
79+
throw new ConnectionException(
80+
'Decompressed frame length does not match expected length',
81+
ExceptionCode::CONNECTION_DECOMPRESSED_FRAME_LENGTH_MISMATCH->value,
82+
[
83+
'expected_length' => $uncompressedLength,
84+
'actual_length' => strlen($body),
85+
]
86+
);
87+
}
6888
}
6989

7090
return $this->createResponse($header, $body);

src/Exception/ExceptionCode.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
* Global enumeration of all exception codes used throughout the Cassandra library.
99
* Each exception class has its own prefix based on its namespace or class name in uppercase.
1010
*
11-
* Next free code: 1323
11+
* Next free code: 1326
1212
*/
1313
enum ExceptionCode: int {
1414
// CompressionException (COMPRESSION_)
@@ -30,9 +30,12 @@ enum ExceptionCode: int {
3030
case CONNECTION_AUTO_PREPARE_UNEXPECTED_RESPONSE = 1306;
3131
case CONNECTION_AUTO_PREPARE_UNEXPECTED_RESPONSE_REEXECUTE = 1304;
3232
case CONNECTION_AUTO_PREPARE_UNEXPECTED_RESULT_TYPE = 1302;
33+
case CONNECTION_CANNOT_READ_DECOMPRESSED_FRAME_LENGTH = 1324;
3334
case CONNECTION_CANNOT_READ_RESPONSE_HEADER = 1008;
3435
case CONNECTION_COMPRESSION_NOT_SUPPORTED = 1009;
36+
case CONNECTION_DECOMPRESSED_FRAME_LENGTH_MISMATCH = 1323;
3537
case CONNECTION_EXECUTE_UNEXPECTED_RESPONSE = 1010;
38+
case CONNECTION_INITIAL_PROTOCOL_VERSION_NOT_IN_ALLOWED_VERSIONS = 1325;
3639
case CONNECTION_INVALID_OPCODE_TYPE = 1012;
3740
case CONNECTION_NOT_CONNECTED = 1013;
3841
case CONNECTION_OPTIONS_UNEXPECTED_RESPONSE = 1014;

test/Integration/CompressionTest.php

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,46 @@
88
use Cassandra\Connection\ConnectionOptions;
99
use Cassandra\Connection\SocketNodeConfig;
1010
use Cassandra\Consistency;
11+
use Cassandra\Protocol\ProtocolVersion;
1112
use Cassandra\Request\Options\QueryOptions;
1213

1314
final class CompressionTest extends AbstractIntegrationTestCase {
14-
// todo: test with v3,v4,v5 protocol versions
15-
public function testNegotiatesCompressionAndDecodesCompressedFrames(): void {
15+
public function testNegotiatesCompressionAndDecodesCompressedFramesV3(): void {
16+
17+
$options = new ConnectionOptions(
18+
enableCompression: true,
19+
allowedProtocolVersions: [ProtocolVersion::V3],
20+
);
21+
22+
$this->testCompression($options);
23+
}
24+
25+
public function testNegotiatesCompressionAndDecodesCompressedFramesV4(): void {
26+
27+
$options = new ConnectionOptions(
28+
enableCompression: true,
29+
allowedProtocolVersions: [ProtocolVersion::V4],
30+
initialProtocolVersion: self::isScyllaDb() ? ProtocolVersion::V4 : ProtocolVersion::V3,
31+
);
32+
33+
$this->testCompression($options);
34+
}
35+
36+
public function testNegotiatesCompressionAndDecodesCompressedFramesV5(): void {
37+
38+
if (self::isScyllaDb()) {
39+
$this->markTestSkipped('Skipping V5 compression test on ScyllaDB');
40+
}
41+
42+
$options = new ConnectionOptions(
43+
enableCompression: true,
44+
allowedProtocolVersions: [ProtocolVersion::V5],
45+
);
46+
47+
$this->testCompression($options);
48+
}
49+
50+
private function testCompression(ConnectionOptions $options): void {
1651

1752
$nodes = [
1853
new SocketNodeConfig(
@@ -23,7 +58,6 @@ public function testNegotiatesCompressionAndDecodesCompressedFrames(): void {
2358
),
2459
];
2560

26-
$options = new ConnectionOptions(enableCompression: true);
2761
$conn = new Connection($nodes, self::$defaultKeyspace, $options);
2862
$conn->setConsistency(Consistency::ONE);
2963
$conn->connect();

0 commit comments

Comments
 (0)