Hello everyone!
I'm trying to send my structured streaming dataframe to one of my Kafka topics, detection.
This is the schema of the structured streaming dataframe:
root
|-- timestamp: timestamp (nullable = true)
|-- Sigma: string (nullable = true)
|-- time: string (nullable = true)
|-- duration: string (nullable = true)
|-- SourceComputer: string (nullable = true)
|-- SourcePort: string (nullable = true)
|-- DestinationComputer: string (nullable = true)
|-- DestinationPort: string (nullable = false)
|-- protocol: string (nullable = true)
|-- packetCount: string (nullable = true)
|-- byteCount: string (nullable = true)
But when I try to send the dataframe with this method:
dfwriter=df \
.selectExpr("CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("checkpointLocation", "/Documents/checkpoint/logs") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("failOnDataLoss", "false") \
.option("topic", detection) \
.start()
I get this error:
pyspark.sql.utils.AnalysisException: cannot resolve 'value' given input columns: [DestinationComputer, DestinationPort, Sigma, SourceComputer, SourcePort, byteCount, duration, packetCount, processName, protocol, time, timestamp]; line 1 pos 5;
If I send a dataframe with just a value column, it works; I receive the data on my Kafka topic consumer.
Any idea how to send my dataframe with all its columns?
Thank you!
Answer:
Your dataframe has no value column, as the error says.
You'd need to "embed" all the columns under a single value column, then serialize it with a function like to_json, not CAST(.. AS STRING).
In PySpark, that'd be something like to_json(struct("*")).alias("value") within a select query.
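A minimal sketch of the corrected writer, reusing the options from your question (assuming the topic name detection was meant as a literal string rather than a variable):

from pyspark.sql.functions import struct, to_json

# Pack every column into a struct, serialize it to a JSON string,
# and name the result "value" -- the column the Kafka sink expects.
dfwriter = df \
    .select(to_json(struct("*")).alias("value")) \
    .writeStream \
    .format("kafka") \
    .option("checkpointLocation", "/Documents/checkpoint/logs") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("failOnDataLoss", "false") \
    .option("topic", "detection") \
    .start()

The Kafka sink also accepts an optional key column, so if you wanted, say, records keyed by source host, you could select col("SourceComputer").alias("key") alongside the value column.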
Similar question - Convert all the columns of a spark dataframe into a json format and then include the json formatted data as a column in another/parent dataframe