2 dataframe column values not working in where clause

Time:04-22

val creation_timestamp = df.groupBy().agg(min($"userCreation_timestamp").alias("ts")).col("ts")

df.filter(col("userCreation_timestamp").cast("timestamp") >= creation_timestamp).show()
or
df.where(col("userCreation_timestamp").cast("timestamp") >= creation_timestamp).show()

Running either statement fails with org.apache.spark.sql.AnalysisException: Resolved attribute(s) ... missing. The full error is:

org.apache.spark.sql.AnalysisException: Resolved attribute(s) ts#1658 missing from id#2,userCreation_timestamp#8,firstname#31 in operator !Filter (cast(userCreation_timestamp#8 as timestamp) >= ts#1658).;;
!Filter (cast(userCreation_timestamp#8 as timestamp) >= ts#1658)
+- Relation[id#02,userCreation_timestamp#8, 26 more fields] parquet

  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:92)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:293)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:84)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:84)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:172)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:178)
  at org.apache.spark.sql.Dataset$.apply(Dataset.scala:65)
  at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:3306)
  at org.apache.spark.sql.Dataset.filter(Dataset.scala:1463)
  ... 49 elided

df.where(col("userCreation_timestamp").cast("timestamp") >= "2022-03-11 18:36:48").show()

With a literal value in the where clause the code works fine, but it fails when the value comes from a DataFrame column.

CodePudding user response:

You can first select the minimum timestamp as a plain value and then use that value in the where/filter function. A working sample:

import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.TimestampType
import org.apache.spark.sql.{SparkSession, functions}

object QuestionStackOverflow extends App {

  val spark = SparkSession.builder
    .master("local[*]")
    .appName("Sample App")
    .config("spark.sql.shuffle.partitions", "1")
    .getOrCreate()

  import spark.implicits._

  val df = Seq(
    (1, "2022-03-11 18:36:48"),
    (2, "2022-03-11 19:00:00"),
    (3, "2022-03-11 20:00:00")
  ).toDF("id", "userCreation_timestamp")
    .withColumn("ts", col("userCreation_timestamp").cast(TimestampType))

  df.printSchema()

  val creation_timestamp = df
    .select(functions.min("ts"))
    .head().get(0)

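  // Note: `>` drops the row that holds the minimum itself; use `>=` to keep it, as the question does.
  df.where(col("ts") > lit(creation_timestamp).cast(TimestampType))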
    .show()
  
}

Schema is:

root
 |-- id: integer (nullable = false)
 |-- userCreation_timestamp: string (nullable = true)
 |-- ts: timestamp (nullable = true)

Output:

+---+----------------------+-------------------+
| id|userCreation_timestamp|                 ts|
+---+----------------------+-------------------+
|  2|   2022-03-11 19:00:00|2022-03-11 19:00:00|
|  3|   2022-03-11 20:00:00|2022-03-11 20:00:00|
+---+----------------------+-------------------+

If you are interested in similar issues regarding Spark, please visit my blog: https://bigdata-etl.com/tag/apache-spark/

CodePudding user response:

You're getting this error because the filter condition receives the ts column itself rather than its value. Since the ts column belongs to the aggregated DataFrame and does not exist in df, the analyzer cannot resolve it and throws an AnalysisException: Resolved attribute(s) ts#1658 missing.
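To make the distinction concrete, here is a minimal sketch of the difference between a Column reference and a collected value. The sample data, the object name ColumnVersusValue and the local SparkSession settings are assumptions for illustration, not taken from the question.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{col, min}

object ColumnVersusValue extends App {
  // Local session for illustration only (assumed settings).
  val spark = SparkSession.builder
    .master("local[*]")
    .appName("Column versus value")
    .getOrCreate()

  import spark.implicits._

  // Hypothetical sample data with a string timestamp column.
  val df = Seq(
    (1, "2022-03-11 18:36:48"),
    (2, "2022-03-11 19:00:00")
  ).toDF("id", "userCreation_timestamp")

  val aggregated = df.agg(
    min(col("userCreation_timestamp").cast("timestamp")).alias("ts")
  )

  // A Column is only a reference to an attribute of `aggregated`;
  // it carries no data and is unknown to `df`.
  val tsColumn: Column = aggregated.col("ts")

  // head() runs the aggregation and brings the single result row to the driver,
  // so this is an ordinary JVM value that can be wrapped with lit(...).
  val tsValue: Timestamp = aggregated.head().getTimestamp(0)

  // `df` has no attribute named "ts", which is why the filter cannot be resolved.
  println(df.columns.contains("ts")) // prints "false"
  println(tsValue)

  spark.stop()
}
```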

If you want to pass the value of the column, you need to retrieve the first row of your aggregated dataframe, then retrieve the timestamp value in this row, and finally use lit to pass it to your condition:

import org.apache.spark.sql.functions.{min, lit, col}

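// Cast before aggregating: getTimestamp(0) fails if the column is stored as a string.
val creation_timestamp = df.agg(min($"userCreation_timestamp".cast("timestamp"))).head().getTimestamp(0)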

df.filter(col("userCreation_timestamp").cast("timestamp") >= lit(creation_timestamp)).show()
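If you would rather not collect the minimum to the driver, one possible alternative is to cross join the one-row aggregate back onto df so that the minimum becomes a real column of the DataFrame being filtered. This is a sketch under assumptions: the sample data, the object name CrossJoinMinFilter and the column name min_ts are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, min}

object CrossJoinMinFilter extends App {
  // Local session for illustration only (assumed settings).
  val spark = SparkSession.builder
    .master("local[*]")
    .appName("Cross join min filter")
    .getOrCreate()

  import spark.implicits._

  // Hypothetical sample data with a string timestamp column.
  val df = Seq(
    (1, "2022-03-11 18:36:48"),
    (2, "2022-03-11 19:00:00"),
    (3, "2022-03-11 20:00:00")
  ).toDF("id", "userCreation_timestamp")

  // One-row DataFrame holding the minimum timestamp.
  val minTs = df.agg(
    min(col("userCreation_timestamp").cast("timestamp")).alias("min_ts")
  )

  // After the cross join every row of df carries min_ts,
  // so the filter only references columns that exist in the joined plan.
  df.crossJoin(minTs)
    .where(col("userCreation_timestamp").cast("timestamp") >= col("min_ts"))
    .drop("min_ts")
    .show()

  spark.stop()
}
```

Because the aggregate has exactly one row, the cross join does not multiply the data. Collecting the value with head() and lit, as above, is usually simpler when a single scalar is all you need.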
