Passei alguns dias tentando descobrir isso e pensei em compartilhar uma dica para evitar o aborrecimento de outra pessoa.
O problema : eu tenho uma API Ruby e tenho que ler arquivos JSON enormes de fontes de terceiros, analisá-los e fazer coisas com os dados antes de encaminhá-los para outro lugar. Isso pode resultar claramente em um pesadelo de memória se tudo tiver que ser feito na memória.
Parece haver bastante conhecimento da comunidade sobre como transmitir dados em Ruby do servidor para o cliente (por exemplo, aqui ), mas não tanto sobre como fazer o mesmo ao ler um fluxo JSON.
Embora você acabe passando os dados por meio de Enumeradores Ruby em ambos os casos, há uma diferença fundamental:
- Ao enviar dados, sabemos que o JSON produzido é válido e queremos apenas gravá-lo em um stream sem fazer mais nada;
- Ao ler dados, se quisermos consumi-los sem manter todo o documento na memória, precisamos entendê-los à medida que chegam, sem saber como o documento irá evoluir.
Eu encontrei um artigo muito bom sobre como consumir fluxos XML, aqui . Ele ainda vem com uma gema chamada Piperator, que permite que você encadeie etapas em seu pipeline de uma forma limpa e legível.
Com um pouco de ajuda do gem, tentei implementar o mesmo em JSON usando o analisador JSON Oj , que li supera todos os outros por aí.
Aqui está seu exemplo, onde os itens principais a serem verificados são run
os yield_chunk
métodos e:
require 'oj'
require 'piperator'
class JSONStreamParser < ::Oj::ScHandler
def initialize
@data_stream_writer = Oj::StringWriter.new
@running = false
end
def run(enumerable_data_source, &block)
if !@running
@running = true
@yielder = block
# This turns your enumerator into an IO class, very handy
# as Oj's sc_parse method wants an IO object.
io = Piperator::IO.new(enumerable_data_source)
Oj.sc_parse(self, io)
end
end
def hash_key(key)
update_current_path(:hash_key, key)
@data_stream_writer.push_key(key)
@last_key = key
end
def hash_start
@data_stream_writer.push_object(@last_key)
@last_key = nil
end
def hash_set(h, key, value)
@data_stream_writer.push_value(value, key)
end
def hash_end
@data_stream_writer.pop
yield_if_condition
end
def array_start
@data_stream_writer.push_array(@last_key)
@last_key = nil
end
def array_append(a, value)
@data_stream_writer.push_value(value) unless !value && @array_ended
@array_ended = false
end
def array_end
@data_stream_writer.pop
@array_ended = true
yield_if_condition
end
def add_value(value)
@data_stream_writer.push_value(value, @last_key)
@last_key = nil
end
def error(message, line, column)
p "ERROR: #{message}"
end
private
def yield_if_condition
# if whatever_logic
# @data_stream_writer.pop_all
# yield_chunk
# @data_stream_writer = Oj::StringWriter.new
# [ further logic depending on data structure ]
# end
end
def yield_chunk
@yielder.call @data_stream_writer.to_s
end
end
http_fetch = Enumerator.new do |yielder|
url = "https://raw.githubusercontent.com/zemirco/sf-city-lots-json/master/citylots.json"
request = Typhoeus::Request.new(url)
request.on_body do |chunk|
yielder << chunk
end
request.run
end
json_parse = Enumerator.new do |yielder|
parser = JSONStreamParser.new
parser.run(http_fetch) do |parsed_chunk|
yielder << parsed_chunk
end
end
json_parse.map{ |c| puts c }