Pulling values into new column in pyspark when .loc does not work?


I'm trying to convert pandas code to the pandas API on Spark (pyspark.pandas). In pandas I used .loc to add a new column, finalValue, but this does not work with pyspark: instead of adding the new column and its values, nothing happens.

A. Sample data:

import pyspark.pandas as ps

d = {'posNeg': ['positive','positive','negative'], 'valuePositive': [2, 2, 3], 'valueNegative': [-2, -2, -3]}
df = ps.DataFrame(d)

B. Expected output:

+----------+---------------+---------------+------------+
| posNeg   | valuePositive | valueNegative | finalValue |
+----------+---------------+---------------+------------+
| positive | 2             | -2            | 2          |
| positive | 2             | -2            | 2          |
| negative | 3             | -3            | -3         |
+----------+---------------+---------------+------------+

C. Code I'm trying to adapt from pandas, which does nothing in pyspark:

df.loc[df['posNeg'] == 'positive', 'finalValue'] = df.loc[df['posNeg'] == 'positive', 'valuePositive']
df.loc[df['posNeg'] == 'negative', 'finalValue'] = df.loc[df['posNeg'] == 'negative', 'valueNegative']

D. Backup code I've started using, which works but is very slow:

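def addFinalValue(row):
    # Pick the value column that matches the row's posNeg label.
    # Column names follow the sample data in A.
    if row['posNeg'] == 'positive':
        return row['valuePositive']
    elif row['posNeg'] == 'negative':
        return row['valueNegative']
    # Neither label matched (requires: import numpy as np).
    return np.nan

df['finalValue'] = df.apply(addFinalValue, axis=1)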

Any ideas why pyspark is ignoring the .loc? Are there any suggestions for adapting C to make it work with pyspark? I'm unsure of other ways beyond what I came up with in D to pull values into a new column contingent on a certain parameter (in this case, whether the row is positive or negative), but would love anything that is more performant.

Thank you!

CodePudding user response:

Spark implementation using case-when

Data Preparation

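import pandas as pd

d = {'posNeg': ['positive','positive','negative'], 'valuePositive': [2, 2, 3], 'valueNegative': [-2, -2, -3]}

df = pd.DataFrame(d)

# `sql` is assumed to be an already-created SparkSession (or SQLContext)
sparkDF = sql.createDataFrame(df)
sparkDF.show()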


+--------+-------------+-------------+
|  posNeg|valuePositive|valueNegative|
+--------+-------------+-------------+
|positive|            2|           -2|
|positive|            2|           -2|
|negative|            3|           -3|
+--------+-------------+-------------+

Case - When

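from pyspark.sql import functions as F

# Rows matching neither label get null in finalValue
sparkDF = sparkDF.withColumn('finalValue', F.when(F.col('posNeg') == 'positive', F.col('valuePositive')).when(F.col('posNeg') == 'negative', F.col('valueNegative')))
sparkDF.show()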

+--------+-------------+-------------+----------+
|  posNeg|valuePositive|valueNegative|finalValue|
+--------+-------------+-------------+----------+
|positive|            2|           -2|         2|
|positive|            2|           -2|         2|
|negative|            3|           -3|        -3|
+--------+-------------+-------------+----------+
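
If you would rather stay in the pandas-style API than switch to a Spark DataFrame, one possible alternative is Series.where, which exists in both pandas and pyspark.pandas. The sketch below uses plain pandas so it runs without a Spark session; the helper name add_final_value is made up for illustration, and swapping the import for pyspark.pandas is an assumption I have not verified on a cluster.

```python
import pandas as pd


def add_final_value(df):
    """Add finalValue using vectorized where() instead of .loc or apply().

    Hypothetical helper; column names follow the sample data in the question.
    """
    positive = df['posNeg'] == 'positive'
    negative = df['posNeg'] == 'negative'
    # Inner where: keep valueNegative on negative rows, NaN elsewhere.
    fallback = df['valueNegative'].where(negative)
    # Outer where: keep valuePositive on positive rows, else use the fallback.
    df['finalValue'] = df['valuePositive'].where(positive, fallback)
    return df


d = {'posNeg': ['positive', 'positive', 'negative'],
     'valuePositive': [2, 2, 3],
     'valueNegative': [-2, -2, -3]}
df = add_final_value(pd.DataFrame(d))
print(df)
```

Rows whose posNeg is neither 'positive' nor 'negative' end up with NaN in finalValue, which mirrors the fallback in D. Because the missing-value fallback is introduced, the column may come back as floats rather than integers.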