I have two PySpark dataframes; the second is derived from the first by checking for anomalies.
import pandas as pd
from datetime import date
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df1 = pd.DataFrame({
    "pk": [1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3],
    "date": [
        date(2022,4,13), date(2022,4,14), date(2022,4,15), date(2022,4,16),
        date(2022,4,13), date(2022,4,14), date(2022,4,15), date(2022,4,16),
        date(2022,4,13), date(2022,4,14), date(2022,4,15), date(2022,4,16),
    ],
    "varA": [4, 5, 1, 6, 7, 4, 8, 11, 12, 10, 11, 13],
    "varB": [5, 6, 4, 3, 12, 13, 1, 14, 9, 10, 11, 15],
    "varC": [8, 32, 1, 11, 4, 3, 5, 6, 10, 14, 9, 11]
})
df1 = spark.createDataFrame(df1)
So the first dataframe is a "wide" dataframe. The second is a "long" dataframe, derived from the first by an anomaly-processing algorithm; each row is an anomaly.
df2 = do_anomaly_processing(df1)
df2.show()
#+-----+-----------+--------+-----+
#|   pk|       date|variable|value|
#+-----+-----------+--------+-----+
#|    1| 2022-04-14|    varA|    5|
#|    2| 2022-04-14|    varA|    4|
#|    3| 2022-04-14|    varA|   10|
#|    3| 2022-04-15|    varC|   14|
#+-----+-----------+--------+-----+
The code to produce this dataframe is:
df2 = pd.DataFrame({
    "pk": [1, 2, 3, 3],
    "date": [date(2022,4,14), date(2022,4,14), date(2022,4,14), date(2022,4,15)],
    "variable": ["varA", "varA", "varA", "varC"],
    "value": [5, 4, 10, 14]
})
df2 = spark.createDataFrame(df2)
I'd like to create a new dataframe that has the surrounding values for each anomaly listed, which looks like:
#+-----+-----------+--------+-----+------+------+
#|   pk|       date|variable|value|v[n-1]|v[n+1]|
#+-----+-----------+--------+-----+------+------+
#|    1| 2022-04-14|    varA|    5|     4|     1|
#|    2| 2022-04-14|    varA|    4|     7|     8|
#|    3| 2022-04-14|    varA|   10|    12|    11|
#|    3| 2022-04-15|    varC|    9|    14|    11|
#+-----+-----------+--------+-----+------+------+
In reality, I'd like to be able to put as many past and/or future values in the new dataframe as the data will allow (e.g., v[n-5], v[n-4], v[n-3], v[n-2], v[n-1], etc. -- but always a contiguous sequence). I was initially doing this with a for loop:
# dt is the list of offsets to collect, e.g. [-2, -1, 0, 1, 2]
for step in dt:
    if step == 0:
        varName = "v[n]"
        shiftedDataframe = melt(
            df1,
            id_vars=["pk", "date"],
            value_vars=["varA", "varB", "varC"],
            var_name=varName
        )
    else:
        if step < 0:
            varName = f"v[n{step}]"
            step = abs(step)
        elif step > 0:
            varName = f"v[n+{step}]"
            step = -step
        shiftedDataframe = melt(
            create_shifted_dataframe(
                df1,
                "pk",
                "date",
                ["varA", "varB", "varC"],
                shiftAmount=step
            ),
            id_vars=["pk", "date"],
            value_vars=["varA", "varB", "varC"],
            var_name=varName
        )
    # merge the shifted, melted values onto the anomaly rows
    df2 = df2.join(shiftedDataframe, on=["pk", "date", "variable"], how="left")
NOTE: the functions melt and create_shifted_dataframe do exactly what you think they do (a rough sketch of create_shifted_dataframe is included below for reference). Effectively, I am creating a shifted version of the original dataframe (df1), forcing it into a "long" dataframe, and finally merging the shifted-melted version of df1 into df2. This is the best way I could find to do this, but there has to be a more efficient way. I was thinking about doing a pivot and then a merge, but I couldn't figure out how to do this with the different dates in df2. Anyhow, I hope this makes sense, and is an interesting problem for someone.
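For context, create_shifted_dataframe is roughly along these lines -- a simplified sketch using the same argument names as the call above, where a positive shiftAmount looks back (lag) and a negative one looks forward (lead):

from pyspark.sql import Window
from pyspark.sql import functions as F

def create_shifted_dataframe(df, id_col, order_col, value_cols, shiftAmount=1):
    # Shift every value column by shiftAmount rows within each id, ordered by date.
    w = Window.partitionBy(id_col).orderBy(order_col)
    for c in value_cols:
        if shiftAmount >= 0:
            df = df.withColumn(c, F.lag(c, shiftAmount).over(w))
        else:
            df = df.withColumn(c, F.lead(c, -shiftAmount).over(w))
    return df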
CodePudding user response:
If I understand the logic, you can leverage lead and lag.
But first we need to melt df1 to look like this:
+---+----------+--------+--------------+
| pk|      date|variable|original_value|
+---+----------+--------+--------------+
|  1|2022-04-13|    varA|             4|
|  1|2022-04-13|    varB|             5|
...
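One way to get df1 into that shape (unless you're on a Spark version with a built-in unpivot -- I believe DataFrame.unpivot/melt exists from 3.4 onwards) is a small stack-based helper. This is just a sketch using the column names from the question; melt here is my own helper, not a PySpark API:

from pyspark.sql import functions as F

def melt(df, id_vars, value_vars, var_name='variable', value_name='value'):
    # Builds: stack(3, 'varA', varA, 'varB', varB, 'varC', varC)
    stack_expr = ", ".join(f"'{c}', `{c}`" for c in value_vars)
    return df.select(
        *id_vars,
        F.expr(f"stack({len(value_vars)}, {stack_expr}) as ({var_name}, {value_name})")
    )

# Overwrite df1 with its melted form so it matches the join below.
df1 = melt(df1, ['pk', 'date'], ['varA', 'varB', 'varC'],
           var_name='variable', value_name='original_value')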
If you have this dataframe, you can join it with df2. (With a left join from df1's side, every melted row is kept, and value is non-null only for the anomaly rows -- that is what the isNotNull checks below rely on.)
df = df1.join(df2, on=['pk', 'date', 'variable'], how='left')
Then, you can use lead (get the value at an offset after the current row) and lag (get the value at an offset before the current row) to obtain v[n-1], v[n+1], v[n-2], v[n+2], ...
from pyspark.sql import Window
from pyspark.sql import functions as F

# I will go up to n-2 and n+2 in this example.
w = Window.partitionBy(['pk', 'variable']).orderBy('date')
df = (df.select('pk', 'date', 'variable', 'value',
                *[F.when(F.col('value').isNotNull(), F.lag('original_value', x).over(w)).alias(f'v[n-{x}]') for x in range(1, 3)],
                *[F.when(F.col('value').isNotNull(), F.lead('original_value', x).over(w)).alias(f'v[n+{x}]') for x in range(1, 3)])
      .filter(F.col('value').isNotNull()))
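And if you need more (or fewer) neighbours, only the upper bound of the range changes; parameterized, it could look like this (df1 here is the melted frame from above, and w is the same window):

n_steps = 5  # how many past/future values to include, where the data allows

lag_cols = [F.when(F.col('value').isNotNull(), F.lag('original_value', x).over(w)).alias(f'v[n-{x}]')
            for x in range(1, n_steps + 1)]
lead_cols = [F.when(F.col('value').isNotNull(), F.lead('original_value', x).over(w)).alias(f'v[n+{x}]')
             for x in range(1, n_steps + 1)]

result = (df1.join(df2, on=['pk', 'date', 'variable'], how='left')
             .select('pk', 'date', 'variable', 'value', *lag_cols, *lead_cols)
             .filter(F.col('value').isNotNull()))

Offsets that run past the start or end of a partition simply come back as null, which matches the "as the data will allow" behaviour you described.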