For the dataframe below, which was generated from an Avro file, I'm trying to get the column names as a list (or in some other format) so that I can use them in a select statement. node1 and node2 have the same elements. For example, I understand that we could do df.select(col('data.node1.name')), but I'm not sure
- how to select all columns at once without hardcoding all the column names, and
- how to handle the nested part. I think that, to make it readable, the productvalues and porders should be selected into separate individual dataframes/tables?
Input schema:
root
|-- metadata: struct
|   ...
|-- data: struct
|    |-- node1: struct
|    |    |-- name: string
|    |    |-- productlist: array
|    |    |    |-- element: struct
|    |    |    |    |-- productvalues: array
|    |    |    |    |    |-- element: struct
|    |    |    |    |    |    |-- pname: string
|    |    |    |    |    |    |-- porders: array
|    |    |    |    |    |    |    |-- element: struct
|    |    |    |    |    |    |    |    |-- ordernum: int
|    |    |    |    |    |    |    |    |-- field: string
|    |-- node2: struct
|    |    |-- name: string
|    |    |-- productlist: array
|    |    |    |-- element: struct
|    |    |    |    |-- productvalues: array
|    |    |    |    |    |-- element: struct
|    |    |    |    |    |    |-- pname: string
|    |    |    |    |    |    |-- porders: array
|    |    |    |    |    |    |    |-- element: struct
|    |    |    |    |    |    |    |    |-- ordernum: int
|    |    |    |    |    |    |    |    |-- field: string
CodePudding user response:
Instead of gathering all the data in one table, I would recommend making a separate table for each list. To get the values out of a list you can use the explode function.
For example, to build the productlist table:
from pyspark.sql.functions import col, explode

productlist = df.select(col('data.node1.name').alias("name"), explode(col('data.node1.productlist')).alias("first_explode"))
In the next step you can use the productlist df and do this:
productValue = productlist.select(col('name'), explode(col('first_explode.productvalues')).alias("second_explode"))
and so on.
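To make "and so on" concrete, here is a sketch of the last level (the names porders and third_explode are mine, and the field paths assume the schema above):
# explode the innermost porders array, then flatten its struct fields into plain columns
porders = productValue.select(
    col('name'),
    col('second_explode.pname').alias('pname'),
    explode(col('second_explode.porders')).alias('third_explode')
).select(
    'name',
    'pname',
    col('third_explode.ordernum').alias('ordernum'),
    col('third_explode.field').alias('field')
)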
You can also find more examples of explode here: https://sparkbyexamples.com/pyspark/pyspark-explode-array-and-map-columns-to-rows/
CodePudding user response:
So, this is not a perfect answer for you, but I hope it might give you some ideas to solve your problem. I know you said you don't want to hardcode your column names but I'm unable to handle that part at this moment.
First things first, I created this sample JSON for testing:
{
  "metadata": {},
  "data": {
    "node1": {
      "name": "Node001",
      "productlist": [
        {
          "productvalues": [
            {
              "pname": "Node001-P001",
              "porders": [
                {"ordernum": 1, "field": "Node001-P001-001"},
                {"ordernum": 2, "field": "Node001-P001-002"}
              ]
            },
            {
              "pname": "Node001-P002",
              "porders": [
                {"ordernum": 3, "field": "Node001-P002-003"},
                {"ordernum": 4, "field": "Node001-P002-004"},
                {"ordernum": 5, "field": "Node001-P002-005"},
                {"ordernum": 6, "field": "Node001-P002-006"}
              ]
            },
            {
              "pname": "Node001-P003",
              "porders": [
                {"ordernum": 7, "field": "Node001-P003-007"}
              ]
            }
          ]
        },
        {
          "productvalues": [
            {
              "pname": "Node001-P004",
              "porders": [
                {"ordernum": 8, "field": "Node001-P004-008"},
                {"ordernum": 9, "field": "Node001-P004-009"},
                {"ordernum": 10, "field": "Node001-P004-010"}
              ]
            },
            {
              "pname": "Node001-P005",
              "porders": [
                {"ordernum": 11, "field": "Node001-P005-011"},
                {"ordernum": 12, "field": "Node001-P005-012"},
                {"ordernum": 13, "field": "Node001-P005-013"}
              ]
            }
          ]
        }
      ]
    },
    "node2": {
      "name": "Node002",
      "productlist": [
        {
          "productvalues": [
            {
              "pname": "Node002-P001",
              "porders": [
                {"ordernum": 14, "field": "Node002-P001-014"}
              ]
            },
            {
              "pname": "Node002-P002",
              "porders": [
                {"ordernum": 15, "field": "Node002-P002-015"}
              ]
            },
            {
              "pname": "Node002-P003",
              "porders": [
                {"ordernum": 16, "field": "Node002-P003-016"}
              ]
            }
          ]
        },
        {
          "productvalues": [
            {
              "pname": "Node002-P004",
              "porders": [
                {"ordernum": 17, "field": "Node002-P004-017"}
              ]
            },
            {
              "pname": "Node002-P005",
              "porders": [
                {"ordernum": 18, "field": "Node002-P005-018"}
              ]
            }
          ]
        }
      ]
    }
  }
}
Now, this is a "dict-like" list of steps that you will need later. Each entry describes one nesting level: 'col' lists the columns to keep at that level, and 'exp' lists the array columns to explode.
cols_dict = [
    {
        'col': ['data.node1.name'],
        'exp': ['data.node1.productlist'],
    },
    {
        'exp': ['productlist.productvalues'],
    },
    {
        'col': ['productvalues.pname'],
        'exp': ['productvalues.porders'],
    },
    {
        'col': ['porders.ordernum', 'porders.field']
    }
]
And finally, loop through this list and apply a select/explode transformation at each level to get your final result:
from pyspark.sql import functions as F

dfx = df
select_col = []
for i in cols_dict:
    # columns selected in the previous step are now top-level, so keep only the last part of their names
    select_col = [c.split('.')[-1] for c in select_col]
    if i.get('col'):
        select_col = select_col + i['col']
    select_exp = []
    if i.get('exp'):
        select_exp = i['exp']
    dfx = dfx.select([F.col(c) for c in select_col] + [F.explode(c).alias(c.split('.')[-1]) for c in select_exp])

dfx.show()
+-------+------------+--------+----------------+
|   name|       pname|ordernum|           field|
+-------+------------+--------+----------------+
|Node001|Node001-P001|       1|Node001-P001-001|
|Node001|Node001-P001|       2|Node001-P001-002|
|Node001|Node001-P002|       3|Node001-P002-003|
|Node001|Node001-P002|       4|Node001-P002-004|
|Node001|Node001-P002|       5|Node001-P002-005|
|Node001|Node001-P002|       6|Node001-P002-006|
|Node001|Node001-P003|       7|Node001-P003-007|
|Node001|Node001-P004|       8|Node001-P004-008|
|Node001|Node001-P004|       9|Node001-P004-009|
|Node001|Node001-P004|      10|Node001-P004-010|
|Node001|Node001-P005|      11|Node001-P005-011|
|Node001|Node001-P005|      12|Node001-P005-012|
|Node001|Node001-P005|      13|Node001-P005-013|
+-------+------------+--------+----------------+
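If you also need node2, a sketch of one way to extend this (my own addition, only checked against the sample JSON above) is to run the identical loop with the first entry pointing at data.node2 and union the two results:
# same loop as above, but starting from node2; cols_dict_node2, dfy and both_nodes are my names
cols_dict_node2 = [{'col': ['data.node2.name'], 'exp': ['data.node2.productlist']}] + cols_dict[1:]

dfy = df
select_col = []
for i in cols_dict_node2:
    select_col = [c.split('.')[-1] for c in select_col]
    if i.get('col'):
        select_col = select_col + i['col']
    select_exp = []
    if i.get('exp'):
        select_exp = i['exp']
    dfy = dfy.select([F.col(c) for c in select_col] + [F.explode(c).alias(c.split('.')[-1]) for c in select_exp])

both_nodes = dfx.unionByName(dfy)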
CodePudding user response:
With the following approach, you will not need to hardcode all the struct fields. But you will need to provide a list of the columns/fields whose type is array of struct. You have 3 such fields; we will add one more column, so in total there will be 4.
First of all, the dataframe, similar to yours:
from pyspark.sql import functions as F
df = spark.createDataFrame(
[(
('a', 'b'),
(
(
'name_1',
[
([
(
'pname_111',
[
(1111, 'field_1111'),
(1112, 'field_1112')
]
),
(
'pname_112',
[
(1121, 'field_1121'),
(1122, 'field_1122')
]
)
],),
([
(
'pname_121',
[
(1211, 'field_1211'),
(1212, 'field_1212')
]
),
(
'pname_122',
[
(1221, 'field_1221'),
(1222, 'field_1222')
]
)
],)
]
),
(
'name_2',
[
([
(
'pname_211',
[
(2111, 'field_2111'),
(2112, 'field_2112')
]
),
(
'pname_212',
[
(2121, 'field_2121'),
(2122, 'field_2122')
]
)
],),
([
(
'pname_221',
[
(2211, 'field_2211'),
(2212, 'field_2212')
]
),
(
'pname_222',
[
(2221, 'field_2221'),
(2222, 'field_2222')
]
)
],)
]
)
),
)],
'metadata:struct<fld1:string,fld2:string>, data:struct<node1:struct<name:string, productlist:array<struct<productvalues:array<struct<pname:string, porders:array<struct<ordernum:int, field:string>>>>>>>, node2:struct<name:string, productlist:array<struct<productvalues:array<struct<pname:string, porders:array<struct<ordernum:int, field:string>>>>>>>>'
)
# df.printSchema()
# root
# |-- metadata: struct (nullable = true)
# |    |-- fld1: string (nullable = true)
# |    |-- fld2: string (nullable = true)
# |-- data: struct (nullable = true)
# |    |-- node1: struct (nullable = true)
# |    |    |-- name: string (nullable = true)
# |    |    |-- productlist: array (nullable = true)
# |    |    |    |-- element: struct (containsNull = true)
# |    |    |    |    |-- productvalues: array (nullable = true)
# |    |    |    |    |    |-- element: struct (containsNull = true)
# |    |    |    |    |    |    |-- pname: string (nullable = true)
# |    |    |    |    |    |    |-- porders: array (nullable = true)
# |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
# |    |    |    |    |    |    |    |    |-- ordernum: integer (nullable = true)
# |    |    |    |    |    |    |    |    |-- field: string (nullable = true)
# |    |-- node2: struct (nullable = true)
# |    |    |-- name: string (nullable = true)
# |    |    |-- productlist: array (nullable = true)
# |    |    |    |-- element: struct (containsNull = true)
# |    |    |    |    |-- productvalues: array (nullable = true)
# |    |    |    |    |    |-- element: struct (containsNull = true)
# |    |    |    |    |    |    |-- pname: string (nullable = true)
# |    |    |    |    |    |    |-- porders: array (nullable = true)
# |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
# |    |    |    |    |    |    |    |    |-- ordernum: integer (nullable = true)
# |    |    |    |    |    |    |    |    |-- field: string (nullable = true)
The answer
Spark 3.1
nodes = df.select("data.*").columns
for n in nodes:
    df = df.withColumn("data", F.col("data").withField(n, F.struct(F.lit(n).alias("node"), f"data.{n}.*")))
df = df.withColumn("data", F.array("data.*"))
for arr_of_struct in ["data", "productlist", "productvalues", "porders"]:
    df = df.select(
        *[c for c in df.columns if c != arr_of_struct],
        F.expr(f"inline({arr_of_struct})")
    )
Lower Spark versions:
nodes = df.select("data.*").columns
for n in nodes:
    df = df.withColumn(
        "data",
        F.struct(
            F.struct(F.lit(n).alias("node"), f"data.{n}.*").alias(n),
            *[f"data.{c}" for c in df.select("data.*").columns if c != n]
        )
    )
df = df.withColumn("data", F.array("data.*"))
for arr_of_struct in ["data", "productlist", "productvalues", "porders"]:
    df = df.select(
        *[c for c in df.columns if c != arr_of_struct],
        F.expr(f"inline({arr_of_struct})")
    )
Results:
df.printSchema()
# root
# |-- metadata: struct (nullable = true)
# |    |-- fld1: string (nullable = true)
# |    |-- fld2: string (nullable = true)
# |-- node: string (nullable = false)
# |-- name: string (nullable = true)
# |-- pname: string (nullable = true)
# |-- ordernum: integer (nullable = true)
# |-- field: string (nullable = true)
df.show()
# +--------+-----+------+---------+--------+----------+
# |metadata| node|  name|    pname|ordernum|     field|
# +--------+-----+------+---------+--------+----------+
# |  {a, b}|node1|name_1|pname_111|    1111|field_1111|
# |  {a, b}|node1|name_1|pname_111|    1112|field_1112|
# |  {a, b}|node1|name_1|pname_112|    1121|field_1121|
# |  {a, b}|node1|name_1|pname_112|    1122|field_1122|
# |  {a, b}|node1|name_1|pname_121|    1211|field_1211|
# |  {a, b}|node1|name_1|pname_121|    1212|field_1212|
# |  {a, b}|node1|name_1|pname_122|    1221|field_1221|
# |  {a, b}|node1|name_1|pname_122|    1222|field_1222|
# |  {a, b}|node2|name_2|pname_211|    2111|field_2111|
# |  {a, b}|node2|name_2|pname_211|    2112|field_2112|
# |  {a, b}|node2|name_2|pname_212|    2121|field_2121|
# |  {a, b}|node2|name_2|pname_212|    2122|field_2122|
# |  {a, b}|node2|name_2|pname_221|    2211|field_2211|
# |  {a, b}|node2|name_2|pname_221|    2212|field_2212|
# |  {a, b}|node2|name_2|pname_222|    2221|field_2221|
# |  {a, b}|node2|name_2|pname_222|    2222|field_2222|
# +--------+-----+------+---------+--------+----------+
Explanation
nodes = df.select("data.*").columns
for n in nodes:
df = df.withColumn("data", F.col("data").withField(n, F.struct(F.lit(n).alias("node"), f"data.{n}.*")))
In the above, I decided to save the node title in case you need it. It first gets the list of nodes from the "data" column's fields. Using that list, the for loop adds one more field, holding the node's title, inside every node struct.
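As a rough check of that step, this is the shape I would expect for node1 after the loop (a sketch based on the sample dataframe above; node is the field that was just added, and the exact nullable flags are an assumption):
df.select("data.node1.*").printSchema()
# root
# |-- node: string (nullable = false)
# |-- name: string (nullable = true)
# |-- productlist: array (nullable = true)
# |    |-- element: struct (containsNull = true)
# ...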
df = df.withColumn("data", F.array("data.*"))
The above converts the "data" column from struct type to array type so that in the next step we can easily explode it into columns.
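A small sanity check of that conversion (a sketch; the count of 2 is an assumption based on the two-node sample dataframe above):
df.select(F.size("data").alias("n_nodes")).show()
# +-------+
# |n_nodes|
# +-------+
# |      2|
# +-------+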
for arr_of_struct in ["data", "productlist", "productvalues", "porders"]:
df = df.select(
*[c for c in df.columns if c != arr_of_struct],
F.expr(f"inline({arr_of_struct})")
)
In the above, the main line is F.expr(f"inline({arr_of_struct})"). It must be used inside a loop, because inline is a generator function and you cannot nest generators within one select in Spark. inline explodes an array of structs into columns. At this step you have 4 [array of struct] columns, so 4 inline expressions will be created.
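If inline is unfamiliar, here is a minimal standalone illustration, independent of the dataframe above (the field names here are arbitrary):
spark.sql(
    "SELECT inline(array(named_struct('ordernum', 1, 'field', 'a'), "
    "named_struct('ordernum', 2, 'field', 'b')))"
).show()
# +--------+-----+
# |ordernum|field|
# +--------+-----+
# |       1|    a|
# |       2|    b|
# +--------+-----+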