RequestError(400, 'illegal_argument_exception', '[f756ea2593ee][172.18.0.4:9300][indices:data/write/update[s]]')


I am working with PySpark and the elasticsearch Python client, and while updating one of the documents in ES I get the following error.

2021-09-08 06:31:49 ERROR JobScheduler:91 - Error running job streaming job 1631082700000 ms.1
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/streaming/util.py", line 68, in call
    r = self.func(t, *rdds)
  File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 161, in <lambda>
    func = lambda t, rdd: old_func(rdd)
  File "/usr/bin/spark-2.4.0-bin-hadoop2.7/spark_scripts/main.py", line 124, in RDDfromKafkaStream
    posttoES(row)
  File "/usr/bin/spark-2.4.0-bin-hadoop2.7/spark_scripts/main.py", line 100, in posttoES
    es.update(index="anonprofile", id = jsonid, body=query)
  File "/usr/local/lib/python3.7/dist-packages/elasticsearch/client/utils.py", line 168, in _wrapped
    return func(*args, params=params, headers=headers, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/elasticsearch/client/__init__.py", line 1903, in update
    "POST", path, params=params, headers=headers, body=body
  File "/usr/local/lib/python3.7/dist-packages/elasticsearch/transport.py", line 458, in perform_request
    raise e
  File "/usr/local/lib/python3.7/dist-packages/elasticsearch/transport.py", line 426, in perform_request
    timeout=timeout,
  File "/usr/local/lib/python3.7/dist-packages/elasticsearch/connection/http_urllib3.py", line 277, in perform_request
    self._raise_error(response.status, raw_data)
  File "/usr/local/lib/python3.7/dist-packages/elasticsearch/connection/base.py", line 331, in _raise_error
    status_code, error_message, additional_info
elasticsearch.exceptions.RequestError: RequestError(400, 'illegal_argument_exception', '[f756ea2593ee][172.18.0.4:9300][indices:data/write/update[s]]')

        at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
        at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
        at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
        at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Traceback (most recent call last):
  File "/usr/bin/spark-2.4.0-bin-hadoop2.7/spark_scripts/main.py", line 172, in <module>
    ssc.awaitTermination() 
  File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/streaming/context.py", line 192, in awaitTermination
  File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o31.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/streaming/util.py", line 68, in call
    r = self.func(t, *rdds)
  File "/usr/bin/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 161, in <lambda>
    func = lambda t, rdd: old_func(rdd)
  File "/usr/bin/spark-2.4.0-bin-hadoop2.7/spark_scripts/main.py", line 124, in RDDfromKafkaStream
    posttoES(row)
  File "/usr/bin/spark-2.4.0-bin-hadoop2.7/spark_scripts/main.py", line 100, in posttoES
    es.update(index="anonprofile", id = jsonid, body=query)
  File "/usr/local/lib/python3.7/dist-packages/elasticsearch/client/utils.py", line 168, in _wrapped
    return func(*args, params=params, headers=headers, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/elasticsearch/client/__init__.py", line 1903, in update
    "POST", path, params=params, headers=headers, body=body
  File "/usr/local/lib/python3.7/dist-packages/elasticsearch/transport.py", line 458, in perform_request
    raise e
  File "/usr/local/lib/python3.7/dist-packages/elasticsearch/transport.py", line 426, in perform_request
    timeout=timeout,
  File "/usr/local/lib/python3.7/dist-packages/elasticsearch/connection/http_urllib3.py", line 277, in perform_request
    self._raise_error(response.status, raw_data)
  File "/usr/local/lib/python3.7/dist-packages/elasticsearch/connection/base.py", line 331, in _raise_error
    status_code, error_message, additional_info
elasticsearch.exceptions.RequestError: RequestError(400, 'illegal_argument_exception', '[f756ea2593ee][172.18.0.4:9300][indices:data/write/update[s]]')

        at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
        at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
        at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
        at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

The relevant code snippet is:

row = json.loads(row)
count = row["Count"]
row.pop("Count")
jsonid = hashlib.sha224(json.dumps(row).encode('ascii', 'ignore')).hexdigest()

if es.exists(index="anonprofile", id=jsonid):
    q = {
        "script": {
            "source": "ctx._source.Count ={}".format(count),
            "lang": "painless"
        }
    }
    es.update(index="anonprofile", id=jsonid, body=q)
else:
    row["Count"] = count
    es.index(index="anonprofile", id=jsonid, body={'doc': row})

The first document goes through the else block and is indexed as expected, but as soon as a document enters the if block, the update call raises the error above.
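
For reference, a quick way to check what actually got stored (a minimal sketch, assuming the same es client and jsonid as in the snippet above) is to fetch the document and inspect its _source:

stored = es.get(index="anonprofile", id=jsonid)
# Shows whether the row fields sit at the top level of _source
# or are nested under a "doc" key.
print(stored["_source"])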

After searching online, I tried changing the query, but nothing seems to work.

P.S. For learning purposes I tried executing the same task without the PySpark integration, and that seems to work fine. The code for that is here

CodePudding user response:

The problem is that you index the document inside a doc field, which is mapped as an ordinary object property (since row is just a dict of values), so your script updates _source.Count while the data actually lives at _source.doc.Count.

A body with a doc field is only useful for update calls, for example combined with an upsert, or with a script for when the document does not exist.
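
To illustrate the difference (a minimal sketch, assuming the es client, index, and variables from the question), compare how the two index calls shape _source:

# Wrapping the row in a "doc" field nests everything:
#   _source == {"doc": {"Count": ..., ...}}  -> ctx._source.Count does not point at the data
es.index(index="anonprofile", id=jsonid, body={"doc": row})

# Indexing the row directly keeps Count at the top level:
#   _source == {"Count": ..., ...}           -> ctx._source.Count updates as intended
es.index(index="anonprofile", id=jsonid, body=row)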

So, for example:

row["Count"] = count
body = {
    "script": {
        "source": "ctx._source.Count ={}".format(count),
        "lang": "painless"
    }
    "upsert": row
}
es.update(index="anonprofile", id=jsonid, body=body)

in place of your whole if es.exists(...) / else block, since the upsert handles the missing-document case for you.
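
As a side note (a sketch, not part of the original answer): the same update can pass the value through script params instead of string formatting, which is the documented way to parameterize Painless scripts and lets Elasticsearch reuse the compiled script:

row["Count"] = count
body = {
    "script": {
        "source": "ctx._source.Count = params.count",
        "lang": "painless",
        "params": {"count": count}   # value is passed safely, no string interpolation
    },
    "upsert": row                    # indexed as-is if the document does not exist yet
}
es.update(index="anonprofile", id=jsonid, body=body)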
