Nosso projeto atual requer uma fila de tarefas para lidar com o processamento de dados sem sobrecarregar o servidor com muitos processos de uma vez (ou perder dados, ou um milhão de outros motivos).
Recentemente, no entanto, mudamos a forma como enfileiramos os processos para um em que, por boas razões, é possível que o sistema tente enfileirar várias centenas de tarefas duplicadas em um curto espaço de tempo (e apenas uma delas realmente fazer algo útil).
Até agora, usamos o Beanstalkd por meio do módulo de nó fivebeans para manipular, mas como uma fila FIFO pura (embora com suporte a peek), não podemos remover facilmente duplicatas ou de outra forma impedir que sejam criadas.
Uma ideia inicial da solução era usar o Redis para marcar coisas como sendo processadas ou, de outra forma, controlar o que da fila deveria realmente ser executado. Mas é claro, por que se preocupar em fazer um híbrido quando você pode simplesmente abandonar totalmente a dependência técnica!
Você deve se lembrar de um post anterior ( http://coderwall.com/p/8ozzgq ) que já estamos usando uma combinação de Beanstalk para as tarefas e Redis para manter a carga útil da tarefa para contornar o limite de mensagens de 64kb no Beanstalk. Também podemos nos livrar disso usando o Redis como a fila de tarefas!
A fila de tarefas deve fornecer o seguinte:
- Suporte a várias filas de tarefas
- Execução FIFO, para garantir a ordem correta das operações
- Fila prioritária
- Uma garantia de que os processos são executados apenas uma vez.
- Recusa em adicionar tarefas duplicadas
- Rapidez
- Permitir que as tarefas sejam atrasadas e expirem
Para conseguir isso, agora fazemos o seguinte ao adicionar uma tarefa:
put: (data, queue, priority, delay=0, expire=-1) ->
data = JSON.stringify data
hash = sha512 data
start = date + delay
end = (expire < 0 and expire) or start + expire
task =
hash: hash
start: start
end: end
method = (priority and 'lpush') or 'rpush'
key = 'queue:#{queue}:'
redis.setnx key+'#{hash}', data, (err, done) =>
if done
redis[method] key+'tasks', task
A parte mais importante é o comando “setnx”. Isso só grava na chave se ela ainda não existir e, como a chave que estamos usando contém um hash da carga útil, temos a deduplicação!
O retorno de chamada para setnx recebe um booleano indicando se ele escreveu na chave e, se o fez, colocamos o ponteiro de tarefa na fila. A parte [método] é apenas uma opção entre colocá-lo no início ou no final da fila, para permitir que tarefas de alta prioridade sejam executadas imediatamente.
O método “get task” não é nada especial – apenas algumas coisas para lidar com o tempo de início / término e resolver o hash de referência para a carga útil real. Coisas super simples.
Neste estágio temos uma solução funcionando para nossas necessidades, mas se você conhece algum sistema MQ que forneça de-dup, seja no Redis ou não, me mande um tweet!
E se você está pensando “por que não refazer?” então é bastante simples – Resque é uma solução muito mais pesada para este problema do que precisávamos, já que todas as tarefas, bifurcações, etc. já foram implementadas para o Beanstalk.
É muito bom remover uma linha de package.json …