Landoop. Kafka connect how to change worker properties

Time:07-20

landoop/fast-data-dev:2.6

I want to change the default batch.size by setting 'producer.override.batch.size=65536' when creating a new connector. For that to work, the worker must first be configured with an override policy:

connector.client.config.override.policy=All

Otherwise an exception is thrown:

"producer.override.batch.size" : The 'None' policy does not allow 'batch.size' to be overridden in the connector configuration.
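
For reference, a minimal sketch of the kind of request that triggers this validation, assuming the Connect REST API is reachable at 127.0.0.1:8083. The connector name and class are hypothetical placeholders; only the producer.override.batch.size key is taken from the question.

```python
import json
import urllib.request

# Assumption: Kafka Connect REST endpoint exposed by the container on port 8083.
CONNECT_URL = "http://127.0.0.1:8083/connectors"


def build_connector_payload(name, connector_class, batch_size):
    """Build the JSON body for POST /connectors with a producer override."""
    return {
        "name": name,
        "config": {
            "connector.class": connector_class,
            "tasks.max": "1",
            # Rejected unless the worker sets
            # connector.client.config.override.policy=All
            "producer.override.batch.size": str(batch_size),
        },
    }


def create_connector(payload, url=CONNECT_URL):
    """POST the payload to the Connect REST API and return the decoded reply."""
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


# Hypothetical name and class; replace with the real connector's values.
payload = build_connector_payload(
    "crypto-panic-source", "com.example.CryptoPanicSourceConnector", 65536
)
print(json.dumps(payload, indent=2))
```

Calling create_connector(payload) against a worker whose policy is still 'None' should be rejected with the validation message quoted above.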

It's not clear how exactly:

  1. to change the default worker properties
  2. where the properties are expected to be placed
  3. what they should be named

so that Landoop picks them up.

I start Landoop with the following docker-compose file:

version: '2'

services:

  kafka-cluster:
    image: landoop/fast-data-dev:2.6
    environment:
      ADV_HOST: 127.0.0.1         
      RUNTESTS: 0                 
    ports:
      - 2181:2181                 # Zookeeper
      - 3030:3030                 # Landoop UI
      - 8081-8083:8081-8083       # REST Proxy, Schema Registry, Kafka Connect ports
      - 9581-9585:9581-9585       # JMX Ports
      - 9092:9092                 # Kafka Broker
    volumes:
      - ./connectors/news/crypto-panic-connector-1.0.jar:/connectors/crypto-panic-connector-1.0.jar
  

The distributed worker properties generated by Landoop at /connect/connect-avro-distributed.properties:

offset.storage.partitions=5
key.converter.schema.registry.url=http://127.0.0.1:8081
value.converter.schema.registry.url=http://127.0.0.1:8081
config.storage.replication.factor=1
offset.storage.topic=connect-offsets
status.storage.partitions=3
offset.storage.replication.factor=1
key.converter=io.confluent.connect.avro.AvroConverter
config.storage.topic=connect-configs
config.storage.partitions=1
group.id=connect-fast-data
rest.advertised.host.name=127.0.0.1
port=8083
value.converter=io.confluent.connect.avro.AvroConverter
rest.port=8083
status.storage.replication.factor=1
status.storage.topic=connect-statuses
access.control.allow.origin=*
access.control.allow.methods=GET,POST,PUT,DELETE,OPTIONS
jmx.port=9584
plugin.path=/var/run/connect/connectors/stream-reactor,/var/run/connect/connectors/third-party,/connectors
bootstrap.servers=PLAINTEXT://127.0.0.1:9092
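
The generated file above contains no connector.client.config.override.policy entry, which matches the 'None' value seen in the logs. A small sketch for checking this programmatically, assuming the file has been copied out of the container as plain text; parse_properties is a hypothetical helper that handles only simple key=value lines, and the fallback value "None" is an assumption based on the log output quoted in this question.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # or ! comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


# A few lines taken from the generated file shown above.
sample = """\
group.id=connect-fast-data
rest.port=8083
bootstrap.servers=PLAINTEXT://127.0.0.1:9092
"""

props = parse_properties(sample)
# Assumption: when the key is absent, the worker falls back to 'None'.
policy = props.get("connector.client.config.override.policy", "None")
print("override policy:", policy)
```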

Directory structure of the crypto-panic-connector-1.0 connector:

/config:
  > worker.properties
/src:
  > ...     

UPDATE

Adding the following to the environment properties:

CONNECT_CONNECT_CLIENT_CONFIG_OVERRIDE_POLICY: 'All' 
CONNECT_PRODUCER_OVERRIDE_BATCH_SIZE: 65536

doesn't work for landoop/fast-data-dev:2.6.
The logs still show

'connector.client.config.override.policy = None'

and a warning:

WARN The configuration 'connect.client.config.override.policy' was supplied but isn't a known config.

Changing this to

CONNECTOR_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY: 'All'
CONNECT_PRODUCER_OVERRIDE_BATCH_SIZE: 65536

removes the warning, but in the end the override policy is still 'None', and it's not possible to override client properties when creating a connector.

Changing to

CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY: 'All'
CONNECT_PRODUCER_OVERRIDE_BATCH_SIZE: 65536

has the same effect: the policy is still 'None'.

Also, the batch size override is not applied, so I assume these override features are not supported in Landoop.

WARN The configuration 'producer.override.batch.size' was supplied but isn't a known config. 

I assume 'confluentinc/cp-kafka-connect' doesn't have a built-in UI, and for learning purposes it seems better to have one, so I'd prefer to do this in Landoop. But thanks for the recommendation to use 'confluentinc/cp-kafka-connect'. I will try the config override there as well.

CodePudding user response:

For starters, that image is very old and no longer maintained. I'd recommend you use confluentinc/cp-kafka-connect.

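In any case, for both images, you can set worker properties through environment variables. The CONNECT_ prefix is stripped and the rest of the name is mapped to the property, so connector.client.config.override.policy becomes

environment:
    CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY: 'All'

producer.override.batch.size is a connector-level setting, so it belongs in the connector configuration rather than in the worker environment. That is why the worker logs it as an unknown config.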
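
A rough sketch of how a CONNECT_-prefixed variable appears to be translated into a worker property name, consistent with the warning quoted in the question ('connect.client.config.override.policy' was supplied but isn't a known config). env_to_property is a hypothetical helper, and the rule is a simplification: the real images may treat repeated underscores specially.

```python
def env_to_property(env_name, prefix="CONNECT_"):
    """Translate an environment variable name into a worker property name.

    Simplified assumption: strip the prefix, lowercase the rest, and turn
    every underscore into a dot.
    """
    if not env_name.startswith(prefix):
        raise ValueError(f"{env_name} does not start with {prefix}")
    return env_name[len(prefix):].lower().replace("_", ".")


for name in (
    "CONNECT_CONNECT_CLIENT_CONFIG_OVERRIDE_POLICY",    # unknown key
    "CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY",  # the intended key
):
    print(name, "->", env_to_property(name))
```

Under this rule, only a variable that starts with CONNECT_CONNECTOR_ yields the connector.client.config.override.policy key that the worker recognizes.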

It's not clear, how exactly ... change the default worker properties

Look at the source code
