Kafka JDBC sink connector does not access the correct connector configurations

Time:06-03

I have Postgres running in a Docker container on a Pi. The Debezium connector runs on my local machine, as do ZooKeeper and Kafka.

The kafka topic is up and running and I can see the changes which I make in the postgres going into the kafka topic. So far so good.

Now I started another docker container locally which is not from the same docker compose file as my other containers. THIS is supposed to be my REPLICA DATABASE.

I copied the confluentinc-kafka-connect-jdbc-10.5.0 into the docker container.

sudo docker cp confluentinc-kafka-connect-jdbc-10.5.0 CONTAINER_ID:/kafka/connect/ 

Then I changed the user and group and restarted the container.

docker exec -it --user root <container-id> /bin/bash
chown -R <username>:<groupname> <folder/file>

Now I created the jdbc-sink connector.

curl --location --request POST 'http://localhost:8083/connectors/' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "piserver.public.customers",
        "connection.url": "jdbc:postgresql:192.168.128.2:5432/postgres",
        "connection.user": "postgres",
        "connection.password": "postgres",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",   
        "auto.create": "true",
        "insert.mode": "upsert",
        "pk.fields": "id",
        "pk.mode": "record_value"                                               
    }
}
'

I get back 201 Created.

After running for a few seconds, the task fails. I check the status with:

curl --location --request GET 'localhost:8083/connectors/jdbc-sink/status' \
--data-raw ''

ERROR trace

{
            "id": 0,
            "state": "FAILED",
            "worker_id": "192.168.112.4:8083",
            "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: Connection to localhost:5432 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.\n\tat io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:59)\n\tat io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:64)\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)\n\t... 10 more\nCaused by: org.postgresql.util.PSQLException: Connection to localhost:5432 refused. 
Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.\n\tat org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:319)\n\tat org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)\n\tat org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:223)\n\tat org.postgresql.Driver.makeConnection(Driver.java:400)\n\tat org.postgresql.Driver.connect(Driver.java:259)\n\tat java.sql/java.sql.DriverManager.getConnection(DriverManager.java:677)\n\tat java.sql/java.sql.DriverManager.getConnection(DriverManager.java:189)\n\tat io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getConnection(GenericDatabaseDialect.java:250)\n\tat io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect.getConnection(PostgreSqlDatabaseDialect.java:103)\n\tat io.confluent.connect.jdbc.util.CachedConnectionProvider.newConnection(CachedConnectionProvider.java:80)\n\tat io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:52)\n\t... 13 more\nCaused by: java.net.ConnectException: Connection refused (Connection refused)\n\tat java.base/java.net.PlainSocketImpl.socketConnect(Native Method)\n\tat java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)\n\tat java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)\n\tat java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)\n\tat java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)\n\tat java.base/java.net.Socket.connect(Socket.java:609)\n\tat org.postgresql.core.PGStream.createSocket(PGStream.java:241)\n\tat org.postgresql.core.PGStream.<init>(PGStream.java:98)\n\tat org.postgresql.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:109)\n\tat org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:235)\n\t... 23 more\n"
        }

In short: Connection to localhost:5432 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.

The hosts I tried in my config:

"connection.url": "jdbc:postgresql:192.168.128.2:5432/postgres" // got this IP from docker inspect POSTGRES_CONTAINER
"connection.url": "jdbc:postgresql:host.docker.internal:5432/postgres" 
"connection.url": "jdbc:postgresql:localhost:5432/postgres" 

None of these worked. I always got the same error: the connection to localhost:5432 was refused.

I also tried connecting the replica Postgres container to my docker-compose network.

Any thoughts on this? Thanks.

Short summary:

POSTGRES (on Pi) -> DEBEZIUM connector (locally) -> KAFKA -> JDBC-SINK from within KAFKA -> POSTGRES (will be the replica, runs locally)

Answer:

Don't use IP addresses between containers, and don't use localhost inside a container to reach other containers. See https://docs.docker.com/network/bridge/

Ideally, you'd use Docker Compose to start all services. Otherwise, you need to create the network bridge yourself:

docker network create database-bridge
docker run --network=database-bridge --name=postgres ...  
docker run --network=database-bridge  ... # repeat for zookeeper, kafka, and debezium

Or, since you say you "started another docker container locally which is not from the same docker compose file", look at the networks that Compose created and attach the new container to one of them:

docker network ls  # look for a name that matches the folder where you ran docker-compose
docker run --network=<name> ... jdbc-connector
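
If the replica container is already running, it may not need to be recreated: `docker network connect` attaches an existing container to a network. The following is a minimal sketch, not a definitive recipe. The helper name `attach_to_network` and the example names in the comments (`myproject_default`, `replica-postgres`) are assumptions for illustration and do not come from the question.

```shell
#!/bin/sh
# Hypothetical helper: attach a running container to an existing Docker
# network under a DNS alias, then list the containers on that network.
attach_to_network() {
    network="$1"     # network name found with `docker network ls`, e.g. myproject_default
    container="$2"   # name or ID of the running container, e.g. replica-postgres
    alias_name="$3"  # hostname other containers on the network will use

    docker network connect --alias "$alias_name" "$network" "$container" || return 1

    # Print the names of all containers now attached to the network.
    docker network inspect --format '{{range .Containers}}{{.Name}} {{end}}' "$network"
}
```

Called as `attach_to_network myproject_default replica-postgres postgres`, this should make the replica reachable from the other containers on that network under the hostname `postgres`.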

Then use jdbc:postgresql://postgres:5432/postgres to connect to that container by its hostname.
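
One detail worth checking: every connection.url tried in the question lacks the `//` before the host. As far as I know, the PostgreSQL JDBC driver reads a URL of the form `jdbc:postgresql:<something>` as a database name only and falls back to its default host, localhost. That would explain why the error always names localhost:5432 regardless of the host written in the URL. The sketch below checks for the `//` and then updates the existing connector through the Kafka Connect REST endpoint `PUT /connectors/jdbc-sink/config`. The helper names (`has_explicit_host`, `sink_config`, `update_sink`) are made up for illustration, and the worker address localhost:8083 is taken from the question.

```shell
#!/bin/sh
# Hypothetical helper: succeed only if the JDBC URL has an explicit //host part.
has_explicit_host() {
    case "$1" in
        jdbc:postgresql://?*) return 0 ;;
        *) return 1 ;;
    esac
}

# Hypothetical helper: print the sink config from the question with the
# given connection.url. PUT .../config expects the bare config map.
sink_config() {
    cat <<EOF
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "tasks.max": "1",
  "topics": "piserver.public.customers",
  "connection.url": "$1",
  "connection.user": "postgres",
  "connection.password": "postgres",
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "auto.create": "true",
  "insert.mode": "upsert",
  "pk.fields": "id",
  "pk.mode": "record_value"
}
EOF
}

# Hypothetical helper: refuse URLs without //host, otherwise update the connector.
update_sink() {
    if ! has_explicit_host "$1"; then
        echo "no //host in '$1'; the driver would likely default to localhost" >&2
        return 1
    fi
    sink_config "$1" | curl --silent --request PUT \
        --header 'Content-Type: application/json' \
        --data @- 'http://localhost:8083/connectors/jdbc-sink/config'
}
```

For example, `update_sink "jdbc:postgresql://postgres:5432/postgres"` sends the corrected URL, while `update_sink "jdbc:postgresql:192.168.128.2:5432/postgres"` is rejected before any request is made.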

You can use localhost:5432 only if the JDBC connector runs directly on the host with connect-distributed.sh rather than in Docker, and even then you need a port mapping from the Postgres container to the host.
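
A port mapping of that kind could look roughly like the sketch below. The helper name `start_replica_with_port` and the container name in the comment are hypothetical. The official `postgres` image and the password `postgres` are assumptions chosen to match the credentials in the question.

```shell
#!/bin/sh
# Hypothetical helper: start a Postgres container with container port 5432
# published on the given host port, so host processes can use localhost.
start_replica_with_port() {
    name="$1"        # container name, e.g. replica-postgres (hypothetical)
    host_port="$2"   # port on the host, e.g. 5432

    docker run --detach --name "$name" \
        --env POSTGRES_PASSWORD=postgres \
        --publish "$host_port:5432" \
        postgres
}
```

After `start_replica_with_port replica-postgres 5432`, a Connect worker running directly on the host could use jdbc:postgresql://localhost:5432/postgres.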
