I tried to deploy ElasticsearchSinkConnector by
POST /connectors
{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type.name": "_doc",
"tasks.max": "1",
"topics": "my_db_server.public.my_table",
"connection.url": "https://my-elasticsearch.com:9200",
"connection.username": "xxx",
"connection.password": "xxx",
"key.ignore": "true",
"schema.ignore": "true",
"elastic.security.protocol": "SSL",
"elastic.https.ssl.keystore.location": "path/to/keystore.jks",
"elastic.https.ssl.keystore.password": "xxx",
"elastic.https.ssl.key.password": "xxx",
"elastic.https.ssl.keystore.type": "JKS",
"elastic.https.ssl.truststore.location": "path/to/truststore.jks",
"elastic.https.ssl.truststore.password": "xxx",
"elastic.https.ssl.truststore.type": "JKS",
"elastic.https.ssl.protocol": "TLS"
}
}
It successfully deployed, however, when I check status by
GET /connectors/elasticsearch-sink/status
tasks
is empty array []
:
{
"name": "elasticsearch-sink",
"connector": {
"state": "RUNNING",
"worker_id": "10.xxx.xxx.xxx:8083"
},
"tasks": [],
"type": "sink"
}
I found this Kafka Connect: No tasks created for a connector
However, I tried those two answers inside, both changing name, and deleting ElasticsearchSinkConnector then redeploy multiple times didn't work for me.
Also, there is no logs in the Kafka Connect pod.
Any idea? Thanks!
CodePudding user response:
After adding these two to the ElasticsearchSinkConnector config
"errors.log.include.messages": "true",
"errors.log.enable": "true"
in the config so like
POST /connectors
{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type.name": "_doc",
"tasks.max": "1",
"topics": "my_db_server.public.my_table",
"connection.url": "https://my-elasticsearch.com:9200",
"connection.username": "xxx",
"connection.password": "xxx",
"key.ignore": "true",
"schema.ignore": "true",
"elastic.security.protocol": "SSL",
"elastic.https.ssl.keystore.location": "path/to/keystore.jks",
"elastic.https.ssl.keystore.password": "xxx",
"elastic.https.ssl.key.password": "xxx",
"elastic.https.ssl.keystore.type": "JKS",
"elastic.https.ssl.truststore.location": "path/to/truststore.jks",
"elastic.https.ssl.truststore.password": "xxx",
"elastic.https.ssl.truststore.type": "JKS",
"elastic.https.ssl.protocol": "TLS",
"errors.log.include.messages": "true",
"errors.log.enable": "true"
}
}
And this time when I check status by
GET /connectors/elasticsearch-sink/status
tasks
shows error now:
{
"name": "elasticsearch-sink",
"connector": {
"state": "RUNNING",
"worker_id": "10.xxx.xxx.xxx:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "10.xxx.xxx.xxx:8083",
"trace": "org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: connect-elasticsearch-sink\n"
}
],
"type": "sink"
}
Now just need fix the permission issue.
Also, I found a good article about how to debug: https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/