How to access the first item of an array-type nested column of a Spark DataFrame with PySpark


I have a Spark DataFrame with the following schema:

root
 |-- CONTRATO: long (nullable = true)
 |-- FECHA_FIN: date (nullable = true)
 |-- IMPORTE_FIN: double (nullable = true)
 |-- MOVIMIENTOS: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- FECHA: date (nullable = true)
 |    |    |-- IMPORTE: double (nullable = true)

Example of data is below:

[Row(CONTRATO=1, FECHA_FIN=datetime.date(2022, 10, 31), IMPORTE_FIN=895.83, MOVIMIENTOS=[Row(FECHA=datetime.date(2020, 9, 14), IMPORTE=10.0), Row(FECHA=datetime.date(2020, 9, 15), IMPORTE=20.0)])]

[Row(CONTRATO=2, FECHA_FIN=datetime.date(2022, 9, 30), IMPORTE_FIN=5.83, MOVIMIENTOS=[Row(FECHA=datetime.date(2021, 9, 14), IMPORTE=30.0), Row(FECHA=datetime.date(2020, 7, 15), IMPORTE=40.0)])]

I would like to access the items in 'FECHA' and 'IMPORTE' but I do not know how to do it. I am familiar with pandas DataFrames but new to Spark DataFrames... it would be something like:

df['MOVIMIENTOS'][df['CONTRATO'] == 1][0][0] --> 14/09/2020
df['MOVIMIENTOS'][df['CONTRATO'] == 1][0][1] --> 10
df['MOVIMIENTOS'][df['CONTRATO'] == 1][1][0] --> 15/09/2020
df['MOVIMIENTOS'][df['CONTRATO'] == 1][1][1] --> 20
df['MOVIMIENTOS'][df['CONTRATO'] == 2][0][0] --> 14/09/2021
df['MOVIMIENTOS'][df['CONTRATO'] == 2][0][1] --> 30

Thanks a lot in advance

I tried different combinations but had no luck.

CodePudding user response:

You can use the explode function to get a new row for each element in the MOVIMIENTOS array, and then select the fields you need, like so:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType
from pyspark.sql.functions import explode, col

schema = StructType([
    StructField("CONTRATO", IntegerType(), True),
    StructField("FECHA_FIN", StringType(), True),
    StructField("IMPORTE_FIN", DoubleType(), True),
    StructField("MOVIMIENTOS", ArrayType(
        StructType([
            StructField("FECHA", StringType(), True),
            StructField("IMPORTE", DoubleType(), True)
        ])
    ), True),
])

df = spark.createDataFrame(
    [(1, "2022-10-31", 895.83, [("2020-09-14", 10.0), ("2020-09-15", 20.0)])],
    schema,
)

df.select(
    "CONTRATO",
    "FECHA_FIN",
    "IMPORTE_FIN",
    explode("MOVIMIENTOS").alias("MOVIMIENTO_exploded"),
).select(
    # resolve the struct fields in a second select so the alias is
    # available on any Spark version (not just 3.4+ lateral aliases)
    "*",
    col("MOVIMIENTO_exploded.FECHA").alias("FECHA"),
    col("MOVIMIENTO_exploded.IMPORTE").alias("IMPORTE"),
).show(truncate=False)

+--------+----------+-----------+-------------------+----------+-------+
|CONTRATO|FECHA_FIN |IMPORTE_FIN|MOVIMIENTO_exploded|FECHA     |IMPORTE|
+--------+----------+-----------+-------------------+----------+-------+
|1       |2022-10-31|895.83     |{2020-09-14, 10.0} |2020-09-14|10.0   |
|1       |2022-10-31|895.83     |{2020-09-15, 20.0} |2020-09-15|20.0   |
+--------+----------+-----------+-------------------+----------+-------+
