I am trying to save (and index) a 170 GB file (~915 million rows and 25 columns) in an Elasticsearch cluster. I am getting horrible performance on a 5-node Elasticsearch cluster: the task takes ~5 hours. The Spark cluster has 150 cores, 10 × (15 CPU, 64 GB RAM).
This is my current workflow:
- Build a Spark DataFrame from multiple parquet files on S3 (a minimal sketch of this step follows the list).
- Then save this DataFrame to an Elasticsearch index using the "org.elasticsearch.spark.sql" source from Spark. (I tried many combinations of sharding and replication settings without any performance gain.)
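For context, the first step looks roughly like this (a minimal sketch; the S3 path and app name are placeholders, not the real ones):
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("parquet-to-es").getOrCreate()

// Read every parquet file under an S3 prefix into a single DataFrame.
// "s3a://my-bucket/data/" is an illustrative placeholder path.
val whole_df: DataFrame = spark.read.parquet("s3a://my-bucket/data/")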
These are the Elasticsearch cluster node characteristics:
- 5 nodes (16 CPU, 64 GB RAM, 700 GB disk) each.
- HEAP_SIZE is about 50% of the available RAM, i.e. 32 GB on each node, configured in /etc/elasticsearch/jvm.options.
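For reference, the heap is set in /etc/elasticsearch/jvm.options roughly as follows (only the relevant lines shown):
# /etc/elasticsearch/jvm.options -- relevant lines only
# Min and max heap pinned to the same value, ~50% of the node's 64 GB RAM.
-Xms32g
-Xmx32g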
This is the code which writes the DataFrame to Elasticsearch (written in Scala):
writeDFToEs(whole_df, "main-index")
The writeDFToEs function:
// Writes the DataFrame to the given Elasticsearch index via the es-hadoop connector.
def writeDFToEs(df: DataFrame, index: String): Unit = {
  df.write
    .format("org.elasticsearch.spark.sql")
    .option("es.nodes", "192.168.1.xxx")
    .option("es.http.timeout", 600000)
    .option("es.http.max_content_length", "2000mb")
    .option("es.port", 9200)
    .mode("overwrite")
    .save(index)
}
Can you help me find out what I am not doing well and how to fix it?
Thanks in advance.
CodePudding user response:
Answering my own question.
As suggested by @warkolm, I focused on _bulk. Since I am using the es-hadoop connector, I had to tweak the es.batch.size.entries parameter.
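Under the hood, es-hadoop pushes documents through Elasticsearch's _bulk endpoint, and es.batch.size.entries caps how many documents go into each bulk request. For illustration only (the index name and fields below are made up), a bulk request body looks like this:
POST _bulk
{ "index": { "_index": "main-index" } }
{ "col1": "value1", "col2": 42 }
{ "index": { "_index": "main-index" } }
{ "col1": "value2", "col2": 43 }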
After running a bunch of tests with various values, I finally got better results (still not optimal, though) with es.batch.size.entries set to 10000 and the following values in the ES index template:
{
  "index": {
    "number_of_shards": "10",
    "number_of_replicas": "0",
    "refresh_interval": "60s"
  }
}
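For completeness, these settings can be applied before the load through an index template, for example like this (a sketch assuming Elasticsearch 7.8+ composable templates; the template name and index pattern are placeholders):
PUT _index_template/main-index-template
{
  "index_patterns": ["main-index*"],
  "template": {
    "settings": {
      "index": {
        "number_of_shards": "10",
        "number_of_replicas": "0",
        "refresh_interval": "60s"
      }
    }
  }
}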
Finally, my df.write looks like this:
df.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes", es_nodes)
  .option("es.port", es_port)
  .option("es.http.timeout", 600000)
  .option("es.batch.size.entries", 10000)
  .option("es.http.max_content_length", "2000mb")
  .mode("overwrite")
  .save(s"$writeTo")
Now the process takes ~3h (2h 55 min) instead of 5 hours.
I am still improving the configs and code. I will update if I get better performance.