Pulling values into new column in pyspark when .loc does not work?


I'm trying to convert pandas code to the pandas API on PySpark. In pandas I was using .loc to add a new column, finalValue, but this does not work with PySpark: instead of adding the new column and its values, .loc does nothing.

A. Sample data:

import pyspark.pandas as ps

d = {'posNeg': ['positive', 'positive', 'negative'], 'valuePositive': [2, 2, 3], 'valueNegative': [-2, -2, -3]}
df = ps.DataFrame(d)

B. Expected output:

+----------+---------------+---------------+------------+
| posNeg   | valuePositive | valueNegative | finalValue |
+----------+---------------+---------------+------------+
| positive | 2             | -2            | 2          |
| positive | 2             | -2            | 2          |
| negative | 3             | -3            | -3         |
+----------+---------------+---------------+------------+

C. Code I'm trying to adapt from pandas, but which does nothing in PySpark:

df.loc[df['posNeg'] == 'positive', 'finalValue'] = df.loc[df['posNeg'] == 'positive', 'valuePositive']
 
df.loc[df['posNeg'] == 'negative', 'finalValue'] = df.loc[df['posNeg'] == 'negative', 'valueNegative']
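
For reference, the same .loc pattern fills finalValue as expected when I run it against a plain pandas DataFrame, roughly like this:

import pandas as pd

d = {'posNeg': ['positive', 'positive', 'negative'], 'valuePositive': [2, 2, 3], 'valueNegative': [-2, -2, -3]}
pdf = pd.DataFrame(d)

# Same two conditional assignments as in C, on a local pandas DataFrame
pdf.loc[pdf['posNeg'] == 'positive', 'finalValue'] = pdf.loc[pdf['posNeg'] == 'positive', 'valuePositive']
pdf.loc[pdf['posNeg'] == 'negative', 'finalValue'] = pdf.loc[pdf['posNeg'] == 'negative', 'valueNegative']

print(pdf)  # finalValue comes back as 2.0, 2.0, -3.0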

D. Backup code I've started using, which works but is very slow:

import numpy as np

def addFinalValue(row):
    # Pick the column to pull from based on the posNeg flag
    if row['posNeg'] == 'positive':
        return row['valuePositive']
    elif row['posNeg'] == 'negative':
        return row['valueNegative']
    else:
        return np.nan

df['finalValue'] = df.apply(addFinalValue, axis=1)

Any ideas why PySpark is ignoring the .loc assignment? Are there suggestions for adapting C so that it works with PySpark? Beyond what I came up with in D, I don't know of other ways to pull values into a new column based on a condition (in this case, whether the row is positive or negative), but I'd love anything more performant.

Thank you!

CodePudding user response:

Spark implementation, using case-when

Data Preparation

import pandas as pd
from pyspark.sql import SparkSession, functions as F

sql = SparkSession.builder.getOrCreate()

d = {'posNeg': ['positive','positive','negative'], 'valuePositive': [2, 2, 3], 'valueNegative': [-2, -2, -3]}

df = pd.DataFrame(d)

sparkDF = sql.createDataFrame(df)

sparkDF.show()

+--------+-------------+-------------+
|  posNeg|valuePositive|valueNegative|
+--------+-------------+-------------+
|positive|            2|           -2|
|positive|            2|           -2|
|negative|            3|           -3|
+--------+-------------+-------------+

Case-When

sparkDF = sparkDF.withColumn(
    'finalValue',
    F.when(F.col('posNeg') == 'positive', F.col('valuePositive'))
     .otherwise(F.col('valueNegative'))
)

sparkDF.show()

+--------+-------------+-------------+----------+
|  posNeg|valuePositive|valueNegative|finalValue|
+--------+-------------+-------------+----------+
|positive|            2|           -2|         2|
|positive|            2|           -2|         2|
|negative|            3|           -3|        -3|
+--------+-------------+-------------+----------+
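
Staying within the pandas API on Spark that the question uses, the same conditional pull can also be expressed with Series.where, which keeps values where the condition is True and substitutes the other column elsewhere. A minimal sketch, assuming pyspark.pandas is available and both columns come from the same DataFrame:

import pyspark.pandas as ps

d = {'posNeg': ['positive', 'positive', 'negative'], 'valuePositive': [2, 2, 3], 'valueNegative': [-2, -2, -3]}
psdf = ps.DataFrame(d)

# Keep valuePositive where posNeg == 'positive', otherwise fall back to
# valueNegative -- the same logic as the CASE WHEN above.
psdf['finalValue'] = psdf['valuePositive'].where(psdf['posNeg'] == 'positive', psdf['valueNegative'])

print(psdf)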