I am trying to lift some files with pyspark from a databricks datalake. To do this, I use the "sqlContext" statement to create the data frame, I do this without problems. Each file is named by the creation date, for example "20211001.cv". These arrive on a daily basis and I was using "* .csv" to get them all up. But now I need to lift the files from a certain date forward and I can't find a way to do it, that's why I turn to you please.
The statement style I am using is the following:
df_example= (sqlContext
.read
.format("com.databricks.spark.csv")
.option("delimiter", ";")
.option("header","true")
.option("inferSchema","true")
.option("encoding","windows-1252")
.load("/mnt/path/202110*.csv"))
I need to be able to detect files from a certain date forward in ".load" sentence, is it possible to do it with pyspark? Example "NameFile.csv >= 202110" Do you have an example please?
From already thank you very much!
CodePudding user response:
I believe you can't do this. At least how you intend it.
If the data would've been written partitioned by date, said date would be part of the path and then Spark would add it as another column which you could then use to filter using the DataFrame API as you do with any other column.
So if the files were, let's say:
your_main_df_path
├── date_at=20211001
│ └── file.csv
├── date_at=20211002
│ └── file.csv
├── date_at=20211003
│ └── file.csv
└── ...
You could then do:
df = spark_session.read.format("csv").load("your_main_df_path") # with all your options
df.filter("date_at>=20211002") # or any other date you need
Spark would use the date in the path to do the partition pruning and only read the dates you need. If you can modify how the data is written, this is probably the best option.
If you can't control that or is hard to change for all the data you already have there. Maybe you can try to write a little python function takes a start date (and maybe an optional end_date) and returns a list of files that fall in that range. That list of files could then be passed to the DataFrameWriter.
CodePudding user response:
get the all dates >[particular-date] and make that as list and pass those values as parameterized value in iteration mode as like this ,
%python
table_name ='my_table_name'
survey_curated_delta_path = f"abfss://[email protected]/path1/path2/stage/validation/results/{table_name}.csv"
survey_sdf = spark.read.format("csv").load(survey_curated_delta_path)
display(survey_sdf)