How to eliminate certain rows and keep certain rows (with values changed) in a dataframe using PySpark


Basically, what I want to achieve is the binary transformation below, under the following conditions:

  1. If the values in C2 associated with a variable in C1 have never been greater than 0, keep only one record of it (C and G), with the associated value set to 0, even if it appears multiple times.

  2. If some of the values in C2 associated with a variable in C1 are greater than 0, keep all of those records with the associated value set to 1 and eliminate the ones with zero (A, B, D, E and F).

+--------+--------+
| C1     | C2     |
+--------+--------+
| A      |   6    |
| B      |   5    |
| C      |   0    |
| A      |   0    |
| D      |   1    |
| E      |   4    |
| F      |   9    |
| B      |   0    |
| C      |   0    |
| G      |   0    |
| D      |   0    |
| D      |   7    |
| G      |   0    |
| G      |   0    |
+--------+--------+

to

+--------+--------+
| C1     | C2     |
+--------+--------+
| A      |   1    |
| B      |   1    |
| C      |   0    |
| D      |   1    |
| D      |   1    |
| E      |   1    |
| F      |   1    |
| G      |   0    |
+--------+--------+

How does one attain this in PySpark?

CodePudding user response:

Here's the implementation:

from pyspark.sql import functions as F

df = spark.createDataFrame(
    [
        ("A", 6),
        ("B", 5),
        ("C", 0),
        ("A", 0),
        ("D", 1),
        ("E", 4),
        ("F", 9),
        ("B", 0),
        ("C", 0),        
        ("G", 0),
        ("D", 0),
        ("D", 7),
        ("G", 0),
        ("G", 0),
    ],
    ["C1", "C2"],
)

df1 = (df
    .groupBy("C1")
    .agg(F.max("C2").alias("C2"))    # per-group maximum of C2
    .filter(F.col("C2") == 0)        # keep only the groups that never had a value > 0
)

df2 = (df
    .filter(F.col("C2") > 0)         # keep every record with a positive value
    .withColumn("C2", F.lit(1))      # and binarise it to 1
)

final_df = df1.unionAll(df2).orderBy("C1") 

df1.show()
df2.show()
final_df.show()

output:

+---+---+
| C1| C2|
+---+---+
|  C|  0|
|  G|  0|
+---+---+

+---+---+
| C1| C2|
+---+---+
|  A|  1|
|  B|  1|
|  D|  1|
|  E|  1|
|  F|  1|
|  D|  1|
+---+---+

+---+---+
| C1| C2|
+---+---+
|  A|  1|
|  B|  1|
|  C|  0|
|  D|  1|
|  D|  1|
|  E|  1|
|  F|  1|
|  G|  0|
+---+---+
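
As a variant, the same result can be obtained in a single pass with window functions, computing the per-group maximum without a separate aggregation and union. This is only a minimal sketch assuming the same df as above; max_c2 and rn are helper column names introduced for this sketch.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("C1")
w_rn = Window.partitionBy("C1").orderBy("C2")

single_pass = (df
    .withColumn("max_c2", F.max("C2").over(w))     # per-group maximum of C2
    .withColumn("rn", F.row_number().over(w_rn))   # row index within each group
    # keep every positive record, plus exactly one record for each all-zero group
    .filter((F.col("C2") > 0) | ((F.col("max_c2") == 0) & (F.col("rn") == 1)))
    # binarise: 1 if the group ever had a positive value, otherwise 0
    .withColumn("C2", F.when(F.col("max_c2") > 0, F.lit(1)).otherwise(F.lit(0)))
    .select("C1", "C2")
    .orderBy("C1")
)
single_pass.show()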

CodePudding user response:

Here are my 2 cents

  1. Create a Spark data frame:

     df = spark.createDataFrame([('A', 6) ,('B', 5) ,('C', 0) ,('A', 0) ,('D', 1) ,('E', 4) ,('F', 9) ,('B', 0) ,('C', 0) ,('G', 0) ,('D', 0) ,('D', 7) ,('G', 0) ,('G', 0)  ], schema = ['c1','c2'])
    
  2. Use collect_list() over a window partitioned by c1 to gather every C2 value for each C1 group, then keep only the distinct elements with array_distinct():

     from pyspark.sql.window import Window
     from pyspark.sql.functions import *
     import pyspark.sql.functions as fx
    
     windowSpec = Window.partitionBy('c1').orderBy('c1')
    
     df1 = df.withColumn('collected_list',fx.collect_list('c2').over(windowSpec)).select('c1','c2','collected_list')
     df1 = df1.withColumn("collected_list_with_out_dups", fx.array_distinct("collected_list"))
    

3a. Create the data frame for the all-zero groups first:

df_with_0 = df1.filter(fx.col('collected_list_with_out_dups')==fx.array(lit(0)))\
            .select('c1','c2').dropDuplicates()\
            .withColumn('c2',fx.lit(0))
df_with_0.show()

3b. Create the data frame for the groups with values other than 0:

df_with_other = df1.filter(fx.col('collected_list_with_out_dups')!=fx.array(lit(0)))\
            .filter(fx.col('c2')!=0)\
            .select('c1','c2').dropDuplicates()\
            .withColumn('c2',fx.lit(1))
df_with_other.show()
  4. Build and show the final df_final dataframe:

    df_final = df_with_0.unionByName(df_with_other).orderBy('c1')
    df_final.show()
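
For what it's worth, steps 3a and 3b can also be driven by a single boolean flag computed with array_max() instead of comparing whole arrays. This is just a sketch that reuses df1 from step 2; the names all_zero, df_with_0_alt, df_with_other_alt and df_final_alt are illustrative only.

    # flag each row by whether its C1 group contains only zeros
    df_flag = df1.withColumn('all_zero', fx.array_max('collected_list') == 0)

    # one 0-record per all-zero group
    df_with_0_alt = df_flag.filter(fx.col('all_zero'))\
                .select('c1').dropDuplicates()\
                .withColumn('c2', fx.lit(0))

    # every positive record becomes a 1-record
    df_with_other_alt = df_flag.filter((~fx.col('all_zero')) & (fx.col('c2') > 0))\
                .select('c1')\
                .withColumn('c2', fx.lit(1))

    df_final_alt = df_with_0_alt.unionByName(df_with_other_alt).orderBy('c1')
    df_final_alt.show()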
    

