Sidekiq
We love using sidekiq as the job processor for our Push workers.
For ZeroPush, one of the features we wanted was for each app to have its own queue. This isn't strictly provided by sidekiq in general, because the sidekiq process needs to be started with a list of queues from which to process jobs.
One of the options in sidekiq is the fetch strategy. By default, it uses BasicFetch which fetches the list of queues to operate on. The problem is, BasicFetch only gets the list of queues once during instantiation. We needed the Fetcher to return the queues whenever a worker is ready to perform a job. In order to accomplish this, each sidekiq worker needs to query Redis for all available queues when it is ready to perform.
We use the following subclass of BasicFetch to solve the problem.
require 'sidekiq/fetch'
class DynamicFetch < Sidekiq::BasicFetch
def queues_cmd
queues = Sidekiq.redis { |conn| conn.smembers('queues') }
queues.map! { |q| "queue:#{q}" }
if queues.empty?
return super
else
@queues = queues
end
# for supporting strictly_ordered_queues options
@unique_queues = @queues.uniq
super
end
end
Next, we need to make sure sidekiq uses our new fetch strategy. In config/initializers/sidekiq.rb add the following setting.
Sidekiq.options.merge!({
fetch: DynamicFetch
})
Lastly, we need to enqueue jobs from our models like this:
def enqueue(queue_name)
Sidekiq::Client.push('queue' => queue_name, 'class' => Worker, 'args' => [self.id])
end
Now, each time a worker tries to fetch a job, it calls into Redis and computes a new list of queues potentially containing work.