Tornar uma tarefa multi-threaded usando Queue em Ruby

Freqüentemente, teremos uma tarefa que requer a execução de uma operação ou método simples sobre todos os itens de uma lista. Normalmente, usaremos #eachou #mappara executar essas operações uma de cada vez, mas há momentos em que ter várias operações em execução ao mesmo tempo é mais eficiente. Isso é especialmente verdadeiro com qualquer operação que exija uma chamada de rede. Mesmo a adição de apenas um thread extra irá acelerar a tarefa drasticamente. No entanto, essa ideia funcionará em qualquer parte do código que chame #eachou #mapchame um método ou bloco em vários objetos.

Usar um Enumerableou Arrayem um ambiente multi-thread pode ser feito facilmente com Queue , que é seguro para threads e é semelhante Arrayem sua função. Ao contrário Array, Queuecarece de muitos métodos auxiliares e é surpreendentemente “de baixo nível” para uma classe Ruby nativa. Por exemplo, os únicos métodos para acessar os dados são #push, #pope #shift( #pope #shifttêm o efeito colateral incômodo de remover o objeto quando você o acessa). Apesar de suas limitações, Queueé perfeito para uma pilha simples ou FIFO. Embora não haja nenhum método para converter entre um Array e uma Queue, existe um truque simples para preencher um usando qualquer Enumerableobjeto:

(1..100_000).inject(Queue.new, :push)

Convenientemente, Queue#pushretorna a instância Queue após realizar sua operação. Isso nos permite encadear #pushchamadas com cada objeto passado como parâmetro para uma chamada. Isso é o que .inject(Queue.new, :push)faz no código acima:

# Essentially the same
Queue.new.push(1).push(2).push(3) #... .push(100_000)

Agora que nossa fila está preenchida, precisamos iniciar um número fixo de threads para processá-la:

NUM_THREADS = 4
Thread.abort_on_exception = true

def do_stuff(object)
# ...
end

@queue = (1..100_000).inject(Queue.new, :push)

@threads = Array.new(NUM_THREADS) do
Thread.new do
until @queue.empty?
# This will remove the first object from @queue
next_object
= @queue.shift

do_stuff
(next_object)
end
end
end

@threads.each(&:join)

Essencialmente, o código acima cria 4 threads que irão continuamente puxar um objeto para fora @queuee chamá #do_stuff-lo até que @queueesteja vazio. Observe que Thread.abort_on_exception = trueisso permitirá que quaisquer erros lançados em uma thread sejam detectados durante a #joinchamada:

begin
@threads.each(&:join)
ensure
cleanup
()
end