I finally managed to run the Elasticsearch source connector with Apache Kafka. Its status reports RUNNING:
{
  "name": "elastic-source",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.1.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "127.0.1.1:8083"
    }
  ],
  "type": "source"
}
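That status JSON comes from the Kafka Connect REST API (GET /connectors/&lt;name&gt;/status on the worker). A small sketch that parses such a payload and checks that the connector and all of its tasks are RUNNING — the payload below is the one shown above, hard-coded for illustration:

```python
import json

# Status payload as returned by Kafka Connect's REST API:
# GET http://127.0.1.1:8083/connectors/elastic-source/status
status = json.loads("""
{
  "name": "elastic-source",
  "connector": {"state": "RUNNING", "worker_id": "127.0.1.1:8083"},
  "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "127.0.1.1:8083"}],
  "type": "source"
}
""")

def is_healthy(status: dict) -> bool:
    """True only if the connector and every task report state RUNNING."""
    return (status["connector"]["state"] == "RUNNING"
            and all(t["state"] == "RUNNING" for t in status["tasks"]))

print(is_healthy(status))  # True for the payload above
```

Note that a RUNNING state only means the task thread is alive; as the logs below show, a task can be RUNNING and still fail every poll.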
However, when I run the connector with the following elasticsearch-source.properties:
name=elastic-source
connector.class=com.github.dariobalinzo.ElasticSourceConnector
tasks.max=1
es.host=127.0.0.1
es.port=1750
index.prefix=products
topic.prefix=es_
topic=elastic-events
I get an error like this:
[2022-09-20 17:45:38,127] INFO [elastic-source|task-0] fetching from products (com.github.dariobalinzo.task.ElasticSourceTask:201)
[2022-09-20 17:45:38,128] INFO [elastic-source|task-0] found last value Cursor{primaryCursor='null', secondaryCursor='null'} (com.github.dariobalinzo.task.ElasticSourceTask:203)
[2022-09-20 17:45:38,129] WARN [elastic-source|task-0] request [POST http://localhost:1750/products/_search?typed_keys=true&max_concurrent_shard_requests=5&search_type=query_then_fetch&batched_reduce_size=512] returned 1 warnings: [299 Elasticsearch-7.15.0-79d65f6e357953a5b3cbcc5e2c7c21073d89aa29 "Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.15/security-minimal-setup.html to enable security."] (org.elasticsearch.client.RestClient:72)
[2022-09-20 17:45:38,129] ERROR [elastic-source|task-0] error (com.github.dariobalinzo.task.ElasticSourceTask:217)
ElasticsearchStatusException[Elasticsearch exception [type=search_phase_execution_exception, reason=all shards failed]]
at org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:178)
at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:2484)
at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:2461)
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2184)
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2137)
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:2105)
at org.elasticsearch.client.RestHighLevelClient.search(RestHighLevelClient.java:1367)
at com.github.dariobalinzo.elastic.ElasticRepository.executeSearch(ElasticRepository.java:176)
at com.github.dariobalinzo.elastic.ElasticRepository.searchAfter(ElasticRepository.java:90)
at com.github.dariobalinzo.task.ElasticSourceTask.poll(ElasticSourceTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:305)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:249)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Suppressed: org.elasticsearch.client.ResponseException: method [POST], host [http://localhost:1750], URI [/products/_search?typed_keys=true&max_concurrent_shard_requests=5&search_type=query_then_fetch&batched_reduce_size=512], status line [HTTP/1.1 400 Bad Request]
and this:
{"error":{"root_cause":[{"type":"query_shard_exception","reason":"No mapping found for [@timestamp] in order to sort on","index_uuid":"b575v16yTXmq5o2sk77zbA","index":"products"}],"type":"search_phase_execution_exception","reason":"all shards failed","phase":"can_match","grouped":true,"failed_shards":[{"shard":0,"index":"products","node":"asmTRFlgThS7kBU6yyCJzA","reason":{"type":"query_shard_exception","reason":"No mapping found for [@timestamp] in order to sort on","index_uuid":"b575v16yTXmq5o2sk77zbA","index":"products"}}]},"status":400}
at org.elasticsearch.client.RestClient.convertResponse(RestClient.java:331)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:301)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:276)
at org.elasticsearch.client.RestHighLevelClient.performClientRequest(RestHighLevelClient.java:2699)
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2171)
... 15 more
The complete list of topics is:
__consumer_offsets
connect-configs
connect-offset
connect-offsets
connect-status
elastic-events
Normally, following your tutorial, all indices matching products* are sent to Kafka using the es_ string as a topic prefix. How can I know whether all the data I pass to Elasticsearch has been read by Kafka?
CodePudding user response:
The issue you describe has been documented here, and the solution is to define the incrementing.field.name setting. By default, the connector uses the @timestamp field, but your products index obviously doesn't have one. You should use another field that exists in your mapping.
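For example, the relevant line added to the properties file from the question would look like this (the field name `id` is hypothetical — substitute a strictly increasing field that actually exists in the products mapping):

```properties
name=elastic-source
connector.class=com.github.dariobalinzo.ElasticSourceConnector
tasks.max=1
es.host=127.0.0.1
es.port=1750
index.prefix=products
topic.prefix=es_
# cursor field used instead of the default @timestamp (hypothetical name)
incrementing.field.name=id
```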
CodePudding user response:
I was able to solve the problem step by step:
1- Create the index
PUT /test-index
{
  "settings": {
    "number_of_replicas": 2,
    "number_of_shards": 2
  }
}
2- Add a timestamp field via an ingest pipeline
PUT _ingest/pipeline/add-current-time
{
  "description": "automatically add the current time to the documents",
  "processors": [
    {
      "set": {
        "field": "timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}
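Before indexing real documents, the pipeline can be checked with the _simulate API; the response should show a timestamp field added to the sample document's _source:

```
POST _ingest/pipeline/add-current-time/_simulate
{
  "docs": [
    { "_source": { "my_field": "test" } }
  ]
}
```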
3- Index a document through the pipeline
PUT test-index/_doc/11?pipeline=add-current-time
{
  "my_field": "test numero 11",
  "girls group": "spice girl"
}
4- Output in Kibana:
"hits" : [
{
"_index" : "test-index",
"_type" : "_doc",
"_id" : "9",
"_score" : 1.0,
"_source" : {
"my_field" : "test numero 9",
"timestamp" : "2022-09-21T14:34:05.785318623Z"
}
}]