val creation_timestamp = df.groupBy().agg(min($"userCreation_timestamp").alias("ts")).col("ts")
df.filter(col("userCreation_timestamp").cast("timestamp") >= creation_timestamp).show()
or
df.where(col("userCreation_timestamp").cast("timestamp") >= creation_timestamp).show()
When running the above code to show the data, it fails with org.apache.spark.sql.AnalysisException: Resolved attribute(s):
org.apache.spark.sql.AnalysisException: Resolved attribute(s) ts#1658 missing from id#2,userCreation_timestamp#8,firstname#31 in operator !Filter (cast(userCreation_timestamp#8 as timestamp) >= ts#1658).;;
!Filter (cast(userCreation_timestamp#8 as timestamp) >= ts#1658)
+- Relation[id#02,userCreation_timestamp#8, 26 more fields] parquet
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:92)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:293)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:84)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:84)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:172)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:178)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:65)
at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:3306)
at org.apache.spark.sql.Dataset.filter(Dataset.scala:1463)
... 49 elided
df.where(col("userCreation_timestamp").cast("timestamp") >= "2022-03-11 18:36:48").show()
With a literal value in the where clause the code works fine, but when the value comes from the aggregated dataframe it fails.
CodePudding user response:
You can first select the min timestamp as a value and then use this value in the where/filter function. Please find the working sample below:
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.TimestampType
import org.apache.spark.sql.{SparkSession, functions}

object QuestionStackOverflow extends App {

  val spark = SparkSession.builder
    .master("local[*]")
    .appName("Sample App")
    .config("spark.sql.shuffle.partitions", "1")
    .getOrCreate()

  import spark.sqlContext.implicits._

  // Sample data: the timestamp arrives as a string and is cast to a real timestamp column "ts".
  val df = Seq(
    (1, "2022-03-11 18:36:48"),
    (2, "2022-03-11 19:00:00"),
    (3, "2022-03-11 20:00:00")
  ).toDF("id", "userCreation_timestamp")
    .withColumn("ts", col("userCreation_timestamp").cast(TimestampType))

  df.printSchema()

  // Collect the minimum timestamp as a plain Scala value, not as a Column bound to another plan.
  val creation_timestamp = df
    .select(functions.min("ts"))
    .head().get(0)

  // Reuse the collected value as a literal inside the filter condition.
  df.where(col("ts") > lit(creation_timestamp).cast(TimestampType))
    .show()
}
Schema is:
root
|-- id: integer (nullable = false)
|-- userCreation_timestamp: string (nullable = true)
|-- ts: timestamp (nullable = true)
Output:
+---+----------------------+-------------------+
| id|userCreation_timestamp|                 ts|
+---+----------------------+-------------------+
|  2|   2022-03-11 19:00:00|2022-03-11 19:00:00|
|  3|   2022-03-11 20:00:00|2022-03-11 20:00:00|
+---+----------------------+-------------------+
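As a side note beyond the sample above, here is a minimal sketch of an alternative that avoids collecting the value to the driver: cross-join the single-row aggregate back onto df, so the minimum is available as an ordinary column in the same plan (column names follow the sample above; the alias min_ts is made up for illustration):

// Single-row dataframe holding the minimum timestamp.
val minDf = df.agg(functions.min("ts").alias("min_ts"))

// Every row of df is paired with the one aggregate row, so min_ts can be compared directly.
df.crossJoin(minDf)
  .where(col("ts") > col("min_ts"))
  .drop("min_ts")
  .show()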
If you are interested in similar issues regarding Spark, please visit my blog: https://bigdata-etl.com/tag/apache-spark/
CodePudding user response:
You're getting this error because you're not passing the value of the ts column in your filter condition, but the column itself. As the ts column does not exist in the df dataframe, you get an AnalysisException: Resolved attribute(s) ts#1658 missing exception.
If you want to pass the value of the column, you need to retrieve the first row of your aggregated dataframe, then retrieve the timestamp value from that row, and finally use lit to pass it into your condition:
import org.apache.spark.sql.functions.{min, lit, col}

// Aggregate to a single row, pull the timestamp value out of it, and reuse it as a literal.
// Casting inside min ensures getTimestamp(0) works even when the column is stored as a string.
val creation_timestamp = df.agg(min($"userCreation_timestamp".cast("timestamp"))).head().getTimestamp(0)
df.filter(col("userCreation_timestamp").cast("timestamp") >= lit(creation_timestamp)).show()
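For completeness, a hedged alternative not shown in either answer: the same comparison can be expressed as an uncorrelated scalar subquery in Spark SQL, so no value has to be collected manually (the view name users is invented for this sketch):

// Register the dataframe under a temporary view name so it can be queried with SQL.
df.createOrReplaceTempView("users")

// The scalar subquery computes the minimum once and feeds it into the WHERE clause.
spark.sql(
  """SELECT *
     FROM users
     WHERE CAST(userCreation_timestamp AS TIMESTAMP) >=
           (SELECT MIN(CAST(userCreation_timestamp AS TIMESTAMP)) FROM users)""").show()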