Home > Mobile >  Adding array values to a spark dataframe as new column
Adding array values to a spark dataframe as new column

Time:10-21

Considering a spark dataframe named employees such as this one :

 ---------- ----- 
|   name   | age |
 ---------- ----- 
|   John   | 32  |
| Elizabeth| 28  |
|   Eric   | 41  |
 ---------- ----- 

and an array of string state = ["LA", "AZ", "OH"], I want to append this array to df as a new column so the dataframe will then look like :

 ---------- ----- ------- 
|   name   | age | state |
 ---------- ----- ------- 
|   John   | 32  |   LA  |
| Elizabeth| 28  |   AZ  |
|   Eric   | 41  |   OH  |
 ---------- ----- ------- 

How can I achieve this in Scala (or Java, it's almost the same) ? I have only seen how to add same value to all rows on the net, and here I want different values for each one.

Thank you ! :)

CodePudding user response:

Since spark runs in distributed mode, you cannot add column based values on array with index. Suppose spark runs with two workers and John and Elizabeth deliver to worker A and Eric deliver to worker B. Indeed, they will split when save in dataframe. The workers don't know what is the index of John,Elizabeth or Eric. You can do what you want simply in a normal java single program.

In your example you need to convert your array to dataframe and use join to merge two dataframes based a column with the same value. However, you can use crossJoin to do a cartesian product on your tables.

Dataset<Row> ndf = df.crossJoin(df2);

If you need just add a column with a constant value or a value based another column on the same dataframe, use withColumn as below:

Dataset<Row> ndf = df.withColumn("city",functions.lit(1));
Dataset<Row> ndf = df.withColumn("city",functions.rand());
Dataset<Row> ndf = df.withColumn("city",functions.col("name"));

At last, you can use Atomic like this to get what you want. I test it in spark single mode.

    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", "H:\\work\\HadoopWinUtils\\");
        SparkSession spark = SparkSession
                .builder()
                .master("local[*]")
                .getOrCreate();

        Dataset<Row> df = spark.read().json("H:\\work\\HadoopWinUtils\\people.json");

        List<String> city_array = Arrays.asList("LA", "AZ", "OH");
        // Displays the content of the DataFrame to stdout
        df.show();
   
        df = df.withColumn("city",functions.col("name"));

        AtomicInteger i= new AtomicInteger();

        Dataset<Row> df3 = df.map((MapFunction<Row, Row>) value -> {
            return RowFactory.create(value.get(0),value.get(1),city_array.get(i.getAndIncrement()));
            //return city_array.get(i.getAndIncrement());
        }, RowEncoder.apply(df.schema()));

        df3.show();
    }

People is

 ---- ------- 
| age|   name|
 ---- ------- 
|null|Michael|
|  30|   Andy|
|  19| Justin|
 ---- ------- 

and the result is

 ---- ------- ---- 
| age|   name|city|
 ---- ------- ---- 
|null|Michael|  LA|
|  30|   Andy|  AZ|
|  19| Justin|  OH|
 ---- ------- ---- 

CodePudding user response:

You can try something like this in pyspark.

>>> _TRANSFORMED_DF_SCHEMA = StructType([
...     StructField('name', StringType(), False),
...     StructField('age', IntegerType(), False),
...     StructField('id', IntegerType(), False),
...     StructField('state', StringType(), False),
... ])
>>> 
>>> state = ['LA', 'AZ', 'OH']
>>> data = (['John', 32], ['Eli', 28], ['Eric', 41])
>>> df = spark.createDataFrame(data, schema=['name', 'age'])
>>> rdd1 = df.rdd.zipWithIndex()
>>> df1 = rdd1.toDF()
>>> df1.show()
 ---------- --- 
|        _1| _2|
 ---------- --- 
|[John, 32]|  0|
| [Eli, 28]|  1|
|[Eric, 41]|  2|
 ---------- --- 

>>> df_final = df1.select(df1['_1']['name'].alias('name'), df1['_1']['age'].alias('age'), df1['_2'].alias('id'))
>>> df_final.show()
 ---- --- --- 
|name|age| id|
 ---- --- --- 
|John| 32|  0|
| Eli| 28|  1|
|Eric| 41|  2|
 ---- --- --- 

>>> def add_state(row_dict):
...     new_dict = dict()
...     new_dict['name'] = row_dict['name']
...     new_dict['age'] = row_dict['age']
...     new_dict['id'] = row_dict['id']
...     new_dict['state'] = state[row_dict['id']]
...     return new_dict
... 
>>> df_rdd = df_final.rdd.map(add_state)
>>> df_final = spark.createDataFrame(df_rdd, schema=_TRANSFORMED_DF_SCHEMA)
>>> df_final.show()
 ---- --- --- ----- 
|name|age| id|state|
 ---- --- --- ----- 
|John| 32|  0|   LA|
| Eli| 28|  1|   AZ|
|Eric| 41|  2|   OH|
 ---- --- --- ----- 
  • Related