Home > database >  Create a dataframe by iterating over column of list in another dataframe
Create a dataframe by iterating over column of list in another dataframe

Time:10-08

In pyspark, I have a DataFrame with a column that contains a list of ordered nodes to go through:

osmDF.schema
Out[1]:
 StructType(List(StructField(id,LongType,true),
                         StructField(nodes,ArrayType(LongType,true),true),
                         StructField(tags,MapType(StringType,StringType,true),true)))
osmDF.head(3)
Out[2]:
|     id    |                         nodes                       |         tags        |
|-----------|-----------------------------------------------------|---------------------|
| 62960871  | [783186590,783198852]                               | "{""foo"":""bar""}" |
| 211528816 | [2215187080,2215187140,2215187205,2215187256]       | "{""foo"":""boo""}" |
| 62960872  | [783198772,783183397,783167527,783169067,783198772] | "{""foo"":""buh""}" |

I need to create a dataframe with a row for each consecutive combination of 2 nodes the list of nodes, then save it as parquet.

The expected result will have a length of n-1, with n len(nodes) for each rows. It would look like this (with other columns that I'll add):

|           id          |    from    |      to    |         tags        |
|-----------------------|------------|------------|---------------------|
| 783186590_783198852   | 783186590  | 783198852  | "{""foo"":""bar""}" |
| 2215187080_2215187140 | 2215187080 | 2215187140 | "{""foo"":""boo""}" |
| 2215187140_2215187205 | 2215187140 | 2215187205 | "{""foo"":""boo""}" |
| 2215187205_2215187256 | 2215187205 | 2215187256 | "{""foo"":""boo""}" |
| 783198772_783183397   | 783198772  | 783183397  | "{""foo"":""buh""}" |
| 783183397_783167527   | 783183397  | 783167527  | "{""foo"":""buh""}" |
| 783167527_783169067   | 783167527  | 783169067  | "{""foo"":""buh""}" |
| 783169067_783198772   | 783169067  | 783198772  | "{""foo"":""buh""}" |

I tried to initiate with the following

from pyspark.sql.functions import udf

def split_ways_into_arcs(row):
    arcs = []
    for node in range(len(row['nodes']) - 1):
      arc = dict()
      
      arc['id'] = str(row['nodes'][node])   "_"   str(row['nodes'][node   1])
      
      arc['from'] = row['nodes'][node]
      arc['to'] = row['nodes'][node   1]
      arc['tags'] = row['tags']
      
      arcs.append(arc)

    return arcs

# Declare function as udf
split = udf(lambda row: split_ways_into_arcs(row.asDict()))

The issue I'm having is I don't know how many nodes there are in each row of the original DataFrame. I know how to apply a udf to add a column to an existing DataFrame, but not to create a new one from lists of dicts.

CodePudding user response:

Iterate over the nodes array using transform and explode the array afterwards:

from pyspark.sql import functions as F

df = ...

df.withColumn("nodes", F.expr("transform(nodes, (n,i) -> named_struct('from', nodes[i], 'to', nodes[i 1]))")) \
  .withColumn("nodes", F.explode("nodes")) \
  .filter("not nodes.to is null") \
  .selectExpr("concat_ws('_', nodes.to, nodes.from) as id", "nodes.*", "tags") \
  .show(truncate=False)

Output:

 --------------------- ---------- ---------- ----------------- 
|id                   |from      |to        |tags             |
 --------------------- ---------- ---------- ----------------- 
|783198852_783186590  |783186590 |783198852 |{""foo"":""bar""}|
|2215187140_2215187080|2215187080|2215187140|{""foo"":""boo""}|
|2215187205_2215187140|2215187140|2215187205|{""foo"":""boo""}|
|2215187256_2215187205|2215187205|2215187256|{""foo"":""boo""}|
|783183397_783198772  |783198772 |783183397 |{""foo"":""buh""}|
|783167527_783183397  |783183397 |783167527 |{""foo"":""buh""}|
|783169067_783167527  |783167527 |783169067 |{""foo"":""buh""}|
|783198772_783169067  |783169067 |783198772 |{""foo"":""buh""}|
 --------------------- ---------- ---------- ----------------- 
  • Related