I am using Scala.
I want to filter for the latest folder in an HDFS directory and read only that latest folder and all the files in it.
Now it looks like
val read_csv =
spark
.read
.format("csv")
.load( "hdfs://device/signs/load=16»)
In the folder signs there are a few folders partitioned by load (load=10, load=13, load=14, load=16) and I want to read only the one with the max value.
CodePudding user response:
Try this method:
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

private def getMaxPartitionValue(path: String, partitionName: String, sparkSession: SparkSession): String = {
  // This will create a Path object using the Hadoop configuration of the Spark session
  val hdfs = new Path(path).getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
  // This will keep only the folders that start with the partition name, in your case "load="
  val partitionNames = hdfs.listStatus(new Path(path)).toList.filter(_.getPath.getName.startsWith(partitionName + "="))
  // If no partition starts with "load=" then we return an empty string
  if (partitionNames.isEmpty) return ""
  // Compare the numeric value after "load=" and return the folder name with the max value, e.g. "load=16"
  partitionNames.map(_.getPath.getName).maxBy(_.stripPrefix(partitionName + "=").toInt)
}
Call it like this:
val maxPartitionPath = path + File.separator + getMaxPartitionValue(path, "load", sparkSession)
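For example, reusing the read from the question, a sketch could look like this (assuming path = "hdfs://device/signs", File.separator from java.io.File, and an active sparkSession):
val read_csv =
  sparkSession
    .read
    .format("csv")
    .load(maxPartitionPath) // e.g. hdfs://device/signs/load=16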
CodePudding user response:
If you're not using the Hive metastore in your stack, you can find the latest partition with the Hadoop FS API. You get access to the file metadata and can handle it the way you need:
import org.apache.hadoop.fs.{FileSystem, Path}
val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
val ls = fs.listStatus(new Path("/"))
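Building on that, a minimal sketch that picks the newest load= folder (the base path and partition prefix come from the question; adjust them to your setup):
// Keep only load= folders under the signs directory and take the one with the highest numeric value
val latestLoad = fs.listStatus(new Path("hdfs://device/signs"))
  .map(_.getPath)
  .filter(_.getName.startsWith("load="))
  .maxBy(_.getName.stripPrefix("load=").toInt)
// Read all CSV files from that partition only
val df = sparkSession.read.format("csv").load(latestLoad.toString)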
If you have a metastore on your cluster, you can get access to the partitions with SQL:
sparkSession.sql("SHOW PARTITIONS tablename")
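A sketch of one way to use that result (assuming the table is registered as tablename and partitioned by an integer load column; these names are placeholders):
// SHOW PARTITIONS returns one string per partition, e.g. "load=16"
val maxLoad = sparkSession.sql("SHOW PARTITIONS tablename")
  .collect()
  .map(_.getString(0).stripPrefix("load=").toInt)
  .max
// Query only the newest partition
val df = sparkSession.sql(s"SELECT * FROM tablename WHERE load = $maxLoad")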