I'll try to be as thorough and concise as I can with my explanation and design parameters. I've given this a few initial iterations, but I'm not adept at Java at all (my comfort language is Python), and I don't yet understand how to set up the cluster architecture so that it doesn't just hang or fail. At a high level, I have a huge dataset (roughly 1.8 trillion data points, 120 TB of data) with location data in the form of lat/lon. I'm using Apache Sedona or GeoSpark, but I'm struggling to understand how to configure and use them in my Python PySpark code.
The workflow for my jobs:
- Create a Dataproc cluster
- Load data (the raw data plus some reference datasets) from a GCS bucket and a BigQuery table
- Do some geospatial processing on the points (for example, use lat/lon to assign a US state and county to each point)
- Save the new data to a GCS bucket
I have done this using a small data sample of around several hundred points, and it worked just fine. When I try to run the entire dataset, though, it runs into a lot of issues.
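For reference, the PySpark side of my small-sample test looks roughly like the sketch below. The bucket paths, table/column names, and Sedona package versions are placeholders I pieced together from the Sedona docs, not my exact code, and part of my question is whether passing the packages at the job level like this is even the right approach. (I've left the BigQuery reference-table read out of the sketch.)

```python
from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator          # requires 'apache-sedona' pip-installed on the cluster
from sedona.utils import SedonaKryoRegistrator, KryoSerializer

# Placeholder locations -- not my real buckets/tables.
RAW_POINTS = "gs://my-bucket/raw/points/"
US_COUNTIES = "gs://my-bucket/reference/us_counties/"   # polygons as WKT + state/county names
OUTPUT_PATH = "gs://my-bucket/output/points_with_county/"

# Sedona package coordinates copied from the Sedona docs for Spark 3 / Scala 2.12;
# I'm not sure whether passing them here (job level) or at cluster creation is correct.
spark = (
    SparkSession.builder
    .appName("assign-state-county")
    .config("spark.serializer", KryoSerializer.getName)
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
    .config(
        "spark.jars.packages",
        "org.apache.sedona:sedona-python-adapter-3.0_2.12:1.1.0-incubating,"
        "org.datasyncron:geotools-wrapper:1.1.0-25.2",
    )
    .getOrCreate()
)
SedonaRegistrator.registerAll(spark)  # registers the ST_* SQL functions

# Raw points (with lat/lon columns) and the reference polygons.
spark.read.parquet(RAW_POINTS).createOrReplaceTempView("points")
spark.read.parquet(US_COUNTIES).createOrReplaceTempView("counties")

# Point-in-polygon join: tag each point with its US state and county.
result = spark.sql("""
    SELECT p.*, c.state_name, c.county_name
    FROM points p
    JOIN counties c
      ON ST_Contains(
           ST_GeomFromWKT(c.geometry_wkt),
           ST_Point(CAST(p.lon AS Decimal(24, 20)), CAST(p.lat AS Decimal(24, 20))))
""")

result.write.mode("overwrite").parquet(OUTPUT_PATH)
```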
The following are only some of the things I've seen in the DataProc Job Log:
WARN org.apache.spark.sql.execution.datasources.SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.
WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 625 for reason Container marked as failed: container_1633477513137_0001_01_000626 on host:
ERROR org.apache.spark.scheduler.cluster.YarnScheduler: Lost executor 625
WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 13.0 (TID 85581) (patino-pyspark-testing-sw-r96f.[removed google info here].internal executor 443): FetchFailed(BlockManagerId(598...
org.apache.spark.shuffle.FetchFailedException: Failed to connect to ....
Caused by: org.apache.spark.network.client.ChunkFetchFailureException: Failure while fetching StreamChunkId[streamId=493540200000,chunkIndex=0]:
I could go on for days with these errors. I feel like it all boils down to the fact that I don't fully understand Hadoop and Spark configuration, so I really don't know what I'm doing.
I'm taking this approach because my BigQuery operations timed out when I tried to do the processing there.
I would really appreciate a good explanation of the following:
- The correct way to pass JAR files (do you pass them at the job level or when you create the cluster?)
- How to properly install/set up Sedona or GeoSpark (I don't really care which one I use, I just want to get it working)
- Any and all settings/configurations (again, I'm still a novice, so my apologies for follow-up questions), and whether I pass these at cluster creation or when I submit the job
- Troubleshooting any of the logs
I know this was a wall of text, and I truly appreciate any and all efforts and comments to help me out. Thank you again!
CodePudding user response:
Though your more general questions about cluster architecture are unfortunately probably outside the scope of what a StackOverflow Q&A can cover, there are a few considerations that come to mind:
- For debugging your Spark job, another way to access runtime info is through Spark's UI and History Server, which will give you worker-level logs, time spent, amounts of data sent to different workers, and even stack traces. The best way to access the Spark UI is to enable Component Gateway at cluster creation time.
- Your log message mentions the hostname "patino-pyspark-testing-sw-r96f" - this appears to be a "secondary worker", which uses preemptible VMs by default. While Dataproc does its best to make typical workloads run as smoothly as possible with these VM types, preemptible VMs are by design a bit unpredictable. Your VM may very well have simply been preempted by another on-demand workload, causing temporary failures. Some workloads handle automatic retries of worker failures well (especially map-only jobs and jobs with minimal external dependencies), while others are more sensitive (e.g., if BigQuery dependencies make task-level retries harder, or if you have lots of joins/shuffle data). If you're trying to get the workload off the ground for the first time, you may want to stick to on-demand VM types and only introduce PVMs once you know your job can tolerate task failures well.
- If Spark does have to shuffle/group/aggregate or cache/checkpoint intermediate data, it'll need disk space. Additionally, the I/O performance of GCE VMs scales with disk size, so even if you don't use a lot of disk, tiny disks will be very slow. If Spark is going to use disk, you probably want the total amount of disk in your cluster to be at least double the size of your input dataset (120 TB). The worker logs would give some indication as to whether you're losing workers due to "out of disk" errors.
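As a rough, back-of-the-envelope illustration of that sizing rule (the worker count below is just a placeholder for the arithmetic, not a recommendation):

```python
# Rule-of-thumb disk sizing for a shuffle-heavy job over this dataset.
input_tb = 120                       # size of the raw input
min_total_disk_tb = 2 * input_tb     # at least 2x the input => ~240 TB across the cluster
num_workers = 60                     # placeholder worker count for the arithmetic
per_worker_tb = min_total_disk_tb / num_workers
print(per_worker_tb)                 # -> 4.0 TB of disk per worker in this example
```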
CodePudding user response:
FetchFailedException is a very common error that usually happens in clusters with preemptible VMs or autoscaling but without EFM (Enhanced Flexibility Mode) enabled; see this doc for more details. To avoid the problem, either avoid PVMs and autoscaling, or enable EFM when PVMs or autoscaling are in use. Note that EFM is available on image versions 1.4 and 1.5, but not yet on 2.0 as of October 2021, so if you have to use Spark 3 on 2.0, you have to go with the first option.
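Separately, and not as a substitute for EFM, if you do end up keeping PVMs you can make individual shuffle fetches a bit more tolerant of transient failures with stock Spark settings. This is only a sketch with illustrative values; it does not fix the underlying problem of shuffle data disappearing along with a preempted worker:

```python
from pyspark.sql import SparkSession

# Illustrative values only -- these soften transient fetch failures, they do not
# recover shuffle blocks that lived on a preempted worker.
spark = (
    SparkSession.builder
    .appName("shuffle-retry-tuning-sketch")
    .config("spark.shuffle.io.maxRetries", "10")   # default is 3
    .config("spark.shuffle.io.retryWait", "30s")   # default is 5s
    .config("spark.network.timeout", "300s")       # default is 120s
    .getOrCreate()
)
```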
In addition, as Dennis mentioned, you also need to make sure the cluster has enough disk. For 120 TB of input data, I would consider a cluster shape of 50 primary worker nodes, each with a 4 TB disk.