I am trying to create Delta Lake tables for all of the files located in '/dbfs/FileStore/tables/FirstRate30mins', but the data are mapped wrongly.
The earliest correct row for AAL should be:
AAL_date/time AAL_adjOpen AAL_adjHigh AAL_adjLow AAL_adjClose AAL_adjVolume
2013-12-09 08:30:00 22.8049 22.8049 21.7868 22.2016 141800
but when I run display(df_30mins_['AAL']), the prices are wrong and the rows start in 2005:
AAL_date/time AAL_adjOpen AAL_adjHigh AAL_adjLow AAL_adjClose AAL_adjVolume
2005-01-03 08:00:00 0.9939 0.9985 0.9863 0.9955 1711416
When I display other symbols, such as display(df_30mins_['A']), the output is exactly the same wrongly mapped data:
A_date/time A_adjOpen A_adjHigh A_adjLow A_adjClose A_adjVolume
2005-01-03 08:00:00 0.9939 0.9985 0.9863 0.9955 1711416
Here is the source code:
import os
import numpy as np
import pandas as pd
from pyspark import SparkFiles
from pyspark import SparkContext
from pyspark.sql import functions
import pyspark.sql.functions #import avg, col, udf
from pyspark.sql import SQLContext
from pyspark.sql import DataFrame
from pyspark.sql.types import *
import json
#LIST, RENAME, AND SAVE ALL FILES AS DELTA LAKE AUTOMATICALLY
path = '/dbfs/FileStore/tables/FirstRate30mins'
filename_lists = os.listdir(path)
df_30mins_ = {}
_delta ={}
for filename in os.listdir(path):
    # split the file name to get the ticker symbol
    name = filename.split('_')[0]
    # create column header names
    temp = StructType([StructField(name + "_date/time", StringType(), True),
                       StructField(name + "_adjOpen", FloatType(), True),
                       StructField(name + "_adjHigh", FloatType(), True),
                       StructField(name + "_adjLow", FloatType(), True),
                       StructField(name + "_adjClose", FloatType(), True),
                       StructField(name + "_adjVolume", IntegerType(), True)])
    # list and create csv dataframes
    temp_df = spark.read.format("csv").option("header", "false").schema(temp).load("/FileStore/tables/FirstRate30mins/")
    # name each dataframe
    df_30mins_[name] = temp_df
    # name each table
    table_name = name + '_30mins_delta'
    # create a delta lake table for each dataframe
    df_30mins_[name].write.format("delta").mode("overwrite").saveAsTable(table_name)
display(df_30mins_['AAL'])
display(df_30mins_['A'])
display(df_30mins_['AAPL'])
#display(spark.sql('SELECT * FROM aal_30mins_delta'))
#display(spark.sql('SELECT * FROM a_30mins_delta'))
#display(spark.sql('SELECT * FROM aapl_30mins_delta'))
CodePudding user response:
Sorry, I just missed the filename: load() was pointed at the whole folder, so every DataFrame was built from all of the CSV files in the directory instead of just one file, which is why every symbol showed the same data.
Here is the corrected code.
import os
import numpy as np
import pandas as pd
from pyspark import SparkFiles
from pyspark import SparkContext
from pyspark.sql import functions
import pyspark.sql.functions #import avg, col, udf
from pyspark.sql import SQLContext
from pyspark.sql import DataFrame
from pyspark.sql.types import *
import json
#LIST, RENAME, AND SAVE ALL FILES AS DELTA LAKE AUTOMATICALLY
path = '/dbfs/FileStore/tables/FirstRate30mins'
filename_lists = os.listdir(path)
df_30mins_ = {}
_delta ={}
for filename in os.listdir(path):
    # split the file name to get the ticker symbol
    name = filename.split('_')[0]
    # create column header names
    temp = StructType([StructField(name + "_date/time", StringType(), True),
                       StructField(name + "_adjOpen", FloatType(), True),
                       StructField(name + "_adjHigh", FloatType(), True),
                       StructField(name + "_adjLow", FloatType(), True),
                       StructField(name + "_adjClose", FloatType(), True),
                       StructField(name + "_adjVolume", IntegerType(), True)])
    # read one csv file per symbol into a dataframe
    temp_df = spark.read.format("csv").option("header", "false").schema(temp).load("/FileStore/tables/FirstRate30mins/" + filename)
    # name each dataframe
    df_30mins_[name] = temp_df
    # name each table
    table_name = name + '_30mins_delta'
    # create a delta lake table for each dataframe
    df_30mins_[name].write.format("delta").mode("overwrite").saveAsTable(table_name)
display(df_30mins_['AAL'])
display(df_30mins_['AAPL'])
display(df_30mins_['A'])
display(spark.sql('SELECT * FROM aal_30mins_delta'))
display(spark.sql('SELECT * FROM aapl_30mins_delta'))
display(spark.sql('SELECT * FROM a_30mins_delta'))
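As a side note, here is a slightly more defensive sketch of the same loop. It assumes the folder contains only the FirstRate files named SYMBOL_*.csv and that spark is the usual Databricks session; it skips anything that is not a .csv file and keeps the driver-local /dbfs path separate from the path Spark reads from, so stray files in the folder cannot end up in a table.

import os
from pyspark.sql.types import (StructType, StructField, StringType,
                               FloatType, IntegerType)

local_path = '/dbfs/FileStore/tables/FirstRate30mins'   # driver-local view of DBFS, for os.listdir
spark_path = '/FileStore/tables/FirstRate30mins'        # same folder as Spark sees it

df_30mins_ = {}

for filename in os.listdir(local_path):
    if not filename.lower().endswith('.csv'):
        continue  # skip anything that is not a CSV file

    name = filename.split('_')[0]

    schema = StructType([
        StructField(name + "_date/time", StringType(), True),
        StructField(name + "_adjOpen", FloatType(), True),
        StructField(name + "_adjHigh", FloatType(), True),
        StructField(name + "_adjLow", FloatType(), True),
        StructField(name + "_adjClose", FloatType(), True),
        StructField(name + "_adjVolume", IntegerType(), True),
    ])

    # read exactly one file per symbol
    df = (spark.read
               .format("csv")
               .option("header", "false")
               .schema(schema)
               .load(spark_path + "/" + filename))

    df_30mins_[name] = df
    df.write.format("delta").mode("overwrite").saveAsTable(name.lower() + '_30mins_delta')

The .csv filter is the main point; lowercasing the table name is optional, since the metastore treats table names case-insensitively anyway, but it makes the names match the spark.sql queries above exactly.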