Pyspark: Check for conditions on each cell and calculate the checkdigit

Time:11-15

I have a column RESULT in which every value is an 11-digit string. Its schema is:

RESULT: string (nullable = true)


Now I want to apply the operation below and store the result in a new column that has one extra digit appended at the end. The example below uses the first number, 03600024145.

Note: I don't want to convert the table to pandas; everything should be done on the PySpark DataFrame.

  1. Add the digits in the odd positions: 0 + 6 + 0 + 2 + 1 + 5 = 14.
  2. Multiply the result by 3: 14 × 3 = 42.
  3. Add the digits in the even positions: 3 + 0 + 0 + 4 + 4 = 11.
  4. Add the two results together: 42 + 11 = 53.
  5. To calculate the check digit, take the remainder of (53 / 10), also known as (53 modulo 10), and if it is not 0, subtract it from 10. Here, (53 / 10) = 5 remainder 3, and 10 - 3 = 7, so the check digit is 7.
  6. Append this check digit to the end, so the number becomes 036000241457.
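The steps above can be traced for a single value in plain Python. This is only a sanity check of the arithmetic, not the PySpark solution being asked for, and the function name `upc_check_digit_steps` is made up for illustration.

```python
def upc_check_digit_steps(code: str) -> dict:
    """Return every intermediate value of the check-digit steps."""
    digits = [int(ch) for ch in code]
    odd_sum = sum(digits[0::2])    # 1-based positions 1, 3, 5, ...
    even_sum = sum(digits[1::2])   # 1-based positions 2, 4, 6, ...
    total = odd_sum * 3 + even_sum
    remainder = total % 10
    # The outer % 10 turns a remainder of 0 into check digit 0, not 10.
    check = (10 - remainder) % 10
    return {
        "odd_sum": odd_sum,
        "even_sum": even_sum,
        "total": total,
        "remainder": remainder,
        "check": check,
        "updated": code + str(check),
    }


steps = upc_check_digit_steps("03600024145")
print(steps["odd_sum"], steps["even_sum"], steps["total"], steps["check"])  # 14 11 53 7
print(steps["updated"])  # 036000241457
```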

Applying this logic to the whole column should produce a new column, UPDATED RESULT.


For further clarification of the logic: https://en.wikipedia.org/wiki/Check_digit#UPC

There is similar Python code, though it differs slightly at step 5: python: create check digit function

CodePudding user response:

Solution using a user-defined function (UDF).

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, col

df = spark.createDataFrame([('03600024145',), ('01010101010',)], ['RESULT'])

@udf(StringType())
def add_check_digit(val):
    odd = sum(int(i) for i in val[::2])
    even = sum(int(i) for i in val[1::2])
    check_val = (odd * 3 + even) % 10

    return val + str((10 - check_val) % 10)

df = df.withColumn('UPDATED_RESULT', add_check_digit(col('RESULT')))

df.show()

+-----------+--------------+
|     RESULT|UPDATED_RESULT|
+-----------+--------------+
|03600024145|  036000241457|
|01010101010|  010101010105|
+-----------+--------------+
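Since the schema marks RESULT as nullable, the UDF body may receive None, and `val[::2]` would then raise a TypeError. Below is a minimal sketch of a more defensive version of the same function. The name `add_check_digit_safe` and the choice to return None for anything that is not exactly 11 ASCII digits are assumptions, not part of the original answer.

```python
from typing import Optional


def add_check_digit_safe(val: Optional[str]) -> Optional[str]:
    """Append the check digit, or return None for null/malformed input."""
    # Assumption: anything other than exactly 11 ASCII digits is invalid.
    if val is None or len(val) != 11 or not (val.isascii() and val.isdigit()):
        return None
    odd = sum(int(ch) for ch in val[::2])    # 1-based odd positions
    even = sum(int(ch) for ch in val[1::2])  # 1-based even positions
    check_val = (odd * 3 + even) % 10
    return val + str((10 - check_val) % 10)


print(add_check_digit_safe("03600024145"))  # 036000241457
print(add_check_digit_safe(None))           # None
```

The plain function can be wrapped with `udf(add_check_digit_safe, StringType())` and used in `withColumn` exactly like the decorated version above; keeping it undecorated also makes it easy to test without a Spark session.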

CodePudding user response:

You can split the column RESULT into an array of digits. Then, using the higher-order functions transform and aggregate, you can calculate the check digit and concatenate it to the original string:

import pyspark.sql.functions as F

df1 = df.withColumn(
    "digits",
    F.expr("slice(split(RESULT, ''), 1, size(split(RESULT, '')) - 1)")
).withColumn(
    "digits",
    F.expr("transform(digits, (x, i) -> struct(int(x) as d, i + 1 as i))")
).withColumn(
    "odd_even",
    F.expr(
        """aggregate(digits, 
                     array(0, 0), 
                     (acc, x) -> 
                         IF (x.i%2 = 1,
                             array(acc[0] + x.d, acc[1]),
                             array(acc[0], acc[1] + x.d)
                         )
        )""")
).withColumn(
    "UPDATED RESULT",
    # The outer % 10 yields check digit 0 (not 10) when the sum is a multiple of 10
    F.concat(F.col("RESULT"), (10 - (F.col("odd_even")[0] * 3 + F.col("odd_even")[1]) % 10) % 10)
).select(
    "RESULT", "UPDATED RESULT"
)

df1.show(truncate=False)

#+-----------+--------------+
#|RESULT     |UPDATED RESULT|
#+-----------+--------------+
#|03600024145|036000241457  |
#|01010101010|010101010105  |
#+-----------+--------------+

Explanations:

  • Step 1: split the column and slice the resulting array to remove the last empty value. Then transform each element of the array by adding its index. (Example 0 -> struct(0, 1))
  • Step 2: using aggregate, sum even and odd position digits by using the index we added in the first step
  • Step 3: calculate the check digit and concatenate it with result column

You can show all the intermediate columns to understand the logic.
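To get a feel for what the intermediate columns hold without running Spark, the same fold can be mimicked in plain Python. This is only an illustration of the transform and aggregate steps; the variable names mirror the column names above, and the helper `step` is made up for this sketch.

```python
from functools import reduce

result = "03600024145"

# Mirrors the `digits` column: each digit paired with its 1-based position.
digits = [(int(ch), i + 1) for i, ch in enumerate(result)]


def step(acc, x):
    """Mirrors the aggregate lambda: acc[0] sums odd positions, acc[1] even ones."""
    d, i = x
    return [acc[0] + d, acc[1]] if i % 2 == 1 else [acc[0], acc[1] + d]


# Mirrors the `odd_even` column, starting from array(0, 0).
odd_even = reduce(step, digits, [0, 0])

print(digits[:3])  # [(0, 1), (3, 2), (6, 3)]
print(odd_even)    # [14, 11]
```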

CodePudding user response:

We can express the logic with built-in Spark functions.

  1. First extract numbers at various positions and cast them to integers.
  2. Then sum odd and even positions separately.
  3. Multiply the odd sum by 3 and add the even sum.
  4. Apply the modulo 10 operation.
  5. Subtract the result of step 4 from 10 and then apply modulo 10 again, so that the check digit is 0 when the result of step 4 is 0.
  6. Finally, concat RESULT column with the check digit.

Working Example

import pyspark.sql.functions as F
from pyspark.sql import Column
from typing import List

df = spark.createDataFrame([("03600024145",), ("01010101010",)], ("RESULT",))

def sum_digits(c: Column, pos: List[int]):
    sum_col = F.lit(0) 
    for p in pos:
        sum_col = sum_col + F.substring(c, p, 1).cast("int")
    return sum_col

def check_digit(c: Column) -> Column:
    odd_sum = sum_digits(c, range(1, 12, 2))
    even_sum = sum_digits(c, [2, 4, 6, 8, 10])
    sum_result = (3 * odd_sum) + even_sum
    modulo = sum_result % 10
    return (10 - modulo) % 10

df.withColumn("UPDATED_RESULT", F.concat(F.col("RESULT"), check_digit(F.col("RESULT")))).show()

Output

+-----------+--------------+
|     RESULT|UPDATED_RESULT|
+-----------+--------------+
|03600024145|  036000241457|
|01010101010|  010101010105|
+-----------+--------------+
