Skip to content

Commit df19784

Browse files
authored
add schema producer and consumer example (#402)
1 parent 607cba0 commit df19784

2 files changed

Lines changed: 142 additions & 0 deletions

File tree

examples/consumer-schema.js

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
const Pulsar = require('../');
21+
22+
(async () => {
23+
// Create a client
24+
const client = new Pulsar.Client({
25+
serviceUrl: 'pulsar://localhost:6650',
26+
operationTimeoutSeconds: 30,
27+
});
28+
29+
const schemaInfo = {
30+
schemaType: "Json",
31+
schema: JSON.stringify({
32+
type: 'record',
33+
name: 'Example',
34+
namespace: 'test',
35+
fields: [
36+
{
37+
name: 'a',
38+
type: 'int',
39+
},
40+
{
41+
name: 'b',
42+
type: 'int',
43+
},
44+
],
45+
}),
46+
};
47+
48+
// Create a consumer
49+
const consumer = await client.subscribe({
50+
topic: 'persistent://public/default/schema-test',
51+
subscription: 'sub1',
52+
subscriptionType: 'Shared',
53+
ackTimeoutMs: 10000,
54+
schema: schemaInfo,
55+
});
56+
57+
// Receive messages
58+
for (let i = 0; i < 10; i += 1) {
59+
const msg = await consumer.receive();
60+
console.log(msg.getData().toString());
61+
consumer.acknowledge(msg);
62+
}
63+
64+
await consumer.close();
65+
await client.close();
66+
})();

examples/producer-schema.js

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
const Pulsar = require('../');
21+
22+
(async () => {
23+
24+
// Create a client
25+
const client = new Pulsar.Client({
26+
serviceUrl: 'pulsar://localhost:6650',
27+
operationTimeoutSeconds: 30,
28+
});
29+
30+
const schemaInfo = {
31+
schemaType: "Json",
32+
schema: JSON.stringify({
33+
type: 'record',
34+
name: 'Example',
35+
namespace: 'test',
36+
fields: [
37+
{
38+
name: 'a',
39+
type: 'int',
40+
},
41+
{
42+
name: 'b',
43+
type: 'int',
44+
},
45+
],
46+
}),
47+
};
48+
49+
const message = {
50+
a: 1,
51+
b: 2,
52+
};
53+
54+
// Create a producer
55+
const producer = await client.createProducer({
56+
topic: 'persistent://public/default/schema-test',
57+
sendTimeoutMs: 30000,
58+
batchingEnabled: true,
59+
schema: schemaInfo,
60+
});
61+
62+
// Send messages
63+
for (let i = 0; i < 100; i += 1) {
64+
message.a = i;
65+
message.b = i * 2;
66+
const data = JSON.stringify(message);
67+
producer.send({
68+
data: Buffer.from(data),
69+
});
70+
console.log(`Sent message: ${data}`);
71+
}
72+
await producer.flush();
73+
74+
await producer.close();
75+
await client.close();;
76+
})();

0 commit comments

Comments
 (0)