Analisando o protocolo Kafka com Erlang. Padrão de correspondência FTW!

Esta postagem é sobre como recebi uma resposta crua de um corretor Kafka que parecia …

<<0,0,0,0,0,0,0,3,0,0,0,3,0,25,118,97,103,114,97,110,116,
45,117,98,117,110,116,117,45,112,114,101,99,105,115,101,

45,54,52,0,0,35,133,0,0,0,1,0,25,118,97,103,114,97,110,

116,45,117,98,117,110,116,117,45,112,114,101,99,105,115,

101,45,54,52,0,0,35,131,0,0,0,2,0,25,118,97,103,114,97,

110,116,45,117,98,117,110,116,117,45,112,114,101,99,105,

115,101,45,54,52,0,0,35,132,0,0,0,3,0,0,0,2,97,49,0,0,0,

2,0,0,0,0,0,0,0,0,0,3,0,0,0,1,0,0,0,3,0,0,0,1,0,0,0,3,0,

0,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,

2,97,50,0,0,0,2,0,0,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,1,0,0,

0,1,0,0,0,1,0,0,0,0,0,1,0,0,0,2,0,0,0,1,0,0,0,2,0,0,0,1,

0,0,0,2,0,0,0,2,97,51,0,0,0,2,0,0,0,0,0,0,0,0,0,3,0,0,0,

2,0,0,0,3,0,0,0,1,0,0,0,2,0,0,0,3,0,0,0,1,0,0,0,0,0,1,0,

0,0,1,0,0,0,2,0,0,0,1,0,0,0,2,0,0,0,2,0,0,0,1,0,0,0,2>>

… e corrigiu um cliente erlang kafka para converter o blob em:

{metadata,0,
[{broker,2,<<"vagrant-ubuntu-precise-64">>,9092},
{broker,1,<<"vagrant-ubuntu-precise-64">>,9091},
{broker,3,<<"vagrant-ubuntu-precise-64">>,9093}],
[{topic,<<"a3">>,undefined,
[{partition,1,0,1,
[{replica,2},{replica,1}],
[{isr,2},{isr,1}]},
{partition,0,0,3,
[{replica,1},{replica,3}],
[{isr,1},{isr,3}]}]},
{topic,<<"a2">>,undefined,
[{partition,1,0,2,[{replica,2}],[{isr,2}]},
{partition,0,0,1,[{replica,1}],[{isr,1}]}]},
{topic,<<"a1">>,undefined,
[{partition,1,0,1,[{replica,1}],[{isr,1}]},
{partition,0,0,3,[{replica,3}],[{isr,3}]}]}]}

… lendo o protocolo de conexão Kafka documentado como:

Response => CorrelationId ResponseMessage
CorrelationId => int32
ResponseMessage => MetadataResponse

MetadataResponse => [Broker][TopicMetadata]
Broker => NodeId Host Port
NodeId => int32
Host => string
Port => int32
TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
TopicErrorCode => int16
PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
PartitionErrorCode => int16
PartitionId => int32
Leader => int32
Replicas => [int32]
Isr => [int32]

de https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataResponse

Uma nota sobre Kafka

Em primeiro lugar, uma palavra sobre Kafka. Kafka é o log de commit da sua infraestrutura e uma revisão da fila de mensagens em um mundo distribuído. Mais em http://kafka.apache.org , mas recomendo começar a ler seu protocolo com fio.

As mensagens são publicadas ou consumidas a partir de tópicos. Partições e fatores de replicação para um tópico podem ser definidos durante sua criação; que produtores / consumidores devem combinar. Então, de certa forma – Kafka força a decisão de qual nó atingir o cliente. Para publicar ou buscar, os dados devem ser enviados ao broker que está atuando atualmente como o líder de uma determinada partição. Esses dados podem ser consultados de um corretor diretamente para os produtores e do zookeeper para clientes consumidores.

Os próprios clientes mantêm isso consultando metadados de qualquer corretor. Embora um corretor possa não ser o líder de um tópico, ele terá metadados sobre todos os tópicos, partições e lideranças. Qual broker é consultado para obter os metadados depende novamente do cliente.

Clientes Kafka na selva

O cliente Kafka escrito em Go by Shopify http://godoc.org/github.com/Shopify/sarama tem, de longe, a melhor documentação e conjunto de recursos fora do Java / Scala / clientes internos do LinkedIn IMHO.

Alguns clientes ainda se conectam ao zookeeper, se a descoberta automática para corretores estiver habilitada. Isso geralmente ocorre quando nenhum host kafka foi fornecido em uma configuração.

Como estou trabalhando no erlang atualmente ( lembrete para acompanhar os vídeos do ErlangFactory ’14! ), Notei vários clientes erlang existentes ainda se conectando ao zookeeper para obter metadados de corretores em vez de enviar ping diretamente para corretores. Pelo que eu entendi do pessoal amigável em # apache-kafka, este é mais um paradigma 0.7. Desde o 0.8, o produtor não precisa se conectar ao zookeeper para descobrir em qual partição escrever. Ele pode decidir – mas isso é com o cliente.

Eu pessoalmente prefiro consultar os corretores por metadados, mas infelizmente não encontrei uma biblioteca que fizesse isso.

A implementação mais próxima de apenas enviar pacotes entre um broker estava em https://github.com/rmenke/ekafka. Aqui está como ele codifica uma consulta de metadados.

encode_metadata_request(CorrelationId, ClientId, Topics) ->
MetadataRequest = encode_array([encode_string(Topic) || Topic <- Topics]),
encode_request
(?METADATA_REQUEST, CorrelationId, ClientId, MetadataRequest).

Uma vez que a funcionalidade de decodificação, entretanto, não foi implementada e me deu a chance de explorar o protocolo Kafka wire.

Leia o protocolo

Vamos rever o que a documentação nos documentos do Kafka diz sobre a resposta da API de metadados:

MetadataResponse => [Broker][TopicMetadata]
Broker => NodeId Host Port
NodeId => int32
Host => string
Port => int32
TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
TopicErrorCode => int16
PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
PartitionErrorCode => int16
PartitionId => int32
Leader => int32
Replicas => [int32]
Isr => [int32]

Observação para mim mesmo: algum dia, vou apresentar uma maneira elegante de descrever as informações de cabeçalho / carga útil / protocolo em um formato legível por máquina estruturado!)

Se você leu apenas um artigo / link sobre Kafka. Torne-o um do protocolo com fio (o primeiro link). É um prazer ler. Dito isso, várias suposições tornavam difícil para os desenvolvedores de novos clientes ver, por exemplo: o tamanho do Host. apenas diz string. ( outra seção no entanto fala sobre o tamanho, mas demorei um pouco para encontrá-lo ). Ou que [Broker] entre colchetes significava um número inteiro de 32 bits informando quantos corretores seriam preenchidos antes dos corretores.

De qualquer forma, é para isso que me foi apresentado esta noite para dar sentido. A seguir está uma resposta de metadados válida de um corretor kafka.

<<0,0,0,0,0,0,0,3,0,0,0,3,0,25,118,97,103,114,97,110,116,
45,117,98,117,110,116,117,45,112,114,101,99,105,115,101,

45,54,52,0,0,35,133,0,0,0,1,0,25,118,97,103,114,97,110,

116,45,117,98,117,110,116,117,45,112,114,101,99,105,115,

101,45,54,52,0,0,35,131,0,0,0,2,0,25,118,97,103,114,97,

110,116,45,117,98,117,110,116,117,45,112,114,101,99,105,

115,101,45,54,52,0,0,35,132,0,0,0,3,0,0,0,2,97,49,0,0,0,

2,0,0,0,0,0,0,0,0,0,3,0,0,0,1,0,0,0,3,0,0,0,1,0,0,0,3,0,

0,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,

2,97,50,0,0,0,2,0,0,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,1,0,0,

0,1,0,0,0,1,0,0,0,0,0,1,0,0,0,2,0,0,0,1,0,0,0,2,0,0,0,1,

0,0,0,2,0,0,0,2,97,51,0,0,0,2,0,0,0,0,0,0,0,0,0,3,0,0,0,

2,0,0,0,3,0,0,0,1,0,0,0,2,0,0,0,3,0,0,0,1,0,0,0,0,0,1,0,

0,0,1,0,0,0,2,0,0,0,1,0,0,0,2,0,0,0,2,0,0,0,1,0,0,0,2>>

Deixe a engenharia reversa começar

Tendo trabalhado com pacotes AIM / oscar de engenharia reversa, pacotes de bate-papo YMSG, pacotes MQTT e outros – isso me apresentou um desafio interessante.

A informação que eu sabia era que as portas kafka padrão começaram com 9091 a 9093 e que eu havia criado um tópico a1 um tempo atrás.

Pelos documentos, eu sabia que primeiro procuraria o corretor.

os primeiros 4 bytes não faziam sentido até eu ler isso …

Response => CorrelationId ResponseMessage
CorrelationId => int32
ResponseMessage => MetadataResponse

O correlação id é análogo a um id de solicitação que você passa para a solicitação. Int32 significava que eu precisava pular 32 bits ou 4 bytes.

NOTA: é assim que você analisa os primeiros 4 bytes em erlang (binário é um tipo de dados primário em erlang)

<<First4Bytes:32, RemainingBytes/binary>>

Neste caso, os primeiros 4 bytes foram

<0,0,0,0>>

Os próximos 4 bytes foram

<<0,0,0,3>> 

Então, com isso, eu poderia construir os primeiros dois bytes:

%  0,0,0,0,                                     %% correlation_id
% 0,0,0,3, %% 3 nodes

Foi quando percebi que kafka gostava de seguir um headerlen, padrão de cabeçalho seguido pela maioria dos protocolos bem projetados. Comecei a procurar padrões.

  • << 0,0, X, Y >> padrões que poderiam muito bem ser usados ​​para significar comprimento

    • representações binárias de números de porta como 9091 que mapeiam para algo como <<0,0,35,133 >>

    tente isso em um shell erlang

    < 9091: 32 > = << 0, 0, 35, 131 >>
    – e padrões repetitivos que podem significar nomes de host comuns

Pude preencher alguns campos e tive dúvidas sobre vários (indicados por?)

    % 4  bytes  %  0,0,0,0,              %% cor_id
% 4 bytes % 0,0,0,3, %% 3 nodes

??? 0,0,0,3,0,25,118,97,103,114,97,110,116,
45,117,98,117,110,116,117,45,112,114,101,99,105,115,101,
45,54,52,

% 4 bytes % 0,0,35,133, %% = port 9093

??? 0,0,0,1,0,25,118,97,103,114,97,110,
116,45,117,98,117,110,116,117,45,112,114,101,99,105,115,
101,45,54,52,

% 4 bytes % 0,0,35,131, %% = port 9091

??? 0,0,0,2,0,25,118,97,103,114,97,
110,116,45,117,98,117,110,116,117,45,112,114,101,99,105,
115,101,45,54,52,

% 4 bytes % 0,0,35,132, %% = port 9092

?? 0,0,0,3,0,0,0,2,97,49,0,0,0,
2,0,0,0,0,0,0,0,0,0,3,0,0,0,1,0,0,0,3,0,0,0,1,0,0,0,3,0,
0,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,
2,97,50,0,0,0,2,0,0,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,1,0,0,
0,1,0,0,0,1,0,0,0,0,0,1,0,0,0,2,0,0,0,1,0,0,0,2,0,0,0,1,
0,0,0,2,0,0,0,2,97,51,0,0,0,2,0,0,0,0,0,0,0,0,0,3,0,0,0,
2,0,0,0,3,0,0,0,1,0,0,0,2,0,0,0,3,0,0,0,1,0,0,0,0,0,1,0,
0,0,1,0,0,0,2,0,0,0,1,0,0,0,2,0,0,0,2,0,0,0,1,0,0,0,2>>

Voltando aos documentos, foi fácil dissecar metade do pacote do Brokers.

MetadataResponse => [Broker][TopicMetadata]
Broker => NodeId Host Port
NodeId => int32
Host => string
Port => int32

que mapeou para

% 4  %  0,0,0,0,                                     %% correlation id
% 4 % 0,0,0,3, %% 3 nodes

% 4 % 0,0,0,3, %% NODE1
% 2 % 0,25, %% HOST1 LEN
% 25 % 118,97,103,114,97,.....52, %% HOST1
% 4 % 0,0,35,133, %% PORT1

% 4 % 0,0,0,1, %% NODE2
% 2 % 0,25, %% HOST2 LEN
% 25 % 118,97,103,114,97,....,52, %% HOST2
% 4 % 0,0,35,131, %% PORT2

% 4 % 0,0,0,2, %% NODE3
% 4 % 0,25, %% HOST3 LEN
% 25 % 118,97,103,114,97,.....52, %% HOST3
% 4 % 0,0,35,132, %% PORT3

A parte Broker do protocolo parecia embrulhada, pelo menos compreendida visualmente. A próxima parte seria mais complicada, pois havia estruturas aninhadas. Vários tópicos podem ter várias partições. E cada partição pode ter várias réplicas e ISRs.

TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
TopicErrorCode => int16
PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
PartitionErrorCode => int16
PartitionId => int32
Leader => int32
Replicas => [int32]
Isr => [int32]

Portanto, o acima tinha que corresponder ao blob binário restante que se parecia com isso …

<<0,0,0,3,0,0,0,2,97,49,0,0,0,
2,0,0,0,0,0,0,0,0,0,3,0,0,0,1,0,0,0,3,0,0,0,1,0,0,0,3,0,

0,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,

2,97,50,0,0,0,2,0,0,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,1,0,0,

0,1,0,0,0,1,0,0,0,0,0,1,0,0,0,2,0,0,0,1,0,0,0,2,0,0,0,1,

0,0,0,2,0,0,0,2,97,51,0,0,0,2,0,0,0,0,0,0,0,0,0,3,0,0,0,

2,0,0,0,3,0,0,0,1,0,0,0,2,0,0,0,3,0,0,0,1,0,0,0,0,0,1,0,

0,0,1,0,0,0,2,0,0,0,1,0,0,0,2,0,0,0,2,0,0,0,1,0,0,0,2>>

A próxima parte demorou um pouco para dissecar corretamente por causa das partições e réplicas aninhadas.

% 0,0,0,3, % topics len
% 0,0,0,2, % topic1 name len
% 97,49, % topic1 name a1