I am trying to run the new Apache Pulsar Lakehouse Sink Connector, and I am getting a java.lang.IllegalArgumentException.
Below is my setup, starting with the docker-compose.yaml file:
version: '3.7'
volumes:
  mssql-data:
  minio-data:
networks:
  oentity:
    driver: bridge
services:
  pulsar:
    image: apachepulsar/pulsar:latest
    command: bin/pulsar standalone
    hostname: pulsar
    ports:
      - "8080:8080"
      - "6650:6650"
    restart: unless-stopped
    networks:
      oentity:
    volumes:
      - "./data/:/pulsar/data"
      - "./connectors/:/pulsar/connectors"
  dashboard:
    image: apachepulsar/pulsar-manager:latest
    ports:
      - "9528:9527"
      - "7750:7750"
    networks:
      oentity:
    depends_on:
      - pulsar
    links:
      - pulsar
    environment:
      SPRING_CONFIGURATION_FILE: /pulsar-manager/pulsar-manager/application.properties
  minio:
    image: 'minio/minio:latest'
    hostname: minio
    container_name: minio
    ports:
      - '9000:9000'
      - '9001:9001'
    volumes:
      - minio-data:/data
    environment:
      MINIO_ROOT_USER: minio
      MINIO_ROOT_PASSWORD: minio123
      MINIO_ACCESS_KEY: minio
      MINIO_SECRET_KEY: minio123
    command: server --console-address ":9001" /data
    networks:
      oentity:
I downloaded the connector from here and copied the NAR package to the Pulsar connectors directory, $PULSAR_HOME/connectors, in the container. I logged in to MinIO at http://localhost:9001/login and created a bucket called lakehouse.
I used a configuration similar to the one described here, replacing the tablePath value with my MinIO path. I named the file sink-connector-config.json.
{
  "tenant": "public",
  "namespace": "default",
  "name": "delta_sink",
  "parallelism": 1,
  "inputs": [
    "test-delta-pulsar"
  ],
  "archive": "connectors/pulsar-io-lakehouse-2.9.3.7-cloud.nar",
  "processingGuarantees": "EFFECTIVELY_ONCE",
  "configs": {
    "type": "delta",
    "maxCommitInterval": 120,
    "maxRecordsPerCommit": 10000000,
    "tablePath": "s3a://lakehouse/delta_sink",
    "hadoop.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
  }
}
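Before running pulsar-admin, a quick local check can catch typos in a config like the one above. This is a minimal sketch assuming Python 3.9+. The required-key lists are a hypothetical choice that simply mirrors the fields in this file; they are not the connector's own validation rules, so this check cannot reproduce or rule out the Jackson error in the log. The accepted processingGuarantees values are the three Pulsar 2.9 options (ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE).

```python
import json
import sys

# Hypothetical pre-flight check: these keys mirror the fields used in
# sink-connector-config.json; they are NOT the connector's own rules.
REQUIRED_TOP_LEVEL = ["tenant", "namespace", "name", "inputs", "archive", "configs"]
REQUIRED_CONNECTOR_KEYS = ["type", "tablePath"]
# Processing guarantees accepted by Pulsar 2.9 sinks.
GUARANTEES = {"ATLEAST_ONCE", "ATMOST_ONCE", "EFFECTIVELY_ONCE"}


def check_sink_config(cfg: dict) -> list[str]:
    """Return a list of human-readable problems (empty if none were found)."""
    problems = []
    for key in REQUIRED_TOP_LEVEL:
        if key not in cfg:
            problems.append(f"missing top-level key: {key}")
    guarantee = cfg.get("processingGuarantees")
    if guarantee is not None and guarantee not in GUARANTEES:
        problems.append(f"unknown processingGuarantees: {guarantee}")
    connector = cfg.get("configs", {})
    if not isinstance(connector, dict):
        problems.append("configs must be a JSON object")
        return problems
    for key in REQUIRED_CONNECTOR_KEYS:
        if key not in connector:
            problems.append(f"missing configs.{key}")
    return problems


# Only read a file when a path is passed on the command line.
if __name__ == "__main__" and len(sys.argv) > 1:
    with open(sys.argv[1]) as f:
        config = json.load(f)  # raises json.JSONDecodeError on malformed JSON
    issues = check_sink_config(config)
    for issue in issues:
        print("problem:", issue)
    sys.exit(1 if issues else 0)
```

Usage (the script name is hypothetical): python check_sink_config.py sink-connector-config.json. It prints each problem and exits with status 1 if it finds any.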
I ran the Lakehouse sink connector from inside the container:
docker exec -it <container name> bash
Then I executed:
$PULSAR_HOME/bin/pulsar-admin sink localrun \
    --sink-config-file sink-connector-config.json
I got the error below:
2022-09-06T16:53:08,396 0000 [main] INFO org.apache.pulsar.functions.utils.io.ConnectorUtils - Found connector ConnectorDefinition(name=lakehouse, description=Lakehouse connectors, sourceClass=org.apache.pulsar.ecosystem.io.lakehouse.SourceConnector, sinkClass=org.apache.pulsar.ecosystem.io.lakehouse.SinkConnector, sourceConfigClass=org.apache.pulsar.ecosystem.io.lakehouse.SourceConnectorConfig, sinkConfigClass=org.apache.pulsar.ecosystem.io.lakehouse.SinkConnectorConfig) from /pulsar/connectors/pulsar-io-lakehouse-2.9.3.7-cloud.nar
2022-09-06T16:53:44,562 0000 [main] ERROR org.apache.pulsar.functions.LocalRunner - Encountered error starting localrunner
java.lang.IllegalArgumentException: Could not validate sink config: Cannot construct instance of `org.apache.pulsar.ecosystem.io.lakehouse.SinkConnectorConfig` (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
 at [Source: UNKNOWN; byte offset: #UNKNOWN]
    at org.apache.pulsar.functions.utils.SinkConfigUtils.validateSinkConfig(SinkConfigUtils.java:594) ~[org.apache.pulsar-pulsar-functions-utils-2.9.3.jar:2.9.3]
    at org.apache.pulsar.functions.utils.SinkConfigUtils.validateAndExtractDetails(SinkConfigUtils.java:441) ~[org.apache.pulsar-pulsar-functions-utils-2.9.3.jar:2.9.3]
    at org.apache.pulsar.functions.LocalRunner.start(LocalRunner.java:439) ~[org.apache.pulsar-pulsar-functions-local-runner-original-2.9.3.jar:2.9.3]
    at org.apache.pulsar.functions.LocalRunner.main(LocalRunner.java:198) [org.apache.pulsar-pulsar-functions-local-runner-original-2.9.3.jar:2.9.3]
root@pulsar:/pulsar#
Answer:
Since you are running this as a built-in connector, have you confirmed that the connector is available? For example:
$ pulsar-admin sinks available-sinks
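If you want to script that availability check, here is a minimal Python sketch. It assumes that available-sinks prints each connector's name on a line of its own (the registered name, lakehouse, appears in the log in the question); the script name and the pulsar-admin path argument are hypothetical.

```python
import subprocess
import sys


def connector_listed(output: str, name: str = "lakehouse") -> bool:
    """Return True if `name` appears as its own line in the command output.

    Assumption: available-sinks prints each connector name on a separate
    line, possibly alongside description and separator lines.
    """
    return any(line.strip() == name for line in output.splitlines())


# Only call pulsar-admin when its path is passed on the command line,
# e.g. python check_available.py /pulsar/bin/pulsar-admin
if __name__ == "__main__" and len(sys.argv) > 1:
    result = subprocess.run(
        [sys.argv[1], "sinks", "available-sinks"],
        capture_output=True, text=True, check=True,
    )
    print("lakehouse available:", connector_listed(result.stdout))
```

Matching whole lines rather than substrings avoids false positives from a description that merely mentions the word.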
Have you tried running it as a non-built-in connector? For example:
$PULSAR_HOME/bin/pulsar-admin sinks create \
    --sink-config-file <lakehouse-sink-config.yaml>
Answer:
Take a look at my example:
https://github.com/tspannhw/FLiP-Pi-DeltaLake-Thermal
You may need to set the file type (deltaFileType):
configs:
  type: "delta"
  maxCommitInterval: 120
  maxRecordsPerCommit: 10_000_000
  tablePath: "file:///opt/demo/lakehouse"
  processingGuarantees: "EXACTLY_ONCE"
  deltaFileType: "parquet"
  subscriptionType: "Failover"
Also, don't use localrun; use create instead:
bin/pulsar-admin sinks create \
    --archive ./connectors/pulsar-io-lakehouse-2.9.2.24.nar \
    --tenant public \
    --namespace default \
    --name delta_sink \
    --sink-config-file conf/deltalakesink.yml \
    --inputs "persistent://public/default/pi-sensors" \
    --parallelism 1 \
    --subs-name pisensorwatch \
    --processing-guarantees EFFECTIVELY_ONCE
Also, try a YAML configuration file instead of JSON.
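To try the YAML route without retyping the JSON config from the question, the following minimal sketch converts it using only the Python standard library (PyYAML would also work, but it is a third-party package). The converter and file names are illustrative, not part of Pulsar, and it only handles what a flat sink config needs: nested objects, and lists of scalars. Whether the file format is the actual cause of the IllegalArgumentException is not confirmed in this thread.

```python
import json
import re
import sys

# Keys that are safe to write unquoted in YAML; anything else is quoted.
PLAIN_KEY = re.compile(r"[A-Za-z_][A-Za-z0-9_.-]*")
RESERVED = {"true", "false", "yes", "no", "on", "off", "null", "y", "n"}


def yaml_key(key: str) -> str:
    if PLAIN_KEY.fullmatch(key) and key.lower() not in RESERVED:
        return key
    return json.dumps(key)


def to_yaml(value, indent: int = 0) -> list[str]:
    """Render JSON-compatible data as block-style YAML lines.

    Scalars and empty containers go through json.dumps: JSON strings,
    numbers, true/false/null, [] and {} are all valid YAML flow values.
    """
    pad = "  " * indent
    lines = []
    if isinstance(value, dict):
        for key, item in value.items():
            if isinstance(item, (dict, list)) and item:
                lines.append(f"{pad}{yaml_key(key)}:")
                lines.extend(to_yaml(item, indent + 1))
            else:
                lines.append(f"{pad}{yaml_key(key)}: {json.dumps(item)}")
    elif isinstance(value, list):
        for item in value:
            if isinstance(item, (dict, list)) and item:
                raise ValueError("nested containers inside lists are not supported")
            lines.append(f"{pad}- {json.dumps(item)}")
    else:
        lines.append(pad + json.dumps(value))
    return lines


# Usage: python json2yaml.py sink-connector-config.json sink-connector-config.yaml
if __name__ == "__main__" and len(sys.argv) == 3:
    with open(sys.argv[1]) as src:
        config = json.load(src)
    with open(sys.argv[2], "w") as dst:
        dst.write("\n".join(to_yaml(config)) + "\n")
```

The resulting file can then be passed to pulsar-admin sinks create with --sink-config-file.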