-
Couldn't load subscription status.
- Fork 87
Description
I am new with Kafka and working on Kafka Mqtt Source Connector. I am following 2 different examples to implement Mqtt Source Connector.
Please check below details.
Ref 1: https://johanvandevenne.github.io/kafka-connect-mqtt/
Kafka Mqtt Source Connector
curl -X POST \
http://localhost:8083/connectors \
-H 'Content-Type: application/json' \
-d '{ "name": "mqtt-source-connector",
"config":
{
"connector.class":"be.jovacon.kafka.connect.MQTTSourceConnector",
"mqtt.topic":"temperature",
"kafka.topic":"mqtt.",
"mqtt.clientID":"my_client_id",
"mqtt.broker":"tcp://127.0.0.1:1883",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable":false,
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable":false
}
}'
curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
"name" : "mqtt-source",
"config" : {
"connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max" : "1",
"mqtt.server.uri" : "tcp://127.0.0.1:1883",
"mqtt.topics" : "temperature",
"kafka.topic" : "mqtt.",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"confluent.license":""
}
}'
When I execute above examples and check connector status it show me running like :
curl http://localhost:8083/connectors/mqtt-source/status | python -m json.tool
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 185 100 185 0 0 10277 0 --:--:-- --:--:-- --:--:-- 10277
{
"connector": {
"state": "RUNNING",
"worker_id": "127.0.1.1:8083"
},
"name": "mqtt-source",
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "127.0.1.1:8083"
}
],
"type": "source"
}
But when I publish data from Mqtt Topic it is not recieved at the kafka consumer topics.
Consumer:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mqtt. --from-beginning
Mqtt publish topic:
mosquitto_pub -h 127.0.0.1 -p 1883 -t temperature -q 2 -m "99999,2.10#"
When I publish data from Mqtt, Both Kafka Mqtt connectors give me below error:
{
"connector": {
"state": "RUNNING",
"worker_id": "127.0.1.1:8083"
},
"name": "mqtt-source",
"tasks": [
{
"id": 0,
"state": "FAILED",
"trace": "java.lang.NullPointerException\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.convertHeaderFor(WorkerSourceTask.java:296)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:194)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\n",
"worker_id": "127.0.1.1:8083"
}
],
"type": "source"
}
Is there any issue with connector properties? or something else..?
Thanks in Advance