Skip to content

Commit ab3325a

Browse files
committed
Address code review & add more unit test
1 parent fbc86dd commit ab3325a

File tree

4 files changed

+82
-10
lines changed

4 files changed

+82
-10
lines changed

index.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,7 @@ export type HashingScheme =
333333
'JavaStringHash';
334334

335335
export type CompressionType =
336+
'None' |
336337
'Zlib' |
337338
'LZ4' |
338339
'ZSTD' |

src/Message.cc

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <pulsar/Message.h>
2424
#include <pulsar/MessageBuilder.h>
2525
#include <pulsar/EncryptionContext.h>
26+
#include <map>
2627

2728
struct _pulsar_message {
2829
pulsar::MessageBuilder builder;
@@ -40,6 +41,12 @@ static const std::string CFG_DELIVER_AT = "deliverAt";
4041
static const std::string CFG_DISABLE_REPLICATION = "disableReplication";
4142
static const std::string CFG_ORDERING_KEY = "orderingKey";
4243

44+
static const std::map<pulsar::CompressionType, std::string> COMPRESSION_TYPE_MAP = {
45+
{pulsar::CompressionNone, "None"}, {pulsar::CompressionLZ4, "LZ4"},
46+
{pulsar::CompressionZLib, "Zlib"}, {pulsar::CompressionZSTD, "ZSTD"},
47+
{pulsar::CompressionSNAPPY, "SNAPPY"},
48+
};
49+
4350
Napi::FunctionReference Message::constructor;
4451

4552
Napi::Object Message::Init(Napi::Env env, Napi::Object exports) {
@@ -183,12 +190,7 @@ Napi::Value Message::GetEncryptionContext(const Napi::CallbackInfo &info) {
183190
}
184191
const pulsar::EncryptionContext &encCtx = *encCtxPtr;
185192

186-
if (encCtx.keys().empty() && encCtx.param().empty() && encCtx.algorithm().empty()) {
187-
return env.Null();
188-
}
189-
190193
Napi::Object obj = Napi::Object::New(env);
191-
192194
Napi::Array keys = Napi::Array::New(env);
193195
int i = 0;
194196
for (const auto &keyInfo : encCtx.keys()) {
@@ -208,7 +210,9 @@ Napi::Value Message::GetEncryptionContext(const Napi::CallbackInfo &info) {
208210

209211
obj.Set("param", Napi::Buffer<char>::Copy(env, encCtx.param().c_str(), encCtx.param().length()));
210212
obj.Set("algorithm", Napi::String::New(env, encCtx.algorithm()));
211-
obj.Set("compressionType", Napi::Number::New(env, (int)encCtx.compressionType()));
213+
const auto it = COMPRESSION_TYPE_MAP.find(encCtx.compressionType());
214+
std::string compressionTypeStr = (it != COMPRESSION_TYPE_MAP.end()) ? it->second : "None";
215+
obj.Set("compressionType", Napi::String::New(env, compressionTypeStr));
212216
obj.Set("uncompressedMessageSize", Napi::Number::New(env, encCtx.uncompressedMessageSize()));
213217
obj.Set("batchSize", Napi::Number::New(env, encCtx.batchSize()));
214218
obj.Set("isDecryptionFailed", Napi::Boolean::New(env, encCtx.isDecryptionFailed()));

src/ProducerConfig.cc

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,8 @@ static const std::map<std::string, pulsar_hashing_scheme> HASHING_SCHEME = {
7070
};
7171

7272
static std::map<std::string, pulsar_compression_type> COMPRESSION_TYPE = {
73-
{"Zlib", pulsar_CompressionZLib},
74-
{"LZ4", pulsar_CompressionLZ4},
75-
{"ZSTD", pulsar_CompressionZSTD},
76-
{"SNAPPY", pulsar_CompressionSNAPPY},
73+
{"None", pulsar_CompressionNone}, {"Zlib", pulsar_CompressionZLib}, {"LZ4", pulsar_CompressionLZ4},
74+
{"ZSTD", pulsar_CompressionZSTD}, {"SNAPPY", pulsar_CompressionSNAPPY},
7775
};
7876

7977
static std::map<std::string, pulsar_producer_crypto_failure_action> PRODUCER_CRYPTO_FAILURE_ACTION = {

tests/encryption.test.js

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,12 +102,81 @@ class MyCryptoKeyReader extends Pulsar.CryptoKeyReader {
102102

103103
const msg = await consumer.receive();
104104
expect(msg.getData().toString()).toBe(msgContent);
105+
const encCtx = msg.getEncryptionContext();
106+
expect(encCtx).not.toBeNull();
107+
expect(encCtx.isDecryptionFailed).toBe(false);
108+
expect(encCtx.keys).toBeDefined();
109+
expect(encCtx.keys.length).toBeGreaterThan(0);
110+
expect(encCtx.keys[0].value).toBeInstanceOf(Buffer);
111+
expect(encCtx.param).toBeInstanceOf(Buffer);
112+
expect(encCtx.algorithm).toBe('');
113+
expect(encCtx.compressionType).toBe('None');
114+
expect(encCtx.uncompressedMessageSize).toBe(0);
115+
expect(encCtx.batchSize).toBe(1);
105116

106117
await consumer.acknowledge(msg);
107118
await producer.close();
108119
await consumer.close();
109120
});
110121

122+
test('End-to-End Encryption with Batching and Compression', async () => {
123+
const topic = `persistent://public/default/test-encryption-batch-compress-${Date.now()}`;
124+
125+
const cryptoKeyReader = new MyCryptoKeyReader(
126+
{ 'my-key': publicKeyPath },
127+
{ 'my-key': privateKeyPath },
128+
);
129+
130+
const producer = await client.createProducer({
131+
topic,
132+
encryptionKeys: ['my-key'],
133+
cryptoKeyReader,
134+
cryptoFailureAction: 'FAIL',
135+
batchingEnabled: true,
136+
batchingMaxMessages: 10,
137+
batchingMaxPublishDelayMs: 100,
138+
compressionType: 'Zlib',
139+
});
140+
141+
const consumer = await client.subscribe({
142+
topic,
143+
subscription: 'sub-encryption-batch-compress',
144+
cryptoKeyReader,
145+
cryptoFailureAction: 'CONSUME',
146+
subscriptionInitialPosition: 'Earliest',
147+
});
148+
149+
const numMessages = 10;
150+
const sendPromises = [];
151+
for (let i = 0; i < numMessages; i += 1) {
152+
sendPromises.push(producer.send({
153+
data: Buffer.from(`message-${i}`),
154+
}));
155+
}
156+
await Promise.all(sendPromises);
157+
158+
for (let i = 0; i < numMessages; i += 1) {
159+
const msg = await consumer.receive();
160+
expect(msg.getData().toString()).toBe(`message-${i}`);
161+
const encCtx = msg.getEncryptionContext();
162+
expect(encCtx).not.toBeNull();
163+
expect(encCtx.isDecryptionFailed).toBe(false);
164+
expect(encCtx.keys).toBeDefined();
165+
expect(encCtx.keys.length).toBeGreaterThan(0);
166+
expect(encCtx.keys[0].value).toBeInstanceOf(Buffer);
167+
expect(encCtx.param).toBeInstanceOf(Buffer);
168+
expect(encCtx.algorithm).toBe('');
169+
expect(encCtx.compressionType).toBe('Zlib');
170+
expect(encCtx.uncompressedMessageSize).toBeGreaterThan(0);
171+
expect(encCtx.batchSize).toBe(numMessages);
172+
173+
await consumer.acknowledge(msg);
174+
}
175+
176+
await producer.close();
177+
await consumer.close();
178+
});
179+
111180
test('Decryption Failure', async () => {
112181
const topic = `persistent://public/default/test-decryption-failure-${Date.now()}`;
113182

0 commit comments

Comments
 (0)