I am trying to consume messages from MQTT to Kafka using lenses.io reactor. Latest version of the Stream Reactor
Kafka/Confluent version
sh-4.4$ kafka-topics --version
7.1.0-ccs (Commit:c86722379ab997cc)
kafka-connect-mqtt-3.0.1-2.5.0-all.jar
Expected behaviour: The avro topic should have been printed on console
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Additional details
kafka-connect:
image: kafka-connect
build:
context: .
hostname: kafka-connect
container_name: kafka-connect
depends_on:
- zookeeper-1
- kafka-broker-1
ports:
- 8083:8083
environment:
CONNECT_BOOTSTRAP_SERVERS: SSL://kafka-broker-1:19093
CONNECT_GROUP_ID: 'kafka-connect'
CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect'
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_CONFIG_STORAGE_TOPIC: 'connect-config-storage'
CONNECT_OFFSET_STORAGE_TOPIC: 'connect-offset-storage'
CONNECT_STATUS_STORAGE_TOPIC: 'connect-status-storage'
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_PLUGIN_PATH: /etc/kafka/secrets/plugins
CONNECT_SECURITY_PROTOCOL: 'SSL'
CONNECT_SSL_KEY_PASSWORD: confluent
CONNECT_SSL_KEYSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.keystore.jks'
CONNECT_SSL_KEYSTORE_PASSWORD: confluent
CONNECT_SSL_TRUSTSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.truststore.jks'
CONNECT_SSL_TRUSTSTORE_PASSWORD: confluent
CONNECT_KAFKASTORE_SECURITY_PROTOCOL: 'SSL'
CONNECT_KAFKASTORE_SSL_KEY_PASSWORD: confluent
CONNECT_KAFKASTORE_SSL_KEYSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.keystore.jks'
CONNECT_KAFKASTORE_SSL_KEYSTORE_PASSWORD: confluent
CONNECT_KAFKASTORE_SSL_TRUSTSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.truststore.jks'
CONNECT_KAFKASTORE_SSL_TRUSTSTORE_PASSWORD: confluent
CONNECT_PRODUCER_SECURITY_PROTOCOL: 'SSL'
CONNECT_PRODUCER_SSL_KEY_PASSWORD: confluent
CONNECT_PRODUCER_SSL_KEYSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.keystore.jks'
CONNECT_PRODUCER_SSL_KEYSTORE_PASSWORD: confluent
CONNECT_PRODUCER_SSL_TRUSTSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.truststore.jks'
CONNECT_PRODUCER_SSL_TRUSTSTORE_PASSWORD: confluent
CONNECT_CONSUMER_SECURITY_PROTOCOL: 'SSL'
CONNECT_CONSUMER_SSL_KEY_PASSWORD: confluent
CONNECT_CONSUMER_SSL_KEYSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.keystore.jks'
CONNECT_CONSUMER_SSL_KEYSTORE_PASSWORD: confluent
CONNECT_CONSUMER_SSL_TRUSTSTORE_LOCATION: '/etc/kafka/secrets/kafka.consumer.truststore.jks'
CONNECT_CONSUMER_SSL_TRUSTSTORE_PASSWORD: confluent
volumes:
- ${KAFKA_SSL_SECRETS_DIR}/connects:/etc/kafka/secrets
networks:
- kafka-cluster-network
connector properties configuration (my-connector.properties)
curl -X PUT \
-H "Content-Type: application/json" \
--data '{
"name": "mqtt-source",
"connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector",
"tasks.max": "1",
"topics": "mqtt",
"connect.mqtt.connection.clean": "true",
"connect.mqtt.connection.timeout": "1000",
"connect.mqtt.kcql": "INSERT INTO mqtt SELECT * FROM /ais",
"connect.mqtt.connection.keep.alive": "1000",
"connect.mqtt.source.converters": "/ais=com.datamountaineer.streamreactor.connect.converters.source.AvroConverter",
"connect.source.converter.avro.schemas": "/ais=/etc/kafka/secrets/plugins/classAPositionReportSchema.json",
"connect.mqtt.client.id": "dm_source_id",
"connect.mqtt.converter.throw.on.error": "true",
"connect.mqtt.hosts": "tcp://mqtt:1883",
"connect.mqtt.service.quality": "1"
}' http://localhost:8083/connectors/mqtt-source/config | jq .
avro json file
{
"type": "record",
"name": "aisClassAPositionReport",
"namespace": "com.landoop.ais",
"doc": "Schema for AIS Class A Position Reports.",
"fields": [
{
"name": "Type",
"type": "int",
"doc": "The type of the AIS Message. 1/2/3 are Class A position reports."
},
{
"name": "Repeat",
"type":"int",
"doc": "Repeat Indicator"
},
{
"name": "MMSI",
"type": "long",
"doc": "User ID (MMSI)"
},
{
"name": "Speed",
"type": "float",
"doc": "Speed over Ground (SOG)"
},
{
"name": "Accuracy",
"type": "boolean",
"doc": "Position Accuracy"
},
{
"name": "Longitude",
"type": "double",
"doc": "Longitude"
},
{
"name": "Latitude",
"type": "double",
"doc": "Latitude"
},
{
"name": "Course",
"type": "float",
"doc": "Course over Ground (COG)"
},
{
"name": "Heading",
"type": "int",
"doc": "True Heading (HDG)"
},
{
"name": "Second",
"type": "int",
"doc": "Time Stamp"
},
{
"name": "RAIM",
"type": "boolean",
"doc": "RAIM flag"
},
{
"name": "Radio",
"type": "long",
"doc": "Radio Status"
},
{
"name": "Status",
"type": "int",
"doc": "Navigation Status (enumerated type)"
},
{
"name": "Turn",
"type": "float",
"doc": "Rate of Turn (ROT)"
},
{
"name": "Maneuver",
"type": "int",
"doc": "Manuever Indicator (enumerated type)"
},
{
"name": "Timestamp",
"type": "long",
"doc": "Time the message was encoded to avro (nanoseconds since epoch). May be used for ordering."
}
]
}
MQTT broker message
mosquitto_pub \
-m "{\"Type\": 384558914, \"Repeat\": 1429873353, \"MMSI\": 1421443607430111832, \"Speed\": 0.32155126, \"Accuracy\": true, \"Longitude\": 0.3627212439937161, \"Latitude\": 0.2725890739370421, \"Course\": 0.99500954, \"Heading\": -2064209033, \"Second\": -1096102271, \"RAIM\": true, \"Radio\": -189624595456590919, \"Status\": -139830130, \"Turn\": 0.035991907, \"Maneuver\": 1595359693, \"Timestamp\": -932628952948741103}" \
-d -r -t /ais
full log
sh-4.4$ kafka-avro-console-consumer --bootstrap-server kafka-broker-1:19093 --topic mqtt --from-beginning --max-messages 10 --consumer.config /etc/kafka/secrets/host.consumer.ssl.config --property schema.registry.url=http://0.0.0.0:8081
[2022-04-13 05:49:22,333] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2022-04-13 05:49:23,082] INFO ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [kafka-broker-1:19093]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = console-consumer
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = console-consumer-27706
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = SSL
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm =
ssl.engine.factory.class = null
ssl.key.password = [hidden]
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = /etc/kafka/secrets/kafka.consumer.keystore.jks
ssl.keystore.password = [hidden]
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = /etc/kafka/secrets/kafka.consumer.truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
(org.apache.kafka.clients.consumer.ConsumerConfig)
[2022-04-13 05:49:23,269] INFO Kafka version: 7.1.0-ce (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-13 05:49:23,269] INFO Kafka commitId: 5c05312ab63acecf (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-13 05:49:23,269] INFO Kafka startTimeMs: 1649828963261 (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-13 05:49:23,274] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Subscribed to topic(s): mqtt (org.apache.kafka.clients.consumer.KafkaConsumer)
[2022-04-13 05:49:24,085] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Resetting the last seen epoch of partition mqtt-0 to 0 since the associated topicId changed from null to eLc9qW-WTemQ53DDH9JgzA (org.apache.kafka.clients.Metadata)
[2022-04-13 05:49:24,093] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Cluster ID: 7mB45_SgTXSxROWQruwrRQ (org.apache.kafka.clients.Metadata)
[2022-04-13 05:49:24,095] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Discovered group coordinator kafka-broker-1:19093 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:24,099] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:24,167] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Request joining group due to: need to re-join with the given member-id (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:24,168] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,174] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Successfully joined group with generation Generation{generationId=1, memberId='console-consumer-35015e12-2725-473a-b7b1-70cce478ed76', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,179] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Finished assignment for group at generation 1: {console-consumer-35015e12-2725-473a-b7b1-70cce478ed76=Assignment(partitions=[mqtt-0])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,202] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Successfully synced group in generation Generation{generationId=1, memberId='console-consumer-35015e12-2725-473a-b7b1-70cce478ed76', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,203] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Notifying assignor about the new Assignment(partitions=[mqtt-0]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,210] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Adding newly assigned partitions: mqtt-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,234] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Found no committed offset for partition mqtt-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,370] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Seeking to offset 1 for partition mqtt-0 (org.apache.kafka.clients.consumer.KafkaConsumer)
[2022-04-13 05:49:27,371] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Revoke previously assigned partitions mqtt-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,371] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Member console-consumer-35015e12-2725-473a-b7b1-70cce478ed76 sending LeaveGroup request to coordinator kafka-broker-1:19093 (id: 2147483646 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,374] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Resetting generation due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,375] INFO [Consumer clientId=console-consumer, groupId=console-consumer-27706] Request joining group due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-04-13 05:49:27,383] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics)
[2022-04-13 05:49:27,383] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics)
[2022-04-13 05:49:27,383] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
[2022-04-13 05:49:27,396] INFO App info kafka.consumer for console-consumer unregistered (org.apache.kafka.common.utils.AppInfoParser)
Processed a total of 1 messages
[2022-04-13 05:49:27,400] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:250)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:322)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:112)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:87)
at io.confluent.kafka.formatter.AvroMessageFormatter$AvroMessageDeserializer.deserialize(AvroMessageFormatter.java:133)
at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:92)
at io.confluent.kafka.formatter.SchemaMessageFormatter.writeTo(SchemaMessageFormatter.java:181)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:115)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:52)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Update:
Also registered schema on schema-registry
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1 json" --data '{ "schema": "{\"type\": \"record\", \"name\": \"aisClassAPositionReport\", \"namespace\": \"com.landoop.ais\", \"doc\": \"Schema for AIS Class A Position Reports.\", \"fields\": [{\"name\": \"Type\",\"type\": \"int\",\"doc\": \"The type of the AIS Message. 1/2/3 are Class A position reports.\"},{\"name\": \"Repeat\",\"type\":\"int\",\"doc\": \"Repeat Indicator\"},{\"name\": \"MMSI\",\"type\": \"long\",\"doc\": \"User ID (MMSI)\"},{\"name\": \"Speed\",\"type\": \"float\",\"doc\": \"Speed over Ground (SOG)\"},{\"name\": \"Accuracy\",\"type\": \"boolean\",\"doc\": \"Position Accuracy\"},{\"name\": \"Longitude\",\"type\": \"double\",\"doc\": \"Longitude\"},{\"name\": \"Latitude\",\"type\": \"double\",\"doc\": \"Latitude\"},{\"name\": \"Course\",\"type\": \"float\",\"doc\": \"Course over Ground (COG)\"},{\"name\": \"Heading\",\"type\": \"int\",\"doc\": \"True Heading (HDG)\"},{\"name\": \"Second\",\"type\": \"int\",\"doc\": \"Time Stamp\"},{\"name\": \"RAIM\",\"type\": \"boolean\",\"doc\": \"RAIM flag\"},{\"name\": \"Radio\",\"type\": \"long\",\"doc\": \"Radio Status\"},{\"name\": \"Status\",\"type\": \"int\",\"doc\": \"Navigation Status (enumerated type)\"},{\"name\": \"Turn\",\"type\": \"float\",\"doc\": \"Rate of Turn (ROT)\"},{\"name\": \"Maneuver\",\"type\": \"int\",\"doc\": \"Manuever Indicator (enumerated type)\"},{\"name\": \"Timestamp\",\"type\": \"long\",\"doc\": \"Time the message was encoded to avro (nanoseconds since epoch). May be used for ordering.\"}]}"}' http://0.0.0.0:8081/subjects/mqtt-value/versions
what could I be doing wrong?
CodePudding user response:
According to CONNECT_VALUE_CONVERTER
, you're producing JSON, not Avro, so you cannot use kafka-avro-console-consumer
to read JSON data and your Avro schema is not being used
I'm not sure how connect.mqtt.source.converters
works, but you've not configured it to use any Registry, which is a requirement for kafka-avro-console-consumer
to work