Spark Kafka app, getting "CassandraCatalogException: Attempting to write to C* Table but missing primary key columns"


Run env

kafka ----ReadStream----> local ----WriteStream----> cassandra
The source code runs on the local machine; Kafka, the local machine, and the Cassandra write target each have different IPs.

The table columns are:
col1 | col2 | col3 | col4 | col5 | col6 | col7

The df.printSchema() output is:

root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)

Sorry, I tried to solve this on my own but couldn't find a solution.

Run Code

spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,com.datastax.spark:spark-cassandra-connector_2.12:3.2.0,com.github.jnr:jnr-posix:3.1.15 \
  --conf spark.cassandra.connection.host={cassandraIP} \
  --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions \
  test.py

Source Code

from pyspark.sql import SparkSession


# Connect from the local machine to the Spark master
spark = SparkSession.builder \
    .master("spark://{SparkMasterIP}:7077") \
    .appName("Spark_Streaming kafka cassandra") \
    .config('spark.cassandra.connection.host', '{cassandraIP}') \
    .config('spark.cassandra.connection.port', '9042') \
    .getOrCreate()

# Read stream from {Topic} via the Kafka bootstrap server
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "{KafkaIP}:9092") \
    .option('startingOffsets','earliest') \
    .option('failOnDataLoss', 'false') \
    .option("subscribe", "{Topic}") \
    .load()

df.printSchema()

# Write stream to Cassandra
ds = df.writeStream \
    .trigger(processingTime='15 seconds') \
    .format("org.apache.spark.sql.cassandra") \
    .option("checkpointLocation","{checkpoint}") \
    .options(table='{table}',keyspace="{key}") \
    .outputMode('update') \
    .start()

ds.awaitTermination()

Error

com.datastax.spark.connector.datasource.CassandraCatalogException: Attempting to write to C* Table but missing
primary key columns: [col1,col2,col3]
        at com.datastax.spark.connector.datasource.CassandraWriteBuilder.<init>(CassandraWriteBuilder.scala:44)
        at com.datastax.spark.connector.datasource.CassandraTable.newWriteBuilder(CassandraTable.scala:69)
        at org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:590)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:140)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:59)
        at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:295)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStr
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)

Traceback (most recent call last):
  File "/home/test.py", line 33, in <module>
    ds.awaitTermination()
  File "/venv/lib64/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 101, in awaitTe
  File "/venv/lib64/python3.6/site-packages/pyspark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1322, in
  File "/home/jeju/venv/lib64/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
pyspark.sql.utils.StreamingQueryException: Attempting to write to C* Table but missing
primary key columns: [col1,col2,col3]
=== Streaming Query ===
Identifier: [id = d7da05f9-29a2-4597-a2c9-86a4ebfa65f2, runId = eea59c10-30fa-4939-8a30-03bd7c96b3f2]
Current Committed Offsets: {}
Current Available Offsets: {}

CodePudding user response:

The error says primary key columns: [col1,col2,col3] are missing, so df doesn't have these columns. You already call df.printSchema(), and its output shows exactly that: a dataframe read from Kafka has a fixed schema (key, value, topic, partition, offset, timestamp, timestampType), and you extract your data by parsing the key and value columns. In my case the data was sent JSON-formatted in the value column (you can add the key column as well if you need it), so I could read it with the following code:

from pyspark.sql.functions import col, from_json

dfPerson = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "x.x.x.x") \
    .option("subscribe", TOPIC) \
    .option("startingOffsets", "latest") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("data")) \
    .select("data.*")

Hope it helps.
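
With dfPerson carrying col1..col7, the asker's original write block can then be reused unchanged, since the primary key columns now exist in the streamed dataframe. A sketch under the same assumptions:

# Stream the parsed columns (col1..col7) to Cassandra instead of the raw Kafka frame.
ds = dfPerson.writeStream \
    .trigger(processingTime='15 seconds') \
    .format("org.apache.spark.sql.cassandra") \
    .option("checkpointLocation", "{checkpoint}") \
    .options(table='{table}', keyspace="{key}") \
    .outputMode('update') \
    .start()

ds.awaitTermination()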
