I have a column RESULT where each value is an 11-digit string, and its schema is:
RESULT: string (nullable = true)
Now I want to perform the operation below and populate a new column that appends one extra digit at the end. The example shown is for the first number, 03600024145.
Note: I don't want to convert the table to pandas; I want to do everything in a PySpark DataFrame.
- Add the digits in the odd positions: 0 + 6 + 0 + 2 + 1 + 5 = 14.
- Multiply the result by 3: 14 × 3 = 42.
- Add the digits in the even positions: 3 + 0 + 0 + 4 + 4 = 11.
- Add the two results together: 42 + 11 = 53.
- To calculate the check digit, take the remainder of (53 / 10), which is also known as (53 modulo 10), and if it is not 0, subtract it from 10. Therefore, the check digit value is 7, i.e. (53 / 10) = 5 remainder 3; 10 - 3 = 7.
- Append this check digit at the end, so the number becomes 036000241457.
So, if this logic is applied to the whole column, the result should appear in a new column UPDATED RESULT.
For further clarification of the logic: https://en.wikipedia.org/wiki/Check_digit#UPC
There is similar Python code, but it differs a bit at step 5: python: create check digit function
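For clarity, here is the same arithmetic written out as a plain-Python sketch for 03600024145 (just the logic above, not the PySpark solution I am looking for):

number = "03600024145"
odd_sum = sum(int(d) for d in number[0::2])   # positions 1, 3, 5, ...: 0 + 6 + 0 + 2 + 1 + 5 = 14
even_sum = sum(int(d) for d in number[1::2])  # positions 2, 4, 6, ...: 3 + 0 + 0 + 4 + 4 = 11
total = odd_sum * 3 + even_sum                # 42 + 11 = 53
check_digit = (10 - total % 10) % 10          # 53 % 10 = 3, 10 - 3 = 7
print(number + str(check_digit))              # 036000241457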
CodePudding user response:
A solution using a user-defined function (udf):
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, col

df = spark.createDataFrame([('03600024145',), ('01010101010',)], ['RESULT'])

@udf(StringType())
def add_check_digit(val):
    # sum digits at odd positions (1st, 3rd, ...) and even positions (2nd, 4th, ...)
    odd = sum(int(i) for i in val[::2])
    even = sum(int(i) for i in val[1::2])
    check_val = (odd * 3 + even) % 10
    # (10 - check_val) % 10 makes the check digit 0 when check_val is 0
    return val + str((10 - check_val) % 10)

df = df.withColumn('UPDATED_RESULT', add_check_digit(col('RESULT')))
df.show()
+-----------+--------------+
|     RESULT|UPDATED_RESULT|
+-----------+--------------+
|03600024145|  036000241457|
|01010101010|  010101010105|
+-----------+--------------+
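If you also want to call the same logic from Spark SQL or inside expr, the udf can additionally be registered by name (a small optional sketch, assuming the df and add_check_digit defined above):

# make the udf callable from SQL expressions as well
spark.udf.register("add_check_digit", add_check_digit)
df.selectExpr("RESULT", "add_check_digit(RESULT) AS UPDATED_RESULT").show()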
CodePudding user response:
You can split the column RESULT into an array of digits, then, using the higher-order functions transform and aggregate, calculate the check digit and concatenate it to the original string:
import pyspark.sql.functions as F

df1 = df.withColumn(
    "digits",
    F.expr("slice(split(RESULT, ''), 1, size(split(RESULT, '')) - 1)")
).withColumn(
    "digits",
    F.expr("transform(digits, (x, i) -> struct(int(x) as d, i + 1 as i))")
).withColumn(
    "odd_even",
    F.expr("""aggregate(digits,
                        array(0, 0),
                        (acc, x) -> IF(x.i % 2 = 1,
                                       array(acc[0] + x.d, acc[1]),
                                       array(acc[0], acc[1] + x.d)
                                    )
              )""")
).withColumn(
    "UPDATED RESULT",
    # the outer % 10 turns a value of 10 into a check digit of 0
    F.concat(F.col("RESULT"), (10 - ((F.col("odd_even")[0] * 3 + F.col("odd_even")[1]) % 10)) % 10)
).select(
    "RESULT", "UPDATED RESULT"
)
df1.show(truncate=False)
#+-----------+--------------+
#|RESULT     |UPDATED RESULT|
#+-----------+--------------+
#|03600024145|036000241457  |
#|01010101010|010101010105  |
#+-----------+--------------+
Explanations:
- Step 1: split the column and slice the resulting array to remove the last empty value. Then transform each element of the array into a struct holding the digit and its 1-based index (example: 0 -> struct(0, 1)).
- Step 2: using aggregate, sum the odd- and even-position digits separately, using the index we added in the first step.
- Step 3: calculate the check digit and concatenate it with the RESULT column.
You can show all the intermediate columns to understand the logic.
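If you prefer a single expression, the same split/transform/aggregate logic can also be collapsed into one selectExpr call (a sketch equivalent to the chained withColumn calls above, assuming all values are 11-digit strings; the outer % 10 handles the case where the computed value would otherwise be 10):

# one-expression sketch of the same transform/aggregate logic
df.selectExpr(
    "RESULT",
    """concat(
         RESULT,
         (10 - aggregate(
                 transform(slice(split(RESULT, ''), 1, 11),
                           (x, i) -> struct(int(x) as d, i + 1 as i)),
                 array(0, 0),
                 (acc, x) -> IF(x.i % 2 = 1,
                                array(acc[0] + x.d, acc[1]),
                                array(acc[0], acc[1] + x.d)),
                 acc -> (acc[0] * 3 + acc[1]) % 10
               )) % 10
       ) AS `UPDATED RESULT`"""
).show(truncate=False)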
CodePudding user response:
We can translate the logic into Spark functions:
- First, extract the digits at the various positions and cast them to integers.
- Then sum the odd and even positions separately.
- Multiply the odd sum by 3 and add the even sum.
- Apply the modulo 10 operation.
- Subtract the result of step 4 from 10 and then apply modulo 10 again, to imitate the behavior of the check digit being 0 when the result of step 4 is 0.
- Finally, concat the RESULT column with the check digit.
Working Example
import pyspark.sql.functions as F
from pyspark.sql import Column
from typing import List

df = spark.createDataFrame([("03600024145",), ("01010101010",)], ("RESULT",))

def sum_digits(c: Column, pos: List[int]) -> Column:
    # sum the digits of `c` found at the given 1-based positions
    sum_col = F.lit(0)
    for p in pos:
        sum_col = sum_col + F.substring(c, p, 1).cast("int")
    return sum_col

def check_digit(c: Column) -> Column:
    odd_sum = sum_digits(c, range(1, 12, 2))
    even_sum = sum_digits(c, [2, 4, 6, 8, 10])
    sum_result = (3 * odd_sum) + even_sum
    modulo = sum_result % 10
    # (10 - modulo) % 10 keeps the check digit at 0 when modulo is 0
    return (10 - modulo) % 10

df.withColumn("UPDATED_RESULT", F.concat(F.col("RESULT"), check_digit(F.col("RESULT")))).show()
Output
+-----------+--------------+
|     RESULT|UPDATED_RESULT|
+-----------+--------------+
|03600024145|  036000241457|
|01010101010|  010101010105|
+-----------+--------------+
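As a side note, the same check_digit helper can be reused to validate codes that already include their check digit (a hypothetical usage sketch; full_codes is an assumed example DataFrame of 12-digit strings, not part of the question):

# hypothetical validation sketch: a 12-digit code is valid when its last digit
# matches the check digit computed from its first 11 digits
full_codes = spark.createDataFrame([("036000241457",), ("036000241450",)], ("CODE",))
full_codes.withColumn(
    "IS_VALID",
    F.substring("CODE", 12, 1).cast("int") == check_digit(F.substring("CODE", 1, 11))
).show()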