Form value sequence around given values PySpark

Time:04-29

I have two PySpark dataframes. The second is derived from the first by checking it for anomalies.

import pandas as pd
from datetime import date
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df1 = pd.DataFrame({
    "pk": [1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3],
    "date": [
        date(2022,4,13), date(2022,4,14), date(2022,4,15), date(2022,4,16),  
        date(2022,4,13), date(2022,4,14), date(2022,4,15), date(2022,4,16),
        date(2022,4,13), date(2022,4,14), date(2022,4,15), date(2022,4,16),
    ],
    "varA": [4, 5, 1, 6, 7, 4, 8, 11, 12, 10, 11, 13],
    "varB": [5, 6, 4, 3, 12, 13, 1, 14, 9, 10, 11, 15],
    "varC": [8, 32, 1, 11, 4, 3, 5, 6, 10, 14, 9, 11]
})

df1 = spark.createDataFrame(df1)

So the first dataframe is "wide". The second is a "long" dataframe, derived from the first by an anomaly processing algorithm. Here, each row is one anomaly.

df2 = do_anomaly_processing(df1)

df2.show()

#+-----+-----------+--------+-----+
#|pk   |       date|variable|value|
#+-----+-----------+--------+-----+
#|    1| 2022-04-14|    varA|    5|
#|    2| 2022-04-14|    varA|    4|
#|    3| 2022-04-14|    varA|   10|
#|    3| 2022-04-15|    varC|   14|
#+-----+-----------+--------+-----+

The code to produce this dataframe is:

df2 = pd.DataFrame({
    "pk": [1,2,3,3],
    "date": [date(2022,4,14), date(2022,4,14), date(2022,4,14), date(2022,4,15)],
    "variable": ["varA", "varA", "varA", "varC"],
    "value": [5,4,10,14]
})

df2 = spark.createDataFrame(df2)

I'd like to create a new dataframe that has the surrounding values for each anomaly listed, which looks like:

#+-----+-----------+--------+-----+------+------+
#|pk   |       date|variable|value|v[n-1]|v[n+1]|
#+-----+-----------+--------+-----+------+------+
#|    1| 2022-04-14|    varA|    5|     4|     1|
#|    2| 2022-04-14|    varA|    4|     7|     8|
#|    3| 2022-04-14|    varA|   10|    12|    11|
#|    3| 2022-04-15|    varC|    9|    14|    11|
#+-----+-----------+--------+-----+------+------+

In reality, I'd like to be able to put as many past and/or future values in the new dataframe as the data will allow (e.g., v[n-5], v[n-4], v[n-3], v[n-2], v[n-1], and so on), but always as a contiguous sequence. I was initially doing this with a for loop:

for step in dt:
    if step == 0:
        varName = "v[n]"

        shiftedDataframe = melt(
            df1,
            id_vars=["pk", "date"],
            value_vars=["varA", "varB", "varC"],
            var_name=varName
        )
    else:
        if step < 0:
            varName = f"v[n{step}]"
            step = abs(step)
        elif step > 0:
            varName = f"v[n+{step}]"
            step = -step

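        # Build the shifted, melted frame for both negative and positive steps.
        shiftedDataframe = melt(
            create_shifted_dataframe(
                df1,
                "pk",
                "date",
                ["varA", "varB", "varC"],
                shiftAmount=step
            ),
            id_vars=["pk", "date"],
            value_vars=["varA", "varB", "varC"],
            var_name=varName
        )

    # Merge the shifted values into the anomaly dataframe.
    df2 = df2.join(shiftedDataframe, on=["pk", "date", "variable"], how="left")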

NOTE: the functions melt and create_shifted_dataframe do exactly what you think they do. Effectively, I am creating a shifted version of the original dataframe (df1), then forcing it to be a "long" dataframe, and finally merging the new shifted-melted version of df1 into df2. This is the best way I could find to do this, but there has to be a more efficient way. I was thinking about doing a pivot and then a merge, but I couldn't figure out how to do this with the different dates in df2. Anyhow, I hope this makes sense, and is an interesting problem for someone.

Answer:

If I understand the logic, you can leverage lead and lag.

But first we need to melt df1 so that it looks like this:

+---+----------+--------+--------------+
| pk|      date|variable|original_value|
+---+----------+--------+--------------+
|  1|2022-04-13|    varA|             4|
|  1|2022-04-13|    varB|             5|
...

Once df1 is in this melted form, you can left-join it with df2, so that only the anomaly rows end up with a non-null value.

df = df1.join(df2, on=['pk', 'date', 'variable'], how='left')

Then, you can use lead (get value at offset after the current row) and lag (get value at offset before the current row) to obtain v[n-1], v[n+1], v[n-2], v[n+2]...

from pyspark.sql import Window, functions as F

# This example goes out to n-2 and n+2.
w = Window.partitionBy(['pk', 'variable']).orderBy('date')
df = (df.select('pk', 'date', 'variable', 'value',
        *[F.when(F.col('value').isNotNull(), F.lag('original_value', x).over(w)).alias(f'v[n-{x}]') for x in range(1, 3)],
        *[F.when(F.col('value').isNotNull(), F.lead('original_value', x).over(w)).alias(f'v[n+{x}]') for x in range(1, 3)])
    .filter(F.col('value').isNotNull()))