Freqüentemente, teremos uma tarefa que requer a execução de uma operação ou método simples sobre todos os itens de uma lista. Normalmente, usaremos #each
ou #map
para executar essas operações uma de cada vez, mas há momentos em que ter várias operações em execução ao mesmo tempo é mais eficiente. Isso é especialmente verdadeiro com qualquer operação que exija uma chamada de rede. Mesmo a adição de apenas um thread extra irá acelerar a tarefa drasticamente. No entanto, essa ideia funcionará em qualquer parte do código que chame #each
ou #map
chame um método ou bloco em vários objetos.
Usar um Enumerable
ou Array
em um ambiente multi-thread pode ser feito facilmente com Queue , que é seguro para threads e é semelhante Array
em sua função. Ao contrário Array
, Queue
carece de muitos métodos auxiliares e é surpreendentemente “de baixo nível” para uma classe Ruby nativa. Por exemplo, os únicos métodos para acessar os dados são #push
, #pop
e #shift
( #pop
e #shift
têm o efeito colateral incômodo de remover o objeto quando você o acessa). Apesar de suas limitações, Queue
é perfeito para uma pilha simples ou FIFO. Embora não haja nenhum método para converter entre um Array e uma Queue, existe um truque simples para preencher um usando qualquer Enumerable
objeto:
(1..100_000).inject(Queue.new, :push)
Convenientemente, Queue#push
retorna a instância Queue após realizar sua operação. Isso nos permite encadear #push
chamadas com cada objeto passado como parâmetro para uma chamada. Isso é o que .inject(Queue.new, :push)
faz no código acima:
# Essentially the same
Queue.new.push(1).push(2).push(3) #... .push(100_000)
Agora que nossa fila está preenchida, precisamos iniciar um número fixo de threads para processá-la:
NUM_THREADS = 4
Thread.abort_on_exception = true
def do_stuff(object)
# ...
end
@queue = (1..100_000).inject(Queue.new, :push)
@threads = Array.new(NUM_THREADS) do
Thread.new do
until @queue.empty?
# This will remove the first object from @queue
next_object = @queue.shift
do_stuff(next_object)
end
end
end
@threads.each(&:join)
Essencialmente, o código acima cria 4 threads que irão continuamente puxar um objeto para fora @queue
e chamá #do_stuff
-lo até que @queue
esteja vazio. Observe que Thread.abort_on_exception = true
isso permitirá que quaisquer erros lançados em uma thread sejam detectados durante a #join
chamada:
begin
@threads.each(&:join)
ensure
cleanup()
end