Spark : How to reuse the same array schema that has all fields defined across the data-frame

I have hundreds of columns a, b, c, ... . I would like to modify the dataframe schema so that every array column has the same shape: date, num, and val fields.

There are thousands of ids, so I would like to modify ONLY the schema, not the dataframe. The modified schema will be used in the next step to load the data into a dataframe efficiently. I would like to avoid using a UDF to rewrite the whole dataframe.

Input schema:

df.printSchema()

root
 |-- a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- num: long (nullable = true)    !!! NOTE: `num` appears only in this column !!!
 |    |    |-- val: long (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- c: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- d: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- id: long (nullable = true)

Required Output schema:

root
 |-- a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- num: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- num: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- c: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- num: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- d: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- num: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- id: long (nullable = true)

To reproduce the input schema:

df = spark.read.json(sc.parallelize([
  """{"id":1,"a":[{"date":2001,"num":1,"val":1},{"date":2002},{"date":2003}],"b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"d":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"c":[{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33}]}""",
  """{"id":2,"a":[{"date":2001,"num":2},{"date":2002},{"date":2003}],"b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"d":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"c":[{"date":1990,"val":39},{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33},{"date":2004,"val":34}]}"""
]))


for field in df.schema:
    print(field)

Print output:

StructField(a,ArrayType(StructType(List(StructField(date,LongType,true),StructField(num,LongType,true),StructField(val,LongType,true))),true),true)
StructField(b,ArrayType(StructType(List(StructField(date,LongType,true),StructField(val,LongType,true))),true),true)
StructField(c,ArrayType(StructType(List(StructField(date,LongType,true),StructField(val,LongType,true))),true),true)
StructField(d,ArrayType(StructType(List(StructField(date,LongType,true),StructField(val,LongType,true))),true),true)
StructField(id,LongType,true)
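
If you'd rather not list every column name by hand, here is a minimal sketch (my own variant, not from the answer below; it assumes every array column should carry the full date/num/val struct) that rebuilds the schema from df.schema directly:

from pyspark.sql.types import StructType, StructField, ArrayType, LongType

# The full element struct that every array column should share.
full_element = StructType([
    StructField('date', LongType(), True),
    StructField('num', LongType(), True),
    StructField('val', LongType(), True)])

# Rebuild the schema: swap each array field's element type for the full
# struct, and keep non-array fields (like `id`) unchanged.
df_schema = StructType([
    StructField(f.name, ArrayType(full_element, True), f.nullable)
    if isinstance(f.dataType, ArrayType) else f
    for f in df.schema])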

Solution (see OneCricketeer's answer below for details):

from pyspark.sql.types import StructField, StructType, LongType, ArrayType

jsonstr = [
  """{"id":1,"a":[{"date":2001,"val":1,"num":1},{"date":2002,"val":2},{"date":2003,"val":3}],"b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"d":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"c":[{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33}]}""",
  """{"id":2,"a":[{"date":2001,"val":1},{"date":2002,"val":2},{"date":2003,"val":3}],"b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"d":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"c":[{"date":1990,"val":39},{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33},{"date":2004,"val":34}]}"""
]

array_schema = ArrayType(StructType([
    StructField('date', LongType(), True),
    StructField('num', LongType(), True),
    StructField('val', LongType(), True)]),
    True)


keys = ['a', 'b', 'c', 'd']
fields = [StructField(k, array_schema, True) for k in keys]
fields.append(StructField('id', LongType(), True))
df_schema = StructType(fields)

dff = spark.read.json(sc.parallelize(jsonstr), df_schema)
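
To sanity-check the result, printing the new schema should match the required output schema above; fields that were absent from the JSON (such as `num` in b, c, and d) simply come back as nulls:

dff.printSchema()
dff.select('b').show(truncate=False)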

CodePudding user response:

I think the true solution is to have consistent names, or at least something more descriptive if the fields are truly different; "num" and "val" are basically synonymous.

If I understand the question, you want to reuse the same array schema that has all fields defined:

array_schema = ArrayType(StructType([
    StructField('date', LongType(), False),
    StructField('num', LongType(), True),
    StructField('val', LongType(), True)]),
    True)

df_schema = StructType([
    StructField('a', array_schema, True),
    StructField('b', array_schema, True),
    ...
    StructField('id', LongType(), True)
])

Or you can do this in a loop, which is safe because it is executed on the Spark driver:

keys = ['a', 'b']
fields = [StructField(k, array_schema, True) for k in keys]
fields.append(StructField('id', LongType(), True))
df_schema = StructType(fields)

(change each boolean to False if there will be no nulls)

Then you need to provide this schema to your read function:

spark.read.schema(df_schema).json(...

If there will still be more fields that cannot be consistently applied to all "keys", then use ArrayType(MapType(StringType(), LongType()), False).
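
For illustration, a minimal sketch of that map-based fallback (assuming, as in the data above, that every element value is a long; `keys` is the column list from the loop above):

from pyspark.sql.types import ArrayType, MapType, StringType, StructType, StructField, LongType

# Each array element becomes a free-form {field name -> long} map, so
# columns whose elements carry different fields can share one schema.
map_array = ArrayType(MapType(StringType(), LongType()), False)
df_schema = StructType(
    [StructField(k, map_array, True) for k in keys]
    + [StructField('id', LongType(), True)])

The trade-off is that elements are no longer typed structs: you read values with map lookups such as col('a')[0]['date'] instead of struct field access.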
