Baixe o app para aproveitar ainda mais
Prévia do material em texto
Tecnologias para o Ecossistema de Big Data Prof. Henrique Batista da Silva Python Dask • No contexto de big data, sabemos que grandes dataset (terabytes, e em alguns casos petabytes) são conjuntos de dados que não podem caber na RAM nem no armazenamento persistente de um único computador. Introdução ao Python Dask Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607 • Pandas, NumPy e scikit-learn não são adequados para conjuntos de dados desse tamanho, porque não foram criados inerentemente para operar em conjuntos de dados distribuídos. Introdução ao Python Dask Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607 • O Dask foi lançado no final de 2014 por Matthew Rocklin com o objetivo de trazer escalabilidade nativa para o Python e superar suas restrições de máquina única. • Com o tempo, o projeto se transformou em uma das melhores estruturas de computação escalável disponíveis para desenvolvedores de Python. Introdução ao Python Dask Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607 • O Dask consiste em vários componentes e APIs diferentes, que podem ser categorizados em três camadas: o planejador, APIs de baixo nível e APIs de alto nível (veja imagem no próximo slide) Introdução ao Python Dask Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607 Os componentes e camadas que compõem o Dask Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607 • No núcleo está o task scheduler, que coordena e monitora a execução dos cálculos nos núcleos e máquinas da CPU • Esses cálculos são representados no código como Dask Delayed objects (avaliados no momento em que os valores são necessários - lazily) ou Dask Futures objects (são avaliados em tempo real, independentemente de o valor ser necessário imediatamente ou não) Introdução ao Python Dask Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607 • As APIs de alto nível da Dask oferecem uma camada de abstração sobre Delayed e Futures objects. • As operações nesses objetos de alto nível resultam em muitas operações paralelas de baixo nível gerenciadas pelos task schedulers. Introdução ao Python Dask Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607 • Algumas vantagens do Dask: • O Dask é totalmente implementado em Python e escala nativamente o NumPy, Pandas e o scikit-learn. • O Dask pode ser usado efetivamente para trabalhar com datasets médios em uma única máquina e grandes datasets em um cluster. • O Dask pode ser usado como uma estrutura geral para paralelizar a maioria dos objetos Python. • O Dask possui um overhead de configuração e manutenção muito baixa. Introdução ao Python Dask Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607 • Essencialmente, Dask divide datasets médios e grandes em partes menores e gerencia a execução paralela de funções sobre essas partes • Dask é tão útil para trabalhar com datasets médios em uma única máquina quanto para trabalhar com grandes datasets em um cluster Introdução ao Python Dask Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607 • Essencialmente, Dask divide datasets médios e grandes em partes menores e gerencia a execução paralela de funções sobre essas partes • Tudo isso pode ser feito sem a necessidade de refatorar o código existente ou escrever código adicional para lidar com problemas específicos do cluster (gerenciamento de recursos e transferência de dados) Introdução ao Python Dask Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607 • O Spark se tornou uma estrutura muito popular para analisar grandes conjuntos de dados (e é ótimo para isto) • Spark como uma alternativa in-memory para o MapReduce Apache Hadoop e é dependente da JVM (Java Virtual Machine). • O suporte ao Python veio mais tarde com o biblioteca PySpark. No entanto, toda a interação com o cluster passa pela JVM e assim, algumas execuções ocorrem fora do contexto do Python. • Além disso, o Spark não está equipado para manipular código que não pode ser modelo com MapReduce. Dask x Spark Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607 • Dask’s task schedulers usa o conceito de gráficos acíclicos direcionados (DAGs). • DAGs não permitem loops (o grafo não possui ciclo) Dask e DAGs Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607 • O Dask usa DAGs para coordenar a execução do código paralelo nos núcleos e máquinas da CPU. • Os nós upstream devem ser concluídos antes que o trabalho possa começar em qualquer nó downstream dependente. • No caso de uma falha, as etapas para alcançar um nó podem ser repetidas desde o início sem atrapalhar o restante do processo. Dask e DAGs Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607 Dask e DAGs Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607 Primeiro contato com Python Dask • Conheça o site de opendata: • A cada terceira semana do mês, o Departamento de Finanças da cidade de Nova York registra e publica um conjunto de dados de todas as citações de estacionamento emitidas ao longo do ano fiscal até agora. Datasets https://opendata.cityofnewyork.us/ • Iremos utilizar contém 2Gb de dados: Parking Violations Issued - Fiscal Year 2017 (arquivo csv) Datasets https://catalog.data.gov/dataset?tags=parking-ticket • Hello Dask: API DataFrame • Uma etapa essencial de qualquer projeto de ciência de dados é realizar análises exploratórias no conjunto de dados. • Durante a análise exploratória, convém verificar os dados em busca de valores ausentes, outliers e outros problemas de qualidade dos dados. A first look at the DataFrame API • A limpeza do conjunto de dados garante que a análise que você faz e quaisquer conclusões que você tire sobre os dados não sejam influenciadas por dados errados ou anômalos. A first look at the DataFrame API • Com o Dask DataFrames, faremos a leitura de um dataset em busca de valores ausentes e na eliminação de colunas que estão faltando muitos dados ou que não serão úteis para análise. A first look at the DataFrame API • Abra a máquina virtual fornecida para a aula. Nela, já temos o dataset que iremos utilizar. Faça Download da VM aqui: • Abra o terminal, e entre no python environment A first look at the DataFrame API source dask/bin/activate jupyter notebook https://1drv.ms/u/s!AjVQhLFLEQuFs8ofwi35IOmIv7eM2g?e=XhTZ8V User: hadoop Password: haddop Programação com Python Dask – Parte I • Neste exemplo, examinaremos apenas os dados coletados a partir de 2017. Primeiro, você precisará importar os módulos Dask e ler seus dados. Dask DataFrames # Listing 2.1 import dask.dataframe as dd from dask.diagnostics import ProgressBar from matplotlib import pyplot as plt df = dd.read_csv('nyc-parking-tickets/*2017.csv') df Dask DataFrames Veja a saída gerada (DataFrame metadata): npartitions: em quantas partições o DataFrame é dividido Dask criou um DAG com 99 nós para processar os dados. Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607 • Observe que os nomes das colunas estão na parte superior e abaixo está o respectivo tipo de dados de cada coluna. • Lembre-se que, diferentemente do Pandas que executa inteiramente na RAM, Dask irá lidar com dataset que não podem ser carregados na RAM, e portanto, podem estar distribuídos. • Assim, Dask DataFrames emprega métodos de amostragem aleatória para criar um profile e inferir tipos de dados de uma pequena amostra. Dask DataFrames • Como o Dask’s scheduler decide interrompero trabalho de processar este arquivo? • O valor de “npartitions“ mostra em quantas partições o DataFrame é dividido. • Como o arquivo possui um pouco mais de 2 GB, em 33 partições, cada partição tem aproximadamente 64 MB. Isso significa que, em vez de carregar o arquivo inteiro na RAM de uma só vez, cada Dask work thread irá executar um pedaço de 64 MB do arquivo por vez. Dask DataFrames Dask DataFrames O Dask divide grandes arquivos de dados em várias partições e funciona em uma partição por vez Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607 Dask DataFrames Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607 O processamento de cada partição (etapas 1 e 3) pode ser feito um de cada vez ou em paralelo. Espaço de memória temporário • O comportamento é apresentado na figura do slide anterior. • Em vez de carregar antecipadamente todo o DataFrame na RAM, o Dask divide o arquivo em partes menores que podem ser processadas independentemente. • No caso do Dask DataFrames, cada partição é um Pandas DataFrame relativamente pequeno. • No exemplo da figura, O DataFrame consiste em duas partições. Assim, o Dask DataFrames único é composto por dois Pandas DataFrames menores. Dask DataFrames • Cada partição pode ser carregada na memória e trabalhada uma de cada vez ou em paralelo. • Como o worker node pode trabalhar em partes menores por vez, o processamento pode ser distribuído para muitas máquinas. Ou, no caso de um único nó, o trabalho pode prosseguir em datasets muito grandes sem resultar em erros de falta de memória. Dask DataFrames • Dask criou um DAG com 99 nós para processar os dados. • O grafo consiste em 99 nós, porque cada partição requer a criação de três operações: (1) lendo os dados brutos, (2) dividindo os dados no bloco de tamanho apropriado e (3) inicializando o objeto DataFrame subjacente. • Assim, 33 partições com 3 tarefas por partição resultam em 99 tarefas Dask DataFrames • Neste exemplo, se tivéssemos 33 workers (nós no cluster ou cores um uma máquina), todo o arquivo poderia ser processado simultaneamente. • Mas com apenas um worker node (ou core), o Dask percorre cada partição, uma de cada vez. Dask DataFrames Dask DataFrames Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607 O Dask permite que um único Pandas DataFrame seja executado em paralelo por vários hosts. Dask DataFrames Ref.: Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607 Processando dados em paralelo em várias máquinas Programação com Python Dask – Parte II • Sobre o tamanho das partições: evite partições muito grandes • Os chunks de dados devem ser pequenos o suficiente para que muitos deles caibam na memória disponível de um worker de uma só vez. • O Dask provavelmente manipulará tantos chunks em paralelo em uma máquina quanto você tiver núcleos nessa máquina • Portanto, se você tiver blocos de 1 GB e dez núcleos, é provável que o Dask use pelo menos 10 GB de memória. • Além disso, é comum que o Dask tenha de duas a três vezes mais chunks disponíveis para trabalhar, para que ele sempre tenha algo em que trabalhar. Dask DataFrames • Agora, vamos tentar contar os valores ausentes em cada coluna em todo o arquivo. Dask DataFrames missing_values = df.isnull().sum() missing_values Dask Series Structure: npartitions=1 Date First Observed int64 Violation Time ... dtype: int64 Dask Name: dataframe-sum-agg, 166 tasks • Em vez de obter as contagens ausentes, o Dask retorna algumas informações de metadados sobre o resultado esperado • Observe que Dask ainda não realizou nenhum processamento porque usa lazy computation. (o Dask preparou outro DAG, que foi armazenado na variável missing_values.) • Os dados não são computados até que o grafo de tarefas seja executado explicitamente Dask DataFrames • Agora, vamos tentar contar os valores ausentes em cada coluna em todo o arquivo. Dask DataFrames missing_values = df.isnull().sum() missing_values Dask Series Structure: npartitions=1 Date First Observed int64 Violation Time ... dtype: int64 Dask Name: dataframe-sum-agg, 166 tasks Observe que a contagem de tarefas aumentou para 166 Dask realizou as 99 primeiras tarefas do DAG usadas para ler o arquivo de dados e criar o DataFrame chamado df, adicionou 66 tarefas (2 por partição) para verificar nulos e soma e, em seguida, adicionou uma etapa final para coletar todos os dados • Antes de executar o cálculo, iremos transformar esses números em porcentagens, dividindo as contagens de valores ausentes (missing_values) pelo número total de linhas no DataFrame (df.index.size) e multiplicando tudo por 100 Dask DataFrames missing_count = ((missing_values / df.index.size) * 100) missing_count • Observe que o número de tarefas aumentou novamente e o tipo de dados da série resultante mudou de int64 to float64 (divisão resulta em uma resta de ponto flutuante) Dask DataFrames Dask Series Structure: npartitions=1 Date First Observed float64 Violation Time ... dtype: float64 Dask Name: mul, 235 tasks • Executando cálculos com o método compute() Dask DataFrames with ProgressBar(): missing_count_pct = missing_count.compute() missing_count_pct Método do DataFrame chamado para que o Dask calcule o resultado (utilizando a DAG criada anteriormente). imprimirá uma barra de progresso baseada em texto, mostrando a porcentagem estimada de conclusão e o tempo decorrido para o cálculo. • Pela saída de nosso cálculo de valores ausentes, podemos chegar a algumas conclusões: • Não há dados sobre “No Standing or Stopping Violation”, “Hydrant Violation”, “Double Parking Violation”. Estão completamente vazios. Não faz sentido manter estas colunas. • Eliminaremos qualquer coluna que esteja com mais de 60% de seus valores ausentes. Dask DataFrames Dask DataFrames columns_to_drop = missing_count_pct[missing_count_pct > 60].index with ProgressBar(): df_dropped = df.drop(columns_to_drop, axis=1).persist() obtivemos o índice da série filtrada, que é uma lista de nomes de colunas. usamos esse índice para descartar colunas no Dask DataFrame com o mesmo nome Agora o objeto é disponibilizado para todos os threads. No caso de execução em um cluster, o objeto será serializado e transmitido para todos os workes nodes. Evita que Dask descarte os dados filtrados criados tão logo termine a execução. persist() permite que Dask mantenha em memória o resultado Paralelizando código com Dask Delayed • Para entender melhor como o paralelismo funciona, paralelizamos código simples de estilo for-loop com Dask e dask.delayed. • Essa é uma maneira simples de usar o dask para paralelizar bases de código existentes ou criar sistemas complexos. Dask Delayed Fonte: https://docs.dask.org/en/latest/best-practices.html Dask Delayed from dask.distributed import Client client = Client(n_workers=4) from time import sleep def inc(x): sleep(1) return x + 1 def add(x, y): sleep(1) return x + y Primeiro, vamos criar algumas funções inc e add que dormem um pouco para simular algum processamento. Em seguida, executaremos essas funções normalmente. Fonte: https://docs.dask.org/en/latest/best-practices.html • Cronometramos a execução desse código normal usando %%time, que é uma função especial do Jupyter Notebook. Dask Delayed %%time # This takes three seconds to run because we call each # function sequentially, one after the other x = inc(1) y = inc(2) z = add(x, y) CPU times: user 436 ms, sys: 133 ms, total: 568 ms Wall time: 3 s Observe o tempo de execução de 3 segundos. Este código é sequencial.Fonte: https://docs.dask.org/en/latest/best-practices.html • Paralelizando o código: Observe que estas duas chamadas de incremento (inc functions) podem ser chamadas em paralelo, porque são totalmente independentesuma da outra. • Transformaremos o inc e add usando a função dask.delayed. Quando a chamamos a versão delayed passando os argumentos, exatamente como antes, a função original ainda não é chamada (por isso que a execução termina muito rapidamente). Em vez disso, é criado um objeto delayed, que monitora a função a ser chamada e os argumentos a serem transmitidos. Dask Delayed Fonte: https://docs.dask.org/en/latest/best-practices.html Dask Delayed from dask import delayed %%time # This runs immediately, all it does is build a graph x = delayed(inc)(1) y = delayed(inc)(2) z = delayed(add)(x, y) CPU times: user 766 µs, sys: 207 µs, total: 973 µs Wall time: 733 µs Foi executado imediatamente, já que nada realmente aconteceu ainda. Fonte: https://docs.dask.org/en/latest/best-practices.html • Para obter o resultado, chamamos agora a função compute() Dask Delayed %%time # This actually runs our computation using a local process pool z.compute() CPU times: user 333 ms, sys: 85 ms, total: 418 ms Wall time: 2.04 s Observe que foi executado mais rapidamente que o código original. Fonte: https://docs.dask.org/en/latest/best-practices.html • O que aconteceu neste processamento: o objeto “z” é um lazy Delayed object. Ele contém tudo o que precisamos para calcular o resultado final, incluindo referências a todas as funções necessárias. Podemos avaliar o resultado com .compute () ou podemos visualizar o gráfico de tarefas para esse valor com .visualize (). Dask Delayed z Delayed('add-197e820c-5b37-4da4-827c-0931d6f717ea') Fonte: https://docs.dask.org/en/latest/best-practices.html Dask Delayed Observe o grafo gerado (Dask DAG). Observe o fluxo de execução que foi criado. # Look at the task graph for `z` z.visualize() Fonte: https://docs.dask.org/en/latest/best-practices.html • Analisando o resultado: • Por que passamos de 3 para 2? Por que não conseguimos paralelizar até 1s? Dask Delayed Fonte: https://docs.dask.org/en/latest/best-practices.html Exercício resolvido: Dask Delayed • Exercícios: Paralelize o código do loop abaixo: veja o arquivo “Parallelize a for loop.ipynb” • Mostre a DAG gerada Dask Delayed Fonte: https://docs.dask.org/en/latest/best-practices.html Resposta Fonte: https://docs.dask.org/en/latest/best-practices.html %%time # Your parallel code here... from dask import delayed results = [] for x in data: y = delayed(inc)(x) results.append(y) total = delayed(sum)(results) print("Before computing:", total) # Let's see what type of thing total is result = total.compute() print("After computing :", result) # After it's computed total.visualize() Referências Bibliográficas Jesse C. Daniel . Data Science with Python and Dask. July 2019 ISBN 9781617295607 Considerações Finais
Compartilhar