Failed to find data source: delta in Python environment


Following: https://docs.delta.io/latest/quick-start.html#python

I have installed delta-spark and run:

import pyspark
from delta import *

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

However when I run:

data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")

the error states: Failed to find data source: delta

and if I run

DeltaTable.isDeltaTable(spark, "packages/tests/streaming/data")

it raises: TypeError: 'JavaPackage' object is not callable

I thought I could run these commands locally (for example in unit tests) without Maven and without launching a pyspark shell. Am I missing a dependency?

CodePudding user response:

You can just install the delta-spark PyPI package with pip install delta-spark (it pulls in pyspark as well), and then refer to it from your code via configure_spark_with_delta_pip, as you already do.
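As a quick sanity check (a sketch; the settings simply mirror the quick-start code above), you can confirm that the Delta artifact actually made it into the session's spark.jars.packages. If it is missing, the JVM never loads the Delta classes, which typically shows up as the 'JavaPackage' object is not callable error:

import pyspark
from delta import configure_spark_with_delta_pip

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# configure_spark_with_delta_pip sets spark.jars.packages to the Delta
# Maven coordinate matching the installed delta-spark version, so Spark
# downloads the JAR when the session starts
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# should print something like io.delta:delta-core_2.12:<version>;
# an empty string means the Delta JAR was never requested
print(spark.sparkContext.getConf().get("spark.jars.packages", ""))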

Or you can add a configuration option that fetches the Delta package for you: .config("spark.jars.packages", "io.delta:delta-core_2.12:<delta-version>"). For Spark 3.1 the Delta version is 1.0.0 (see the releases mapping in the docs for more information).
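For example, a session built this way (a sketch for Spark 3.1 with Delta 1.0.0; adjust the version to match your Spark release) does not need configure_spark_with_delta_pip at all:

from pyspark.sql import SparkSession

# Delta is fetched from Maven at session start via spark.jars.packages
spark = (
    SparkSession.builder.appName("MyApp")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

spark.range(0, 5).write.format("delta").save("/tmp/delta-table")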

I have an example of using Delta tables in unit tests (note that the import statement is inside the function definition because the Delta package is loaded dynamically):

import shutil

import pytest

delta_dir_name = "/tmp/delta-table"

@pytest.fixture
def delta_setup(spark_session):
    # write a small Delta table, hand it to the test, then clean up
    data = spark_session.range(0, 5)
    data.write.format("delta").save(delta_dir_name)
    yield data
    shutil.rmtree(delta_dir_name, ignore_errors=True)

def test_delta(spark_session, delta_setup):
    # imported here because the Delta package is only available at runtime
    from delta.tables import DeltaTable

    deltaTable = DeltaTable.forPath(spark_session, delta_dir_name)
    hist = deltaTable.history()
    assert hist.count() == 1

The spark_session fixture and the Spark environment are provided by the pytest-spark plugin, configured in pytest.ini:

[pytest]
filterwarnings =
  ignore::DeprecationWarning
spark_options =
  spark.sql.extensions: io.delta.sql.DeltaSparkSessionExtension
  spark.sql.catalog.spark_catalog: org.apache.spark.sql.delta.catalog.DeltaCatalog
  spark.jars.packages: io.delta:delta-core_2.12:1.0.0
  spark.sql.catalogImplementation: in-memory
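If you would rather not depend on pytest-spark, the same spark_session fixture can be defined by hand in a conftest.py. A minimal sketch using the same settings (the fixture is deliberately named spark_session so the tests above work unchanged):

# conftest.py -- hand-rolled replacement for the pytest-spark fixture
import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark_session():
    spark = (
        SparkSession.builder.master("local[2]")
        .appName("delta-tests")
        .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog",
                "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.sql.catalogImplementation", "in-memory")
        .getOrCreate()
    )
    yield spark
    spark.stop()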