I have some CDC data in Kafka. Now I am trying to sink from Kafka to Elasticsearch. Here is what I have done so far:
Step 1 - Deploy Elasticsearch in Kubernetes (succeeded)
I deployed Elasticsearch in Kubernetes by following this tutorial using Elastic Operator:
- Deploy ECK in your Kubernetes cluster: https://www.elastic.co/guide/en/cloud-on-k8s/current/k8s-deploy-eck.html
- Deploy an Elasticsearch cluster: https://www.elastic.co/guide/en/cloud-on-k8s/current/k8s-deploy-elasticsearch.html
apiVersion: elasticsearch.k8s.elastic.co/v1
kind: Elasticsearch
metadata:
  name: hm-elasticsearch
  namespace: elastic
spec:
  version: 7.14.0
  nodeSets:
    - name: default
      count: 1
      config:
        node.store.allow_mmap: false
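The manifest can be applied with (assuming it is saved as elasticsearch.yaml; the filename here is illustrative):
kubectl apply --filename=elasticsearch.yaml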
Based on the tutorial, I can successfully call it by providing the username elastic and password passw0rd in the header:
curl -u "elastic:passw0rd" -k "https://hm-elasticsearch-es-http.elastic:9200"
which returns
{
  "name": "hm-elasticsearch-es-default-0",
  "cluster_name": "hm-elasticsearch",
  "cluster_uuid": "TWgIk0YGR_GVr7IJZcW62g",
  "version": {
    "number": "7.14.0",
    "build_flavor": "default",
    "build_type": "docker",
    "build_hash": "dd5a0a2acaa2045ff9624f3729fc8a6f40835aa1",
    "build_date": "2021-07-29T20:49:32.864135063Z",
    "build_snapshot": false,
    "lucene_version": "8.9.0",
    "minimum_wire_compatibility_version": "6.8.0",
    "minimum_index_compatibility_version": "6.0.0-beta1"
  },
  "tagline": "You Know, for Search"
}
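For reference, ECK stores the generated elastic user's password in the hm-elasticsearch-es-elastic-user secret (visible in the secret list in UPDATE 2 below), so it can be retrieved with a command like:
kubectl get secret hm-elasticsearch-es-elastic-user \
  --namespace=elastic \
  --output=go-template='{{index .data "elastic" | base64decode }}'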
Step 2 - Add ElasticsearchSinkConnector (failed to connect to Elasticsearch)
Now I am trying to add the ElasticsearchSinkConnector; however, I am having trouble setting it up.
I saw How to Kafka Connect Elasticsearch with SSL?. For Elastic Cloud, you just need to pass the username and password, so I thought mine would be similar.
Also, based on this ElasticsearchSinkConnector config, I wrote my config and then tried to validate it with
curl --location --request PUT 'http://hm-connect-cluster-connect-api.kafka:8083/connector-plugins/io.confluent.connect.elasticsearch.ElasticsearchSinkConnector/config/validate' \
  --header 'Content-Type: application/json' \
  --data-raw '{
    "name": "elasticsearch-sink",
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "opa_db_server.public.roles",
    "connection.url": "https://hm-elasticsearch-es-http.elastic:9200",
    "connection.username": "elastic",
    "connection.password": "passw0rd",
    "transforms": "unwrap,key",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.key.field": "role_id",
    "key.ignore": "false",
    "behavior.on.null.values": "delete"
  }'
It returned this error:
{
  "name": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "error_count": 3,
  "groups": [
    "Common",
    "Transforms",
    "Predicates",
    "Error Handling",
    "Transforms: unwrap",
    "Transforms: key",
    "Connector",
    "Data Conversion",
    "Proxy",
    "Security",
    "Kerberos",
    "Data Stream"
  ],
  "configs": [
    // ...
    {
      "definition": {
        "name": "connection.url",
        "type": "LIST",
        "required": true,
        "default_value": null,
        "importance": "HIGH",
        "documentation": "The comma-separated list of one or more Elasticsearch URLs, such as ``http://eshost1:9200,http://eshost2:9200`` or ``https://eshost3:9200``. HTTPS is used for all connections if any of the URLs starts with ``https:``. A URL without a protocol is treated as ``http``.",
        "group": "Connector",
        "width": "LONG",
        "display_name": "Connection URLs",
        "dependents": [],
        "order": 1
      },
      "value": {
        "name": "connection.url",
        "value": "https://hm-elasticsearch-es-http.elastic:9200",
        "recommended_values": [],
        "errors": [
          "Could not connect to Elasticsearch. Error message: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target"
        ],
        "visible": true
      }
    },
    {
      "definition": {
        "name": "connection.username",
        "type": "STRING",
        "required": false,
        "default_value": null,
        "importance": "MEDIUM",
        "documentation": "The username used to authenticate with Elasticsearch. The default is the null, and authentication will only be performed if both the username and password are non-null.",
        "group": "Connector",
        "width": "SHORT",
        "display_name": "Connection Username",
        "dependents": [],
        "order": 2
      },
      "value": {
        "name": "connection.username",
        "value": "elastic",
        "recommended_values": [],
        "errors": [
          "Could not authenticate the user. Check the 'connection.username' and 'connection.password'. Error message: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target"
        ],
        "visible": true
      }
    },
    {
      "definition": {
        "name": "connection.password",
        "type": "PASSWORD",
        "required": false,
        "default_value": null,
        "importance": "MEDIUM",
        "documentation": "The password used to authenticate with Elasticsearch. The default is the null, and authentication will only be performed if both the username and password are non-null.",
        "group": "Connector",
        "width": "SHORT",
        "display_name": "Connection Password",
        "dependents": [],
        "order": 3
      },
      "value": {
        "name": "connection.password",
        "value": "[hidden]",
        "recommended_values": [],
        "errors": [
          "Could not authenticate the user. Check the 'connection.username' and 'connection.password'. Error message: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target"
        ],
        "visible": true
      }
    },
    // ...
  ]
}
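For context, the PKIX error means the Connect worker's JVM does not trust the certificate Elasticsearch presents, because ECK signs it with its own CA rather than a publicly trusted one. One way to see the chain the endpoint serves (standard openssl, run from a pod inside the cluster; this check is illustrative, not part of the validation flow above):
# Shows the ECK-issued certificate chain that the Connect JVM refuses to trust
openssl s_client -connect hm-elasticsearch-es-http.elastic:9200 -showcerts </dev/null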
The full validation response, which shows all the config fields that can be used, can be found here.
UPDATE 1 (12/8/2021):
I found this document, which has more configs regarding SSL:
"elastic.security.protocol": "SSL"
"elastic.https.ssl.keystore.location": "/path/to/keystore.jks"
"elastic.https.ssl.keystore.password": "xxx"
"elastic.https.ssl.key.password": "xxx"
"elastic.https.ssl.keystore.type": "JKS"
"elastic.https.ssl.truststore.location": "/path/to/truststore.jks"
"elastic.https.ssl.truststore.password": "xxx"
"elastic.https.ssl.truststore.type": "JKS"
"elastic.https.ssl.protocol": "TLS"
In my case, I only need to provide the username and password to succeed. But in the config above, there is no place to provide the username, so I am not sure how to write it correctly.
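As far as I can tell, the credentials and the SSL settings are separate config groups, so presumably both would be passed together, along the lines of (a sketch, not yet verified):
"connection.username": "elastic"
"connection.password": "passw0rd"
"elastic.security.protocol": "SSL"
"elastic.https.ssl.truststore.location": "/path/to/truststore.jks"
"elastic.https.ssl.truststore.password": "xxx"
"elastic.https.ssl.truststore.type": "JKS"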
UPDATE 2 (12/9/2021):
I have these pods running
> kubectl get pods --namespace=elastic
NAME READY STATUS RESTARTS AGE
hm-kibana-kb-77d4d9b456-m6th9 1/1 Running 0 6d3h
hm-elasticsearch-es-default-0 1/1 Running 0 35h
and these secrets
> kubectl get secrets --namespace=elastic
NAME TYPE DATA AGE
hm-kibana-kibana-user Opaque 1 6d3h
elastic-hm-kibana-kibana-user Opaque 3 6d3h
hm-kibana-kb-http-ca-internal Opaque 2 6d3h
hm-elasticsearch-es-http-ca-internal Opaque 2 6d3h
hm-elasticsearch-es-http-certs-internal Opaque 3 6d3h
hm-elasticsearch-es-http-certs-public Opaque 2 6d3h
hm-kibana-kb-es-ca Opaque 2 6d3h
hm-kibana-kb-http-certs-internal Opaque 3 6d3h
hm-kibana-kb-http-certs-public Opaque 2 6d3h
hm-elasticsearch-es-transport-ca-internal Opaque 2 6d3h
hm-elasticsearch-es-transport-certs-public Opaque 1 6d3h
hm-elasticsearch-es-remote-ca Opaque 1 6d3h
hm-elasticsearch-es-elastic-user Opaque 1 6d3h
hm-elasticsearch-es-internal-users Opaque 3 6d3h
hm-elasticsearch-es-xpack-file-realm Opaque 3 6d3h
hm-elasticsearch-es-default-es-config Opaque 1 6d3h
hm-elasticsearch-es-default-es-transport-certs Opaque 3 6d3h
hm-kibana-kb-config Opaque 2 6d3h
I am able to save ca.crt, tls.crt, and tls.key locally with:
kubectl get secret hm-elasticsearch-es-http-certs-public \
--namespace=elastic \
--output=go-template='{{index .data "ca.crt" | base64decode }}' \
> ca.crt
kubectl get secret hm-elasticsearch-es-http-certs-public \
--namespace=elastic \
--output=go-template='{{index .data "tls.crt" | base64decode }}' \
> tls.crt
kubectl get secret hm-elasticsearch-es-http-certs-internal \
--namespace=elastic \
--output=go-template='{{index .data "tls.key" | base64decode }}' \
> tls.key
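To double-check the extracted files, the certificate can be inspected with openssl (an optional sanity check):
# Print the subject, issuer, and validity window of the extracted certificate
openssl x509 -in tls.crt -noout -subject -issuer -dates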
Previously, I had to use the -k flag to disable certificate verification for curl to succeed. Now, in Kubernetes, I can succeed without the flag:
curl --request GET \
--url https://hm-elasticsearch-es-http.elastic:9200 \
--cacert ca.crt \
--key tls.key \
--cert tls.crt \
--header 'Content-Type: application/json' \
-u "elastic:passw0rd"
Also, I succeeded in generating keystore.jks with:
openssl pkcs12 -export \
-in tls.crt \
-inkey tls.key \
-CAfile ca.crt \
-caname root \
-out keystore.p12 \
-password pass:SFLzyT8DPkGGjDtn \
-name hm-elasticsearch-keystore
keytool -importkeystore \
-srckeystore keystore.p12 \
-srcstoretype PKCS12 \
-srcstorepass SFLzyT8DPkGGjDtn \
-deststorepass MPx57vkACsRWKVap \
-destkeypass MPx57vkACsRWKVap \
-destkeystore keystore.jks \
-alias hm-elasticsearch-keystore
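To confirm the keystore contains the expected entry (an optional check):
keytool -list \
  -keystore keystore.jks \
  -storepass MPx57vkACsRWKVap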
I am still not sure what the truststore in UPDATE 1 should point to, but now I can fill in the keystore part:
"elastic.https.ssl.keystore.location": "/path/to/keystore.jks"
"elastic.https.ssl.keystore.password": "MPx57vkACsRWKVap"
"elastic.https.ssl.key.password": "MPx57vkACsRWKVap"
"elastic.https.ssl.keystore.type": "JKS"
However, my keystore.jks is currently on my laptop, while my Elasticsearch and ElasticsearchSinkConnector are running in Kubernetes. Where does "elastic.https.ssl.keystore.location": "/path/to/keystore.jks" refer to?
Answer:
First, some more background: the way I deployed Kafka is with Strimzi:
kubectl create namespace kafka
kubectl apply --filename="https://strimzi.io/install/latest?namespace=kafka" --namespace=kafka
kubectl apply --filename=https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml --namespace=kafka
Following UPDATE 2 in the question, once I had keystore.jks, I created the secret with:
kubectl create secret generic hm-elasticsearch-keystore \
--from-file=keystore.jks \
--namespace=kafka
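The secret can be confirmed with:
kubectl get secret hm-elasticsearch-keystore --namespace=kafka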
I have these two files:
kafkaconnect.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: hm-connect-cluster
  namespace: kafka
  annotations:
    # use-connector-resources configures this KafkaConnect
    # to use KafkaConnector resources to avoid
    # needing to call the Connect REST API directly
    strimzi.io/use-connector-resources: "true"
spec:
  image: hongbomiao/hm-connect-debezium:latest
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  config:
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
      - name: hm-elasticsearch-keystore-volume
        secret:
          secretName: hm-elasticsearch-keystore
elasticsearch-sink-kafkaconnector.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: elasticsearch-sink-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: hm-connect-cluster
spec:
  class: io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
  tasksMax: 1
  # https://docs.confluent.io/kafka-connect-elasticsearch/current/configuration_options.html
  config:
    connector.class: "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"
    tasks.max: "1"
    topics: "opa_db_server.public.roles"
    connection.url: "https://hm-elasticsearch-es-http.elastic:9200"
    connection.username: "elastic"
    connection.password: "passw0rd"
    transforms: "unwrap,key"
    transforms.unwrap.type: "io.debezium.transforms.ExtractNewRecordState"
    transforms.unwrap.drop.tombstones: "false"
    transforms.key.type: "org.apache.kafka.connect.transforms.ExtractField$Key"
    transforms.key.field: "role_id"
    key.ignore: "false"
    behavior.on.null.values: "delete"
    elastic.security.protocol: "SSL"
    elastic.https.ssl.keystore.location: "/opt/kafka/external-configuration/hm-elasticsearch-keystore-volume/keystore.jks"
    elastic.https.ssl.keystore.password: "MPx57vkACsRWKVap"
    elastic.https.ssl.key.password: "MPx57vkACsRWKVap"
    elastic.https.ssl.keystore.type: "JKS"
    elastic.https.ssl.truststore.location: "/opt/kafka/external-configuration/hm-elasticsearch-keystore-volume/keystore.jks"
    elastic.https.ssl.truststore.password: "MPx57vkACsRWKVap"
    elastic.https.ssl.truststore.type: "JKS"
    elastic.https.ssl.protocol: "TLSv1.3"
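This also answers the question about the keystore location: Strimzi mounts each externalConfiguration volume at /opt/kafka/external-configuration/<volume-name>/ inside the Connect pod, which is why the paths above point to /opt/kafka/external-configuration/hm-elasticsearch-keystore-volume/keystore.jks. The mount can be verified once the pod is running (the pod name below is a placeholder; look it up with kubectl get pods --namespace=kafka):
# <connect-pod-name> is a placeholder for the actual Connect pod name
kubectl exec --namespace=kafka <connect-pod-name> -- \
  ls /opt/kafka/external-configuration/hm-elasticsearch-keystore-volume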
Note that the keystore and truststore share the same file.
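If you would rather use a dedicated truststore instead of reusing the keystore, one way (untested here) is to import ca.crt into a new JKS file and point elastic.https.ssl.truststore.location at it:
# Builds a truststore containing only the ECK CA certificate;
# the alias and password here are illustrative choices
keytool -importcert \
  -file ca.crt \
  -alias hm-elasticsearch-ca \
  -keystore truststore.jks \
  -storepass MPx57vkACsRWKVap \
  -noprompt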
Then just run:
kubectl apply --filename=kafkaconnect.yaml
kubectl apply --filename=elasticsearch-sink-kafkaconnector.yaml
Now it works!
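To verify, you can check the connector status and search the index. By default, the Confluent sink writes to an index named after the topic; both commands below are illustrative rather than from the original setup:
# Check the KafkaConnector resource status reported by Strimzi
kubectl get kafkaconnector elasticsearch-sink-connector --namespace=kafka

# Search the index the sink writes to (named after the topic by default)
curl -u "elastic:passw0rd" --cacert ca.crt \
  "https://hm-elasticsearch-es-http.elastic:9200/opa_db_server.public.roles/_search?pretty"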