Buscar

04_Python Desk

Faça como milhares de estudantes: teste grátis o Passei Direto

Esse e outros conteúdos desbloqueados

16 milhões de materiais de várias disciplinas

Impressão de materiais

Agora você pode testar o

Passei Direto grátis

Você também pode ser Premium ajudando estudantes

Faça como milhares de estudantes: teste grátis o Passei Direto

Esse e outros conteúdos desbloqueados

16 milhões de materiais de várias disciplinas

Impressão de materiais

Agora você pode testar o

Passei Direto grátis

Você também pode ser Premium ajudando estudantes

Faça como milhares de estudantes: teste grátis o Passei Direto

Esse e outros conteúdos desbloqueados

16 milhões de materiais de várias disciplinas

Impressão de materiais

Agora você pode testar o

Passei Direto grátis

Você também pode ser Premium ajudando estudantes
Você viu 3, do total de 61 páginas

Faça como milhares de estudantes: teste grátis o Passei Direto

Esse e outros conteúdos desbloqueados

16 milhões de materiais de várias disciplinas

Impressão de materiais

Agora você pode testar o

Passei Direto grátis

Você também pode ser Premium ajudando estudantes

Faça como milhares de estudantes: teste grátis o Passei Direto

Esse e outros conteúdos desbloqueados

16 milhões de materiais de várias disciplinas

Impressão de materiais

Agora você pode testar o

Passei Direto grátis

Você também pode ser Premium ajudando estudantes

Faça como milhares de estudantes: teste grátis o Passei Direto

Esse e outros conteúdos desbloqueados

16 milhões de materiais de várias disciplinas

Impressão de materiais

Agora você pode testar o

Passei Direto grátis

Você também pode ser Premium ajudando estudantes
Você viu 6, do total de 61 páginas

Faça como milhares de estudantes: teste grátis o Passei Direto

Esse e outros conteúdos desbloqueados

16 milhões de materiais de várias disciplinas

Impressão de materiais

Agora você pode testar o

Passei Direto grátis

Você também pode ser Premium ajudando estudantes

Faça como milhares de estudantes: teste grátis o Passei Direto

Esse e outros conteúdos desbloqueados

16 milhões de materiais de várias disciplinas

Impressão de materiais

Agora você pode testar o

Passei Direto grátis

Você também pode ser Premium ajudando estudantes

Faça como milhares de estudantes: teste grátis o Passei Direto

Esse e outros conteúdos desbloqueados

16 milhões de materiais de várias disciplinas

Impressão de materiais

Agora você pode testar o

Passei Direto grátis

Você também pode ser Premium ajudando estudantes
Você viu 9, do total de 61 páginas

Faça como milhares de estudantes: teste grátis o Passei Direto

Esse e outros conteúdos desbloqueados

16 milhões de materiais de várias disciplinas

Impressão de materiais

Agora você pode testar o

Passei Direto grátis

Você também pode ser Premium ajudando estudantes

Prévia do material em texto

Tecnologias para o 
Ecossistema de Big Data
Prof. Henrique Batista da Silva
Python Dask
• No contexto de big data, sabemos que grandes 
dataset (terabytes, e em alguns casos 
petabytes) são conjuntos de dados que não 
podem caber na RAM nem no armazenamento 
persistente de um único computador. 
Introdução ao Python Dask
Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607
• Pandas, NumPy e scikit-learn não são 
adequados para conjuntos de dados desse 
tamanho, porque não foram criados 
inerentemente para operar em conjuntos 
de dados distribuídos.
Introdução ao Python Dask
Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607
• O Dask foi lançado no final de 2014 por Matthew Rocklin com 
o objetivo de trazer escalabilidade nativa para o Python e 
superar suas restrições de máquina única. 
• Com o tempo, o projeto se transformou em uma das melhores 
estruturas de computação escalável disponíveis para 
desenvolvedores de Python. 
Introdução ao Python Dask
Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607
• O Dask consiste em vários componentes e APIs 
diferentes, que podem ser categorizados em 
três camadas: o planejador, APIs de baixo 
nível e APIs de alto nível (veja imagem no 
próximo slide)
Introdução ao Python Dask
Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607
Os componentes e camadas que compõem o Dask
Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607
• No núcleo está o task scheduler, que coordena e monitora a 
execução dos cálculos nos núcleos e máquinas da CPU
• Esses cálculos são representados no código como Dask
Delayed objects (avaliados no momento em que os valores são 
necessários - lazily) ou Dask Futures objects (são avaliados 
em tempo real, independentemente de o valor ser necessário 
imediatamente ou não)
Introdução ao Python Dask
Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607
• As APIs de alto nível da Dask oferecem uma camada 
de abstração sobre Delayed e Futures objects. 
• As operações nesses objetos de alto nível resultam 
em muitas operações paralelas de baixo nível 
gerenciadas pelos task schedulers.
Introdução ao Python Dask
Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607
• Algumas vantagens do Dask:
• O Dask é totalmente implementado em Python e escala nativamente o NumPy, 
Pandas e o scikit-learn.
• O Dask pode ser usado efetivamente para trabalhar com datasets médios em 
uma única máquina e grandes datasets em um cluster.
• O Dask pode ser usado como uma estrutura geral para paralelizar a maioria dos 
objetos Python.
• O Dask possui um overhead de configuração e 
manutenção muito baixa.
Introdução ao Python Dask
Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607
• Essencialmente, Dask divide datasets médios e 
grandes em partes menores e gerencia a execução 
paralela de funções sobre essas partes 
• Dask é tão útil para trabalhar com datasets médios 
em uma única máquina quanto para trabalhar com 
grandes datasets em um cluster
Introdução ao Python Dask
Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607
• Essencialmente, Dask divide datasets médios e grandes em 
partes menores e gerencia a execução paralela de funções 
sobre essas partes 
• Tudo isso pode ser feito sem a necessidade de refatorar o 
código existente ou escrever código adicional para lidar com 
problemas específicos do cluster (gerenciamento de recursos e 
transferência de dados)
Introdução ao Python Dask
Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607
• O Spark se tornou uma estrutura muito popular para analisar grandes conjuntos de 
dados (e é ótimo para isto)
• Spark como uma alternativa in-memory para o MapReduce Apache Hadoop e é 
dependente da JVM (Java Virtual Machine). 
• O suporte ao Python veio mais tarde com o biblioteca PySpark. No entanto, toda a 
interação com o cluster passa pela JVM e assim, algumas execuções ocorrem fora 
do contexto do Python.
• Além disso, o Spark não está equipado para manipular código que não pode ser 
modelo com MapReduce. 
Dask x Spark
Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607
• Dask’s task schedulers usa o conceito de gráficos 
acíclicos direcionados (DAGs). 
• DAGs não permitem loops (o grafo não possui 
ciclo)
Dask e DAGs
Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607
• O Dask usa DAGs para coordenar a execução do código 
paralelo nos núcleos e máquinas da CPU.
• Os nós upstream devem ser concluídos antes que o trabalho 
possa começar em qualquer nó downstream dependente.
• No caso de uma falha, as etapas para alcançar um nó 
podem ser repetidas desde o início sem atrapalhar o restante 
do processo.
Dask e DAGs
Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607
Dask e DAGs
Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607
Primeiro contato com
Python Dask
• Conheça o site de opendata:
• A cada terceira semana do mês, o Departamento de Finanças 
da cidade de Nova York registra e publica um conjunto de 
dados de todas as citações de estacionamento emitidas ao 
longo do ano fiscal até agora.
Datasets
https://opendata.cityofnewyork.us/
• Iremos utilizar contém 2Gb de dados: Parking 
Violations Issued - Fiscal Year 2017 (arquivo csv)
Datasets
https://catalog.data.gov/dataset?tags=parking-ticket
• Hello Dask: API DataFrame
• Uma etapa essencial de qualquer projeto de ciência de dados é 
realizar análises exploratórias no conjunto de dados. 
• Durante a análise exploratória, convém verificar os dados em 
busca de valores ausentes, outliers e outros problemas de 
qualidade dos dados. 
A first look at the DataFrame API
• A limpeza do conjunto de dados garante que a análise 
que você faz e quaisquer conclusões que você tire 
sobre os dados não sejam influenciadas por dados 
errados ou anômalos.
A first look at the DataFrame API
• Com o Dask DataFrames, faremos a leitura de um 
dataset em busca de valores ausentes e na 
eliminação de colunas que estão faltando muitos 
dados ou que não serão úteis para análise.
A first look at the DataFrame API
• Abra a máquina virtual fornecida para a aula. Nela, já 
temos o dataset que iremos utilizar. Faça Download 
da VM aqui: 
• Abra o terminal, e entre no python environment
A first look at the DataFrame API
source dask/bin/activate
jupyter notebook
https://1drv.ms/u/s!AjVQhLFLEQuFs8ofwi35IOmIv7eM2g?e=XhTZ8V
User: hadoop
Password: haddop
Programação com 
Python Dask – Parte I
• Neste exemplo, examinaremos apenas os dados 
coletados a partir de 2017. Primeiro, você precisará 
importar os módulos Dask e ler seus dados.
Dask DataFrames
# Listing 2.1
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from matplotlib import pyplot as plt
df = dd.read_csv('nyc-parking-tickets/*2017.csv')
df
Dask DataFrames
Veja a saída gerada (DataFrame metadata):
npartitions: em quantas
partições o DataFrame
é dividido
Dask criou um DAG 
com 99 nós para 
processar os dados.
Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607
• Observe que os nomes das colunas estão na parte superior e abaixo está o 
respectivo tipo de dados de cada coluna.
• Lembre-se que, diferentemente do Pandas que executa inteiramente na 
RAM, Dask irá lidar com dataset que não podem ser carregados na RAM, e 
portanto, podem estar distribuídos.
• Assim, Dask DataFrames emprega métodos de amostragem aleatória para 
criar um profile e inferir tipos de dados de uma pequena amostra.
Dask DataFrames
• Como o Dask’s scheduler decide interrompero trabalho de processar este 
arquivo?
• O valor de “npartitions“ mostra em quantas partições o DataFrame é 
dividido.
• Como o arquivo possui um pouco mais de 2 GB, em 33 partições, cada 
partição tem aproximadamente 64 MB. Isso significa que, em vez de 
carregar o arquivo inteiro na RAM de uma só vez, cada Dask work thread 
irá executar um pedaço de 64 MB do arquivo por vez.
Dask DataFrames
Dask DataFrames
O Dask divide grandes 
arquivos de dados em várias 
partições e funciona em uma 
partição por vez
Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607
Dask DataFrames
Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607
O processamento de 
cada partição (etapas 
1 e 3) pode ser feito 
um de cada vez ou 
em paralelo.
Espaço de memória temporário
• O comportamento é apresentado na figura do slide anterior. 
• Em vez de carregar antecipadamente todo o DataFrame na 
RAM, o Dask divide o arquivo em partes menores que podem 
ser processadas independentemente. 
• No caso do Dask DataFrames, cada partição é um Pandas 
DataFrame relativamente pequeno.
• No exemplo da figura, O DataFrame consiste em duas 
partições. Assim, o Dask DataFrames único é composto por 
dois Pandas DataFrames menores.
Dask DataFrames
• Cada partição pode ser carregada na memória e trabalhada 
uma de cada vez ou em paralelo. 
• Como o worker node pode trabalhar em partes menores por 
vez, o processamento pode ser distribuído para muitas 
máquinas. Ou, no caso de um único nó, o trabalho pode 
prosseguir em datasets muito grandes sem resultar em erros 
de falta de memória.
Dask DataFrames
• Dask criou um DAG com 99 nós para processar os dados. 
• O grafo consiste em 99 nós, porque cada partição requer a 
criação de três operações: (1) lendo os dados brutos, (2) 
dividindo os dados no bloco de tamanho apropriado e (3) 
inicializando o objeto DataFrame subjacente. 
• Assim, 33 partições com 3 tarefas por partição resultam em 
99 tarefas
Dask DataFrames
• Neste exemplo, se tivéssemos 33 workers (nós no cluster ou 
cores um uma máquina), todo o arquivo poderia ser 
processado simultaneamente. 
• Mas com apenas um worker node (ou core), o Dask percorre 
cada partição, uma de cada vez. 
Dask DataFrames
Dask DataFrames
Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607
O Dask permite que um 
único Pandas DataFrame
seja executado em 
paralelo por vários hosts.
Dask DataFrames
Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607
Processando dados em paralelo 
em várias máquinas
Programação com 
Python Dask – Parte II
• Sobre o tamanho das partições: evite partições muito grandes
• Os chunks de dados devem ser pequenos o suficiente para que muitos deles caibam na 
memória disponível de um worker de uma só vez. 
• O Dask provavelmente manipulará tantos chunks em paralelo em uma máquina quanto você 
tiver núcleos nessa máquina
• Portanto, se você tiver blocos de 1 GB e dez núcleos, é provável que o Dask use pelo menos 
10 GB de memória. 
• Além disso, é comum que o Dask tenha de duas a três vezes mais chunks disponíveis para 
trabalhar, para que ele sempre tenha algo em que trabalhar.
Dask DataFrames
• Agora, vamos tentar contar os valores ausentes em cada 
coluna em todo o arquivo.
Dask DataFrames
missing_values = df.isnull().sum()
missing_values
Dask Series Structure:
npartitions=1 
Date First Observed int64 
Violation Time ... 
dtype: int64 
Dask Name: dataframe-sum-agg, 166 tasks
• Em vez de obter as contagens ausentes, o Dask retorna algumas 
informações de metadados sobre o resultado esperado
• Observe que Dask ainda não realizou nenhum processamento porque usa 
lazy computation. (o Dask preparou outro DAG, que foi armazenado na 
variável missing_values.)
• Os dados não são computados até que o grafo de tarefas seja executado 
explicitamente
Dask DataFrames
• Agora, vamos tentar contar os valores ausentes em cada 
coluna em todo o arquivo.
Dask DataFrames
missing_values = df.isnull().sum()
missing_values
Dask Series Structure:
npartitions=1 
Date First Observed int64 
Violation Time ... 
dtype: int64 
Dask Name: dataframe-sum-agg, 166 tasks
Observe que a contagem de tarefas
aumentou para 166
Dask realizou as 99 primeiras tarefas do DAG 
usadas para ler o arquivo de dados e criar o 
DataFrame chamado df, adicionou 66 
tarefas (2 por partição) para verificar nulos e 
soma e, em seguida, adicionou uma etapa 
final para coletar todos os dados
• Antes de executar o cálculo, iremos transformar esses 
números em porcentagens, dividindo as contagens de valores 
ausentes (missing_values) pelo número total de linhas no 
DataFrame (df.index.size) e multiplicando tudo por 100
Dask DataFrames
missing_count = ((missing_values / df.index.size) * 100)
missing_count
• Observe que o número de tarefas aumentou novamente e o tipo de dados 
da série resultante mudou de int64 to float64 (divisão resulta em uma 
resta de ponto flutuante)
Dask DataFrames
Dask Series Structure: 
npartitions=1 
Date First Observed float64 
Violation Time ... 
dtype: float64 
Dask Name: mul, 235 tasks
• Executando cálculos com o método compute()
Dask DataFrames
with ProgressBar():
missing_count_pct = missing_count.compute()
missing_count_pct
Método do DataFrame chamado para que o Dask calcule
o resultado (utilizando a DAG criada anteriormente). 
imprimirá uma barra de 
progresso baseada em texto, 
mostrando a porcentagem
estimada de conclusão e o 
tempo decorrido para o cálculo.
• Pela saída de nosso cálculo de valores ausentes, podemos 
chegar a algumas conclusões:
• Não há dados sobre “No Standing or Stopping Violation”, 
“Hydrant Violation”, “Double Parking Violation”. Estão 
completamente vazios. Não faz sentido manter estas colunas.
• Eliminaremos qualquer coluna que esteja com mais de 60% de 
seus valores ausentes. 
Dask DataFrames
Dask DataFrames
columns_to_drop = missing_count_pct[missing_count_pct > 60].index
with ProgressBar():
df_dropped = df.drop(columns_to_drop, axis=1).persist()
obtivemos o 
índice da série
filtrada, que é 
uma lista de 
nomes de 
colunas. 
usamos esse índice para descartar colunas no 
Dask DataFrame com o mesmo nome
Agora o objeto é disponibilizado para todos os threads. No 
caso de execução em um cluster, o objeto será serializado 
e transmitido para todos os workes nodes.
Evita que Dask descarte os dados filtrados criados tão logo 
termine a execução. persist() permite que Dask mantenha
em memória o resultado
Paralelizando código com
Dask Delayed
• Para entender melhor como o paralelismo funciona, 
paralelizamos código simples de estilo for-loop com Dask e 
dask.delayed. 
• Essa é uma maneira simples de usar o dask para paralelizar 
bases de código existentes ou criar sistemas complexos. 
Dask Delayed
Fonte: https://docs.dask.org/en/latest/best-practices.html
Dask Delayed
from dask.distributed import Client
client = Client(n_workers=4)
from time import sleep
def inc(x):
sleep(1)
return x + 1
def add(x, y):
sleep(1)
return x + y
Primeiro, vamos criar algumas funções inc e 
add que dormem um pouco para simular algum
processamento. Em seguida, executaremos
essas funções normalmente.
Fonte: https://docs.dask.org/en/latest/best-practices.html
• Cronometramos a execução desse código normal usando %%time, que é 
uma função especial do Jupyter Notebook.
Dask Delayed
%%time
# This takes three seconds to run because we call each
# function sequentially, one after the other
x = inc(1)
y = inc(2)
z = add(x, y)
CPU times: user 436 ms, sys: 133 ms, total: 568 ms
Wall time: 3 s
Observe o 
tempo de 
execução de 3 
segundos. Este 
código é 
sequencial.Fonte: https://docs.dask.org/en/latest/best-practices.html
• Paralelizando o código: Observe que estas duas chamadas de incremento 
(inc functions) podem ser chamadas em paralelo, porque são totalmente 
independentesuma da outra.
• Transformaremos o inc e add usando a função dask.delayed. Quando a 
chamamos a versão delayed passando os argumentos, exatamente como 
antes, a função original ainda não é chamada (por isso que a execução 
termina muito rapidamente). Em vez disso, é criado um objeto delayed, 
que monitora a função a ser chamada e os argumentos a serem 
transmitidos.
Dask Delayed
Fonte: https://docs.dask.org/en/latest/best-practices.html
Dask Delayed
from dask import delayed
%%time
# This runs immediately, all it does is build a graph
x = delayed(inc)(1)
y = delayed(inc)(2)
z = delayed(add)(x, y)
CPU times: user 766 µs, sys: 207 µs, total: 973 µs
Wall time: 733 µs
Foi executado
imediatamente, já
que nada 
realmente
aconteceu ainda.
Fonte: https://docs.dask.org/en/latest/best-practices.html
• Para obter o resultado, chamamos agora a função compute()
Dask Delayed
%%time
# This actually runs our computation using a local process pool
z.compute()
CPU times: user 333 ms, sys: 85 ms, total: 418 ms
Wall time: 2.04 s
Observe que foi executado mais
rapidamente que o código original.
Fonte: https://docs.dask.org/en/latest/best-practices.html
• O que aconteceu neste processamento: o objeto “z” é um lazy Delayed
object. Ele contém tudo o que precisamos para calcular o resultado final, 
incluindo referências a todas as funções necessárias. Podemos avaliar o 
resultado com .compute () ou podemos visualizar o gráfico de tarefas para 
esse valor com .visualize ().
Dask Delayed
z
Delayed('add-197e820c-5b37-4da4-827c-0931d6f717ea')
Fonte: https://docs.dask.org/en/latest/best-practices.html
Dask Delayed
Observe o grafo gerado (Dask DAG). Observe o 
fluxo de execução que foi criado. 
# Look at the task graph for `z`
z.visualize()
Fonte: https://docs.dask.org/en/latest/best-practices.html
• Analisando o resultado:
• Por que passamos de 3 para 2? Por que não conseguimos 
paralelizar até 1s?
Dask Delayed
Fonte: https://docs.dask.org/en/latest/best-practices.html
Exercício resolvido: 
Dask Delayed
• Exercícios: Paralelize o código do loop abaixo: veja o arquivo 
“Parallelize a for loop.ipynb”
• Mostre a DAG gerada
Dask Delayed
Fonte: https://docs.dask.org/en/latest/best-practices.html
Resposta
Fonte: https://docs.dask.org/en/latest/best-practices.html
%%time
# Your parallel code here...
from dask import delayed
results = []
for x in data:
y = delayed(inc)(x)
results.append(y)
total = delayed(sum)(results)
print("Before computing:", total) # Let's see what type of thing total is
result = total.compute()
print("After computing :", result) # After it's computed
total.visualize()
Referências Bibliográficas
Jesse C. Daniel . Data Science with Python 
and Dask. July 2019 ISBN 9781617295607
Considerações Finais

Continue navegando