How do I create a new DataFrame based on an old DataFrame?


I have a CSV file, dbname1.table1.csv:

| target             | source        | source_table                               | relation_type |
| avg_ensure_sum_12m | inn_num       | custom_cib_ml_stg.p_overall_part_tend_cust | direct        |
| avg_ensure_sum_12m | protocol_dttm | custom_cib_ml_stg.p_overall_part_tend_cust | direct        |
| avg_ensure_sum_12m | inn_num       | custom_cib_ml_stg.p_overall_part_tend_cust | indirect      |

csv format for this table:


Then I create a DataFrame by reading it:

val dfDL = spark.read.option("delimiter", ",")
                     .option("header", true)
                     .csv("dbname1.table1.csv") // file path assumed

Now I need to create a new dataframe based on dfDL.

The structure of the new dataframe looks like this:

case class DataLink(schema_from: String,
                    table_from: String,
                    column_from: String,
                    link_type: String,
                    schema_to: String,
                    table_to: String,
                    column_to: String)

The fields of the new DataFrame are derived from the CSV columns and from the file name:

schema_from = source_table.split("\\.")(0)           // Example: custom_cib_ml_stg
table_from  = source_table.split("\\.")(1)           // Example: p_overall_part_tend_cust
column_from = source                                 // Example: inn_num
link_type   = relation_type                          // Example: direct
schema_to   = "dbname1.table1.csv".split("\\.")(0)   // Example: dbname1
table_to    = "dbname1.table1.csv".split("\\.")(1)   // Example: table1
column_to   = target                                 // Example: avg_ensure_sum_12m
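The mapping above can be sketched in plain Scala as one function per CSV row. This is an illustration, not the asker's code: DataLinkMapping, toDataLink and schemaAndTable are hypothetical names, and the sketch assumes that both source_table and the file name always contain at least one dot. It also shows why the separator must be "\\." rather than ".": String.split takes a regular expression, and an unescaped dot matches every character.

```scala
final case class DataLink(
    schema_from: String,
    table_from: String,
    column_from: String,
    link_type: String,
    schema_to: String,
    table_to: String,
    column_to: String)

object DataLinkMapping {
  // Assumption: input looks like "<schema>.<table>" or "<schema>.<table>.csv".
  // The dot is escaped because split takes a regex; "a.b".split(".") is empty.
  private def schemaAndTable(qualified: String): (String, String) = {
    val parts = qualified.split("\\.")
    (parts(0), parts(1))
  }

  // Hypothetical helper: builds one DataLink from one CSV row plus the file name.
  def toDataLink(fileName: String,
                 target: String,
                 source: String,
                 sourceTable: String,
                 relationType: String): DataLink = {
    val (schemaFrom, tableFrom) = schemaAndTable(sourceTable)
    val (schemaTo, tableTo)     = schemaAndTable(fileName)
    DataLink(schemaFrom, tableFrom, source, relationType, schemaTo, tableTo, target)
  }
}
```

For the first CSV row, toDataLink("dbname1.table1.csv", "avg_ensure_sum_12m", "inn_num", "custom_cib_ml_stg.p_overall_part_tend_cust", "direct") yields DataLink("custom_cib_ml_stg", "p_overall_part_tend_cust", "inn_num", "direct", "dbname1", "table1", "avg_ensure_sum_12m").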

I can't figure out how to build this new DataFrame on my own.

P.S. I need this DataFrame so that I can write a JSON file from it later. Example JSON:


I don't like my current implementation:

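import org.apache.hadoop.fs.LocatedFileStatus
import org.json4s.{DefaultFormats, Extraction, Formats, JValue}

implicit val formats: Formats = DefaultFormats // required by Extraction.decompose

def readDLFromHDFS(file: LocatedFileStatus): Array[DataLink] = {
  // "dbname1.table1.csv" -> schema_to = "dbname1", table_to = "table1"
  val arrTableName        = file.getPath.getName.split("\\.")
  val (schemaTo, tableTo) = (arrTableName(0), arrTableName(1))

  val dfDL = spark.read.option("delimiter", ",")
                       .option("header", true)
                       .csv(file.getPath.toString)

  // Columns: 0 = target, 1 = source, 2 = source_table, 3 = relation_type
  dfDL.collect.map { row =>
    val sourceTable = row.getString(2).split("\\.")
    DataLink(sourceTable(0), sourceTable(1), row.getString(1), row.getString(3),
             schemaTo, tableTo, row.getString(0))
  }
}

// Decompose each DataLink into a JValue and combine them with json4s ++
def toJSON(dataLinks: Array[DataLink]): Option[JValue] =
  dataLinks.map(Extraction.decompose).reduceOption(_ ++ _)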
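For the JSON step, Spark can serialize the rows itself, so collecting into Array[DataLink] and folding json4s values is not strictly required. A minimal sketch, assuming the links are already available as a Dataset[DataLink]; WriteLinksAsJson, writeJson and jsonLines are hypothetical names. Note that Spark's JSON writer produces JSON Lines (one object per line) in part files under an output directory, not a single JSON array, so it may not match the expected file format exactly.

```scala
import org.apache.spark.sql.{Dataset, SaveMode}

final case class DataLink(
    schema_from: String,
    table_from: String,
    column_from: String,
    link_type: String,
    schema_to: String,
    table_to: String,
    column_to: String)

object WriteLinksAsJson {
  // Writes one JSON object per line into part files under outDir.
  // coalesce(1) forces a single part file; only sensible for small outputs.
  def writeJson(links: Dataset[DataLink], outDir: String): Unit =
    links.coalesce(1).write.mode(SaveMode.Overwrite).json(outDir)

  // If JSON strings are needed on the driver, toJSON serializes each row on
  // the executors, and only the resulting strings are collected.
  def jsonLines(links: Dataset[DataLink]): Array[String] =
    links.toJSON.collect()
}
```

coalesce(1) funnels every row through a single task, so for large outputs it is usually better to keep the default partitioning and accept several part files.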


Answer:

You can transform the Dataset directly instead of collecting it.

import spark.implicits._

case class DataLink(schema_from: String,
                    table_from: String,
                    column_from: String,
                    link_type: String,
                    schema_to: String,
                    table_to: String,
                    column_to: String)

val filename = "dbname1.table1.csv"
val df = spark.read.option("header", "true").csv(filename)
df.show(false)
+------------------+-------------+------------------------------------------+-------------+
|target            |source       |source_table                              |relation_type|
+------------------+-------------+------------------------------------------+-------------+
|avg_ensure_sum_12m|inn_num      |custom_cib_ml_stg.p_overall_part_tend_cust|direct       |
|avg_ensure_sum_12m|protocol_dttm|custom_cib_ml_stg.p_overall_part_tend_cust|direct       |
|avg_ensure_sum_12m|inn_num      |custom_cib_ml_stg.p_overall_part_tend_cust|indirect     |
+------------------+-------------+------------------------------------------+-------------+


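// spark.sql can only query registered views; the view name is illustrative
df.createOrReplaceTempView("dl_source")

val df2 = spark.sql(s"""
select split(source_table, '[.]')[0]    as schema_from
     , split(source_table, '[.]')[1]    as table_from
     , source                           as column_from
     , relation_type                    as link_type
     , split('${filename}', '[.]')[0]   as schema_to
     , split('${filename}', '[.]')[1]   as table_to
     , target                           as column_to
  from dl_source
""").as[DataLink] // typed Dataset[DataLink] via spark.implicits._

df2.show()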

+-----------------+--------------------+-------------+---------+---------+--------+------------------+
|      schema_from|          table_from|  column_from|link_type|schema_to|table_to|         column_to|
+-----------------+--------------------+-------------+---------+---------+--------+------------------+
|custom_cib_ml_stg|p_overall_part_te...|      inn_num|   direct|  dbname1|  table1|avg_ensure_sum_12m|
|custom_cib_ml_stg|p_overall_part_te...|protocol_dttm|   direct|  dbname1|  table1|avg_ensure_sum_12m|
|custom_cib_ml_stg|p_overall_part_te...|      inn_num| indirect|  dbname1|  table1|avg_ensure_sum_12m|
+-----------------+--------------------+-------------+---------+---------+--------+------------------+
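The same projection can also be written with the DataFrame API instead of a SQL string, which avoids registering a temporary view. A sketch, assuming the file name looks like <schema>.<table>.csv; DataLinkProjection and toDataLinks are hypothetical names. Like the SQL version, functions.split takes a regular expression, so the dot is escaped.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, lit, split}

final case class DataLink(
    schema_from: String,
    table_from: String,
    column_from: String,
    link_type: String,
    schema_to: String,
    table_to: String,
    column_to: String)

object DataLinkProjection {
  def toDataLinks(spark: SparkSession, df: DataFrame, fileName: String): Dataset[DataLink] = {
    import spark.implicits._
    // The file name is a plain driver-side string, so split it in Scala once.
    val nameParts = fileName.split("\\.")
    // source_table differs per row, so split it with the Spark SQL function.
    val sourceParts = split(col("source_table"), "\\.")

    df.select(
        sourceParts.getItem(0).as("schema_from"),
        sourceParts.getItem(1).as("table_from"),
        col("source").as("column_from"),
        col("relation_type").as("link_type"),
        lit(nameParts(0)).as("schema_to"),
        lit(nameParts(1)).as("table_to"),
        col("target").as("column_to"))
      .as[DataLink] // column names match the case class fields
  }
}
```

Because everything stays as built-in column expressions, Catalyst can optimize the whole projection, which is not the case for an opaque UDF.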

Answer:

You definitely don't want to call collect: it pulls every row onto the driver and defeats the point of using Spark here. As always with Spark, there are several options. You could use RDDs, but I don't see a need to switch APIs here. You just want to apply custom logic to a few columns and end up with a DataFrame that contains only the result.

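First, define the Scala function you want to apply (here schemaTo and tableTo are assumed to come from the file name, as in readDLFromHDFS in the question):

def convert(target: String, source: String, sourceTable: String, relationType: String): DataLink =
  DataLink(sourceTable.split("\\.")(0), sourceTable.split("\\.")(1), source, relationType, schemaTo, tableTo, target)

Then wrap it in udf, which turns it into a Spark function rather than a plain Scala function, apply it to the relevant columns, and select the result. Because convert returns a case class, the UDF produces one struct column; selecting link.* expands it into the DataLink columns:

val convertUdf = org.apache.spark.sql.functions.udf(convert _)
df.select(convertUdf(df("target"), df("source"), df("source_table"), df("relation_type")).as("link")).select("link.*")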
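If the per-row logic is plain Scala anyway, a typed Dataset map is another option that skips the udf wrapper and yields a Dataset[DataLink] directly. This is an alternative sketch, not part of the answer above; CsvRow and DataLinkTypedMap are hypothetical names, and CsvRow assumes the CSV header is exactly target,source,source_table,relation_type.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

final case class DataLink(
    schema_from: String,
    table_from: String,
    column_from: String,
    link_type: String,
    schema_to: String,
    table_to: String,
    column_to: String)

// Hypothetical row type; field names must match the CSV header exactly.
final case class CsvRow(target: String, source: String, source_table: String, relation_type: String)

object DataLinkTypedMap {
  def toDataLinks(spark: SparkSession, df: DataFrame, fileName: String): Dataset[DataLink] = {
    import spark.implicits._
    // Parsed once on the driver; the closure below captures only these strings.
    val nameParts           = fileName.split("\\.")
    val (schemaTo, tableTo) = (nameParts(0), nameParts(1))

    // Typed map: each CsvRow becomes one DataLink on the executors.
    df.as[CsvRow].map { row =>
      val src = row.source_table.split("\\.")
      DataLink(src(0), src(1), row.source, row.relation_type, schemaTo, tableTo, row.target)
    }
  }
}
```

Like a UDF, the lambda is a black box to the optimizer, so this mainly buys type safety and readability rather than speed.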