I get the following error when I try to read data from Elasticsearch with Spark. The error message doesn't say where the problem is.
The body2 query works in the Elasticsearch Dev Tools console.
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark import SparkConf
from pyspark.sql import SparkSession

body2 = {
    "query": {
        "bool": {
            "must": [
                {
                    "range": {
                        "@timestamp": {
                            "lte": "2022-05-03T09:25:15.000-03:00",
                            "gte": "2022-05-04T09:25:15.000-03:00"
                        }
                    }
                },
                {
                    "match": {
                        "type.keyword": "TABLA"
                    }
                }
            ]
        }
    },
    "size": 10
}

es_read_conf = {
    "es.nodes": "10.45.15.93",
    "es.port": "9200",
    "es.query": body2,
    "es.nodes.wan.only": "true",
    "es.resource": "indice1/TABLA",
    "es.net.http.auth.user": "usuario1",
    "es.net.http.auth.pass": "rsl242442j"
}

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_read_conf)
Here is the error. I can't tell which part of the code causes it:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/spark/python/pyspark/context.py", line 859, in newAPIHadoopRDD
    jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
  File "/opt/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1321, in __call__
  File "/opt/spark/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: java.lang.ClassCastException: class java.util.HashMap cannot be cast to class java.lang.String (java.util.HashMap and java.lang.String are in module java.base of loader 'bootstrap')
    at org.apache.spark.api.python.PythonHadoopUtil$.$anonfun$mapToConf$1(PythonHadoopUtil.scala:160)
    at org.apache.spark.api.python.PythonHadoopUtil$.$anonfun$mapToConf$1$adapted(PythonHadoopUtil.scala:160)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    ...
I've reviewed the code but can't find the problem.
Thanks in advance.
Answer:
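As the error message says (java.util.HashMap cannot be cast to class java.lang.String), the problem is that es.query must be a string, not a dictionary. PySpark converts the conf dict into a Hadoop configuration (PythonHadoopUtil.mapToConf in the stack trace), where every value must be a string, so the nested dict fails the cast. Write the query as a JSON string instead: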
body2 = """{
    "query": {
        "bool": {
            "must": [
                {
                    "range": {
                        "@timestamp": {
                            "lte": "2022-05-03T09:25:15.000-03:00",
                            "gte": "2022-05-04T09:25:15.000-03:00"
                        }
                    }
                },
                {
                    "match": {
                        "type.keyword": "TABLA"
                    }
                }
            ]
        }
    },
    "size": 10
}"""
You can see a reference for that here
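If you prefer to keep the query as a Python dict (easier to edit than a hand-written string), a minimal sketch is to serialize it with json.dumps when building the conf. The sketch assumes the intended time window is 2022-05-03 to 2022-05-04: in the query above, gte is later than lte, so the range cannot match any document, and the bounds are swapped here. The helper find_non_string_values is hypothetical, not part of PySpark; it only flags values that would hit the same ClassCastException. Authentication settings are omitted; add them as in the question.

```python
import json
from typing import Any, Dict, List


def find_non_string_values(conf: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: list the keys whose values are not str.

    These are the entries that would fail the java.lang.String cast
    when PySpark hands the conf to the JVM.
    """
    return [key for key, value in conf.items() if not isinstance(value, str)]


# Assumed time window: gte must be the earlier bound and lte the later one.
query = {
    "query": {
        "bool": {
            "must": [
                {
                    "range": {
                        "@timestamp": {
                            "gte": "2022-05-03T09:25:15.000-03:00",
                            "lte": "2022-05-04T09:25:15.000-03:00",
                        }
                    }
                },
                {"match": {"type.keyword": "TABLA"}},
            ]
        }
    },
    "size": 10,
}

es_read_conf = {
    "es.nodes": "10.45.15.93",
    "es.port": "9200",
    # json.dumps produces the JSON string that es.query expects.
    "es.query": json.dumps(query),
    "es.nodes.wan.only": "true",
    "es.resource": "indice1/TABLA",
}

# Fail early in Python instead of inside the JVM.
bad_keys = find_non_string_values(es_read_conf)
if bad_keys:
    raise TypeError(f"conf values must be strings: {bad_keys}")

# In the PySpark shell (where sc is predefined) the conf is then used as before:
# es_rdd = sc.newAPIHadoopRDD(
#     inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
#     keyClass="org.apache.hadoop.io.NullWritable",
#     valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
#     conf=es_read_conf)
```

Passing the dict itself as es.query, as in the question, would make find_non_string_values return ['es.query'].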