Ruby: leitura, análise e encaminhamento de grandes arquivos JSON em pequenos blocos (ou seja, streaming)

Passei alguns dias tentando descobrir isso e pensei em compartilhar uma dica para evitar o aborrecimento de outra pessoa.

O problema : eu tenho uma API Ruby e tenho que ler arquivos JSON enormes de fontes de terceiros, analisá-los e fazer coisas com os dados antes de encaminhá-los para outro lugar. Isso pode resultar claramente em um pesadelo de memória se tudo tiver que ser feito na memória.

Parece haver bastante conhecimento da comunidade sobre como transmitir dados em Ruby do servidor para o cliente (por exemplo, aqui ), mas não tanto sobre como fazer o mesmo ao ler um fluxo JSON.

Embora você acabe passando os dados por meio de Enumeradores Ruby em ambos os casos, há uma diferença fundamental:

  • Ao enviar dados, sabemos que o JSON produzido é válido e queremos apenas gravá-lo em um stream sem fazer mais nada;
  • Ao ler dados, se quisermos consumi-los sem manter todo o documento na memória, precisamos entendê-los à medida que chegam, sem saber como o documento irá evoluir.

Eu encontrei um artigo muito bom sobre como consumir fluxos XML, aqui . Ele ainda vem com uma gema chamada Piperator, que permite que você encadeie etapas em seu pipeline de uma forma limpa e legível.

Com um pouco de ajuda do gem, tentei implementar o mesmo em JSON usando o analisador JSON Oj , que li supera todos os outros por aí.

Aqui está seu exemplo, onde os itens principais a serem verificados são runos yield_chunkmétodos e:

require 'oj'
require
'piperator'

class JSONStreamParser < ::Oj::ScHandler
def initialize
@data_stream_writer = Oj::StringWriter.new
@running = false
end

def run(enumerable_data_source, &block)
if !@running
@running = true
@yielder = block

# This turns your enumerator into an IO class, very handy
# as Oj's sc_parse method wants an IO object.
io
= Piperator::IO.new(enumerable_data_source)
Oj.sc_parse(self, io)
end
end

def hash_key(key)
update_current_path
(:hash_key, key)
@data_stream_writer.push_key(key)
@last_key = key
end

def hash_start
@data_stream_writer.push_object(@last_key)
@last_key = nil
end

def hash_set(h, key, value)
@data_stream_writer.push_value(value, key)
end

def hash_end
@data_stream_writer.pop
yield_if_condition

end

def array_start
@data_stream_writer.push_array(@last_key)
@last_key = nil
end

def array_append(a, value)
@data_stream_writer.push_value(value) unless !value && @array_ended
@array_ended = false
end

def array_end
@data_stream_writer.pop
@array_ended = true
yield_if_condition

end

def add_value(value)
@data_stream_writer.push_value(value, @last_key)
@last_key = nil
end

def error(message, line, column)
p
"ERROR: #{message}"
end

private

def yield_if_condition
# if whatever_logic
# @data_stream_writer.pop_all
# yield_chunk
# @data_stream_writer = Oj::StringWriter.new
# [ further logic depending on data structure ]
# end
end

def yield_chunk
@yielder.call @data_stream_writer.to_s
end
end

http_fetch
= Enumerator.new do |yielder|
url
= "https://raw.githubusercontent.com/zemirco/sf-city-lots-json/master/citylots.json"
request
= Typhoeus::Request.new(url)
request
.on_body do |chunk|
yielder
<< chunk
end
request
.run
end

json_parse
= Enumerator.new do |yielder|
parser
= JSONStreamParser.new
parser
.run(http_fetch) do |parsed_chunk|
yielder
<< parsed_chunk
end
end

json_parse
.map{ |c| puts c }