Docker: PySpark reading from PostgreSQL doesn't show data

I am trying to read data from a table in a PostgreSQL database and proceed with an ETL project. I have a Docker environment using this docker-compose:

version: "3.3"
services:
  spark-master:
    image: docker.io/bitnami/spark:3.3
    ports:
      - "9090:8080"
      - "7077:7077"
    volumes:
       - /opt/spark-apps
       - /opt/spark-data
    environment:
      - SPARK_LOCAL_IP=spark-master
      - SPARK_WORKLOAD=master
  spark-worker-a:
    image: docker.io/bitnami/spark:3.3
    ports:
      - "9091:8080"
      - "7000:7000"
    depends_on:
      - spark-master
    environment:
      - SPARK_MASTER=spark://spark-master:7077
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=1G
      - SPARK_DRIVER_MEMORY=1G
      - SPARK_EXECUTOR_MEMORY=1G
      - SPARK_WORKLOAD=worker
      - SPARK_LOCAL_IP=spark-worker-a
    volumes:
       - /opt/spark-apps
       - /opt/spark-data
  spark-worker-b:
    image: docker.io/bitnami/spark:3.3
    ports:
      - "9092:8080"
      - "7001:7000"
    depends_on:
      - spark-master
    environment:
      - SPARK_MASTER=spark://spark-master:7077
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=1G
      - SPARK_DRIVER_MEMORY=1G
      - SPARK_EXECUTOR_MEMORY=1G
      - SPARK_WORKLOAD=worker
      - SPARK_LOCAL_IP=spark-worker-b
    volumes:
        - /opt/spark-apps
        - /opt/spark-data

  postgres:
    container_name: postgres_container
    image: postgres:11.7-alpine
    environment:
      POSTGRES_USER: admin
      POSTGRES_PASSWORD: admin
    volumes:
       - /data/postgres
    ports:
      - "4560:5432"
    restart: unless-stopped

  # jupyterlab with pyspark
  jupyter-pyspark:
    image: jupyter/pyspark-notebook:latest
    environment:
      JUPYTER_ENABLE_LAB: "yes"
    ports:
      - "9999:8888"
    volumes:
      - /app/data

I was successful in connecting to the DB, but I can't print any data. Here's my code:

from pyspark.sql import SparkSession

spark = SparkSession.builder\
                    .appName("salesETL")\
                    .config("spark.driver.extraClassPath", "./postgresql-42.5.1.jar")\
                    .getOrCreate()

df = spark.read.format("jdbc").option("url", "jdbc:postgresql://postgres_container:5432/postgres")\
                              .option("dbtable", "sales")\
                              .option("driver", "org.postgresql.Driver")\
                              .option("user", "admin")\
                              .option("password", "admin").load()

df.show(10).toPandas()

With .toPandas() it gives me this error:

AttributeError                            Traceback (most recent call last)
Cell In[7], line 1
----> 1 df.show(10).toPandas()

AttributeError: 'NoneType' object has no attribute 'toPandas'

Without .toPandas(), it prints the columns but no data:

+--------+----------+-----------+-------------+-----------------+-------------+--------------+----------+--------+-----------+
|order_id|order_date|customer_id|customer_name|customer_lastname|customer_city|customer_state|product_id|quantity|order_value|
+--------+----------+-----------+-------------+-----------------+-------------+--------------+----------+--------+-----------+
+--------+----------+-----------+-------------+-----------------+-------------+--------------+----------+--------+-----------+

I am new to PySpark/Spark, so I can't figure out what I am missing. It's my very first project. What could it be?

PS: when I run type(df), it returns pyspark.sql.dataframe.DataFrame.

CodePudding user response:

show returns nothing (None), so you can't chain .toPandas() onto its result; call it on the DataFrame directly. (Note: the snake_case to_pandas documented at https://spark.apache.org/docs/3.2.0/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.to_pandas.html belongs to the pandas-on-Spark API; a regular pyspark.sql.DataFrame uses toPandas().) That should make the AttributeError go away, with something like this:

df.toPandas()

About the empty result: is there any error? If there is no error, are you sure any records actually exist in the table?
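For reference, here is a minimal sanity check of both sides (a sketch: it assumes the same spark session and connection details as in the question, and that psycopg2 is installed in the notebook container):

# Spark side: count() forces an actual JDBC read; 0 means the query matched no rows.
print(df.count())
df.printSchema()

# Database side: query the table directly, bypassing Spark entirely.
import psycopg2

conn = psycopg2.connect(host="postgres_container", port=5432,
                        dbname="postgres", user="admin", password="admin")
with conn.cursor() as cur:
    cur.execute("SELECT count(*) FROM sales")
    print(cur.fetchone()[0])
conn.close()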

CodePudding user response:

Well, I couldn't figure out why this happened or how to fix it. Instead, I took a workaround: I loaded the data into Python using pandas and then converted the pandas DataFrame to a PySpark DataFrame.

Here's my code:

import psycopg2
import pandas as pd
from pyspark.sql import SparkSession
from sqlalchemy import create_engine

appName = "salesETL"
master = "local"

spark = SparkSession.builder.master(master).appName(appName).getOrCreate()

engine = create_engine(
    "postgresql+psycopg2://admin:admin@postgres_container/postgres?client_encoding=utf8")
pdf = pd.read_sql('select * from sales.sales', engine)

# Convert Pandas dataframe to spark DataFrame
df = spark.createDataFrame(pdf)
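
With that in place, the usual calls should now show data (a quick check, assuming the table actually has rows):

df.printSchema()
df.show(10)        # prints the first rows of sales.sales
print(df.count())  # number of rows copied over from Postgres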