Problem Statement: After upgrading the Databricks runtime version, a duplicate column throws an error while creating the dataframe. On the lower runtime the dataframe was created, and since the duplicate column was not required downstream, it was simply excluded in the select.
File Location: JSON files stored on ADLS Gen2 (Azure). Cluster Mode: Standard
Code: We read the file in Azure Databricks as below.
intermediate_df = spark.read.option("multiline","true").json(f"{path}/IN-109418_Part_1.json")
The JSON file is nested, and one of the nested structures contains a tags key that appears twice, i.e. a duplicate column (pic below). After reading the file into a dataframe we select only the required columns; we do not need this duplicate tags column anyway.
Previously we were running on Databricks runtime 7.3 LTS (Spark 3.0.1), where the dataframe was created including the duplicate column, and since we did not use it further, it did no harm.
However, we are now upgrading to runtime 9.1 LTS (Spark 3.1.2), and it throws an error about duplicate columns while creating the dataframe itself.
Error message: Found duplicate column(s) in the data schema: `tags`
Pic (duplicate column): the duplicate column in the JSON file is tags. The dataframe was created successfully on runtime 7.3 LTS (Spark 3.0.1).
Conclusion: I've tried selecting columns immediately after reading the dataframe, but without success. My hunch is that since the upgraded Databricks runtimes lean more towards Delta tables by default (Delta tables do not support duplicate columns), there might be a property we have to turn off in order to skip this check, either throughout the notebook or just while reading into the dataframe.
Although this exact error occurred on JSON, I believe it might also occur for other file formats such as CSV if they have duplicate columns.
The file is quite nested, and defining a schema for all required columns is not very practical, as it is tedious and error-prone if more columns are required in the future (this would be a fallback solution). The file is generated by a vendor via an automated process, and all files are expected to remain in the same format as the historical files already delivered.
Complete error on runtime 9.1 LTS (Spark 3.1.2):
AnalysisException Traceback (most recent call last)
<command-4270018894919110> in <module>
----> 1 intermediate_df = spark.read.option("multiline","true").json(f"{path}/IN-109418_Part_1.json")
/databricks/spark/python/pyspark/sql/readwriter.py in json(self, path, schema, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZero, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, dateFormat, timestampFormat, multiLine, allowUnquotedControlChars, lineSep, samplingRatio, dropFieldIfAllNull, encoding, locale, pathGlobFilter, recursiveFileLookup, allowNonNumericNumbers, modifiedBefore, modifiedAfter)
370 path = [path]
371 if type(path) == list:
--> 372 return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))
373 elif isinstance(path, RDD):
374 def func(iterator):
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
121 # Hide where the exception came from that shows a non-Pythonic
122 # JVM exception message.
--> 123 raise converted from None
124 else:
125 raise
AnalysisException: Found duplicate column(s) in the data schema: `tags`
Edit: Comment on defining schema beforehand.
CodePudding user response:
Haven't found an option for this in the Spark documentation. There also seem to be differing opinions/standards on the validity of JSONs with duplicate keys and how to treat them (SO discussion).
Regardless, I've found that supplying a schema that declares the duplicated field only once results in a successful load. It seems to take the value of the last occurrence of the key in the JSON.
The schema depends on your source file.
test.json
{
"id": 1,
"tags": "test1",
"tags": "test2"
}
python
from pyspark.sql.types import *

# Declare the duplicated 'tags' field only once in the schema
schema = StructType([
    StructField('id', LongType(), True),
    StructField('tags', StringType(), True)
])

df = spark.read.schema(schema).json("test.json", multiLine=True)
df.show()
output
+---+-----+
| id| tags|
+---+-----+
|  1|test2|
+---+-----+
Ran locally on pyspark 3.1.1
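For a nested file like the one in the question, the same approach should work: declare the duplicated field only once inside the relevant nested struct. Below is a minimal sketch with made-up field names (details, name); the actual structure has to match your file.
python
from pyspark.sql.types import *

# Hypothetical nested layout - 'details' and 'name' are placeholders for your real fields.
# The duplicated 'tags' key is declared only once inside its parent struct;
# the last value in the JSON appears to win, as in the flat example above.
nested_schema = StructType([
    StructField('id', LongType(), True),
    StructField('details', StructType([
        StructField('name', StringType(), True),
        StructField('tags', StringType(), True)
    ]), True)
])

df = spark.read.schema(nested_schema) \
    .option("multiline", "true") \
    .json(f"{path}/IN-109418_Part_1.json")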
CodePudding user response:
Please use json.loads to convert the JSON into a dictionary while handling the duplicated keys:
import json
#test json
test_json = """[
{"id": 1,
"tags": "test1",
"tags": "test1"},
{"id": 2,
"tags": "test2",
"tags": "test2",
"tags": "test3"}]
"""
#function to handle duplicate keys: later occurrences get a numeric suffix (tags1, tags2, ...)
def value_resolver(pairs):
    d = {}
    i = 1
    for k, v in pairs:
        if k in d:
            d[k + str(i)] = v
            i += 1
        else:
            d[k] = v
    return d

#load, resolving duplicate keys via object_pairs_hook
our_dict = json.loads(test_json, object_pairs_hook=value_resolver)
print(our_dict)
>> [{'id': 1, 'tags': 'test1', 'tags1': 'test1'}, {'id': 2, 'tags': 'test2', 'tags1': 'test2', 'tags2': 'test3'}]
#dict to dataframe
df = spark.createDataFrame(our_dict)
df.show()
+---+-----+-----+-----+
| id| tags|tags1|tags2|
+---+-----+-----+-----+
|  1|test1|test1| null|
|  2|test2|test2|test3|
+---+-----+-----+-----+
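To apply this to the actual file on storage rather than an inline string, one option is to read the raw file text first and then parse it with the same hook. A rough sketch, assuming the path is readable by the SparkContext and that the file is either a JSON array of records or a single top-level object:
python
import json

# Read the whole file as one string so the multiline JSON stays intact
content = spark.sparkContext.wholeTextFiles(f"{path}/IN-109418_Part_1.json").values().first()

# Parse with the duplicate-key resolver defined above
parsed = json.loads(content, object_pairs_hook=value_resolver)

# Wrap a single top-level object in a list so createDataFrame gets a collection of records
df = spark.createDataFrame(parsed if isinstance(parsed, list) else [parsed])
df.show()
Note that schema inference from Python dicts maps nested objects to MapType, so for a heavily nested file you may still want to pass an explicit schema to createDataFrame.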