I'm trying to add the Kafka and MongoDB packages when submitting a Dataproc PySpark job, but it is failing. So far I've been using only the Kafka package and that works fine; however, when I try to add the MongoDB package to the command below, it gives an error.
Command that works fine with only the Kafka package:
gcloud dataproc jobs submit pyspark main.py \
--cluster versa-structured-stream \
--properties spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,spark.dynamicAllocation.enabled=true,spark.shuffle.service.enabled=true
I tried a few options to add both packages, but none of them work, e.g.:
--properties ^#^spark:spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.mongodb.spark:mongo-spark-connector_2.12:3.0.2,spark:spark.dynamicAllocation.enabled=true,spark:spark.shuffle.service.enabled=true,spark:spark.executor.memory=20g,spark:spark.driver.memory=5g,spark:spark.executor.cores=2 \
--jars=gs://dataproc-spark-jars/spark-avro_2.12-3.1.2.jar,gs://dataproc-spark-jars/isolation-forest_2.4.3_2.12-2.0.8.jar,gs://dataproc-spark-jars/spark-bigquery-with-dependencies_2.12-0.23.2.jar \
--region us-east1 \
--py-files streams.zip,utils.zip
Error:
Traceback (most recent call last):
File "/tmp/1abcccefa3144660967606f3f7f9491d/main.py", line 303, in <module>
sys.exit(main())
File "/tmp/1abcccefa3144660967606f3f7f9491d/main.py", line 260, in main
df_stream = spark.readStream.format('kafka') \
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 482, in load
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming Kafka Integration Guide".
How do I make this work?
TIA!
CodePudding user response:
In your --properties you have defined ^#^ as the delimiter. To use that delimiter properly, you need to change the , between properties to #; the , is only used when defining multiple values within a single key (such as the package list in spark.jars.packages). You also need to remove the spark: prefix from your properties, since that prefix is used for cluster properties, not job properties. See the sample command below:
gcloud dataproc jobs submit pyspark main.py \
--cluster=cluster-3069 \
--region=us-central1 \
--properties ^#^spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.mongodb.spark:mongo-spark-connector_2.12:3.0.2#spark.dynamicAllocation.enabled=true#spark.shuffle.service.enabled=true#spark.executor.memory=20g#spark.driver.memory=5g#spark.executor.cores=2
When the job config is inspected, spark.jars.packages contains both the Kafka and MongoDB connectors, and each of the other Spark properties is set individually, as expected.
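Applied to your original job, the full submit command would look something like the sketch below; it simply reuses the cluster name, region, jar paths, and py-files from your question, so adjust those values as needed:
gcloud dataproc jobs submit pyspark main.py \
--cluster=versa-structured-stream \
--region=us-east1 \
--properties ^#^spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.mongodb.spark:mongo-spark-connector_2.12:3.0.2#spark.dynamicAllocation.enabled=true#spark.shuffle.service.enabled=true#spark.executor.memory=20g#spark.driver.memory=5g#spark.executor.cores=2 \
--jars=gs://dataproc-spark-jars/spark-avro_2.12-3.1.2.jar,gs://dataproc-spark-jars/isolation-forest_2.4.3_2.12-2.0.8.jar,gs://dataproc-spark-jars/spark-bigquery-with-dependencies_2.12-0.23.2.jar \
--py-files streams.zip,utils.zip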