I have hundreds of columns (a, b, c, ...). I would like to modify the dataframe schema so that every array has the same element shape, with date, num and val fields.
There are thousands of ids, so I would like to modify ONLY the schema, not the dataframe. The modified schema will be used in the next step to load the data into a dataframe efficiently. I would like to avoid using a UDF to modify the whole dataframe.
Input schema:
df.printSchema()
root
|-- a: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- num: long (nullable = true) !!! NOTE : `num` !!!
| | |-- val: long (nullable = true)
|-- b: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- val: long (nullable = true)
|-- c: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- val: long (nullable = true)
|-- d: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- val: long (nullable = true)
|-- id: long (nullable = true)
Required Output schema:
root
|-- a: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- num: long (nullable = true)
| | |-- val: long (nullable = true)
|-- b: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- num: long (nullable = true)
| | |-- val: long (nullable = true)
|-- c: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- num: long (nullable = true)
| | |-- val: long (nullable = true)
|-- d: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- num: long (nullable = true)
| | |-- val: long (nullable = true)
|-- id: long (nullable = true)
To reproduce input Schema:
df = spark.read.json(sc.parallelize([
"""{"id":1,"a":[{"date":2001,"val":1,"num":1},{"date":2002,"val":2},{"date":2003,"val":3}],"b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"d":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"c":[{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33}]}""",
"""{"id":2,"a":[{"date":2001,"num":2},{"date":2002},{"date":2003}],"b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"d":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"c":[{"date":1990,"val":39},{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33},{"date":2004,"val":34}]}"""
]))
for field in df.schema:
    print(field)
Print output:
StructField(a,ArrayType(StructType(List(StructField(date,LongType,true),StructField(num,LongType,true),StructField(val,LongType,true))),true),true)
StructField(b,ArrayType(StructType(List(StructField(date,LongType,true),StructField(val,LongType,true))),true),true)
StructField(c,ArrayType(StructType(List(StructField(date,LongType,true),StructField(val,LongType,true))),true),true)
StructField(d,ArrayType(StructType(List(StructField(date,LongType,true),StructField(val,LongType,true))),true),true)
StructField(id,LongType,true)
Solution (see OneCricketeer's answer below for details):
from pyspark.sql.types import StructField, StructType, LongType, ArrayType
jsonstr = [
"""{"id":1,"a":[{"date":2001,"val":1,"num":1},{"date":2002,"val":2},{"date":2003,"val":3}],"b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"d":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"c":[{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33}]}""",
"""{"id":2,"a":[{"date":2001,"val":1},{"date":2002,"val":2},{"date":2003,"val":3}],"b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"d":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"c":[{"date":1990,"val":39},{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33},{"date":2004,"val":34}]}"""
]
array_schema = ArrayType(StructType([
    StructField('date', LongType(), True),
    StructField('num', LongType(), True),
    StructField('val', LongType(), True)
]), True)
keys = ['a', 'b', 'c', 'd']
fields = [StructField(k, array_schema, True) for k in keys]
fields.append(StructField('id',LongType(),True))
df_schema = StructType(fields)
dff = spark.read.json(sc.parallelize(jsonstr),df_schema)
Answer:
I think the true solution is to have consistent names, or at least something more descriptive if the fields are truly different. "num" and "val" are basically synonymous.
If I understand the question, you want to reuse the same array schema that has all fields defined:
array_schema = ArrayType(StructType([
    StructField('date', LongType(), False),
    StructField('num', LongType(), True),
    StructField('val', LongType(), True)
]), True)
df_schema = StructType([
    StructField('a', array_schema, True),
    StructField('b', array_schema, True),
    # ...
    StructField('id', LongType(), True)
])
Or you can build the fields in a loop, which is safe because the loop runs on the Spark driver:
keys = ['a', 'b']
fields = [StructField(k, array_schema, True) for k in keys]
fields.append(StructField('id',LongType(),True))
df_schema = StructType(fields)
(Change each boolean to False if there will be no nulls.)
Then you need to provide this schema to your read function:
spark.read.schema(df_schema).json(...)
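With hundreds of columns, typing the key list by hand may be impractical. The following is a minimal sketch, not part of the original answer, that derives the unified schema from an existing one. It works on the plain-dict form of a schema (the shape returned by `df.schema.jsonValue()` in PySpark), so it runs without a Spark session. The helper names `long_field`, `array_of_struct`, `is_array_of_struct` and `unify_array_structs` are hypothetical, and it assumes that a sub-field with the same name has the same type in every column (the first definition seen wins).

```python
import copy


def long_field(name):
    # Hypothetical helper: one nullable long field in schema-dict form.
    return {"name": name, "type": "long", "nullable": True, "metadata": {}}


def array_of_struct(name, sub_names):
    # Hypothetical helper: a nullable array<struct<...>> column in schema-dict form.
    element = {"type": "struct", "fields": [long_field(n) for n in sub_names]}
    array = {"type": "array", "elementType": element, "containsNull": True}
    return {"name": name, "type": array, "nullable": True, "metadata": {}}


def is_array_of_struct(field):
    # True only for columns whose type is an array with struct elements.
    t = field["type"]
    if not isinstance(t, dict) or t.get("type") != "array":
        return False
    element = t.get("elementType")
    return isinstance(element, dict) and element.get("type") == "struct"


def unify_array_structs(schema_json):
    # Collect the union of element fields by name (first definition wins).
    merged = {}
    for field in schema_json["fields"]:
        if is_array_of_struct(field):
            for sub in field["type"]["elementType"]["fields"]:
                merged.setdefault(sub["name"], sub)
    unified = [merged[name] for name in sorted(merged)]

    # Work on a copy so the input schema dict is left untouched.
    result = copy.deepcopy(schema_json)
    for field in result["fields"]:
        if is_array_of_struct(field):
            field["type"]["elementType"]["fields"] = copy.deepcopy(unified)
    return result


# The input schema from the question, written out as a dict.
input_schema = {"type": "struct", "fields": [
    array_of_struct("a", ["date", "num", "val"]),
    array_of_struct("b", ["date", "val"]),
    array_of_struct("c", ["date", "val"]),
    array_of_struct("d", ["date", "val"]),
    long_field("id"),
]}

unified_schema = unify_array_structs(input_schema)
for field in unified_schema["fields"]:
    if is_array_of_struct(field):
        names = [s["name"] for s in field["type"]["elementType"]["fields"]]
        print(field["name"], names)
```

In PySpark, the result could presumably be turned back into a schema with `StructType.fromJson(unify_array_structs(df.schema.jsonValue()))` and passed to `spark.read.schema(...)` as above. Only the schema is touched here, never the data.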
If there will still be more fields that cannot be consistently applied to all "keys", then use ArrayType(MapType(StringType(), LongType()), False)
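As a rough illustration of the map-based variant, the sketch below swaps the element type of selected columns for a string-to-long map, again in the plain-dict schema form so that it runs without Spark. The dict in `MAP_ELEMENT_ARRAY` is what I would expect `ArrayType(MapType(StringType(), LongType()), False).jsonValue()` to return, but treat that as an assumption; `use_map_elements` and the sample schema are hypothetical.

```python
import copy

# Assumed dict form of ArrayType(MapType(StringType(), LongType()), False).
MAP_ELEMENT_ARRAY = {
    "type": "array",
    "elementType": {
        "type": "map",
        "keyType": "string",
        "valueType": "long",
        "valueContainsNull": True,
    },
    "containsNull": False,
}


def use_map_elements(schema_json, columns):
    # Return a copy of the schema where the named columns become
    # arrays of string-to-long maps; other columns are left as they are.
    result = copy.deepcopy(schema_json)
    for field in result["fields"]:
        if field["name"] in columns:
            field["type"] = copy.deepcopy(MAP_ELEMENT_ARRAY)
    return result


# A small sample schema: one array-of-struct column plus the id column.
sample_schema = {"type": "struct", "fields": [
    {"name": "a", "nullable": True, "metadata": {}, "type": {
        "type": "array", "containsNull": True, "elementType": {
            "type": "struct", "fields": [
                {"name": "date", "type": "long", "nullable": True, "metadata": {}},
                {"name": "num", "type": "long", "nullable": True, "metadata": {}},
            ]}}},
    {"name": "id", "type": "long", "nullable": True, "metadata": {}},
]}

map_schema = use_map_elements(sample_schema, {"a"})
print(map_schema["fields"][0]["type"]["elementType"]["type"])
```

The trade-off is that a map gives up per-field names and types in the schema: every value must fit the single value type (long here), and fields are looked up by key at query time.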