A DataFrame object does not have an attribute select

Time:01-27

In Palantir Foundry, I am trying to read all XML files from a dataset and then parse each file in a for loop.

Up to the second-to-last line, the code runs fine without errors.

from transforms.api import transform, Input, Output
from transforms.verbs.dataframes import sanitize_schema_for_parquet
from bs4 import BeautifulSoup
import pandas as pd
import lxml

@transform(
    output=Output("/Spring/xx/datasets/mydataset2"),
    source_df=Input("ri.foundry.main.dataset.123"),
)
def read_xml(ctx, source_df, output):
    df = pd.DataFrame()
    filesystem = source_df.filesystem()
    hadoop_path = filesystem.hadoop_path
    files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()]
    for i in files:
        with open(i, 'r') as f:
            file = f.read()
        soup = BeautifulSoup(file, 'xml')
        data = []
        for e in soup.select('offer'):
            data.append({
                'meldezeitraum': e.find_previous('data').get('meldezeitraum'),
                'id': e.get('id'),
                'parent_id': e.get('parent_id'),
            })
        df = df.append(data)

    output.write_dataframe(sanitize_schema_for_parquet(df))

However, as soon as I add the last line:

output.write_dataframe(sanitize_schema_for_parquet(df))

I get this error:

Missing transform attribute

A DataFrame object does not have an attribute select. Please check the spelling and/or the datatype of the object.
/transforms-python/src/myproject/datasets/mydataset.py
    output.write_dataframe(sanitize_schema_for_parquet(df))

What am I doing wrong?

CodePudding user response:

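You have to convert your pandas DataFrame to a Spark DataFrame. Even though they share a name, they are two different object types in Python. The error message points at this: the write path expects a Spark DataFrame, which has a select method, and the pandas object you passed in does not.

The easiest way to do that is: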

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df_spark = spark.createDataFrame(df)

You can then pass df_spark to the output.write_dataframe() function.
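If pandas is only being used as a container for the parsed rows, one alternative is to collect plain tuples and hand them to Spark directly. The following is a minimal sketch, not the asker's code: it swaps BeautifulSoup for the standard library's xml.etree.ElementTree, assumes every offer element is nested inside a data element that carries the meldezeitraum attribute, and uses hypothetical helper names (rows_from_xml, to_spark) and an invented sample document.

```python
import xml.etree.ElementTree as ET

# Explicit schema as a DDL string, so Spark does not have to infer types
# (inference can fail when a column holds only None values).
SCHEMA = "meldezeitraum string, id string, parent_id string"


def rows_from_xml(xml_text):
    """Return one (meldezeitraum, id, parent_id) tuple per <offer> element.

    Assumes each <offer> sits inside a <data> element (hypothetical layout).
    """
    root = ET.fromstring(xml_text)
    rows = []
    # iter() also yields the root itself, so a <data> root element is covered.
    for data in root.iter("data"):
        period = data.get("meldezeitraum")
        for offer in data.iter("offer"):
            rows.append((period, offer.get("id"), offer.get("parent_id")))
    return rows


def to_spark(spark, rows):
    """Build a Spark DataFrame from the tuples; `spark` is a SparkSession."""
    return spark.createDataFrame(rows, schema=SCHEMA)


# Invented sample input, for illustration only.
sample = (
    '<root>'
    '<data meldezeitraum="2021-12">'
    '<offer id="1" parent_id="0"/>'
    '<offer id="2" parent_id="1"/>'
    '</data>'
    '</root>'
)
print(rows_from_xml(sample))
# [('2021-12', '1', '0'), ('2021-12', '2', '1')]
```

Inside the transform, the rows from all files could be accumulated in one list and converted once at the end, for example with to_spark(ctx.spark_session, rows). That assumes the Foundry transform context exposes the session as ctx.spark_session, which is worth checking against your environment. Building the DataFrame once also avoids calling df.append in a loop, which newer pandas versions no longer support.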
