How to extract struct fields from binary column at Structured Streaming?


I'd like to receive streaming data from Kafka and extract one struct field from the value column, but I cannot convert the binary column to struct fields. How can I convert this value column to a struct type?

My script:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.avro.functions import from_avro

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "hoge:9092,fuga:9092") \
  .option("subscribe", "customer") \
  .option("startingOffsets", "earliest") \
  .option("includeHeaders", "true") \
  .load()

df = df.selectExpr("CAST(value AS STRING)")

query = df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .option("checkpointLocation", "s3://bucketname/tmp/checkpoint/") \
    .start()
query.awaitTermination()

Output

+------------------------------------------------------------------+
|value                                                              |
+------------------------------------------------------------------+
|Struct{firstname=Earl,ip=217.54.184.114,id=9425651,lastname=Haley} |
+------------------------------------------------------------------+

CodePudding user response:

It feels like there should be a neater way, but the following seems to work: regexp_extract pulls the key=value text out of the Struct{...} wrapper, str_to_map turns it into a map, and to_json/from_json then convert that map into a typed struct.

from pyspark.sql import functions as F
df = spark.createDataFrame([('Struct{firstname=Earl,ip=217.54.184.114,id=9425651,lastname=Haley}',)], ['value'])

df = df.withColumn(
    'value',
    F.from_json(
        F.expr("to_json(str_to_map(regexp_extract(value, '\\\\{(.*)\\\\}', 1), ',', '='))"),
        'struct<firstname:string,ip:string,id:string,lastname:string>'
    )
)

df.show(truncate=0)
# +--------------------------------------+
# |value                                 |
# +--------------------------------------+
# |{Earl, 217.54.184.114, 9425651, Haley}|
# +--------------------------------------+
df.printSchema()
# root
#  |-- value: struct (nullable = true)
#  |    |-- firstname: string (nullable = true)
#  |    |-- ip: string (nullable = true)
#  |    |-- id: string (nullable = true)
#  |    |-- lastname: string (nullable = true)
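
To use this in the streaming job from the question, the same expression can replace the plain CAST(value AS STRING) select. Below is a minimal sketch against the raw Kafka DataFrame df from the question; the struct schema string simply mirrors the fields seen in the sample output, so adjust the field names and types (e.g. id as bigint) to match your real payload.

from pyspark.sql import functions as F

# Sketch only: plug the answer's parsing expression into the streaming query.
# Field names/types are taken from the sample console output above.
parsed = (
    df.selectExpr("CAST(value AS STRING) AS value")
      .withColumn(
          "value",
          F.from_json(
              F.expr("to_json(str_to_map(regexp_extract(value, '\\\\{(.*)\\\\}', 1), ',', '='))"),
              "struct<firstname:string,ip:string,id:string,lastname:string>",
          ),
      )
      .select("value.*")  # expose firstname, ip, id, lastname as top-level columns
)

query = (
    parsed.writeStream
          .outputMode("append")
          .format("console")
          .option("truncate", "false")
          .option("checkpointLocation", "s3://bucketname/tmp/checkpoint/")
          .start()
)
query.awaitTermination()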