Escrevendo um sistema de filas no Redis

Nosso projeto atual requer uma fila de tarefas para lidar com o processamento de dados sem sobrecarregar o servidor com muitos processos de uma vez (ou perder dados, ou um milhão de outros motivos).

Recentemente, no entanto, mudamos a forma como enfileiramos os processos para um em que, por boas razões, é possível que o sistema tente enfileirar várias centenas de tarefas duplicadas em um curto espaço de tempo (e apenas uma delas realmente fazer algo útil).

Até agora, usamos o Beanstalkd por meio do módulo de nó fivebeans para manipular, mas como uma fila FIFO pura (embora com suporte a peek), não podemos remover facilmente duplicatas ou de outra forma impedir que sejam criadas.

Uma ideia inicial da solução era usar o Redis para marcar coisas como sendo processadas ou, de outra forma, controlar o que da fila deveria realmente ser executado. Mas é claro, por que se preocupar em fazer um híbrido quando você pode simplesmente abandonar totalmente a dependência técnica!

Você deve se lembrar de um post anterior ( http://coderwall.com/p/8ozzgq ) que já estamos usando uma combinação de Beanstalk para as tarefas e Redis para manter a carga útil da tarefa para contornar o limite de mensagens de 64kb no Beanstalk. Também podemos nos livrar disso usando o Redis como a fila de tarefas!

A fila de tarefas deve fornecer o seguinte:

  1. Suporte a várias filas de tarefas
  2. Execução FIFO, para garantir a ordem correta das operações
  3. Fila prioritária
  4. Uma garantia de que os processos são executados apenas uma vez.
  5. Recusa em adicionar tarefas duplicadas
  6. Rapidez
  7. Permitir que as tarefas sejam atrasadas e expirem

Para conseguir isso, agora fazemos o seguinte ao adicionar uma tarefa:

put: (data, queue, priority, delay=0, expire=-1) ->
data
= JSON.stringify data
hash
= sha512 data
start
= date + delay
end = (expire < 0 and expire) or start + expire
task
=
hash
: hash
start
: start
end: end

method
= (priority and 'lpush') or 'rpush'

key
= 'queue:#{queue}:'
redis
.setnx key+'#{hash}', data, (err, done) =>
if done
redis
[method] key+'tasks', task

A parte mais importante é o comando “setnx”. Isso só grava na chave se ela ainda não existir e, como a chave que estamos usando contém um hash da carga útil, temos a deduplicação!

O retorno de chamada para setnx recebe um booleano indicando se ele escreveu na chave e, se o fez, colocamos o ponteiro de tarefa na fila. A parte [método] é apenas uma opção entre colocá-lo no início ou no final da fila, para permitir que tarefas de alta prioridade sejam executadas imediatamente.

O método “get task” não é nada especial – apenas algumas coisas para lidar com o tempo de início / término e resolver o hash de referência para a carga útil real. Coisas super simples.

Neste estágio temos uma solução funcionando para nossas necessidades, mas se você conhece algum sistema MQ que forneça de-dup, seja no Redis ou não, me mande um tweet!

E se você está pensando “por que não refazer?” então é bastante simples – Resque é uma solução muito mais pesada para este problema do que precisávamos, já que todas as tarefas, bifurcações, etc. já foram implementadas para o Beanstalk.

É muito bom remover uma linha de package.json …