I have a Spark DataFrame with the following schema:
root
|-- CONTRATO: long (nullable = true)
|-- FECHA_FIN: date (nullable = true)
|-- IMPORTE_FIN: double (nullable = true)
|-- MOVIMIENTOS: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- FECHA: date (nullable = true)
| | |-- IMPORTE: double (nullable = true)
Here is an example of the data:
[Row(CONTRATO=1, FECHA_FIN=datetime.date(2022, 10, 31), IMPORTE_FIN=895.83, MOVIMIENTOS=[Row(FECHA=datetime.date(2020, 9, 14), IMPORTE=10.0), Row(FECHA=datetime.date(2020, 9, 15), IMPORTE=20.0)]),
 Row(CONTRATO=2, FECHA_FIN=datetime.date(2022, 9, 30), IMPORTE_FIN=5.83, MOVIMIENTOS=[Row(FECHA=datetime.date(2021, 9, 14), IMPORTE=30.0), Row(FECHA=datetime.date(2020, 7, 15), IMPORTE=40.0)])]
I would like to access the individual FECHA and IMPORTE values inside MOVIMIENTOS, but I do not know how. I am familiar with pandas DataFrames but new to Spark DataFrames; in pandas-like terms it would be something like:
df['MOVIMIENTOS'][df['CONTRATO'] == 1][0][0] --> 14/09/2020
df['MOVIMIENTOS'][df['CONTRATO'] == 1][0][1] --> 10
df['MOVIMIENTOS'][df['CONTRATO'] == 1][1][0] --> 15/09/2020
df['MOVIMIENTOS'][df['CONTRATO'] == 1][1][1] --> 20
df['MOVIMIENTOS'][df['CONTRATO'] == 2][0][0] --> 14/09/2021
df['MOVIMIENTOS'][df['CONTRATO'] == 2][0][1] --> 30
I have tried different combinations, with no luck.
Thanks a lot in advance!
Answer:
You can use the explode function to get a new row for each element of the MOVIMIENTOS array, and then select the struct fields you need, like so:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType
from pyspark.sql.functions import explode, col

spark = SparkSession.builder.getOrCreate()  # already defined in most notebooks/shells

schema = StructType([
    StructField("CONTRATO", IntegerType(), True),
    StructField("FECHA_FIN", StringType(), True),
    StructField("IMPORTE_FIN", DoubleType(), True),
    StructField("MOVIMIENTOS", ArrayType(
        StructType([
            StructField("FECHA", StringType(), True),
            StructField("IMPORTE", DoubleType(), True)
        ])
    ), True),
])

# One contract with two movements (dates kept as strings for brevity)
df = spark.createDataFrame([(1, "2022-10-31", 895.83, [("2020-09-14", 10.0), ("2020-09-15", 20.0)])], schema)

# Step 1: one output row per element of MOVIMIENTOS.
# Step 2: pull the struct fields out in a separate select; the alias created by
# explode() cannot reliably be referenced inside the same select.
df.select(
    "CONTRATO",
    "FECHA_FIN",
    "IMPORTE_FIN",
    explode("MOVIMIENTOS").alias("MOVIMIENTO_exploded")) \
  .select(
    "*",
    col("MOVIMIENTO_exploded.FECHA").alias("FECHA"),
    col("MOVIMIENTO_exploded.IMPORTE").alias("IMPORTE")) \
  .show(truncate=False)
+--------+----------+-----------+-------------------+----------+-------+
|CONTRATO|FECHA_FIN |IMPORTE_FIN|MOVIMIENTO_exploded|FECHA     |IMPORTE|
+--------+----------+-----------+-------------------+----------+-------+
|1       |2022-10-31|895.83     |{2020-09-14, 10.0} |2020-09-14|10.0   |
|1       |2022-10-31|895.83     |{2020-09-15, 20.0} |2020-09-15|20.0   |
+--------+----------+-----------+-------------------+----------+-------+