I have data incoming in batches and several columns that come from pivoting the values of another column, so the number of columns varies, one of the columns rarely receives any data ('surprise'),
Because this column doesn't always get created and it is in the .select()
statement, sometimes it interrupts the process throwing and AnalysisException.
I got to solve it like this and it seems to work so far, but I'm looking for a better way to solve this because this doesn't look like good code:
try:
df = agg_sentiment \
.select('created_at', 'topic', 'counts',
'fear', 'joy', 'sadness', 'surprise', 'anger')
except Exception:
df = agg_sentiment \
.select('created_at', 'topic', 'counts',
'fear', 'joy', 'sadness', 'anger')
As you can see I removed 'surprise' from the select statement in the except part. Is there a way in PySpark to handle this type of situations?
CodePudding user response:
I think you could check if the desired columns exists in dataframe before selecting columns.
Here is an example
import pyspark.sql.functions as fx
# All wanted columns including possible missing ones
colWanted = ['created_at', 'topic', 'counts',
'fear', 'joy', 'sadness', 'surprise', 'anger']
colSelectPossible = []
for col in colWanted:
if col in aggSentiment.columns:
# Column exists, so save it to select later on
colSelectPossible.append(col)
df = aggSentiment.select(colSelectPossible)