My Lambda function triggers the Glue job with boto3's glue.start_job_run.
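For reference, a minimal sketch of that handler, assuming the job is named MyGlueJob and the Lambda is invoked by an S3 put event (both assumptions); note the -- prefix on the argument keys, which getResolvedOptions expects:

import boto3

glue = boto3.client("glue")

def lambda_handler(event, context):
    # Pull the bucket/key out of the (assumed) S3 put event
    record = event["Records"][0]["s3"]
    glue.start_job_run(
        JobName="MyGlueJob",  # hypothetical job name
        Arguments={
            "--s3_target_path_bucket": record["bucket"]["name"],
            "--s3_target_path_key": record["object"]["key"],
        },
    )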
And here is my Glue job script:
from awsglue.utils import getResolvedOptions
import sys
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from operator import add
from pyspark.sql.functions import col, regexp_extract, max

conf = SparkConf().setAppName("pyspark-etl")
sc = SparkContext.getOrCreate(conf=conf)

# Job arguments passed in by the Lambda through start_job_run
args = getResolvedOptions(sys.argv, ['s3_target_path_key', 's3_target_path_bucket'])
bucket = args['s3_target_path_bucket']
fileName = args['s3_target_path_key']
inputFilePath = f"s3a://{bucket}/{fileName}"
finalFilePath = "s3a://glu-job-final-juiceb"
print(bucket, fileName)

# Word count over the input file
rdd = sc.textFile(inputFilePath)
rdd = rdd.flatMap(lambda x: x.split(" ")).map(lambda x: (x.split(" ")[0], 1)).reduceByKey(add)

# This is the line that raises the error below
df = rdd.toDF(schema='rawEntities string, Count int')
df = df.withColumn("Entities", regexp_extract(col("rawEntities"), '[^!".?@:,\'*…_()] ', 0))
df = df.filter(col("Entities") != "")
df = df.select("Entities", "Count").groupBy("Entities").agg(max("Count").alias("Count"))
df.write.mode("append").options(header='True').parquet(finalFilePath)
The Glue job error message is "AttributeError: 'PipelinedRDD' object has no attribute 'toDF'".
After googling, I noticed that in Glue, toDF means converting a DynamicFrame to a DataFrame, not an RDD to a DataFrame.
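That is, something like this (a sketch; the GlueContext setup and catalog names are hypothetical):

from awsglue.context import GlueContext

glueContext = GlueContext(sc)
dyf = glueContext.create_dynamic_frame.from_catalog(database="mydb", table_name="mytable")
df = dyf.toDF()  # DynamicFrame -> Spark DataFrame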
How can I convert an RDD to a DataFrame in Glue?
CodePudding user response:
You can't define schema types using toDF(); the toDF() method gives you no control over schema customization. With createDataFrame(), on the other hand, you have complete control over the schema. See the logic below -
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# createDataFrame() lives on a SparkSession, so create one first
spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField('rawEntities', StringType()),
    StructField('Count', IntegerType()),
])
df = spark.createDataFrame(data=<your rdd>, schema=schema)
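Note that your script never creates a SparkSession, only a SparkContext, and that is also why rdd.toDF() failed: PySpark attaches toDF to RDDs only when a SparkSession is constructed. Applied to your job, a sketch of the fix (keeping the rest of your pipeline unchanged) would be:

# Create the session up front; this also attaches toDF to RDDs
spark = SparkSession.builder.appName("pyspark-etl").getOrCreate()
sc = spark.sparkContext

# ... same word-count pipeline as in your script ...
rdd = sc.textFile(inputFilePath)
rdd = rdd.flatMap(lambda x: x.split(" ")).map(lambda x: (x.split(" ")[0], 1)).reduceByKey(add)

# Explicit schema instead of rdd.toDF(...)
df = spark.createDataFrame(rdd, schema=schema)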