I want to create a parquet table with certain types of fields:
- name_process: String
- id_session: Int
- time_write: LocalDate or Timestamp
- key: String
- value: String
name_process | id_session | time_write | key | value |
---|---|---|---|---|
OtherClass | jsdfsadfsf | 43434883477 | schema0.table0.csv | Success |
OtherClass | jksdfkjhka | 23212123323 | schema1.table1.csv | Success |
OtherClass | alskdfksjd | 23343212234 | schema2.table2.csv | Failure |
ExternalClass | sdfjkhsdfd | 34455453434 | schema3.table3.csv | Success |
I want to write such a table with the correct data types, and then read individual partitions back from it. I'm trying to implement the write and the read, but so far it isn't working well.
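For clarity, here is roughly the schema I am aiming for, written as an explicit Spark StructType (just a sketch; I am treating time_write as a timestamp here, even though the sample above shows an epoch-like number):

```scala
import org.apache.spark.sql.types._

// Target schema sketch. id_session is declared as Int above, but the sample
// values are alphanumeric, so StringType may end up being the safer choice.
val journalSchema = StructType(Seq(
  StructField("name_process", StringType, nullable = false),
  StructField("id_session", StringType, nullable = false),
  StructField("time_write", TimestampType, nullable = false),
  StructField("key", StringType, nullable = false),
  StructField("value", StringType, nullable = false)
))
```

My current attempt at the write and the read is below: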
```scala
import java.io.File

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, max}
import org.apache.spark.sql.types.{LongType, StringType}

def createHiveTable(implicit spark: SparkSession): Unit = {
  val schema = "test_schema"
  val table = "test_table"
  val partitionName = "name_process"
  val columnNames = "name_process" :: "id_session" :: "time_write" :: "key" :: "value" :: Nil

  spark.sql("CREATE DATABASE IF NOT EXISTS test_db")
  //val createTableSql = s"CREATE TABLE IF NOT EXISTS $schema.$table ($columnNames) PARTITIONED BY $partitionName STORED AS parquet"

  // Read the source CSV (comma-delimited, with a header row).
  val path = new File(".").getAbsolutePath + "/src/test/data-lineage/test_data_journal.csv"
  val df = spark.read
    .option("delimiter", ",")
    .option("header", true)
    .csv(path)
  df.show()

  // Write as a parquet table partitioned by name_process.
  df.write
    .mode(SaveMode.Append)
    .partitionBy(partitionName)
    .format("parquet")
    .saveAsTable(s"test_db.$table")
}

def getLastSession(processName: String)(implicit spark: SparkSession): Unit = {
  // Read one partition directory and cast the columns to the expected types.
  val df = spark.read.parquet(s"spark-warehouse/test_db.db/test_table/name_process=$processName")
    .select(
      col("name_process").cast(StringType),
      col("id_session").cast(StringType),
      col("time_write").cast(LongType),
      col("key").cast(StringType),
      col("value").cast(StringType)
    )

  // id_session of the row with the latest time_write
  val lastTime = df.select(max("time_write")).collect()(0).get(0)
  val lastSession = df.filter(col("time_write").equalTo(lastTime)).select("id_session").head().getString(0)

  println(lastSession)
  println(TimeStamp.getCurrentTime) // TimeStamp import not shown here
}
```
Logs from Spark:
```
21/12/16 14:51:19 INFO TaskSchedulerImpl: Killing all running tasks in stage 3: Stage finished
21/12/16 14:51:19 INFO DAGScheduler: Job 3 finished: parquet at DataLineageJournal.scala:28, took 0,076899 s
org.apache.spark.sql.AnalysisException: cannot resolve '`name_process`' given input columns: [id_session, key, time_write, value];
'Project [unresolvedalias(cast('name_process as string), None), cast(id_session#78 as string) AS id_session#86, cast(time_write#79 as bigint) AS time_write#87L, cast(key#80 as string) AS key#88, cast(value#81 as string) AS value#89]
 - Relation[id_session#78,time_write#79,key#80,value#81] parquet
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:155)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:152)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:342)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:342)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:339)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:408)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:244)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:406)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:359)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:339)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:339)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:408)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:244)
```
CodePudding user response:
Problem

When you do this

```scala
spark.read.parquet(s"spark-warehouse/test_db.db/test_table/name_process=$processName")
```

you are reading a single partition directory. The partition column name_process is not stored inside the parquet files themselves; it is encoded only in the directory name (name_process=...), which is why the column is missing from the DataFrame you get back.

Solution:

Read from the table's root directory and filter on the partition column instead. Partition discovery then restores name_process as a column, and the filter still prunes the scan down to that one partition:

```scala
spark.read.parquet("spark-warehouse/test_db.db/test_table")
  .filter(col("name_process") === processName)
```