I'm new to PySpark. I'd like to be able to read values from my Kafka topic, so I've created a schema for the messages in the topic.
Here is an example message from my Kafka topic:
{
  "action": "string",
  "id": "string",
  "epoch": "long",
  "entity": {
    "type": "string",
    "sources": [{
      "items": [
        {"identifier": "string"},
        {"identifier": "string"}
      ]
    }]
  }
}
Here is my schema:
root
|-- value: struct (nullable = true)
| |-- action: string (nullable = true)
| |-- id: string (nullable = true)
| |-- epoch: long (nullable = true)
| |-- entity: struct (nullable = true)
| | |-- type: string (nullable = true)
| |-- entity: struct (nullable = true)
| | |-- sources: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- items: array (nullable = true)
| | | | | |-- element: struct (containsNull = true)
| | | | | | |-- identifier: string (nullable = true)
However, I get an error when trying to select the fields I'm interested in.
from pyspark.sql.functions import col, from_json

value_df = df.select(
    from_json(col("value").cast("string"), Schema).alias("value"),
)
explode_df = value_df.select(
    "value.id",
    "value.action",
    "value.epoch",
    "value.entity.type",
    "value.entity.sources.items.identifier",
)
explode_df.printSchema()
This is the error I get:
An error was encountered:
Ambiguous reference to fields StructField(entity,StructType(StructField(type,StringType,true)),true), StructField(entity,StructType(StructField(sources,ArrayType(StructType(StructField(items,ArrayType(StructType(StructField(identifier,StringType,true)),true),true)),true),true)),true)
Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 1671, in select
jdf = self._jdf.select(self._jcols(*cols))
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
raise converted from None
pyspark.sql.utils.AnalysisException: Ambiguous reference to fields StructField(entity,StructType(StructField(type,StringType,true)),true), StructField(entity,StructType(StructField(sources,ArrayType(StructType(StructField(items,ArrayType(StructType(StructField(identifier,StringType,true)),true),true)),true),true)),true)
I believe this is because sources and items are both arrays. Is there a way to read arrays into a DataFrame using PySpark?
CodePudding user response:
Looks like you have entity twice in your schema; that is the ambiguous reference.
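A minimal sketch of what a deduplicated schema could look like, assuming the field names from the example message and that df and Schema are the same variables used in the question (nest both type and sources under a single entity struct); the explode() steps at the end show one way to flatten the nested arrays into one row per identifier, since selecting value.entity.sources.items.identifier directly would return an array of arrays rather than individual values:

from pyspark.sql.types import (
    StructType, StructField, StringType, LongType, ArrayType
)
from pyspark.sql.functions import col, from_json, explode

# A single "entity" struct holding both "type" and "sources"
Schema = StructType([
    StructField("action", StringType(), True),
    StructField("id", StringType(), True),
    StructField("epoch", LongType(), True),
    StructField("entity", StructType([
        StructField("type", StringType(), True),
        StructField("sources", ArrayType(StructType([
            StructField("items", ArrayType(StructType([
                StructField("identifier", StringType(), True)
            ]), True), True)
        ]), True), True)
    ]), True)
])

value_df = df.select(
    from_json(col("value").cast("string"), Schema).alias("value")
)

# Explode the outer array (sources), then the inner array (items),
# so each identifier ends up on its own row.
explode_df = (
    value_df
    .select(
        "value.id",
        "value.action",
        "value.epoch",
        "value.entity.type",
        explode("value.entity.sources").alias("source"),
    )
    .select("id", "action", "epoch", "type",
            explode("source.items").alias("item"))
    .select("id", "action", "epoch", "type",
            col("item.identifier").alias("identifier"))
)
explode_df.printSchema()

With the duplicate entity removed, the path value.entity.type is no longer ambiguous, and the error should go away.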