Home > Enterprise >  Tasks are empty after deploying ElasticsearchSinkConnector
Tasks are empty after deploying ElasticsearchSinkConnector

Time:03-22

I tried to deploy ElasticsearchSinkConnector by

POST /connectors

{
    "name": "elasticsearch-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "type.name": "_doc",
        "tasks.max": "1",
        "topics": "my_db_server.public.my_table",
        "connection.url": "https://my-elasticsearch.com:9200",
        "connection.username": "xxx",
        "connection.password": "xxx",
        "key.ignore": "true",
        "schema.ignore": "true",
        "elastic.security.protocol": "SSL",
        "elastic.https.ssl.keystore.location": "path/to/keystore.jks",
        "elastic.https.ssl.keystore.password": "xxx",
        "elastic.https.ssl.key.password": "xxx",
        "elastic.https.ssl.keystore.type": "JKS",
        "elastic.https.ssl.truststore.location": "path/to/truststore.jks",
        "elastic.https.ssl.truststore.password": "xxx",
        "elastic.https.ssl.truststore.type": "JKS",
        "elastic.https.ssl.protocol": "TLS"
    }
}

It successfully deployed, however, when I check status by

GET /connectors/elasticsearch-sink/status

tasks is empty array []:

{
    "name": "elasticsearch-sink",
    "connector": {
        "state": "RUNNING",
        "worker_id": "10.xxx.xxx.xxx:8083"
    },
    "tasks": [],
    "type": "sink"
}

I found this Kafka Connect: No tasks created for a connector

However, I tried those two answers inside, both changing name, and deleting ElasticsearchSinkConnector then redeploy multiple times didn't work for me.

Also, there is no logs in the Kafka Connect pod.

Any idea? Thanks!

CodePudding user response:

After adding these two to the ElasticsearchSinkConnector config

        "errors.log.include.messages": "true",
        "errors.log.enable": "true"

in the config so like

POST /connectors

{
    "name": "elasticsearch-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "type.name": "_doc",
        "tasks.max": "1",
        "topics": "my_db_server.public.my_table",
        "connection.url": "https://my-elasticsearch.com:9200",
        "connection.username": "xxx",
        "connection.password": "xxx",
        "key.ignore": "true",
        "schema.ignore": "true",
        "elastic.security.protocol": "SSL",
        "elastic.https.ssl.keystore.location": "path/to/keystore.jks",
        "elastic.https.ssl.keystore.password": "xxx",
        "elastic.https.ssl.key.password": "xxx",
        "elastic.https.ssl.keystore.type": "JKS",
        "elastic.https.ssl.truststore.location": "path/to/truststore.jks",
        "elastic.https.ssl.truststore.password": "xxx",
        "elastic.https.ssl.truststore.type": "JKS",
        "elastic.https.ssl.protocol": "TLS",
        "errors.log.include.messages": "true",
        "errors.log.enable": "true"
    }
}

And this time when I check status by

GET /connectors/elasticsearch-sink/status

tasks shows error now:

{
    "name": "elasticsearch-sink",
    "connector": {
        "state": "RUNNING",
        "worker_id": "10.xxx.xxx.xxx:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "worker_id": "10.xxx.xxx.xxx:8083",
            "trace": "org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: connect-elasticsearch-sink\n"
        }
    ],
    "type": "sink"
}

Now just need fix the permission issue.

Also, I found a good article about how to debug: https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/

  • Related