read csv file which has columns shuffled


I'm trying to read a CSV file in Databricks using PySpark. The file's columns come in shuffled: instead of A, B, C they are randomly arranged, e.g. C, A, B. I tried using map(), but it throws the error 'cannot pickle '_thread.RLock' object'.

I need to put the columns back in the correct order using PySpark in Databricks. I referred to the example at https://sparkbyexamples.com/pyspark/pyspark-map-transformation/, but it does not help me because my DataFrame is created by reading a CSV file.

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

appName = "AddColumnUsingMap"
spark = SparkSession.builder.appName(appName).getOrCreate()
parentname='xx'
filename='Test (2)'
todayDate='2022-05-24 1:48:42'
extension='.csv'
filePath = "dbfs:/mnt/bronze/landing/x/" + parentname + "/" + "current/" + filename + extension

# Formats
read_format = "csv"
write_format = "csv"
claimdenials_df_raw = (spark
                  .read
                  .format(read_format)
                  .option("multiLine", "true")
                  .option("header", "true")
                  .option("escape", '"')
                  .load(filePath))
display(claimdenials_df_raw)
rdd=spark.sparkContext.parallelize(claimdenials_df_raw)
def func1(x):
    DenialId=x["Id"]
    PatientFirstName=x["First Name"]
    PatientLastName=x[" Last Name"]
    PatientDateOfBirth=x["Date of Birth"]
    PatientId=x["PatientId"]
    
    return (DenialId,PatientFirstName,PatientLastName,PatientDateOfBirth,PatientId)

rdd2 = rdd.map(lambda x: func1(x))
print(rdd2.collect())

CodePudding user response:

You can convert the RDD back to a DataFrame with the correct column order. I am not sure, but you can try it with rdd2:

df = rdd2.toDF(["Id", "First Name", "Last Name", "Date of Birth", "PatientId"])
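
As a side note: if the goal is only to put the columns in a fixed order, a plain select on the DataFrame achieves that without dropping to RDDs at all, since header=true binds columns by name rather than by position. A minimal sketch, assuming the header names from the question (including the leading space in " Last Name"):

# Select columns by name in the desired order; works regardless of how
# the columns were arranged in the source CSV
df_ordered = claimdenials_df_raw.select(
    "Id", "First Name", " Last Name", "Date of Birth", "PatientId"
)
display(df_ordered)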

CodePudding user response:

The sparkContext.parallelize() helps us create an RDD from a list of elements or a collection. Your code throws TypeError: cannot pickle '_thread.RLock' object because you are trying to create an RDD by passing the DataFrame claimdenials_df_raw to it directly.
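
A minimal sketch of the difference (the sample rows are hypothetical):

# Works: parallelize() receives a plain local list of tuples
rows = [("1", "John", "Doe"), ("2", "Jane", "Roe")]
rdd_ok = spark.sparkContext.parallelize(rows)

# Fails: a DataFrame carries a reference to the SparkContext, which holds
# a _thread.RLock and cannot be pickled for distribution
# rdd_bad = spark.sparkContext.parallelize(claimdenials_df_raw)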

So, you need to modify your code in one of the following ways:

rdd = spark.sparkContext.parallelize(claimdenials_df_raw.collect())
  • Here claimdenials_df_raw.collect() returns all rows of the DataFrame as a list on the driver. You can pass this list to successfully create an RDD, though collecting pulls the entire dataset to the driver, so it only suits small data.

rdd = claimdenials_df_raw.rdd
  • The DataFrame's rdd property also gives you an RDD of Row objects to work with, without collecting anything to the driver.

Either option overcomes the error. You can use the following modified code directly to get rdd2:

claimdenials_df_raw = (spark
                  .read
                  .format(read_format)
                  .option("multiLine", "true")
                  .option("header", "true")
                  .option("escape", '"')
                  .load(filePath))
display(claimdenials_df_raw)

# Build the RDD from a collected list, not from the DataFrame itself
rdd = spark.sparkContext.parallelize(claimdenials_df_raw.collect())

def func1(x):
    # Read each field of the Row by name and emit them in the desired order
    DenialId = x["Id"]
    PatientFirstName = x["First Name"]
    PatientLastName = x[" Last Name"]
    PatientDateOfBirth = x["Date of Birth"]
    PatientId = x["PatientId"]

    return (DenialId, PatientFirstName, PatientLastName, PatientDateOfBirth, PatientId)

rdd2 = rdd.map(lambda x: func1(x))
print(rdd2.collect())
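
To turn rdd2 back into a DataFrame with the corrected column order, the toDF() call from the first answer applies directly:

# Name the tuple positions; the result is a DataFrame in the desired order
df = rdd2.toDF(["Id", "First Name", "Last Name", "Date of Birth", "PatientId"])
display(df)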