How can we have same current_timestamp for all records processed in different compute nodes?

Time:07-22

Suppose we have a DataFrame with millions of records that we are processing with PySpark. At some point we add a column with lit(datetime.datetime.now()). In general, the previous action will finish at different times on different compute nodes. So I expected records on different nodes to get different values for this column, differing by at least a few milliseconds. But when I tried it, every record had the same value in that column.

Could someone help me understand this behavior?

CodePudding user response:

As samkart said, you should use the lit function to add the current timestamp as a new column to the DataFrame. That way the column holds the current time on the driver. Example:

from pyspark.sql.functions import lit, to_timestamp
import datetime

current_time = datetime.datetime.now()

print(current_time)
# 2022-07-19 13:40:06.425681

df = spark.sql("select 1")
df = df.withColumn('current_ts', lit(current_time))

df.printSchema()

# root
#  |-- 1: integer (nullable = false)
#  |-- current_ts: timestamp (nullable = false)

df.show(1, False)
# +---+--------------------------+
# |1  |current_ts                |
# +---+--------------------------+
# |1  |2022-07-19 13:40:06.425681|
# +---+--------------------------+

# Query the data (if X is your filtering column)
df_results = df.where("X >= current_ts")

# remove the current timestamp column
df_results = df_results.drop('current_ts')

CodePudding user response:

Based on your interesting comment(s), here's a test that calls the same function twice: once as a UDF (which the executors run) and once as a literal value (evaluated once on the driver when the column expression is built, not by the executors).

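import datetime

from pyspark.sql import functions as func
from pyspark.sql.types import TimestampType

def curr_ts():
    # Runs on the executors, once for each record
    import datetime

    return datetime.datetime.now()

curr_ts_udf = func.udf(curr_ts, TimestampType())

# data_sdf is an existing DataFrame with columns col1, dt and col3
data_sdf. \
    withColumn('ts_pydttm', func.lit(datetime.datetime.now())). \
    withColumn('ts_pyudf', curr_ts_udf()). \
    show(10, truncate=False)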

# +----+----------+----+--------------------------+--------------------------+
# |col1|dt        |col3|ts_pydttm                 |ts_pyudf                  |
# +----+----------+----+--------------------------+--------------------------+
# |1   |2022-01-01|1   |2022-07-21 05:59:56.281733|2022-07-21 05:59:56.462249|
# |1   |2022-01-02|-1  |2022-07-21 05:59:56.281733|2022-07-21 05:59:56.462347|
# |1   |2022-01-03|1   |2022-07-21 05:59:56.281733|2022-07-21 05:59:56.462395|
# |1   |2022-01-04|0   |2022-07-21 05:59:56.281733|2022-07-21 05:59:56.462406|
# |1   |2022-01-05|1   |2022-07-21 05:59:56.281733|2022-07-21 05:59:56.462415|
# |1   |2022-01-06|1   |2022-07-21 05:59:56.281733|2022-07-21 05:59:56.462425|
# |1   |2022-01-07|0   |2022-07-21 05:59:56.281733|2022-07-21 05:59:56.462434|
# |1   |2022-01-08|1   |2022-07-21 05:59:56.281733|2022-07-21 05:59:56.462443|
# |1   |2022-01-09|1   |2022-07-21 05:59:56.281733|2022-07-21 05:59:56.462453|
# |1   |2022-01-10|-1  |2022-07-21 05:59:56.281733|2022-07-21 05:59:56.462462|
# +----+----------+----+--------------------------+--------------------------+
# only showing top 10 rows

# root
#  |-- col1: long (nullable = true)
#  |-- dt: date (nullable = true)
#  |-- col3: long (nullable = true)
#  |-- ts_pydttm: timestamp (nullable = false)
#  |-- ts_pyudf: timestamp (nullable = true)

Look at the values of ts_pydttm: they are all the same, because the current timestamp was passed as a static (literal) value. Now look at the values of ts_pyudf: they differ from record to record, because the executors run the UDF (which fetches the current timestamp) for each record they process. Both produce a timestamp column, but they are handled differently.
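If it helps, the same distinction can be reproduced without Spark at all. This is only a rough plain-Python analogy, not how Spark is implemented: the helper names with_literal and with_per_row_call are made up for illustration, and a list of integers stands in for the DataFrame rows.

```python
import datetime

def with_literal(rows):
    # Analogous to lit(datetime.datetime.now()): now() is called once,
    # before any row is processed, and that single value is attached to every row.
    ts = datetime.datetime.now()
    return [(row, ts) for row in rows]

def with_per_row_call(rows):
    # Analogous to the UDF: now() is called again for each row.
    return [(row, datetime.datetime.now()) for row in rows]

rows = list(range(5))  # hypothetical stand-in for the DataFrame rows
literal_result = with_literal(rows)
per_row_result = with_per_row_call(rows)

# Every row produced by with_literal shares one timestamp.
print(len({ts for _, ts in literal_result}))  # 1

# The per-row timestamps are taken at separate moments and can differ.
for row, ts in per_row_result:
    print(row, ts)
```

The point of the sketch is only that the argument of lit is an ordinary Python value computed before any record is touched, so it does not matter how many nodes later process the records.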
