Kafka Scala: How to move val into try catch block

Time:05-22

I'd like to move:

val kafkaPartitionOffset = kafkaConsumer.endOffsets(consumedPartitions.asJava)

into a try catch block like so:

val kafkaPartitionOffset : SomeClass = 
            try {
               kafkaConsumer.endOffsets(consumedPartitions.asJava)
            } catch {
              case e: Exception => {
                log.error(s"${consumerGroupId} Could not get Kafka offset", e)
                None
              }
            }

But I'm having trouble working out what SomeClass should be. I've tried Map[TopicPartition, Long], but the compiler reports a type mismatch. Any help is appreciated, thank you!

Update: I've also tried Any, but then I can't call kafkaPartitionOffset.get(topicPartition) below (get is highlighted in red with the error message "cannot resolve symbol get"):

        for((topicPartition,OffsetAndMetadata) <- mapTopicPartitionOffset){
          
          val bbCurrentOffset =  OffsetAndMetadata.get(topicPartition)

          // latest offset
          val partitionLatestOffset = kafkaPartitionOffset.get(topicPartition)

          // Log for a particular partition
          val delta = partitionLatestOffset - bbCurrentOffset

          topicOffsetList  = delta.abs

        }

CodePudding user response:

Take a look at this:

val x = try { 
  throw new RuntimeException("runtime ex")
  "some string"
} catch { case _: RuntimeException => 2 }

The compiler needs to know the type of x before runtime, since x can be used somewhere else in your code, right? So the compiler says:

"Hmm, what type do both the literal "some string" and the literal 2 belong to?"

So it looks for the least upper bound (the most specific common supertype) of String and Int, which in this case is Any, so you effectively get val x: Any = .... I don't know offhand what kafkaConsumer.endOffsets(...) returns. If it returns Option[T], then SomeClass would also be Option[T], since you return None in the catch block. If it doesn't, don't use None there just because nothing else seems to fit; there are better approaches to exception handling.
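To make the typing issue concrete, here is a minimal sketch. It assumes a hypothetical riskyLookup function standing in for the call that may throw, with a plain Map[String, Long] instead of the real Kafka types. It shows why returning None from only the catch branch forces a type like Any, and how wrapping the success value in Some lets both branches agree on Option[...].

```scala
object TryCatchTypes {
  // Hypothetical stand-in for a call that may throw, such as endOffsets.
  def riskyLookup(fail: Boolean): Map[String, Long] =
    if (fail) throw new RuntimeException("broker unreachable")
    else Map("topic-0" -> 42L)

  // The try branch yields a Map and the catch branch yields None, so the two
  // branches share no useful common type. Annotating Any compiles, but the
  // result no longer exposes Map methods such as get.
  def mismatched(fail: Boolean): Any =
    try riskyLookup(fail)
    catch { case _: RuntimeException => None }

  // Wrapping the success value in Some makes both branches
  // Option[Map[String, Long]], so the annotation type-checks.
  def unified(fail: Boolean): Option[Map[String, Long]] =
    try Some(riskyLookup(fail))
    catch { case _: RuntimeException => None }
}
```

With this shape, TryCatchTypes.unified(false) gives you an Option you can map over or pattern match on, while the result of mismatched can only be used as Any unless you cast it.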
In any case, Scala provides utility types that let you avoid this kind of try/catch as much as possible. I would recommend using Try here:

import scala.util.Try
// Replace the placeholder below with the actual return type of endOffsets.
val kafkaPartitionOffset: Try[Whatever-endOffsets-returns] =
  Try(kafkaConsumer.endOffsets(consumedPartitions.asJava))
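For what it's worth, in the Kafka Java client endOffsets returns a java.util.Map<TopicPartition, java.lang.Long>, which is why annotating the value as a Scala Map[TopicPartition, Long] gives a type mismatch. Below is a rough, self-contained sketch of how the Try version could then be consumed. The endOffsets function here is a hypothetical stand-in that uses String keys so the example runs without the Kafka client, and the import of scala.jdk.CollectionConverters assumes Scala 2.13 or later (on 2.12 it would be scala.collection.JavaConverters).

```scala
import scala.util.{Failure, Success, Try}
import scala.jdk.CollectionConverters._

object EndOffsetsSketch {
  // Hypothetical stand-in for kafkaConsumer.endOffsets; partitions are plain
  // strings so the sketch runs without the Kafka client on the classpath.
  def endOffsets(fail: Boolean): java.util.Map[String, java.lang.Long] =
    if (fail) throw new RuntimeException("broker unreachable")
    else {
      val m = new java.util.HashMap[String, java.lang.Long]()
      m.put("topic-0", java.lang.Long.valueOf(42L))
      m
    }

  // Convert the Java map to an immutable Scala Map[String, Long] inside the
  // Try, so an exception from the call is captured as a Failure.
  def latestOffsets(fail: Boolean): Try[Map[String, Long]] =
    Try(endOffsets(fail).asScala.map { case (tp, off) => tp -> off.longValue }.toMap)

  // Handle both outcomes explicitly; on failure, log and fall back to an
  // empty map (the fallback is a choice made for this sketch only).
  def offsetsOrEmpty(fail: Boolean): Map[String, Long] =
    latestOffsets(fail) match {
      case Success(offsets) => offsets
      case Failure(e) =>
        Console.err.println(s"Could not get Kafka offset: ${e.getMessage}")
        Map.empty
    }
}
```

After the conversion, get on the resulting Scala Map returns an Option[Long], so the lookup per partition still has to handle a missing key, for example with getOrElse.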

By the way, the title of the question doesn't match the actual question; please consider changing the title :)
