First and last not null fields over partitions


I have a table like this:

EventID  EventTime                   AttrA  AttrB
1        2022-10-01 00:00:01.000000  null   null
1        2022-10-01 00:00:02.000000  a      null
1        2022-10-01 00:00:03.000000  b      1
1        2022-10-01 00:00:04.000000  null   null
2        2022-10-01 00:01:01.000000  aa     11
2        2022-10-01 00:01:02.000000  bb     null
2        2022-10-01 00:01:03.000000  null   null
2        2022-10-01 00:01:04.000000  aa     22

and I want to scan across the records and return the first and last non-null AttrA and AttrB values for each EventID, based on EventTime. Each EventID can have multiple records, so we cannot know in advance which rows hold the non-null values. The desired result would be:

EventID  FirstAttrA  LastAttrA  FirstAttrB  LastAttrB
1        a           b          1           1
2        aa          aa         11          22

What I did was add row_number() OVER (PARTITION BY eventID ORDER BY eventTime ASC), then the same again with DESC, and then chain multiple CTEs like this:

WITH enhanced_table AS
(
   SELECT
      eventID,
      attrA,
      attrB,
      row_number() OVER (PARTITION BY eventID ORDER BY eventTime ASC)  AS rn,
      row_number() OVER (PARTITION BY eventID ORDER BY eventTime DESC) AS reversed_rn
   FROM input_table   -- the source table
),
first_events_with_attrA AS
(
   SELECT
      eventID,
      FIRST(attrA) OVER (PARTITION BY eventID ORDER BY rn ASC) AS first_attrA
   FROM enhanced_table
   WHERE attrA IS NOT NULL
)...

But this way I need one CTE that scans the table again for each value I want (four CTEs in total for this example). It works, but it is slow.
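For context, the final step joins one such CTE per attribute and direction back together, roughly like this (the CTE and column names here are assumed, mirroring the snippet above):

SELECT
   f_a.eventID,
   f_a.first_attrA,
   l_a.last_attrA,
   f_b.first_attrB,
   l_b.last_attrB
FROM first_events_with_attrA f_a
JOIN last_events_with_attrA  l_a ON l_a.eventID = f_a.eventID
JOIN first_events_with_attrB f_b ON f_b.eventID = f_a.eventID
JOIN last_events_with_attrB  l_b ON l_b.eventID = f_a.eventID

Each of the four CTEs rescans enhanced_table, which is where the cost comes from.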

Is there a way to grab the values I am interested in more efficiently? Thanks a lot in advance.

CodePudding user response:

There is no need to build row numbers; you can directly use the native Spark SQL functions FIRST and LAST with their isIgnoreNull argument set to True to achieve the intended results.

Data Preparation

s = StringIO("""
EventID,EventTime,AttrA,AttrB
1,2022-10-01 00:00:01.000000,,
1,2022-10-01 00:00:02.000000,a,
1,2022-10-01 00:00:03.000000,b,1
1,2022-10-01 00:00:04.000000,,
2,2022-10-01 00:01:01.000000,aa,11
2,2022-10-01 00:01:02.000000,bb,
2,2022-10-01 00:01:03.000000,,
2,2022-10-01 00:01:04.000000,aa,22
"""
)

inp_schema = StructType([
                      StructField('EventID',IntegerType(),True)
                     ,StructField('EventTime',StringType(),True)
                     ,StructField('AttrA',StringType(),True)
                     ,StructField('AttrB',DoubleType(),True)
                    ]
                   )


df = pd.read_csv(s,delimiter=',')


# pandas represents the empty CSV fields as NaN; convert those to proper nulls
sparkDF = sql.createDataFrame(df, schema=inp_schema)\
             .withColumn('AttrA', F.when(F.isnan(F.col('AttrA')), None).otherwise(F.col('AttrA')))\
             .withColumn('AttrB', F.when(F.isnan(F.col('AttrB')), None).otherwise(F.col('AttrB')))


sparkDF.show(truncate=False)

+-------+--------------------------+-----+-----+
|EventID|EventTime                 |AttrA|AttrB|
+-------+--------------------------+-----+-----+
|1      |2022-10-01 00:00:01.000000|null |null |
|1      |2022-10-01 00:00:02.000000|a    |null |
|1      |2022-10-01 00:00:03.000000|b    |1.0  |
|1      |2022-10-01 00:00:04.000000|null |null |
|2      |2022-10-01 00:01:01.000000|aa   |11.0 |
|2      |2022-10-01 00:01:02.000000|bb   |null |
|2      |2022-10-01 00:01:03.000000|null |null |
|2      |2022-10-01 00:01:04.000000|aa   |22.0 |
+-------+--------------------------+-----+-----+

First & Last

sparkDF.createOrReplaceTempView("INPUT")

sql.sql("""
SELECT
    EventID,
    FIRST(AttrA,True) as First_AttrA,
    LAST(AttrA,True) as Last_AttrA,
    FIRST(AttrB,True) as First_AttrB,
    LAST(AttrB,True) as Last_AttrB
FROM INPUT
GROUP BY 1
""").show()

+-------+-----------+----------+-----------+----------+
|EventID|First_AttrA|Last_AttrA|First_AttrB|Last_AttrB|
+-------+-----------+----------+-----------+----------+
|      1|          a|         b|        1.0|       1.0|
|      2|         aa|        aa|       11.0|      22.0|
+-------+-----------+----------+-----------+----------+
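The same aggregation can also be written with the DataFrame API instead of a temp view; a minimal sketch, assuming the sparkDF built above. F.first and F.last take an ignorenulls flag, and the rows are sorted by EventTime first, since first/last follow the row order within each group:

result = (
    sparkDF
    .orderBy('EventTime')    # first/last depend on row order within each group
    .groupBy('EventID')
    .agg(
        F.first('AttrA', ignorenulls=True).alias('First_AttrA'),
        F.last('AttrA', ignorenulls=True).alias('Last_AttrA'),
        F.first('AttrB', ignorenulls=True).alias('First_AttrB'),
        F.last('AttrB', ignorenulls=True).alias('Last_AttrB'),
    )
)

result.show()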
