I'd like to receive streaming data from Kafka and extract one struct field from the `value` column, but I cannot convert the binary column into struct fields. How can I convert this `value` column to a struct type?
My script:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.avro.functions import from_avro
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# Glue bootstrap: a SparkContext wrapped by a GlueContext, whose SparkSession
# is used below; the Glue Job is initialised with the JOB_NAME argument.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Streaming source: the "customer" topic, read from the earliest offset,
# with record headers included.
raw_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "hoge:9092,fuga:9092")
    .option("subscribe", "customer")
    .option("startingOffsets", "earliest")
    .option("includeHeaders", "true")
    .load()
)

# Keep only the record value, cast from binary to a plain string
# (it is still text at this point, not a struct).
df = raw_stream.selectExpr("CAST(value AS STRING)")

# Sink: print each micro-batch to the console without truncating values.
query = (
    df.writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", "false")
    .option("checkpointLocation", "s3://bucketname/tmp/checkpoint/")
    .start()
)
query.awaitTermination()
Output
----------------------------------------------------------------------------------------------------------------------------------------------------------------
|value |
----------------------------------------------------------------------------------------------------------------------------------------------------------------
|Struct{firstname=Earl,ip=217.54.184.114,id=9425651,lastname=Haley}
----------------------------------------------------------------------------------------------------------------------------------------------------------------
User response:
It feels like there should be a neater way, but the following seems to work.
from pyspark.sql import functions as F

# Field names, in the order they should appear in the resulting struct.
STRUCT_FIELDS = ('firstname', 'ip', 'id', 'lastname')

df = spark.createDataFrame([('Struct{firstname=Earl,ip=217.54.184.114,id=9425651,lastname=Haley}',)], ['value'])

# Parse "Struct{k1=v1,k2=v2,...}" into a map<string,string>:
#   regexp_extract keeps the text between the braces, and str_to_map splits it
#   into entries on ',' and into key/value on '='.
# The raw string hands `\\{` to the SQL parser, which unescapes it to the regex
# `\{` (assumes the default spark.sql.parser.escapedStringLiterals=false).
# NOTE: a value that itself contains ',' or '=' would be split incorrectly.
kv_map = F.expr(r"str_to_map(regexp_extract(value, '\\{(.*)\\}', 1), ',', '=')")

# Build the struct directly from the map rather than serialising each row to
# JSON and parsing it back. when() without otherwise() keeps a null value null.
df = df.withColumn(
    'value',
    F.when(
        kv_map.isNotNull(),
        F.struct(*[kv_map[name].alias(name) for name in STRUCT_FIELDS]),
    ),
)

df.show(truncate=False)
# --------------------------------------
# |value |
# --------------------------------------
# |{Earl, 217.54.184.114, 9425651, Haley}|
# --------------------------------------
df.printSchema()
# root
# |-- value: struct (nullable = true)
# | |-- firstname: string (nullable = true)
# | |-- ip: string (nullable = true)
# | |-- id: string (nullable = true)
# | |-- lastname: string (nullable = true)

# To extract a single field (e.g. the IP), select it from the struct:
# df.select('value.ip')