I am working with PySpark and the elasticsearch Python library, and while updating one of the documents in ES I get the following error.
2021-09-08 06:31:49 ERROR JobScheduler:91 - Error running job streaming job 1631082700000 ms.1
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/streaming/util.py", line 68, in call
r = self.func(t, *rdds)
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 161, in <lambda>
func = lambda t, rdd: old_func(rdd)
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/spark_scripts/main.py", line 124, in RDDfromKafkaStream
posttoES(row)
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/spark_scripts/main.py", line 100, in posttoES
es.update(index="anonprofile", id = jsonid, body=query)
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/client/utils.py", line 168, in _wrapped
return func(*args, params=params, headers=headers, **kwargs)
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/client/__init__.py", line 1903, in update
"POST", path, params=params, headers=headers, body=body
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/transport.py", line 458, in perform_request
raise e
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/transport.py", line 426, in perform_request
timeout=timeout,
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/connection/http_urllib3.py", line 277, in perform_request
self._raise_error(response.status, raw_data)
File "/usr/local/lib/python3.7/dist-packages/elasticsearch/connection/base.py", line 331, in _raise_error
status_code, error_message, additional_info
elasticsearch.exceptions.RequestError: RequestError(400, 'illegal_argument_exception', '[f756ea2593ee][172.18.0.4:9300][indices:data/write/update[s]]')
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Traceback (most recent call last):
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/spark_scripts/main.py", line 172, in <module>
ssc.awaitTermination()
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/streaming/context.py", line 192, in awaitTermination
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o31.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
    [... same Python traceback and Spark stack trace as above ...]
The relevant code snippet is:
row = json.loads(row)
count = row["Count"]
row.pop("Count")

jsonid = hashlib.sha224(json.dumps(row).encode('ascii', 'ignore')).hexdigest()

if es.exists(index="anonprofile", id=jsonid):
    q = {
        "script": {
            "source": "ctx._source.Count ={}".format(count),
            "lang": "painless"
        }
    }
    es.update(index="anonprofile", id=jsonid, body=q)
else:
    row["Count"] = count
    es.index(index="anonprofile", id=jsonid, body={'doc': row})
The first document goes into the else block and is indexed as expected, but as soon as a document hits the if block, the update call raises the error above.
Based on what I found online I tried changing the query, but nothing seems to work.
P.S. For learning purposes I tried running the same task without the PySpark integration, and that works fine. The code for that is here.
CodePudding user response:
The problem is that you index the document nested inside a doc field, so it is stored as a property of doc (since row is a dict of values), and your script then tries to update _source.Count instead of _source.doc.Count.
A body with a doc field is only useful for update calls, for example together with an upsert or a script when the document does not exist yet.
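To make the difference concrete, here is a minimal sketch (not code from the question; it assumes a reachable Elasticsearch on the default localhost:9200 and reuses the anonprofile index name with a hypothetical row) showing where the fields end up with and without the doc wrapper:

# Sketch: compare _source when indexing with and without a "doc" wrapper.
from elasticsearch import Elasticsearch

es = Elasticsearch()  # assumption: default local connection

row = {"user": "abc", "Count": 5}  # hypothetical row

# Wrapped in "doc": every field is nested one level down in _source.
es.index(index="anonprofile", id="demo", body={"doc": row})
print(es.get(index="anonprofile", id="demo")["_source"])
# {'doc': {'user': 'abc', 'Count': 5}}  -> a script would need ctx._source.doc.Count

# Indexed directly: the fields sit at the top level of _source.
es.index(index="anonprofile", id="demo", body=row)
print(es.get(index="anonprofile", id="demo")["_source"])
# {'user': 'abc', 'Count': 5}           -> ctx._source.Count now refers to this field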
So, for example:
row["Count"] = count
body = {
"script": {
"source": "ctx._source.Count ={}".format(count),
"lang": "painless"
}
"upsert": row
}
es.update(index="anonprofile", id=jsonid, body=body)
instead of your if es.exists(...) / else branching.
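Putting it together, posttoES could then be reduced to a single update call. This is only a sketch under the same assumptions as the question (same jsonid hashing, same index, an es client configured as in the original script); passing the count through params instead of string formatting is a standard Painless option that keeps the script source constant across calls:

# Sketch of posttoES using a single update-with-upsert.
import hashlib
import json

from elasticsearch import Elasticsearch

es = Elasticsearch()  # assumption: same connection settings as the original script

def posttoES(row):
    # Same id derivation as in the question: hash the row without its Count field.
    row = json.loads(row)
    count = row.pop("Count")
    jsonid = hashlib.sha224(json.dumps(row).encode('ascii', 'ignore')).hexdigest()

    row["Count"] = count
    body = {
        "script": {
            # params.count avoids recompiling the script for every new value
            "source": "ctx._source.Count = params.count",
            "lang": "painless",
            "params": {"count": count},
        },
        # Indexed as-is (no "doc" wrapper) when no document with this id exists yet.
        "upsert": row,
    }
    es.update(index="anonprofile", id=jsonid, body=body)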