I'm using Spring Kafka and wrote the following Producer class:
@Component
@RequiredArgsConstructor
class Producer {
    private static final String TOPIC = "channels";
    private static final Logger LOGGER = Logger.getLogger(Producer.class.getName());

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @EventListener(ApplicationStartedEvent.class)
    public void channels_01() throws IOException {
    }

    @EventListener(ApplicationStartedEvent.class)
    public void channels_02() throws IOException {
    }

    @EventListener(ApplicationStartedEvent.class)
    public void channels_03() throws IOException {
    }
}
Is it possible to run the three @EventListener-annotated methods as producers simultaneously? All three methods send records to exactly the same topic. Will the Spring container and the Kafka server recognize them as three separate producer clients?
CodePudding user response:
If you want to have three producer clients, then you need to have three KafkaTemplate and use the producerPerThread = true option of the DefaultKafkaProducerFactory:
/**
 * Set to true to create a producer per thread instead of singleton that is shared by
 * all clients. Clients <b>must</b> call {@link #closeThreadBoundProducer()} to
 * physically close the producer when it is no longer needed. These producers will not
 * be closed by {@link #destroy()} or {@link #reset()}.
 * @param producerPerThread true for a producer per thread.
 * @since 2.3
 * @see #closeThreadBoundProducer()
 */
public void setProducerPerThread(boolean producerPerThread) {
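As a rough illustration of where that setter is called, here is a minimal configuration sketch. The class name `ProducerFactoryConfiguration`, the broker address `localhost:9092` and the String serializers are assumptions made for the example, not taken from the question. How the KafkaTemplate beans are wired to the factory is left out. It also assumes a Spring Kafka version that still provides `setProducerPerThread` (the Javadoc above marks it as available since 2.3).

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;

// Hypothetical configuration class; name and property values are assumptions.
@Configuration
class ProducerFactoryConfiguration {

    @Bean
    ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Assumed broker address; replace with your own.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        DefaultKafkaProducerFactory<String, String> factory =
                new DefaultKafkaProducerFactory<>(props);
        // One producer per calling thread instead of a shared singleton.
        factory.setProducerPerThread(true);
        return factory;
    }
}
```

Keep in mind what the Javadoc says: producers created this way are not closed by `destroy()` or `reset()`, so the threads that used them have to call `closeThreadBoundProducer()` themselves.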
Then you need to ensure that the ApplicationEventMulticaster is asynchronous. See SimpleApplicationEventMulticaster:
/**
 * Set a custom executor (typically a {@link org.springframework.core.task.TaskExecutor})
 * to invoke each listener with.
 * <p>Default is equivalent to {@link org.springframework.core.task.SyncTaskExecutor},
 * executing all listeners synchronously in the calling thread.
 * <p>Consider specifying an asynchronous task executor here to not block the
 * caller until all listeners have been executed. However, note that asynchronous
 * execution will not participate in the caller's thread context (class loader,
 * transaction association) unless the TaskExecutor explicitly supports this.
 * @see org.springframework.core.task.SyncTaskExecutor
 * @see org.springframework.core.task.SimpleAsyncTaskExecutor
 */
public void setTaskExecutor(@Nullable Executor taskExecutor) {
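To make the difference described in that Javadoc concrete, here is a small plain-JDK sketch. It is not Spring's implementation: `DispatchDemo` and `runListeners` are hypothetical names, and the "listeners" are just no-op tasks handed to an `Executor`. It only shows that a direct executor runs every task in the calling thread, while a thread pool spreads them over worker threads.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a multicaster: it only hands each "listener" to an Executor.
class DispatchDemo {

    // Runs `listeners` tasks on the executor and returns the names of the threads that ran them.
    public static Set<String> runListeners(Executor executor, int listeners)
            throws InterruptedException {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(listeners);
        for (int i = 0; i < listeners; i++) {
            executor.execute(() -> {
                threadNames.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        done.await(); // wait until every task has recorded its thread
        return threadNames;
    }

    public static void main(String[] args) throws InterruptedException {
        // Direct executor: same idea as a synchronous executor, tasks run in the caller.
        Set<String> sync = runListeners(Runnable::run, 3);
        System.out.println("sync threads:  " + sync);

        // Fixed pool of three workers: each of the three tasks gets its own thread.
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            Set<String> async = runListeners(pool, 3);
            System.out.println("async threads: " + async);
        } finally {
            pool.shutdown();
        }
    }
}
```

With the direct executor the returned set holds a single thread name (the caller's). With the three-thread pool it holds three worker thread names, which is the situation you need for a producer-per-thread setup to yield separate producers.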
Register a bean with the name applicationEventMulticaster for it and set the desired TaskExecutor to ensure that your @EventListener methods are called in parallel.
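A minimal sketch of such a bean could look like the following. The configuration class name `AsyncEventConfiguration` and the thread name prefix `event-` are assumptions for illustration. `SimpleAsyncTaskExecutor` is used only because the Javadoc above mentions it; a pooled executor may suit you better.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ApplicationEventMulticaster;
import org.springframework.context.event.SimpleApplicationEventMulticaster;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

// Hypothetical configuration class; the name is an assumption.
@Configuration
class AsyncEventConfiguration {

    // The bean name must be exactly "applicationEventMulticaster" so that the
    // application context uses it instead of its default multicaster.
    @Bean(name = "applicationEventMulticaster")
    ApplicationEventMulticaster applicationEventMulticaster() {
        SimpleApplicationEventMulticaster multicaster = new SimpleApplicationEventMulticaster();
        // Starts a new thread for each listener invocation ("event-" prefix is assumed).
        multicaster.setTaskExecutor(new SimpleAsyncTaskExecutor("event-"));
        return multicaster;
    }
}
```

Note that this makes every listener in the context asynchronous, not only the three in your Producer class. If you also enable producerPerThread, each listener thread ends up with its own producer, so it is probably a good idea to call closeThreadBoundProducer() on the factory at the end of each listener method.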