I am trying to write 100k rows to BigQuery using the Spark BigQuery connector.
Each row has about 35 fields: 2 long strings (approx. 200-250 words each), many single-word strings, and a few dict-type fields (max. depth 2, with very little data inside).
I do some processing on the strings, which is almost instant for 100k rows, but the problems appear when it comes to writing the data to BQ.
I use a Spark cluster of 5 worker nodes, each with 32GB RAM, 8 vCPUs and a 500GB SSD, for a total of 160GB RAM and 40 vCPUs. Even with those specs, it takes 50 minutes to write the 100k rows to BQ. I investigated a bit, and since I'm using indirect write, the data is first written to GCS and then read by BQ. The read job takes approx. 20 seconds, meaning that the write to GCS takes almost all of the 50 minutes for just 100k rows.
This can't be normal behaviour, since running the same write locally on my home computer with pandas, for example, would take far less time.
My Spark session is initialized like this:
spark = SparkSession \
.builder \
.appName('extract-skills') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.26.0,com.johnsnowlabs.nlp:spark-nlp_2.12:4.1.0') \
.config('spark.executor.memory', '25g') \
.config('spark.executor.cores', '8') \
.config('spark.driver.memory', '12g') \
.config('spark.executor.instances', '5') \
.config("spark.driver.maxResultSize", "0") \
.config("spark.kryoserializer.buffer.max", "2000M")\
.getOrCreate()
and my write is as follows:
result. \
write.format('bigquery') \
.mode("overwrite") \
.option("writeMethod", "indirect") \
.option("temporaryGcsBucket","my_bucket") \
.option('table', 'my_project.my_dataset.my_table') \
.save()
Is there something I forgot here? I can't find the bottleneck, and setting writeMethod to direct is not possible since I need to write to a partitioned table.
CodePudding user response:
Using direct write should be faster, and the change is minimal:
result. \
write.format('bigquery') \
.mode("overwrite") \
.option("writeMethod", "direct") \
.option('table', 'my_project.my_dataset.my_table') \
.save()
Also, please check that the DataFrame is properly partitioned. If a single partition is much larger than the others, you are using your resources inefficiently: one task does most of the work while the other executors sit idle.
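As a rough illustration of the partitioning check, here is a minimal sketch that summarizes per-partition row counts. The helper name partition_skew and the sample counts are made up for the example; with a real DataFrame, the counts could be collected as shown in the comment, which assumes a running Spark session and the result DataFrame from the question.

```python
def partition_skew(rows_per_partition):
    """Return (largest, average, ratio) for a list of per-partition row counts.

    A ratio close to 1.0 means the rows are evenly spread; a large ratio
    means one partition holds far more rows than the average.
    """
    if not rows_per_partition or sum(rows_per_partition) == 0:
        return 0, 0.0, 0.0
    largest = max(rows_per_partition)
    average = sum(rows_per_partition) / len(rows_per_partition)
    return largest, average, largest / average


# With Spark, the counts could be obtained like this (not run here):
# counts = result.rdd.mapPartitions(lambda rows: [sum(1 for _ in rows)]).collect()

even_counts = [20000, 20000, 20000, 20000, 20000]   # made-up: 100k rows, balanced
skewed_counts = [96000, 1000, 1000, 1000, 1000]     # made-up: 100k rows, one hot partition

for name, counts in [("even", even_counts), ("skewed", skewed_counts)]:
    largest, average, ratio = partition_skew(counts)
    print(f"{name}: largest={largest}, average={average:.0f}, ratio={ratio:.1f}")
```

If the ratio turns out to be high, calling result.repartition(n) before the write is one common way to spread the rows more evenly; a suitable value for n depends on the cluster.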
CodePudding user response:
After some tests, I figured out that my problem was coming from Spark NLP, which I use to process my strings (lemmatization in my case).
I ran the write operation without running the Spark NLP process, and for 24M rows it took less than a minute, even in indirect write mode, which seems much more reasonable in terms of performance.
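Because Spark evaluates transformations lazily, the time measured around a write also includes every upstream step, which is how a slow NLP stage can look like a slow write. Below is a minimal sketch of a timing helper for measuring stages separately. The name timed is made up for the example, the workload is a stand-in so the snippet runs without a cluster, and the Spark calls in the comments assume Spark 3.0 or later (for the noop format) and the result DataFrame from the question.

```python
import time


def timed(label, action):
    """Run a zero-argument callable, print how long it took, and
    return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = action()
    elapsed = time.perf_counter() - start
    print(f"{label}: {elapsed:.2f}s")
    return result, elapsed


# Stand-in workload so the sketch runs without a Spark cluster.
total, seconds = timed("stand-in stage", lambda: sum(range(1_000_000)))

# Hypothetical Spark usage (not run here):
#
# Force the full pipeline, including Spark NLP, without writing anywhere:
# timed("transform only",
#       lambda: result.write.format("noop").mode("overwrite").save())
#
# Then time the real write; the difference approximates the write cost:
# timed("transform + BigQuery write",
#       lambda: result.write.format("bigquery")
#                     .mode("overwrite")
#                     .option("writeMethod", "indirect")
#                     .option("temporaryGcsBucket", "my_bucket")
#                     .option("table", "my_project.my_dataset.my_table")
#                     .save())
```

Unless the DataFrame is cached, the second call recomputes the transformation, so its duration covers both the transformation and the write.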
Now the question is: why is Spark NLP so slow?