I am a beginner with Azure and Databricks, and I'm having difficulty saving a large amount of data (e.g., 5 GB) from Azure Databricks to Cosmos DB with the Spark connector.
Preface: I want to populate a Cosmos DB container with a large number of rows (e.g., 500 million) for further testing. Currently I have one container (container #1) into which I stream telemetry data using Stream Analytics. What I want to do is take the data from container #1, multiply it many times, and save the result to a new container #2. I wanted to solve this with the Spark connector and Databricks, but saving is very slow, e.g., 1 million rows per 40 minutes, which is not acceptable when I want to save gigabytes of data. Reading from a container, on the other hand, is blazingly fast, e.g., 10 seconds for 1.3 million rows. I disabled indexing on container #2, as I read that this should help, but it doesn't make a noticeable difference. Container #2 is set to 2000 RU/s and container #1 to 1000 RU/s, both with autoscale throughput.
Unfortunately, I don't know what I am doing wrong, so any help would be appreciated, including recommendations for other approaches (e.g., Synapse Analytics).
P.S. I read somewhere that I should first save the data to ADLS Gen2 storage, then read the DataFrame back from it and write that to Cosmos DB. This did provide an improvement, but the speed is still nowhere near acceptable.
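The staging approach from the P.S. can be sketched as a small helper that writes the DataFrame to Parquet on ADLS Gen2 and reads it back before the Cosmos DB write. This is a minimal sketch, assuming the Databricks cluster already has access to the storage account; the `abfss://` path and the function name `stage_through_adls` are hypothetical placeholders, not values from the original setup.

```python
# Hypothetical ADLS Gen2 location; replace the placeholders with a real
# container, storage account and folder that the cluster can access.
STAGING_PATH = "abfss://<container>@<storage-account>.dfs.core.windows.net/cosmos-staging"


def stage_through_adls(spark, df, staging_path=STAGING_PATH):
    """Write `df` to Parquet at `staging_path` and return a DataFrame read back from it.

    `spark` is an active SparkSession and `df` a Spark DataFrame. The later
    Cosmos DB write then reads from the Parquet files instead of re-reading
    container #1.
    """
    # "overwrite" replaces any files left over from an earlier run.
    df.write.mode("overwrite").parquet(staging_path)
    return spark.read.parquet(staging_path)
```

In the notebook this would be called as `stagedDF = stage_through_adls(spark, readDF)`, with the same `format("cosmos.oltp")` write then applied to `stagedDF`.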
I apologize in advance for my sloppy English; it's not my first language.
Code
Config
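# Endpoint, Masterkey, Database, Container and Container2 are defined elsewhere (not shown).
connectionConfigRead = {
    "spark.cosmos.accountEndpoint": Endpoint,
    "spark.cosmos.accountKey": Masterkey,
    "spark.cosmos.database": Database,
    "spark.cosmos.container": Container,
    # Don't sample documents to infer a schema; the read below passes customSchema.
    "spark.cosmos.read.inferSchema.enabled": "false",
    # Only used by change feed reads ("cosmos.oltp.changeFeed"), not by this batch read.
    "spark.cosmos.changeFeed.startFrom": "Now",
}
connectionConfigWrite = {
    "spark.cosmos.accountEndpoint": Endpoint,
    "spark.cosmos.accountKey": Masterkey,
    "spark.cosmos.database": Database,
    "spark.cosmos.container": Container2,
    # Has no effect on writes; carried over from the read config.
    "spark.cosmos.changeFeed.startFrom": "Now",
}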
Read from a container
from pyspark.sql.types import (
    DoubleType, IntegerType, StringType, StructField, StructType,
)

# Explicit schema for the telemetry documents in container #1.
customSchema = StructType([
    StructField("iothub-connection-module-id", StringType()),
    StructField("value_type", StringType()),
    StructField("timestamp", DoubleType()),
    StructField("data", StringType()),
    StructField("IoTHub", StringType()),
    StructField("id", StringType()),
    StructField("EventEnqueuedUtcTime", StringType()),
    StructField("sensorId", StringType()),
    StructField("EventProcessedUtcTime", StringType()),
    StructField("PartitionId", IntegerType()),
    StructField("value", DoubleType()),
])

readDF = (
    spark.read.schema(customSchema)
    .format("cosmos.oltp")
    .options(**connectionConfigRead)
    .load()
)
Writing to container
# Append the rows read from container #1 to container #2.
readDF.write.mode("append").format("cosmos.oltp").options(**connectionConfigWrite).save()
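The question's code copies rows one-to-one and does not yet include the "multiply the data" step. A minimal PySpark sketch of that step, assuming `id` is the Cosmos DB item id: each row is cross-joined with `spark.range(copies)` and given a new id per copy. As far as I know the connector's default write strategy upserts, so copies that kept the original id (and partition key) would overwrite one another instead of becoming new items. The function name `multiply_rows` and the `<id>-<n>` id format are hypothetical choices.

```python
def multiply_rows(spark, df, copies, id_col="id"):
    """Return `copies` copies of every row of `df`, each with a distinct id.

    Copy n of a row whose id is "abc" gets the id "abc-n" (n = 0..copies-1).
    """
    if copies < 1:
        raise ValueError("copies must be at least 1")

    # Imported inside the function so this snippet can be loaded without PySpark.
    from pyspark.sql import functions as F

    # spark.range() yields one column named "id" holding 0..copies-1;
    # rename it so it does not clash with the document id column.
    copy_index = spark.range(copies).withColumnRenamed("id", "copy_idx")

    return (
        df.crossJoin(copy_index)
        # Build "<original id>-<copy index>" so every copy is a new item.
        .withColumn(id_col, F.concat_ws("-", F.col(id_col), F.col("copy_idx").cast("string")))
        .drop("copy_idx")
    )
```

For example, `multiply_rows(spark, readDF, 400)` would turn roughly 1.3 million rows into roughly 520 million, which could then be written with the same `write ... save()` call.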
Answer
Container #2 does not have enough throughput. Assuming each insert costs 10 RUs, 2000 RU/s allows at most 200 inserts per second. That's 12k per minute or 720k per hour. Since you're dealing with 5 GB of data and millions of rows, I would scale up to a 10000 RU/s autoscale max throughput, which allows 1000 inserts per second or 3.6M per hour. For more information on the Spark connector, see the Spark OLTP connector resources. Also, make sure Databricks and Cosmos DB are running in the same region; cross-region calls add latency to every write.
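The arithmetic above can be written out as a small helper, which also shows what the question's target of 500 million rows implies. This is a back-of-the-envelope sketch: the 10 RU per insert is the answer's assumption, and the calculation ignores retries and throttling (HTTP 429 responses). The names `inserts_per_second` and `hours_to_write` are hypothetical.

```python
def inserts_per_second(ru_per_second: float, ru_per_insert: float) -> float:
    """Upper bound on inserts per second that a container's throughput allows."""
    return ru_per_second / ru_per_insert


def hours_to_write(rows: int, ru_per_second: float, ru_per_insert: float) -> float:
    """Best-case hours to insert `rows` items at full provisioned throughput."""
    return rows / inserts_per_second(ru_per_second, ru_per_insert) / 3600


if __name__ == "__main__":
    for ru in (2000, 10000):
        per_sec = inserts_per_second(ru, 10)
        print(f"{ru} RU/s: {per_sec:.0f}/s, {per_sec * 3600:,.0f}/hour, "
              f"{hours_to_write(500_000_000, ru, 10):.0f} h for 500M rows")
```

With 10 RU per insert, 500 million rows take about 694 hours at 2000 RU/s and about 139 hours at 10000 RU/s; a lower measured RU cost per insert shortens both proportionally.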
If you are going to query container #2, you need to index it. Find out which properties you use in filter predicates, ORDER BY clauses, etc., and create range and composite indexes for them where needed.
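As an illustration, an opt-in indexing policy that indexes only the properties used in queries could look like the dictionary below. It assumes, hypothetically, that queries filter on `sensorId` and sort by `sensorId` and `timestamp` (both fields from the question's schema); the real paths should come from the actual queries. The same JSON can be pasted into the container's indexing policy in the Azure portal, or passed as the `indexing_policy` argument when creating a container with the `azure-cosmos` Python SDK.

```python
import json

# Hypothetical query pattern:
#   SELECT * FROM c WHERE c.sensorId = @id ORDER BY c.sensorId ASC, c.timestamp DESC
indexing_policy = {
    "indexingMode": "consistent",
    "automatic": True,
    # Range-index only the filtered and sorted properties...
    "includedPaths": [
        {"path": "/sensorId/?"},
        {"path": "/timestamp/?"},
    ],
    # ...and exclude everything else, which keeps the RU cost of writes lower.
    "excludedPaths": [
        {"path": "/*"},
    ],
    # ORDER BY over more than one property needs a matching composite index.
    "compositeIndexes": [
        [
            {"path": "/sensorId", "order": "ascending"},
            {"path": "/timestamp", "order": "descending"},
        ]
    ],
}

print(json.dumps(indexing_policy, indent=2))
```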