Adding multiple columns in temp table from dataframe using pyspark


I am trying to create a table in PySpark on Databricks that captures the database name, table name, and DDL statement for each table. The Hive metastore has been loaded into Azure MySQL, which the Databricks cluster points to. I am able to capture the needed details in separate dataframe objects, but I am unable to combine them into one single table. The schema I am looking for is: DatabaseName, TableName, CreateDDLStatement, Location. For database name and table name, here is the dataframe df1:

db = "db1"
tables = spark.catalog.listTables(db)
for t in tables:
    Df1 = spark.sql("show tables in {}".format(db))
    Df1.createOrReplaceTempView("TempTable1")
    result1 = spark.sql('select * from TempTable1')
    result1.show()
Output -
DatabaseName TableName IsTemporary
db1          tb1       False
db2          tb2       True

The second dataframe object has the DDL statement for each table:

for t in tables:
    DF2 = spark.sql("SHOW CREATE TABLE {}.{}".format(db, t.name))
    DF2.createOrReplaceTempView("TempTable2")
    result2 = spark.sql('select * from TempTable2')
    result2.show(10)
Output -
crtmnt_stmnt
create table tb1...
create table tb2...

Expected Output
DatabaseName TableName IsTemporary   crtmnt_stmnt
db1          tb1       False         create table tb1...
db2          tb2       True          create table tb2...

Can I append a column to a dataframe object or a temporary table? How can I dynamically take values using withColumn and the spark.sql command?

CodePudding user response:

Since you already have a dataframe with the database and table names (the values used while creating the crtmnt_stmnt column), you can achieve this with dataframes themselves.

  • The following is the dataframe I have, using code similar to yours:
df1 = spark.sql("show tables in default")
df1.createOrReplaceTempView("temp_table")
result = spark.sql("select * from temp_table")
result.show()

(screenshot: output of result.show())

  • Now I create a dataframe that has all the required details. Iterating through the tables, I build a temporary dataframe for each table and append it to the final dataframe, adding two columns, table_name and db_name, with withColumn as I go.

  • This gives the final dataframe (the create-statement dataframe) key columns on which we can apply an inner join with the above df1; a more compact union-based alternative is sketched after the code below.

from pyspark.sql.functions import lit

df2 = None
db = 'default'
tables = spark.catalog.listTables(db)
for t in tables:
    if df2 is None:
        # First table: start the final dataframe
        df2 = spark.sql("SHOW CREATE TABLE {}.{}".format(db, t.name))
        df2 = df2.withColumn("table_name", lit(t.name)).withColumn("db_name", lit(db))
    else:
        # Subsequent tables: build a temporary dataframe and append it
        temp = spark.sql("SHOW CREATE TABLE {}.{}".format(db, t.name))
        temp = temp.withColumn("table_name", lit(t.name)).withColumn("db_name", lit(db))
        df2 = df2.union(temp)
df2.createOrReplaceTempView("create_table")

#display(df2)

(screenshot: output of display(df2))
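As a design note, the if/else initialization above can be written more compactly by building one dataframe per table and folding the list with union. A minimal sketch under the same assumptions (a spark session, the default database, and at least one table in it):

from functools import reduce
from pyspark.sql.functions import lit

db = 'default'
tables = spark.catalog.listTables(db)

# One SHOW CREATE TABLE dataframe per table, each tagged with its
# table and database name so the join keys are available later
per_table = [
    spark.sql("SHOW CREATE TABLE {}.{}".format(db, t.name))
         .withColumn("table_name", lit(t.name))
         .withColumn("db_name", lit(db))
    for t in tables
]

# Fold the list into a single dataframe; this fails if the list is empty
df2 = reduce(lambda a, b: a.union(b), per_table)
df2.createOrReplaceTempView("create_table")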

  • I have used the dataframe API to apply an inner join and get the desired result:
final = df1.join(
    df2,
    (df1.tableName == df2.table_name) & (df1.database == df2.db_name),
    "inner"
).select(df1['database'], df1['tableName'], df1['isTemporary'], df2['createtab_stmt'])
display(final)

(screenshot: output of display(final))
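Since the original goal was to capture these details in a table, the joined dataframe can also be persisted with saveAsTable. A minimal sketch; the target table name ddl_capture is only illustrative:

# Persist the joined result as a table (the name ddl_capture is illustrative)
final.write.mode("overwrite").saveAsTable("ddl_capture")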

  • Using spark.sql(), the code would be as follows (using the temporary views created above):
spark.sql("""
    select d1.database, d1.tableName, d1.isTemporary, d2.createtab_stmt
    from temp_table d1
    inner join create_table d2
      on d1.database = d2.db_name
     and d1.tableName = d2.table_name
""").show()