I'm using PySpark in DAE Databricks to get HES data.
At the moment I do this:
df_test = sqlContext.sql("select * from db_name_2122")
ICD10_codes = ['X85','X87']
df_test = df_test.filter((df_test.field1 == "something") &
                         (df_test.field.rlike('|'.join(ICD10_codes))))
df_test2 = sqlContext.sql("select * from db_name_2021")
ICD10_codes = ['X85','X87']
df_test2 = df_test2.filter((df_test2.field1 == "something") &
                           (df_test2.field.rlike('|'.join(ICD10_codes))))
I have to do this for financial years 1112, 1213, 1314, ..., 2122. This is a lot of copy-pasting of similar code and I know this is bad, both from my own experience of finding copy-paste errors and from reading about it.
What I want to do:
I want to select the rows that meet the same conditions on the same fields in 11 different financial year tables within a DB, and pull them all into one table at the end.
That would replace what I'm doing now, which is 11 different but similar copy-and-paste chunks of code whose results are then appended together.
CodePudding user response:
First, before the "appending" you can put all the dfs into the same list:
ICD10_codes = ['X85','X87']
dfs = []
# i runs 11..21, giving table suffixes 1112, 1213, ..., 2122
for i in range(11, 22):
    df = sqlContext.sql(f"select * from db_name_{i}{i+1}")
    df = df.filter((df.field1 == "something") &
                   (df.field.rlike('|'.join(ICD10_codes))))
    dfs.append(df)
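A side note on the filter used above: '|'.join(ICD10_codes) builds the pattern X85|X87, and rlike keeps a row when that pattern is found anywhere in the field, not only when the whole value equals one of the codes. The sketch below illustrates this with Python's re module as a stand-in. Spark's rlike uses Java regular expressions, but for a plain alternation of literal codes like this one the behaviour is the same. The helper name matches is hypothetical and only there for illustration.

```python
import re

ICD10_codes = ['X85', 'X87']

# Same expression that is passed to rlike in the loop
pattern = '|'.join(ICD10_codes)

def matches(value):
    # re.search finds the pattern anywhere in the string,
    # which mirrors the "contains a match" behaviour of rlike
    return re.search(pattern, value) is not None

print(pattern)
print(matches('X850'), matches('A01,X87'), matches('X86'))
```

If only exact codes should match, the pattern would need anchors (for example ^ and $), which is a change from the original filter.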
And then do the "append". I'm not sure exactly what you mean by "append", but this is how you could do it with unionByName:
df_final = dfs[0]
for df in dfs[1:]:
    df_final = df_final.unionByName(df)
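The same fold over the list can also be written with functools.reduce from the standard library. This is only a sketch of an alternative spelling, not a different technique: it still calls unionByName pairwise, and it assumes the list is non-empty. The function name union_all is hypothetical.

```python
from functools import reduce

def union_all(dfs):
    """Combine a non-empty list of DataFrames pairwise with unionByName."""
    # reduce calls left.unionByName(right) from left to right,
    # exactly like the explicit loop over dfs[1:]
    return reduce(lambda left, right: left.unionByName(right), dfs)
```

With the list built earlier, this would be used as df_final = union_all(dfs).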
Everything can be added into one loop:
ICD10_codes = ['X85','X87']
rng = range(11, 22)
for i in rng:
    df = sqlContext.sql(f"select * from db_name_{i}{i+1}")
    df = df.filter((df.field1 == "something") &
                   (df.field.rlike('|'.join(ICD10_codes))))
    # the first table starts df_final, every later one is unioned onto it
    df_final = df if i == rng[0] else df_final.unionByName(df)
If your table names are more complex, you can put them directly into the list:
tables = ['db_name_1112_text1', 'db_name_1213_text56']
ICD10_codes = ['X85','X87']
for x in tables:
    df = sqlContext.sql(f"select * from {x}")
    df = df.filter((df.field1 == "something") &
                   (df.field.rlike('|'.join(ICD10_codes))))
    df_final = df if x == tables[0] else df_final.unionByName(df)
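If the names do follow the plain financial-year pattern from the question, the list can be generated instead of typed out. The sketch below is one possible way to do that, assuming the db_name_ prefix from the question and two-digit years. The function name fy_tables and its parameters are hypothetical. Zero-padding is included so that a year such as 09/10 would come out as 0910 rather than 910.

```python
def fy_tables(first_start, last_start, prefix="db_name_"):
    """Return one table name per financial year, e.g. db_name_1112."""
    return [
        # two-digit start year followed by two-digit end year
        f"{prefix}{year % 100:02d}{(year + 1) % 100:02d}"
        for year in range(first_start, last_start + 1)
    ]

# Financial years 11/12 through 21/22, i.e. 11 tables
tables = fy_tables(11, 21)
print(tables)
```

The resulting list can be passed to the same loop over tables shown above.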