I'm using Databricks Runtime 10.0 with Spark 3.2.0 and Scala 2.12. I also have a dependency on io.confluent:kafka-schema-registry-client:6.2.0, from which I use CachedSchemaRegistryClient to register schemas in Schema Registry like this:
import org.apache.avro.Schema
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

@transient lazy val reg = new CachedSchemaRegistryClient(schemaRegistryUrl, schemaRegistryCacheCapacity, null)

def registerSchema(name: String, schema: Schema): Int = {
  reg.register(name, new AvroSchema(schema))
}
Now, this works in Spark as expected:
import org.apache.spark.sql.avro.SchemaConverters.toAvroType
import org.apache.spark.sql.avro.functions.to_avro
import org.apache.spark.sql.functions.{lit, struct}

val dataframe = ...
val schema = toAvroType(dataframe.schema)
registerSchema("some_name", schema)

display(dataframe
  .select(struct(/*some fields*/).alias("body"))
  .select(to_avro('body, lit("some_name"), schemaRegistryUrl).as("body")))
And I'm also able to deserialize the result.
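A minimal sketch of the read side, assuming the Databricks schema-registry variant of from_avro and that serialized stands for the dataframe produced by the to_avro select above:

import org.apache.spark.sql.avro.functions.from_avro

// Look up the schema registered under subject "some_name" in Schema Registry
// and decode the Avro payload back into a struct column (the subject is a
// plain String in this variant).
display(serialized
  .select(from_avro($"body", "some_name", schemaRegistryUrl).as("body")))

Now, as soon as I make the following change to specify the correct schema name and namespace: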
val schema = toAvroType(dataframe.schema, true, "some_name", "com.some.namespace")
Spark fails with:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 853.0 failed 4 times, most recent failure: Lost task 2.3 in stage 853.0 (TID 21433) (10.206.5.9 executor driver): org.spark_project.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
at org.spark_project.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:191)
at org.spark_project.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218)
at org.spark_project.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:284)
at org.spark_project.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:272)
Even though in both cases I can see the schema correctly registered in Schema Registry. Any idea why the second scenario doesn't work?
CodePudding user response:
These are the code lines (186-192) where the exception is thrown:
String requestUrl = buildRequestUrl(baseUrl, path);
try {
  return sendHttpRequest(requestUrl, method, requestBodyData, requestProperties, responseFormat);
} catch (IOException e) {
  baseUrls.fail(baseUrl);
  if (i == n-1) throw e; // Raise the exception since we have no more urls to try
}
Line 191 is the one with the comment:
if (i == n-1) throw e; // Raise the exception since we have no more urls to try
So the worker/driver node(s) are simply not able to connect to the Schema Registry.
Refer to the Spark 3.2.0 code here.
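A quick way to verify that is to hit the registry directly from the driver with the plain Confluent client. A sketch, reusing schemaRegistryUrl from the question (getAllSubjects and getLatestSchemaMetadata are standard SchemaRegistryClient methods):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

// If these calls throw, it's a connectivity problem; if they succeed but the
// subject is missing, the registration didn't land where the lookup expects.
val client = new CachedSchemaRegistryClient(schemaRegistryUrl, 16)
println(client.getAllSubjects)                       // is the registry reachable?
println(client.getLatestSchemaMetadata("some_name")) // is the subject registered?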
CodePudding user response:
From https://docs.databricks.com/spark/latest/structured-streaming/avro-dataframe.html:
"If the default output schema of to_avro matches the schema of the target subject... Otherwise, you must provide the schema of the target subject in the to_avro function:"
to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
So to_avro with 3 arguments works because the converted output Avro schema is of record type, the record name is topLevelRecord, and there is no namespace by default, which matches what was registered for the subject. With an explicit name and namespace the default output schema no longer matches the target subject, so the schema itself additionally needs to be specified.
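Applied to the question, that means passing the Avro schema JSON as the fourth argument. A sketch, assuming the dataframe, registerSchema helper, and schemaRegistryUrl from the question (Schema.toString yields the schema's JSON representation):

import org.apache.spark.sql.avro.functions.to_avro
import org.apache.spark.sql.avro.SchemaConverters.toAvroType
import org.apache.spark.sql.functions.{lit, struct}

val schema = toAvroType(dataframe.schema, true, "some_name", "com.some.namespace")
registerSchema("some_name", schema)

// With a non-default record name/namespace, to_avro can no longer infer the
// target subject's schema, so pass its JSON representation explicitly.
display(dataframe
  .select(struct(/*some fields*/).alias("body"))
  .select(to_avro('body, lit("some_name"), schemaRegistryUrl, schema.toString).as("body")))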