Luft: como foi desenvolver um datastore que consulta 1 bilhão de dados em 10 segundos
(tv.naver.com)Em um ambiente com mais de 10 bilhões de eventos por mês em média, surgiu a necessidade de analisar dados rapidamente para fazer análises de comportamento de usuários (Cohort).
(ex.: mulheres na faixa dos 30 anos que gastaram mais de 100 mil won por mês no nosso app nos últimos 6 meses → taxa de retorno delas)
Este texto conta a história de implementar diretamente um datastore que antes os desenvolvedores apenas usavam.
Para implementar consultas de análise de comportamento de usuários…
-
Era necessário conseguir consultar métricas que não tivessem sido pré-calculadas antecipadamente (+ novos tipos de análise também precisariam ser possíveis sem reindexação)
-
Ao fazer
Group Bydos dados de eventos por usuário, o gargalo de High Cardinality Shuffle precisava ser pequeno
Pensaram entre usar uma solução existente ou criar uma própria
-
O Druid já era usado em outro lugar, mas por causa da limitação do Pre-Aggregation (um método que lê apenas valores já calculados), não era adequado para implementar a funcionalidade
-
Data warehouses como Snowflake ou Redshift podem ser operados em grande escala, mas por sua natureza genérica exigem clusters muito maiores do que o objetivo pedia, o que sai caro
-
Para cobrir necessidades variadas como Funnel e correspondência de ID, bancos de dados baseados em SQL têm limitações
No fim, criaram o próprio datastore
-
Luft = datastore otimizado desde o início para executar rapidamente consultas de análise de comportamento agrupadas por ID de usuário
-
Construído com base em Golang
-
Analisa dados de usuários na casa de dezenas de TB com apenas até 5 nós, em média entre 3 segundos e no máximo 10 segundos
-
Diferente de um RDBMS comum, tem imutabilidade (se necessário, sobrescreve os dados do mesmo período) → design de cluster simples, alto desempenho sem implementar um page manager complexo e possibilidade de projetar o formato de armazenamento desejado
Analisando a base técnica
- TrailDB (motor de armazenamento) - Rowstore de eventos de séries temporais otimizado para particionamento por ID de usuário
→ Dicionariza os valores e armazena apenas seus IDs
→ Ordena os eventos do usuário por tempo e armazena apenas o aumento de tempo em relação ao evento anterior e as colunas que mudaram (já que a maioria dos atributos do usuário não muda)
→ Não há índice. É preciso fazer full scan obrigatoriamente.
→ Mas, de forma surpreendente, tem uma taxa de compressão altíssima (CSV 13GB → ~TrailDB 300mb)
→ Como a complexidade de tempo é O(n), concluíram que bastava reduzir a complexidade espacial
- LLVM (motor de consultas)
→ Porém, o TrailDB só fornece equals no formato OR-AND, e a consulta parseada em Go precisava ser passada para C/C++
→ Descobriram que o PostgreSQL compila consultas com LLVM JIT
→ Como as consultas passam por expansão frequente de funcionalidades, isso evita aumentar o custo de desenvolvimento ao escrever tudo em C/C++ (basta gerar LLVM IR em Golang e passar adiante; em C/C++ ela é executada via compilação JIT)
- Criar diretamente a própria camada de processamento
→ MapReduce é muito usado, mas não puderam usá-lo por estarem usando Golang
→ Spark/Hadoop são otimizados para Long-running Job, então mesmo integrando o desempenho não saía bom
→ Isso também foi feito internamente → https://github.com/ab180/lrmr
→ Combinação de gRPC + Protobuf + etcd, adotando bastante do design familiar do Spark
→ Abriram mão de resiliência → se o desempenho for elevado ao extremo, mesmo que ocorra falha, reiniciar do zero ainda leva menos de 10 segundos
→ Como o processamento em grande escala causava frequentemente buffer overflow (Backpressure), mudaram para um Pull-based Event Stream (adotado em Kafka, Armeria etc.)
- Implementar o próprio sharding
→ Shard = nó histórico
→ E se a faixa de datas da partição fosse usada como valor da chave de sharding?
→ Todas as consultas têm tempo → fácil de filtrar
→ Em uma mesma faixa de tempo, os dados têm volumes semelhantes → fácil de distribuir os dados
→ Ambientes distribuídos não são bonitos…
→ E se um nó cair ou um novo for adicionado?
→ E se o espaço de armazenamento encher?
→ E se por causa de falha tudo se concentrar em um único nó?
→ Personalizaram a Cost Function do Druid para que, quanto mais próximas e sobrepostas fossem as faixas de datas das partições, maior fosse o custo
→ Para disponibilidade dos shards, fizeram o seguinte
→ Aplicaram TTL às informações dos shards e as atualizam periodicamente (etcd)
→ Armazenam as partições no S3 e gerenciam a lista de partições com DynamoDB
Situação atual em produção
- Com apenas 4 instâncias c5.2xlarge, fazem scan de 500GB de dados em até 15 segundos
Objetivos daqui para frente (ou o que ainda precisa ser feito)
-
Querem fazer análise de Funnel em tempo real com um cluster de menos de 10 máquinas
-
Pretendem oferecer suporte a Spark para integração com ML etc.
-
Estão desenvolvendo um column store próprio (Ziegel) para substituir o TrailDB
→ Otimização com SIMD e multicore
→ Filtragem prévia com base em atributos de usuário usando Bitmap Index
2 comentários
traildb é divertido. https://www.youtube.com/watch?v=-oPFxSwn0lM É interessante. Embora seja um vídeo antigo, provavelmente o traildb não mudou nesse meio-tempo.
Agora que vi, também tem um post no blog do desenvolvedor,
https://engineering.ab180.co/stories/introducing-luft
Eu nunca tinha ouvido falar de TrailDB, mas é algo assim...
https://github.com/traildb/traildb