Baixe o app para aproveitar ainda mais
Prévia do material em texto
Tecnologias de Big Data – Processamento de Dados Massivos Pedro Lehman Toledo 2022 2 Tecnologias de Big Data – Processamento de Dados Massivos Pedro Lehman Toledo © Copyright do Instituto de Gestão e Tecnologia da Informação. Todos os direitos reservados. 3 Sumário Capítulo 1. O cenário de Big Data .............................................................................. 6 Onde acessar os códigos ............................................................................................... 6 Big Data: coleta de dados massivos .............................................................................. 6 Processamento distribuído ......................................................................................... 10 Ecossistema Hadoop ................................................................................................... 14 YARN ........................................................................................................................... 17 Capítulo 2. Introdução ao Apache Spark .................................................................. 20 O que é o Apache Spark? ............................................................................................ 20 Conceitos básicos de uma aplicação do Spark ........................................................... 25 Transformações, ações e Lazy Evaluation .................................................................. 29 Catalyst Optimizer e o Plano de Execução .................................................................. 31 Download e instalação do Apache Spark .................................................................... 36 Capítulo 3. Manipulando dados com Spark: Parte 1 ................................................ 43 DataFrames e Datasets ............................................................................................... 43 Tipos de dados no Spark ............................................................................................. 44 Schema e criação de DataFrames ............................................................................... 47 Leitura e escrita de dados ........................................................................................... 50 Operações básicas com DataFrames .......................................................................... 62 Trabalhando com diferentes tipos de dados .............................................................. 76 Capítulo 4. Manipulando dados com Spark: parte 2................................................ 89 4 Agrupamento e Agregação ......................................................................................... 89 Window Functions ...................................................................................................... 95 Joins........................................................................................................................... 102 User Defined Functions (UDF) .................................................................................. 107 Capítulo 5. Spark SQL .............................................................................................113 Visão Geral do Spark SQL .......................................................................................... 113 Databases e catalog .................................................................................................. 115 Criando Tabelas e Views ........................................................................................... 117 Fazendo queries no Spark SQL .................................................................................. 120 Capítulo 6. Otimizando Aplicações do Spark .........................................................123 Como configurar e escalar o Spark ........................................................................... 123 Persistência de dados na memória ........................................................................... 129 Estratégias de particionamento de dados ................................................................ 131 Reparticionando DataFrames ................................................................................... 133 Escolhendo o melhor tipo de Join ............................................................................. 135 Capítulo 7. Deploy de Aplicações do Spark ............................................................137 Introdução e modos de execução ............................................................................. 137 Desenvolvendo aplicações Spark .............................................................................. 138 Executando o Spark em Clusters .............................................................................. 141 Exemplo Prático: lançando uma aplicação localmente ............................................ 143 Capítulo 8. Spark na Nuvem ...................................................................................148 5 Introdução ao Dataproc e GCS.................................................................................. 148 Criando um Cluster do Dataproc .............................................................................. 156 Acessando dados no GCS .......................................................................................... 165 Acessando os dados com o Spark ............................................................................. 168 Submetendo aplicações no GCP ............................................................................... 171 Capítulo 9. Conclusão .............................................................................................177 Referências………………. ...................................................................................................178 6 Capítulo 1. O cenário de Big Data O objetivo desse curso é capacitar e auxiliar profissionais de dados, em especial engenheiros de dados, no desenvolvimento de pipelines de processamento de dados massivos, de forma com que esses processos sejam rápidos, eficientes, escaláveis e executáveis em ambientes locais e em nuvem. Assim, para que esse objetivo seja atingido, é necessário apresentar alguns conceitos relacionados à manipulação de grandes quantidades de dados. Eles serão o fundamento de uma compreensão adequada das tecnologias mais modernas utilizadas nessa área – consequentemente, possibilitando um olhar crítico e analítico das abordagens utilizadas em cada situação. Nesse primeiro capítulo, será apresentado um pouco do contexto de Big Data, os conceitos básicos de processamento distribuído e o Ecossistema Hadoop, visto que é uma das primeiras ferramentas para o processamento de dados massivos que surgiram. Onde acessar os códigos Todos os recursos utilizados para produzir este material estão disponibilizados no seguinte repositório do Github: https://github.com/pltoledo/igti-dados-massivos Fique à vontade para clonar o repositório e utilizá-lo durante a leitura da apostila e para fixar ainda mais o aprendizado. Big Data: coleta de dados massivos A coleta de dados aparenta ser uma técnica moderna, porém é um método tão antigo quanto a civilização, e fez-se presente durante grande parte da história da https://github.com/pltoledo/igti-dados-massivos 7 humanidade. Práticas como o censo e a coleta de dados demográficos são utilizadas desde os tempos antigos, estão registradas em diversos achados arqueológicos datados de milhares de anos atrás (HAKKERT, 1996). Posteriormente, a coleta e o armazenamento de dados foram preponderantes para diversas descobertas científicas, especialmente no campo da astronomia e biologia. Por fim, com a invenção do computador e o advento da internet, esses processos foram aperfeiçoadosa ponto de possibilitar ao ser humano uma capacidade analítica até então nunca experimentada. Nos dias de hoje, o termo Big Data tem sido muito utilizado, mas muitas vezes de uma maneira um tanto liberal – como normalmente acontece quando um conceito é popularizado, mas não necessariamente entendido. Sua primeira definição formal foi feita pelo Grupo Gartner, em 2001, e a versão mais atual dessa definição é: Big Data são ativos de informação de alto volume, velocidade e variedade, que demandam formas de processamento inovadores e eficientes em termos de custo, proporcionando melhorias na geração de insights, tomada de decisão e automatização de processos. (GARTNER, 2001, tradução nossa) Após esta, outras definições buscaram enriquecer ainda mais esse conceito: Big Data refere-se a conjuntos de dados cujo tamanho é além da capacidade de coleta, armazenamento, administração e análise das típicas ferramentas de banco de dados. (MCKINSEY GLOBAL INSTITUTE, 2011, tradução nossa) (Big Data) É uma prioridade que tem o potencial de mudar profundamente o cenário competitivo na economia globalmente integrada de hoje. Além de fornecer formas inovadoras de encarar desafios de negócio, Big Data e analytics instigam novas formas de 8 transformar processos, organizações, indústrias inteiras e até mesmo a sociedade. (DEMIRKAN H; et al, 2015, tradução nossa) Enquanto Big Data, inicialmente, referia-se às unidades de informação coletadas e processadas massivamente, atualmente esse termo também é comumente utilizado ao tratar-se de técnicas de análise de comportamento, mineração de dados, análise de dados, análise preditiva e muitas outras aplicações finais às quais os dados podem ser ingeridos. Apesar de ser difícil definir precisamente o que é Big Data em termos do tamanho dos dados ou o escopo de técnicas utilizadas, há uma concordância na literatura acerca de algumas características comuns dessa categoria de dados – descritas, normalmente, pelos famosos V’s do Big Data. Inicialmente, eram três V’s, depois cinco, e hoje já existem autores tratando de oito e dez termos-chave para caracterizar esse conceito. No entanto, acredito que os principais são: • Volume: de onde vem o “Big”. A característica fundamental do Big Data é que os dados são gerados em volumes cada vez maiores, de forma que ferramentas mais tradicionais não conseguem processá-los e armazená-los (pelo menos não de forma eficiente). • Variedade: os dados são coletados dos mais diversos contextos, de forma estruturada, semiestruturada e até não estruturada, como áudios, vídeos e imagens, e até mesmo posts em redes sociais. • Velocidade: os dados são gerados de forma cada vez mais rápida. Informações como posts em redes sociais são geradas em tempo real continuamente, e por isso é necessário que elas sejam analisadas e processadas com a mesma rapidez. 9 • Veracidade: é necessário se certificar que os dados gerados não estejam fora de controle. Essa característica se refere à confiabilidade e manutenção dos dados coletados, o que se mostra um desafio cada vez maior com o avanço das tecnologias de coleta e armazenamento de dados. • Valor: é o que amarra todos os outros conceitos e talvez a característica mais importante para um profissional de dados ter em mente. Não adianta conseguir tirar o máximo dos outros V 's se não há valor para o negócio em que os dados estão inseridos. O foco deve estar principalmente na capacidade do consumidor final de extrair informações relevantes e impactantes. Figura 1 – Os 5 V’s do Big Data. Fonte: Edureka. Esse último V está diretamente relacionado com a parte final da definição dada pelo Gartner: após apresentar as principais características da Big Data, é deixado bem claro que a coleta e processamento desses dados possuem um propósito muito bem https://www.edureka.co/blog/what-is-big-data/ 10 definido de gerar valor para um negócio, por meio de ferramentas que possam auxiliar na tomada de decisão, automatização de processos e insights de negócio. Ainda que tenha potencial para ser um agente transformador do negócio, o uso de Big Data demanda atenção e cuidados específicos. Críticos desse conceito apontam conflitos éticos e sociais advindos de como a coleta de dados em massa pode se configurar como uma invasão de privacidade das pessoas, especialmente se tratando de dados que não são completamente anonimizados (BOYD e CRAWFORD, 2012). Isso leva ao dilema do ponto de vista técnico, uma vez que quanto maior o volume de dados, maior o desafio de manutenção e segurança, o que pode gerar um cenário de violação de normas de privacidade estabelecidas ou até mesmo prejudicar a análise desses dados. Acerca da análise, é válido ressaltar que, mesmo com grandes quantidades de dados a disposição, ainda é importante ter o entendimento de como os dados foram gerados e quais suas suposições e vieses, visto que Big Data não é sinônimo para completude de dados, e considerar a origem da amostra analisada é mais importante que o seu tamanho (BOYD e CRAWFORD, 2012). Dessa forma, fica claro que a aplicação de tecnologias de Big Data tem vantagens e desvantagens. Muitas vezes, “dados pequenos” podem ser suficientes para responder todas as questões de negócio com uma complexidade menor e risco menor. A escolha do que usar depende principalmente do objetivo a ser alcançado, e do valor que isso irá gerar ao final do processo. Na próxima seção, será introduzida uma das principais ferramentas utilizadas nos softwares modernos de processamento de grandes volumes de dados. Processamento distribuído No início dos anos 2000, enquanto as tecnologias de coleta e armazenamento tornavam-se cada vez mais baratas e eficientes, os processadores de computadores 11 enfrentavam uma mudança de paradigma: devido a grandes limitações na dissipação de calor, fabricantes de hardware deixaram de desenvolver processadores individuais que fossem mais potentes, e passaram a adotar a estratégia de adicionar núcleos paralelos às CPUs que fossem capazes de serem executadas na mesma velocidade. Esse cenário foi propício para o surgimento de ideias que fundamentaram todas as tecnologias de processamento de dados massivos que surgiram posteriormente. Aliado a isso, em 2005 o Google anunciou a criação do Google File System (GFS), MapReduce e Bigtable, tecnologias que tinham como objetivo solucionar o problema de armazenamento e computação de dados que a empresa enfrentava, visto que os bancos de dados relacionais clássicos não eram capazes de operar com a escala em que seu popular mecanismo de pesquisa estava sendo desenvolvido. Enquanto o GFS e o Bigtable providenciaram armazenamento distribuído, escalável e tolerante a falhas de dados em um cluster, o MapReduce introduziu um novo paradigma de programação em paralelo para o processamento de dados distribuídos no GFS e no Bigtable. Esse foi o primeiro software de processamento distribuído a ser desenvolvido, e, apesar de ter sido um trabalho privado em sua maior parte, foram publicados artigos que serviram de inspiração para diversas outras ferramentas mais modernas, como o Hadoop MapReduce e o Apache Spark. Antes de continuar, é necessário entender o conceito de computação paralela. Na computação paralela, a execução de uma tarefa é dividida em tarefas menores, que são designadas aos processadores do computador, de forma que elas possam ser realizadas de forma simultânea e independente, potencialmente atingindo maior eficiência e rapidez. O desafio desse tipo de estratégia encontra-se na maneira com que as tarefas são quebradas e na administração dessas tarefas menores ao longo da execução, desta forma é necessário atentar-se a como isso é feito em cada aplicação. 12 Na Figura 2 é possível ver um esquemade comparação entre a computação serial e a computação em paralelo. Figura 2 – Computação Serial e Paralela. Fonte: teldat.com. A computação distribuída segue os mesmos princípios da computação em paralelo, com a diferença de que a paralelização é feita a partir de uma rede de computadores interligados (também chamado de cluster), ao invés de ser somente a nível de processadores de uma máquina. Assim, não só as tarefas são distribuídas ao longo dos computadores (ou nós), mas os dados também são divididos e enviados via internet para cada um deles. Essa arquitetura oferece: • Escalabilidade: para aumentar o poder computacional basta inserir um novo nó no cluster, o que muitas vezes é mais barato do que realizar um upgrade no hardware dos nós já incorporados. https://www.teldat.com/blog/en/parallel-computing-bit-instruction-task-level-parallelism-multicore-computers/ 13 • Tolerância a falhas: é possível desenhar a arquitetura distribuída de forma que o cluster seja capaz de incorporar tarefas à outras máquinas, caso o processamento em um nó seja interrompido, trazendo mais confiabilidade ao sistema. Tal como na computação em paralelo, o processamento distribuído de dados pode se mostrar desafiador na forma de gerenciamento das partições de dados e de tarefas, uma vez que uma pequena alteração nos dados impacta todo o cluster. Além disso, devido ao fato de os nós muitas vezes estarem em locais físicos diferentes e de que os dados são transferidos entre eles por meio da internet, a segurança dos dados mostra-se um ponto de atenção frequente. Figura 3 – Computação Distribuída. Fonte: Khan Academy. Portanto, caso a tarefa a ser executada seja grande ao ponto de seu processamento em um único nó não ser rápido e/ou eficiente o bastante, o processamento distribuído se torna a melhor alternativa. https://www.khanacademy.org/computing/ap-computer-science-principles/algorithms-101/x2d2f703b37b450a3:parallel-and-distributed-computing/a/distributed-computing 14 Ecossistema Hadoop A partir do trabalho desenvolvido pelo Google por meio do GFS e MR, o Yahoo!, que também enfrentava problemas relacionados a processamento de grandes quantidades de dados em um mecanismo de pesquisa, criou o projeto Hadoop como a sua própria abordagem para solucioná-los. As ferramentas desenvolvidas foram bem- sucedidas, e em 2008, o Hadoop foi liberado como um projeto open source para a Apache Software Foundation, a partir da qual foi desenvolvido um framework completo de armazenamento e processamento distribuído, que está entre as mais tradicionais ferramentas de Big Data. A capacidade de trabalhar com dados massivos – estruturados e não estruturados, escalabilidade e o fato de ser gratuito, contribuíram para sua rápida popularização no mercado. A biblioteca principal conta com os seguintes módulos: • Hadoop Common: módulo que contém os utilitários comuns a todos os outros módulos do Hadoop. • Hadoop Distributed File System (HDFS): módulo que contém as funcionalidades relacionadas ao armazenamento distribuído de dados. • Hadoop MapReduce: módulo que oferece serviços de computação distribuída no ambiente Hadoop. • Hadoop YARN: módulo que realiza o gerenciamento de recursos e divisão de tarefas dentro do ambiente distribuído do Hadoop. Além desses módulos, o framework conta com tecnologias disponíveis para outros propósitos, como: 15 • Apache HIVE: banco de dados que utiliza uma interface de SQL no ambiente distribuído. • Apache Mahout: módulo para a criação de aplicações de machine learning. • Apache Ambari: serviços de provisionamento, gerenciamento e monitoramento de clusters no Apache Hadoop. • Apache Oozie: serviços de agendamento de jobs. • Apache Zookeeper: módulo para coordenar os serviços do Ecossistema Hadoop. Figura 4 – Computação Distribuída. Fonte: Savvycom. https://www.edureka.co/blog/hadoop-ecosystem#apache_hive https://www.edureka.co/blog/hadoop-ecosystem#apache_mahout https://www.edureka.co/blog/hadoop-ecosystem#apache_ambari https://www.edureka.co/blog/hadoop-ecosystem#apache_oozie https://www.edureka.co/blog/hadoop-ecosystem#apache_zookeeper https://savvycomsoftware.com/what-you-need-to-know-about-hadoop-and-its-ecosystem/ 16 Os componentes apresentados foram concebidos para serem integráveis dentro do Ecossistema Hadoop, todavia, também é possível utilizá-los em conjunto com outras tecnologias externas, dependendo da necessidade do usuário. Hadoop Distributed File System (HDFS) O HDFS é o sistema de armazenamento de dados do Hadoop e provavelmente o módulo mais importante de todo o ecossistema. Por meio dele, grandes quantidades de dados de diferentes fontes, tipos e níveis de estruturação, podem ser armazenados de maneira distribuída através de um cluster de computadores, em contraste com o armazenamento de um único nó. Apesar disso, a ferramenta cria um nível de abstração de forma que o usuário consiga enxergar somente uma grande unidade de armazenamento. A arquitetura do HDFS é definida em termos de dois componentes: o NameNode e o DataNode, que definem uma relação de mestre/escravos entre os computadores do cluster. • NameNode: é o gerenciador de recursos e tarefas dentro da rede de computadores, que não armazena os dados em si, mas guarda informações relevantes sobre eles (metadados). É muito importante manter o NameNode tolerante a falhas, realizando backups periódicos do estado do sistema e dos metadados, já que uma falha nesse nó pode comprometer todos os dados HDFS. • DataNode: são os nós que de fato armazenam dados e atendem as demandas de escrita e leitura dos usuários feitas por meio do NameNode. Arquivos armazenados são particionados em blocos de dados (tipicamente de 128GB) e distribuídos ao longo do cluster. Esses blocos também são replicados e enviados para diferentes nós, segundo um fator de replicação 17 pré-estabelecido, de forma que se uma máquina falhar o dado não é perdido. Um ponto importante é que os DataNodes são commodity hardwares, isto é, infraestrutura de baixo custo e fácil aquisição, o que torna o HDFS acessível financeiramente e facilmente escalável. YARN O YARN (Yet Another Resource Negotiator) é o dispositivo de gerenciamento de recursos do Ecossistema Hadoop, responsável por controlar os clusters e aplicações em ambientes distribuídos. Por meio do YARN, é possível criar jobs de processamento de dados armazenados no HDFS ou na cloud (como S3), utilizando ferramentas como MapReduce ou até mesmo softwares independentes. Os principais componentes do YARN são: • ResourceManager: é o dispositivo responsável por gerenciar os cluster, agendar a alocação de recursos nos nós e aceitar ou rejeitar aplicações submetidas pelo cliente. • NodeManager: presente em todos os nós do cluster, o NodeManager gerencia os recursos necessários para a execução dos jobs em cada máquina específica por meio da criação e destruição de containers. Os containers são uma coleção de recursos físicos em um nó, por meio da qual as tarefas são executadas. • ApplicationMaster: presente em todos os nós do cluster, é responsável por informar o ResourceManager a respeito do status de execução da aplicação e da necessidade de recursos. 18 As principais vantagens de se utilizar o YARN são sua escalabilidade, alocação dinâmica de clusters, compatibilidade com versões anteriores do Hadoop e integração com diferentes ferramentas de processamento para diferentes cargas de trabalho, sejam elas processamento em batch, streaming ou consultas interativas. Por causa disso, o YARN é hoje um dos gerenciadores de recursos mais utilizados em ambientes distribuídos. MapReduce O Hadoop MapReduce, assim como a implementação anterior do Google, é a ferramenta utilizada no processamentoparalelo e distribuído de volumes massivos de dados. O MapReduce funciona em um paradigma baseado na definição dos dados em pares chave/valor e na realização de duas operações distintas, Map e Reduce, que são responsáveis por definir os pares e realizar operações de agregação, respectivamente. Ou seja, inicialmente são definidos inputs das operações a partir da criação de vários pares de chave e valor, que então são reduzidos a um conjunto menor de pares por meio de operações de agregação. Há ainda um processo de ordenação e agrupamento das chaves, que acontece entre as fases de Map e Reduce, que serve como uma forma de otimizar as operações de agregação. Abaixo é possível ver um esquema do processo completo: 19 Figura 5 – MapReduce Workflow. Fonte: Yahoo!. Apesar de ser capaz de suportar cargas de trabalho pesadas em diferentes tipos e estruturas de dados, o MapReduce apresenta algumas desvantagens. Damji et al. (2020), argumentam que o framework apresenta uma complexidade operacional que o torna difícil de gerenciar, além do modelo de programação disponível na API ser verboso e requerer muito código preparatório, o que torna a programação difícil e sujeita a erros. Ainda, como os resultados intermediários de operações no MapReduce são escritos em disco, um job que necessita de diversas tarefas acaba tendo uma performance baixa devido às constantes operações de I/O, podendo levar horas e até mesmo dias para ser concluído. https://web.archive.org/web/20180310080336if_/https:/developer.yahoo.com/hadoop/tutorial/module4.html#functional 20 Capítulo 2. Introdução ao Apache Spark Com o objetivo de criar um dispositivo de processamento distribuído de dados massivos que fosse mais eficiente, mais simples e mais fácil de usar e aprender que o Hadoop MapReduce (MR), pesquisadores da UC Berkeley começaram um projeto chamado Spark, em 2009. O projeto agiu principalmente sob a ótica de pegar ideias do MR, mas melhorá-las de forma que o Spark pudesse ser altamente tolerante a falhas e extremamente paralelo, com suporte ao armazenamento em memória de computações intermediárias entre operações de map e reduce, além de estar disponível em múltiplas linguagens como um modelo de programação unificado (Damji et al. 2018). O resultado foi uma ferramenta que, nos seus estágios iniciais, era 10 a 20 vezes mais rápida que o Hadoop MapReduce para algumas cargas de trabalho. Hoje, na Apache Software Foundation, o Spark chega a ser até 100 vezes mais rápido. Neste capítulo serão apresentados os principais conceitos do Apache Spark, incluindo os principais componentes de uma aplicação, aspectos importantes do mecanismo de execução e sua forma de funcionamento. Ao final, serão apresentadas as formas de instalação da ferramenta para máquinas Windows e Linux. O que é o Apache Spark? “Rápido como um raio, o Apache Spark é um mecanismo de analytics unificado para Big Data e machine learning. ” (Databricks, tradução nossa) O Apache Spark é um framework 100% open source de processamento distribuído e computação em clusters, projetado especialmente para trabalhar com quantidades massivas de dados. O framework dispõe de módulos para diferentes objetivos, como processamento em streaming e computação de grafos, mas que foram desenvolvidos seguindo um mesmo design. Damji et al. (2018) afirmam que a filosofia https://databricks.com/spark/about 21 do Spark gira em torno de quatro características principais: velocidade, usabilidade, modularidade e extensibilidade. Velocidade O Spark busca entregar velocidade no processamento de dados desde a sua concepção, e o seu design reflete claramente esse objetivo. As principais características que proporcionam velocidade à ferramenta são: • Capacidade de armazenar computações intermediárias na memória, limitando as operações de I/O. • Utilização de um Directed Acyclic Graph (DAG) para programar as tarefas a serem executadas na aplicação, proporcionando um agendamento otimizado e eficiente das tarefas. • Otimização dos planos lógicos e físicos de execução, por meio do Catalyst Optimizer, de forma a sempre executar as cargas de trabalho da forma mais otimizada possível. • Utilização de lazy evaluation permite que o Spark armazene todas as operações sendo realizadas, o que permite fazer uso mais eficiente da memória e replanejar a ordem das operações. Abaixo, é possível visualizar benchmarks do Daytona GraySort Contest, um desafio de processamento em que era necessário ordenar 100 TB de dados. A tabela aponta que o Apache Spark realizou a operação cerca de 3x mais rápido e utilizando 10x menos computadores do que o Hadoop MapReduce, instaurando também um novo recorde mundial para a operação. Também é possível ver o benchmark dessa operação de ordenação aplicado a um conjunto de dados de 1 PB. 22 Figura 6 – Daytona GraySort Benchmarks. Fonte: Databricks. Simplicidade Todas as operações realizadas no Spark são realizadas sob uma abstração de dados chamada Resilient Distributed Dataset (RDD), que representa uma coleção imutável e particionada de registros. No entanto, esse é o nível mais baixo de abstração dos dados, e nas APIs mais recentes. No lugar de operar diretamente nos RDDs, as operações são realizadas em DataFrames e/ou Datasets, que são construções já conhecidas no contexto de análise e processamento de dados, em que estes são estruturados em linhas e colunas. Além disso, as operações implementadas seguem uma lógica de programação simples (herdada do SQL, como veremos mais à frente), o que https://databricks.com/blog/2014/11/05/spark-officially-sets-a-new-record-in-large-scale-sorting.html 23 proporciona facilidade de aprendizado tanto para profissionais veteranos, quanto para iniciantes na área. Modularidade Como já foi dito, o framework é dividido em componentes distintos, mas que atuam sobre um mecanismo unificado disponível para diferentes linguagens de programação: Figura 7 – Ecossistema do Spark. Fonte: Databricks. Apesar dessa diversidade de linguagens, o modelo de programação é o mesmo e o usuário deve encontrar pequenas diferenças entre uma linguagem e outra. Essa padronização faz parte do objetivo do Spark de ser um mecanismo completo, mas ainda assim unificado, de processamento de dados massivos. Os módulos do framework são divididos conforme seu objetivo principal: https://databricks.com/spark/about 24 • Spark SQL: módulo direcionado para o processamento de dados estruturados de forma tabular. Oferece um mecanismo de consultas SQL interativas num ambiente distribuído. Pode ser integrado com o restante do ecossistema para construir pipelines completas. • Structured Streaming: módulo de streaming, que permite a criação de aplicações para o processamento em tempo real que suportam fluxos constantes de grandes quantidades de dados. • Spark ML: módulo de machine learning, que dispõe de uma variedade de algoritmos implementados e de ferramentas para pré-processamento, validação e construção de pipelines. Com esse módulo, é possível desenvolver modelos escaláveis e robustos. • GraphX: módulo para computação de grafos, que facilita a criação, transformação e análise de grafos em escala. Assim, é possível utilizar o Spark para toda a pipeline de dados, sem necessidade de usar bibliotecas distintas para realizar cargas de trabalho diferentes. Extensibilidade O Spark é um framework de computação de dados distribuídos e, diferente do Apache Hadoop, não se preocupa em fornecer também um dispositivo de armazenamento, o que permite ao Spark se conectar com muitas fontes de dados, como Apache Hadoop, Apache Cassandra, Apache HBase, MongoDB, Apache Hive, RDBMSs e muitos outros. Além disso, o Spark consegue conectar-se com serviços dearmazenamento de dados na nuvem, como Amazon S3 e Azure Storage. 25 Figura 8 – Conectores do Spark. Fonte: Learning Spark (Cap. 1, p. 6). Conceitos básicos de uma aplicação do Spark A arquitetura distribuída do Spark é dividida em dois componentes principais: o driver e os executores. Ao submeter uma aplicação, o driver irá executar a função principal e ficará responsável por manter informações sobre a aplicação, responder à inputs do programa ou usuário e analisar, distribuir e agendar tarefas nos executores. No contexto de computação em clusters, o driver também se comunica com o gerenciador de clusters e requisita recursos físicos para os nós (workers). Assim, esse componente é essencial 26 para a aplicação, uma vez que armazena todas as informações relevantes e dá controle ao usuário. Os executores são responsáveis por de fato realizar as tarefas designadas a eles pelo driver. Assim, eles têm apenas a função de executar código e reportar sobre o status da computação de volta para o driver. Apesar de sua arquitetura ser própria para deploy em clusters, o Spark pode ser executado localmente em uma só máquina e, nesse caso, a paralelização e distribuição das tarefas ocorre em termos das threads do processador do computador. Abaixo podemos ver a arquitetura de uma aplicação do Spark: Figura 9 – Aplicação do Spark. Fonte: Learning Spark (Cap. 1, p. 10) Na imagem acima, podemos observar um componente crucial da aplicação: a SparkSession. Esse objeto permite que o usuário tenha acesso unificado a todas as funcionalidades do Spark, como a leitura e criação de DataFrames, realização de queries 27 do SQL, configurações da aplicação e acesso ao catálogo de metadados. Para mais, diferentemente do que acontecia em versões anteriores, uma mesma SparkSession pode ser utilizada para trabalhar como todos os módulos, não havendo a necessidade de instanciar objetos diferentes para trabalhar com o Spark SQL e o ML, por exemplo. Figura 10 – Criando uma SparkSession. 28 É interessante observar como o modelo de programação é coerente entre as duas linguagens, o que torna fácil a transição de uma linguagem de programação para a outra e contribui para o tema de unificação e simplicidade do framework. O Spark é escrito primariamente em Scala e, por isso, essa é considerada a linguagem “padrão” do framework, mesmo assim quase todas as funcionalidades também estão disponíveis para as demais linguagens. Além disso, o código do Spark é executado por Java Virtual Machines (JVMs) e, no caso do Python e R, ainda que o código deva ser traduzido antes da execução, a performance permanece relativamente a mesma. Uma vez submetida a aplicação, as operações realizadas sobre os dados são quebradas em várias pequenas tarefas a serem realizadas. O Spark aplica esse princípio por meio da divisão da aplicação em um ou mais jobs, e da posterior transformação de cada um deles em um DAG, que pode ser visto essencialmente como o plano de execução do Spark. Cada um dos DAGs gerados é formado por conjuntos de tarefas chamadas stages, que são divididas baseado na capacidade de serem executadas de forma serial ou paralela. Normalmente, um stage é definido pela necessidade de troca de dados entre os executores. Por fim, as tarefas que compõe os stages são chamadas de tasks, e constituem a menor unidade de processamento da arquitetura do Spark. Figura 11 - Divisão das Tarefas da Aplicação Fonte: Learning Spark (Cap. 2, p. 28) 29 Transformações, ações e Lazy Evaluation As operações que podem ser aplicadas sobre os dados no Spark podem ser classificadas em dois tipos: transformações e ações. Porém, para entender a diferença entre elas é preciso antes compreender o conceito de lazy evaluation. Lazy evaluation, de forma geral, é o princípio de que um conjunto de operações não precisa ser executado até que seja estritamente necessário, isto é, o histórico de operações é armazenado para que seja executado somente quando o usuário precisar visualizar ou retornar o resultado das computações. No Spark, todas as transformações são executadas de forma lazy, enquanto as ações são as operações que servem de gatilho para a realização efetiva do processamento. Isso permite que: • Sejam realizadas otimizações no plano de execução da consulta, como a ordem em que as operações serão executadas e a divisão da tarefa em um ou mais stages. • Evitar o uso desnecessário de memória, uma vez que não há necessidade de carregar todos os dados na memória para manipulá-los. Assim, as transformações são operações avaliadas de forma lazy, que transformam um DataFrame do Spark em um novo DataFrame, preservando os dados originais e garantindo a propriedade de imutabilidade dos dados. Essa propriedade, combinada com o histórico de operações inerente ao lazy evaluation, permite que o Spark alcance um alto nível de tolerância às falhas, pois basta recorrer ao histórico de transformações para chegar ao estado original no caso de algum problema. Alguns exemplos de transformações: • select() • filter() 30 • withColumn() As ações são operações que ativam o histórico de transformações e retornam o DataFrame resultante. Elas são caracterizadas pela necessidade dos dados serem exibidos de alguma forma, ou de que o resultado explícito de algum cálculo seja retornado. Alguns exemplos de ações são: • count() • show() • toPandas() • collect() • approxQuantile() • save() (em conjunto com .write, para escrever dados) Por ser uma ferramenta de processamento distribuído, frequentemente os executores têm acesso a todos os dados ao mesmo tempo. Assim, é importante entender também o conceito de transformações narrow e wide. As transformações narrow são aquelas em que não é necessário que os dados sejam movimentados entre os executores, no movimento chamado de shuffle. Logo, uma operação narrow é aquela que não necessita de dados de outras partições para ser realizada, basta somente o que está à disposição do executor. Já as transformações wide são aquelas em que é necessário haver um shuffle de dados para que os cálculos sejam feitos. Um exemplo bastante comum é a realização de joins, em que os executores precisam trocar informações sobre as chaves presentes em suas partições. 31 Figura 12 – Transformações Narrow e Wide. Fonte: Learning Spark (Cap. 2, p. 31). O shuffle de dados é um dos principais gargalos de performance em aplicações do Spark, e, portanto, um tópico bastante importante na discussão de métodos de otimização de processamento. Catalyst Optimizer e o Plano de Execução Falamos muito sobre a capacidade do Spark de otimizar consultas e operações em bancos de dados. Nesta seção, será apresentado o dispositivo por trás disso. O Catalyst Optimizer é o mecanismo do Spark que transforma uma query escrita pelo SQL ou por meio das APIs de DataFrames em um plano de execução. Ele consiste de quatro fases: 1. Análise: nessa fase, o otimizador utiliza metadados – nome de colunas, tipos de dados, funções etc. – dos dados sendo utilizados para interpretar, validar e complementar o código enviado pelo usuário, uma vez que o código pode estar correto sintaticamente, mas as colunas referenciadas nas operações podem não existir (o código nesse estado é chamado de Unresolved Logical Plan). Se o código passar pela análise, ela vira input da fase seguinte. 32 2. Planejamento Lógico: nessa fase, o Catalyst recebe a query do usuário e identifica formas de otimizar o processo, principalmente movendo a ordem das operações, ainda abstraindo as transformações a serem aplicadas. O plano otimizado é então input do planejamento físico. 3. Planejamento Físico: essa é a etapa em que o plano lógico escolhido étransformado em diversas opções de planos físicos, que dizem respeito a como o plano lógico será realmente executado. Então, essas opções são comparadas utilizando um modelo de custo para escolher aquela mais eficiente. 4. Geração de Código: por fim, com a fase de planejamento terminada, o plano final é transformado em operações de RDD, no nível mais baixo de abstração do Spark. O código é então executado pelas JVMs dos executores. Figura 13 – Catalyst Optimizer. Fonte: Learning Spark (Cap. 3, p. 78) 33 Durante o desenvolvimento, é possível verificar o plano de execução do Spark para as transformações realizadas em um DataFrame, usando o método .explain(). Observe o exemplo abaixo utilizando o banco de dados do IMDB, em que são selecionados os 15 títulos (filmes, séries, documentários etc.) com a melhor avaliação no site: Figura 14 – Acessando o Plano de Execução. Ao executar a última linha é imprimido no console o plano físico de execução: https://www.imdb.com/interfaces/ 34 Figura 15 – Plano Físico. 35 Uma análise cuidadosa do plano mostra quais operações serão realizadas durante a execução, permitindo que o usuário identifique possíveis gargalos de processamento e possíveis melhorias. Essa é uma parte importante da otimização do Spark, e será abordado mais à frente. 36 Também é possível analisar o plano lógico de execução trocando a chamada ao método para explain(True), mas via de regra o plano físico tem mais aplicações práticas do que o plano lógico. Download e instalação do Apache Spark A instalação do Apache Spark é feita seguindo alguns passos simples, que são comuns às instalações em máquinas Linux e Windows. No entanto, a instalação no Windows requer alguns passos a mais, como será demonstrado abaixo. Obs.: esse curso irá abordar principalmente o desenvolvimento com pyspark e, por isso, é necessário que o Python já esteja instalado na máquina antes de começar a instalação do Spark. Caso não tenha o Python instalado, uma das distribuições mais comuns é a da Anaconda. Passo 1: Instalação do Java 8 JDK Verifique se a máquina tem o Java Development Kit (JDK) 8 ou maior instalado. Isso pode ser feito abrindo um console e digitando o seguinte na linha de comando: Figura 16 – Verificando a instalação do Java. Caso o Java não esteja instalado ou a versão instalada seja anterior a 1.8, será necessário acessar o site oficial do Java e realizar o download e instalação. Por enquanto, o Spark só é compatível com as versões 8 e 11. https://www.anaconda.com/ https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html 37 Uma vez verificada essa dependência, é necessário garantir que as variáveis de ambiente estejam configuradas apropriadamente. Considerando a versão 8, as variáveis de ambiente do Java são: • JAVA_HOME = C:\Program FIles\Java \jdk1.8.0_201 • Adicionar à variável PATH = C:\Program Files\Java \jdk1.8.0_201\bin Feito isso, pode-se avançar ao próximo passo. Passo 2: Download do Apache Spark Acesse a página de download oficial do Spark e selecione qual a versão do Spark e do Hadoop que se deseja fazer a instalação: Figura 17 – Download do Apache Spark. Fonte: Apache Spark. Uma vez selecionadas as versões, basta clicar no link do tópico “Download Spark” para realizar o download. Feito isso, extraia os arquivos comprimidos na pasta de sua preferência. Obs.: é recomendado que o Spark esteja em um diretório em que o nome das pastas não possuam caracteres especiais e/ou espaços para evitar conflitos de referenciamento. Uma alternativa comum é salvar no disco local (Ex: C:/spark). https://spark.apache.org/downloads.html https://spark.apache.org/downloads.html 38 Semelhantemente à instalação do Java, é necessário criar algumas variáveis de ambiente após a extração dos arquivos do Spark. As variáveis são: • SPARK_HOME = C:/spark/spark-3.1.2-bin-hadoop2.7 • HADOOP_HOME = C:/spark/spark-3.1.2-bin-hadoop2.7 • Adicionar à variável PATH = C:/spark/spark-3.1.2-bin-hadoop2.7/bin Passo 3: Download dos binários do Windows (somente para Windows) Na instalação no Windows, é necessário realizar o download de arquivos binários extras do Hadoop, disponíveis neste repositório, mantido por um dos desenvolvedores do Hadoop. Basta acessar a pasta referente à versão do Hadoop contida na instalação do Spark e fazer o download dos arquivos winutils.exe e hadoop.dll. Figura 18 – Download do winutils.exe. Fonte: github.com/cdarlint/winutils. https://github.com/steveloughran/winutils https://github.com/cdarlint/winutils 39 Feito isso, salve os arquivos baixados no diretório SPARK_HOME/bin, que, considerando o exemplo anterior, seria C:/spark/spark-3.1.2-bin-hadoop2.7/bin. Por fim, existe um truque que não é obrigatório, mas que pode ser útil para resolver problemas de alguns usuários. As etapas são as seguintes: 1. Crie a pasta C:\tmp\hive. 2. Abra o cmd como administrador e execute os seguintes comandos: 1. winutils.exe chmod -R 777 C:\tmp\hive 2. winutils.exe ls -F C:\tmp\hive O output deve ser algo parecido com isso: drwxrwxrwx|1|LAPTOP-... Passo 4: Testar a instalação Abra um aplicativo de linha de comando e execute o seguinte: Figura 19 – Abrindo o Spark Shell. 40 Observe acima que o shell padrão do Spark é em Scala. Para abrir o shell interativo em Python, é necessário que o aplicativo suporte a execução do Python, mas, uma vez que isso esteja garantido, basta executar: Figura 20 – Abrindo o PySpark Shell. É possível visualizar que ambos shells estão funcionando com a mesma versão do Spark. Para finalizar os testes, o output do seguinte comando deve ser: Figura 21 – Testando o Spark. As shells do Spark estão disponíveis como um ambiente de testes interativo para os usuários desenvolverem protótipos e aprenderem as APIs do programa. Quando 41 abertas dessa forma, as shells executam o Spark em modo local e uma SparkSession é gerada automaticamente para o usuário. Bônus: Executando o Spark em Jupyter Notebooks Apesar de as shells do Spark fornecerem um bom ambiente de desenvolvimento interativo e aprendizado, os Jupyter Notebooks são ferramentas mais completas e úteis para esse mesmo propósito. Assim, para executar o Spark em um notebook, é necessário instalar o pacote findspark por meio do seguinte comando: conda install -c conda-forge findspark Uma vez instalada a biblioteca, abra um Jupyter Notebook e execute o seguinte: Figura 22 – Testando o PySpark no Jupyter Notebook. : O output deve ser o mesmo da shell do pyspark. 42 Com isso, está finalizada a introdução ao Spark, seus principais conceitos, forma de funcionamento, instalação e configuração inicial. Nos próximos capítulos serão apresentadas as principais operações de manipulação de dados com a ferramenta. 43 Capítulo 3. Manipulando dados com Spark: Parte 1 Neste capítulo será abordado o uso prático da API de DataFrames, a principal forma de manipulação de dados com o Spark. Serão apresentadas as operações mais básicas e comuns em um processo de ETL e seu funcionamento no contexto do Spark, de forma que, ao final do capítulo, o leitor seja capaz de relacionar e identificar processos análogos em outras ferramentas, como o SQL e a bibliotecas pandas do Python. DataFrames e Datasets Antes de apresentar de fato as operações, é preciso entender um pouco da API que será utilizada. O Spark implementa duas construções ligeiramente diferentes para a manipulação de dados estruturados: DataFrame e Datasets. Segundo Zaharia e Chambers (2018), ambos consistem em coleções de dados distribuídos em forma de tabelas, com linhas e colunas bem definidas, de forma similar a tabelas embancos de dados relacionais. Assim, cada coluna deve ter o mesmo número de linhas que todas as outras colunas, e seus valores devem pertencer a um mesmo tipo, apesar de ser possível utilizar o valor nulo para indicar ausência de valores. Os DataFrames e Datasets são abstrações feitas sobre os RDDs, o que faz com que eles herdem todas as características vistas anteriormente, como a imutabilidade dos dados e a lazy evaluation. Por isso, essas estruturas podem ser vistas como uma lista de operações, especificadas na forma das transformações, a serem realizadas sobre os dados representados por linhas e colunas. A diferença entre DataFrames e Datasets está principalmente na forma como são construídos. Os Datasets são fortemente tipados, isto é, os tipos das colunas são verificados em tempo de compilação, enquanto, para os DataFrames, isso ocorre em tempo de execução. Os Datasets estão disponíveis somente para as linguagens Java e 44 Scala (uma vez que R e Pyhton são linguagens com tipagem dinâmica), mas os DataFrames também estão disponíveis em Scala. Dessa forma, os Datasets devem ser criados utilizando uma case class (Scala) ou JavaBeans (Java). Abaixo um exemplo da criação de um Dataset em Scala: Figura 23 – Criando um Dataset em Scala. Fonte: Exemplo retirado do Learning Spark (Cap.6, p. 158.) Mesmo com as diferenças em sua construção, a manipulação de dados com DataFrames e Datasets é a mesma. No geral, deve-se usar Datasets quando é necessário garantir que os tipos serão checados em tempo de compilação, e nas demais situações fica a critério do usuário escolher qual construção usar. No entanto, vale a pena ressaltar que os DataFrames têm uma pequena vantagem em termos de otimização de execução. Tipos de dados no Spark Por ter implementações em diferentes linguagens, o Spark interage com os tipos de dados da linguagem em que está sendo utilizado, mas a ferramenta também 45 implementa tipos internos de dados que se relacionam com esses tipos nativos. O Spark dispõe de duas categorias de tipos: básicos e complexos – e há uma gama de funções disponíveis para operar sob cada um deles. Tipicamente, os tipos complexos se diferem dos tipos básicos por aceitarem estruturas aninhadas e de datas. Os tipos básicos são: Figura 24 – Tipos básicos em Python. Fonte: Learning Spark (Cap. 3, p. 49). E os tipos complexos: Figura 25 – Tipos Complexos em Python. Fonte: Learning Spark (Cap. 3, p. 50) 46 Todas as APIs possuem os mesmos tipos internos, ainda que eles possam ser mapeados para diferentes tipos nativos, dependendo da linguagem. Os tipos podem ser acessados da seguinte forma: Figura 26 – Acessando os Tipos. A principal função dos tipos acessados dessa forma é auxiliar na definição de um schema utilizado na criação de DataFrames. Eles também podem ser utilizados para mudar uma coluna de um tipo para o outro ao utilizar o método cast(), mas nesse caso é mais vantajoso especificar os tipos como strings, para evitar um código muito verboso. Exemplo: 47 Figura 27 – Mudando o tipo da coluna. Todos os tipos podem ser especificados com strings, mas, para os tipos complexos, muitas vezes é melhor utilizar a API disponível. Schema e criação de DataFrames Um schema no Spark é uma especificação de tipos das colunas de um DataFrame. Eles são usados na leitura de dados externos e criação de DataFrames e podem ser passados diretamente ao Spark ou podem ser inferidos. Passar um schema na leitura traz benefícios interessantes, como: • Evita que o Spark faça inferência de tipos, o que é custoso e demorado dependendo do tamanho do arquivo, além de propenso a erros. • Permite que o usuário identifique erros nos dados logo na leitura, caso os dados não sigam o schema especificado. Existem duas formas de criar um schema: por meio da API de tipos apresentada anteriormente ou por meio de uma string DDL (Data Definition Language), que fica muito mais simples e fácil de ler (Damji et al, 2020). Usando a API de tipos: 48 Figura 28 – Definindo o Schema Programaticamente. Usando o string DDL no mesmo exemplo anterior, é possível ver que o código fica muito mais limpo e fácil de ler: schema = “nome STRING, id INT” É possível verificar o schema de um DataFrame a qualquer momento utilizando o atributo schema ou o método printSchema(), sendo que o segundo é preferível por exibir as informações de uma forma mais legível e formatada. Como dito anteriormente, os schemas são utilizados na definição de DataFrames, seja por leitura externa ou na criação a partir de dados do ambiente. É possível criar DataFrames utilizando a SparkSession e o método .createDataFrame(). Para criar um DataFrame, são necessários os dados e um schema. O schema pode ser passado das duas formas apresentadas ou então como uma lista com os nomes das colunas e, nesse caso, o Spark fará a inferência dos tipos das colunas. De forma geral, é melhor evitar que o Spark faça inferência dos tipos, pois é um processo suscetível a erros e que pode ser bastante demorado. Já os dados são passados como uma lista de 49 objetos iteráveis, normalmente tuplas ou listas, em que cada iterável representa uma linha e cada elemento dentro dele representa uma coluna. Dessa forma, é possível enxergar que essa lista é uma coletânea de indivíduos, e que cada indivíduo é uma coletânea de variáveis. Os dados também podem ser passados como um RDD ou pandas.DataFrame. Figura 29 – Criando um DataFrame. Existe ainda uma outra forma de gerar DataFrames no Spark, mas sem muitas opções de customização. O método SparkSession.range() pode ser utilizado para gerar um DataFrame com uma única coluna chamada id do tipo LongType, que consiste basicamente de uma sequência de números. Os intervalos dessa sequência podem ser definidos e são argumentos do método. 50 Leitura e escrita de dados Nesta seção serão apresentados os dispositivos de leitura e escrita de dados nativos do Apache Spark, assim como exemplos com dados de diferentes formatos e as principais configurações utilizadas. Para realizar essas operações, é necessário acessar os objetos DataFrameReader e DataFrameWriter, que estão disponíveis na SparkSession como uma forma de acessar as funcionalidades de leitura e escrita, respectivamente. DataFrameReader Existem duas maneiras de especificar a operação de leitura: Figura 30 – Formas de leitura. Observe que o DataFrameReader é acessado pelo atributo SparkSession.read. As duas formas realizam a mesma tarefa de ler dados no formato parquet, mas a forma como a primeira está escrita não permite parametrizar a leitura em termos da fonte de dados utilizada. Isso significa que em uma aplicação de processamento, caso o formato dos arquivos sendo lidos mudassem, seria necessário mudar diretamente o código, enquanto se a segunda forma fosse utilizada, bastaria mudar um dos argumentos da função. Assim, essa é a forma preferível e mais utilizada. 51 Além de especificar o formato dos dados sendo lidos, é possível passar o schema, o caminho dos dados e as configurações extras de leitura, que são passadas pelo método .option() – conforme discutido anteriormente. Os argumentos desse método devem ser um único par de chave/valor, que denotam uma única opção de leitura sendo configurada, e ambos os argumentos devem estar no formato de strings. Esse método é bastante utilizado ao trabalhar com dados originados de arquivos JSON ou CSV, uma vez que existem diversas formas de configurar a leitura de arquivos nesses formatos. Também é possível usar .options(), uma alternativa que permite especificar todas as configurações empregadas em uma mesma chamada: Figura 31 – Usando o método options.52 Obs.: usar o dicionário como no exemplo acima só funciona em Python, pois é uma característica nativa da linguagem. DataFrameWriter Para a escrita, existem também alguns padrões de uso: Figura 32 – Formas de escrita. De forma semelhante à leitura, as duas primeiras formas da Figura 32 diferem na possibilidade de parametrizar o formato no qual os dados são salvos. A última forma indica a escrita de uma tabela, assunto que será abordado mais à frente. A principal configuração da escrita é o mode, argumento que indica qual o comportamento do Spark caso ele encontre dados já existentes no diretório indicado como destino dos dados. As opções são as seguintes: • append: anexa o conteúdo do DataFrame aos dados existentes. • overwrite: sobrescreve dados existentes. • ignore: ignora silenciosamente essa operação se os dados já existirem. • error ou errorifexists (default): retorna erro se os dados já existirem. Também estão disponíveis opções de escrita para particionar ainda mais os dados, tema que será abordado no Capítulo 6. 53 Uma característica importante do dispositivo de escrita do Spark é que o número de arquivos ao final de uma operação de salvamento de dados está diretamente relacionado ao número de partições do DataFrame. Isso significa que se um DataFrame está dividido em 200 partições – que como já visto, são unidades de processamento – os dados serão salvos em 200 arquivos, cada um com uma parte desses dados. Uma consequência dessa característica é que o caminho de destino dos dados não precisa ser especificado com a extensão característica do formato em que se deseja salvar, mas ao invés disso representa o diretório em que os arquivos serão salvos. Por exemplo, ao executar o seguinte código: Figura 33 – Salvando arquivos particionados. Os dados finais serão salvos da seguinte forma: 54 Figura 34 – Exemplo de diretório após escrita dos dados. Cada um desses arquivos está salvo no formato parquet numa pasta chamada “df_notas”, criada no diretório de trabalho em que o código foi executado. Cada uma das partições representa um “pedaço” dos dados originais. Agora serão apresentadas as fontes de dados nativas do Spark, suas principais configurações e exemplos de leitura e escrita. Parquet O parquet é a fonte de dados padrão do Spark, e é altamente utilizado no contexto de Big Data por ser um formato muito eficiente e versátil. Algumas das vantagens do parquet são: • Armazenamento colunar, em contraste com o CSV, que armazena baseado nas linhas. Assim, quando uma query é realizada é possível ignorar os dados não relevantes de maneira rápida e fácil, resultando em operações bem mais eficientes. 55 • Preservação de metadados, incluindo os tipos das colunas, o que garante eficiência e praticidade na escrita e leitura (não é necessário especificar schemas para arquivos parquet). • Suporte a dados estruturados de forma aninhada, como listas. • Otimizado para processar dados particionados com volume na casa dos gigabytes para cada arquivo. • Compressão de dados na escrita, de forma a ocupar menos espaço. • Integração com ferramentas como AWS Athena, Amazon Redshift Spectrum, Google BigQuery e Google Dataproc. Figura 35 – Comparação Parquet e CSV. Fonte: Databricks. https://databricks.com/glossary/what-is-parquet 56 A principal opção de leitura e escrita de arquivos parquet é o modo de compressão, denotado pelo argumento compression. Como dito anteriormente, o fato desse formato de arquivo ser salvo de forma comprimida proporciona uma grande economia de espaço em disco, e escolher a forma mais adequada de comprimir os dados pode ajudar a melhorar a eficiência das operações de input/output. O default é a compressão utilizando snappy, mas há algumas outras opções disponíveis. Em linhas gerais, é recomendado utilizar arquivos parquet em cargas de trabalhos processadas com o Apache Spark sempre que possível. JSON O formato JSON é também bastante popular e se faz presente em diversos contextos e aplicações, pois é o resultado de uma consulta à uma API. Entre suas vantagens, estão o fato de suas operações de input/output serem leves e eficientes e sua versatilidade, já que mesmo tendo se originado no JavaScript, é independente da linguagem utilizada. Para ler arquivos JSON basta utilizar “json” no método format(): Figura 36 – Lendo arquivos JSON. 57 Escrever arquivos JSON também é simples. De forma semelhante à escrita, basta especificar corretamente os argumentos e métodos e o caminho do diretório de destino: Figura 37 – Escrevendo arquivos JSON. As principais configurações disponíveis no DataFrameReader e DataFrameWriter para arquivos JSON são descritas na tabela abaixo: Figura 38 – Opções para arquivos JSON. Fonte: Learning Spark (Cap. 4, p. 101). 58 CSV Arquivos CSV são uma das formas mais comuns de se compartilhar e administrar dados. Nesses arquivos, os dados são organizados de forma tabular e o valor de cada uma das colunas é separado por um delimitador, usualmente uma vírgula. A maior parte dos softwares de manipulação de planilhas e geração de relatórios tem a capacidade de disponibilizar os dados como CSV's e, por isso, o formato se tornou bastante popular entre analistas de dados e de negócios. Para ler arquivos CSV basta utilizar “csv” no método format(): Figura 39 – Lendo arquivos CSV. 59 Escrever arquivos CSV também é simples. De forma semelhante à escrita, basta especificar corretamente os argumentos, métodos e o caminho do diretório de destino: Figura 40 – Escrevendo arquivos CSV. As principais configurações disponíveis no DataFrameReader e DataFrameWriter para arquivos CSV são descritas na tabela abaixo: Figura 41 – Opções para arquivos CSV. Fonte: Learning Spark (Cap. 4, p. 104) 60 Vale destacar ainda a opção encoding, que permite mudar a representação de strings internamente. Essa opção muitas vezes corrige erros em que caracteres de colunas de texto vêm faltando ou são lidos com erro. Para saber mais sobre os diferentes tipos de encoding, acesse esse link. Obs.: ao trabalhar com arquivos CSV, é bastante comum haver situações em que o output de um processamento deve ser um arquivo não particionado, para servir de consumo de usuários em ferramentas como o Microsoft Excel. Nesses casos, existem duas alternativas: 1. Transformar o Spark DataFrame em um pandas DataFrame: Nessa situação, é utilizado o dispositivo do pandas para salvar os dados em somente um arquivo, com o custo de ter que colocar todo o DataFrame na memória para realizar essa operação. Por isso, deve ser usado somente quando o output do processamento tiver um volume pequeno de dados, como um relatório gerencial ou dados agregados. Essa alternativa só está disponível na linguagem Python. 2. Reparticionar o Spark DataFrame https://en.wikipedia.org/wiki/Character_encoding 61 No caso acima, o Spark internamente reduz o DataFrame a somente uma partição, de forma que os dados estejam concentrados em uma única unidade de processamento. Assim, quando os dados forem salvos, eles serão escritos em somente um arquivo. Essa alternativa está disponível independente da linguagem, mas requer um ponto de atenção: como dito anteriormente, não é possível escolher o nome do arquivo final, somente o nome da pasta em que o Spark salvará esse arquivo, e por isso não é necessário colocar a extensão “.csv” ao final do caminho de destino. ORC No formato ORC, semelhante ao parquet, os dados são armazenados de forma colunar objetivando alcançar maior eficiência. Desenvolvido para cargas de trabalho Hadoop, os arquivos ORC também podem ser lidos no Spark a partir da versão 2.0. A principal diferençaentre o uso de arquivos ORC e arquivos parquet é que o Spark implementa otimizações específicas para o uso do segundo, o que o torna preferível. Para ler arquivos ORC basta utilizar “orc” no método format(): 62 Figura 42 – Lendo arquivos ORC. Escrever arquivos ORC também é simples. De forma semelhante à escrita, basta especificar corretamente os argumentos e métodos e o caminho do diretório de destino: Figura 43 – Escrevendo arquivos ORC. Diferente dos outros formatos apresentados, o Spark não implementa nenhuma opção extra de leitura ou escrita para arquivos ORC. Operações básicas com DataFrames O primeiro passo para realizar operações no Spark, é entender como funcionam as unidades básicas de manipulação dos dados: as colunas. Todas as transformações 63 definidas agem sobre as colunas para alterar o DataFrame de alguma forma, de modo que todo o processo de manipulação tem de ser desenvolvido com a ideia de que as operações são colunares. Programadores sem muita experiência com bancos de dados relacionais ou a linguagem SQL muitas vezes encontram dificuldades, mas pensar dessa forma é crucial para que seja desenvolvido um processo consistente e eficiente. Colunas e expressões As colunas são uma coleção de registros de um mesmo tipo, identificados por um nome. Um conjunto de colunas compõem linhas, que por sua vez compõem um DataFrame. É possível entender as colunas como um vetor do R ou uma Series do pandas, no sentido de que as operações definidas sobre as colunas são vetoriais: as operações são aplicadas sobre cada um dos elementos que formam a coluna, um por vez. Por exemplo: Figura 44 – Operações nas colunas. 64 Observe que ao adicionar o valor 5 à coluna “nota”, o valor foi adicionado sobre cada um dos elementos da coluna. Esse é o comportamento padrão da maior parte das funções e operações que agem sobre colunas no Spark. A função col, que foi utilizada para fazer referência à coluna, faz parte do módulo pyspark.sql.functions (Python) e org.apache.spark.sql.functions (Scala), que concentra todas as funções utilizadas para transformar colunas. As colunas podem ser referenciadas das seguintes formas: • col(“column”) / column(“column”) • df[“colum”] • df.column • df.col(“column”) Todas as formas apresentadas são equivalentes, mas é recomendado utilizar o primeiro ou segundo método, pois os dois últimos não permitem fazer a referência de colunas criadas a partir do encadeamento de operações. A diferença entre usar col(“column”) e df.col(“column”) é que o segundo evita que o Spark tenha de validar os nomes das colunas durante a fase de análise do Catalyst Optimizer, o que gera pequenos ganhos em termos de performance. Usando Scala, devido às características nativas da linguagem, também é possível fazer referência a colunas de mais duas formas, alcançando o mesmo resultado dos anteriores: • $”column” • ‘column 65 O último aspecto das colunas que é importante entender, antes de apresentar as operações de manipulação de DataFrames, é o fato de que as colunas, nesse caso os objetos de manipulação, nada mais são do que expressões. As expressões são transformações sobre as colunas físicas do DataFrame, que tem como input os nomes das colunas a serem transformadas e algum tipo de operação a ser realizada sobre elas, seja uma operação matemática ou alguma função implementada. No caso mais simples, uma expressão referencia uma coluna da mesma forma que a função col(). Para acessar as expressões, deve-se utilizar a função expr(), presente no mesmo módulo de funções. Alguns exemplos: Figura 45 – Colunas e Expressões. Todas as funções presentes no módulo de funções podem ser usadas operando sobre colunas ou dentro de expressões. Vale ressaltar também que, para as operações realmente surtirem efeito, elas devem estar definidas dentro de métodos do DataFrame, que possibilitam criar colunas, como vai ser mostrado adiante. Seleção de Colunas Na linguagem SQL, a cláusula SELECT é a forma mais básica de acessar o conteúdo de uma tabela, por meio da qual é possível selecionar e/ou criar colunas. De 66 maneira semelhante, essa operação pode ser realizada no Apache Spark por meio do método select(): Figura 46 – Seleção básica. É possível utilizar as colunas e expressões vistas anteriormente para criar novas colunas ou renomear colunas já existentes: Figura 47 – Criando Colunas na Seleção. 67 A função alias() é um método das colunas e pode ser usado em qualquer momento em que uma coluna for retornada. Também fica claro com o exemplo acima que a ordem em que as colunas são escritas na função é a mesma ordem em que elas serão exibidas no DataFrame resultante. Em Python, as colunas que servem como argumento do select() podem ser passadas como strings, referências de colunas ou uma lista com um desses dois, mas não é possível misturar referências individuais com listas. O seguinte código, por exemplo, retornaria um erro: df.select(“col1”, [“col2”, “col3”]) Uma alternativa é utilizar o operador *, que serve para indicar ao compilador que cada um dos elementos de uma lista deve ser visto como um argumento da função. Assim, o seguinte código é válido: df.select(“col1”, *[“col2”, “col3”]) Por fim, é possível utilizar o método select() em conjunto com o método distinct(), para selecionar valores distintos de uma coluna: Figura 48 – Selecionando Valores Distintos. 68 Obs.: o método distinct() retorna um DataFrame com os valores distintos a nível de linha, ou seja, a função irá retornar linhas únicas que devem diferir em pelo menos uma coluna. Dessa forma, esse método pode ser utilizado para remover valores duplicados, o que também pode ser feito com dropDuplicates(). Renomeação de Colunas Apesar de ser possível renomear colunas usando o método select() apresentado anteriormente, essa operação dispõe de um método reservado que pode ser útil no processamento. Para renomear colunas é utilizada a função withColumnRenamed(), da seguinte forma: df.withColumnRenamed(“nome_antigo”, “nome_novo”) Uma aplicação bastante comum é quando há necessidade de acrescentar um sufixo ao nome de todas as colunas: Figura 49 – Acrescentando Sufixo aos Nomes de Colunas. 69 Filtragem de Linhas Para filtrar linhas, é necessário denotar uma expressão que ao ser avaliada retorna valores booleanos: true (verdadeiro) ou false (falso). Ela pode ser construída por meio de um string - que pode usar sintaxe SQL e funciona da mesma maneira que a função expr() - ou uma série de manipulações de colunas. Definida a expressão, ela deve ser passada para o método filter() ou where(), que são basicamente idênticos e não apresentam diferenças de performance. Exemplo: Figura 50 – Filtro Básico de Linhas. Também é possível realizar filtros com mais de uma condição, utilizando operadores lógicos. No entanto, nem sempre isso é útil, porque o Spark executa todos os filtros ao mesmo tempo independente da ordem em que são escritos, o que faz com que filtros do tipo “e” sejam desnecessários e possam ser escritos com múltiplas chamadas ao método: 70 Figura 51 – Filtro Múltiplo. Observações Quando nos referimos às colunas por meio da função col(), temos acesso a diversos métodos das colunas que podem ser utilizados para auxiliar na filtragem do DataFrame. Alguns deles são: • isin(): checa se a coluna contém os valores listados na função. • contains(): utilizado para verificar se uma coluna de texto contém algum padrão especificado (não aceita regex). Aceita uma outra coluna de texto. 71 • like(): utilizado para verificar se uma coluna de texto contém algum padrão especificado (não aceita regex). Funcionade forma similar ao "LIKE" do SQL. • rlike(): utilizado para verificar se uma coluna de texto contém algum padrão especificado (aceita regex). Funciona de forma similar ao "RLIKE" do SQL. • startswith(): utilizado para verificar se uma coluna de texto começa com algum padrão especificado (aceita regex). • endswith(): utilizado para verificar se uma coluna de texto termina com algum padrão especificado (aceita regex). • between(): checa se os valores da coluna estão dentro do intervalo especificado. Os dois lados do intervalo são inclusivos. • isNull(): retorna true se o valor da coluna é nulo; • isNotNull(): retorna true se o valor da coluna não é nulo. Outros métodos úteis: • alias()/name(): usado para renomear as colunas em operações como select() e agg(); • astype()/cast(): usado para mudar o tipo das colunas. Aceita tanto um string como um tipo especificado pelo módulo pyspark.sql.types • substr(): utilizado para cortar um string com base em índices dos caracteres. 72 Utilizando expressões, a maioria desses métodos estarão acessíveis por meio dos seus equivalentes em SQL. Ordenação do DataFrame A ordenação do DataFrame pode ser feita utilizando as funções orderBy() ou sort(). Algumas funções auxiliares importantes para serem usadas nessa operação: • asc(): ordena a coluna de forma ascendente (default). • desc(): ordena a coluna de forma decrescente. • asc_nulls_first() / desc_nulls_first(): ordena a coluna de forma ascendente e decrescente, respectivamente, mantendo os campos nulos primeiro; asc_nulls_first() é o default quando há dados faltantes. • asc_nulls_last() / desc_nulls_last(): ordena a coluna de forma ascendente e decrescente, respectivamente, mantendo os campos nulos por último. 73 Figura 52 – Ordenação do DataFrame. Criação de Colunas Assim como na renomeação de colunas, a criação de colunas pode ser feita de forma direta e desvinculada da seleção utilizando o método withColumn(): df.withColumn("nome_da_coluna", {expressão}) Então, o DataFrame dos exemplos anteriores pode ser construído com a seguinte nova sintaxe: 74 Figura 53 – Criação de Colunas. Fica claro que, até o momento, só é possível criar, alterar e manipular colunas já existentes em um DataFrame no momento de sua criação. No entanto, caso seja necessário criar uma coluna a partir de uma constante ou algum elemento presente no ambiente em que o Spark está sendo executado, é possível utilizar a função lit() para criar uma coluna que replica este valor para todas as linhas: Figura 54 – Criação de Colunas a partir de Constantes. 75 Finalmente, é possível criar campos baseados em condições lógicas, de forma similar à cláusula CASE WHEN do SQL. Usando a manipulação de colunas, é necessário usar a função abaixo dentro de um método capaz de gerar novas colunas: when({primeira condição}, {expressão se verdadeiro}) .when({segunda condição}, {expressão se verdadeiro}) (...) .otherwise({expressão se nenhuma condição verdadeira}) Qualquer expressão lógica é válida como condição, inclusive aquelas usadas para realizar filtros. Alguns exemplos: Figura 55 – Colunas Condicionais. Observe que não é necessário utilizar a função lit() para especificar os valores de retorno se verdadeiro ou falso. Porém, quando os resultados são colunas, eles devem ser especificados com as funções col() ou expr() . 76 Trabalhando com diferentes tipos de dados Uma vez que ficou claro a forma como criar e modificar colunas no Spark, é possível entender algumas das principais funcionalidades disponíveis para a manipulação de diferentes tipos de dados. A seguir serão apresentadas funções do módulo pyspark.sql.functions (Python) e org.apache.spark.sql.functions (Scala) que são comumente usadas em uma pipeline de processamento. Números Quando se trabalha com valores numéricos, as funções mais utilizadas estão principalmente relacionadas às transformações matemáticas que podem ser aplicadas sobre esses valores. Vale destacar também algumas funções úteis para comparação, como encontrar o maior ou menor valor em um conjunto. Abaixo uma lista das funções mais usadas: • monotonically_increasing_id(): retorna um id único para cada linha, começando em 0. • rand(): retorna uma amostra independente de uma distribuição uniforme entre 0 e 1. • randn(): retorna uma amostra independente de uma distribuição normal padrão (média 0 e variância 1). • round(): arredonda o valor. • ceil(): arredonda o valor para o maior inteiro mais próximo. • floor(): arredonda o valor para o menor inteiro mais próximo. 77 • sqrt(): retorna a raiz quadrada do valor. • exp(): retorna a exponencial do valor. • log(): retorna a logaritmo natural do valor. • log10(): retorna a logaritmo na base 10 do valor. • pow(): retorna o valor de uma coluna elevado a potência passada pelo usuário. • greatest(): retorna o maior valor dentre os valores das colunas. Análogo ao max(), mas opera sobre valores de uma mesma linha, ao invés de uma única coluna. • least(): retorna o menor valor dentre os valores das colunas. Análogo ao min(), mas opera sobre valores de uma mesma linha, ao invés de uma única coluna. Obs.: funções como max(), sum() e mean() naturalmente são exemplos de funções que devem ser aplicadas a dados numéricos, e obviamente são bastante utilizadas em processamentos. No entanto, elas não figuram nessa seção porque são funções de agregação, um assunto que será abordado no próximo capítulo. Além das funções apresentadas acima, também é importante considerar os operadores numéricos disponíveis na linguagem de programação sendo utilizada, que são utilizados para expressar as operações básicas de soma (+), subtração (-), multiplicação (*) e divisão (/). 78 Figura 56 – Exemplo com Números. Strings A manipulação de texto é uma das tarefas mais comuns em uma pipeline de processamento de dados, oriundos principalmente de dados cadastrais, tweets, extratos de redes sociais, documentos e dados coletados da web. Como essas fontes são caracterizadas principalmente pela digitação humana e uso da linguagem natural, é esperado que esses dados sejam coletados com bastante sujeira e erros de formatação. Diante disso, a tarefa mais importante ao lidar com strings é formatá-los de forma que eles sigam algum padrão estabelecido, e contenham somente as informações necessárias. Isto é, os caracteres devem ser transformados, e muitas vezes removidos. As funções a seguir auxiliam nesse propósito: • upper(): retorna o string em letras maiúsculas. • lower(): retorna o string em letras minúsculas. 79 • initcap(): retorna a primeira letra de cada palavra no string em letras maiúsculas. • trim(): retira os espaços em branco do início e do final do string. • ltrim() / rtrim(): retira os espaços em branco do início e do final do string, respectivamente. • lpad() / rpad(): acrescenta um caractere no início e no final do string, respectivamente, até que o string tenha um determinado comprimento. • length(): retorna o comprimento do string, em quantidade de caracteres. • split(): quebra o string a partir de um padrão e retorna um array com os string resultantes. • concat(): concatena uma ou mais colunas de string. • concat_ws(): concatena uma ou mais colunas de string, com um separador entre elas. • regexp_extract(): retorna um match no string a partir de um padrão regex. • regexp_replace(): substitui um match no string a partir de um padrão regex com outros caracteres passados para a função. • substring(): retorna os caracteres do string que estão entre os índices especificados. Análogo a col().substring(). 80 Dica: Diante de uma coluna de texto,
Compartilhar