I've read a file in PySpark that I downloaded from an S3 bucket, where it was batched by a Kinesis Firehose delivery stream. I want to analyse the data in a PySpark application and then upload the processed file to a new AWS S3 bucket.
I've written code to read the file (downloaded from the S3 bucket) from my local machine. I want to convert the timestamp in the JSON payload to this format:
dd-MM-yyyy HH:mm:ss
Here is my code:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql import types as t

spark = SparkSession.builder.appName('demo').getOrCreate()

# read the Firehose-batched JSON file downloaded from S3
d = spark.read.json('firehose-ds-iot-data-to-s3-1-2023-01-10-14-31-00-f23001c7-7759-38ca-9f14-4682cc39ae89')
d.show()

# cast the epoch column to a timestamp and format it
d = d.withColumn('ts', f.date_format(d.ts.cast(dataType=t.TimestampType()), "yyyy-MM-dd HH:mm:ss"))
d.select('ts').show(truncate=False)
Here is my dataset:
+-------------+------------+-----------+-----------+---------+-----------+-------------+
|Active-Import|Active-Power|Pyranometer|Temperature|Voltage-1|device_name|           ts|
+-------------+------------+-----------+-----------+---------+-----------+-------------+
|         2.57|           0|          0|       25.3|   239.65| inHand-RTU|1673361060486|
|         2.57|           0|          0|       25.3|   239.44| inHand-RTU|1673361075375|
|         2.57|           0|          0|       25.3|   239.44| inHand-RTU|1673361090384|
|         2.57|           0|          0|       25.3|   239.44| inHand-RTU|1673361105397|
|         2.57|           0|          0|       25.3|   239.44| inHand-RTU|1673361120532|
|         2.57|           0|          0|       25.3|   239.43| inHand-RTU|1673361135503|
|         2.57|           0|          0|       25.3|   239.43| inHand-RTU|1673361150520|
|         2.57|           0|          0|       25.3|   239.27| inHand-RTU|1673361165428|
|         2.57|           0|          0|       25.3|   236.14| inHand-RTU|1673361180435|
|         2.57|           0|          0|       25.3|   236.14| inHand-RTU|1673361195440|
|         2.57|           0|          0|       25.3|   236.03| inHand-RTU|1673361210450|
|         2.57|           0|          0|       25.3|   236.03| inHand-RTU|1673361225498|
|         2.57|           0|          0|       25.3|   236.08| inHand-RTU|1673361240595|
|         2.57|           0|          0|       25.3|   236.08| inHand-RTU|1673361255512|
|         2.57|           0|          0|       25.3|   236.09| inHand-RTU|1673361270490|
|         2.57|           0|          0|       25.3|   235.96| inHand-RTU|1673361285544|
|         2.57|           0|          0|       25.3|   235.96| inHand-RTU|1673361300800|
|         2.57|           0|          0|       25.3|   235.94| inHand-RTU|1673361315630|
|         2.57|           0|          0|       25.3|   235.94| inHand-RTU|1673361330528|
|         2.57|           0|          0|       25.3|   235.75| inHand-RTU|1673361345566|
+-------------+------------+-----------+-----------+---------+-----------+-------------+
only showing top 20 rows
After running the code to convert the epoch timestamp into the "dd-MM-yyyy HH:mm:ss" format, here is my output:
+---------------------+
|ts                   |
+---------------------+
| 54996-09-13 00:00:00|
| 54996-09-13 00:00:00|
| 54996-09-13 00:00:00|
| 54996-09-13 00:00:00|
| 54996-09-13 00:00:00|
| 54996-09-13 00:00:00|
| 54996-09-14 00:00:00|
| 54996-09-14 00:00:00|
| 54996-09-14 00:00:00|
| 54996-09-14 00:00:00|
| 54996-09-14 00:00:00|
| 54996-09-15 00:00:00|
| 54996-09-15 00:00:00|
| 54996-09-15 00:00:00|
| 54996-09-15 00:00:00|
| 54996-09-15 00:00:00|
| 54996-09-15 00:00:00|
| 54996-09-16 00:00:00|
| 54996-09-16 00:00:00|
| 54996-09-16 00:00:00|
+---------------------+
only showing top 20 rows
Any help with the issue will be highly appreciated.
CodePudding user response:
Here is sample code to convert an epoch to a date. Make sure the epoch column is StringType, and pass your timezone to the from_utc_timestamp method.
from pyspark.sql.functions import from_utc_timestamp, from_unixtime
from pyspark.sql.types import StringType

d = ['1673361060486']
df = spark.createDataFrame(d, StringType())

# from_unixtime expects seconds, so divide the millisecond epoch by 1000,
# then shift the UTC result into the target timezone
df = df.withColumn('ts', from_utc_timestamp(from_unixtime(df.value / 1000, "yyyy-MM-dd HH:mm:ss"), 'GMT+1'))
df.show(truncate=False)
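To get the dd-MM-yyyy HH:mm:ss string the question asks for, here is a sketch applying the same idea to the question's DataFrame d (assuming its millisecond ts column, with GMT+1 as a stand-in timezone):
from pyspark.sql import functions as f

# sketch: assumes `d` and its millisecond `ts` column from the question;
# replace 'GMT+1' with your own timezone
d = d.withColumn(
    'ts',
    f.date_format(
        f.from_utc_timestamp(f.from_unixtime(f.col('ts') / 1000), 'GMT+1'),
        'dd-MM-yyyy HH:mm:ss'
    )
)
d.select('ts').show(truncate=False)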
CodePudding user response:
Can you try doing the casting with epoch instead of ts:
from pyspark.sql import functions as f
from pyspark.sql import types as t
d.withColumn('ts', f.date_format(d.epoch.cast(dataType=t.TimestampType()), "yyyy-MM-dd HH:mm:ss"))
UPDATE:
d.withColumn("ts",F.to_timestamp(F.col("ts") / 1000, "yyyy-MM-dd hh:mm:ss"))