21 pontos por hiddenest 2020-12-24 | 2 comentários | Compartilhar no WhatsApp

Em um ambiente com mais de 10 bilhões de eventos por mês em média, surgiu a necessidade de analisar dados rapidamente para fazer análises de comportamento de usuários (Cohort).

(ex.: mulheres na faixa dos 30 anos que gastaram mais de 100 mil won por mês no nosso app nos últimos 6 meses → taxa de retorno delas)

Este texto conta a história de implementar diretamente um datastore que antes os desenvolvedores apenas usavam.

Para implementar consultas de análise de comportamento de usuários…

  • Era necessário conseguir consultar métricas que não tivessem sido pré-calculadas antecipadamente (+ novos tipos de análise também precisariam ser possíveis sem reindexação)

  • Ao fazer Group By dos dados de eventos por usuário, o gargalo de High Cardinality Shuffle precisava ser pequeno

Pensaram entre usar uma solução existente ou criar uma própria

  • O Druid já era usado em outro lugar, mas por causa da limitação do Pre-Aggregation (um método que lê apenas valores já calculados), não era adequado para implementar a funcionalidade

  • Data warehouses como Snowflake ou Redshift podem ser operados em grande escala, mas por sua natureza genérica exigem clusters muito maiores do que o objetivo pedia, o que sai caro

  • Para cobrir necessidades variadas como Funnel e correspondência de ID, bancos de dados baseados em SQL têm limitações

No fim, criaram o próprio datastore

  • Luft = datastore otimizado desde o início para executar rapidamente consultas de análise de comportamento agrupadas por ID de usuário

  • Construído com base em Golang

  • Analisa dados de usuários na casa de dezenas de TB com apenas até 5 nós, em média entre 3 segundos e no máximo 10 segundos

  • Diferente de um RDBMS comum, tem imutabilidade (se necessário, sobrescreve os dados do mesmo período) → design de cluster simples, alto desempenho sem implementar um page manager complexo e possibilidade de projetar o formato de armazenamento desejado

Analisando a base técnica

  • TrailDB (motor de armazenamento) - Rowstore de eventos de séries temporais otimizado para particionamento por ID de usuário

→ Dicionariza os valores e armazena apenas seus IDs

→ Ordena os eventos do usuário por tempo e armazena apenas o aumento de tempo em relação ao evento anterior e as colunas que mudaram (já que a maioria dos atributos do usuário não muda)

→ Não há índice. É preciso fazer full scan obrigatoriamente.

→ Mas, de forma surpreendente, tem uma taxa de compressão altíssima (CSV 13GB → ~TrailDB 300mb)

→ Como a complexidade de tempo é O(n), concluíram que bastava reduzir a complexidade espacial

  • LLVM (motor de consultas)

→ Porém, o TrailDB só fornece equals no formato OR-AND, e a consulta parseada em Go precisava ser passada para C/C++

→ Descobriram que o PostgreSQL compila consultas com LLVM JIT

→ Como as consultas passam por expansão frequente de funcionalidades, isso evita aumentar o custo de desenvolvimento ao escrever tudo em C/C++ (basta gerar LLVM IR em Golang e passar adiante; em C/C++ ela é executada via compilação JIT)

  • Criar diretamente a própria camada de processamento

→ MapReduce é muito usado, mas não puderam usá-lo por estarem usando Golang

→ Spark/Hadoop são otimizados para Long-running Job, então mesmo integrando o desempenho não saía bom

→ Isso também foi feito internamente → https://github.com/ab180/lrmr

→ Combinação de gRPC + Protobuf + etcd, adotando bastante do design familiar do Spark

→ Abriram mão de resiliência → se o desempenho for elevado ao extremo, mesmo que ocorra falha, reiniciar do zero ainda leva menos de 10 segundos

→ Como o processamento em grande escala causava frequentemente buffer overflow (Backpressure), mudaram para um Pull-based Event Stream (adotado em Kafka, Armeria etc.)

  • Implementar o próprio sharding

→ Shard = nó histórico

→ E se a faixa de datas da partição fosse usada como valor da chave de sharding?

→ Todas as consultas têm tempo → fácil de filtrar

→ Em uma mesma faixa de tempo, os dados têm volumes semelhantes → fácil de distribuir os dados

→ Ambientes distribuídos não são bonitos…

→ E se um nó cair ou um novo for adicionado?

→ E se o espaço de armazenamento encher?

→ E se por causa de falha tudo se concentrar em um único nó?

→ Personalizaram a Cost Function do Druid para que, quanto mais próximas e sobrepostas fossem as faixas de datas das partições, maior fosse o custo

→ Para disponibilidade dos shards, fizeram o seguinte

→ Aplicaram TTL às informações dos shards e as atualizam periodicamente (etcd)

→ Armazenam as partições no S3 e gerenciam a lista de partições com DynamoDB

Situação atual em produção

  • Com apenas 4 instâncias c5.2xlarge, fazem scan de 500GB de dados em até 15 segundos

Objetivos daqui para frente (ou o que ainda precisa ser feito)

  • Querem fazer análise de Funnel em tempo real com um cluster de menos de 10 máquinas

  • Pretendem oferecer suporte a Spark para integração com ML etc.

  • Estão desenvolvendo um column store próprio (Ziegel) para substituir o TrailDB

→ Otimização com SIMD e multicore

→ Filtragem prévia com base em atributos de usuário usando Bitmap Index

2 comentários

 
gera1d 2020-12-24

traildb é divertido. https://www.youtube.com/watch?v=-oPFxSwn0lM É interessante. Embora seja um vídeo antigo, provavelmente o traildb não mudou nesse meio-tempo.

 
hiddenest 2020-12-24

Agora que vi, também tem um post no blog do desenvolvedor,

https://engineering.ab180.co/stories/introducing-luft

Eu nunca tinha ouvido falar de TrailDB, mas é algo assim...

https://github.com/traildb/traildb