Trying to convert pandas code to the pandas API on Spark (pyspark.pandas). I was using .loc to add a new column, finalValue,
in pandas, but this is not working with pyspark: instead of adding the new column and its values, the .loc assignment does nothing.
A. Sample data:
import pyspark.pandas as ps

d = {'posNeg': ['positive','positive','negative'], 'valuePositive': [2, 2, 3], 'valueNegative': [-2, -2, -3]}
df = ps.DataFrame(d)
B. Expected output:
+----------+---------------+---------------+------------+
| posNeg   | valuePositive | valueNegative | finalValue |
+----------+---------------+---------------+------------+
| positive |             2 |            -2 |          2 |
| positive |             2 |            -2 |          2 |
| negative |             3 |            -3 |         -3 |
+----------+---------------+---------------+------------+
C. Code I'm trying to adapt from pandas, but is doing nothing in pyspark:
df.loc[df['posNeg'] == 'positive', 'finalValue'] = df.loc[df['posNeg'] == 'positive', 'valuePositive']
df.loc[df['posNeg'] == 'negative', 'finalValue'] = df.loc[df['posNeg'] == 'negative', 'valueNegative']
D. Backup code I've started using, which works, but is very slow:
import numpy as np

def addFinalValue(row):
    if row['posNeg'] == 'positive':
        return row['valuePositive']
    elif row['posNeg'] == 'negative':
        return row['valueNegative']
    else:
        return np.nan

df['finalValue'] = df.apply(addFinalValue, axis=1)
Any ideas why pyspark is ignoring the .loc assignment? Are there any suggestions for adapting C so it works with the pandas API on Spark? Beyond what I came up with in D, I'm not aware of other ways to pull values into a new column based on a condition (in this case, whether the row is positive or negative), but I would love anything more performant.
Thank you!
CodePudding user response:
Spark implementation, using case-when
Data Preparation
import pandas as pd
from pyspark.sql import SparkSession, functions as F

sql = SparkSession.builder.getOrCreate()
d = {'posNeg': ['positive','positive','negative'], 'valuePositive': [2, 2, 3], 'valueNegative': [-2, -2, -3]}
df = pd.DataFrame(d)
sparkDF = sql.createDataFrame(df)
sparkDF.show()
+--------+-------------+-------------+
|  posNeg|valuePositive|valueNegative|
+--------+-------------+-------------+
|positive|            2|           -2|
|positive|            2|           -2|
|negative|            3|           -3|
+--------+-------------+-------------+
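If the data already lives in a pandas-on-Spark DataFrame, as in the question's df, there is presumably no need to rebuild it from plain pandas. A minimal sketch, assuming the question's ps.DataFrame df already exists:

# Convert the pandas-on-Spark DataFrame to a plain Spark DataFrame
sparkDF = df.to_spark()
sparkDF.show()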
Case-when
# Keep valuePositive for 'positive' rows, fall back to valueNegative otherwise
sparkDF = sparkDF.withColumn(
    'finalValue',
    F.when(F.col('posNeg') == 'positive', F.col('valuePositive'))
     .otherwise(F.col('valueNegative'))
)
sparkDF.show()
+--------+-------------+-------------+----------+
|  posNeg|valuePositive|valueNegative|finalValue|
+--------+-------------+-------------+----------+
|positive|            2|           -2|         2|
|positive|            2|           -2|         2|
|negative|            3|           -3|        -3|
+--------+-------------+-------------+----------+
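Since the original code uses the pandas API on Spark, the same result can likely be obtained without leaving that API at all. A minimal sketch, assuming pyspark.pandas Series.where works like its pandas counterpart (keep the value where the condition is True, otherwise take the fallback Series):

import pyspark.pandas as ps

d = {'posNeg': ['positive', 'positive', 'negative'],
     'valuePositive': [2, 2, 3],
     'valueNegative': [-2, -2, -3]}
psdf = ps.DataFrame(d)

# Keep valuePositive where posNeg == 'positive'; otherwise fall back to valueNegative
psdf['finalValue'] = psdf['valuePositive'].where(psdf['posNeg'] == 'positive',
                                                 psdf['valueNegative'])
print(psdf)

This stays column-wise, so it avoids the row-by-row apply from D. If a third posNeg value should map to NaN, as in the apply fallback, the case-when version can be extended by chaining a second .when() and finishing with .otherwise(F.lit(None)). A plain Spark DataFrame can also be brought back into the pandas API with sparkDF.pandas_api() (Spark 3.2+).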