I am running a Spark Scala job on a GCP Dataproc cluster. After processing the data, I need to publish messages to a Pub/Sub topic, but I get the error below.
No functional channel service provider found. Try adding a dependency on the grpc-okhttp, grpc-netty, or grpc-netty-shaded artifact
Everything works fine through the Spark processing. The error appears as soon as I publish a message to Pub/Sub. Here is the code:
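import java.util.ArrayList
import scala.collection.mutable.MutableList
import scala.util.Try
import com.google.api.gax.batching.BatchingSettings
import com.google.api.gax.core.FixedCredentialsProvider
import com.google.auth.oauth2.GoogleCredentials
import com.google.cloud.pubsub.v1.Publisher
import com.google.protobuf.ByteString
import com.google.pubsub.v1.{PubsubMessage, TopicName}

Try {
  val topicName = TopicName.of(projectName, pubSubTopicName)
  // OAuth scope needed to call Pub/Sub
  val scope = new ArrayList[String]()
  scope.add("https://www.googleapis.com/auth/pubsub")
  val googleCredentials = GoogleCredentials
    .fromStream(getClass.getResourceAsStream("file path"))
    .createScoped(scope)
  val batchingSettings = BatchingSettings
    .newBuilder()
    .setElementCountThreshold(elementCountThreshold)
    .setRequestByteThreshold(requestByteThreshold)
    .setDelayThreshold(delayDuration)
    .build()
  val publisher = getPublisher(
    topicName,
    batchingSettings,
    googleCredentials
  )
  val publishedData: MutableList[String] = MutableList()
  for (pubMessage <- dataToBePublished) {
    val pubSubMessage =
      getPubSubMessage(
        ByteString.copyFromUtf8(pubMessage)
      )
    val messageIdFuture = publisher.publish(pubSubMessage)
    // Block until Pub/Sub returns the message ID, then record it
    publishedData += messageIdFuture.get
  }
}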
def getPublisher(
    topicName: TopicName,
    batchingSettings: BatchingSettings,
    googleCredentials: GoogleCredentials
): Publisher = {
  Publisher
    .newBuilder(topicName)
    .setCredentialsProvider(
      FixedCredentialsProvider.create(googleCredentials)
    )
    .setBatchingSettings(batchingSettings)
    .build()
}

def getPubSubMessage(data: ByteString): PubsubMessage = {
  PubsubMessage
    .newBuilder()
    .setData(data)
    .build()
}
Because the error mentions a channel, I tried setting a channel provider on the Publisher as shown below, but I get the same error:
Publisher
  .newBuilder(topicName)
  .setCredentialsProvider(
    FixedCredentialsProvider.create(googleCredentials)
  )
  .setChannelProvider(
    TopicAdminSettings
      .defaultGrpcTransportProviderBuilder()
      .build()
  )
  .build()
I also tried adding these dependencies in sbt, but the error is the same:
"com.google.cloud" % "google-cloud-pubsub" % "1.120.19",
"io.grpc" % "grpc-okhttp" % "1.49.2",
"io.grpc" % "grpc-netty" % "1.49.2"
All three suggested dependencies are present in the libraries, yet the error persists.
Please help with this issue. Thanks in advance.
CodePudding user response:
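The issue is in how the fat jar is assembled when it includes the Pub/Sub library. gRPC looks up its channel provider through META-INF/services files, and those entries are lost if the assembly merge strategy discards them.
The following changes are required in build.sbt: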
- Add only the grpc-netty dependency.
"io.grpc" % "grpc-netty" % "1.49.2"
- Shade (repackage) some of the com.google libraries that cause issues at runtime when the publisher is created.
assemblyShadeRules in assembly := Seq(
  ShadeRule
    .rename("com.google.common.**" -> "repackaged.com.google.common.@1")
    .inAll,
  ShadeRule
    .rename("com.google.protobuf.**" -> "repackaged.com.google.protobuf.@1")
    .inAll
)
- Change the merge strategy used when assembling the jar.
assemblyMergeStrategy in assembly := {
  case x if Assembly.isConfigFile(x) =>
    MergeStrategy.concat
  case PathList(ps @ _*) if Assembly.isReadme(ps.last) || Assembly.isLicenseFile(ps.last) =>
    MergeStrategy.rename
  case PathList("META-INF", xs @ _*) =>
    (xs map { _.toLowerCase }) match {
      case ("manifest.mf" :: Nil) | ("index.list" :: Nil) | ("dependencies" :: Nil) =>
        MergeStrategy.discard
      case ps @ (x :: xs) if ps.last.endsWith(".sf") || ps.last.endsWith(".dsa") =>
        MergeStrategy.discard
      case "plexus" :: xs =>
        MergeStrategy.discard
      // Keep every service registration so gRPC can find its channel provider
      case "services" :: xs =>
        MergeStrategy.filterDistinctLines
      case ("spring.schemas" :: Nil) | ("spring.handlers" :: Nil) =>
        MergeStrategy.filterDistinctLines
      case _ => MergeStrategy.first
    }
  case _ => MergeStrategy.first
}
With these changes, the job runs without the runtime error.
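To check whether a rebuilt jar still carries gRPC's service registration, a small diagnostic like the one below may help. It is only a sketch: the object name ServiceFileCheck and the helper registeredProviders are made up for illustration, and it assumes Scala 2.12 (as the MutableList in the question suggests) and that it runs with the assembled jar on the classpath. It uses only the standard library to read every META-INF/services/io.grpc.ManagedChannelProvider file visible to the class loader.

```scala
import scala.collection.JavaConverters._
import scala.io.Source

// Hypothetical helper, not part of gRPC or the Pub/Sub client.
object ServiceFileCheck {

  // Collects the provider class names listed in every
  // META-INF/services/<serviceName> file visible to `loader`.
  def registeredProviders(serviceName: String, loader: ClassLoader): List[String] =
    loader
      .getResources(s"META-INF/services/$serviceName")
      .asScala
      .toList
      .flatMap { url =>
        val src = Source.fromInputStream(url.openStream(), "UTF-8")
        try {
          src.getLines()
            .map(_.takeWhile(_ != '#').trim) // drop comments and whitespace
            .filter(_.nonEmpty)
            .toList                           // force reading before close
        } finally src.close()
      }

  def main(args: Array[String]): Unit = {
    val providers =
      registeredProviders("io.grpc.ManagedChannelProvider", getClass.getClassLoader)
    if (providers.isEmpty)
      println("No gRPC channel provider is registered under META-INF/services")
    else
      providers.foreach(p => println(s"Registered channel provider: $p"))
  }
}
```

If the list comes back empty when run against the fat jar, the merge strategy is probably still dropping the services files. Note that this only confirms the registration file survived assembly, not that the provider class itself loads.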