PySpark: How do you use the values in multiple columns to perform some sort of aggregation?

Time:10-19

What I have:

# +--------+--------+--------+
# |dotId   |codePp  |status  |
# +--------+--------+--------+
# |dot0001 |Pp3523  |start   |
# |dot0001 |Pp3524  |stop    |
# |dot0020 |Pp3522  |start   |
# |dot0020 |Pp3556  |stop    |
# |dot9999 |Pp3545  |stop    |
# |dot9999 |Pp3523  |start   |
# |dot9999 |Pp3587  |stop    |
# |dot9999 |Pp3567  |start   |
# +--------+--------+--------+

What I want:

Rule: if status is 'stop', output codePp with '(stop)' appended; otherwise output codePp unchanged. All codes for the same dotId are joined into one comma-separated string.

# +--------+-------------------------------------------+
# |dotId   |codePp                                     |
# +--------+-------------------------------------------+
# |dot0001 |Pp3523, Pp3524(stop)                       |
# |dot0020 |Pp3522, Pp3556(stop)                       |
# |dot9999 |Pp3545(stop), Pp3523, Pp3587(stop), Pp3567 |
# +--------+-------------------------------------------+

How can I write this in PySpark?

Answer:

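You may try the following. A case expression (F.when) decides whether to append "(stop)" to each codePp, and a groupBy aggregation then uses collect_list to gather the codePp values for each dotId and concat_ws to join them into a comma-separated string. The .otherwise("") is needed because concat returns null when any of its inputs is null; without it, every non-stop code would become null and collect_list would silently drop it.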

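from pyspark.sql import functions as F

# Assumes df has the columns dotId, codePp and status
output_df = (
    df.groupBy("dotId")
      .agg(
          F.concat_ws(
              ", ",
              F.collect_list(
                  F.concat(
                      F.col("codePp"),
                      # Append "(stop)" for stopped rows; use "" otherwise,
                      # because concat() returns null if any input is null
                      F.when(F.col("status") == "stop", "(stop)").otherwise("")
                  )
              )
          ).alias("codePp")
      )
)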

Let me know if this works for you.
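To sanity-check the expected output without a Spark session, the same logic can be sketched in plain Python. This is not Spark code and not part of the answer above: a dict stands in for groupBy, a list append for collect_list, and str.join for concat_ws. The rows list and the aggregate_codes helper are hypothetical names used only for illustration. One difference worth knowing: the Spark documentation describes collect_list as non-deterministic in order after a shuffle, so the Spark result may list codes in a different order, whereas this sketch keeps the input order.

```python
from collections import defaultdict

# Hypothetical sample input copied from the question: (dotId, codePp, status)
rows = [
    ("dot0001", "Pp3523", "start"),
    ("dot0001", "Pp3524", "stop"),
    ("dot0020", "Pp3522", "start"),
    ("dot0020", "Pp3556", "stop"),
    ("dot9999", "Pp3545", "stop"),
    ("dot9999", "Pp3523", "start"),
    ("dot9999", "Pp3587", "stop"),
    ("dot9999", "Pp3567", "start"),
]


def aggregate_codes(rows):
    """Hypothetical helper: group codePp by dotId, suffixing '(stop)' for stopped rows."""
    grouped = defaultdict(list)  # dict subclass, so keys keep first-seen order
    for dot_id, code, status in rows:
        suffix = "(stop)" if status == "stop" else ""  # like when(...).otherwise("")
        grouped[dot_id].append(code + suffix)          # like concat + collect_list
    # like concat_ws(", ", ...)
    return {dot_id: ", ".join(codes) for dot_id, codes in grouped.items()}


for dot_id, codes in aggregate_codes(rows).items():
    print(dot_id, codes)
# dot0001 Pp3523, Pp3524(stop)
# dot0020 Pp3522, Pp3556(stop)
# dot9999 Pp3545(stop), Pp3523, Pp3587(stop), Pp3567
```

If the order of codes matters in the Spark version too, it needs an explicit ordering column, since the input table shown in the question has none.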

Answer:

I am using PrettyTable, and this is how I do it:

from prettytable import PrettyTable
import time

my_table = PrettyTable(["CodePb", "Dot", "Status"])

# Sample data: (CodePb, Dot, Status)
rows = [
    ("CodePb0001", "dot0001", "stop"),
    ("CodePb0001", "dot0002", "stop"),
    ("CodePb0001", "dot0003", "start"),
    ("CodePb0001", "dot0004", "stop"),
    ("CodePb0001", "dot0005", "start"),
]

while True:
    try:
        for code, dot, status in rows:
            # Append the status to the code only when the status is 'stop'
            label = code + status if status == "stop" else code
            my_table.add_row([label, dot, status])
    except Exception as exc:
        print("Error:", exc)

    print(my_table)
    print("\n")
    my_table.clear_rows()  # empty the table so rows do not pile up between refreshes
    time.sleep(3)          # refresh every 3 seconds