I have VCF-format data in Databricks and I want to rename the subjects based on a dictionary. I have a dictionary mapping old names to new names, and a function that returns the new values, which works so far:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

keys = {'old_name': 'new_name'}
mapping_func = lambda x: keys.get(x)
df = df.withColumn('foo', F.udf(mapping_func, StringType())('geno.sampleId'))
This produces a new column foo. What I need instead is to assign the values inside the nested structure (the last field of the schema below):
StructField(contigName,StringType,true)
StructField(start,LongType,true)
StructField(end,LongType,true)
StructField(names,ArrayType(StringType,true),true)
StructField(referenceAllele,StringType,true)
StructField(alternateAlleles,ArrayType(StringType,true),true)
StructField(qual,DoubleType,true)
StructField(filters,ArrayType(StringType,true),true)
StructField(splitFromMultiAllelic,BooleanType,true)
StructField(geno,StructType(List(StructField(sampleId,StringType,true),StructField(CN,IntegerType,true),StructField(phased,BooleanType,true),StructField(calls,ArrayType(IntegerType,true),true))),true)
Something like this:
df = df.withColumn(F.col('geno').sampleId, udf(mapping_func, F.StringType())('geno.sampleId'))
But this fails with
Column is not iterable
How would I go about assigning the values to the proper place?
Scala 2.12 and Spark 3.0.1
CodePudding user response:
From my understanding, you don't need a UDF here. You can simply use a map column expression instead:
from itertools import chain
import pyspark.sql.functions as F

keys_map = F.create_map(*[F.lit(x) for x in chain(*keys.items())])
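This builds a literal map column; indexing it with another column performs the lookup, and a missing key yields NULL, which matches the dict.get behaviour of your original UDF. As a minimal sketch of what the expression expands to for the example dictionary above:

# Hand-written equivalent of the create_map expression for keys = {'old_name': 'new_name'}
keys_map = F.create_map(F.lit('old_name'), F.lit('new_name'))
# keys_map[F.col('geno.sampleId')] then returns the mapped name,
# or NULL when the sampleId has no entry in the dictionary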
Now, to update a nested field in a struct, you need to recreate the whole struct column (on Spark 3.1+ you could use the withField method instead, as shown after the snippet below):
df = df.withColumn(
    "geno",
    F.struct(
        keys_map[F.col("geno.sampleId")].alias("sampleId"),  # replaces sampleId value according to your keys mapping
        F.col("geno.CN").alias("CN"),
        F.col("geno.phased").alias("phased"),
        F.col("geno.calls").alias("calls")
    )
)
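For reference, on Spark 3.1+ the withField method mentioned above lets you overwrite a single nested field without rebuilding the whole struct. A minimal sketch, assuming the same keys_map (not applicable on your Spark 3.0.1):

# Spark 3.1+ only: update one field of the struct in place
df = df.withColumn(
    "geno",
    F.col("geno").withField("sampleId", keys_map[F.col("geno.sampleId")])
)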