Consider a Spark dataframe named employees, such as this one:
+----------+-----+
| name     | age |
+----------+-----+
| John     | 32  |
| Elizabeth| 28  |
| Eric     | 41  |
+----------+-----+
and an array of strings state = ["LA", "AZ", "OH"]. I want to append this array to the dataframe as a new column, so that it then looks like:
+----------+-----+-------+
| name     | age | state |
+----------+-----+-------+
| John     | 32  | LA    |
| Elizabeth| 28  | AZ    |
| Eric     | 41  | OH    |
+----------+-----+-------+
How can I achieve this in Scala (or Java, it's almost the same)? Everything I have found online only shows how to add the same value to every row, whereas here I want a different value for each row.
Thank you! :)
CodePudding user response:
Since Spark runs in distributed mode, you cannot add column values based on an array index. Suppose Spark runs with two workers, and John and Elizabeth are delivered to worker A while Eric is delivered to worker B: the rows are split up when they are saved into the dataframe, and the workers do not know the index of John, Elizabeth or Eric. You could do what you want easily in an ordinary single-process Java program.
In your example you would need to convert your array to a dataframe and use join
to merge the two dataframes on a column holding the same value. You can also use crossJoin
to take the Cartesian product of your tables:
Dataset<Row> ndf = df.crossJoin(df2);
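For the join route mentioned above, here is a rough sketch (spark and df are assumed to be your existing session and employees dataframe; the helper column "idx", the stateDf dataframe and the row_number/monotonically_increasing_id trick are illustrative assumptions, not part of the original answer; dataframe row order is not guaranteed, so the pairing is only dependable for small data whose order you control):
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.*;

List<String> states = Arrays.asList("LA", "AZ", "OH");

// build a two-column dataframe (idx, state) from the plain Java list
List<Row> stateRows = new ArrayList<>();
for (int j = 0; j < states.size(); j++) {
    stateRows.add(RowFactory.create(j + 1, states.get(j)));
}
StructType stateSchema = new StructType()
        .add("idx", DataTypes.IntegerType)
        .add("state", DataTypes.StringType);
Dataset<Row> stateDf = spark.createDataFrame(stateRows, stateSchema);

// give the employee dataframe a matching row number, join on it, then drop the helper column
Dataset<Row> withIdx = df.withColumn("idx",
        row_number().over(Window.orderBy(monotonically_increasing_id())));
Dataset<Row> result = withIdx.join(stateDf, "idx").drop("idx");
result.show();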
If you just need to add a column with a constant value, or a value derived from another column of the same dataframe, use withColumn
as below:
Dataset<Row> ndf = df.withColumn("city", functions.lit(1));       // constant value
Dataset<Row> ndf = df.withColumn("city", functions.rand());       // random value
Dataset<Row> ndf = df.withColumn("city", functions.col("name"));  // value copied from another column
Finally, you can use an AtomicInteger like this to get what you want. I tested it in Spark local mode.
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;

public static void main(String[] args) {
    System.setProperty("hadoop.home.dir", "H:\\work\\HadoopWinUtils\\");
    SparkSession spark = SparkSession
            .builder()
            .master("local[*]")
            .getOrCreate();

    Dataset<Row> df = spark.read().json("H:\\work\\HadoopWinUtils\\people.json");
    List<String> city_array = Arrays.asList("LA", "AZ", "OH");

    // Displays the content of the DataFrame to stdout
    df.show();

    // add a placeholder "city" column so df.schema() already contains the third field
    df = df.withColumn("city", functions.col("name"));

    // shared counter used to pick the next city; only reliable in local (single-JVM) mode
    AtomicInteger i = new AtomicInteger();
    Dataset<Row> df3 = df.map((MapFunction<Row, Row>) value ->
            RowFactory.create(value.get(0), value.get(1), city_array.get(i.getAndIncrement())),
        RowEncoder.apply(df.schema()));
    df3.show();
}
The people.json file is:
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
and the result is
+----+-------+----+
| age|   name|city|
+----+-------+----+
|null|Michael|  LA|
|  30|   Andy|  AZ|
|  19| Justin|  OH|
+----+-------+----+
CodePudding user response:
You can try something like this in pyspark.
>>> from pyspark.sql.types import StructType, StructField, StringType, IntegerType
>>>
>>> _TRANSFORMED_DF_SCHEMA = StructType([
...     StructField('name', StringType(), False),
...     StructField('age', IntegerType(), False),
...     StructField('id', IntegerType(), False),
...     StructField('state', StringType(), False),
... ])
>>>
>>> state = ['LA', 'AZ', 'OH']
>>> data = (['John', 32], ['Eli', 28], ['Eric', 41])
>>> df = spark.createDataFrame(data, schema=['name', 'age'])
>>> rdd1 = df.rdd.zipWithIndex()
>>> df1 = rdd1.toDF()
>>> df1.show()
+----------+---+
|        _1| _2|
+----------+---+
|[John, 32]|  0|
| [Eli, 28]|  1|
|[Eric, 41]|  2|
+----------+---+
>>> df_final = df1.select(df1['_1']['name'].alias('name'), df1['_1']['age'].alias('age'), df1['_2'].alias('id'))
>>> df_final.show()
+----+---+---+
|name|age| id|
+----+---+---+
|John| 32|  0|
| Eli| 28|  1|
|Eric| 41|  2|
+----+---+---+
>>> def add_state(row_dict):
...     new_dict = dict()
...     new_dict['name'] = row_dict['name']
...     new_dict['age'] = row_dict['age']
...     new_dict['id'] = row_dict['id']
...     new_dict['state'] = state[row_dict['id']]
...     return new_dict
...
>>> df_rdd = df_final.rdd.map(add_state)
>>> df_final = spark.createDataFrame(df_rdd, schema=_TRANSFORMED_DF_SCHEMA)
>>> df_final.show()
+----+---+---+-----+
|name|age| id|state|
+----+---+---+-----+
|John| 32|  0|   LA|
| Eli| 28|  1|   AZ|
|Eric| 41|  2|   OH|
+----+---+---+-----+
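Since the question asked for Scala or Java, a rough Java equivalent of the same zipWithIndex idea could look like this (an untested sketch; spark and df are assumed to be the existing session and the employees dataframe, and "withState" is just an illustrative name):
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

List<String> state = Arrays.asList("LA", "AZ", "OH");

// zipWithIndex numbers the rows in their current RDD order;
// append state[index] as one extra field on every row
JavaRDD<Row> withState = df.toJavaRDD()
        .zipWithIndex()
        .map(t -> {
            Row row = t._1();
            Object[] fields = new Object[row.size() + 1];
            for (int j = 0; j < row.size(); j++) {
                fields[j] = row.get(j);
            }
            fields[row.size()] = state.get(t._2().intValue());
            return RowFactory.create(fields);
        });

// extend the original schema with the new string column and rebuild the dataframe
StructType schema = df.schema().add("state", DataTypes.StringType);
Dataset<Row> result = spark.createDataFrame(withState, schema);
result.show();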