Baixe o app para aproveitar ainda mais
Prévia do material em texto
Tecnologias para o Ecossistema de Big Data Prof. Henrique Batista da Silva Hadoop MapReduce • Hadoop MapReduce: principal componente do Apache Hadoop. Paradigma de programação que permite a execução de programas de forma distribuída em cluster. Hadoop MapReduce • MapReduce é um estilo de computação que pode ser implementado em vários sistema, como por exemplo o Hadoop. • MapReduce é usado para gerenciar computação de larga escala, pois é uma solução tolerante a falhas de hardware. • Duas funções devem ser escritas: • Map: consome os dados de entrada, os processa e emite tuplas (key, value) • Reduce: obtém tuplas geradas pelo Map e agrupa os resultado. • O sistema se encarregará da execução paralela e tolerância a falhas. Hadoop MapReduce Fonte: Rajaraman, A., & Ullman, J. D. (2011). Mining of Massive Datasets. Lecture Notes for Stanford CS345A Web Mining (Vol. 67 • Sequencialmente leia os dados • Fase Map • Extrair a informação realmente importante (por exemplo, palavra e o número de ocorrência) • Agrupar pela chave (ordenar pela chave) • Fase Reduce • Agregar, sumarizar, filtrar os dados. • Escreva o resultado. Hadoop MapReduce Fonte: Rajaraman, A., & Ullman, J. D. (2011). Mining of Massive Datasets. Lecture Notes for Stanford CS345A Web Mining (Vol. 67 The crew of the space shuttle Endeavor recently returned to Earth as ambassadors, harbingers of a new era of space exploration. Scientists at NASA are saying that the recent assembly of the Dextre bot is the first step in a long-term space-based man/mache partnership. '"The work we're doing now -- the robotics we're doing - - is what we're going to need …………………….. Big document (The, 1) (crew, 1) (of, 1) (the, 1) (space, 1) (shuttle, 1) (Endeavor, 1) (recently, 1) …. (crew, 1) (crew, 1) (space, 1) (the, 1) (the, 1) (the, 1) (shuttle, 1) (recently, 1) … (crew, 2) (space, 1) (the, 3) (shuttle, 1) (recently, 1) … MAP: Leia a entrada e produza um conjunto de pares <chave, valor> Group by key: Colete todos os pares com a mesma chave Reduce: Coletar todos os valores pertencentes às chaves e returne (key, value) Fornecido pelo programador Fornecido pelo programador (key, value)(key, value) Hadoop MapReduce Fonte: Rajaraman, A., & Ullman, J. D. (2011). Mining of Massive Datasets. Lecture Notes for Stanford CS345A Web Mining (Vol. 67 • Um exemplo de MapReduce: considere a existência de cinco arquivos, cada um com duas colunas (key, value), representando a cidade e a temperatura: Hadoop MapReduce Toronto, 20 Whitby, 25 New York, 22 Rome, 32 Toronto, 4 Rome, 33 New York, 18 Fonte: Rajaraman, A., & Ullman, J. D. (2011). Mining of Massive Datasets. Lecture Notes for Stanford CS345A Web Mining (Vol. 67 • Precisamos encontrar a temperatura máxima para cada cidade. Com MapReduce, os cinco arquivos podem ser divididos em 5 tarefas. Cada mapper executa em um dos 5 arquivos, retornando a maior temperatura de cada arquivo. Hadoop MapReduce (Toronto, 20) (Whitby, 25) (New York, 22) (Rome, 33) Fonte: Rajaraman, A., & Ullman, J. D. (2011). Mining of Massive Datasets. Lecture Notes for Stanford CS345A Web Mining (Vol. 67 • Os outros quatro mapper (dos outros quatro arquivos), produzem os seguintes resultados: Hadoop MapReduce (Toronto, 18) (Whitby, 27) (New York, 32) (Rome, 37) (Toronto, 32) (Whitby, 20) (New York, 33) (Rome, 38) (Toronto, 22) (Whitby, 19) (New York, 20) (Rome, 31) (Toronto, 31) (Whitby, 22) (New York, 19) (Rome, 30) Fonte: Rajaraman, A., & Ullman, J. D. (2011). Mining of Massive Datasets. Lecture Notes for Stanford CS345A Web Mining (Vol. 67 • Todos estes cinco resultados podem ser combinados da tarefa de reduce, que combina os resultados de cada chave de entrada. Hadoop MapReduce (Toronto, 32) (Whitby, 27) (New York, 33) (Rome, 38) Fonte: Rajaraman, A., & Ullman, J. D. (2011). Mining of Massive Datasets. Lecture Notes for Stanford CS345A Web Mining (Vol. 67 Hadoop MapReduce Client Node Client Node MapReduce application master Map Task / Reduce Task Map Task / Reduce Task Map Task / Reduce Task HDFS Fonte: Rajaraman, A., & Ullman, J. D. (2011). Mining of Massive Datasets. Lecture Notes for Stanford CS345A Web Mining (Vol. 67 Contador de palavras usando MapReduce • Nesta prática vamos implementar um aplicação para MapReduce. • Vamos implementar um contador de palavras usando MapReduce utilizando o Google Colab. MapReduce """The classic MapReduce job: count the frequency of words. """ from mrjob.job import MRJob import re WORD_RE = re.compile(r"[\w']+") class MRWordFreqCount(MRJob): def mapper(self, _, line): for word in WORD_RE.findall(line): yield (word.lower(), 1) def combiner(self, word, counts): yield (word, sum(counts)) def reducer(self, word, counts): yield (word, sum(counts)) if __name__ == '__main__': MRWordFreqCount.run() Contabilizando avaliação de filmes usando MapReduce • Nesta prática vamos implementar um aplicação para MapReduce. • Vamos carregar para dentro do HDFS um dataset de rating de filmes e contabilizar a quantidade de avaliações de 1 a 5 estrelas. MapReduce MapReduce User ID | Movie ID | Rating 30 25 3 20 36 3 30 45 1 40 84 2 40 43 1 3, 1 3, 1 1, 1 2, 1 1, 1 1 – 1, 1 2 – 1 3 – 1, 1 1, 2 2, 1 3, 2 Map Shuffle e sort Reduce • Download do arquivo de dados: MapReduce wget http://files.grouplens.org/datasets/movielens/ml-100k/u.data http://files.grouplens.org/datasets/movielens/ml-100k/u.data from mrjob.job import MRJob from mrjob.step import MRStep class RatingsBreakdown(MRJob): def steps(self): return [ MRStep(mapper=self.mapper_get_ratings, reducer=self.reducer_count_ratings) ] def mapper_get_ratings(self, _, line): (userID, movieID, rating, timestamp) = line.split('\t') yield rating, 1 def reducer_count_ratings(self, key, values): yield key, sum(values) if __name__ == '__main__': RatingsBreakdown.run() Exercício Resolvido I - MapReduce • Agora o objetivo é contabilizar quantas ocorrências de cada filme no banco de dados (popularidade de cada filme). Exercícios • Use o arquivo da prática anterior e altere o campo “rating” para “movieID” e execute a aplicação • Para melhorar o resultado, faça (veja próximo slide): Exercícios from mrjob.job import MRJob from mrjob.step import MRStep class RatingsBreakdown(MRJob): def steps(self): return [ MRStep(mapper=self.mapper_get_ratings, reducer=self.reducer_count_ratings), MRStep(reducer=self.reducer_sorted_output) ] def mapper_get_ratings(self, _, line): (userID, movieID, rating, timestamp) = line.split('\t') yield movieID, 1 def reducer_count_ratings(self, key, values): yield str(sum(values)).zfill(5), key def reducer_sorted_output(self, count, movies): for movie in movies: yield movie, count if __name__ == '__main__': RatingsBreakdown.run() Transforme resultado em uma string de 5 caracteres Apache Spark • Spark é uma plataforma de computação distribuída in-Memory • Desenvolvido para tarefas como ETL, Machine Learning e carga de dados no Hadoop (SQL) Introdução ao Apache Spark • Spark se caracteriza por ser um framework para processamento de grande dados (Big Data) com foco em velocidade e oferece API para processamento e análise de dados. Introdução ao Apache Spark • APIs sobre o Spark: Spark Core e APIs Introdução ao Apache Spark Spark Streaming: processamento de fluxos em tempo real Spark GraphX: processamento sobre grafos Spark SQL: processamento de consulta e carga de dados Spark MLlib: biblioteca para machine learning (clustering, redução de dimensionalidade, etc) • Spark é uma abstração para o uso de ferramentas de Data Science. • Ex.: Spark possibilidade aos algoritmos de machine learning o processamento de dataset em memória de forma distribuída, porém abstraindo toda complexidade para o programador. Introdução ao Apache Spark • Spark estende o modelo MapReduce (Hadoop) para processamento de grande volumes de dados. • Seudesempenho pode chegar a ser 100 vezes maior que o Hadoop (em atividades de regressão logística, por exemplo) Introdução ao Apache Spark • Spark permite programação de aplicações em três linguagens: Python, Java e Scala • Há também suporte para SQL e R Introdução ao Apache Spark • Python tem a vantagem por ser menos verboso e mais “fácil” de aprender • Scala é mais rápido do que Python e normalmente uma escolha padrão para Spark Introdução ao Apache Spark Arquitetura do Spark • Arquitetura do Spark: Introdução ao Apache Spark Fonte: https://spark.apache.org/docs/latest/cluster-overview.html Driver Program: aplicação principal que executa a “main() function” (veremos sobre o SparkContext em breve no código fonte) • Arquitetura do Spark: Introdução ao Apache Spark Cluster manager: serviço externo para gerenciar recursos em cluster. Worker node: qualquer nó que possa executar a aplicação em cluster. Executor: um processo criado por uma aplicação em um nó do cluster. (executa a aplicação mantendo os dados em memória) Fonte: https://spark.apache.org/docs/latest/cluster-overview.html • Arquitetura do Spark: Introdução ao Apache Spark Task: unidade (tarefa) que será enviada para um “Executor” Uma task inicia no (pelo) Driver Program e é enviada ao Worker Node. Fonte: https://spark.apache.org/docs/latest/cluster-overview.html • Resilient Distributed Datasets (RDD): toda aplicação Spark consiste de um “main program” que executa várias operações paralelas no cluster. • Assim, o RDD é a principal abstração fornecida pelo Spark, ou seja, é uma coleção de objetos distribuídos (estrutura de dados) nos nós do cluster e que são executados em paralelo. Introdução ao Apache Spark Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html • RDD pode conter qualquer tipo de dados do Python, Scala ou Java, bem como objetos de classes criadas pelos usuários. Introdução ao Apache Spark Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html • Exemplo em Python: Introdução ao Apache Spark conf = SparkConf().setMaster("local").setAppName("FileName") sc = SparkContext(conf = conf) input = sc.textFile("file.txt") Cria um objeto RDD a partir de um SparkContext Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html • Outra abstração importante do Spark é o compartilhamento de variáveis em operações de paralelismo. • Quando Spark executa uma função em parelelo (tarefas em diferentes nós), uma cópia de cada variável usada na função é enviada aos nós. Introdução ao Apache Spark Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html • Spark suporte múltiplos formatos de dados: Json, arquivos texto, csv e tabelas de sistema Relacional • Spark também possui integração com Hadoop sendo um substituto para as funções de MapReduce do Hadoop. Introdução ao Apache Spark Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html • batch vs real-time processing (processamento em tempo real) • Hadoop é baseado no conceito de processamento em batch (lote). Hadoop processo blocos de dados (usando MapReduce) que são armazenados ao longo do tempo (2005). Apache Spark vs Hadoop Fonte: https://www.edureka.co/blog/spark-tutorial/ • batch vs real-time processing (processamento em tempo real) • Spark pode processar dados em tempo real e poder até 100 vezes mais rápido (em algumas aplicações) para processamento em lote. Apache Spark vs Hadoop Fonte: https://www.edureka.co/blog/spark-tutorial/ • Para instalação do Spark o ambiente Hadoop não é necessário, apesar que de é possível usar o Spark com o Hadoop: • Instalações: Java, Scala e Spark Instalando Apache Spark Fonte: http://www.tutorialspoint.com/apache_spark/apache_spark_installation.htm http://www.tutorialspoint.com/apache_spark/apache_spark_installation.htm http://www.tutorialspoint.com/apache_spark/apache_spark_installation.htm Programação com RDD • Spark oferece suporte ao Python bem como Java e Scala. • Nosso exemplos serão apresentados na linguagem Python, porém os exemplos com outras linguagens pode ser acessados neste link: Introdução ao RDD em Python Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html • Ao instalar o spark, será possível executar programas em Python com o “PySpark” (não é necessária a instalação via pip). • Abra a máquina virtual fornecida para esta aula e execute “pyspark” no terminal (antes leia as instruções na pasta do usuário). Veja que será possível introduzir comandos em Python via shell. Digite “exit()” para sair. Introdução ao RDD em Python Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html • Para executar um script em Python é necessário executar o comando “spark-submit programa.py” no diretório em que o script está localizado. • O comando “spark-submit” irá carregar a bibliotecas do Spark e permitir que a aplicação seja submetida ao cluster. Introdução ao RDD em Python Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html • Dentro do script será necessário importar alguma biblioteca do Spark, como por exemplo: Introdução ao RDD em Python from pyspark import SparkContext, SparkConf Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html • A primeira coisa que um programa deve fazer é criar um objeto SparkContext. Este objeto permite ao programa Spark acessar o cluster. Inicializando Spark conf = SparkConf().setAppName(appName).setMaster(master) sc = SparkContext(conf=conf) appName: nome da sua aplicação master: é a URL do cluster Spark (“local” para executar na máquina local) Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html • Conforme vimos anteriormente, o RDD é uma coleção de objetos tolerantes a falha que podem ser executados em paralelo. • Há duas formas de criar RDDs: paralelizando uma coleção existente em seu programa (driver program) ou referenciar um dataset em um sistema de armazenamento como o HDFS, ou algum arquivo de entrada. Resilient Distributed Datasets (RDDs) Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html • Para paralelizar um coleção, basta chamar a funação “sc.paralelize()”. Resilient Distributed Datasets (RDDs) data = [1, 2, 3, 4, 5] distData = sc.parallelize(data) Os elementos da coleção “data” serão copiados para um dataset distribuídos e poderão assim ser processados em paralelo Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html • Vamos a um exemplo: faça um programa no Spark para somar os números de um conjunto de dados. • Dica: expressões lambda Resilient Distributed Datasets (RDDs) data.reduce(lambda a, b: a + b) Uma expressão lambda como essa em Python funciona como uma função que recebe duas variáveis como argumento (“a” e “b”) e retorna a soma da duas (“a + b”) Agrega as funções do RDD (veremos mais em breve) Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html Resilient Distributed Datasets (RDDs) from pyspark import SparkContext, SparkConf conf = SparkConf().setAppName("soma-data").setMaster("local[*]") sc = SparkContext(conf=conf) data = [1, 2, 3, 4, 5] distData = sc.parallelize(data) result = distData.reduce(lambda a, b: a + b) print(result) Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html • Spark pode criar um arquivo distribuído de qualquer fonte de dados suportada pelo Hadoop (HDFS, Cassandra, Hbase, arquivo texto, etc). • Arquivo de texto RDD pode ser criado com a função “sc.textFile()” Acesso a dados externos distFile = sc.textFile("data.txt") Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html • Podemos usar operações sobre coleções de dados sobre o RDD após carregar um arquivo de texto externo Acesso a dados externos distFile = sc.textFile("data.txt") distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b). Somao tamanho de todas as linha do arquivo (veremos mais sobre as operações como map e reduce) Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html • Exemplo: crie um arquivo data.txt e faça um programa que some o tamanho de todas as linhas em Spark Acesso a dados externos Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html Acesso a dados externos from pyspark import SparkContext, SparkConf conf = SparkConf().setAppName("soma-data").setMaster("local[*]") sc = SparkContext(conf=conf) distFile = sc.textFile("wordcount.txt") result = distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b) print(result) Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html Operações RDD • RDD suportam dois tipos de operações: • (1) Transformações: criam um novo conjunto de dados a partir de um conjunto existente; e (2) ações: executam alguma ação que retorna valor para o “driver program” • Exemplo (transformação): map é uma transformação que passar cada elemento do conjunto para uma função e retorna um novo RDD com o resultado. Operações RDD Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html • Exemplo map: Operações RDD distFile = sc.textFile("data.txt") distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b). Cada linha “s” do arquivo “data.txt” é passado para a função (em forma de expressão lambda) e um novo valor é retornado (o tamanho da linha). Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html • Exemplo (ação): reduce agrega todos os elementos do RDD usando alguma função e retorna o resultado final para o seu programa (driver program) Operações RDD Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html • Exemplo reduce: Operações RDD distFile = sc.textFile("data.txt") distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b). Recebe o valor de cada linha (de duas em duas de forma agregada) e retorna a soma destas duas linhas Mais sobre transformações e ações, veja: https://spark.apache.org/docs/latest/rdd-programming-guide.html from pyspark import SparkContext, SparkConf conf = SparkConf().setAppName("soma-data").setMaster("local") sc = SparkContext(conf=conf) distFile = sc.textFile("data.txt") lineLengths = distFile.map(lambda s: len(s)) result = lineLengths.reduce(lambda a, b: a + b) print(result) Acesso a dados externos (modificação) Os dados não são carregados imediatamente do arquivo, distFile é apenas um ponteiro para o arquivo. Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html from pyspark import SparkContext, SparkConf conf = SparkConf().setAppName("soma-data").setMaster("local") sc = SparkContext(conf=conf) distFile = sc.textFile("data.txt") lineLengths = distFile.map(lambda s: len(s)) result = lineLengths.reduce(lambda a, b: a + b) print(result) Acesso a dados externos (modificação) Aqui, quando os dados são transformados, a operação não é executada neste momento. Vamos entender o motivo. Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html from pyspark import SparkContext, SparkConf conf = SparkConf().setAppName("soma-data").setMaster("local") sc = SparkContext(conf=conf) distFile = sc.textFile("data.txt") lineLengths = distFile.map(lambda s: len(s)) result = lineLengths.reduce(lambda a, b: a + b) print(result) Acesso a dados externos (modificação) Spark usa o conceito de laziness. Ou seja, uma operação sobre dados é executada apenas quando seu resultado é requisitado pelo driver program. Uma DAG (Directed Acyclic Graph) é criada para manter o controle da ordem das operações que devem ser executados posteriormente. Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html DAG (Directed Acyclic Graph) Figura: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607 from pyspark import SparkContext, SparkConf conf = SparkConf().setAppName("soma-data").setMaster("local") sc = SparkContext(conf=conf) distFile = sc.textFile("data.txt") lineLengths = distFile.map(lambda s: len(s)) result = lineLengths.reduce(lambda a, b: a + b) print(result) Acesso a dados externos (modificação) Quando a ação reduce é chamada, Spark divide a operação em tarefas que executam em nós (máquinas) separadas e cada máquina executa ambas as operações (map e reduce) localmente, retornando sua resposta ao driver program. Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html from pyspark import SparkContext, SparkConf conf = SparkConf().setAppName("soma-data").setMaster("local") sc = SparkContext(conf=conf) distFile = sc.textFile("data.txt") lineLengths = distFile.map(lambda s: len(s)) result = lineLengths.reduce(lambda a, b: a + b) print(result) Acesso a dados externos (modificação) Observe que apenas neste ponto, a operação map é executada e não anteriormente (laziness). Este é um dos motivos da alta performance do Spark Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html from pyspark import SparkContext, SparkConf conf = SparkConf().setAppName("soma-data").setMaster("local") sc = SparkContext(conf=conf) distFile = sc.textFile("data.txt") lineLengths = distFile.map(lambda s: len(s)) result = lineLengths.reduce(lambda a, b: a + b) print(result) Acesso a dados externos (modificação) Se for de interesse usar lineLengths novamente, é necessário persistir os dados em memória lineLengths.persist() Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html Exercício resolvido I - Spark • Use o arquivo de rating de filmes para contar quantas avaliações de cada estrela. Exercícios Exercícios from pyspark import SparkConf, SparkContext import collections conf = SparkConf().setMaster("local").setAppName("rating") sc = SparkContext(conf = conf) lines = sc.textFile("u.data") ratings = lines.map(lambda x: x.split()[2]) result = ratings.countByValue() sortedResults = collections.OrderedDict(sorted(result.items())) for key, value in sortedResults.items(): print("%s %i" % (key, value))
Compartilhar