I want to use the Ruby Redis client to subscribe to a channel. Once the subscription has succeeded, I want to start an ActiveJob job (Sidekiq is the backend) using perform_later. This job will then do some work and produce a result, which will be published to the Redis channel.
This produces the following error:
Processing by Api::LocationController#yyy as */*
Parameters: {"zip_code"=>"503400"}
[ActiveJob] Failed enqueuing XXXJob to Sidekiq(default): NoMethodError (undefined method `call_pipeline' for #<Redis::SubscribedClient:0x00007fd9222cdf78 @client=#<Redis::Client:0x00007fd921e1b890 @options={:url=>"redis://localhost?db=xxx", :scheme=>"redis", :host=>"localhost", :port=>6379, :path=>nil, :read_timeout=>0, :write_timeout=>5.0, :connect_timeout=>5.0, :timeout=>5.0, :username=>nil, :password=>nil, :db=>0, :driver=>Redis::Connection::Ruby, :id=>nil, :tcp_keepalive=>0, :reconnect_attempts=>1, :reconnect_delay=>0.0, :reconnect_delay_max=>0.5, :inherit_socket=>false, :logger=>nil, :sentinels=>nil, :role=>:master, :_parsed=>true}, @reconnect=true, @logger=nil, @connection=#<Redis::Connection::Ruby:0x00007fd921df08c0 @sock=#<Redis::Connection::TCPSocket:fd 26>>, @command_map={}, @pending_reads=-3, @connector=#<Redis::Client::Connector:0x00007fd921e1a2d8 @options={:url=>"redis://localhost?db=xxx", :scheme=>"redis", :host=>"localhost", :port=>6379, :path=>nil, :read_timeout=>5.0, :write_timeout=>5.0, :connect_timeout=>5.0, :timeout=>5.0, :username=>nil, :password=>nil, :db=>0, :driver=>Redis::Connection::Ruby, :id=>nil, :tcp_keepalive=>0, :reconnect_attempts=>1, :reconnect_delay=>0.0, :reconnect_delay_max=>0.5, :inherit_socket=>false, :logger=>nil, :sentinels=>nil, :role=>:master, :_parsed=>true}>, @pid=54943>>)
Completed 500 Internal Server Error in 26ms (Allocations: 2374)
NoMethodError (undefined method `call_pipeline' for #<Redis::SubscribedClient:0x00007fd9222cdf78 @client=#<Redis::Client:0x00007fd921e1b890 @options={:url=>"redis://localhost?db=xxx", :scheme=>"redis", :host=>"localhost", :port=>6379, :path=>nil, :read_timeout=>5.0, :write_timeout=>5.0, :connect_timeout=>5.0, :timeout=>5.0, :username=>nil, :password=>nil, :db=>0, :driver=>Redis::Connection::Ruby, :id=>nil, :tcp_keepalive=>0, :reconnect_attempts=>1, :reconnect_delay=>0.0, :reconnect_delay_max=>0.5, :inherit_socket=>false, :logger=>nil, :sentinels=>nil, :role=>:master, :_parsed=>true}, @reconnect=true, @logger=nil, @connection=#<Redis::Connection::Ruby:0x00007fd921df08c0 @sock=nil>, @command_map={}, @pending_reads=-3, @connector=#<Redis::Client::Connector:0x00007fd921e1a2d8 @options={:url=>"redis://localhost?db=xxx", :scheme=>"redis", :host=>"localhost", :port=>6379, :path=>nil, :read_timeout=>5.0, :write_timeout=>5.0, :connect_timeout=>5.0, :timeout=>5.0, :username=>nil, :password=>nil, :db=>0, :driver=>Redis::Connection::Ruby, :id=>nil, :tcp_keepalive=>0, :reconnect_attempts=>1, :reconnect_delay=>0.0, :reconnect_delay_max=>0.5, :inherit_socket=>false, :logger=>nil, :sentinels=>nil, :role=>:master, :_parsed=>true}>, @pid=54943>>):
app/controllers/api/location_controller.rb:10:in `block (3 levels) in yyy'
app/controllers/api/location_controller.rb:8:in `block in yyy'
app/controllers/api/location_controller.rb:5:in `yyy'
The yyy function is part of a controller that gets called on a GET request:
def yyy
  REDIS_POOL.with { |r|
    data = r.get("key")
    if data.nil?
      r.subscribe("channel") { |on|
        on.subscribe {
          XXXJob.perform_later(params)
        }
        on.message { |_, message|
          puts message
          render json: message
          r.unsubscribe("channel")
        }
      }
    else
      render json: data
    end
  }
end
It seems to me that some context needed by perform_later is being lost inside the Redis subscribe block. The same code works if I move the XXXJob.perform_later(params) call above the r.subscribe("channel") call, and thus outside the block. However, I want to make sure that the job is only started after the subscription to the Redis channel has succeeded. Is there any way to achieve this?
CodePudding user response:
This is tricky. It will not work as written because Sidekiq's default connection pool always reuses the thread's current connection. That connection is busy handling the subscribe call, and that call is not reentrant. You need to create a separate connection pool manually and use it directly:
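# A dedicated pool, separate from the one the subscribing connection came from
ANOTHER_POOL = ConnectionPool.new { Redis.new }

on.subscribe do
  # Route Sidekiq's pushes through the dedicated pool for the duration of this block
  Sidekiq::Client.via(ANOTHER_POOL) do
    SomeWorker.perform_async(1, 2, 3)
    SomeOtherWorker.perform_async(1, 2, 3)
  end
end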
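To make the reasoning above a bit more concrete, here is a toy model of the problem that runs with the Ruby standard library only. TinyPool and FakeConnection are hypothetical stand-ins I made up for illustration; they are not the connection_pool or redis-rb implementations, and the error message is invented. The sketch only assumes what the answer states: a nested checkout on the same thread returns the connection that is already checked out, and a connection in subscribed mode cannot run other commands.

```ruby
# Toy model only: FakeConnection and TinyPool are hypothetical stand-ins,
# not the redis-rb or connection_pool implementations.
class FakeConnection
  def initialize
    @subscribed = false
  end

  # The connection stays in subscribed mode while the block runs.
  def subscribe
    @subscribed = true
    yield
  ensure
    @subscribed = false
  end

  # Ordinary commands are refused while the connection is subscribed.
  def call(command)
    raise "connection is in subscribed mode, cannot run #{command}" if @subscribed
    "OK #{command}"
  end
end

class TinyPool
  def initialize(&factory)
    @factory = factory
    @key = :"tiny_pool_#{object_id}"
  end

  # A nested #with on the same thread hands back the connection that the
  # outer #with already checked out (the behavior the answer describes).
  def with
    outer = Thread.current[@key]
    return yield(outer) if outer

    conn = @factory.call # simplification: no real pooling, a new connection per checkout
    Thread.current[@key] = conn
    begin
      yield conn
    ensure
      Thread.current[@key] = nil
    end
  end
end

# Subscribes on one pool and tries to "enqueue a job" through another.
def enqueue_during_subscribe(subscriber_pool, job_pool)
  subscriber_pool.with do |conn|
    conn.subscribe do
      job_pool.with { |c| c.call("LPUSH queue job") }
    end
  end
rescue RuntimeError => e
  "failed: #{e.message}"
end

shared   = TinyPool.new { FakeConnection.new }
separate = TinyPool.new { FakeConnection.new }

same_pool_result     = enqueue_during_subscribe(shared, shared)
separate_pool_result = enqueue_during_subscribe(shared, separate)

puts same_pool_result
puts separate_pool_result
```

In this model, enqueuing through the same pool fails because the nested checkout returns the subscribed connection, while enqueuing through the separate pool succeeds. That is the role ANOTHER_POOL plays together with Sidekiq::Client.via in the snippet above.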