I am trying to read data from a table in a PostgreSQL database as the first step of an ETL project. I have a Docker environment set up with this docker-compose:
version: "3.3"
services:
  spark-master:
    image: docker.io/bitnami/spark:3.3
    ports:
      - "9090:8080"
      - "7077:7077"
    volumes:
      - /opt/spark-apps
      - /opt/spark-data
    environment:
      - SPARK_LOCAL_IP=spark-master
      - SPARK_WORKLOAD=master
  spark-worker-a:
    image: docker.io/bitnami/spark:3.3
    ports:
      - "9091:8080"
      - "7000:7000"
    depends_on:
      - spark-master
    environment:
      - SPARK_MASTER=spark://spark-master:7077
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=1G
      - SPARK_DRIVER_MEMORY=1G
      - SPARK_EXECUTOR_MEMORY=1G
      - SPARK_WORKLOAD=worker
      - SPARK_LOCAL_IP=spark-worker-a
    volumes:
      - /opt/spark-apps
      - /opt/spark-data
  spark-worker-b:
    image: docker.io/bitnami/spark:3.3
    ports:
      - "9092:8080"
      - "7001:7000"
    depends_on:
      - spark-master
    environment:
      - SPARK_MASTER=spark://spark-master:7077
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=1G
      - SPARK_DRIVER_MEMORY=1G
      - SPARK_EXECUTOR_MEMORY=1G
      - SPARK_WORKLOAD=worker
      - SPARK_LOCAL_IP=spark-worker-b
    volumes:
      - /opt/spark-apps
      - /opt/spark-data
  postgres:
    container_name: postgres_container
    image: postgres:11.7-alpine
    environment:
      POSTGRES_USER: admin
      POSTGRES_PASSWORD: admin
    volumes:
      - /data/postgres
    ports:
      - "4560:5432"
    restart: unless-stopped
  # jupyterlab with pyspark
  jupyter-pyspark:
    image: jupyter/pyspark-notebook:latest
    environment:
      JUPYTER_ENABLE_LAB: "yes"
    ports:
      - "9999:8888"
    volumes:
      - /app/data
I was able to connect to the DB successfully, but I can't print any data. Here's my code:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
    .appName("salesETL")\
    .config("spark.driver.extraClassPath", "./postgresql-42.5.1.jar")\
    .getOrCreate()

df = spark.read.format("jdbc")\
    .option("url", "jdbc:postgresql://postgres_container:5432/postgres")\
    .option("dbtable", "sales")\
    .option("driver", "org.postgresql.Driver")\
    .option("user", "admin")\
    .option("password", "admin").load()

df.show(10).toPandas()
With .toPandas(), it gives me this error:
AttributeError Traceback (most recent call last)
Cell In[7], line 1
----> 1 df.show(10).toPandas()
AttributeError: 'NoneType' object has no attribute 'toPandas'
Without .toPandas(), it prints the columns but no data:
+--------+----------+-----------+-------------+-----------------+-------------+--------------+----------+--------+-----------+
|order_id|order_date|customer_id|customer_name|customer_lastname|customer_city|customer_state|product_id|quantity|order_value|
+--------+----------+-----------+-------------+-----------------+-------------+--------------+----------+--------+-----------+
+--------+----------+-----------+-------------+-----------------+-------------+--------------+----------+--------+-----------+
I am new to PySpark/Spark, so I can't figure out what I am missing. It's my very first project. What could it be?
PS: when I run type(df), it returns pyspark.sql.dataframe.DataFrame.
CodePudding user response:
show() returns nothing (None), so there is nothing to chain .toPandas() onto; call the conversion on the DataFrame directly. Since your df is a pyspark.sql.DataFrame, the method is toPandas(); the to_pandas() spelling (https://spark.apache.org/docs/3.2.0/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.to_pandas.html) belongs to pandas-on-Spark DataFrames. So the error should disappear with something like:
df.toPandas()
About the empty dataset: is there any error? If there is no error, are you sure that any records exist in the table?
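A minimal sketch of both checks, assuming the df loaded by the JDBC read from the question:
# convert the DataFrame itself, not the return value of show()
pdf = df.toPandas()
print(pdf.head(10))

# verify that the JDBC read actually returns any rows
print(df.count())  # 0 would explain the empty show() output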
CodePudding user response:
Well, I couldn't work out why this was happening or how to fix it. Instead, I used a workaround: I loaded the data into Python with Pandas and then converted the pandas DataFrame to a PySpark DataFrame.
Here's my code:
import psycopg2
import pandas as pd
from pyspark.sql import SparkSession
from sqlalchemy import create_engine

appName = "salesETL"
master = "local"
spark = SparkSession.builder.master(master).appName(appName).getOrCreate()

engine = create_engine(
    "postgresql+psycopg2://admin:admin@postgres_container/postgres?client_encoding=utf8")

pdf = pd.read_sql('select * from sales.sales', engine)

# Convert Pandas dataframe to Spark DataFrame
df = spark.createDataFrame(pdf)
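Looking at the two snippets side by side, one possible explanation (not verified): this workaround reads the schema-qualified table sales.sales, while the JDBC read in the question set dbtable to sales, which Postgres resolves to public.sales. If that is the mismatch, pointing the JDBC reader at the same schema-qualified table may be enough; a minimal sketch reusing the connection details from the question:
df = spark.read.format("jdbc")\
    .option("url", "jdbc:postgresql://postgres_container:5432/postgres")\
    .option("dbtable", "sales.sales")\
    .option("driver", "org.postgresql.Driver")\
    .option("user", "admin")\
    .option("password", "admin").load()

df.show(10)  # should print rows if sales.sales is the populated table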