I'm trying to add a new column to my existing DataFrame in PySpark, following the post "Pyspark: Replacing value in a column by searching a dictionary". My DataFrame looks like this:
Fruit
Orange
Orange
Apple
Banana
Apple
The code I was trying is:
from pyspark.sql import functions as F
from itertools import chain
simple_dict = {'Orange': 'OR', 'Apple': 'AP', 'Banana': 'BN'}
mapping_expr = F.create_map([F.lit(x) for x in F.chain(*simple_dict.items())])
def addCols(data):
    data = (data.withColumn('Fruit_code', mapping_expr[data['Fruit']]))
    return data
Expected output:
Fruit Fruit_code
Orange OR
Orange OR
Apple AP
Banana BN
Apple AP
I'm getting the error below. I suspect it's caused by how I'm using `F`, but I don't know how to fix it. Can someone help?
  File "/MYPROJECT/DATASETS/DERIVED/OPPORTUNITY_WON.PY", line 8, in <module>
    mapping_expr = create_map([lit(x) for x in chain(*simple_dict.items())])
  File "/MYPROJECT/DATASETS/DERIVED/OPPORTUNITY_WON.PY", line 8, in <listcomp>
    mapping_expr = create_map([lit(x) for x in chain(*simple_dict.items())])
Answer:
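I have modified your code snippet to get it working. The main problem is `F.chain`: `chain` comes from `itertools` (which you already import), not from `pyspark.sql.functions`, so it has to be called as plain `chain`.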
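To see what actually gets passed to `F.create_map`, here is a plain-Python sketch of the flattening step (no Spark needed). `itertools.chain(*simple_dict.items())` unpacks the dictionary's (key, value) tuples and concatenates them into one alternating key, value sequence; `create_map` then groups that sequence back into key/value pairs.

```python
from itertools import chain

# Same dictionary as in the question
simple_dict = {'Orange': 'OR', 'Apple': 'AP', 'Banana': 'BN'}

# dict.items() yields (key, value) tuples; unpacking them into chain()
# concatenates the tuples into one flat key, value, key, value sequence.
flat = list(chain(*simple_dict.items()))
print(flat)  # ['Orange', 'OR', 'Apple', 'AP', 'Banana', 'BN']
```

Even-indexed elements are the keys and odd-indexed elements are the values, which is the ordering `create_map` expects once each element is wrapped in `F.lit`.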
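from pyspark.sql import functions as F
from itertools import chain  # chain is from itertools; pyspark.sql.functions has no chain

simple_dict = {'Orange': 'OR', 'Apple': 'AP', 'Banana': 'BN'}
# Flatten the dict into key, value, key, value ... literals and build a map column
mapping_expr = F.create_map([F.lit(x) for x in chain(*simple_dict.items())])

def addCols(data):
    # Look up each row's Fruit value in the map to get its code
    data = data.withColumn('Fruit_code', mapping_expr[data['Fruit']])
    return data

# Assumes an existing SparkSession named `spark`
data = spark.createDataFrame([("Orange",), ("Apple",), ("Banana",)], ("Fruit",))
new_data = addCols(data)
new_data.show()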
Output
+------+----------+
| Fruit|Fruit_code|
+------+----------+
|Orange|        OR|
| Apple|        AP|
|Banana|        BN|
+------+----------+
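Row by row, the lookup `mapping_expr[data['Fruit']]` behaves much like a Python dictionary lookup. The sketch below is a plain-Python approximation (no Spark involved) over the question's Fruit values plus a hypothetical `'Mango'` that is not in the dictionary. My understanding is that with default settings (ANSI mode off) Spark returns null for such a key, which `dict.get` mirrors with `None`. This is only an analogy, not how Spark evaluates the expression.

```python
# Plain-Python analogy of mapping_expr[data['Fruit']]; 'Mango' is a
# hypothetical value added to show what happens for a key not in the dict.
simple_dict = {'Orange': 'OR', 'Apple': 'AP', 'Banana': 'BN'}
fruits = ['Orange', 'Orange', 'Apple', 'Banana', 'Apple', 'Mango']

# dict.get returns None for a missing key, analogous to a null Fruit_code
rows = [(fruit, simple_dict.get(fruit)) for fruit in fruits]
for fruit, code in rows:
    print(fruit, code)
```

The first five rows reproduce the expected output from the question; the last row shows the missing-key case.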