How to Skip/Ignore duplicate columns while reading json files in PySpark on Databricks.Upgraded runt

Time:11-16

Problem Statement: After upgrading the Databricks runtime version, duplicate column(s) throw an error while creating the dataframe. On the lower runtime, the dataframe was created, and since the duplicate column was not required downstream, it was simply excluded in the select.

File Location: JSON files stored on ADLS Gen2 (Azure). Cluster Mode: Standard

Code: We read it in Azure Databricks as below.

intermediate_df = spark.read.option("multiline","true").json(f"{path}/IN-109418_Part_1.json")

The JSON file is nested, and one of the nested objects contains the key tags twice, which produces a duplicate column (pic below). After reading the file into a dataframe, we pick the required columns. We do not need the duplicate tags field anyway.

Previously we were running on Databricks runtime 7.3 LTS (Spark 3.0.1), which created the dataframe including the duplicate columns; since we did not use them further, it did no harm.

However, we are now upgrading to runtime 9.1 LTS (Spark 3.1.2), and it throws an error about duplicate columns while creating the dataframe itself. Error message: Found duplicate column(s) in the data schema: `tags`

Pic of the duplicated column: the JSON file contains the key tags twice. The dataframe was created successfully on runtime 7.3 LTS (Spark 3.0.1).
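To confirm which keys are repeated without inspecting the file by hand, a small plain-Python check can help. This is only a minimal sketch and not part of our pipeline: find_duplicate_keys is a hypothetical helper, and it assumes the file content fits in memory as a single string. It relies on json.loads calling object_pairs_hook once for every JSON object, including nested ones.

```python
import json


def find_duplicate_keys(raw_text):
    """Return the names of keys that occur more than once inside any single JSON object."""
    duplicates = []

    def hook(pairs):
        # pairs holds every (key, value) of one object, repeats included.
        seen = set()
        for key, _ in pairs:
            if key in seen and key not in duplicates:
                duplicates.append(key)
            seen.add(key)
        return dict(pairs)

    json.loads(raw_text, object_pairs_hook=hook)
    return duplicates


# Hypothetical sample that mimics a nested object with a repeated key.
sample = '{"id": 1, "meta": {"tags": "a", "tags": "b"}}'
print(find_duplicate_keys(sample))
```

The helper reports key names only, not the path to the object that contains them.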

Conclusion: I've tried selecting columns as soon as I read the dataframe, but without success. My hunch is that, since the upgraded Databricks runtimes lean more towards Delta tables by default (Delta tables do not support duplicate columns), there might be a property we would have to turn off in order to skip this check, either throughout the notebook or just while reading into the dataframe.

Although this exact error occurred with JSON, I believe it might also occur for other file formats like CSV if they have duplicate columns.

The file is deeply nested, and defining a schema for all required columns is not very practical: it is tedious and error-prone if more columns are required in the future (this would be a secondary solution). The files are generated by a vendor using an automated process, and all files are expected to remain in the same format as the historical files already delivered.

Complete error on runtime 9.1LTS(Spark3.1.2):

AnalysisException                         Traceback (most recent call last)
<command-4270018894919110> in <module>
----> 1 intermediate_df = spark.read.option("multiline","true").json(f"{path}/IN-109418_Part_1.json")

/databricks/spark/python/pyspark/sql/readwriter.py in json(self, path, schema, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZero, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, dateFormat, timestampFormat, multiLine, allowUnquotedControlChars, lineSep, samplingRatio, dropFieldIfAllNull, encoding, locale, pathGlobFilter, recursiveFileLookup, allowNonNumericNumbers, modifiedBefore, modifiedAfter)
    370             path = [path]
    371         if type(path) == list:
--> 372             return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    373         elif isinstance(path, RDD):
    374             def func(iterator):

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    121                 # Hide where the exception came from that shows a non-Pythonic
    122                 # JVM exception message.
--> 123                 raise converted from None
    124             else:
    125                 raise

AnalysisException: Found duplicate column(s) in the data schema: `tags`

Edit: Comment on defining schema beforehand.

CodePudding user response:

Haven't found an option for this in the Spark documentation. There also seem to be differing opinions/standards on the validity of JSON with duplicate keys and how to treat them (SO discussion).

Regardless, I've found that supplying a schema without the duplicate key field results in a successful load. It seems to take the value of the last occurrence of the key in the JSON.

The schema depends on your source file.

test.json

{
    "id": 1,
    "tags": "test1",
    "tags": "test2"
}

python

from pyspark.sql.types import LongType, StringType, StructField, StructType

schema = StructType([
    StructField('id', LongType(), True),
    StructField('tags', StringType(), True)
])

df = spark.read.schema(schema).json("test.json", multiLine=True)

df.show()

output

+---+-----+
| id| tags|
+---+-----+
|  1|test2|
+---+-----+

Ran locally on pyspark 3.1.1

CodePudding user response:

Please use json.loads with an object_pairs_hook to convert the JSON to a dictionary while handling duplicated keys.

import json

#test json
test_json = """[
   {"id": 1,
   "tags": "test1",
   "tags": "test1"},
  {"id": 2,
   "tags": "test2",
   "tags": "test2",
   "tags": "test3"}]
"""

#function to handle duplicate keys:
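def value_resolver(pairs):
    d = {}
    i = 1  # suffix counter for repeated keys within this object
    for k, v in pairs:
        if k in d:
            # key already seen: store the value under a suffixed name (tags1, tags2, ...)
            d[k + str(i)] = v
            i += 1
        else:
            d[k] = v
    return d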

#load
our_dict = json.loads(test_json, object_pairs_hook=value_resolver)
print(our_dict)
# prints: [{'id': 1, 'tags': 'test1', 'tags1': 'test1'}, {'id': 2, 'tags': 'test2', 'tags1': 'test2', 'tags2': 'test3'}]

#dict to dataframe
df = spark.createDataFrame(our_dict)
df.show()


+---+-----+-----+-----+
| id| tags|tags1|tags2|
+---+-----+-----+-----+
|  1|test1|test1| null|
|  2|test2|test2|test3|
+---+-----+-----+-----+
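
If the duplicate is not needed at all, as in the question, a variation on the same idea is to re-serialize each file before Spark parses it. The sketch below is an outline under stated assumptions rather than a tested Databricks solution: drop_duplicate_keys and read_json_without_duplicates are hypothetical names, every file is assumed small enough to be held in memory as one string, and the last occurrence of a repeated key is kept because that is what Python's json.loads does by default.

```python
import json


def drop_duplicate_keys(raw_text):
    """Return JSON text in which every object has at most one value per key.

    json.loads keeps the last value for a repeated key, at every nesting
    level, so a load/dump round trip is enough to remove the duplicates.
    """
    return json.dumps(json.loads(raw_text))


def read_json_without_duplicates(spark, path):
    """Hypothetical helper: clean each file as text, then let Spark infer the schema."""
    # wholeTextFiles yields (file_path, file_content) pairs, one per file.
    cleaned = spark.sparkContext.wholeTextFiles(path).values().map(drop_duplicate_keys)
    # DataFrameReader.json also accepts an RDD of JSON strings.
    return spark.read.json(cleaned)
```

With the file from the question, usage would look like intermediate_df = read_json_without_duplicates(spark, f"{path}/IN-109418_Part_1.json"). This has not been run against ADLS here, and the round trip may change number formatting or escape non-ASCII characters, so it is worth checking on a sample file first.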