I am new to spark's structured streaming and working on a poc that needs to be implemented on structured streaming.
input source : kafka input format: json language: python3 library: spark 3.2
I am trying to format incoming json in spark dataframe of a predefined structure.
So far I am able to fetch json events and able to get the results in console (not in expected format). It will be very helpful if you could nudge me in right direction or suggest a solution.
Below is my code so far.
json from kafka
{"property1" : "hello","property2" : "world"}
structured_kafka.py
"""
Run the script
`$ bin/spark-submit structured_kafka.py \
host1:port1,host2:port2 subscribe topic1,topic2`
"""
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType
if __name__ == "__main__":
if len(sys.argv) != 4:
print("""
Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
""", file=sys.stderr)
sys.exit(-1)
bootstrapServers = sys.argv[1]
subscribeType = sys.argv[2]
topics = sys.argv[3]
spark = SparkSession\
.builder\
.appName("StructuredKafkaWordCount")\
.getOrCreate()
spark.sparkContext.setLogLevel('WARN')
schema = StructType([
StructField("property1", StringType(), True),
StructField("property2" , StringType(), True),
])
lines = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", bootstrapServers)\
.option(subscribeType, topics)\
.load()\
.select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
df = lines.select('*')
# Start running the query that prints the running counts to the console
query = df\
.writeStream\
.outputMode('Append')\
.format('console')\
.start()
query.awaitTermination()
output
Batch: 1
-------------------------------------------
--------------------
| parsed_value|
--------------------
|{hello, world} |
--------------------
expected
-------------------- --------------------
| property1 | property2 |
-------------------- --------------------
|hello |world |
-------------------- ---------------------
If I could get the df in this format , I will be able to apply my usecase.
Kindly suggest.
note: I have looked all existing solutions, most of the solutions are either in scala or not for structured streaming or not for kafka as source.
CodePudding user response:
After line:
.select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
add:
.select(col("parsed_value.property1"), col("parsed_value.property2"))
or:
.select(col("parsed_value.*"))