Home > OS >  Possible to add a delay in spring integration DSL?
Possible to add a delay in spring integration DSL?

Time:08-27

I have the following adapter:

@Component
class MyCustomFlow : IntegrationFlowAdapter() {

    fun singleThreadTaskExecutor(): TaskExecutor {
        val executor = ThreadPoolTaskExecutor()
        executor.maxPoolSize = 1
        executor.initialize()
        return executor
    }

    @Filter
    fun filter(data: SomeData): Boolean = ...

    @Transformer
    fun transform(customer: Data): Message<SomeData> {
      ....
    }

    @ServiceActivator
    fun handle(data: Data): SomeData {
       ....
    }

    @Bean(name = [PollerMetadata.DEFAULT_POLLER])
    fun poller(): PollerSpec? {
        return Pollers.fixedRate(500)
    }

    @Bean
    override fun buildFlow(): IntegrationFlowDefinition<*> {
        return from(MessageChannels.queue("updateCustomersLocation"))
            .channel(MessageChannels.executor(singleThreadTaskExecutor()))
            .split()
            .filter(this)
            .transform(this)
            .handle(this)
            .channel("customerLocationFetched")
    }
}

And if I understand the Java DSL, the handle is:

handle → ServiceActivator

Given a ServiceActivator, outside of the IntegrationFlowAdapter, I have an option to define a poller. Within that poller I could add a delay before the next messages is processed.

    @ServiceActivator(
        poller = [Poller(fixedDelay = "3000", maxMessagesPerPoll = "1", fixedRate = "3000")]
    )

Would it be possible, within the adapter, to add a delay for the ServiceActivator (the method named handle, similar to the logic I get if I add another channel.

    @Bean
    override fun buildFlow(): IntegrationFlowDefinition<*> {
        return from(MessageChannels.queue("updateCustomersLocation"))
            .channel(MessageChannels.executor(singleThreadTaskExecutor()))
            .split()
            .filter(this)
            .transform(this)
            .channel("myNewChannel")
            //.handle(this)
            //.channel("customerLocationFetched")
    }

And then from outside the adapter I could just define a new ServiceActivator:

    @ServiceActivator(
        inputChannel = "myNewChannel"
        poller = [Poller(fixedDelay = "3000", maxMessagesPerPoll = "1", fixedRate = "3000")]
    )

The reason I want a delay between the messages in the handle/service activator is that the method sends requests and I wanna control the rate the request are sent.

CodePudding user response:

First of all the buildFlow() method must not be marked with @Bean : the IntegrationFlowAdapter does the trick to register everything.

You can add poller to the endpoint spec. See a second arg of that handle(). Only the point that input channel for this polling endpoint must be pollable - the QueueChannel exists out-of-the-box and you can place it just before your handle().

  • Related