How to correctly interrupt multithreading with Kafka Producers?

Time:09-22

I want to use multiple threads to send events via Kafka's Producer Java API. The run method of my thread looks like this:

    @Override
    public void run() {
        logger.info("Producer Thread started for: " + this.topic);
        ProducerRecord<byte[], byte[]> record;
        while (!thread.isInterrupted()) {
            try {
                Thread.sleep(10);
                record = new ProducerRecord<>(this.topic, new Payload());
                producer.send(record);
            } catch (InterruptedException e) {
                logger.info("KafkaTupleProducer with topic " + this.topic + " interrupted");
                producer.flush();
                producer.close();
            }
        }
        producer.flush();
        producer.close();
    }

However, when I call thread.interrupt() from outside the thread, I get the following stack trace:

Exception in thread "source0" java.lang.IllegalStateException: Cannot perform operation after producer has been closed
    at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:872)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:881)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:752)
    at kafka.KafkaTupleProducer.run(KafkaTupleProducer.java:82)
    at java.base/java.lang.Thread.run(Thread.java:829)

What am I doing wrong here? The producer is only closed when the thread is interrupted, and after that no more events should be sent. Appreciate any help!

Answer:

When you interrupt a thread, only one thing happens. That thread's INTERRUPT flag is set. That's all it does. It doesn't abort the thread in any way.

However, many things in the Java core library check that flag and act accordingly. But which things? Unfortunately, the docs tend not to say. This stems from an original sin: Java is OS-agnostic (or tries to be), and the interruptibility of various OS interactions depends on the underlying OS. If Java made any promises, a feature either could not be delivered on some OS (so, no JVM for that entire OS then!) or would perform extremely badly because the JVM would need to emulate a ton to deliver on the promise. Thus, Java makes no promises either way. As a trivial example, a blocking read call on a file InputStream may or may not be interruptible; the JVM gets to go either way on that.

With that caveat in mind:

The interrupt flag is designed to work as follows: the thread just does its thing until it reaches code that knows what to do when the flag is up. That code then deals with it and lowers the flag. Notably, any visible aborting counts as dealing with it. As a consequence, various built-in Java features such as Thread.sleep lower the flag, and it's best to work with the system in the same way.

  • Anything that is declared to throw InterruptedException reacts to the flag being raised by ceasing to wait as fast as it can, lowering the flag, and then throwing InterruptedException. This notably means that calling e.g. Thread.sleep(any amount here) when the flag is up results in an immediate InterruptedException - no sleeping happens at all. And the flag will now be cleared.
  • The static method Thread.interrupted(), which is the one you are supposed to be using, will return true if the flag is set and lower it. Whereas myThread.isInterrupted() just returns true if it is set and does not lower it.
  • Any blocking core Java method that is not declared to throw InterruptedException may completely ignore the flag and continue to block. Or it may react to it. If it reacts (and on most OSes, it does! You can usually interrupt blocking calls for reading/writing the network or the file system, it's just not a guarantee), it will lower the flag and throw some other exception. For file and network, generally IOException. This exception probably has an InterruptedException as cause and the .getMessage() generally indicates this is due to an interruption.
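
To make the flag behaviour in the list above concrete, here is a minimal, self-contained sketch. The class and method names (InterruptFlagDemo, sleepClearsFlag and so on) are made up for illustration; the only real JDK calls are Thread.sleep, Thread.interrupted(), isInterrupted() and interrupt(). The thread interrupts itself so that the outcome does not depend on timing.

```java
final class InterruptFlagDemo {

    // Thread.sleep with the flag already up: throws at once and lowers the flag.
    static boolean sleepClearsFlag() {
        Thread.currentThread().interrupt();           // raise the flag on ourselves
        try {
            Thread.sleep(10_000);                     // does not actually sleep
            return false;                             // not reached when the flag is up
        } catch (InterruptedException e) {
            // sleep lowered the flag before throwing
            return !Thread.currentThread().isInterrupted();
        }
    }

    // isInterrupted() only reads the flag; it stays up between calls.
    static boolean isInterruptedKeepsFlag() {
        Thread.currentThread().interrupt();
        boolean first = Thread.currentThread().isInterrupted();
        boolean second = Thread.currentThread().isInterrupted();
        Thread.interrupted();                         // clean up: lower the flag again
        return first && second;
    }

    // Thread.interrupted() reads the flag and lowers it in one step.
    static boolean interruptedClearsFlag() {
        Thread.currentThread().interrupt();
        boolean first = Thread.interrupted();         // flag was up, is now lowered
        boolean second = Thread.interrupted();        // flag is already down
        return first && !second;
    }

    public static void main(String[] args) {
        System.out.println("sleep throws and clears the flag: " + sleepClearsFlag());
        System.out.println("isInterrupted() keeps the flag:   " + isInterruptedKeepsFlag());
        System.out.println("interrupted() clears the flag:    " + interruptedClearsFlag());
    }
}
```

All three lines print true. One consequence worth noting for a loop whose condition is !thread.isInterrupted(): once Thread.sleep has thrown, the flag is down again, so that condition alone no longer stops the loop.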

With that in mind, let's look at your code:

It is currently broken. In the unlikely, but possible, scenario that the interrupt occurs after the Thread.sleep(10) call, and in fact, after producer.send(record) or at least in the final parts of the send method where nothing would otherwise react to the flag being up, your code will get to the while clause, notice that the thread is interrupted, and escape. However, the flag is still up, and this thread is now in a very tricky state. You still end up calling flush and close on the producer, and given that the flag is up, the effects of this are undefined. It may or may not work, depending on the OS, architecture, and phase of the moon. Not a good idea. That flag needs to be down. So, use Thread.interrupted() instead, which will lower it.

That still doesn't fix all of it: record = new ProducerRecord<>(...) also runs code. I think that's your code? You'll need to check what it does. However, given that a large (and undefined!) number of Java core methods do react to that flag being up, it's unlikely that it'll just finish normally. Even if that doesn't cause issues, the same principle applies if the flag is set in between new ProducerRecord<> and producer.send, or whilst producer.send is running.

This is in fact what happens here - the code inside KafkaProducer.send ends up calling another send, which calls a doSend, which calls something, and that something ends up checking the flag (and thus, lowering it, unless it is buggy), and aborting out. This abort is then caught by KafkaProducer::doSend, which decides to wrap the interrupt into an IllegalStateException (that's bad form, sounds like they just picked an arbitrary exception type that vaguely sounds correct and didn't read the javadoc of that exception - it isn't meant for this), and then throws that.

There's not much you can do about this, unless you fork Kafka.

Generally interrupt() isn't something that just happens to you - even pressing CTRL+C in the terminal or sending a SIGHUP signal or whatnot to a Java process doesn't mean that the threads get their interrupt flag raised. The one and only way to raise the interrupt flag is to write it that way: by invoking the interrupt() method on a given thread object. The vast majority of libraries out there won't do that, because there are no set rules about what 'interrupting a thread' even means. Thus, presumably, you interrupted it. Thus, you know what you want this to mean.

Two options:

  • You want it to mean that the producer.send() method should stop right now, i.e. don't complete the send job at all, and abort as soon as possible. In which case, your code is basically working, you just need to write a workaround for Kafka's bad behaviour here: catch (IllegalStateException), then do some work to identify that it is in fact an interrupt and not some other problem that also caused an IllegalStateException to be thrown. For example, by checking the cause, and if that doesn't work (that means Kafka made ANOTHER bad code decision - at some point you start doubting the quality of this library!), check the stack trace: is the top frame throwIfProducerClosed?
  • You want send to finish as normal. However, you then want the while loop to exit instead of looping again. In other words, finish send normally, then flush-and-close and end the thread. In that case, do not use interrupt() - that's not what it is for, and it can't be salvaged: that flag will be up, you can't take it down whilst send is running, and send cannot complete whilst it is up; thus, a dead end. Make your own method instead. This method sets a flag (make a field of type AtomicBoolean for this), and your while loop checks this flag in its condition.
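
The second option could look roughly like the sketch below. It is not Kafka code: Sink is a hypothetical stand-in for the producer so that the example runs without a broker, and StoppableSender, requestStop, RecordingSink and demo are likewise invented names. The point is only the shape: an AtomicBoolean checked in the loop condition, and a single flush-and-close once the loop has ended.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class StoppableSender implements Runnable {

    // Hypothetical stand-in for the Kafka producer (assumption, not the Kafka API).
    interface Sink {
        void send(String event);
        void flush();
        void close();
    }

    private final Sink sink;
    private final AtomicBoolean running = new AtomicBoolean(true);

    StoppableSender(Sink sink) {
        this.sink = sink;
    }

    // Called from another thread instead of interrupt().
    void requestStop() {
        running.set(false);
    }

    @Override
    public void run() {
        try {
            while (running.get()) {
                sink.send("event");      // always runs to completion
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    break;               // deal with it by leaving the loop
                }
            }
        } finally {
            sink.flush();                // flush and close exactly once, after the loop
            sink.close();
        }
    }

    // Test double that records whether anything was sent after close().
    static final class RecordingSink implements Sink {
        int closeCount;
        boolean closed;
        boolean sentAfterClose;

        @Override public void send(String event) {
            if (closed) {
                sentAfterClose = true;
            }
        }
        @Override public void flush() { }
        @Override public void close() {
            closed = true;
            closeCount++;
        }
    }

    // Starts a worker, asks it to stop, and checks that it shut down cleanly.
    static boolean demo() {
        RecordingSink sink = new RecordingSink();
        StoppableSender sender = new StoppableSender(sink);
        Thread worker = new Thread(sender, "source0");
        worker.start();
        try {
            Thread.sleep(100);
            sender.requestStop();
            worker.join(5_000);
        } catch (InterruptedException e) {
            return false;
        }
        return !worker.isAlive() && sink.closeCount == 1 && !sink.sentAfterClose;
    }

    public static void main(String[] args) {
        System.out.println("clean shutdown: " + demo());
    }
}
```

With a real producer, send, flush and close would be the producer's own methods. The loop structure stays the same: nothing is sent once the flag is down, and close happens in one place only.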

NB: Various tutorials and SO answers about catch (InterruptedException e) will advise you to re-raise the flag by interrupting your own thread. This is incorrect advice, stemming from a misunderstanding of what the interrupt system is for. Ignore such advice. The effect of doing that is simply that the thread remains in an unstable state, where almost any further act of serious import that the thread ends up doing will fail in an unspecified, OS-dependent manner. This isn't, surely, what you intend to do. Deal with the interrupt. If that means exiting, then exit (and thus the state of the flag is of no consequence). If that means you've now dealt with it, then don't leave the flag up. Either way, 're-raise the flag' is incorrect.
