Commit 5f92522

Update

1 parent 40d464c commit 5f92522

docs/server/features/connectors/sinks/pulsar.md

Lines changed: 79 additions & 100 deletions
@@ -6,30 +6,17 @@ title: 'Pulsar Sink'
 
 ## Overview
 
-The Apache Pulsar Sink connector writes events from your service to a specified Pulsar topic. It supports:
-- **Partitioning:** Extract partition keys from various sources (e.g., stream ID, headers, record key).
-- **Security:** Offers token-based authentication for secure communication.
-- **Resilience:** Leverages Apache Pulsar’s built-in resilience for robust message handling.
-
-## Pre-requisites
-
-Before setting up the Pulsar Sink connector, ensure that:
-- **Apache Pulsar Cluster:** Your Pulsar cluster is up and running.
-- **Network Access:** The service URL is accessible (adjust firewall settings as needed).
-- **Security Tokens:** If using authentication, have your JSON web token ready.
-- **Basic Knowledge:** Familiarity with JSON and command line operations.
+The Apache Pulsar Sink connector writes events from your KurrentDB stream to a specified Pulsar topic.
 
 ## Quickstart
 
-Follow these steps to create and start the Pulsar Sink connector.
+You can create the Pulsar Sink connector as follows. Replace `id` with a unique connector name or ID:
 
-::: tabs
-@tab Powershell
+```http
+POST /connectors/{{id}}
+Host: localhost:2113
+Content-Type: application/json
 
-1. Create the JSON Configuration:
-
-```powershell
-$JSON = @"
 {
   "settings": {
     "instanceTypeName": "pulsar-sink",
@@ -40,50 +27,8 @@ $JSON = @"
     "subscription:filter:expression": "example-stream"
   }
 }
-"@ `
-```
-
-2. Send a POST request to create the sink connector:
-
-```powershell
-curl.exe -X POST `
-  -H "Content-Type: application/json" `
-  -d $JSON `
-  http://localhost:2113/connectors/pulsar-sink-connector
-```
-
-@tab Bash
-
-1. Create the JSON Configuration:
-
-```bash
-JSON='{
-  "settings": {
-    "instanceTypeName": "pulsar-sink",
-    "topic": "customers",
-    "url": "pulsar://localhost:6650",
-    "subscription:filter:scope": "stream",
-    "subscription:filter:filterType": "streamId",
-    "subscription:filter:expression": "example-stream"
-  }
-}'
 ```
 
-2. Send a POST request to create the sink connector:
-
-```bash
-curl -X POST \
-  -H "Content-Type: application/json" \
-  -d "$JSON" \
-  http://localhost:2113/connectors/pulsar-sink-connector
-```
-
-:::
-
-::: tip
-Replace the URL with your KurrentDB URL. The default value is `http://localhost:2113`.
-:::
-
 After running the command, verify the connector status by checking the management API or connector logs. See [Management API Reference](../manage.md).
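For illustration, a status check against the management API might look like the sketch below. The route shown here is an assumption made for this example only; use the endpoint documented in the Management API reference.

```http
GET /connectors/{{id}}
Host: localhost:2113
```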
 
 ## Settings
@@ -97,41 +42,79 @@ the [Sink Options](../settings.md#sink-options) page.
 
 ### General settings
 
-| Name                   | Details |
-| ---------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
-| `topic`                | _Required_<br><br>**Description:** The Pulsar topic where records are published. to. |
-| `url`                  | <br><br>**Description:** The service URL for the Pulsar cluster.<br><br>**Default**: `"pulsar://localhost:6650"` |
-| `defaultHeaders`       | **Description**: Default headers to include in all outgoing messages along with KurrentDB system headers. Prefix header names with `defaultHeaders:` followed by the header key name.<br><br>**Example**: `"defaultHeaders:AppName": "Kurrent"` <br><br>**Default**: None |
-| `authentication:token` | **Description:** A JSON web token for authenticating the connector with Pulsar. |
+| Name                   | Details |
+| ---------------------- | ----------------------------------------------------------------------------------------------------------- |
+| `topic`                | _required_<br><br>**Description:**<br>The Pulsar topic where records are published. |
+| `url`                  | **Description:**<br>The service URL for the Pulsar cluster.<br><br>**Default**: `"pulsar://localhost:6650"` |
+| `defaultHeaders`       | **Description:**<br>Headers included in all produced messages.<br><br>**Default**: Empty |
+| `authentication:token` | **Description:**<br>A JSON web token for authenticating the connector with Pulsar. |
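Putting the general settings together, a create request that enables token authentication and adds a default header could look like the following sketch; the topic name, header value, and token are placeholders to replace with your own values:

```http
POST /connectors/{{id}}
Host: localhost:2113
Content-Type: application/json

{
  "settings": {
    "instanceTypeName": "pulsar-sink",
    "topic": "customers",
    "url": "pulsar://localhost:6650",
    "authentication:token": "<your-json-web-token>",
    "defaultHeaders:AppName": "Kurrent"
  }
}
```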
 
 ### Partitioning
 
 Partitioning options determine how the connector assigns partition keys, which affect message routing and topic compaction.
 
-| Name                                | Details |
-| ----------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
-| `partitionKeyExtraction:enabled`    | **Description:** Enables partition key extraction.<br><br>**Default**: false |
-| `partitionKeyExtraction:source`     | **Description:** The source for extracting the partition key.<br><br>**Accepted Values:** `stream`, `streamSuffix`, `headers`<br><br>**Default**: `PartitionKey` |
-| `partitionKeyExtraction:expression` | **Description:** A regex (for `stream` source) or a comma-separated list of header keys (for `headers` source) used to extract or combine values for the partition key. When using headers, values are concatenated with a hyphen (for example, `value1-value2`). |
+| Name                                | Details |
+| ----------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| `partitionKeyExtraction:enabled`    | **Description:**<br>Enables partition key extraction.<br><br>**Default**: `"false"` |
+| `partitionKeyExtraction:source`     | **Description:**<br>The source for extracting the partition key.<br><br>**Accepted Values:** `"stream"`, `"streamSuffix"`, `"headers"`<br><br>**Default**: `"partitionKey"` |
+| `partitionKeyExtraction:expression` | **Description:**<br>A regex (for `stream` source) or a comma-separated list of header keys (for `headers` source) used to extract or combine values for the partition key. When using headers, values are concatenated with a hyphen (for example, `value1-value2`). |
 
 ### Resilience
 
-These settings customize the connector’s behavior in handling message failures and retries provided by Apache Pulsar.
+The Pulsar sink connector relies on its own retry mechanism and does not use the shared options described in [Resilience configuration](../settings.md#resilience-configuration).
+
+| Name                       | Details |
+| -------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------- |
+| `resilience:enabled`       | **Description:**<br>Enables resilience features for message handling.<br><br>**Default**: `"true"` |
+| `resilience:retryInterval` | **Description:**<br>Retry interval in seconds. Must be greater than 0.<br><br>**Format:** seconds or `"HH:MM:SS"`.<br><br>**Default:** `"00:00:03"` |
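For example, to lengthen the retry interval you could update the connector settings as in the sketch below; the ten-second interval is only an illustration:

```http
PUT /connectors/{{id}}/settings
Host: localhost:2113
Content-Type: application/json

{
  "resilience:enabled": "true",
  "resilience:retryInterval": "00:00:10"
}
```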
+
+## Headers
+
+The Pulsar sink connector lets you include custom headers in the messages
+it sends to your topic. To add custom headers, use the `defaultHeaders` setting
+in your connector configuration. Each custom header should be specified with the
+prefix `defaultHeaders:` followed by the header name.
+
+Example:
+
+```http
+PUT /connectors/{{id}}/settings
+Host: localhost:2113
+Content-Type: application/json
+
+{
+  "defaultHeaders:X-API-Key": "your-api-key-here",
+  "defaultHeaders:X-Tenant-ID": "production-tenant",
+  "defaultHeaders:X-Source-System": "KurrentDB"
+}
+```
+
+These headers will be included in every message sent by the connector, in addition to the [default headers](../features.md#headers) automatically added by the connector's plugin.
 
-| Name                       | Details |
-| -------------------------- | ------------------------------------------------------------------------------------------------------------------------ |
-| `resilience:enabled`       | **Description:** Enables resilience features for message handling.<br><br>**Default**: `"true"` |
-| `resilience:retryInterval` | **Description:** Retry interval in seconds. Must be greater than 0.<br><br>**Format:** seconds or `"HH:MM:SS"`.<br><br>**Default:** `"00:00:03"` |
 
 ## Examples
 
-These examples demonstrate how to configure partitioning, security, and other practical scenarios.
+### Partitioning
+
+The Pulsar sink connector allows customizing the partition keys that are sent
+with the message.
+
+By default, it uses `"partitionKey"` as the source, and messages are distributed
+across the available partitions in the topic using round-robin partitioning.
 
-### Partition using Stream ID
+**Partition using Stream ID**
+
+You can extract part of the stream name using a regular expression (regex) to
+define the partition key. The expression is optional and can be customized based
+on your naming convention. In this example, the expression captures the stream
+name up to `_data`.
+
+```http
+PUT /connectors/{{id}}/settings
+Host: localhost:2113
+Content-Type: application/json
 
-Extract part of a stream name using a regex. In this example, the regex captures everything up to `_data`.
 
-```json
 {
   "partitionKeyExtraction:enabled": "true",
   "partitionKeyExtraction:source": "stream",
@@ -143,7 +126,12 @@ Alternatively, if you only need the last segment of the stream name (after a
 hyphen), you can use the `streamSuffix` source. This
 doesn't require an expression since it automatically extracts the suffix.
 
-```json
+```http
+PUT /connectors/{{id}}/settings
+Host: localhost:2113
+Content-Type: application/json
+
+
 {
   "partitionKeyExtraction:enabled": "true",
   "partitionKeyExtraction:source": "streamSuffix"
@@ -154,31 +142,22 @@ The `streamSuffix` source is useful when stream names follow a structured
 format, and you want to use only the trailing part as the partition key. For
 example, if the stream is named `user-123`, the partition key would be `123`.
 
-### Partition using header values
+**Partition using header values**
+
+You can create partition keys by combining values from a record's metadata.
 
-Combine multiple header values to form the partition key. This example concatenates header values `key1` and `key2` using a hyphen.
+```http
+PUT /connectors/{{id}}/settings
+Host: localhost:2113
+Content-Type: application/json
 
-```json
 {
   "partitionKeyExtraction:enabled": "true",
   "partitionKeyExtraction:source": "headers",
   "partitionKeyExtraction:expression": "key1,key2"
 }
 ```
 
-The `Headers` source allows you to pull values from the event's metadata. The
-`documentId:expression` field lists the header keys (in this case, `key1` and
-`key2`), and their values are concatenated to generate the partition key.
-
-::: details Click here to see an example
+Specify the header keys you want to use in the `partitionKeyExtraction:expression` field (e.g., `key1,key2`). The connector will concatenate the header values with a hyphen (`-`) to create the partition key.
 
-```json
-{
-  "key1": "value1",
-  "key2": "value2"
-}
-
-// outputs "value1-value2"
-```
-
-:::
+For example, if your event has headers `key1: regionA` and `key2: zone1`, the partition key will be `regionA-zone1`.
