Open, High, Low, Close, Volume in PySpark using tick data

Time:11-26

How can I transform tick data to OHLCV (Open, High, Low, Close, Volume)?

Current Sample (ticks format)

+---------+-------+--------+----------+-------------------+
|       id|  price|  volume|   capital|           datetime|
+---------+-------+--------+----------+-------------------+
|237367441|9351.71|0.043982| 411.30692|2020-02-01 00:00:00|
|237367442|9351.71|0.001413| 13.213966|2020-02-01 00:00:00|
|237367443|9352.86|0.001236| 11.560135|2020-02-01 00:00:00|
|237367444|9352.85|0.001976| 18.481232|2020-02-01 00:00:01|
|237367445| 9352.0|1.214703| 11359.902|2020-02-01 00:00:01|
|237367446|9352.86|0.059586|  557.2995|2020-02-01 00:00:01|
|237367447|9352.86|0.021383|  199.9922|2020-02-01 00:00:01|
|237367448|9352.85|0.011226|104.995094|2020-02-01 00:00:02|
|237367449|9352.17|0.278627|  2605.767|2020-02-01 00:00:02|
|237367450|9351.75| 0.07455|   697.173|2020-02-01 00:00:02|
|237367451|9352.85|0.188774|  1765.575|2020-02-01 00:00:02|
|237367452|9352.86| 0.18511| 1731.3079|2020-02-01 00:00:02|
|237367453|9352.87|0.930838|  8706.007|2020-02-01 00:00:02|
|237367454|9352.87|     0.5|  4676.435|2020-02-01 00:00:03|
|237367455|9352.87|0.032738| 306.19424|2020-02-01 00:00:03|
|237367456|9352.89|     0.2|  1870.578|2020-02-01 00:00:03|
|237367457|9352.89|0.003279| 30.668127|2020-02-01 00:00:03|
|237367458|9352.86|0.005748|  53.76024|2020-02-01 00:00:03|
|237367459|9352.89|0.052585| 491.82172|2020-02-01 00:00:03|
|237367460|9353.25| 0.08838| 826.64026|2020-02-01 00:00:03|
+---------+-------+--------+----------+-------------------+
only showing top 20 rows

###################################################################################

To this (1-minute OHLCV; shown here in pandas format, but I need it as a PySpark DataFrame):

                            open     high        low         close      volume
datetime                    
2020-02-01 00:00:00     9351.710    9375.000    9351.710    9358.590    532596.780
2020-02-01 00:01:00     9359.880    9366.730    9356.750    9359.960    127918.275
2020-02-01 00:02:00     9359.400    9361.170    9356.750    9356.800    93449.282
2020-02-01 00:03:00     9356.840    9356.840    9347.640    9353.610    211704.381
2020-02-01 00:04:00     9353.730    9353.730    9346.140    9347.540    87436.789

###############################################################################

Initially the data is like this:

+---------+-------+--------+----------+-------------+
|       id|  price|  volume|   capital|    timestamp|
+---------+-------+--------+----------+-------------+
|237367441|9351.71|0.043982| 411.30692|1580515200518|
|237367442|9351.71|0.001413| 13.213966|1580515200580|
|237367443|9352.86|0.001236| 11.560135|1580515200690|
|237367444|9352.85|0.001976| 18.481232|1580515201148|
|237367445| 9352.0|1.214703| 11359.902|1580515201176|
|237367446|9352.86|0.059586|  557.2995|1580515201278|
|237367447|9352.86|0.021383|  199.9922|1580515201629|
|237367448|9352.85|0.011226|104.995094|1580515202705|
|237367449|9352.17|0.278627|  2605.767|1580515202963|
|237367450|9351.75| 0.07455|   697.173|1580515202966|
|237367451|9352.85|0.188774|  1765.575|1580515202995|
|237367452|9352.86| 0.18511| 1731.3079|1580515202995|
|237367453|9352.87|0.930838|  8706.007|1580515202995|
|237367454|9352.87|     0.5|  4676.435|1580515203015|
|237367455|9352.87|0.032738| 306.19424|1580515203090|
|237367456|9352.89|     0.2|  1870.578|1580515203104|
|237367457|9352.89|0.003279| 30.668127|1580515203119|
|237367458|9352.86|0.005748|  53.76024|1580515203527|
|237367459|9352.89|0.052585| 491.82172|1580515203535|
|237367460|9353.25| 0.08838| 826.64026|1580515203596|
+---------+-------+--------+----------+-------------+
only showing top 20 rows

You can download the data here:

https://data.binance.vision/data/spot/monthly/trades/BTCUSDT/BTCUSDT-trades-2020-02.zip

Or here in the "Trades" section:

https://github.com/binance/binance-public-data/

My code so far:

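import datetime

class Datetime_manager():

    @staticmethod
    def timestamp_to_datetime_utc(x):
        # x is an epoch timestamp in milliseconds; the result is a UTC string with second precision
        return datetime.datetime.fromtimestamp(x / 1000.0, tz=datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S")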

##############################################################################

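from Helpers.Datetime_manager import Datetime_manager

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, LongType
import pyspark.sql.functions as func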

# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()


schema = StructType([\
        StructField("id", IntegerType()), \
        StructField("price", FloatType()),\
        StructField("volume", FloatType()),\
        StructField("capital", FloatType()),\
        StructField("timestamp", LongType()),\
    ])

df = spark.read.csv('file.csv',sep=",", schema=schema)


get_datetime = func.udf(lambda x : Datetime_manager.timestamp_to_datetime_utc(x))
    
df_ = df.withColumn("datetime", get_datetime(df.timestamp))

deleted_column=df_.drop('timestamp')

deleted_column.show()

Answer:

You can assign each row in your dataset to a 1-minute window using window. The rows are then partitioned by that window, and window (analytic) functions can be applied to each partition. Finally, select the first row from each window.

A plain groupBy would not work reliably, because first and last are non-deterministic when no ordering is specified, which can produce wrong values for the open and close columns.
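
To make the ordering requirement concrete, here is a minimal plain-Python sketch (not PySpark) of the same per-minute aggregation. The function name ohlcv_by_minute and the tuple layout are made up for illustration, and breaking timestamp ties by id is an assumption about how trades should be sequenced. The input rows are deliberately shuffled; open and close only come out right because each bucket is sorted first.

```python
from collections import defaultdict

def ohlcv_by_minute(trades):
    """trades: iterable of (id, price, volume, timestamp_ms) tuples (hypothetical layout)."""
    buckets = defaultdict(list)
    for trade in trades:
        minute_start_ms = (trade[3] // 60_000) * 60_000  # floor to the minute
        buckets[minute_start_ms].append(trade)

    bars = {}
    for minute_start_ms, rows in buckets.items():
        # Open and close are only well defined once the rows are ordered.
        # Assumption: ids increase in trade order, so they can break timestamp ties.
        rows.sort(key=lambda r: (r[3], r[0]))
        prices = [r[1] for r in rows]
        bars[minute_start_ms] = {
            "open": prices[0],
            "high": max(prices),
            "low": min(prices),
            "close": prices[-1],
            "volume": sum(r[2] for r in rows),
        }
    return bars

# A few rows from the sample data, deliberately out of order.
sample = [
    (237367460, 9353.25, 0.08838, 1580515203596),
    (237367441, 9351.71, 0.043982, 1580515200518),
    (237367445, 9352.0, 1.214703, 1580515201176),
    (237367443, 9352.86, 0.001236, 1580515200690),
]
bars = ohlcv_by_minute(sample)
print(bars[1580515200000])
```

Without the sort, the open would be taken from whichever row happened to arrive first (here 9353.25 instead of 9351.71), which is the same failure mode as an unordered first/last in a groupBy.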

I have also included logic to convert the epoch timestamp to a datetime without a UDF.

I am preserving the timestamp column and using it to order rows within each window, so that open and close are determined with full precision: the derived datetime column has only second precision, while the dataset has multiple entries within the same second (the raw timestamps are in milliseconds).
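
The precision point can be checked with a few timestamps from the sample data. This is a plain-Python sketch; the helper names to_utc_string and minute_start are hypothetical and only mirror what the derived datetime column and the 1-minute window start would contain in UTC.

```python
from datetime import datetime, timezone

def to_utc_string(timestamp_ms):
    """Second-precision UTC string, comparable to the derived datetime column."""
    dt = datetime.fromtimestamp(timestamp_ms // 1000, tz=timezone.utc)
    return dt.strftime("%Y-%m-%d %H:%M:%S")

def minute_start(timestamp_ms):
    """Start of the 1-minute bucket that contains timestamp_ms, as a UTC string."""
    return to_utc_string((timestamp_ms // 60_000) * 60_000)

# The first three trades from the sample data.
ticks_ms = [1580515200518, 1580515200580, 1580515200690]

seconds = [to_utc_string(t) for t in ticks_ms]
print(seconds)                    # the three trades collapse to one string
print(len(set(seconds)), len(set(ticks_ms)))
print(minute_start(ticks_ms[0]))  # bucket the trades belong to
```

Ordering by the second-precision string cannot distinguish these three trades, whereas the millisecond values can. Note that the sample also contains trades that share the same millisecond, so even the raw timestamp may need a tie-breaker.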

Working Example

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, LongType

spark = SparkSession.builder.appName('sparkdf').getOrCreate()

schema = StructType([
        StructField("id", IntegerType()),
        StructField("price", FloatType()),
        StructField("volume", FloatType()),
        StructField("capital", FloatType()),
        StructField("timestamp", LongType()),
    ])

df = spark.read.csv('data/BTCUSDT-trades-2020-02.csv', sep=",", schema=schema)

# Convert epoch milliseconds to a datetime string (rendered in the session time zone),
# then tag each row with the start of its 1-minute window.
df_windowed = df.withColumn("datetime", F.from_unixtime(F.col("timestamp") / 1000))\
                .withColumn("window", F.window(F.col("datetime"), "1 minute")["start"])

# Order by the raw millisecond timestamp; "id" breaks ties between trades that share
# a millisecond (this assumes ids increase in trade order).
window_spec = Window.partitionBy("window")\
                    .orderBy("timestamp", "id")\
                    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# Compute the aggregates over the whole window, then keep one row per window.
df_windowed.withColumn("open", F.first(F.col("price")).over(window_spec))\
           .withColumn("close", F.last(F.col("price")).over(window_spec))\
           .withColumn("high", F.max(F.col("price")).over(window_spec))\
           .withColumn("low", F.min(F.col("price")).over(window_spec))\
           .withColumn("volume", F.sum(F.col("volume")).over(window_spec))\
           .withColumn("rn", F.row_number().over(Window.partitionBy("window").orderBy("timestamp", "id")))\
           .filter(F.col("rn") == 1)\
           .selectExpr("window as datetime", "open", "high", "low", "close", "volume")\
           .orderBy("datetime")\
           .show(200, False)

Output

+-------------------+-------+-------+-------+-------+------------------+
|datetime           |open   |high   |low    |close  |volume            |
+-------------------+-------+-------+-------+-------+------------------+
|2020-02-01 00:00:00|9351.71|9375.0 |9351.71|9358.59|56.886639995952464|
|2020-02-01 00:01:00|9359.88|9366.73|9356.75|9359.96|13.66472805586227 |
|2020-02-01 00:02:00|9359.4 |9361.17|9356.75|9356.8 |9.985669983227126 |
|2020-02-01 00:03:00|9356.84|9356.84|9347.64|9353.61|22.637158939754613|
|2020-02-01 00:04:00|9353.73|9353.73|9346.14|9347.54|9.35306903786136  |
|2020-02-01 00:05:00|9347.5 |9350.0 |9341.17|9349.67|14.01134790964511 |
|2020-02-01 00:06:00|9349.98|9352.79|9345.26|9349.99|14.385089107259432|
|2020-02-01 00:07:00|9350.0 |9354.81|9348.17|9349.17|9.946719115832138 |
|2020-02-01 00:08:00|9348.62|9355.87|9347.17|9349.9 |16.211291008886747|
|2020-02-01 00:09:00|9349.87|9353.71|9348.46|9353.4 |15.208806983068484|
|2020-02-01 00:10:00|9353.31|9368.93|9353.31|9367.75|36.819248940208126|
|2020-02-01 00:11:00|9367.83|9371.73|9363.17|9366.96|9.44959905881774  |
|2020-02-01 00:12:00|9366.17|9371.47|9364.38|9370.61|21.24247089397852 |
|2020-02-01 00:13:00|9370.61|9376.99|9368.99|9371.2 |17.0184838917437  |
|2020-02-01 00:14:00|9371.2 |9375.96|9370.17|9374.55|8.756931013712574 |
|2020-02-01 00:15:00|9374.56|9374.56|9365.0 |9366.92|12.735712010200587|
|2020-02-01 00:16:00|9366.79|9367.02|9363.83|9365.08|13.130701040360236|
|2020-02-01 00:17:00|9365.83|9369.41|9363.95|9369.3 |16.921483727895975|
|2020-02-01 00:18:00|9369.3 |9369.3 |9363.21|9366.89|12.236453087406176|
|2020-02-01 00:19:00|9366.89|9375.47|9366.88|9368.47|23.542967173349552|
+-------------------+-------+-------+-------+-------+------------------+
only showing top 20 rows

PS. The values in the volume column do not match your expected values. I applied a sum over the window; I assume you apply other transformations to this value to get the desired results.
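
One possible explanation for the mismatch, offered only as an inference from the numbers shown and not confirmed by the question: the expected "volume" may be the traded value in the quote currency, that is, the sum of the capital column (price times volume) rather than the sum of the volume column. A quick arithmetic check in plain Python divides each expected value by the summed base volume for the same minute and tests whether the implied average price lies between that minute's low and high.

```python
# (expected "volume", summed base volume, low, high) per minute, copied from the
# expected table in the question and the output above.
rows = [
    (532596.780, 56.886639995952464, 9351.71, 9375.0),
    (127918.275, 13.66472805586227, 9356.75, 9366.73),
    (93449.282, 9.985669983227126, 9356.75, 9361.17),
    (211704.381, 22.637158939754613, 9347.64, 9356.84),
    (87436.789, 9.35306903786136, 9346.14, 9353.73),
]

# Implied average trade price if the expected figure is price * volume summed.
implied_prices = [expected / base for expected, base, _, _ in rows]
within_range = [
    low <= price <= high
    for price, (_, _, low, high) in zip(implied_prices, rows)
]

for price, ok in zip(implied_prices, within_range):
    print(round(price, 2), ok)
```

If that reading is right, summing capital instead of volume in the aggregation would be the change to try.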
