Baixe o app para aproveitar ainda mais
Prévia do material em texto
Alex Cunha Princípios e Técnicas Data Science Apache Spark Apache Spark é uma plataforma de computação em cluster que fornece uma API para programação distribuída para processamento de dados em larga escala, semelhante ao modelo MapReduce, mas projetada para ser rápida para consultas interativas e algoritmos iterativos. MapReduce Apache Spark O Spark permite que você distribua dados e tarefas em clusters com vários nós. Imagine cada nó como um computador separado. A divisão dos dados torna mais fácil o trabalho com conjuntos de dados muito grandes porque cada nó funciona processa apenas uma parte parte do volume total de dados. Apache Spark Preparação de dados Modelos de machine learning Análise de dados em tempo real O Spark é amplamente utilizado em projetos analíticos nas seguintes frentes: PySpark PySpark é uma interface para Apache Spark em Python. Ele não apenas permite que você escreva aplicativos Spark usando APIs Python, mas também fornece o shell PySpark para analisar interativamente seus dados em um ambiente distribuído. O PySpark oferece suporte à maioria dos recursos do Spark, como Spark SQL, DataFrame, Streaming, MLlib (Machine Learning) e Spark Core. PySpark PySpark Spark SQL e DataFrame Spark SQL é um módulo Spark para processamento de dados estruturados. Ele fornece uma abstração de programação chamada DataFrame e também pode atuar como mecanismo de consulta SQL distribuído. PySpark Spark Streaming Executando em cima do Spark, o recurso de streaming no Apache Spark possibilita o uso de poderosas aplicações interativas e analíticas em streaming e dados históricos, enquanto herda a facilidade de uso do Spark e as características de tolerância a falhas. PySpark Spark MLlib Construído sobre o Spark, MLlib é uma biblioteca de aprendizado de máquina escalonável que fornece um conjunto uniforme de APIs de alto nível que ajudam os usuários a criar e ajustar pipelines de aprendizado de máquina. PySpark Spark Core Spark Core é o mecanismo de execução geral subjacente para a plataforma Spark sobre o qual todas as outras funcionalidades são construídas. Ele fornece um RDD (Resilient Distributed Dataset) e recursos de computação na memória. Prática Instalação do PySpark i!pip install pyspark==3.3.1 Prática Instalação do openjdk !apt-get install openjdk-8-jdk-headless -qq > /dev/null Prática Download do Spark !wget -q https://archive.apache.org/dist/spark/spark-3.3.1/spark 3.3.1-bin-hadoop2.tgz !ls(verificar lista de arquivos) Prática Descompactar Spark !tar xf spark-3.3.1-bin-hadoop2.tgz Prática Instalação do findspark !pip install -q findspark Prática Instalação do findspark import os os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64" os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop2" Prática Instalação do findspark import findspark findspark.init() Prática from pyspark.sql import SparkSession Prática spark = SparkSession.builder.master('local[*]').appName('Iniciando com Spark').config('spark.ui.port', '4050').getOrCreate() Prática !wget -q https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable linux-amd64.zip Prática !unzip ngrok-stable-linux-amd64.zip Prática get_ipython().system_raw('./ngrok http 4050 &') Prática !ngrok config add-authtoken 2OH1UOyQIbdBMYDn17YxyOGQ8Cf_6vGgCkPj1JSLbjaM6j83f Prática !ngrok config add-authtoken 2OH1UOyQIbdBMYDn17YxyOGQ8Cf_6vGgCkPj1JSLbjaM6j83f Prática !curl -s http://localhost:4040/api/tunnels Prática data = [('Zeca', '35'), ('Eva', '29')] colNames = ['Nome', 'Idade'] Prática df = spark.createDataFrame(data, colNames) df.show() Prática df.toPandas() Prática from google.colab import drive drive.mount('/content/drive') Prática import zipfile Prática zipfile.ZipFile(' ',' ').extractall(' ') Prática path = '/content/drive/MyDrive/spark/empresas' empresas = spark.read.csv(path, sep=';', inferSchema=True) Prática empresas.count() Prática empresas.limit(5).toPandas() Prática empresasColNames = ['cnpj_basico 'razao_social_nome_empresarial', 'natureza_juridica 'qualificacao_do_responsavel', 'capital_social_da_empresa 'porte_da_empresa', 'ente_federativo_responsavel'] Prática for item in enumerate(empresasColNames): print(item) Prática for index, colName in enumerate(empresasColNames): empresas = empresas.withColumnRenamed(f"_c{index}" colName) Prática empresas.columns Prática empresas.limit(5).toPandas() Prática estabsColNames = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial'] Prática for index, colName in enumerate(estabsColNames): estabelecimentos = estabelecimentos.withColumnRenamed(f"_c{index}", colName) estabelecimentos.columns Prática sociosColNames = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria'] Prática for index, colName in enumerate(sociosColNames): socios = socios.withColumnRenamed(f"_c{index}", colName) socios.columns Prática DataTypes empresas.limit(5).toPandas() empresas.printSchema() Prática DataTypes socios.limit(5).toPandas() socios.printSchema() Prática Modificando tipo de dados (StringType, DoubleType) from pyspark.sql.types import DoubleType, StringType from pyspark.sql import functions as f empresas.printSchema() Obs.: capital_social_da_empresa: string Prática Modificando tipo de dados (StringType, DoubleType) empresas = empresas.withColumn('capital_social_da_empresa', f.regexp_replace('capital_social_da_empresa', ',', '.')) empresas.limit(5).toPandas() empresas.printSchema() Prática Modificando tipo de dados (StringType, DoubleType) empresas = empresas.withColumn('capital_social_da_empresa', empresas['capital_social_da_empresa'].cast(DoubleType())) empresas.limit(5).toPandas() empresas.printSchema() Prática StringType->DateType df = spark.createDataFrame([(20200924,), (20201022,), (20210215,)], ['data']) df.toPandas() df.printSchema() Prática StringType->DateType df = df.withColumn("data", f.to_date(df.data.cast(StringType()), 'yyyyMMdd')) df.printSchema() df.toPandas() Prática StringType->DateType estabelecimentos.printSchema() CONVERTA A COLUNA(estabelecimentos): data_situacao_cadastral Prática StringType->DateType estabelecimentos = estabelecimentos\ .withColumn( "data_situacao_cadastral", f.to_date(estabelecimentos.data_situacao_cadastral.cast(StringType()), 'yyyyMMdd') )\ .withColumn( "data_de_inicio_atividade", f.to_date(estabelecimentos.data_de_inicio_atividade.cast(StringType()), 'yyyyMMdd') )\ .withColumn( "data_da_situacao_especial", f.to_date(estabelecimentos.data_da_situacao_especial.cast(StringType()), 'yyyyMMdd') ) estabelecimentos.printSchema() Prática StringType->DateType CONVERTA PARA DATA(socios): data_de_entrada_sociedade Prática StringType->DateType socios = socios\ .withColumn( "data_de_entrada_sociedade", f.to_date(socios.data_de_entrada_sociedade.cast(StringT ype()), 'yyyyMMdd') ) socios.printSchema() Seleção e Consultas empresas.select('*').show(5) Para remover o truncaded passar o parametro False .show(truncate=False) Seleção e Consultas(Colunas) empresas\ .select('natureza_juridica','porte_da_empresa','capital_soci al_da_empresa').show(5) Seleção e Consultas(Extrair, Ano, Mês e Dia) socios\ f.dayofweek f.month .alias('coluna') .select('nome_do_socio_ou_razao_social','faixa_etaria',f.year('data_de_entrada_sociedade')).show(5) Identificando valores nulos(NaN) df = spark.createDataFrame([(1,), (2,), (3,), (None,)], ['data']) df.toPandas() NaN df.show() null Identificando valores nulos(NaN) df = spark.createDataFrame([(1.,), (2.,), (3.,), (float('nan'),)], ['data']) df.toPandas() NaN df.show() NaN Identificando valores nulos(NaN) socios.limit(5).toPandas() pais NaN nome_do_representante None Identificando valores nulos(NaN) socios.limit(5).show() pais null nome_do_representante null Identificando valores nulos por coluna socios.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in socios.columns]).show() Identificando valores nulos por coluna socios.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in socios.columns]).show() socios.printSchema() pais: integer nome_do_representante: string Substituir valores nulos socios.na.fill().limit(5).toPandas() () '-' para string () 0 - para inteiro
Compartilhar