Pyspark: Read data from AWS:S3 bucket and write to postgres table


I am trying to read data from an S3 bucket and write/load it into a Postgres table. My code is:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Read Multiple CSV Files').getOrCreate()
path = ['C://Projects/Sandbox/file2.csv']
files = spark.read.csv(path, sep=',', inferSchema=True, header=True)
df1 = files.toPandas()

from pyspark.sql import DataFrameWriter
my_writer = DataFrameWriter(df1)

mode = "overwrite"
url = ""
properties = {"user": "","password": "","driver": "org.postgresql.Driver"}
my_writer.write.jdbc(url=url, table="test_result", mode=mode, properties=properties)

When the writer is constructed from the Spark DataFrame,

my_writer = DataFrameWriter(files)

the final line fails with:

AttributeError: 'DataFrameWriter' object has no attribute 'write'

And when the pandas DataFrame is passed instead, as in the code above,

my_writer = DataFrameWriter(df1)

it fails with:

AttributeError: 'DataFrame' object has no attribute 'sql_ctx'

What am I doing wrong?

CodePudding user response:

There is no need to create a new instance of DataFrameWriter; a Spark DataFrame already exposes this interface through its write attribute. You can use that attribute to export the CSV data over a JDBC connection:

# Read the data from the source
files = spark.read.csv(path, sep=',', inferSchema=True, header=True)

# Write the data to destination using jdbc connection
files.write.jdbc(url=url, table="test_result", mode=mode, properties=properties)
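
Since the title asks about S3 while the sample path is local, here is a minimal sketch of the same write reading straight from a bucket. The bucket name, object key, connection URL, and package versions are illustrative assumptions; it presumes the hadoop-aws and PostgreSQL JDBC packages are fetched when the session starts:

from pyspark.sql import SparkSession

# hadoop-aws provides the s3a:// filesystem, org.postgresql the JDBC driver;
# the version numbers are examples only, match them to your Spark/Hadoop build
spark = (SparkSession.builder
         .appName('S3 to Postgres')
         .config('spark.jars.packages',
                 'org.apache.hadoop:hadoop-aws:3.3.4,org.postgresql:postgresql:42.6.0')
         .getOrCreate())

# Hypothetical bucket and key; credentials come from the usual AWS provider chain
files = spark.read.csv('s3a://my-bucket/data/file2.csv',
                       sep=',', inferSchema=True, header=True)

mode = "overwrite"
url = "jdbc:postgresql://localhost:5432/testdb"  # placeholder connection URL
properties = {"user": "", "password": "", "driver": "org.postgresql.Driver"}

# Write straight from the Spark DataFrame; no pandas conversion needed
files.write.jdbc(url=url, table="test_result", mode=mode, properties=properties)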

How to fix your existing code?

Create the DataFrameWriter instance from files (the Spark DataFrame), then call my_writer.jdbc directly to export the data over the JDBC connection:

my_writer = DataFrameWriter(files)
my_writer.jdbc(url=url, table="test_result", mode=mode, properties=properties)
#      ^^^^^^ No need to use .write attribute
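
For context, DataFrameWriter(files) is exactly the object that files.write returns, which is why chaining .write onto it fails: a DataFrameWriter has no write attribute of its own.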

CodePudding user response:

The solution below also works, provided the writer is constructed from the Spark DataFrame (files) and not from a pandas conversion of it:

from pyspark.sql import SparkSession, DataFrameWriter

spark = SparkSession.builder.appName('Read Multiple CSV Files').getOrCreate()
path = ['C://Projects/Sandbox/file2.csv']
files = spark.read.csv(path, sep=',', inferSchema=True, header=True)

# Build the writer from the Spark DataFrame, not a pandas DataFrame
my_writer = DataFrameWriter(files)

mode = "overwrite"
url = ""
properties = {"user": "", "password": "", "driver": "org.postgresql.Driver"}
my_writer.jdbc(url=url, table="test_result", mode=mode, properties=properties)
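
As a quick sanity check, the table can be read back over the same connection (assuming url and properties point at a real database):

# Optional: read the table back through the same JDBC connection
result = spark.read.jdbc(url=url, table="test_result", properties=properties)
result.show(5)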