read key value pairs from a text file in pyspark

Time:11-30

Given two input text files, superset and superset2. The superset file contains all the required headers (keys).

Rows in the superset2 file may be missing values for some of the header columns; each missing value needs to be filled with the $ character.

A. superset:

a|b|c|d|e|f|g|h|i|j

B. superset2:

a:1,b:1,d:1,e:1,h:1
a:2,e:2,d:2,h:2,f:2
c:3,e:3,d:3,h:3,f:3
a:4,b:4,c:4,e:4,h:4,f:4,i:4,j:4

Expected output:

a|b|c|d|e|f|g|h|i|j
1|1|$|1|1|$|$|1|$|$
2|$|$|2|2|2|$|2|$|$
$|$|3|3|3|3|$|3|$|$
4|4|4|$|4|4|$|4|4|4

CodePudding user response:

Read the two files into DataFrames, then:

  1. get the list of keys (columns) from the first DataFrame
  2. transform the second DataFrame, which contains the data: split each row first by , and then by : using a combination of the transform and map_from_entries functions, so that each row becomes a single map column
  3. finally, use a list comprehension over the list of keys to select one column per key, and fillna to replace nulls with $:
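from pyspark.sql import functions as F

# Assumes an active SparkSession named `spark` (as in the pyspark shell);
# keys_file_path and data_file_path point to superset and superset2.

# The header file is pipe-delimited, so its column names are the keys.
keys = spark.read.csv(keys_file_path, sep="|", header=True).columns
# Each data row is read as one string in a column named "value".
data = spark.read.text(data_file_path)

df = data.withColumn(
    "value",
    # Build a map<string,string> from the array of (col, val) structs.
    F.map_from_entries(
        F.expr("""transform(
                        split(value , ','), 
                        x -> struct(split(x, ':')[0] as col, split(x, ':')[1] as val)
        )""")
    )
).select(*[
    # Keys missing from a row's map come back as null ...
    F.col("value").getItem(k).alias(k) for k in keys
]).fillna("$")  # ... and are replaced with "$" here.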

df.show(truncate=False)
#+---+---+---+---+---+---+---+---+---+---+
#|a  |b  |c  |d  |e  |f  |g  |h  |i  |j  |
#+---+---+---+---+---+---+---+---+---+---+
#|1  |1  |$  |1  |1  |$  |$  |1  |$  |$  |
#|2  |$  |$  |2  |2  |2  |$  |2  |$  |$  |
#|$  |$  |3  |3  |3  |3  |$  |3  |$  |$  |
#|4  |4  |4  |$  |4  |4  |$  |4  |4  |4  |
#+---+---+---+---+---+---+---+---+---+---+
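
To sanity-check the logic on a small sample without a Spark session, the same three steps can be sketched in plain Python. This is only an illustration, not part of the Spark solution: the helper names parse_row and fill_rows are hypothetical, and the sketch assumes every item in a row has the form key:value.

```python
def parse_row(line):
    """Turn 'a:1,b:1' into {'a': '1', 'b': '1'} (assumes every item is key:value)."""
    items = (item.split(":", 1) for item in line.strip().split(",") if item)
    return {key: value for key, value in items}


def fill_rows(header_line, data_lines, filler="$", sep="|"):
    """Return the header plus one delimited line per data row, filling missing keys."""
    keys = header_line.strip().split(sep)
    out = [sep.join(keys)]
    for line in data_lines:
        row = parse_row(line)
        # Same idea as getItem(k) followed by fillna("$") in the Spark version.
        out.append(sep.join(row.get(k, filler) for k in keys))
    return out


# Sample data copied from the question.
header = "a|b|c|d|e|f|g|h|i|j"
data = [
    "a:1,b:1,d:1,e:1,h:1",
    "a:2,e:2,d:2,h:2,f:2",
    "c:3,e:3,d:3,h:3,f:3",
    "a:4,b:4,c:4,e:4,h:4,f:4,i:4,j:4",
]

result = fill_rows(header, data)
print("\n".join(result))
```

The dictionary lookup with a default plays the role of the map column plus fillna: a key that is absent from a row simply falls back to the filler character.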
