I have a dataset with a name and date-of-birth column, and I would like to create a new column with the zodiac sign. I managed to do it in pandas and plain Python, but I don't know how to proceed in PySpark, since I have a giant dataset.
EXAMPLE:
name | dob |
---|---|
John | 1932-11-14 |
Maike | 1932-10-14 |
based on the dict
zodiacs = [(120, 'Cap'), (218, 'Aqu'), (320, 'Pis'), (420, 'Ari'), (521, 'Tau'),
(621, 'Gem'), (722, 'Can'), (823, 'Leo'), (923, 'Vir'), (1023, 'Lib'),
(1122, 'Sco'), (1222, 'Sag'), (1231, 'Cap')]
based on that dict, it should become:
name | dob | sign |
---|---|---|
John | 1932-11-14 | Sco |
Maike | 1932-10-14 | Lib |
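For reference, the lookup that the `zodiacs` list encodes can be written in plain Python with `bisect` — each tuple holds the last `month*100 + day` code covered by a sign, so the sign for a date is the entry at the first boundary that is greater than or equal to the date's code (the `sign_of` helper name is made up for illustration):

```python
from bisect import bisect_left

# (month*100 + day) upper bound for each sign, in ascending order
zodiacs = [(120, 'Cap'), (218, 'Aqu'), (320, 'Pis'), (420, 'Ari'), (521, 'Tau'),
           (621, 'Gem'), (722, 'Can'), (823, 'Leo'), (923, 'Vir'), (1023, 'Lib'),
           (1122, 'Sco'), (1222, 'Sag'), (1231, 'Cap')]
bounds = [b for b, _ in zodiacs]

def sign_of(month: int, day: int) -> str:
    # first boundary >= the date encoded as mmdd gives the sign
    return zodiacs[bisect_left(bounds, month * 100 + day)][1]
```

With this, `sign_of(11, 14)` gives `'Sco'` and `sign_of(10, 14)` gives `'Lib'`, matching the expected output above. The PySpark answers below reproduce this same boundary logic on the cluster.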
CodePudding user response:
You can create a dataframe that contains all zodiac signs with their start and end dates, then join on the dob falling between those two dates. Here's the code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date

spark = SparkSession.builder.master("local[*]").getOrCreate()
data1 = [
["John", "1932-11-14"],
["Maike", "1932-10-14"]
]
data2 = [
["Aries", "03-21", "04-19"],
["Taurus", "04-20", "05-20"],
["Gemini", "05-21", "06-20"],
["Cancer", "06-21", "07-22"],
["Leo star", "07-23", "08-22"],
["Virgo", "08-23", "09-22"],
["Libra", "09-23", "10-22"],
["Scorpio", "10-23", "11-21"],
["Sagittarius", "11-22", "12-21"],
["Capricorn", "12-22", "12-31"],  # Capricorn wraps the year boundary, so it
["Capricorn", "01-01", "01-19"],  # needs two rows for between() to match
["Aquarius", "01-20", "02-18"],
["Pisces", "02-19", "03-20"],
]
df = spark.createDataFrame(data1).toDF("name", "dob")
zodiacSignDf = spark.createDataFrame(data2).toDF("sign", "start", "end")
df.alias("df").join(zodiacSignDf.alias("zodiacSignDf"), to_date(col("df.dob").substr(6, 5), 'MM-dd').between(
to_date(col("zodiacSignDf.start"), 'MM-dd'), to_date(col("zodiacSignDf.end"), 'MM-dd')
), "left").drop("start", "end").show()
+-----+----------+-------+
| name|       dob|   sign|
+-----+----------+-------+
| John|1932-11-14|Scorpio|
|Maike|1932-10-14|  Libra|
+-----+----------+-------+
CodePudding user response:
If you want to use int comparison in the join instead of dates (which internally depends on the version of your cluster: in Spark 2.x dates are compared as strings, while in Spark 3.x they may be cast to datetime unless you set the legacy parameter), and avoid a non-equi join that would resolve to a BroadcastNestedLoopJoin, you can think about something like this:
import datetime

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local[*]").getOrCreate()
x = [
("Month with one digit", datetime.date(2000, 1, 14)),
("Last day of month", datetime.date(1800, 5, 31)),
("First day of month", datetime.date(1700, 8, 1)),
("John", datetime.date(1932, 11, 14)),
("Maike", datetime.date(1932, 10, 14)),
]
df = spark.createDataFrame(x, schema=["name", "dob"])
# one (mmdd, sign) row per day of the year; invalid mmdd codes match no branch
array = []
for x in range(101, 1232):  # note: range(1231) would stop at 1230 and miss Dec 31
    if x <= 120:
        array.append((x, 'Cap'))
    elif x > 120 and x <= 131 or x > 200 and x <= 218:
        array.append((x, 'Aqu'))
    elif x > 218 and x <= 229 or x > 300 and x <= 320:
        array.append((x, 'Pis'))
    elif x > 320 and x <= 331 or x > 400 and x <= 420:
        array.append((x, 'Ari'))
    elif x > 420 and x <= 430 or x > 500 and x <= 521:
        array.append((x, 'Tau'))
    elif x > 521 and x <= 531 or x > 600 and x <= 621:
        array.append((x, 'Gem'))
    elif x > 621 and x <= 630 or x > 700 and x <= 722:
        array.append((x, 'Can'))
    elif x > 722 and x <= 731 or x > 800 and x <= 823:
        array.append((x, 'Leo'))
    elif x > 823 and x <= 831 or x > 900 and x <= 923:
        array.append((x, 'Vir'))
    elif x > 923 and x <= 930 or x > 1000 and x <= 1023:
        array.append((x, 'Lib'))
    elif x > 1023 and x <= 1031 or x > 1100 and x <= 1122:
        array.append((x, 'Sco'))
    elif x > 1122 and x <= 1130 or x > 1200 and x <= 1222:
        array.append((x, 'Sag'))
    elif x > 1222 and x <= 1231:
        array.append((x, 'Cap'))
zodiacsDf = spark.createDataFrame(array, schema=["date", "name"])
# dob without the year, as digits: e.g. 1932-11-14 -> "1114" (cast to int in the join)
df = df.withColumn("dobWithoutYearAsInt", F.regexp_replace(F.substring(F.col("dob"), 6, 5), "-", ""))
joinedDf = df.join(F.broadcast(zodiacsDf), df["dobWithoutYearAsInt"] == zodiacsDf["date"], "left")
joinedDf.select(df.name, df.dob, zodiacsDf.name).show()
First I prepare the right side of the join: an array with a zodiac sign for each day of the year in int format, similar to the one you posted in your question (it also covers leap years).
Then I add a temp column dobWithoutYearAsInt, which is the int representation of the date from the left-side table.
Then I perform a broadcast join, which resolves to a BroadcastHashJoin.
Output looks ok
+--------------------+----------+----+
|                name|       dob|name|
+--------------------+----------+----+
|Month with one digit|2000-01-14| Cap|
|   Last day of month|1800-05-31| Gem|
|  First day of month|1700-08-01| Leo|
|                John|1932-11-14| Sco|
|               Maike|1932-10-14| Lib|
+--------------------+----------+----+
And the plan looks like this:
== Physical Plan ==
AdaptiveSparkPlan (15)
+- == Final Plan ==
   CollectLimit (10)
   +- * Project (9)
      +- * BroadcastHashJoin LeftOuter BuildRight (8)
         :- * LocalLimit (3)
         :  +- * Project (2)
         :     +- * Scan ExistingRDD (1)
         +- BroadcastQueryStage (7), Statistics(sizeInBytes=1032.8 KiB, rowCount=365, isRuntime=true)
            +- BroadcastExchange (6)
               +- * Filter (5)
                  +- * Scan ExistingRDD (4)
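As a side note, the day-by-day lookup table can also be generated directly from the `zodiacs` boundary list in the question instead of the long elif chain — a sketch in plain Python, assuming 29 days in February so leap years are covered:

```python
from bisect import bisect_left

# (month*100 + day) upper bound for each sign, from the question
zodiacs = [(120, 'Cap'), (218, 'Aqu'), (320, 'Pis'), (420, 'Ari'), (521, 'Tau'),
           (621, 'Gem'), (722, 'Can'), (823, 'Leo'), (923, 'Vir'), (1023, 'Lib'),
           (1122, 'Sco'), (1222, 'Sag'), (1231, 'Cap')]
bounds = [b for b, _ in zodiacs]

days_in_month = [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]  # 29 covers leap years
# one (mmdd, sign) tuple per valid day: the sign at the first boundary >= mmdd
array = [(m * 100 + d, zodiacs[bisect_left(bounds, m * 100 + d)][1])
         for m, n in enumerate(days_in_month, start=1)
         for d in range(1, n + 1)]
```

This produces 366 rows and can be fed to `spark.createDataFrame(array, schema=["date", "name"])` exactly like the hand-built array above.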