Given two input text files, superset and superset2. The superset file has all the required headers (keys). Rows from the superset2 file may be missing values for some columns in the header; the missing values need to be filled with the $ character.
A. superset:
a|b|c|d|e|f|g|h|i|j
B. superset2:
a:1,b:1,d:1,e:1,h:1
a:2,e:2,d:2,h:2,f:2
c:3,e:3,d:3,h:3,f:3
a:4,b:4,c:4,e:4,h:4,f:4,i:4,j:4
Expected output:
a|b|c|d|e|f|g|h|i|j
1|1|$|1|1|$|$|1|$|$
2|$|$|2|2|2|$|2|$|$
$|$|3|3|3|3|$|3|$|$
4|4|4|$|4|4|$|4|4|4
CodePudding user response:
Read the two files into DataFrames, then:
- get the list of keys (columns) from the first DataFrame
- transform the second DataFrame, which contains the data: split each line first by "," and then by ":", using a combination of the transform and map_from_entries functions to convert each row into a map column
- finally, use a list comprehension over the list of keys to select the columns, and fillna to replace nulls with $:
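from pyspark.sql import functions as F

# Assumes an active SparkSession named `spark`.
# The keys file is a single "|"-separated header line, so its columns are the keys.
keys = spark.read.csv(keys_file_path, sep="|", header=True).columns

# Each line of the data file becomes one row with a single string column "value".
data = spark.read.text(data_file_path)

df = data.withColumn(
    "value",
    # "a:1,b:1" -> array of (key, value) structs -> map {"a": "1", "b": "1"}
    F.map_from_entries(
        F.expr("""transform(
            split(value, ','),
            x -> struct(split(x, ':')[0] as col, split(x, ':')[1] as val)
        )""")
    )
).select(*[
    # Looking up a key that is absent from the map yields null.
    F.col("value").getItem(k).alias(k) for k in keys
]).fillna("$")

df.show(truncate=False)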
#+---+---+---+---+---+---+---+---+---+---+
#|a  |b  |c  |d  |e  |f  |g  |h  |i  |j  |
#+---+---+---+---+---+---+---+---+---+---+
#|1  |1  |$  |1  |1  |$  |$  |1  |$  |$  |
#|2  |$  |$  |2  |2  |2  |$  |2  |$  |$  |
#|$  |$  |3  |3  |3  |3  |$  |3  |$  |$  |
#|4  |4  |4  |$  |4  |4  |$  |4  |4  |4  |
#+---+---+---+---+---+---+---+---+---+---+
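
To check the logic on small inputs without a Spark session, the same steps (split by ",", split by ":", look up each key, fall back to $) can be sketched in plain Python. This is only an illustration under the assumption that every item in a data line has the form key:value; fill_missing and its filler parameter are hypothetical names, not part of the PySpark solution.

```python
def fill_missing(keys_line, data_lines, filler="$"):
    """Return '|'-joined rows, one per data line, preceded by the header row."""
    keys = keys_line.strip().split("|")
    rows = ["|".join(keys)]
    for line in data_lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        # "a:1,b:1" -> {"a": "1", "b": "1"}
        pairs = dict(item.split(":", 1) for item in line.split(","))
        # Emit values in header order, using the filler for absent keys.
        rows.append("|".join(pairs.get(k, filler) for k in keys))
    return rows


header = "a|b|c|d|e|f|g|h|i|j"
data = [
    "a:1,b:1,d:1,e:1,h:1",
    "a:2,e:2,d:2,h:2,f:2",
    "c:3,e:3,d:3,h:3,f:3",
    "a:4,b:4,c:4,e:4,h:4,f:4,i:4,j:4",
]

for row in fill_missing(header, data):
    print(row)
```

The dictionary plays the role of the map column, and pairs.get(k, filler) corresponds to getItem(k) followed by fillna("$").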