Buscar

03_Hadoop MapReduce e Spark

Faça como milhares de estudantes: teste grátis o Passei Direto

Esse e outros conteúdos desbloqueados

16 milhões de materiais de várias disciplinas

Impressão de materiais

Agora você pode testar o

Passei Direto grátis

Você também pode ser Premium ajudando estudantes

Faça como milhares de estudantes: teste grátis o Passei Direto

Esse e outros conteúdos desbloqueados

16 milhões de materiais de várias disciplinas

Impressão de materiais

Agora você pode testar o

Passei Direto grátis

Você também pode ser Premium ajudando estudantes

Faça como milhares de estudantes: teste grátis o Passei Direto

Esse e outros conteúdos desbloqueados

16 milhões de materiais de várias disciplinas

Impressão de materiais

Agora você pode testar o

Passei Direto grátis

Você também pode ser Premium ajudando estudantes
Você viu 3, do total de 72 páginas

Faça como milhares de estudantes: teste grátis o Passei Direto

Esse e outros conteúdos desbloqueados

16 milhões de materiais de várias disciplinas

Impressão de materiais

Agora você pode testar o

Passei Direto grátis

Você também pode ser Premium ajudando estudantes

Faça como milhares de estudantes: teste grátis o Passei Direto

Esse e outros conteúdos desbloqueados

16 milhões de materiais de várias disciplinas

Impressão de materiais

Agora você pode testar o

Passei Direto grátis

Você também pode ser Premium ajudando estudantes

Faça como milhares de estudantes: teste grátis o Passei Direto

Esse e outros conteúdos desbloqueados

16 milhões de materiais de várias disciplinas

Impressão de materiais

Agora você pode testar o

Passei Direto grátis

Você também pode ser Premium ajudando estudantes
Você viu 6, do total de 72 páginas

Faça como milhares de estudantes: teste grátis o Passei Direto

Esse e outros conteúdos desbloqueados

16 milhões de materiais de várias disciplinas

Impressão de materiais

Agora você pode testar o

Passei Direto grátis

Você também pode ser Premium ajudando estudantes

Faça como milhares de estudantes: teste grátis o Passei Direto

Esse e outros conteúdos desbloqueados

16 milhões de materiais de várias disciplinas

Impressão de materiais

Agora você pode testar o

Passei Direto grátis

Você também pode ser Premium ajudando estudantes

Faça como milhares de estudantes: teste grátis o Passei Direto

Esse e outros conteúdos desbloqueados

16 milhões de materiais de várias disciplinas

Impressão de materiais

Agora você pode testar o

Passei Direto grátis

Você também pode ser Premium ajudando estudantes
Você viu 9, do total de 72 páginas

Faça como milhares de estudantes: teste grátis o Passei Direto

Esse e outros conteúdos desbloqueados

16 milhões de materiais de várias disciplinas

Impressão de materiais

Agora você pode testar o

Passei Direto grátis

Você também pode ser Premium ajudando estudantes

Prévia do material em texto

Tecnologias para o 
Ecossistema de Big Data
Prof. Henrique Batista da Silva
Hadoop MapReduce
• Hadoop MapReduce: principal componente do Apache 
Hadoop. Paradigma de programação que permite a 
execução de programas de forma distribuída em 
cluster.
Hadoop MapReduce
• MapReduce é um estilo de computação que pode ser implementado em 
vários sistema, como por exemplo o Hadoop. 
• MapReduce é usado para gerenciar computação de larga escala, pois é uma 
solução tolerante a falhas de hardware.
• Duas funções devem ser escritas:
• Map: consome os dados de entrada, os processa e emite tuplas (key, value)
• Reduce: obtém tuplas geradas pelo Map e agrupa os resultado.
• O sistema se encarregará da execução paralela e 
tolerância a falhas.
Hadoop MapReduce
Fonte: Rajaraman, A., & Ullman, J. D. (2011). Mining of Massive Datasets. Lecture Notes for Stanford CS345A Web Mining (Vol. 67
• Sequencialmente leia os dados
• Fase Map
• Extrair a informação realmente importante (por exemplo, palavra e o 
número de ocorrência)
• Agrupar pela chave (ordenar pela chave)
• Fase Reduce
• Agregar, sumarizar, filtrar os dados.
• Escreva o resultado.
Hadoop MapReduce
Fonte: Rajaraman, A., & Ullman, J. D. (2011). Mining of Massive Datasets. Lecture Notes for Stanford CS345A Web Mining (Vol. 67
The crew of the space
shuttle Endeavor recently
returned to Earth as
ambassadors, harbingers of
a new era of space
exploration. Scientists at
NASA are saying that the
recent assembly of the
Dextre bot is the first step in
a long-term space-based
man/mache partnership.
'"The work we're doing now
-- the robotics we're doing -
- is what we're going to
need ……………………..
Big document
(The, 1)
(crew, 1)
(of, 1)
(the, 1)
(space, 1)
(shuttle, 1)
(Endeavor, 1)
(recently, 1)
….
(crew, 1)
(crew, 1)
(space, 1)
(the, 1)
(the, 1)
(the, 1)
(shuttle, 1)
(recently, 1)
…
(crew, 2)
(space, 1)
(the, 3)
(shuttle, 1)
(recently, 1)
…
MAP:
Leia a entrada e 
produza um 
conjunto de pares 
<chave, valor>
Group by key:
Colete todos os
pares com a 
mesma chave
Reduce:
Coletar todos os
valores
pertencentes às
chaves e returne
(key, value)
Fornecido pelo
programador
Fornecido pelo
programador
(key, value)(key, value)
Hadoop MapReduce
Fonte: Rajaraman, A., & Ullman, J. D. (2011). Mining of Massive Datasets. Lecture Notes for Stanford CS345A Web Mining (Vol. 67
• Um exemplo de MapReduce: considere a existência 
de cinco arquivos, cada um com duas colunas (key, 
value), representando a cidade e a temperatura:
Hadoop MapReduce
Toronto, 20
Whitby, 25
New York, 22
Rome, 32
Toronto, 4
Rome, 33
New York, 18
Fonte: Rajaraman, A., & Ullman, J. D. (2011). Mining of Massive Datasets. Lecture Notes for Stanford CS345A Web Mining (Vol. 67
• Precisamos encontrar a temperatura máxima para 
cada cidade. Com MapReduce, os cinco arquivos 
podem ser divididos em 5 tarefas. Cada mapper
executa em um dos 5 arquivos, retornando a maior 
temperatura de cada arquivo. 
Hadoop MapReduce
(Toronto, 20) (Whitby, 25) (New York, 22) (Rome, 33)
Fonte: Rajaraman, A., & Ullman, J. D. (2011). Mining of Massive Datasets. Lecture Notes for Stanford CS345A Web Mining (Vol. 67
• Os outros quatro mapper (dos outros quatro 
arquivos), produzem os seguintes resultados:
Hadoop MapReduce
(Toronto, 18) (Whitby, 27) (New York, 32) (Rome, 37)
(Toronto, 32) (Whitby, 20) (New York, 33) (Rome, 38)
(Toronto, 22) (Whitby, 19) (New York, 20) (Rome, 31)
(Toronto, 31) (Whitby, 22) (New York, 19) (Rome, 30)
Fonte: Rajaraman, A., & Ullman, J. D. (2011). Mining of Massive Datasets. Lecture Notes for Stanford CS345A Web Mining (Vol. 67
• Todos estes cinco resultados podem ser combinados 
da tarefa de reduce, que combina os resultados de 
cada chave de entrada. 
Hadoop MapReduce
(Toronto, 32) (Whitby, 27) (New York, 33) (Rome, 38)
Fonte: Rajaraman, A., & Ullman, J. D. (2011). Mining of Massive Datasets. Lecture Notes for Stanford CS345A Web Mining (Vol. 67
Hadoop MapReduce
Client
Node
Client
Node
MapReduce
application master
Map Task / 
Reduce Task
Map Task / 
Reduce Task
Map Task / 
Reduce Task
HDFS
Fonte: Rajaraman, A., & Ullman, J. D. (2011). Mining of Massive Datasets. Lecture Notes for Stanford CS345A Web Mining (Vol. 67
Contador de palavras usando 
MapReduce
• Nesta prática vamos implementar um aplicação para 
MapReduce.
• Vamos implementar um contador de palavras usando 
MapReduce utilizando o Google Colab.
MapReduce
"""The classic MapReduce job: count the frequency of words.
"""
from mrjob.job import MRJob
import re
WORD_RE = re.compile(r"[\w']+")
class MRWordFreqCount(MRJob):
def mapper(self, _, line):
for word in WORD_RE.findall(line):
yield (word.lower(), 1)
def combiner(self, word, counts):
yield (word, sum(counts))
def reducer(self, word, counts):
yield (word, sum(counts))
if __name__ == '__main__':
MRWordFreqCount.run()
Contabilizando avaliação de filmes 
usando MapReduce
• Nesta prática vamos implementar um aplicação para 
MapReduce.
• Vamos carregar para dentro do HDFS um dataset de 
rating de filmes e contabilizar a quantidade de 
avaliações de 1 a 5 estrelas.
MapReduce
MapReduce
User ID | Movie ID | Rating
30 25 3
20 36 3
30 45 1
40 84 2
40 43 1
3, 1
3, 1
1, 1
2, 1
1, 1
1 – 1, 1
2 – 1 
3 – 1, 1 
1, 2
2, 1
3, 2
Map
Shuffle
e sort Reduce
• Download do arquivo de dados:
MapReduce
wget http://files.grouplens.org/datasets/movielens/ml-100k/u.data
http://files.grouplens.org/datasets/movielens/ml-100k/u.data
from mrjob.job import MRJob
from mrjob.step import MRStep
class RatingsBreakdown(MRJob):
def steps(self):
return [
MRStep(mapper=self.mapper_get_ratings,
reducer=self.reducer_count_ratings)
]
def mapper_get_ratings(self, _, line):
(userID, movieID, rating, timestamp) = line.split('\t')
yield rating, 1
def reducer_count_ratings(self, key, values):
yield key, sum(values)
if __name__ == '__main__':
RatingsBreakdown.run()
Exercício Resolvido I - MapReduce
• Agora o objetivo é contabilizar quantas ocorrências de 
cada filme no banco de dados (popularidade de cada 
filme).
Exercícios
• Use o arquivo da prática anterior e altere o campo 
“rating” para “movieID” e execute a aplicação
• Para melhorar o resultado, faça (veja próximo slide):
Exercícios
from mrjob.job import MRJob
from mrjob.step import MRStep
class RatingsBreakdown(MRJob):
def steps(self):
return [
MRStep(mapper=self.mapper_get_ratings,
reducer=self.reducer_count_ratings),
MRStep(reducer=self.reducer_sorted_output)
]
def mapper_get_ratings(self, _, line):
(userID, movieID, rating, timestamp) = line.split('\t')
yield movieID, 1
def reducer_count_ratings(self, key, values):
yield str(sum(values)).zfill(5), key
def reducer_sorted_output(self, count, movies):
for movie in movies:
yield movie, count
if __name__ == '__main__':
RatingsBreakdown.run()
Transforme resultado em uma string 
de 5 caracteres
Apache Spark
• Spark é uma plataforma de computação distribuída 
in-Memory
• Desenvolvido para tarefas como ETL, Machine 
Learning e carga de dados no Hadoop (SQL)
Introdução ao Apache Spark
• Spark se caracteriza por ser um framework 
para processamento de grande dados (Big 
Data) com foco em velocidade e oferece API 
para processamento e análise de dados.
Introdução ao Apache Spark
• APIs sobre o Spark: Spark Core e APIs
Introdução ao Apache Spark
Spark Streaming: processamento 
de fluxos em tempo real
Spark GraphX: processamento 
sobre grafos
Spark SQL: processamento de 
consulta e carga de dados
Spark MLlib: biblioteca para 
machine learning (clustering, 
redução de dimensionalidade, etc)
• Spark é uma abstração para o uso de ferramentas de 
Data Science.
• Ex.: Spark possibilidade aos algoritmos de machine 
learning o processamento de dataset em memória de 
forma distribuída, porém abstraindo toda 
complexidade para o programador. 
Introdução ao Apache Spark
• Spark estende o modelo MapReduce (Hadoop) para 
processamento de grande volumes de dados. 
• Seudesempenho pode chegar a ser 100 vezes maior 
que o Hadoop (em atividades de regressão logística, 
por exemplo)
Introdução ao Apache Spark
• Spark permite programação de aplicações em 
três linguagens: Python, Java e Scala
• Há também suporte para SQL e R
Introdução ao Apache Spark
• Python tem a vantagem por ser menos 
verboso e mais “fácil” de aprender
• Scala é mais rápido do que Python e 
normalmente uma escolha padrão para Spark
Introdução ao Apache Spark
Arquitetura do Spark
• Arquitetura do Spark:
Introdução ao Apache Spark
Fonte: https://spark.apache.org/docs/latest/cluster-overview.html
Driver Program: aplicação 
principal que executa a 
“main() function” (veremos 
sobre o SparkContext em 
breve no código fonte)
• Arquitetura do Spark:
Introdução ao Apache Spark
Cluster manager: serviço externo para 
gerenciar recursos em cluster. 
Worker node: qualquer nó que possa 
executar a aplicação em cluster.
Executor: um processo criado por 
uma aplicação em um nó do cluster. 
(executa a aplicação mantendo os 
dados em memória)
Fonte: https://spark.apache.org/docs/latest/cluster-overview.html
• Arquitetura do Spark:
Introdução ao Apache Spark
Task: unidade (tarefa) que 
será enviada para um 
“Executor” 
Uma task inicia no (pelo) 
Driver Program e é 
enviada ao Worker Node. 
Fonte: https://spark.apache.org/docs/latest/cluster-overview.html
• Resilient Distributed Datasets (RDD): toda aplicação Spark
consiste de um “main program” que executa várias operações 
paralelas no cluster. 
• Assim, o RDD é a principal abstração fornecida pelo Spark, ou 
seja, é uma coleção de objetos distribuídos (estrutura de 
dados) nos nós do cluster e que são executados em paralelo. 
Introdução ao Apache Spark
Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html
• RDD pode conter qualquer tipo de dados do Python, 
Scala ou Java, bem como objetos de classes criadas 
pelos usuários. 
Introdução ao Apache Spark
Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html
• Exemplo em Python:
Introdução ao Apache Spark
conf = SparkConf().setMaster("local").setAppName("FileName")
sc = SparkContext(conf = conf)
input = sc.textFile("file.txt")
Cria um objeto RDD a partir de um SparkContext
Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html
• Outra abstração importante do Spark é o 
compartilhamento de variáveis em operações de 
paralelismo. 
• Quando Spark executa uma função em parelelo
(tarefas em diferentes nós), uma cópia de cada 
variável usada na função é enviada aos nós.
Introdução ao Apache Spark
Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html
• Spark suporte múltiplos formatos de dados: Json, 
arquivos texto, csv e tabelas de sistema Relacional
• Spark também possui integração com Hadoop sendo 
um substituto para as funções de MapReduce do 
Hadoop. 
Introdução ao Apache Spark
Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html
• batch vs real-time processing (processamento em 
tempo real)
• Hadoop é baseado no conceito de processamento em 
batch (lote). Hadoop processo blocos de dados 
(usando MapReduce) que são armazenados ao longo 
do tempo (2005).
Apache Spark vs Hadoop
Fonte: https://www.edureka.co/blog/spark-tutorial/
• batch vs real-time processing (processamento 
em tempo real)
• Spark pode processar dados em tempo real e 
poder até 100 vezes mais rápido (em algumas 
aplicações) para processamento em lote.
Apache Spark vs Hadoop
Fonte: https://www.edureka.co/blog/spark-tutorial/
• Para instalação do Spark o ambiente Hadoop não é 
necessário, apesar que de é possível usar o Spark
com o Hadoop:
• Instalações: Java, Scala e Spark
Instalando Apache Spark
Fonte: http://www.tutorialspoint.com/apache_spark/apache_spark_installation.htm
http://www.tutorialspoint.com/apache_spark/apache_spark_installation.htm
http://www.tutorialspoint.com/apache_spark/apache_spark_installation.htm
Programação com RDD
• Spark oferece suporte ao Python bem como Java e 
Scala. 
• Nosso exemplos serão apresentados na linguagem 
Python, porém os exemplos com outras linguagens 
pode ser acessados neste link:
Introdução ao RDD em Python
Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html
• Ao instalar o spark, será possível executar programas em 
Python com o “PySpark” (não é necessária a instalação via 
pip).
• Abra a máquina virtual fornecida para esta aula e execute 
“pyspark” no terminal (antes leia as instruções na pasta do 
usuário). Veja que será possível introduzir comandos em 
Python via shell. Digite “exit()” para sair.
Introdução ao RDD em Python
Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html
• Para executar um script em Python é necessário 
executar o comando “spark-submit programa.py” no 
diretório em que o script está localizado. 
• O comando “spark-submit” irá carregar a bibliotecas 
do Spark e permitir que a aplicação seja submetida 
ao cluster. 
Introdução ao RDD em Python
Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html
• Dentro do script será necessário importar alguma 
biblioteca do Spark, como por exemplo:
Introdução ao RDD em Python
from pyspark import SparkContext, SparkConf
Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html
• A primeira coisa que um programa deve fazer é criar 
um objeto SparkContext. Este objeto permite ao 
programa Spark acessar o cluster.
Inicializando Spark
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
appName: nome da sua aplicação
master: é a URL do cluster Spark (“local” para executar na 
máquina local)
Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html
• Conforme vimos anteriormente, o RDD é uma coleção de 
objetos tolerantes a falha que podem ser executados em 
paralelo.
• Há duas formas de criar RDDs: paralelizando uma coleção 
existente em seu programa (driver program) ou referenciar 
um dataset em um sistema de armazenamento como o HDFS, 
ou algum arquivo de entrada. 
Resilient Distributed Datasets (RDDs)
Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html
• Para paralelizar um coleção, basta chamar a funação
“sc.paralelize()”.
Resilient Distributed Datasets (RDDs)
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
Os elementos da coleção “data” serão 
copiados para um dataset distribuídos 
e poderão assim ser processados em 
paralelo
Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html
• Vamos a um exemplo: faça um programa no Spark
para somar os números de um conjunto de dados.
• Dica: expressões lambda 
Resilient Distributed Datasets (RDDs)
data.reduce(lambda a, b: a + b)
Uma expressão lambda como essa em Python 
funciona como uma função que recebe duas 
variáveis como argumento (“a” e “b”) e retorna 
a soma da duas (“a + b”) 
Agrega as funções do RDD (veremos mais em breve)
Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html
Resilient Distributed Datasets (RDDs)
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("soma-data").setMaster("local[*]")
sc = SparkContext(conf=conf)
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
result = distData.reduce(lambda a, b: a + b)
print(result)
Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html
• Spark pode criar um arquivo distribuído de qualquer 
fonte de dados suportada pelo Hadoop (HDFS, 
Cassandra, Hbase, arquivo texto, etc). 
• Arquivo de texto RDD pode ser criado com a função 
“sc.textFile()”
Acesso a dados externos
distFile = sc.textFile("data.txt")
Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html
• Podemos usar operações sobre coleções de dados sobre o RDD 
após carregar um arquivo de texto externo
Acesso a dados externos
distFile = sc.textFile("data.txt")
distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b).
Somao tamanho de todas as linha do arquivo (veremos mais 
sobre as operações como map e reduce)
Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html
• Exemplo: crie um arquivo data.txt e faça um 
programa que some o tamanho de todas as linhas em 
Spark
Acesso a dados externos
Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html
Acesso a dados externos
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("soma-data").setMaster("local[*]")
sc = SparkContext(conf=conf)
distFile = sc.textFile("wordcount.txt")
result = distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)
print(result)
Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html
Operações RDD
• RDD suportam dois tipos de operações: 
• (1) Transformações: criam um novo conjunto de dados a partir 
de um conjunto existente; e (2) ações: executam alguma ação 
que retorna valor para o “driver program” 
• Exemplo (transformação): map é uma transformação que 
passar cada elemento do conjunto para uma função e retorna 
um novo RDD com o resultado. 
Operações RDD
Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html
• Exemplo map:
Operações RDD
distFile = sc.textFile("data.txt")
distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b).
Cada linha “s” do arquivo “data.txt” é passado para a 
função (em forma de expressão lambda) e um novo valor 
é retornado (o tamanho da linha).
Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html
• Exemplo (ação): reduce agrega todos os elementos 
do RDD usando alguma função e retorna o resultado 
final para o seu programa (driver program)
Operações RDD
Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html
• Exemplo reduce:
Operações RDD
distFile = sc.textFile("data.txt")
distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b).
Recebe o valor de cada linha (de duas em duas de forma 
agregada) e retorna a soma destas duas linhas
Mais sobre transformações e ações, veja: 
https://spark.apache.org/docs/latest/rdd-programming-guide.html
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("soma-data").setMaster("local")
sc = SparkContext(conf=conf)
distFile = sc.textFile("data.txt")
lineLengths = distFile.map(lambda s: len(s))
result = lineLengths.reduce(lambda a, b: a + b)
print(result)
Acesso a dados externos (modificação)
Os dados não são 
carregados 
imediatamente do 
arquivo, distFile é 
apenas um ponteiro 
para o arquivo. 
Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("soma-data").setMaster("local")
sc = SparkContext(conf=conf)
distFile = sc.textFile("data.txt")
lineLengths = distFile.map(lambda s: len(s))
result = lineLengths.reduce(lambda a, b: a + b)
print(result)
Acesso a dados externos (modificação)
Aqui, quando os dados 
são transformados, a 
operação não é 
executada neste 
momento. Vamos 
entender o motivo.
Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("soma-data").setMaster("local")
sc = SparkContext(conf=conf)
distFile = sc.textFile("data.txt")
lineLengths = distFile.map(lambda s: len(s))
result = lineLengths.reduce(lambda a, b: a + b)
print(result)
Acesso a dados externos (modificação)
Spark usa o conceito de laziness. Ou seja, uma 
operação sobre dados é executada apenas 
quando seu resultado é requisitado pelo driver 
program. Uma DAG (Directed Acyclic Graph) 
é criada para manter o controle da ordem das 
operações que devem ser executados 
posteriormente.
Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html
DAG (Directed Acyclic Graph) 
Figura: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("soma-data").setMaster("local")
sc = SparkContext(conf=conf)
distFile = sc.textFile("data.txt")
lineLengths = distFile.map(lambda s: len(s))
result = lineLengths.reduce(lambda a, b: a + b)
print(result)
Acesso a dados externos (modificação)
Quando a ação reduce é 
chamada, Spark divide a 
operação em tarefas que 
executam em nós 
(máquinas) separadas e 
cada máquina executa 
ambas as operações (map
e reduce) localmente, 
retornando sua resposta 
ao driver program.
Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("soma-data").setMaster("local")
sc = SparkContext(conf=conf)
distFile = sc.textFile("data.txt")
lineLengths = distFile.map(lambda s: len(s))
result = lineLengths.reduce(lambda a, b: a + b)
print(result)
Acesso a dados externos (modificação)
Observe que apenas 
neste ponto, a 
operação map é 
executada e não 
anteriormente 
(laziness). Este é um 
dos motivos da alta 
performance do Spark
Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("soma-data").setMaster("local")
sc = SparkContext(conf=conf)
distFile = sc.textFile("data.txt")
lineLengths = distFile.map(lambda s: len(s))
result = lineLengths.reduce(lambda a, b: a + b)
print(result)
Acesso a dados externos (modificação)
Se for de interesse usar 
lineLengths novamente, é 
necessário persistir os 
dados em memória 
lineLengths.persist()
Fonte: https://spark.apache.org/docs/latest/rdd-programming-guide.html
Exercício resolvido I - Spark
• Use o arquivo de rating de filmes para contar 
quantas avaliações de cada estrela.
Exercícios
Exercícios
from pyspark import SparkConf, SparkContext
import collections
conf = SparkConf().setMaster("local").setAppName("rating")
sc = SparkContext(conf = conf)
lines = sc.textFile("u.data")
ratings = lines.map(lambda x: x.split()[2])
result = ratings.countByValue()
sortedResults = collections.OrderedDict(sorted(result.items()))
for key, value in sortedResults.items():
print("%s %i" % (key, value))

Continue navegando