- xkafka é uma biblioteca open source que permite usar o Kafka de forma tão simples quanto um serviço HTTP no ambiente Go
- Ao usar o confluent-kafka-go, normalmente são necessários loops de processamento complexos e muito código boilerplate, mas o xkafka permite focar na lógica principal com uma estrutura de Handler, Middleware e Message
- A publicação e o consumo de mensagens são tratados de forma intuitiva, como no modelo de requisição/resposta HTTP, escondendo boa parte da complexidade do Kafka, como gerenciamento de offsets, configuração de concorrência e tratamento de erros
- Suporta de forma simples vários padrões exigidos em serviços reais, como processamento Streaming/Batch, processamento sequencial/assíncrono e garantias de At-most-once/At-least-once
- Facilita a aplicação de padrões importantes para produção, como tratamento de erros em camadas e retry/logging/métricas baseados em middleware
Kafka no estilo HTTP
- xkafka é uma biblioteca que abstrai o Kafka em Go como se fosse um serviço HTTP
- Message é semelhante a uma requisição HTTP e inclui tópico/partição/offset/chave/valor/headers/callback etc.
- Handler processa a lógica de negócio, como um HTTP Handler
- Middleware permite aplicar funcionalidades adicionais, como logging, métricas e retry, separando-as da lógica de negócio
Publicação de mensagens (Publishing Messages)
- Crie um Producer com
xkafka.NewProducer, monte um objeto de mensagem e publique com a função Publish
- Também há suporte a publicação assíncrona (
AsyncPublish) e registro de callback, facilitando processamento assíncrono de eventos e cenários de alta performance
- A entrega das mensagens é processada em uma goroutine em segundo plano, e o status pode ser acompanhado por callback
Consumo de mensagens (Consuming Messages)
- Ao criar o Consumer, defina a função Handler, além de tópico/broker/configuração etc.
- É possível adicionar middleware com
consumer.Use()
- O consumo de mensagens começa com
consumer.Run(ctx)
Streaming vs. Batch
- Streaming: processa imediatamente uma mensagem por vez conforme ela chega. É vantajoso para baixo volume, economia de memória e garantias de processamento mais fortes
- Batch: processa mensagens em grupos por quantidade ou janela de tempo. É vantajoso para sistemas de alta vazão ou para reduzir a carga downstream
Sequencial ou Async
- O padrão é processamento sequencial (Sequential) — a próxima mensagem só é lida quando a atual termina
- Ao usar
xkafka.Concurrency(N), há suporte ao modo assíncrono (Async), com processamento simultâneo de N mensagens (ou lotes)
Gerenciamento de offsets
- No comportamento padrão do Kafka, o offset avança assim que a mensagem é entregue, o que pode causar perda de mensagens em caso de falha
- O xkafka configura
enable.auto.offset.store=false, salvando o offset somente após a conclusão do processamento da mensagem (ou lote)
- Assim, é possível obter garantias de processamento no Kafka sem precisar gerenciar o estado da mensagem em um banco de dados ou fila separada
-
Garantia At-Most-Once
- Por padrão, o offset é commitado em segundo plano conforme o
enable.auto.commit=true do Kafka
- Com
xkafka.ManualCommit(true) e processamento sequencial, o offset é commitado antes da leitura de cada mensagem/lote, garantindo At-most-once
-
Garantia At-Least-Once
- Ao combinar
xkafka.ManualCommit(true) com concorrência (N>1), os offsets são commitados de forma síncrona e em ordem mesmo durante o processamento paralelo
- Isso facilita a aplicação do padrão de garantia At-least-once
Tratamento de erros
-
Nível do Handler
- Dentro do Handler, é possível tratar erros da aplicação e enviar mensagens para uma Dead Letter Queue, entre outras ações
- Em caso de sucesso, use
msg.AckSuccess(), para ignorar use msg.AckSkip(), e em caso de falha use msg.AckFail(err) para controle explícito
-
Nível do Middleware
- No middleware, lógicas comuns como retry e logging de erro podem ser reutilizadas em vários Handlers
- Também é fácil aplicar políticas ou formas de tratamento diferentes conforme o tipo de erro
-
Nível Global
- Erros do broker Kafka ou da biblioteca são tratados centralmente pela opção obrigatória
xkafka.ErrorHandler
- Se esse handler retornar um erro non-nil, o Consumer/Producer interrompe a operação
Conclusão
- xkafka transforma a experiência complexa de uso do Apache Kafka em uma estrutura de servidor HTTP familiar para desenvolvedores Go
- Reduz boilerplate desnecessário e oferece um ambiente em que é possível focar apenas na lógica de negócio
- Em comparação com código usando confluent-kafka-go, é muito mais conciso e intuitivo
- É possível começar imediatamente consultando a documentação oficial e os exemplos
1 comentários
Hum, eu achava que no Golang o
saramaera mais preferido...Pelo que parece, cliente de Kafka é... mais complexo do que eu imaginava quando há falha de broker ou exceções, então fico pensando se vai conseguir cobrir todos os casos...