Dynamic Queues in Sidekiq
Published 2013-11-18 by Stefan Natchev


We love using Sidekiq as the job processor for our Push workers.

For ZeroPush, one of the features we wanted was a separate queue for each app. Sidekiq does not support this out of the box, because a Sidekiq process must be started with a fixed list of queues from which to process jobs.

One of Sidekiq's options is the fetch strategy. The default, BasicFetch, reads the list of queues to operate on, but it does so only once, during instantiation. We needed the fetcher to return the current queues whenever a worker is ready to perform a job. To accomplish this, each Sidekiq worker must query Redis for all available queues at the moment it is ready to perform.
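
To make the difference concrete, here is a small sketch that needs neither Sidekiq nor Redis. A plain Ruby Set stands in for the Redis set of queue names, and StaticQueueList and DynamicQueueList are hypothetical names invented for this illustration; they are not Sidekiq classes.

```ruby
require 'set'

# Stand-in for the Redis set of known queue names (an assumption for this sketch).
known_queues = Set.new(['app_1'])

# Mimics a fetcher that reads the queue list once, at instantiation.
class StaticQueueList
  def initialize(source)
    @queues = source.to_a
  end

  def queues
    @queues
  end
end

# Mimics a fetcher that re-reads the queue list on every call.
class DynamicQueueList
  def initialize(source)
    @source = source
  end

  def queues
    @source.to_a
  end
end

static_list  = StaticQueueList.new(known_queues)
dynamic_list = DynamicQueueList.new(known_queues)

# A new app appears after the "workers" have started.
known_queues << 'app_2'

p static_list.queues   # the snapshot taken at startup
p dynamic_list.queues  # also contains the queue added later
```

The static list never learns about app_2, which is roughly the situation a long-running Sidekiq process is in when it only knows the queues it was started with.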

We use the following subclass of BasicFetch to solve the problem.

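require 'sidekiq/fetch'

class DynamicFetch < Sidekiq::BasicFetch
  def queues_cmd
    # Ask Redis for every known queue each time a worker is ready for a job.
    queues = Sidekiq.redis { |conn| conn.smembers('queues') }

    # Fall back to the queues the process was started with if none are registered.
    return super if queues.empty?

    # Sidekiq stores each queue's jobs under the key "queue:<name>".
    @queues = queues.map { |q| "queue:#{q}" }

    # for supporting strictly_ordered_queues options
    @unique_queues = @queues.uniq
    super
  end
end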

Next, we need to make sure Sidekiq uses our new fetch strategy. In config/initializers/sidekiq.rb, add the following setting.

Sidekiq.options.merge!({
  fetch: DynamicFetch
})

Lastly, we need to enqueue jobs from our models like this:

def enqueue(queue_name)
  Sidekiq::Client.push('queue' => queue_name, 'class' => Worker, 'args' => [self.id])
end
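
As an illustration of how a per-app queue name might be chosen, the sketch below builds the hash that would be passed to Sidekiq::Client.push without actually calling Sidekiq. The app_<id> naming scheme and the helpers queue_name_for and job_payload are assumptions made for this example, not part of our code base or of Sidekiq.

```ruby
# Hypothetical naming scheme: one queue per app, keyed by the app's id.
def queue_name_for(app_id)
  "app_#{app_id}"
end

# Builds the hash that would be handed to Sidekiq::Client.push.
# The class is given as the string 'Worker' only so that this sketch
# runs without the real worker class being loaded.
def job_payload(app_id, record_id)
  {
    'queue' => queue_name_for(app_id),
    'class' => 'Worker',
    'args'  => [record_id]
  }
end

payload = job_payload(42, 7)
p payload
```

Because the queue name is derived from the app, jobs for different apps never share a queue, and a queue comes into existence the first time a job is pushed to it.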

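Sidekiq::Client.push records the queue name in the Redis set queues when it enqueues a job, so a new queue becomes visible to DynamicFetch as soon as its first job is pushed. Now, each time a worker tries to fetch a job, it queries Redis and computes a fresh list of queues that may contain work, at the cost of one extra Redis call per fetch.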