kafka-connect-dynamodb is a Kafka Connector for loading data to and from Amazon DynamoDB.
It is implemented using the AWS Java SDK for DynamoDB. For authentication, the DefaultAWSCredentialsProviderChain is used.
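For reference, a minimal sketch of how the default chain resolves credentials with the AWS SDK for Java v1 (the SDK this connector builds on); this snippet is illustrative and not part of the connector:

```java
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;

public class CredentialsCheck {
    public static void main(String[] args) {
        // The chain tries, in order: environment variables, JVM system
        // properties, the shared credentials file (~/.aws/credentials),
        // and instance/container credentials on EC2/ECS.
        AWSCredentials credentials =
                DefaultAWSCredentialsProviderChain.getInstance().getCredentials();
        System.out.println("Resolved access key ID: " + credentials.getAWSAccessKeyId());
    }
}
```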
Run:

```
$ mvn clean package
```

The connector, along with the JARs it depends on, will then be found under `target/kafka-connect-dynamodb-$version-SNAPSHOT-package/share/java/kafka-connect-dynamodb/*`.
To create an uber JAR:

```
$ mvn -P standalone clean package
```

The uber JAR will be created at `target/kafka-connect-dynamodb-$version-SNAPSHOT-standalone.jar`.
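One possible way to deploy it (the directory, the worker config file, and the use of `plugin.path` are assumptions for this sketch, not part of the project): place the uber JAR on the Connect worker's `plugin.path` and start a standalone worker with a connector properties file such as the sink example below:

```
$ mkdir -p /opt/connect/plugins
$ cp target/kafka-connect-dynamodb-$version-SNAPSHOT-standalone.jar /opt/connect/plugins/
$ connect-standalone.sh worker.properties dynamodb-sink.properties
```

Here `worker.properties` is assumed to set `plugin.path=/opt/connect/plugins`.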
Ingest the `orders` topic to a DynamoDB table of the same name in the specified region:

```
name=dynamodb-sink-test
topics=orders
connector.class=dynamok.sink.DynamoDbSinkConnector
region=us-west-2
ignore.record.key=true
```
Refer to DynamoDB Data Types.
At the top level, the record must either convert to the DynamoDB Map data type, or the `top.key.attribute` or `top.value.attribute` configuration option must be set for the Kafka record key or value as applicable, so that the converted value can be hoisted into a DynamoDB record.
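For example (the attribute name is illustrative), if record values are plain strings they cannot convert to a top-level Map, so an envelope attribute can be configured:

```
ignore.record.key=true
top.value.attribute=payload
```

With this configuration, each record value is stored under a single top-level attribute named `payload` in the resulting DynamoDB item.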
| Connect Schema Type | DynamoDB |
|---------------------|----------|
| INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, Decimal | Number |
| BOOL | Boolean |
| BYTES | Binary |
| STRING | String |
| ARRAY | List |
| MAP [1], STRUCT | Map |

[1] Map keys must be primitive types, and cannot be optional.
`null` values for optional schemas are translated to the Null type.
| Java | DynamoDB |
|------|----------|
| null | Null |
| Number [2] | Number |
| Boolean | Boolean |
| byte[], ByteBuffer | Binary |
| String | String |
| List | List |
| Empty Set [3] | Null |
| Set<String> | String Set |
| Set<Number> | Number Set |
| Set<byte[]>, Set<ByteBuffer> | Binary Set |
| Map [4] | Map |
Any other data type will cause the connector to fail.
[2] i.e. Byte, Short, Integer, Long, Float, Double, BigInteger, BigDecimal
[3] It is not possible to determine the element type of an empty set.
[4] Map keys must be primitive types, and cannot be optional.
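To make the table above concrete, here is a hedged sketch of the target representation in terms of the AWS SDK's `AttributeValue` (illustrative only; the connector's actual conversion code may differ):

```java
import com.amazonaws.services.dynamodbv2.model.AttributeValue;

import java.util.HashMap;
import java.util.Map;

public class MappingSketch {
    public static void main(String[] args) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("count", new AttributeValue().withN("42"));       // Number  -> Number (N)
        item.put("active", new AttributeValue().withBOOL(true));   // Boolean -> Boolean (BOOL)
        item.put("name", new AttributeValue().withS("alice"));     // String  -> String (S)
        item.put("tags", new AttributeValue().withSS("a", "b"));   // Set<String> -> String Set (SS)
        item.put("scores", new AttributeValue().withNS("1", "2")); // Set<Number> -> Number Set (NS)
        item.put("missing", new AttributeValue().withNULL(true));  // null    -> Null (NULL)
        System.out.println(item);
    }
}
```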
`region`
AWS region for DynamoDB.

- Type: string
- Default: ""
- Importance: high
`access.key.id`
Explicit AWS access key ID. Leave empty to utilize the default credential provider chain.

- Type: password
- Default: [hidden]
- Importance: low
`secret.key`
Explicit AWS secret access key. Leave empty to utilize the default credential provider chain.

- Type: password
- Default: [hidden]
- Importance: low
`batch.size`
Batch size between 1 (a dedicated `PutItemRequest` for each record) and 25 (the maximum number of items in a `BatchWriteItemRequest`).

- Type: int
- Default: 1
- Importance: high
`kafka.attributes`
Trio of `topic`, `partition`, `offset` attribute names to include in records; set to empty to omit these attributes.

- Type: list
- Default: [kafka_topic, kafka_partition, kafka_offset]
- Importance: high
`table.format`
Format string for the destination DynamoDB table name; use `${topic}` as a placeholder for the source topic (see the example below).

- Type: string
- Default: "${topic}"
- Importance: high
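For example (an illustrative override, not a default), with `table.format=kafka-${topic}` records from the topic `orders` would be written to the table `kafka-orders`.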
`ignore.record.key`
Whether to ignore Kafka record keys when preparing the DynamoDB record.

- Type: boolean
- Default: false
- Importance: medium
`ignore.record.value`
Whether to ignore Kafka record values when preparing the DynamoDB record.

- Type: boolean
- Default: false
- Importance: medium
`top.key.attribute`
DynamoDB attribute name to use for the record key. Leave empty if no top-level envelope attribute is desired.

- Type: string
- Default: ""
- Importance: medium
`top.value.attribute`
DynamoDB attribute name to use for the record value. Leave empty if no top-level envelope attribute is desired.

- Type: string
- Default: ""
- Importance: medium
`max.retries`
The maximum number of times to retry on errors before failing the task.

- Type: int
- Default: 10
- Importance: medium
`retry.backoff.ms`
The time in milliseconds to wait following an error before a retry attempt is made.

- Type: int
- Default: 3000
- Importance: medium
Ingest all DynamoDB tables in the specified region into Kafka topics with the same name as the source table:
```
name=dynamodb-source-test
connector.class=dynamok.source.DynamoDbSourceConnector
region=us-west-2
```
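To sanity-check the output (the broker address and topic name are examples), the standard console consumer can be used:

```
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic orders --from-beginning
```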
TODO describe conversion scheme
DynamoDB records containing heterogeneous lists (`L`) or maps (`M`) are not currently supported; these fields will be silently dropped. It will be possible to add support for them once KAFKA-3910 is implemented.
`region`
AWS region for DynamoDB.

- Type: string
- Default: ""
- Importance: high
`access.key.id`
Explicit AWS access key ID. Leave empty to utilize the default credential provider chain.

- Type: password
- Default: [hidden]
- Importance: low
`secret.key`
Explicit AWS secret access key. Leave empty to utilize the default credential provider chain.

- Type: password
- Default: [hidden]
- Importance: low
`topic.format`
Format string for the destination Kafka topic; use `${table}` as a placeholder for the source table name (see the example below).

- Type: string
- Default: "${table}"
- Importance: high
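For example (an illustrative override, not a default), with `topic.format=dynamodb-${table}` records from the table `orders` would be published to the topic `dynamodb-orders`.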
`tables.prefix`
Prefix for DynamoDB tables to source from.

- Type: string
- Default: ""
- Importance: medium
`tables.whitelist`
Whitelist for DynamoDB tables to source from.

- Type: list
- Default: ""
- Importance: medium
`tables.blacklist`
Blacklist for DynamoDB tables to source from.

- Type: list
- Default: ""
- Importance: medium
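As a closing illustrative sketch (the table names are examples), the source can be restricted to specific tables:

```
tables.whitelist=orders,customers
```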