Python: Data mapped wrongly in Azure Databricks


I am trying to create a Delta Lake table for every file located in '/dbfs/FileStore/tables/FirstRate30mins', but the data are mapped to the wrong symbols.

The earliest correct row for AAL should be:

AAL_date/time        AAL_adjOpen   AAL_adjHigh   AAL_adjLow  AAL_adjClose  AAL_adjVolume

2013-12-09 08:30:00   22.8049       22.8049       21.7868       22.2016       141800

but when I run display(df_30mins_['AAL']), the prices are wrong and the earliest row is from 2005 instead of 2013:

AAL_date/time        AAL_adjOpen   AAL_adjHigh   AAL_adjLow  AAL_adjClose  AAL_adjVolume
2005-01-03 08:00:00    0.9939       0.9985         0.9863     0.9955         1711416

When I display other symbols, such as display(df_30mins_['A']), the output is exactly the same and is also mapped wrongly.

A_date/time           A_adjOpen    A_adjHigh   A_adjLow    A_adjClose      A_adjVolume
2005-01-03 08:00:00    0.9939       0.9985    0.9863          0.9955        1711416

Here is the source code:

import os
import numpy as np
import pandas as pd

from pyspark import SparkFiles
from pyspark import SparkContext
from pyspark.sql import functions
import pyspark.sql.functions #import avg, col, udf
from pyspark.sql import SQLContext
from pyspark.sql import DataFrame
from pyspark.sql.types import *
import json

#LIST, RENAME, AND SAVE ALL FILES AS DELTA LAKE AUTOMATICALLY
path = '/dbfs/FileStore/tables/FirstRate30mins'
filename_lists = os.listdir(path)

df_30mins_ = {}
_delta ={}

for filename in os.listdir(path):
    #split file name
    name = filename.split('_')[0]
    #create column header names
    temp = StructType([
        StructField(name + "_date/time", StringType(), True),
        StructField(name + "_adjOpen", FloatType(), True),
        StructField(name + "_adjHigh", FloatType(), True),
        StructField(name + "_adjLow", FloatType(), True),
        StructField(name + "_adjClose", FloatType(), True),
        StructField(name + "_adjVolume", IntegerType(), True)
    ])
    
    #list and create csv dataframes
    temp_df = spark.read.format("csv").option("header", "false").schema(temp).load("/FileStore/tables/FirstRate30mins/")
    
    #name each dataframe
    df_30mins_[name] = temp_df
    
    #name each table
    table_name = name + '_30mins_delta'
    
    #create a Delta table for each dataframe
    df_30mins_[name].write.format("delta").mode("overwrite").saveAsTable(table_name)

    display(df_30mins_['AAL'])
    display(df_30mins_['A'])
    display(df_30mins_['AAPL'])

    #display(spark.sql('SELECT * FROM aal_30mins_delta'))
    #display(spark.sql('SELECT * FROM a_30mins_delta'))
    #display(spark.sql('SELECT * FROM aapl_30mins_delta'))

CodePudding user response:

Sorry, I just missed the filename in the load() path.
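To make the change obvious, the fix is a one-line edit to the load() call: the directory path has to be concatenated with the current filename so that each symbol's DataFrame reads only its own CSV (before/after shown below, taken from the two versions of the loop):

# before: load() points at the whole directory, so every symbol's DataFrame
# is built from all of the CSV files combined
temp_df = spark.read.format("csv").option("header", "false").schema(temp).load("/FileStore/tables/FirstRate30mins/")

# after: append the current filename so each DataFrame reads only its own CSV
temp_df = spark.read.format("csv").option("header", "false").schema(temp).load("/FileStore/tables/FirstRate30mins/" + filename)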

Here is the corrected code in full.

import os
import numpy as np
import pandas as pd

from pyspark import SparkFiles
from pyspark import SparkContext
from pyspark.sql import functions
import pyspark.sql.functions #import avg, col, udf
from pyspark.sql import SQLContext
from pyspark.sql import DataFrame
from pyspark.sql.types import *
import json

#LIST, RENAME, AND SAVE ALL FILES AS DELTA LAKE AUTOMATICALLY
path = '/dbfs/FileStore/tables/FirstRate30mins'
filename_lists = os.listdir(path)

df_30mins_ = {}
_delta ={}

for filename in os.listdir(path):
    #split file name
    name = filename.split('_')[0]
    #create column header names
    temp = StructType([
        StructField(name + "_date/time", StringType(), True),
        StructField(name + "_adjOpen", FloatType(), True),
        StructField(name + "_adjHigh", FloatType(), True),
        StructField(name + "_adjLow", FloatType(), True),
        StructField(name + "_adjClose", FloatType(), True),
        StructField(name + "_adjVolume", IntegerType(), True)
    ])
    
    #list and create csv dataframes
    temp_df = spark.read.format("csv").option("header", "false").schema(temp).load("/FileStore/tables/FirstRate30mins/" + filename)
    
    #name each dataframe
    df_30mins_[name] = temp_df
    
    #name each table
    table_name = name + '_30mins_delta'
    
    #create a Delta table for each dataframe
    df_30mins_[name].write.format("delta").mode("overwrite").saveAsTable(table_name)
    
display(df_30mins_['AAL'])
display(df_30mins_['AAPL'])
display(df_30mins_['A'])

display(spark.sql('SELECT * FROM aal_30mins_delta'))
display(spark.sql('SELECT * FROM aapl_30mins_delta'))
display(spark.sql('SELECT * FROM a_30mins_delta'))
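
As a small usage note (a sketch assuming the tables were created as above), each per-symbol Delta table can also be read back into a DataFrame by name instead of going through spark.sql:

# read the registered Delta table for AAL back into a DataFrame
aal_df = spark.table('aal_30mins_delta')
aal_df.show(5)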