Environment: Azure Databricks cluster, runtime 11.3 LTS (includes Apache Spark 3.3.0, Scala 2.12)
I have a pandas_udf that works for 4 rows, but when I try it with more than 4 rows I get the error below.
PythonException: 'RuntimeError: The length of output in Scalar iterator pandas UDF should be the same with the input's; however, the length of output was 1 and the length of input was 2.'.
Here is the code:
data =[{"inputData":"<html>Tanuj is older than Eina. Chetan is older than Tanuj. Eina is older than Chetan. If the first 2 statements are true, the 3rd statement is"},{"inputData":"<html>Pens cost more than pencils. Pens cost less than eraser. Erasers cost more than pencils and pens. If the first two statements are true, the third statement is"},{"inputData":"<html>If we have a tree of n nodes, how many edges will it have?"}, {"inputData":"<div>Which of the following data structures can handle updates and queries in log(n) time on an array?"}]
df = spark.createDataFrame(data)
import re
from typing import Iterator

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

# removing HTML tags from the input text
@pandas_udf(StringType())
def clean_html(raw_htmls: Iterator[pd.Series]) -> Iterator[pd.Series]:
    pd.set_option('display.max_colwidth', 10000)
    for raw_html in raw_htmls:
        cleanr_regx = re.compile("<.*?>|&([a-z0-9]+|#[0-9]{1,6}|#x[0-9a-f]{1,6});")
        cleantext = re.sub(cleanr_regx, " ", raw_html.to_string(index=False))
        cleantext = re.sub(" +", " ", cleantext)
        yield pd.Series(cleantext)
df = df.withColumn("Question",clean_html("inputData"))
display(df)
This works fine. But if I add one more row to data, I get the error mentioned above.
data =[{"inputData":"<div>Look at this series: 36, 34, 30, 28, 24, … What number should come next?"},{"inputData":"<html>Tanuj is older than Eina. Chetan is older than Tanuj. Eina is older than Chetan. If the first 2 statements are true, the 3rd statement is"},{"inputData":"<html>Pens cost more than pencils. Pens cost less than eraser. Erasers cost more than pencils and pens. If the first two statements are true, the third statement is"},{"inputData":"<html>If we have a tree of n nodes, how many edges will it have?"}, {"inputData":"<div>Which of the following data structures can handle updates and queries in log(n) time on an array?"}]
In my project I am reading data from a JSON file and hit the same issue: with 1 row it works, but with more than 1 row I get the same error.
Can anyone please help me? I have been stuck on this error for a week.
CodePudding user response:
You can achieve the same result with the built-in regexp_replace function:
from pyspark.sql.functions import regexp_replace
final = df.select("inputData", regexp_replace("inputData", "<.*?>|&([a-z0-9]+|#[0-9]{1,6}|#x[0-9a-f]{1,6});", "").alias('Question'))
display(final)
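To sanity-check what that pattern removes before running it on the cluster, a quick local test with Python's `re` module can help. This is only a sketch: `strip_markup` and the sample string are made up for illustration, and the pattern is assumed to be the usual tag-and-entity regex. Spark uses Java regular expressions, which as far as I know treat this particular pattern the same way.

```python
import re

# Assumed intended pattern: HTML tags, or entities such as &amp; / &#44; / &#x2c;
TAG_OR_ENTITY = re.compile(r"<.*?>|&([a-z0-9]+|#[0-9]{1,6}|#x[0-9a-f]{1,6});")


def strip_markup(text: str) -> str:
    """Hypothetical helper: drop tags and entities, like regexp_replace(..., "")."""
    return TAG_OR_ENTITY.sub("", text)


# Made-up sample in the style of the question's data
sample = "<div>log(n) time&#44; on an array?</div>"
print(strip_markup(sample))
```

Note that the replacement here is an empty string, as in the `regexp_replace` call, whereas the UDF in the question replaces matches with a space.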
- Alternatively, to keep the Pandas UDF, processing each row separately worked for me. Each filtered DataFrame holds a single row, so the UDF's one-element output matches the length of its input. Try the following code (tested with 100 rows):
from pyspark.sql.functions import col, lit, row_number
from pyspark.sql.window import Window

# Constant column so all rows share one window partition, then number the rows
demo = df.withColumn('tp', lit(1))
demo = demo.withColumn("rn", row_number().over(Window.partitionBy("tp").orderBy("tp")))

# Apply the UDF to one row at a time and union the single-row results
for i in range(1, df.count() + 1):
    if i == 1:
        final = demo.filter(demo['rn'] == i).withColumn("Question", clean_html(col('inputData')))
    else:
        final = final.union(demo.filter(demo['rn'] == i).withColumn("Question", clean_html(col('inputData'))))
final.show(100)
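If you want to keep the pandas UDF, a per-batch version may be simpler than the row-by-row union. The error suggests the UDF returns one value per batch: `raw_html.to_string(index=False)` turns the whole batch into a single string, so `pd.Series(cleantext)` has length 1 no matter how many rows came in. The sketch below is my own variant, not code from the question: `clean_batches` is a made-up name, the regex is assumed to be the usual tag-and-entity pattern, and the Spark wiring is left in comments so the pandas part can be tried on its own.

```python
from typing import Iterator

import pandas as pd

# Assumed intended patterns (same as in the question, with the quantifiers restored)
TAG_OR_ENTITY = r"<.*?>|&([a-z0-9]+|#[0-9]{1,6}|#x[0-9a-f]{1,6});"
MULTI_SPACE = r" +"


def clean_batches(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    """Yield one cleaned Series per input batch, with the same length as the batch."""
    for batch in batches:
        # Vectorized, element-wise replacement: one output value per input row
        cleaned = batch.str.replace(TAG_OR_ENTITY, " ", regex=True)
        yield cleaned.str.replace(MULTI_SPACE, " ", regex=True)


# On the cluster, the generator could be wrapped as a pandas UDF (needs pyspark):
#   from pyspark.sql.functions import pandas_udf
#   clean_html = pandas_udf(clean_batches, "string")
#   df = df.withColumn("Question", clean_html("inputData"))

# Local check with two made-up batches of different sizes
demo_batches = [pd.Series(["<html>a  b", "<div>c&amp;d"]), pd.Series(["<p>x</p>"])]
for out in clean_batches(iter(demo_batches)):
    print(len(out), out.tolist())
```

Because each yielded Series comes from an element-wise operation on the incoming batch, its length always matches the input, which is what the scalar iterator UDF check requires.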