I have a table like this:
| EventID | EventTime | AttrA | AttrB |
|---|---|---|---|
| 1 | 2022-10-01 00:00:01.000000 | null | null |
| 1 | 2022-10-01 00:00:02.000000 | a | null |
| 1 | 2022-10-01 00:00:03.000000 | b | 1 |
| 1 | 2022-10-01 00:00:04.000000 | null | null |
| 2 | 2022-10-01 00:01:01.000000 | aa | 11 |
| 2 | 2022-10-01 00:01:02.000000 | bb | null |
| 2 | 2022-10-01 00:01:03.000000 | null | null |
| 2 | 2022-10-01 00:01:04.000000 | aa | 22 |
and I want to look across the records and return the first and last non-null AttrA and AttrB values for each EventID, ordered by EventTime. Each EventID can have any number of records, so we can't know in advance which rows hold the non-null values. The desired result would be:
| EventID | FirstAttrA | LastAttrA | FirstAttrB | LastAttrB |
|---|---|---|---|---|
| 1 | a | b | 1 | 1 |
| 2 | aa | aa | 11 | 22 |
What I did was add row_number() OVER (PARTITION BY eventID ORDER BY eventTime ASC), then the same again with DESC, and then chain multiple CTEs, like this:
WITH enhanced_table AS
(
    SELECT
        eventID,
        attrA,
        attrB,
        row_number() OVER (PARTITION BY eventID ORDER BY eventTime ASC) AS rn,
        row_number() OVER (PARTITION BY eventID ORDER BY eventTime DESC) AS reversed_rn
    FROM input_table  -- source table name assumed
),
first_events_with_attrA AS
(
    SELECT
        eventID,
        FIRST(attrA) OVER (PARTITION BY eventID ORDER BY rn ASC) AS first_attrA
    FROM enhanced_table
    WHERE attrA IS NOT NULL
)...
But I need one CTE that scans the table again for each value I want (four CTEs in total for this example). It works, but it is slow. Is there a more efficient way to grab the values I am interested in? Thanks a lot in advance.
CodePudding user response:
No need to build row numbers; you can directly use the native Spark SQL functions FIRST and LAST with isIgnoreNull set to True to achieve the intended results -
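The snippets below assume a PySpark session plus these imports; a minimal setup sketch (the SparkSession handle is named sql here to match the calls that follow):

from io import StringIO

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField, IntegerType,
                               StringType, DoubleType)

# SparkSession handle named `sql`, matching sql.createDataFrame / sql.sql below
sql = SparkSession.builder.getOrCreate()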
Data Preparation
s = StringIO("""
EventID,EventTime,AttrA,AttrB
1,2022-10-01 00:00:01.000000,,
1,2022-10-01 00:00:02.000000,a,
1,2022-10-01 00:00:03.000000,b,1
1,2022-10-01 00:00:04.000000,,
2,2022-10-01 00:01:01.000000,aa,11
2,2022-10-01 00:01:02.000000,bb,
2,2022-10-01 00:01:03.000000,,
2,2022-10-01 00:01:04.000000,aa,22
""")

inp_schema = StructType([
     StructField('EventID', IntegerType(), True)
    ,StructField('EventTime', StringType(), True)
    ,StructField('AttrA', StringType(), True)
    ,StructField('AttrB', DoubleType(), True)
])

df = pd.read_csv(s, delimiter=',')

# pandas reads the empty CSV fields as NaN; convert them to None so they
# arrive in Spark as proper nulls (isnan() would fail on the string column
# AttrA, so the conversion is done on the pandas side instead)
df = df.astype(object).where(pd.notnull(df), None)

sparkDF = sql.createDataFrame(df, schema=inp_schema)

sparkDF.show(truncate=False)
+-------+--------------------------+-----+-----+
|EventID|EventTime                 |AttrA|AttrB|
+-------+--------------------------+-----+-----+
|1      |2022-10-01 00:00:01.000000|null |null |
|1      |2022-10-01 00:00:02.000000|a    |null |
|1      |2022-10-01 00:00:03.000000|b    |1.0  |
|1      |2022-10-01 00:00:04.000000|null |null |
|2      |2022-10-01 00:01:01.000000|aa   |11.0 |
|2      |2022-10-01 00:01:02.000000|bb   |null |
|2      |2022-10-01 00:01:03.000000|null |null |
|2      |2022-10-01 00:01:04.000000|aa   |22.0 |
+-------+--------------------------+-----+-----+
First & Last
sparkDF.createOrReplaceTempView("INPUT")

sql.sql("""
SELECT
    EventID,
    FIRST(AttrA, True) AS First_AttrA,
    LAST(AttrA, True) AS Last_AttrA,
    FIRST(AttrB, True) AS First_AttrB,
    LAST(AttrB, True) AS Last_AttrB
FROM INPUT
GROUP BY 1
""").show()
+-------+-----------+----------+-----------+----------+
|EventID|First_AttrA|Last_AttrA|First_AttrB|Last_AttrB|
+-------+-----------+----------+-----------+----------+
|      1|          a|         b|        1.0|       1.0|
|      2|         aa|        aa|       11.0|      22.0|
+-------+-----------+----------+-----------+----------+
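Note that FIRST and LAST inside a plain GROUP BY are non-deterministic in Spark: they depend on the order in which rows happen to be scanned, and only give the expected answer here because the sample data is already sorted by EventTime. Below is a minimal sketch of an order-guaranteed variant via the DataFrame API, using a window sorted by EventTime over the full partition (it reuses sparkDF and the imports from above):

from pyspark.sql import Window

# Window ordered by EventTime and spanning the whole partition, so the
# first/last non-null values are well defined regardless of scan order
w = (Window.partitionBy('EventID')
           .orderBy('EventTime')
           .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

result = (sparkDF
    .withColumn('First_AttrA', F.first('AttrA', ignoreNulls=True).over(w))
    .withColumn('Last_AttrA', F.last('AttrA', ignoreNulls=True).over(w))
    .withColumn('First_AttrB', F.first('AttrB', ignoreNulls=True).over(w))
    .withColumn('Last_AttrB', F.last('AttrB', ignoreNulls=True).over(w))
    .select('EventID', 'First_AttrA', 'Last_AttrA', 'First_AttrB', 'Last_AttrB')
    .dropDuplicates(['EventID']))

result.show()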