Error publishing data to Pub/Sub from a Dataproc Spark job: No functional channel service provider found

Time: 10-13

I am running a Spark Scala job on a GCP Dataproc cluster. After processing the data, I need to publish messages to a Pub/Sub topic, but I get the error below.

No functional channel service provider found. Try adding a dependency on the grpc-okhttp, grpc-netty, or grpc-netty-shaded artifact

Everything works fine up to the end of the Spark processing; the error appears as soon as I publish a message to Pub/Sub. Here is the code:

Try {

  val topicName = TopicName.of(projectName, pubSubTopicName)

  val scope = new ArrayList[String]()
  scope.add("https://www.googleapis.com/auth/pubsub")

  val googleCredentials = GoogleCredentials
    .fromStream(getClass.getResourceAsStream("file path"))
    .createScoped(scope)

  val batchingSettings = BatchingSettings
    .newBuilder()
    .setElementCountThreshold(elementCountThreshold)
    .setRequestByteThreshold(requestByteThreshold)
    .setDelayThreshold(delayDuration)
    .build()

  val publisher = getPublisher(
    topicName,
    batchingSettings,
    googleCredentials
  )

  val publishedData: MutableList[String] = MutableList()

  for (pubMessage <- dataToBePublished) {
    val pubSubMessage =
      getPubSubMessage(
        ByteString.copyFromUtf8(pubMessage)
      )

    val messageIdFuture = publisher.publish(pubSubMessage)

    publishedData += messageIdFuture.get
  }
}

def getPublisher(
    topicName: TopicName,
    batchingSettings: BatchingSettings,
    googleCredentials: GoogleCredentials
): Publisher = {

  Publisher
    .newBuilder(topicName)
    .setCredentialsProvider(
      FixedCredentialsProvider.create(googleCredentials)
    )
    .setBatchingSettings(batchingSettings)
    .build()
}

def getPubSubMessage(data: ByteString): PubsubMessage = {

  PubsubMessage
    .newBuilder()
    .setData(data)
    .build()
}
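One side note on the snippet above, separate from the channel error: the Publisher keeps background threads and gRPC resources alive, so it should be shut down once all messages have been published. A minimal sketch using the Publisher API's shutdown and awaitTermination methods:

  import java.util.concurrent.TimeUnit

  // After the publish loop: flush any pending batches and
  // release the publisher's background threads and channels.
  publisher.shutdown()
  publisher.awaitTermination(1, TimeUnit.MINUTES)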

Since the error mentions the channel, I tried setting the channel provider explicitly on the Publisher, but I get the same error:

Publisher
  .newBuilder(topicName)
  .setCredentialsProvider(
    FixedCredentialsProvider.create(googleCredentials)
  )
  .setChannelProvider(
    TopicAdminSettings
      .defaultGrpcTransportProviderBuilder()
      .build()
  )
  .build()

I also tried adding the suggested dependencies in sbt, but the error persists:

"com.google.cloud" % "google-cloud-pubsub" % "1.120.19",
"io.grpc" % "grpc-okhttp" % "1.49.2",
"io.grpc" % "grpc-netty" % "1.49.2"

All three suggested artifacts are present in the libraries, yet the error remains.

Please help with this issue; thanks in advance.

CodePudding user response:

The issue is in assembling the fat jar because of the Pub/Sub library.

Here are the changes required in build.sbt

  • Add a dependency on grpc-netty only.

"io.grpc" % "grpc-netty" % "1.49.2"
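For context, the relevant library dependencies in build.sbt would then look like the following (versions taken from the question; treat them as an example):

  libraryDependencies ++= Seq(
    // Pub/Sub client; pulls in the gRPC API but not a transport.
    "com.google.cloud" % "google-cloud-pubsub" % "1.120.19",
    // The single gRPC transport implementation to keep on the classpath.
    "io.grpc" % "grpc-netty" % "1.49.2"
  )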

  • Shade (repackage) some of com.google's libraries, which otherwise cause issues at runtime during publisher creation:

assemblyShadeRules in assembly := Seq(
  ShadeRule
    .rename("com.google.common.**" -> "repackaged.com.google.common.@1")
    .inAll,
  ShadeRule
    .rename("com.google.protobuf.**" -> "repackaged.com.google.protobuf.@1")
    .inAll
)

  • Change the merge strategy in jar assembly:

assemblyMergeStrategy in assembly := {
  case x if Assembly.isConfigFile(x) =>
    MergeStrategy.concat
  case PathList(ps @ _*) if Assembly.isReadme(ps.last) || Assembly.isLicenseFile(ps.last) =>
    MergeStrategy.rename
  case PathList("META-INF", xs @ _*) =>
    (xs map { _.toLowerCase }) match {
      case ("manifest.mf" :: Nil) | ("index.list" :: Nil) | ("dependencies" :: Nil) =>
        MergeStrategy.discard
      case ps @ (x :: xs) if ps.last.endsWith(".sf") || ps.last.endsWith(".dsa") =>
        MergeStrategy.discard
      case "plexus" :: xs =>
        MergeStrategy.discard
      case "services" :: xs =>
        MergeStrategy.filterDistinctLines
      case ("spring.schemas" :: Nil) | ("spring.handlers" :: Nil) =>
        MergeStrategy.filterDistinctLines
      case _ => MergeStrategy.first
    }
  case _ => MergeStrategy.first
}

With these changes, the job runs without the runtime error.
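The reason the "services" merge strategy matters: gRPC discovers its transport at runtime via java.util.ServiceLoader, which reads META-INF/services/io.grpc.ManagedChannelProvider from the jar. A fat jar assembled with the default strategy can drop that registration file, producing exactly the "No functional channel service provider found" error even when grpc-netty is on the classpath. As a diagnostic sketch (not part of the original answer), you can list the providers that ServiceLoader actually sees:

  import java.util.ServiceLoader
  import io.grpc.ManagedChannelProvider

  // Prints the channel providers discoverable on the current classpath;
  // after a correct assembly this should include the Netty provider.
  ServiceLoader
    .load(classOf[ManagedChannelProvider])
    .forEach(p => println(p.getClass.getName))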
