Prévia do material em texto
APACHE SPARK Processamento de Big Data em Escala Massiva SUMÁRIO Página 1 - Introdução: A Revolução do Big Data Página 2 - O Que É Apache Spark Página 3 - Arquitetura e Componentes Fundamentais Página 4 - RDDs: Resilient Distributed Datasets Página 5 - Spark SQL e DataFrames Página 6 - Spark Streaming: Dados em Tempo Real Página 7 - Machine Learning com MLlib Página 8 - Configuração e Otimização Página 9 - Casos de Uso e Aplicações Reais Página 10 - Ecossistema e Futuro do Spark Anexo - Perguntas e Respostas PÁGINA 1 - INTRODUÇÃO: A REVOLUÇÃO DO BIG DATA Vivemos na era dos dados exponenciais. A cada dia, geramos 2,5 quintilhões de bytes de dados - mais informação do que toda a humanidade produziu até o ano 2000. Mas como processar essa quantidade astronômica de informações de forma eficiente? Durante décadas, o paradigma tradicional de processamento de dados funcionou bem: um servidor, um banco de dados, análises sequenciais. Porém, quando o volume ultrapassou terabytes e petabytes, essa abordagem se tornou impraticável. Era necessária uma revolução. O Desafio do Volume, Velocidade e Variedade Os famosos "3 V's" do Big Data criaram desafios sem precedentes: Volume: Datasets que não cabem em uma única máquina Velocidade: Necessidade de processamento em tempo real Variedade: Dados estruturados, semi-estruturados e não estruturados A Limitação do MapReduce O Hadoop MapReduce foi pioneiro no processamento distribuído, mas tinha limitações críticas: Lentidão devido a operações de disco frequentes Complexidade para algoritmos iterativos Dificuldade para análises interativas Curva de aprendizado íngreme Enter Apache Spark: A Nova Geração Apache Spark emergiu como a solução que o mercado esperava: um framework de processamento distribuído que é até 100x mais rápido que o MapReduce para workloads em memória, mantendo a robustez e facilidade de uso. Por que Spark Importa Hoje? 80% das empresas Fortune 500 usam Spark Processamento 10-100x mais rápido que alternativas tradicionais Suporte unificado para batch, streaming, ML e graph processing APIs simples em Python, Scala, Java e R Prepare-se para descobrir como esta tecnologia está redefinindo o que é possível no mundo do big data! PÁGINA 2 - O QUE É APACHE SPARK Apache Spark é um framework de computação distribuída open-source, projetado para processar grandes volumes de dados de forma rápida e eficiente em clusters de computadores. Definição Técnica Spark é um motor de análise unificado que combina: Processamento de dados em lote (batch) Streaming em tempo real Machine Learning Processamento de grafos Consultas SQL interativas Tudo isso através de uma API consistente e arquitetura otimizada para memória. A Filosofia do Spark Criado na UC Berkeley em 2009 por Matei Zaharia, o Spark foi desenvolvido com três princípios fundamentais: 1. Velocidade: Processamento em memória RAM até 100x mais rápido 2. Facilidade: APIs intuitivas que reduzem linhas de código em 2-5x 3. Generalidade: Uma plataforma para múltiplas workloads Spark vs Hadoop: A Evolução Aspecto Hadoop MapReduce Apache Spark Velocidade Baseado em disco Baseado em memória Facilidade Código verboso APIs de alto nível Casos de Uso Batch processing Batch + Stream + ML + Graph Latência Minutos/horas Segundos/milissegundos Iterações Múltiplas operações I/O Cache inteligente em memória Características Distintivas In-Memory Computing: Dados ficam na RAM entre operações Lazy Evaluation: Operações são otimizadas antes da execução Fault Tolerance: Recuperação automática de falhas Unified API: Mesma sintaxe para diferentes tipos de processamento Multi-language: Python, Scala, Java, R, SQL Quando Usar Spark? ✅ Ideal para: Datasets > 1GB Algoritmos iterativos (ML, graph algorithms) Análises interativas Processamento de streaming ETL complexo ❌ Não ideal para: Datasets pequenos (- unidades básicas de paralelismo: Persistence e Caching python nums.map(lambda x: x * 2) # [2, 4, 6, 8, 10] python nums.filter(lambda x: x > 3) # [4, 5] python words.flatMap(lambda line: line.split()) python rdd1.union(rdd2) python # Controla número de partições rdd = sc.textFile("file.txt", minPartitions=10) # Verifica partições print(rdd.getNumPartitions()) # Reparticiona rdd_repart = rdd.repartition(20) Para evitar recomputação custosa: Lineage Graph - A Magia da Resiliência Cada RDD "lembra" como foi criado: Se uma partição falha, Spark recomputa apenas o necessário seguindo o lineage. RDDs são a base sobre a qual todo o ecossistema Spark é construído! PÁGINA 5 - SPARK SQL E DATAFRAMES Spark SQL revolucionou a análise de big data ao trazer a familiaridade do SQL para o processamento distribuído, combinando o melhor dos dois mundos: performance do Spark e expressividade do SQL. A Evolução: RDD → DataFrame → Dataset DataFrames: RDDs com Schema DataFrames são RDDs estruturados com schema definido - como tabelas de banco de dados distribuídas: Vantagens dos DataFrames Catalyst Optimizer: Otimização automática de consultas Código Generation: Geração de código Java otimizado Schema Inference: Detecção automática de tipos API Unificada: Mesmo código em Python, Scala, Java, R python # Cache na memória rdd.cache() # Diferentes níveis de storage from pyspark import StorageLevel rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) File RDD → map() → filter() → Final RDD python # Criando DataFrame df = spark.read.json("users.json") df.printSchema() Operações com DataFrames Leitura de Dados Transformações Comuns SQL Nativo no Spark python # JSON df = spark.read.json("data.json") # CSV com header df = spark.read.option("header", "true").csv("data.csv") # Parquet (formato columnar otimizado) df = spark.read.parquet("data.parquet") # Banco de dados df = spark.read.jdbc(url, "table", properties) python # Seleção de colunas df.select("name", "age") # Filtros df.filter(df.age > 21) df.where("age > 21") # Agrupamentos df.groupBy("department").avg("salary") # Junções df1.join(df2, df1.id == df2.user_id) # Ordenação df.orderBy("age", ascending=False) python Catalyst Optimizer: A Inteligência por Trás O Catalyst é o cérebro que torna Spark SQL tão eficiente: 1. Analysis: Resolve referências e tipos 2. Logical Optimization: Elimina operações desnecessárias 3. Physical Planning: Escolhe algoritmos mais eficientes 4. Code Generation: Gera código Java otimizado Window Functions - Análises Avançadas User Defined Functions (UDFs) # Registra DataFrame como tabela temporária df.createOrReplaceTempView("employees") # Executa SQL padrão result = spark.sql(""" SELECT department, AVG(salary) as avg_salary, COUNT(*) as total_employees FROM employees WHERE age BETWEEN 25 AND 50 GROUP BY department ORDER BY avg_salary DESC """) python from pyspark.sql.window import Window from pyspark.sql.functions import * window = Window.partitionBy("department").orderBy("salary") df.withColumn("rank", rank().over(window)) \ .withColumn("running_total", sum("salary").over(window)) python Formatos de Dados Modernos Parquet: Columnar, compressão eficiente Delta Lake: ACID transactions, time travel Iceberg: Schema evolution, hidden partitioning Performance Tips Use Parquet para storage Particione dados por colunas frequentemente filtradas Cache DataFrames reutilizados Evite UDFs Python quando possível (prefira funções built-in) Spark SQL democratizou big data ao permitir que analistas SQL trabalhem com petabytes de dados usando sintaxe familiar! PÁGINA 6 - SPARK STREAMING: DADOS EM TEMPO REAL Em um mundo onde dados chegam continuamente - cliques em websites, sensores IoT, transações financeiras - a capacidade de processar informações em tempo real tornou-se crítica. Spark Streaming torna isso possível de forma elegante. O Desafio do Tempo Real Processamento tradicional funciona em lotes: 1. Acumula dados por horas/dias 2. Processa tudo de uma vez 3. Gera relatórios Mas negócios modernos precisam de: Detecção de fraude instantânea Recomendações personalizadas em tempo real from pyspark.sql.types import * # UDF Python def age_category(age): return "Senior" if age > 60 else "Junior" age_udf = udf(age_category, StringType()) df.withColumn("category", age_udf("age")) Monitoramento de sistemas críticos Análise de sentimento de redes sociais Spark Streaming: Micro-Batch Architecture Spark Streaming divide streams contínuos em micro-lotes pequenos (segundos): Structured Streaming: A Nova Geração Spark 2.0+ introduziu Structured Streaming - uma abstração mais poderosa: Fontes de Dados Streaming Apache Kafka (mais comum) Stream contínuo: ----●●●●●●●●●●●●●●●●---- ↓ Micro-batches: [●●●] [●●●] [●●●] [●●●] ↓ Processamento: RDD RDD RDD RDD python # Lê stream de um diretório df = spark.readStream \ .format("json") \ .option("path", "/stream/input") \ .load() # Processa como DataFrame normal result = df.groupBy("user_id") \ .count() # Escreve resultado query = result.writeStream \ .outputMode("update") \ .format("console") \ .start() python df = spark.readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "localhost:9092") \ .option("subscribe", "topic1") \ .load() Socket Streams File Streams Operações de Janela (Windowing) Para análises agregadas ao longo do tempo: Controle de Estado (Stateful Operations) Spark mantém estado entre micro-batches: python lines = spark.readStream \ .format("socket") \ .option("host", "localhost") \ .option("port", 9999) \ .load() python df = spark.readStream \ .format("csv") \ .option("path", "/streaming/input") \ .load() python from pyspark.sql.functions import * # Janela de 10 minutos, deslizando a cada 5 minutos windowedCounts = df \ .groupBy( window(col("timestamp"), "10 minutes", "5 minutes"), col("word") ) \ .count() python # Contagem acumulativa por usuário userCounts = df \ .groupBy("user_id") \ .count() # Estado mantido automaticamente Output Modes Append: Apenas novos registros Complete: Resultado completo a cada batch Update: Apenas registros modificados Tratamento de Late Data Dados podem chegar fora de ordem: Monitoramento e Debugging Casos de Uso Reais 1. Detecção de Fraude: Análise de padrões suspeitos em transações 2. IoT Analytics: Processamento de sensores em tempo real 3. Clickstream Analysis: Comportamento de usuários em websites 4. Alert Systems: Notificações baseadas em métricas Spark Streaming democratiza processamento em tempo real, tornando-o tão simples quanto análises batch! PÁGINA 7 - MACHINE LEARNING COM MLLIB MLlib transforma Apache Spark em uma plataforma completa de Machine Learning distribuído, permitindo treinar modelos em datasets que não cabem em uma única máquina. python # Define watermark para dados atrasados df.withWatermark("timestamp", "10 minutes") \ .groupBy(window("timestamp", "5 minutes")) \ .count() python # Inicia query com checkpoint query = stream.writeStream \ .option("checkpointLocation", "/checkpoint") \ .start() # Monitora progresso query.status query.recentProgress MLlib: Escalando Machine Learning Tradicionalmente, ML tinha limitações de escala: Scikit-learn: Excelente, mas single-machine Frameworks distribuídos: Complexos e especializados MLlib: ML distribuído com API simples Arquitetura do MLlib Pipeline Concept MLlib usa o conceito de pipelines inspirado no scikit-learn: Feature Engineering Distribuído Transformações Numéricas Transformações Categóricas python from pyspark.ml import Pipeline from pyspark.ml.feature import VectorAssembler from pyspark.ml.classification import RandomForestClassifier # Pipeline stages assembler = VectorAssembler(inputCols=["feature1", "feature2"],outputCol="features") rf = RandomForestClassifier(featuresCol="features", labelCol="label") # Constrói pipeline pipeline = Pipeline(stages=[assembler, rf]) # Treina modelo model = pipeline.fit(training_data) python # Normalização from pyspark.ml.feature import StandardScaler scaler = StandardScaler(inputCol="features", outputCol="scaled") # Binning from pyspark.ml.feature import Bucketizer bucketizer = Bucketizer(splits=[-float("inf"), 0, 10, float("inf")], inputCol="value", outputCol="bucket") Processamento de Texto Algoritmos de Classification Algoritmos de Regression python # String to Index from pyspark.ml.feature import StringIndexer indexer = StringIndexer(inputCol="category", outputCol="categoryIndex") # One-Hot Encoding from pyspark.ml.feature import OneHotEncoder encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec") python # Tokenização from pyspark.ml.feature import Tokenizer tokenizer = Tokenizer(inputCol="text", outputCol="words") # TF-IDF from pyspark.ml.feature import HashingTF, IDF hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures") idf = IDF(inputCol="rawFeatures", outputCol="features") python # Logistic Regression from pyspark.ml.classification import LogisticRegression lr = LogisticRegression(maxIter=10, regParam=0.01) # Random Forest from pyspark.ml.classification import RandomForestClassifier rf = RandomForestClassifier(numTrees=100) # Gradient Boosting from pyspark.ml.classification import GBTClassifier gbt = GBTClassifier(maxIter=10) # Naive Bayes from pyspark.ml.classification import NaiveBayes nb = NaiveBayes() Clustering Sistemas de Recomendação Hyperparameter Tuning python # Linear Regression from pyspark.ml.regression import LinearRegression lr = LinearRegression(featuresCol="features", labelCol="label") # Random Forest Regressor from pyspark.ml.regression import RandomForestRegressor rfr = RandomForestRegressor() python # K-Means from pyspark.ml.clustering import KMeans kmeans = KMeans(k=3, seed=1) model = kmeans.fit(df) # Bisecting K-Means from pyspark.ml.clustering import BisectingKMeans bkm = BisectingKMeans(k=3) python # Collaborative Filtering (ALS) from pyspark.ml.recommendation import ALS als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating") model = als.fit(training) # Gera recomendações recommendations = model.recommendForAllUsers(10) python Model Persistence Vantagens do MLlib Escala para petabytes de dados Integração nativa com Spark SQL APIs consistentes Performance otimizada MLlib democratiza machine learning em grande escala! PÁGINA 8 - CONFIGURAÇÃO E OTIMIZAÇÃO Um Spark mal configurado pode ser 10x mais lento que um bem otimizado. Esta seção revela os segredos para extrair máxima performance do seu cluster. Configuração do Spark Application from pyspark.ml.tuning import CrossValidator, ParamGridBuilder from pyspark.ml.evaluation import BinaryClassificationEvaluator # Grid de parâmetros paramGrid = ParamGridBuilder() \ .addGrid(lr.regParam, [0.1, 0.01]) \ .addGrid(lr.maxIter, [10, 50]) \ .build() # Cross validation crossval = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=BinaryClassificationEvaluator(), numFolds=3) # Encontra melhor modelo cvModel = crossval.fit(training) python # Salva modelo model.write().overwrite().save("path/to/model") # Carrega modelo from pyspark.ml.classification import LogisticRegressionModel model = LogisticRegressionModel.load("path/to/model") Spark Context e Session Gerenciamento de Memória Anatomia da Memória no Spark Storage Memory (60%): Cache de RDDs/DataFrames Execution Memory (20%): Shuffles, joins, aggregations User Memory (20%): Código do usuário, metadados Configurações Críticas Otimização de CPU e Paralelismo Cores per Executor python from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("MyApp") \ .config("spark.sql.adaptive.enabled", "true") \ .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \ .config("spark.executor.memory", "4g") \ .config("spark.executor.cores", "4") \ .getOrCreate() bash # Memória total do executor --executor-memory 4g # Memória do driver --driver-memory 2g # Fração para storage (padrão: 0.6) --conf spark.sql.queryExecutionListeners=0.5 # Serialização (recomendado: Kryo) --conf spark.serializer=org.apache.spark.serializer.KryoSerializer bash Dynamic Allocation Particionamento Inteligente Número Ideal de Partições Otimização de I/O Formatos de Arquivo Compressão # Regra geral: 2-5 cores por executor --executor-cores 4 --num-executors 10 bash # Ajuste automático de executors --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.minExecutors=2 --conf spark.dynamicAllocation.maxExecutors=20 --conf spark.dynamicAllocation.initialExecutors=5 python # Regra geral: 2-4 partições por core # Cluster com 40 cores = 80-160 partições # Controla partições df.repartition(100) # Força redistribuição df.coalesce(50) # Reduz sem shuffle completo # Particiona por coluna (para joins) df.repartition("user_id") python # ❌ Evite: CSV, JSON para grandes datasets df.write.csv("output") # ✅ Use: Parquet (columnar, compressão) df.write.parquet("output") # ✅ Use: Delta Lake (ACID, versioning) df.write.format("delta").save("output") Otimização de Joins Broadcast Joins Bucketing Cache Strategies python # Configuração de compressão spark.conf.set("spark.sql.parquet.compression.codec", "snappy") spark.conf.set("spark.sql.hive.convertMetastoreParquet.compression.codec", "gzip") python from pyspark.sql.functions import broadcast # Força broadcast para tabelas pequenas (