How to create a Parquet table in Scala?


I want to create a Parquet table with fields of specific types:

name_process: String
id_session: Int
time_write: LocalDate or Timestamp
key: String
value: String

name_process    id_session    time_write     key                  value
OtherClass      jsdfsadfsf    43434883477    schema0.table0.csv   Success
OtherClass      jksdfkjhka    23212123323    schema1.table1.csv   Success
OtherClass      alskdfksjd    23343212234    schema2.table2.csv   Failure
ExternalClass   sdfjkhsdfd    34455453434    schema3.table3.csv   Success
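
For reference, here is a minimal sketch of those declared types as an explicit Spark schema. This is an assumption on my part, not code from the question: time_write is modelled as TimestampType, and note that the sample rows above show id_session as an alphanumeric string rather than an Int.

import org.apache.spark.sql.types._

// Hypothetical explicit schema matching the declared field types above.
// time_write is modelled as TimestampType; use DateType if LocalDate semantics are wanted.
val journalSchema = StructType(Seq(
  StructField("name_process", StringType,    nullable = false),
  StructField("id_session",   IntegerType,   nullable = false),
  StructField("time_write",   TimestampType, nullable = false),
  StructField("key",          StringType,    nullable = false),
  StructField("value",        StringType,    nullable = false)
))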

I want to write such a table correctly, with the correct data types, and then read individual partitions from it. I'm trying to implement the read and the write, but so far it isn't working.

import java.io.File

import org.apache.spark.sql.{SaveMode, SparkSession}

def createHiveTable(implicit spark: SparkSession): Unit = {

  val schema = "test_schema"
  val table = "test_table"
  val partitionName = "name_process"
  val columnNames = "name_process" :: "id_session" :: "time_write" :: "key" :: "value" :: Nil

  spark.sql(s"CREATE DATABASE IF NOT EXISTS test_db")
  //val createTableSql = s"CREATE TABLE IF NOT EXISTS $schema.$table ($columnNames) PARTITIONED BY $partitionName STORED AS parquet"

  val path = new File(".").getAbsolutePath + "/src/test/data-lineage/test_data_journal.csv"

  // Read the journal CSV; with only the header option, every column comes back as a string.
  val df = spark.read.option("delimiter", ",")
    .option("header", true)
    .csv(path)

  df.show()

  // Write as a partitioned parquet table registered in the metastore.
  df.write.mode(SaveMode.Append).partitionBy(partitionName).format("parquet").saveAsTable(s"test_db.$table")

}
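
One thing to note: with only the header option, spark.read.csv returns every column as a string, so the parquet written above will not carry the intended types. Below is a hedged sketch, not the question's code: the writeTypedJournal name and the assumption that time_write is stored in the CSV as an epoch value in seconds are mine.

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, TimestampType}

// Sketch: cast the string columns coming out of the CSV to the intended types
// before writing the partitioned table.
def writeTypedJournal(csvPath: String)(implicit spark: SparkSession): Unit = {
  val raw = spark.read
    .option("delimiter", ",")
    .option("header", true)
    .csv(csvPath)

  // Assumption: time_write is an epoch value in seconds; casting a long to
  // timestamp interprets it as seconds since the epoch. The Int cast for
  // id_session follows the declared type; keep it a string if the ids are
  // alphanumeric as in the sample rows.
  val typed = raw
    .withColumn("id_session", col("id_session").cast(IntegerType))
    .withColumn("time_write", col("time_write").cast("long").cast(TimestampType))

  typed.write
    .mode(SaveMode.Append)
    .partitionBy("name_process")
    .format("parquet")
    .saveAsTable("test_db.test_table")
}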

import org.apache.spark.sql.functions.{col, max}
import org.apache.spark.sql.types.{LongType, StringType}

def getLastSession(processName: String)(implicit spark: SparkSession): Unit = {

  // Reads a single partition directory of the table written above.
  val df = spark.read.parquet(s"spark-warehouse/test_db.db/test_table/name_process=$processName")
                     .select(
                              col("name_process").cast(StringType),
                              col("id_session").cast(StringType),
                              col("time_write").cast(LongType),
                              col("key").cast(StringType),
                              col("value").cast(StringType)
                     )

  val lastTime = df.select(col("time_write")).select(max("time_write")).collect()(0).get(0)
  val lastSession = df.filter(col("time_write").equalTo(lastTime)).select("id_session").head().getString(0)

  println(lastSession)
  println(TimeStamp.getCurrentTime) // TimeStamp is a helper not shown in the question
}

Logs from Spark:

21/12/16 14:51:19 INFO TaskSchedulerImpl: Killing all running tasks in stage 3: Stage finished
21/12/16 14:51:19 INFO DAGScheduler: Job 3 finished: parquet at DataLineageJournal.scala:28, took 0,076899 s
 
org.apache.spark.sql.AnalysisException: cannot resolve '`name_process`' given input columns: [id_session, key, time_write, value];
'Project [unresolvedalias(cast('name_process as string), None), cast(id_session#78 as string) AS id_session#86, cast(time_write#79 as bigint) AS time_write#87L, cast(key#80 as string) AS key#88, cast(value#81 as string) AS value#89]
 - Relation[id_session#78,time_write#79,key#80,value#81] parquet
 
 
                at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
                at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:155)
                at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:152)
                at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:342)
                at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
                at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:342)
                at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:339)
                at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:408)
                at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:244)
                at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:406)
                at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:359)
                at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:339)
                at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:339)
                at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:408)
                at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:244)

CodePudding user response:

Problem

When you do this

spark.read.parquet(s"spark-warehouse/test_db.db/test_table/name_process=$processName")

you are reading from a specific partition directory. The value of a partition column is stored in the directory name, not inside the parquet files, so when you point the reader at a single name_process=... directory, the name_process column is missing from the resulting schema.
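
A quick way to see this (a sketch using the paths from the question; the schema output in the comments is abbreviated):

// Reading one partition directory: name_process is absent, because its value
// lives only in the directory name, not inside the parquet files.
spark.read
  .parquet("spark-warehouse/test_db.db/test_table/name_process=OtherClass")
  .printSchema()   // root: id_session, time_write, key, value

// Reading the table's base path: partition discovery adds name_process back
// (partition columns are appended at the end of the schema).
spark.read
  .parquet("spark-warehouse/test_db.db/test_table")
  .printSchema()   // root: id_session, time_write, key, value, name_process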

Solution:

Read from the table's base path instead and filter on the partition column:

spark.read.parquet("spark-warehouse/test_db.db/test_table").filter(col("name_process") === processName)
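
Putting it together, here is a sketch of getLastSession reading from the table's base path (the warehouse path is the one from the question). Because name_process is the partition column, the filter is applied as partition pruning, so only the matching directory is actually scanned.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, max}

def getLastSession(processName: String)(implicit spark: SparkSession): Unit = {
  // Read the whole table directory so partition discovery keeps name_process,
  // then filter on it; Spark prunes to the matching partition directory.
  val df = spark.read
    .parquet("spark-warehouse/test_db.db/test_table")
    .filter(col("name_process") === processName)

  val lastTime = df.agg(max("time_write")).head().get(0)
  val lastSession = df
    .filter(col("time_write") === lastTime)
    .select("id_session")
    .head()
    .getString(0)

  println(lastSession)
}

Since the table was registered with saveAsTable, spark.table("test_db.test_table") would read it just as well and avoids hard-coding the warehouse path.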