Implementação de um pipeline de dados em fluxo contínuo
Trustly registou um grande crescimento nos últimos anos, o que afectou a forma como utilizamos os dados e a análise. Os nossos desafios não são provavelmente únicos e ainda não sabemos se as escolhas que fizemos foram as correctas. Só o tempo o dirá. Estou na empresa há quase seis anos e vi os nossos recursos dedicados aos dados e à análise crescerem de zero para talvez 40 pessoas, dependendo da forma como nos contarmos. Daqui a um ano, esse número terá aumentado ainda mais.
Antecedentes
Inicialmente, pretendia que esta publicação do blogue fosse bastante técnica e se centrasse apenas em alguns desafios de engenharia explícitos que enfrentámos e na forma como os resolvemos. Mas depressa me apercebi que não faria muito sentido se não descrevesse um pouco mais o contexto. Uma coisa que notei ao fazer parte de uma organização maior é que se torna ainda mais importante mantermo-nos concentrados nos problemas que estamos a tentar resolver. Como engenheiro, sinto-me melhor quando posso passar um dia inteiro a tentar resolver um problema técnico interessante (e provavelmente não sou o único). Como ser humano, tenho tendência a gravitar em torno de coisas que me fazem sentir bem (não é o caso de todos nós?). Infelizmente, o que me faz sentir bem nem sempre é o mais valioso para o meu empregador. Numa pequena organização onde toda a gente se conhece e todos os empregados almoçam juntos às segundas-feiras, este desalinhamento torna-se rapidamente óbvio e pode ser resolvido, mas quanto maior for a organização, mais tempo posso continuar a resolver problemas que talvez não fossem assim tão importantes. É mais difícil - tanto para mim como para os outros - falsificar a minha convicção de que estou a trabalhar em algo realmente importante. Especialmente quando parece ser algo que um engenheiro de dados deveria estar a fazer, como construir um pipeline de dados de streaming.
Vejamos primeiro uma das nossas actuais plataformas de dados em lote, que utiliza o Google Cloud Platform. Temos uma série de fontes de dados, a mais importante das quais é o nosso produto de pagamento, mas também inclui sistemas CRM, sistemas financeiros, Jira, etc. A maior parte dos dados é extraída destes sistemas através de tarefas escritas em Python e accionadas pelo Airflow. Recebemos todos estes dados e colocamo-los num formato bastante bruto no BigQuery, o nosso lago de dados, por assim dizer. As transformações a jusante são escritas em SQL e executadas pelo dbt. Começámos há alguns anos a orquestrar todas estas transformações SQL com operadores Airflow, mas mudámos tudo para o dbt há cerca de um ano e não nos arrependemos disso nem por um minuto. O dbt funciona muito bem e permite-nos dissociar a transformação dos dados da ingestão.
Quando se passa de uma organização pequena para uma maior, é necessário começar a pensar em processos de dissociação e em como dimensionar as equipas. Quando toda a organização de dados é composta por apenas três pessoas, toda a gente sabe um pouco de tudo - e não há problema. Se quiser aumentar a escala em 10 vezes, a melhor forma de o fazer não é, provavelmente, exigindo que cada novo membro da equipa saiba Java, Python, SQL e arquitetura de nuvens, bem como ter uma compreensão detalhada das métricas de produto importantes, em que sistema de origem encontrar os dados em bruto, como defini-los e como trabalhar com as partes interessadas da empresa. As pessoas que sabem tudo isso (ou que podem mesmo aprender) são um recurso escasso. Em vez disso, permitir que os analistas e os engenheiros analíticos desenvolvam e mantenham pipelines em SQL, sem terem de se preocupar com a forma como os dados em bruto entram na plataforma - uma tarefa que é melhor deixar para os engenheiros de dados - é algo que descobrimos que nos permite eliminar estrangulamentos e distribuir as cargas de trabalho por muitas equipas.
Porquê o streaming?
Então, voltando ao streaming - porque é que precisamos dele? A resposta óbvia seria que precisamos de dados em tempo real e, embora isso seja certamente algo a que devemos aspirar, não diria que é o que mais nos interessa neste momento. Se tudo o resto for igual - ter dados, ou seja, saber algo, mais cedo é melhor do que mais tarde. Mas tudo o resto não é igual quando se trata de batch versus streaming. Eu preferiria classificar as nossas prioridades actuais da seguinte forma:
- Capturar alterações
- Reduzir os tempos de carregamento
- Diminuir a latência
Neste caso, estou a pensar em reduzir a latência de um dia para uma hora (e não de um segundo para um milissegundo). Vejamos cada uma destas situações.
Capturar alterações
Em grande medida, a análise consiste em encontrar padrões subjacentes que o ajudarão a compreender o mundo. Compreender o mundo à sua volta ajuda-o a tomar decisões - esperemos que decisões que ajudem o seu negócio a crescer. Há muito a dizer sobre como (e como não) transformar os dados em conhecimentos (ou melhor ainda - em decisões) e deixo esses desafios para os nossos analistas e cientistas de dados. Dito isto, penso que é bastante incontroverso afirmar que o tempo é uma dimensão importante em grande parte do trabalho de análise. O tempo como em: "Ontem vendemos por 5 SEK, hoje vendemos por 10 SEK, por quanto é que vamos vender amanhã?"
Jay Kreps escreveu um post clássico no seu blogue com o título "The Log: O que todo engenheiro de software deve saber sobre a abstração unificadora de dados em tempo real", que faz um trabalho muito melhor do que eu para explicar os benefícios de capturar mudanças ao longo do tempo no formato de um log de eventos. O ponto principal para o nosso caso de uso é que o nosso sistema de pagamentos geralmente armazena informações sobre o mundo como ele é agora, e apenas em um grau limitado como ele era ontem. Ou meio segundo atrás. Seria impraticável utilizar uma base de dados OLTP para persistir todas as alterações durante longos períodos de tempo, mas se o sistema de pagamentos puder simplesmente disparar os eventos à medida que ocorrem e alguém os recolher na outra extremidade de um corretor de mensagens, podem ser utilizados para "reproduzir" e reconstruir o estado do mundo em qualquer momento que se queira estudar em retrospetiva.
Reduzir os tempos de carregamento
Infelizmente, a base de dados do nosso produto de pagamento não tem um carimbo de data/hora de "última modificação" para todas as tabelas. Isto significa que não há forma de saber quais as linhas de uma tabela que foram alteradas entre ontem e hoje - ou entre um segundo atrás e agora. Para termos dados consistentes na plataforma de dados, precisamos de exportar tabelas inteiras do sistema de produção todas as noites. E como os volumes de transacções do Trustlyexplodiram, o mesmo aconteceu com as nossas exportações nocturnas de dados. Poder-se-ia argumentar que a base de dados deveria ter sido concebida de uma forma melhor desde o início, mas quando o sistema foi construído há muitos anos, o objetivo era obter um produto funcional que pudesse ser vendido, e não adaptar-se a uma plataforma analítica que poderíamos construir vários anos no futuro (se ainda estivéssemos em atividade nessa altura).
Obter um fluxo de eventos, ou seja, apenas as coisas novas, reduz drasticamente a computação e a largura de banda gastas na atualização dos dados na nossa plataforma para o estado mais recente.
Diminuir a latência
É provavelmente o que lhe vem à cabeça quando ouve a palavra "streaming" e, claro, é algo que também nos interessa. O facto de não estar limitado por ter dados frescos apenas uma vez por dia permite novas utilizações dos dados. No entanto, é preciso ter em mente que o streaming é difícil. Uma comparação no mundo real poderia ser a diferença entre fazer trabalhos de canalização nas suas linhas de abastecimento de água (streaming) e regar as suas plantas (batch). As consequências de um erro no primeiro caso são muito mais graves (casa inundada) do que no segundo (peitoril da janela molhado). Se tiver uma estufa, o trabalho de canalização para instalar água pode valer a pena, mas se tiver apenas cinco plantas no seu apartamento, talvez queira refrear as suas ambições de estar na vanguarda tecnológica. Eu diria que o mesmo raciocínio se aplica às plataformas de dados.
A nossa solução
Tendo em conta o que precede, decidimos, há cerca de um ano, criar uma estrutura que melhorasse a ingestão de dados na nossa plataforma de dados. O trabalho não está de modo algum concluído (provavelmente nunca estará), mas isto é o que conseguimos até agora.
Produtores -> Pub/Sub -> Beam (fluxo de dados) -> Google Cloud Storage -> Airflow (Cloud Composer) -> BigQuery -> dbt -> BigQuery -> Consumidores
Esta solução está em produção para alguns subcomponentes do nosso sistema de pagamentos desde o final de agosto e ainda estamos a avaliar a forma de a melhorar. Fizemos algumas observações até agora.
Os esquemas rigorosos são fundamentais para os dados estruturados
Optámos por Avro para a codificação das mensagens enviadas pelos produtores. Depois de termos experimentado as funcionalidades de deteção automática de JSON puro do BigQuery no ano passado, sabíamos que precisávamos de algo mais rigoroso para não acabar num inferno de falhas/manutenção para a equipa de dados. Juntamente com uma das equipas de desenvolvimento de produtos (e eventuais produtores de dados), analisámos o Protobuf e o JSON com esquemas, mas o Avro pareceu-nos ser a escolha com menos desvantagens.
Embora exista algum suporte rudimentar para esquemas no GCP, por exemplo, é possível atribuir esquemas Avro a tópicos Pub/Sub, a nossa experiência diz-nos que é muito mais imaturo do que, por exemplo, o Kafka tem para oferecer. O GCP está sempre a melhorar, por isso talvez daqui a um ano as coisas sejam diferentes. Por enquanto, temos um armazenamento de esquemas em um bucket do GCS onde os produtores colocam seus esquemas e de onde o trabalho de ingestão do Beam pode lê-los.
Utilizar serviços em nuvem sempre que possível
A menos que tenha necessidades muito específicas ou já tenha muita competência em alguma área, descobrimos que a utilização de serviços na nuvem é uma forma fácil de colocar algo escalável em produção no menor espaço de tempo possível. Na verdade, começámos a construir o pipeline com base no Kafka, mas mais tarde mudámos para o Pub/Sub quando se tornou evidente que a equipa de serviços de dados teria de fazer uma parte justa da operação e manutenção do componente de entrega de eventos (Kafka ou Pub/Sub). Uma razão para não optar por um serviço na nuvem é o risco de lock-in, mas se tiver algum cuidado ao fazer a implementação, por exemplo, utilizar a estrutura apenas para o seu objetivo principal de uma forma desacoplada e não começar a (ab)usá-la para todo o tipo de coisas, deve ser substituível. Melhor ainda, é claro, se o serviço em nuvem for baseado em uma estrutura de código aberto em que a lógica possa ser transferida para uma solução hospedada de outro fornecedor de nuvem, por exemplo, Airflow (Cloud Composer) ou Beam (Dataflow), caso seja necessário.
A API Python do Beam é imatura em comparação com Java
A competência da nossa equipa é maioritariamente em Python e SQL, pelo que a escolha natural para nós foi desenvolver o código Beam que faz a ingestão do Pub/Sub para o BigQuery em Python. No entanto, depois de passar um bom tempo, começamos a perceber que usar Java teria nos dado melhor suporte e mais opções. Para sermos justos, a documentação do Beam não esconde este facto, mas nós, talvez de forma um pouco ingénua, não lhe demos muita atenção de antemão. Por exemplo, a falta de um sistema de tipagem rigoroso em Python pode tornar mais rápido começar a fazer algo, mas quando você quer ter certeza de que pode lidar com todas as conversões de tipo entre um esquema Avro e uma tabela BigQuery, Java é mais confiável.
Também acontece que as fontes e sumidouros Java fornecem mais funcionalidades prontas a usar, por exemplo, pode fornecer ao conetor BigQueryIO em Java um nome de tabela para escrever com base numa função que define para ser avaliada em tempo de execução. Para Python, o nome da tabela tem de ser baseado num campo no evento de entrada. Conclusão: se optar pelo Python, terá mais restrições quanto à forma como pode construir o seu pipeline e algumas das funcionalidades que oferece parecem mais ou menos experimentais. Normalmente, é necessário analisar o código-fonte para descobrir o que ele realmente faz e quais são as limitações existentes.
Considere a possibilidade de criar micro-batches para problemas que não necessitem de tempo real
Em Trustly, temos casos de uso que exigem dados de streaming quase em tempo real para o pipeline, mas nenhum deles está totalmente implementado na plataforma ainda. Uma razão pela qual escolhemos o Beam para a ingestão é que ele nos permitiria combinar fluxos de streaming e de lote e - até certo ponto - alternar entre os dois. Dependendo das necessidades do consumidor de dados, poderíamos oferecer um "produto de dados" que não fosse mais complexo do que o necessário. No nosso caso, micro batching significa armazenar dados como ficheiros Avro no GCS de 10 em 10 minutos. Uma vez por dia, carregamos esses arquivos em tabelas particionadas no BigQuery. Durante o dia, os dados podem ser consultados (com latência de até 10 minutos) a partir de uma tabela externa do BigQuery que aponta para a pasta no GCS onde os arquivos de hoje são colocados; muito parecido com as camadas de lote e velocidade de uma arquitetura lambda.
A vantagem disso é que temos mais liberdade na forma como estruturamos os dados no BigQuery. Se os dados estão constantemente a fluir para as tabelas, é difícil alterar as coisas, por exemplo, adicionar particionamento a uma tabela. (Se voltarmos à analogia da canalização, é como tentar substituir a mangueira da máquina de lavar loiça quando esta está a funcionar e não há forma de desligar o abastecimento de água). Além disso, como estamos a ingerir dias inteiros em tabelas particionadas por dia, obtemos trabalhos idempotentes e, no caso de uma falha algures no pipeline, é muito mais fácil resolver as coisas e voltar a um estado conhecido.
Conclusão
Merece algum crédito se chegou até aqui, mas sinto que apenas arranhei a superfície de (um componente de) nossa configuração de dados. Há mais coisas que gostaria de partilhar. Por exemplo, nossa jornada com o Airflow/Cloud Composer e como ampliamos o uso do dbt na organização. No entanto, espero que isto tenha, pelo menos, dado uma ideia daquilo em que estamos a trabalhar e das razões para escolhermos a nossa solução específica. Se acha que há melhores formas de fazer isto ou se apenas pensa que estas tarefas lhe parecem um desafio interessante, porque não se junta a nós? Estamos constantemente à procura de engenheiros de dados qualificados para fazer crescer a equipa e melhorar as nossas práticas dentro da empresa.
Per Isacson
Diretor de Engenharia de Dados