Databricks to_avro works only if schema is registered without specified event name and namespace

Time:03-12

I'm using Databricks Runtime 10.0 with Spark 3.2.0 and Scala 2.12. I also have a dependency on io.confluent:kafka-schema-registry-client:6.2.0, from which I use CachedSchemaRegistryClient to register schemas in the schema registry like this:


import org.apache.avro.Schema
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

@transient lazy val reg = new CachedSchemaRegistryClient(schemaRegistryUrl, schemaRegistryCacheCapacity, null)

  def registerSchema(name: String, schema: Schema): Int = {
    reg.register(name, new AvroSchema(schema))
  }

Now, this works in Spark as expected:

val dataframe = ...
val schema = toAvroType(dataframe.schema)
schemaRegistry.registerSchema("some_name", schema)

          

display(dataframe
       .select(struct(/*some fields*/).alias("body"))
       .select(to_avro('body, lit("some_name"), schemaRegistryUrl).as("body")))

I'm also able to deserialize the result. However, as soon as I make the following change to specify the correct schema name and namespace:

val schema = toAvroType(dataframe.schema, true, "some_name", "com.some.namespace")

Spark fails with

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 853.0 failed 4 times, most recent failure: Lost task 2.3 in stage 853.0 (TID 21433) (10.206.5.9 executor driver): org.spark_project.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
    at org.spark_project.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:191)
    at org.spark_project.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218)
    at org.spark_project.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:284)
    at org.spark_project.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:272)

In both cases, I can see the schema correctly registered in the schema registry. Any idea why the second scenario doesn't work?

CodePudding user response:

These are the lines (186-192) where the exception is thrown:

String requestUrl = buildRequestUrl(baseUrl, path);
  try {
    return sendHttpRequest(requestUrl, method, requestBodyData, requestProperties, responseFormat);
  } catch (IOException e) {
    baseUrls.fail(baseUrl);
    if (i == n-1) throw e; // Raise the exception since we have no more urls to try
  }

Line 191 is the line with the comment:

if (i == n-1) throw e; // Raise the exception since we have no more urls to try

So the worker/driver node(s) are simply not able to connect to the schema registry.

Refer to the Spark 3.2.0 code here

CodePudding user response:

From https://docs.databricks.com/spark/latest/structured-streaming/avro-dataframe.html:

"If the default output schema of to_avro matches the schema of the target subject... Otherwise, you must provide the schema of the target subject in the to_avro function:"

 to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))

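So to_avro with 3 arguments works because the converted output Avro schema is of record type, the record name is topLevelRecord, and there is no namespace by default. That matches the schema registered from toAvroType(dataframe.schema), which uses the same defaults.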
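
To make the naming difference concrete, here is a minimal sketch using only the Avro library (assumed to be on the classpath). The field `id` is hypothetical; the names `some_name` and `com.some.namespace` come from the question. It builds the same record twice, once with the default name `topLevelRecord` and once with an explicit name and namespace, and shows that Avro treats the two as different schemas:

```scala
import org.apache.avro.{Schema, SchemaBuilder}

object RecordNameDemo {
  // Both records carry the same single field; "id" is a hypothetical field for illustration.
  val defaultSchema: Schema =
    SchemaBuilder.record("topLevelRecord")
      .fields().requiredString("id").endRecord()

  // Same fields, but with the record name and namespace from the question.
  val namedSchema: Schema =
    SchemaBuilder.record("some_name").namespace("com.some.namespace")
      .fields().requiredString("id").endRecord()

  def main(args: Array[String]): Unit = {
    println(defaultSchema.getFullName)    // topLevelRecord
    println(namedSchema.getFullName)      // com.some.namespace.some_name
    println(defaultSchema == namedSchema) // false: record names are part of schema equality
  }
}
```

Because the record's full name is part of the schema, a lookup made with the default-named schema does not find a subject registered with the named one.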

With the specified name and namespace, the default output schema no longer matches the registered one, so the schema itself must additionally be passed to to_avro as the fourth argument.
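
A minimal sketch of that fix, reusing `dataframe`, `schemaRegistry`, `schemaRegistryUrl`, and the subject name from the question, together with the Databricks four-argument form of to_avro quoted above. Passing `schema.toString` (the JSON form of the Avro schema) as the fourth argument is an assumption about what the function expects, and the sketch only runs on a Databricks cluster with a reachable schema registry:

```scala
import org.apache.spark.sql.avro.SchemaConverters.toAvroType
import org.apache.spark.sql.avro.functions.to_avro
import org.apache.spark.sql.functions.{col, lit, struct}

// Register the schema with an explicit record name and namespace, as in the question.
val schema = toAvroType(dataframe.schema, true, "some_name", "com.some.namespace")
schemaRegistry.registerSchema("some_name", schema)

// Assumption: the fourth argument takes the target subject's schema as a JSON string,
// so to_avro no longer relies on the default topLevelRecord schema.
display(dataframe
  .select(struct(/*some fields*/).alias("body"))
  .select(to_avro(col("body"), lit("some_name"), schemaRegistryUrl, schema.toString).as("body")))
```

The schema passed here has to describe the `body` column being serialized; if the selected fields differ from `dataframe.schema`, the schema would need to be derived from the struct instead.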
