PySpark crashes when creating new column by applying function upon existing columns in large df


I have the following DataFrame, created from a 10 GB gzip-compressed CSV file:

+-------------------+----------+--------+----+
|           tweet_id|      date|    time|lang|
+-------------------+----------+--------+----+
|1212360731695427584|2020-01-01|13:11:37|  en|
|1212470713338286081|2020-01-01|20:28:39|  ru|
|1212537749485449216|2020-01-02|00:55:01|  ru|
+-------------------+----------+--------+----+

I am attempting to create a new column by transforming the date and time string columns into a Unix timestamp:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType
from datetime import datetime, date
import time

spark = SparkSession.builder.appName("Tweets").getOrCreate()
df = spark.read.csv('tweets.gz', header=True, sep=r'\t')

def tounixtime(date_s, time_s):
    if None in (date_s, time_s):
        return -1
    
    ymd = tuple([int(x) for x in date_s.split("-")])
    t = [int(x) for x in time_s.split(":")]
    d = date(*ymd).timetuple()
    return int(time.mktime(d) + t[0] * 3600 + t[1] * 60 + t[2])
        
tounix = udf(tounixtime, IntegerType())

df.withColumn('timestamp', tounix(df.date, df.time)).show()

I get an exception saying that an error occurred in a stage of the job and that the Python worker failed to connect back. I'm not sure what's wrong here.

CodePudding user response:

Without using any UDF, a simple cast can do the job, since your data are already neatly formatted. A Python UDF has to serialize every row out to Python worker processes, whereas a built-in cast runs entirely inside the JVM:

from pyspark.sql import functions as F

df_2 = df.withColumn(
    "tmst", F.concat_ws(" ", F.col("date"), F.col("time")).cast("timestamp")
)  # or F.concat(F.col("date"), F.lit(" "), F.col("time"))

df_2.show()
+-------------------+----------+--------+----+-------------------+
|           tweet_id|      date|    time|lang|               tmst|
+-------------------+----------+--------+----+-------------------+
|1212360731695427584|2020-01-01|13:11:37|  en|2020-01-01 13:11:37|
|1212470713338286081|2020-01-01|20:28:39|  ru|2020-01-01 20:28:39|
|1212537749485449216|2020-01-02|00:55:01|  ru|2020-01-02 00:55:01|
+-------------------+----------+--------+----+-------------------+

df_2.printSchema()
root
 |-- tweet_id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- lang: string (nullable = true)
 |-- tmst: timestamp (nullable = true)
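
If you specifically need the integer Unix timestamp the question asked for, a minimal sketch (assuming the same df as above; df_3 and the "timestamp" column name are just illustrative) is to wrap the concatenated string in the built-in unix_timestamp function, which parses the default "yyyy-MM-dd HH:mm:ss" pattern and returns seconds since the epoch as a long, again with no Python UDF:

from pyspark.sql import functions as F

# Concatenate the two string columns and convert to epoch seconds in the JVM.
# df_3 and the "timestamp" column name are illustrative choices, not from the
# original post.
df_3 = df.withColumn(
    "timestamp",
    F.unix_timestamp(F.concat_ws(" ", F.col("date"), F.col("time")))
)
df_3.show()

Calling F.unix_timestamp on the already-cast tmst column of df_2 should give the same result.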