ActiveJob perform_later from inside Redis subscription block

Time:10-27

I want to use the Ruby Redis client to subscribe to a channel. Once the subscription has succeeded, I want to start an ActiveJob job (backed by Sidekiq) using perform_later. The job will then do some work and publish its result to that Redis channel. This produces the following error:

Processing by Api::LocationController#yyy as */*
  Parameters: {"zip_code"=>"503400"}
[ActiveJob] Failed enqueuing XXXJob to Sidekiq(default): NoMethodError (undefined method `call_pipeline' for #<Redis::SubscribedClient:0x00007fd9222cdf78 @client=#<Redis::Client:0x00007fd921e1b890 @options={:url=>"redis://localhost?db=xxx", :scheme=>"redis", :host=>"localhost", :port=>6379, :path=>nil, :read_timeout=>0, :write_timeout=>5.0, :connect_timeout=>5.0, :timeout=>5.0, :username=>nil, :password=>nil, :db=>0, :driver=>Redis::Connection::Ruby, :id=>nil, :tcp_keepalive=>0, :reconnect_attempts=>1, :reconnect_delay=>0.0, :reconnect_delay_max=>0.5, :inherit_socket=>false, :logger=>nil, :sentinels=>nil, :role=>:master, :_parsed=>true}, @reconnect=true, @logger=nil, @connection=#<Redis::Connection::Ruby:0x00007fd921df08c0 @sock=#<Redis::Connection::TCPSocket:fd 26>>, @command_map={}, @pending_reads=-3, @connector=#<Redis::Client::Connector:0x00007fd921e1a2d8 @options={:url=>"redis://localhost?db=xxx", :scheme=>"redis", :host=>"localhost", :port=>6379, :path=>nil, :read_timeout=>5.0, :write_timeout=>5.0, :connect_timeout=>5.0, :timeout=>5.0, :username=>nil, :password=>nil, :db=>0, :driver=>Redis::Connection::Ruby, :id=>nil, :tcp_keepalive=>0, :reconnect_attempts=>1, :reconnect_delay=>0.0, :reconnect_delay_max=>0.5, :inherit_socket=>false, :logger=>nil, :sentinels=>nil, :role=>:master, :_parsed=>true}>, @pid=54943>>)
Completed 500 Internal Server Error in 26ms (Allocations: 2374)


  
NoMethodError (undefined method `call_pipeline' for #<Redis::SubscribedClient:0x00007fd9222cdf78 @client=#<Redis::Client:0x00007fd921e1b890 @options={:url=>"redis://localhost?db=xxx", :scheme=>"redis", :host=>"localhost", :port=>6379, :path=>nil, :read_timeout=>5.0, :write_timeout=>5.0, :connect_timeout=>5.0, :timeout=>5.0, :username=>nil, :password=>nil, :db=>0, :driver=>Redis::Connection::Ruby, :id=>nil, :tcp_keepalive=>0, :reconnect_attempts=>1, :reconnect_delay=>0.0, :reconnect_delay_max=>0.5, :inherit_socket=>false, :logger=>nil, :sentinels=>nil, :role=>:master, :_parsed=>true}, @reconnect=true, @logger=nil, @connection=#<Redis::Connection::Ruby:0x00007fd921df08c0 @sock=nil>, @command_map={}, @pending_reads=-3, @connector=#<Redis::Client::Connector:0x00007fd921e1a2d8 @options={:url=>"redis://localhost?db=xxx", :scheme=>"redis", :host=>"localhost", :port=>6379, :path=>nil, :read_timeout=>5.0, :write_timeout=>5.0, :connect_timeout=>5.0, :timeout=>5.0, :username=>nil, :password=>nil, :db=>0, :driver=>Redis::Connection::Ruby, :id=>nil, :tcp_keepalive=>0, :reconnect_attempts=>1, :reconnect_delay=>0.0, :reconnect_delay_max=>0.5, :inherit_socket=>false, :logger=>nil, :sentinels=>nil, :role=>:master, :_parsed=>true}>, @pid=54943>>):

  
app/controllers/api/location_controller.rb:10:in `block (3 levels) in yyy'
app/controllers/api/location_controller.rb:8:in `block in yyy'
app/controllers/api/location_controller.rb:5:in `yyy'

The yyy method is a controller action that is called on a GET request:

  def yyy
    REDIS_POOL.with { |r|
      data = r.get("key")
      if data.nil?
        r.subscribe("channel") { |on|
          on.subscribe {
            XXXJob.perform_later(params)
          }
          on.message { |_, message|
            puts message
            render json: message
            r.unsubscribe("channel")
          }
        }
      else
        render json: data
      end
    }
  end

It seems to me that some context that perform_later depends on is lost inside the Redis subscribe block. The same code works if I move the XXXJob.perform_later(params) call above the r.subscribe("channel") call, and thus outside of the block. However, I want to make sure that the job is only started after the subscription to the Redis channel has succeeded. Is there any way to achieve this?

CodePudding user response:

This is tricky. It will not work as written because Sidekiq's default connection pool always reuses the connection the current thread has already checked out. Here that connection is busy handling the subscribe call, and that call is not reentrant. You need to create a separate connection pool manually and enqueue through it explicitly:

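# A dedicated pool for enqueuing, separate from the connection that is subscribed.
ANOTHER_POOL = ConnectionPool.new { Redis.new }

on.subscribe do
  # Route every push made inside this block through the dedicated pool.
  Sidekiq::Client.via(ANOTHER_POOL) do
    SomeWorker.perform_async(1,2,3)
    SomeOtherWorker.perform_async(1,2,3)
  end
end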
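
To illustrate why reusing the same pool fails while a separate pool works, here is a minimal, self-contained sketch in plain Ruby. It does not use the real redis, connection_pool, or Sidekiq gems: TinyPool, FakeConnection, and enqueue_while_subscribed are hypothetical stand-ins that only model the behaviour described above (a pool that hands the same thread the connection it already holds, and a connection that rejects ordinary commands while subscribed).

```ruby
# Simplified model of a thread-reentrant pool (NOT the real connection_pool gem).
class TinyPool
  def initialize(&factory)
    @factory = factory
    @key = :"tiny_pool_#{object_id}"
  end

  # A nested call on the same thread yields the connection already checked out.
  def with
    current = Thread.current[@key]
    return yield(current) if current

    conn = @factory.call
    Thread.current[@key] = conn
    begin
      yield conn
    ensure
      Thread.current[@key] = nil
    end
  end
end

# Stand-in for a Redis connection: ordinary commands fail while subscribed.
class FakeConnection
  def initialize
    @subscribed = false
  end

  def subscribe
    @subscribed = true
    yield
  ensure
    @subscribed = false
  end

  def enqueue(job)
    raise "connection is in subscribed mode" if @subscribed

    "enqueued #{job}"
  end
end

# Subscribe on one pool and try to enqueue through another (or the same) pool.
def enqueue_while_subscribed(subscriber_pool, enqueue_pool)
  subscriber_pool.with do |conn|
    conn.subscribe do
      enqueue_pool.with { |c| c.enqueue("XXXJob") }
    end
  end
rescue RuntimeError => e
  "failed: #{e.message}"
end

shared_pool   = TinyPool.new { FakeConnection.new }
separate_pool = TinyPool.new { FakeConnection.new }

puts enqueue_while_subscribed(shared_pool, shared_pool)   # same connection is reused
puts enqueue_while_subscribed(shared_pool, separate_pool) # fresh connection is used
```

In this model the first call fails because the nested checkout returns the subscribed connection, while the second succeeds because the separate pool supplies a fresh one. That is the role ANOTHER_POOL plays in the snippet above.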