I am trying to read data from an Azure Event Hub and store the resulting dataframe into a MySQL table in Spark streaming mode.
Below is my PySpark code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json
from datetime import datetime as dt
from pyspark.sql import DataFrameWriter
try:
    session = SparkSession.builder.master("local").appName("dataingestion")
    spark = session.getOrCreate()
    print("Successfully built spark session")
except Exception:
    print("Failed to build spark session")
    raise
startOffset = "-1"
startingEventPosition = {
    "offset": startOffset,
    "seqNo": -1,  # not in use
    "enqueuedTime": None,  # not in use
    "isInclusive": True,
}
endTime = dt.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
endingEventPosition = {
    "offset": None,  # not in use
    "seqNo": -1,  # not in use
    "enqueuedTime": endTime,
    "isInclusive": True
}
ehreadConf = {}
ehreadConf["eventhubs.startingPosition"] = json.dumps(startingEventPosition)
ehreadConf["eventhubs.endingPosition"] = json.dumps(endingEventPosition)
connectionString = "eventhub-connection-string"
ehreadConf['eventhubs.connectionString'] = connectionString
try:
    inputStream = spark.readStream.format("eventhubs").options(**ehreadConf).load()
    print("Successfully connected to the event hub")
    print("Check whether streaming has started: ", inputStream.isStreaming)
    print("Schema of inputStream:")
    inputStream.printSchema()
except Exception:
    print("Failed to connect to the Azure event hub")
    raise
sparkDf = inputStream.withColumn("body", inputStream["body"].cast("string"))
server_name = "jdbc:mysql://localhost:3306"
database_name = "eventhub"
jdbcurl = server_name + "/" + database_name
print('%' * 100)
print(jdbcurl)
table_name = "stream_cdr_data"
username = "user"
password = "data@123"
try:
print("Trying to connect MySql sql : ")
sparkDf.writeStream \
.format("jdbc") \
.outputMode("append") \
.option("url", jdbcurl) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.option("driver", "com.mysql.jdbc.Driver") \
.option("checkpointLocation", "./checkpoint") \
.start().awaitTermination(True)
print("Connection to the MySql is successful : ")
except ValueError as error:
print("Connector write failed", error)
spark.sparkContext.stop()
spark.stop()
But I am unable to store this Spark dataframe into the MySQL table; I am getting an error saying that the jdbc data source does not support Spark streaming:
py4j.protocol.Py4JJavaError: An error occurred while calling o68.start. : java.lang.UnsupportedOperationException: Data source jdbc does not support streamed writing
CodePudding user response:
As the error notes, support for writing a stream directly to a JDBC sink is not yet present in Spark.
Instead of writing the stream directly into MySQL, you can use foreachBatch on the streaming DataFrame and do the write.jdbc operation inside it:
server_name = "jdbc:mysql://localhost:3306"
database_name = "eventhub"
jdbcurl = server_name + "/" + database_name
table_name = "stream_cdr_data"
db_properties = {"user": "user", "password": "data@123"}
def write_to_mysql(df, epoch_id):
    dfwriter = df.write.mode("append")
    dfwriter.jdbc(url=jdbcurl, table=table_name, properties=db_properties)  # if this is not working, use the line below
    # df.write.jdbc(url=jdbcurl, table=table_name, properties=db_properties, mode="append")
query = sparkDf.writeStream.outputMode("append").foreachBatch(write_to_mysql).start()
query.awaitTermination()
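If you also want the streaming query to recover its progress across restarts, you can add a checkpoint location on the writeStream and pass the JDBC driver class explicitly in the connection properties. The snippet below is only a sketch of one way to do that: the checkpoint path, the 30-second trigger interval, and the assumption that the MySQL connector jar (e.g. mysql-connector-java) is already on the Spark classpath are mine, not from the question.
# Assumed variation: checkpointing plus an explicit driver class.
# "./checkpoint_mysql" and the 30-second trigger are placeholder values,
# and the MySQL connector jar must already be on the Spark classpath.
db_properties = {
    "user": "user",
    "password": "data@123",
    "driver": "com.mysql.jdbc.Driver",  # com.mysql.cj.jdbc.Driver for Connector/J 8.x
}

query = (sparkDf.writeStream
         .outputMode("append")
         .option("checkpointLocation", "./checkpoint_mysql")  # lets Spark resume from the last committed batch
         .trigger(processingTime="30 seconds")                # assumed micro-batch interval
         .foreachBatch(write_to_mysql)
         .start())
query.awaitTermination()
Note that after a restart Spark may replay the last uncommitted micro-batch into foreachBatch, so the MySQL table can receive duplicates unless the per-batch write is made idempotent (for example an upsert keyed on a unique column).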