In Palantir Foundry, I am trying to read all XML files from a dataset and then parse them in a for loop.
Without the last line, the code runs fine and raises no errors.
from transforms.api import transform, Input, Output
from transforms.verbs.dataframes import sanitize_schema_for_parquet
from bs4 import BeautifulSoup
import pandas as pd
import lxml


@transform(
    output=Output("/Spring/xx/datasets/mydataset2"),
    source_df=Input("ri.foundry.main.dataset.123"),
)
def read_xml(ctx, source_df, output):
    df = pd.DataFrame()
    filesystem = source_df.filesystem()
    hadoop_path = filesystem.hadoop_path
    files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()]
    for i in files:
        with open(i, 'r') as f:
            file = f.read()
        soup = BeautifulSoup(file, 'xml')
        data = []
        for e in soup.select('offer'):
            data.append({
                'meldezeitraum': e.find_previous('data').get('meldezeitraum'),
                'id': e.get('id'),
                'parent_id': e.get('parent_id'),
            })
        df = df.append(data)
    output.write_dataframe(sanitize_schema_for_parquet(df))
However, as soon as I add the last line:
output.write_dataframe(sanitize_schema_for_parquet(df))
I get this error:
Missing transform attribute
A DataFrame object does not have an attribute select. Please check the spelling and/or the datatype of the object.
/transforms-python/src/myproject/datasets/mydataset.py
output.write_dataframe(sanitize_schema_for_parquet(df))
What am I doing wrong?
CodePudding user response:
You have to convert your pandas DataFrame to a Spark DataFrame. Even though they share the same name, those are two different object types in Python. That is also what the error message is telling you: a pandas DataFrame has no select method, while a Spark DataFrame does, and output.write_dataframe expects the latter.
The easiest way to do the conversion is:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df_spark = spark.createDataFrame(df)
You can then pass df_spark to the output.write_dataframe() function.
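For reference, here is a minimal sketch of how the end of the transform could look once the conversion is in place. It assumes the pandas DataFrame df is built from the XML files exactly as in the question; inside a Foundry transform you can also reuse the session the platform provides via ctx.spark_session instead of calling getOrCreate().

def read_xml(ctx, source_df, output):
    # ... build the pandas DataFrame df from the XML files as before ...

    # Convert the pandas DataFrame to a Spark DataFrame before writing.
    # (Assumption: df is non-empty; Spark cannot infer a schema from an
    # empty pandas DataFrame.)
    df_spark = ctx.spark_session.createDataFrame(df)
    output.write_dataframe(sanitize_schema_for_parquet(df_spark))

If a run can produce zero rows, pass an explicit schema to createDataFrame, since column types cannot be inferred from an empty pandas DataFrame.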