Baixe o app para aproveitar ainda mais
Prévia do material em texto
Prof. Giovane Barcelos giovane_barcelos@uniritter.edu.br Engenharia de Dados ED0401 mailto:giovane_barcelos@uniritter.edu.br Pág. 2 Engenharia de Dados De 71 Plano de Ensino Conteúdo programático N2N2 N1N1 1. Construindo os pipelines de dados: extrair, transformar e carregar 1.1 O que é engenharia de dados? (a) 1.2 Construindo nossa infraestrutura de engenharia de dados (b) 1.3 Leitura e gravação de arquivos 1.4 Trabalhando com bancos de dados (c, d) 1.5 Limpeza, transformação e enriquecimento de dados 1.6 Construindo o pipeline de dados (h) 2. Implantação de pipelines de dados na produção 2.1 Recursos de um pipeline em produção (f, g) 2.2 Controle de versão com o NiFi Registry (e) 2.3 Monitorando pipelines de dados 2.4 Implantando pipelines de dados 2.5 Construindo um pipeline de dados em produção (i) 3 Além do Lote (batch) - criando pipelines de dados em tempo real 3.1 Construindo um cluster Kafka (b) 3.2 Streaming de dados com Apache Kafka (e) 3.3 Processamento de dados com Apache Spark (e) 3.4 Dados de borda (edge) em tempo real com MiNiFi, Kafka e Spark 3.5 Construção, implantação e gerenciamento de pipelines: uma revisão geral Pág. 3 Engenharia de Dados De 71 ➢ Inserir e extrair dados relacionais com Python ➢ Inserir e extrair dados do banco de dados NoSQL em Python ➢ Criação de pipelines de banco de dados no Airflow ➢ Criação de pipelines de banco de dados em NiFi Trabalhando com Banco de Dados O que vamos aprender? Pág. 4 Engenharia de Dados De 71 ➢ Quando você ouve a palavra banco de dados, provavelmente está imaginando um banco de dados relacional - isto é, um banco de dados feito de tabelas contendo colunas e linhas com relacionamentos entre as tabelas ➢ Por exemplo, um sistema de pedido de compra que tem estoque, compras e informações do cliente ➢ Os bancos de dados relacionais existem há mais de 40 anos e vêm do modelo de dados relacionais desenvolvido por E. F. Codd no final dos anos 1970 ➢ Existem vários fornecedores de bancos de dados relacionais - incluindo IBM, Oracle e Microsoft - mas todos esses bancos de dados usam um dialeto semelhante de SQL, que significa Structured Query Language ➢ Vamos trabalhar com um banco de dados de código aberto popular - PostgreSQL Trabalhando com Banco de Dados Inserir e extrair dados relacionais com Python Pág. 5 Engenharia de Dados De 71 ➢ Anteriormente instalamos o PgAdmin 3 que funciona bem até a versão 11 ➢ No entanto a partir da versão 11 do Postgres ocorrem incompatibilidades em relação as funções (“column "proisagg" does not exist”), neste caso seria necessário instalar o PgAdmin 4 ➢ Vamos instalar o PgAdmin 4 executando o seguinte na linha de comando do Linux: sudo apt install curl gnupg2 -y sudo curl https://www.pgadmin.org/static/packages_pgadmin_org.pub | apt- key add sudo echo "deb https://ftp.postgresql.org/pub/pgadmin/pgadmin4/apt/buster pgadmin4 main" >> /etc/apt/sources.list.d/pgdg.list sudo apt update sudo apt install pgadmin4 -y sudo /usr/pgadmin4/bin/setup-web.sh # Email e senha sugeridos: admin@localhost.go / 123456 http://localhost/pgadmin4 Trabalhando com Banco de Dados Criação de um banco de dados e tabelas PostgreSQL mailto:admin@localhost.go http://localhost/pgadmin4 Pág. 6 Engenharia de Dados De 71 ➢ Crie uma conexão do servidor (Menu Object Create Server) com nome → → → (Name) Localhost e Hostname no Connection localhost ➢ Como pode-se verificar o banco de dados dataengineering criado esta disponível: Trabalhando com Banco de Dados Criação de um banco de dados e tabelas PostgreSQL Pág. 7 Engenharia de Dados De 71 ➢ Existem várias bibliotecas e maneiras de se conectar a um banco de dados em Python - pyodbc, sqlalchemy, psycopg2 e usando uma API e solicitações ➢ Usaremos a biblioteca psycopg2 para se conectar ao PostgreSQL porque ela foi construída especificamente para se conectar ao PostgreSQL ➢ Conforme suas habilidades progridem, você pode querer olhar para ferramentas como SQLAlchemy ➢ SQLAlchemy é um kit de ferramentas e um mapeador relacional de objetos para Python ➢ Ela permite que você execute consultas de uma forma mais Pythônica - sem SQL - e mapeie classes Python para tabelas de banco de dados Trabalhando com Banco de Dados Inserindo dados no PostgreSQL Pág. 8 Engenharia de Dados De 71 ➢ Pode-se verificar se tem psycopg2 instalado executando o seguinte comando: python3 -c "import psycopg2; print(psycopg2.__version__)" ➢ Se não estiver instalado, pode-se adicioná-lo com o seguinte comando: pip3 install psycopg2 ➢ O uso de pip requer que existam dependências adicionais para que ele funcione. Se encontrar problemas, também pode instalar uma versão binária pré-compilada usando o seguinte comando: pip3 install psycopg2-binary Trabalhando com Banco de Dados Instalando psycopg2 Pág. 9 Engenharia de Dados De 71 ➢ Pode-se verificar se tem psycopg2 instalado executando o seguinte comando: python3 -c "import psycopg2; print(psycopg2.__version__)" ➢ Se não estiver instalado, pode-se adicioná-lo com o seguinte comando: pip3 install psycopg2 ➢ O uso de pip requer que existam dependências adicionais para que ele funcione. Se encontrar problemas, também pode instalar uma versão binária pré-compilada usando o seguinte comando: pip3 install psycopg2-binary Trabalhando com Banco de Dados Instalando psycopg2 Pág. 10 Engenharia de Dados De 71 ➢ Para se conectar ao seu banco de dados usando psycopg2, é necessário criar uma conexão, criar um cursor, executar um comando e obter os resultados ➢ Pode-se executar essas mesmas etapas, quer esteja consultando ou inserindo dados. Vamos percorrer as etapas da seguinte forma: 1) Importe a biblioteca e faça referência a ela como banco de dados: import psycopg2 as db 2) Crie uma string de conexão que contenha o host, o banco de dados, o nome de usuário e a senha: conn_string="dbname='dataengineering' host='localhost' user='postgres' password='postgres'" 3) Crie o objeto de conexão passando a string de conexão para o método connect (): conn=db.connect(conn_string) 4) Em seguida, crie o cursor a partir da conexão: cur=conn.cursor() Trabalhando com Banco de Dados Conectando-se ao PostgreSQL com Python Pág. 11 Engenharia de Dados De 71 ➢ Agora que você tem uma conexão aberta, pode inserir dados usando SQL ➢ Para inserir uma única pessoa, você precisa formatar uma instrução de inserção SQL, conforme mostrado: query = "insert into users (id,name,street,city,zip) values({},'{}','{}','{}','{}')".format(1,'Big Bird','Sesame Street','Fakeville','12345') ➢ Para ver a aparência dessa consulta, você pode usar o método mogrify(): cur.mogrify(query) ➢ O código anterior criará uma instrução de inserção SQL adequada; no entanto, à medida que avança, você adicionará vários registros em uma única instrução. Para fazer isso, você criará uma tupla de tuplas. Para criar a mesma instrução SQL, você pode usar o seguinte código: query2 = "insert into users (id,name,street,city,zip) values(%s,%s,%s,%s,%s)" data=(1,'Big Bird','Sesame Street','Fakeville','12345') cur.mogrify(query2,data) Trabalhando com Banco de Dados Inserindo dados Pág. 12 Engenharia de Dados De 71 ➢ Os resultados do mogrify na consulta e na consulta2 devem ser idênticos. Agora, você pode executar a consulta para adicioná-la ao banco de dados: cur.execute(query2,data) ➢ Para tornar a inserção permanente confirme a transação usando o seguinte código: conn.commit() Trabalhando com Banco de Dados Inserindo dados Pág. 13 Engenharia de Dados De 71 Trabalhando com Banco de Dados Código Python para inserção import psycopg2 as db conn_string="dbname='dataengineering' host='localhost' user='postgres' password='postgres'" conn=db.connect(conn_string) cur=conn.cursor() query = 'create table users (id integer primary key, name varchar not null, street varchar, city varchar, zip varchar)'cur.execute(query) query = "insert into users (id,name,street,city,zip) values({},'{}','{}','{}','{}')".format(1,'Big Bird','Sesame Street','Fakeville','12345') print(cur.mogrify(query)) query2 = "insert into users (id,name,street,city,zip) values(%s,%s,%s,%s,%s)" data=(1,'Big Bird','Sesame Street','Fakeville','12345') print(cur.mogrify(query2,data)) cur.execute(query2,data) conn.commit() Pág. 14 Engenharia de Dados De 71 Trabalhando com Banco de Dados Consultando inserção no Postgresql Pág. 15 Engenharia de Dados De 71 ➢ Para inserir vários registros, pode-se percorrer os dados e usar o mesmo código mostrado na seção anterior, mas isso exigiria uma transação por registro no banco de dados. ➢ Uma maneira melhor seria usar uma única transação e enviar todos os dados, permitindo que psycopg2 cuide da inserção em massa usando o método executemany ➢ O código a seguir usará Faker para criar os registros e, em seguida, executemany() para inseri-los: 1) Importe as bibliotecas necessárias: import psycopg2 as db from faker import Faker 2) Crie o objeto faker e um array para conter todos os dados. Inicialize uma variável, i, para conter um ID: fake=Faker() data=[] i=2 Trabalhando com Banco de Dados Inserindo múltiplos registros Pág. 16 Engenharia de Dados De 71 3) Agora, pode-se olhar, iterar e anexar uma tupla fake ao array criado na etapa anterior. Incremente i para o próximo registro. Lembre-se que anteriormente criamos um registro para o Big Bird com um ID de 1. É por isso iniciará com 2 neste exemplo. Não podemos ter a mesma chave primária na mesma tabela: for r in range(1000): data.append((i,fake.name(),fake.street_address(), fake.city(),fake.zipcode())) i+=1 4) Converta a matriz em uma tupla de tuplas: data_for_db=tuple(data) 5) Agora, voltaremos ao código psycopg, que será semelhante ao exemplo anterior: 6) Trabalhando com Banco de Dados Inserindo múltiplos registros Pág. 17 Engenharia de Dados De 71 5) Agora, voltaremos ao código psycopg, que será semelhante ao exemplo anterior: conn_string="dbname='dataengineering' host='localhost' user='postgres' password='postgres'" conn=db.connect(conn_string) cur=conn.cursor() query = "insert into users(id,name,street,city,zip) values(%s,%s,%s,%s,%s)" 6) Pode-se imprimir o que o código enviará ao banco de dados usando um único registro da variável data_for_db: print(cur.mogrify(query,data_for_db[1])) 7) Por último, use executemany() em vez de execute() para permitir que a biblioteca manipule as várias inserções. Em seguida, confirme a transação: cur.executemany(query,data_for_db) conn.commit() Trabalhando com Banco de Dados Inserindo múltiplos registros Pág. 18 Engenharia de Dados De 71 Trabalhando com Banco de Dados Código Python para inserção múltipla import psycopg2 as db from faker import Faker fake=Faker() data=[] i=2 for r in range(1000): data.append((i,fake.name(), fake.street_address(), fake.city(), fake.zipcode())) i+=1 data_for_db=tuple(data) print(data_for_db) conn_string="dbname='dataengineering' host='localhost' user='postgres' password='postgres'" conn=db.connect(conn_string) cur=conn.cursor() query = "insert into users (id,name,street,city,zip) values(%s,%s,%s, %s,%s)" print(cur.mogrify(query,data_for_db[1])) cur.executemany(query,data_for_db) conn.commit() query2 = "select * from users" cur.execute(query2) print(cur.fetchall()) Pág. 19 Engenharia de Dados De 71 Trabalhando com Banco de Dados Consultando inserção múltipla no Postgresql Pág. 20 Engenharia de Dados De 71 ➢ A extração de dados usando psycopgs segue exatamente o mesmo procedimento da inserção, a única diferença é o uso da instrução select em vez de insert. As etapas a seguir mostram como extrair dados: 1) Importe a biblioteca e configure sua conexão e cursor: import psycopg2 as db conn_string="dbname='dataengineering' host='localhost' user='postgres' password='postgres'" conn=db.connect(conn_string) cur=conn.cursor() 2) Agora, pode executar uma consulta. Neste exemplo, selecionaremos todos os registros da tabela de usuários: query = "select * from users" cur.execute(query) Trabalhando com Banco de Dados Extraindo dados do PostgreSQL Pág. 21 Engenharia de Dados De 71 3) Agora, teremos um objeto iterável com os resultados. Pode-se iterar sobre o cursor, conforme demonstrado: for record in cur: print(record) 4) Como alternativa, pode usar um dos métodos de busca: cur.fetchall () cur.fetchmany (howmany) # onde howmany é igual ao número # de registros que se deseja devolver cur.fetchone () 5) Para obter um único registro, pode atribuí-lo a uma variável e examiná- lo. Observe que mesmo quando seleciona um registro, o cursor retorna uma matriz: data=cur.fetchone() print(data[0]) Trabalhando com Banco de Dados Extraindo dados do PostgreSQL Pág. 22 Engenharia de Dados De 71 6) Independentemente de estar buscando um ou vários, precisa-se saber onde está e quantos registros existem. Pode-se obter a contagem de linhas da consulta usando o seguinte código: cur.rowcount # 1001 7) Pode obter o número da linha atual usando rownumber. Se usar fetchone() e, em seguida, chamar rownumber novamente, ele deve ser incrementado com sua nova posição: cur.rownumber # A última coisa a mencionar é que também pode consultar uma tabela e gravá-la em um arquivo CSV usando o método copy_to() 8) Crie a conexão e o cursor: conn=db.connect(conn_string) cur=conn.cursor() Trabalhando com Banco de Dados Extraindo dados do PostgreSQL Pág. 23 Engenharia de Dados De 71 9) Abra um arquivo para gravar a tabela: f=open('fromdb.csv','w') 10) Em seguida, chame copy_to e passe o arquivo, o nome da tabela e o separador (que terá como padrão tabs se não for definido). Feche o arquivo e terá todas as linhas como um CSV: cur.copy_to(f,'users',sep=',') f.close() 11) Pode-se verificar os resultados abrindo o arquivo e imprimindo o conteúdo: f=open('fromdb.csv','r') f.read() Trabalhando com Banco de Dados Extraindo dados do PostgreSQL Pág. 24 Engenharia de Dados De 71 Trabalhando com Banco de Dados Código Python para extração de dados import psycopg3 as db conn_string="dbname='dataengineering' host='localhost' user='postgres' password='postgres'" conn=db.connect(conn_string) cur=conn.cursor() query = "select * from users" cur.execute(query) print(cur.fetchone()) print(cur.rowcount) print(cur.rownumber) print(cur.fetchmany(3)) print(cur.rownumber) f=open('fromdb.csv','w') conn=db.connect(conn_string) cur=conn.cursor() cur.copy_to(f,'users',sep=',') f.close() f=open('fromdb.csv','r') print(f.read()) Pág. 25 Engenharia de Dados De 71 ➢ Também é possível extrair dados usando o pandas DataFrames ➢ Para fazer isso, precisa estabelecer uma conexão usando psycopg2 e, em seguida, pular o cursor e ir direto para a consulta ➢ Os DataFrames oferecem muito poder na filtragem, análise e transformação de dados ➢ As etapas a seguir orientarão o uso de DataFrames: 1) Configure a conexão: import psycopg2 as db import pandas as pd conn_string="dbname='dataengineering' host='localhost' user='postgres' password='postgres'" conn=db.connect(conn_string) Trabalhando com Banco de Dados Extraindo dados com DataFrames Pág. 26 Engenharia de Dados De 71 2) Agora, pode-se executar a consulta em um DataFrame usando o método read_sql() do pandas. O método leva uma consulta e uma conexão: df=pd.read_sql("select * from users", conn) 3) O resultado é um DataFrame, df, com todos os usuários da tabela. Agora tem acesso total a todas as ferramentas DataFrame para trabalhar com os dados - por exemplo, pode exportar para JSON usando o seguinte: df.to_json(orient='records') Trabalhando com Banco de Dados Extraindo dados com DataFrames Pág. 27 Engenharia de Dados De 71 Trabalhando com Banco de Dados Código Python para extração com pandas import psycopg2 as db import pandas aspd conn_string="dbname='dataengineering' host='localhost' user='postgres' password='postgres'" conn=db.connect(conn_string) df=pd.read_sql("select * from users", conn) print(df.head()) print(df['city'].value_counts()) Pág. 28 Engenharia de Dados De 71 ➢ Bancos de dados relacionais podem ser o que você pensa quando ouve o termo banco de dados, mas existem vários outros tipos de bancos de dados, como em colunas, documentos, chave valores e séries temporais. ➢ Vamos trabalhar com o Elasticsearch, que é um banco de dados NoSQL ➢ NoSQL é um termo genérico que se refere a bancos de dados que não armazenam dados em linhas e colunas ➢ Os bancos de dados NoSQL geralmente armazenam seus dados como documentos JSON e usam uma linguagem de consulta diferente de SQL ➢ Aprenderemos em seguida a carregar dados no Elasticsearch Trabalhando com Banco de Dados Inserindo e extraindo dados do banco de dados NoSQL em Python Pág. 29 Engenharia de Dados De 71 ➢ Para instalar a biblioteca elasticsearch, pode usar pip3, conforme mostrado: pip3 install elasticsearch ➢ Para verificar a instalação e verificar a versão, você utilize o seguinte código import elasticsearch elasticsearch.__version__ ➢ O código anterior deve imprimir algo como o seguinte: (7, 14, 0) ➢ Se tiver a versão certa para sua versão do Elasticsearch, está pronto para começar a importar dados. Trabalhando com Banco de Dados Instalando a biblioteca do Elasticsearch Pág. 30 Engenharia de Dados De 71 ➢ Antes de consultar o Elasticsearch, é necessário carregar alguns dados em um índice ➢ Anteriormente utilizamos a biblioteca, psycopg2, para acessar o PostgreSQL ➢ Para acessar o Elasticsearch utilizaremos a biblioteca do elasticsearch ➢ Para carregar dados, precisa criar a conexão e, em seguida, emitir comandos para o Elasticsearch ➢ Siga as etapas fornecidas para adicionar um registro ao Elasticsearch: 1) Importe as bibliotecas. Podemos criar o objeto Faker para gerar dados aleatórios: from elasticsearch import Elasticsearch from faker import Faker fake=Faker() Trabalhando com Banco de Dados Inserindo dados no Elasticsearch Pág. 31 Engenharia de Dados De 71 2) Crie uma conexão com o Elasticsearch: es = Elasticsearch() 3) O código anterior pressupõe que a instância do Elasticsearch está sendo executada no localhost. Se não for, pode-se especificar o endereço IP, conforme mostrado: es=Elasticsearch({'127.0.0.1'}) ➢ Agora, pode emitir comandos para a instância Elasticsearch ➢ O método de índice permitirá adicionar dados ➢ O método recebe um nome de índice, o tipo de documento e um corpo ➢ O corpo que é enviado ao Elasticsearch e é um objeto JSON: doc={"name": fake.name(),"street": fake.street_address(), "city": fake.city(),"zip":fake.zipcode()} res=es.index(index="users",doc_type="doc",body=doc) print(res['result']) #criado Trabalhando com Banco de Dados Inserindo dados no Elasticsearch Pág. 32 Engenharia de Dados De 71 ➢ Usando o método bulk, pode-se inserir muitos documentos de uma vez ➢ O processo é semelhante ao de inserir um único registro, exceto que irá gerar todos os dados e, em seguida, inseri-los Trabalhando com Banco de Dados Inserindo dados com helpers Pág. 33 Engenharia de Dados De 71 1) Importe a biblioteca helpers para acessar o método bulk: from elasticsearch import helpers 2) Os dados precisam ser uma matriz de objetos JSON. Precisamos especificar o índice e o tipo. Os sublinhados nos nomes são usados para os campos do Elasticsearch. O campo _source é onde colocaríamos o documento JSON que desejamos inserir no banco de dados. Fora do JSON existe um loop for. Este loop cria 999 documentos (já foi adicionado um e indexa de 0 a 998): actions = [{ "_index": "users", "_type": "doc", "_source": { "name": fake.name(), "street": fake.street_address(), "city": fake.city(), "zip":fake.zipcode()}} for x in range(998) # or for i,r in df.iterrows() ] Trabalhando com Banco de Dados Etapas para inserir dados com helpers Pág. 34 Engenharia de Dados De 71 3) Agora, pode-se chamar o método bulk e passá-lo para a instância elasticsearch e a matriz de dados. Pode imprimir os resultados para verificar se funcionou: res = helpers.bulk(es, actions) print(res['result']) Trabalhando com Banco de Dados Etapas para inserir dados com helpers Pág. 35 Engenharia de Dados De 71 Trabalhando com Banco de Dados Código Python para inserção de dados/índice no Elasticsearch from elasticsearch import Elasticsearch from elasticsearch import helpers from faker import Faker fake=Faker() es = Elasticsearch() #ou ip {127.0.0.1} actions = [ { "_index": "users", "_type": "doc", "_source": { "name": fake.name(), "street": fake.street_address(), "city": fake.city(), "zip":fake.zipcode()} } for x in range(998) # ou for i,r in df.iterrows() ] response = helpers.bulk(es, actions) print(response) Pág. 36 Engenharia de Dados De 71 ➢ Agora deve-se ter 1.000 registros em um índice Elasticsearch nomeando usuários ➢ Podemos verificar isso em Kibana ➢ Para adicionar o novo índice ao Kibana, navegue até o painel do Kibana em http://localhost:5601 ➢ No menu selecione Management Stack Management Kibana Index → → → Patterns Create index pattern e insira “users*” no Index pattern name → e click no botão Next Step, por fim pressione o botão Create index pattern, no menu escolha Analytics Discover para ver a tela a seguir→ Trabalhando com Banco de Dados Etapas para inserir dados com helpers http://localhost:5601/ Pág. 37 Engenharia de Dados De 71 Trabalhando com Banco de Dados Kibana - Discover Pág. 38 Engenharia de Dados De 71 ➢ A consulta no Elasticsearch segue exatamente as mesmas etapas da inserção de dados ➢ A única diferença é que você usa um método diferente – search - para enviar um objeto de corpo diferente ➢ Vamos examinar uma consulta simples com todos os dados: 1) Importe a biblioteca e crie sua instância de elasticsearch: from elasticsearch import Elasticsearch es = Elasticsearch() 2) Crie o objeto JSON para enviar ao Elasticsearch. O objeto é uma consulta, usando a pesquisa match_all: doc={"query":{"match_all":{}}} Trabalhando com Banco de Dados Pesquisando no Elasticsearch Pág. 39 Engenharia de Dados De 71 3) Passe o objeto para o Elasticsearch usando o método de pesquisa. Passe o índice e o tamanho do retorno. Nesse caso, você retornará apenas 10 registros. O tamanho máximo de retorno é 10.000 documentos: res=es.search(index="users",body=doc,size=10) 4) Por último, você pode imprimir os documentos: print(res['hits']['hits']) # ou você pode iterar capturando apenas _source: for doc in res['hits']['hits']: print(doc['_source']) Trabalhando com Banco de Dados Pesquisando no Elasticsearch Pág. 40 Engenharia de Dados De 71 Trabalhando com Banco de Dados Código Python para pesquisa de dados/índice no Elasticsearch from elasticsearch import Elasticsearch es = Elasticsearch() # Pesquisa todos os documentos doc={"query":{"match_all":{}}} res=es.search(index="users",body=doc,size=10) print(res['hits']['hits'][9]['_source']) # Obtem Muttley doc={"query":{"match":{"name":"Muttley"}}} res=es.search(index="users",body=doc,size=10) print(res['hits']['hits'][0]['_source']) # Obtem Muttley com sintaxe Lucene res=es.search(index="users",q="name:Muttley", size=10) print(res['hits']['hits'][0]['_source']) # Obtem City Gottan doc={"query":{"match":{"city":"Gottan"}}} res=es.search(index="users",body=doc,size=10) print(res['hits']['hits']) # Obtem Gottan e filtra zip doc={"query":{"bool":{"must":{"match": {"city":"Gottan"}},"filter":{"term": {"zip":"63792"}}}}} res=es.search(index="users",body=doc,size=10) print(res['hits']['hits']) Pág. 41 Engenharia de Dados De 71 ➢ Você pode carregar os resultados da consulta em um DataFrame do pandas ➢Para carregar os resultados em um DataFrame, importe json_normalize da biblioteca pandas json e use-o (json_normalize) nos resultados JSON, conforme mostrado no código a seguir: from pandas.io.json import json_normalize df=json_normalize(res['hits']['hits']) Trabalhando com Banco de Dados Pesquisando no Elasticsearch com pandas Dataframe Pág. 42 Engenharia de Dados De 71 ➢ No primeiro exemplo, você usou um tamanho 10 para sua pesquisa ➢ Você poderia ter obtido todos os 1.000 registros, mas o que você faz quando tem mais de 10.000 e precisa de todos eles? ➢ O Elasticsearch possui um método de rolagem que permite iterar os resultados até obter todos eles ➢ Para percorrer os dados, siga as etapas fornecidas: 1) Importe a biblioteca e crie sua instância Elasticsearch: from elasticsearch import Elasticsearch es = Elasticsearch() Trabalhando com Banco de Dados Usando a scroll para lidar com resultados maiores Pág. 43 Engenharia de Dados De 71 2) Pesquise seus dados. Como você não tem mais de 10.000 registros, definirá o tamanho para 500. Isso significa que perderão 500 registros em sua pesquisa inicial. Você passará um novo parâmetro para o método de pesquisa - scroll. Este parâmetro especifica por quanto tempo você deseja disponibilizar os resultados. Estou usando 20 milissegundos. Ajuste este número para garantir que você tenha tempo suficiente para obter os dados - isso dependerá do tamanho do documento e da velocidade da rede: res = es.search( index = 'users', doc_type = 'doc', scroll = '20m', size = 500, body = {"query":{"match_all":{}}} ) Trabalhando com Banco de Dados Usando a scroll para lidar com resultados maiores Pág. 44 Engenharia de Dados De 71 3) Os resultados incluirão _scroll_id, que você precisará passar para o método de scroll mais tarde. Salve o ID de rolagem e o tamanho do conjunto de resultados: sid = res['_scroll_id'] size = res['hits']['total']['value'] 4) Para começar a rolar, use um loop while para obter os registros até que o tamanho seja 0, o que significa que não há mais dados. Dentro do loop, você chamará o método scroll e passará _scroll_id e por quanto tempo rolar. Isso pegará mais resultados da consulta original: while (size > 0): res = es.scroll(scroll_id = sid, scroll = '20m') Trabalhando com Banco de Dados Usando a scroll para lidar com resultados maiores Pág. 45 Engenharia de Dados De 71 5) Em seguida, obtenha o novo ID de rolagem e o tamanho para que você possa fazer um loop novamente se os dados ainda existirem: sid = res['_scroll_id'] size = len(res['hits']['hits']) 6) Por último, você pode fazer algo com os resultados dos scrolls. No código a seguir, você imprimirá a fonte para cada registro: for doc in res['hits']['hits']: print(doc['_source']) Trabalhando com Banco de Dados Usando a scroll para lidar com resultados maiores Pág. 46 Engenharia de Dados De 71 ➢ No último encontro, você construiu seu primeiro pipeline de dados do Airflow usando um operador Bash e Python ➢ Desta vez, você combinará dois operadores Python para extrair dados do PostgreSQL, salvá-los como um arquivo CSV e, em seguida, lê-los e gravá-los em um índice Elasticsearch Trabalhando com Banco de Dados Criação de pipelines de dados no Apache Airflow Pág. 47 Engenharia de Dados De 71 ➢ No último encontro, você construiu seu primeiro pipeline de dados do Airflow usando um operador Bash e Python ➢ Desta vez, você combinará dois operadores Python para extrair dados do PostgreSQL, salvá-los como um arquivo CSV e, em seguida, lê-los e gravá-los em um índice Elasticsearch Trabalhando com Banco de Dados Criação de pipelines de dados no Apache Airflow Pág. 48 Engenharia de Dados De 71 ➢ Cada DAG terá algum código padrão e clichê para executá-lo no Airflow ➢ Você sempre importará as bibliotecas necessárias e, a seguir, quaisquer outras bibliotecas necessárias para suas tarefas ➢ No código a seguir, você importa o operador, DAG e as bibliotecas de tempo para o Airflow ➢ Para suas tarefas, você importa as bibliotecas pandas, psycopg2 e elasticsearch: import datetime as dt from datetime import timedelta from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator import pandas as pd import psycopg2 as db from elasticsearch import Elasticsearch Trabalhando com Banco de Dados Configurando o boilerplate do Airflow Pág. 49 Engenharia de Dados De 71 ➢ Em seguida, você especificará os argumentos para seu DAG ➢ Lembre-se de que o horário de início deve ser um dia antes se você programar a tarefa para ser executada diariamente: default_args = { 'owner': 'dickvigarista', 'start_date': dt.datetime(2021, 8, 30), 'retries': 1, 'retry_delay': dt.timedelta(minutes=5), } Trabalhando com Banco de Dados Configurando o boilerplate do Airflow Pág. 50 Engenharia de Dados De 71 ➢ Agora, você pode passar os argumentos para o DAG, nomeá-lo e definir o intervalo de execução ➢ Você definirá seus operadores aqui também ➢ Neste exemplo, você criará dois operadores Python - um para obter dados do PostgreSQL e outro para inserir dados no Elasticsearch ➢ A tarefa getData será upstream e a tarefa insertData downstream, então você usará o operador >> bit shift para especificar isto: Trabalhando com Banco de Dados Configurando o boilerplate do Airflow Pág. 51 Engenharia de Dados De 71 with DAG('MyDBDAG', default_args=default_args, schedule_interval=timedelta(minutes=5), # '0 * * * *', ) as dag: getData = PythonOperator(task_id='QueryPostgreSQL', python_callable=queryPostgresql) insertData = PythonOperator( task_id='InsertDataElasticsearch', python_callable=insertElasticsearch) getData >> insertData Trabalhando com Banco de Dados Configurando o boilerplate do Airflow Pág. 52 Engenharia de Dados De 71 ➢ Por último, você definirá as tarefas ➢ Nos operadores anteriores, você os nomeou queryPostgresql e insertElasticsearch ➢ O código nessas tarefas deve parecer muito familiar; é quase idêntico ao código das seções anteriores ➢ Para consultar o PostgreSQL, você cria a conexão, executa a consulta sql usando o método pandas read_sql() e, em seguida, usa o método pandas to_csv() para gravar os dados no disco: Trabalhando com Banco de Dados Configurando o boilerplate do Airflow Pág. 53 Engenharia de Dados De 71 def queryPostgresql(): conn_string="dbname='dataengineering' host='localhost' user='postgres' password='postgres'" conn=db.connect(conn_string) df=pd.read_sql("select name,city from users",conn) df.to_csv('postgresqldata.csv') print("-------Dados Salvos------") Trabalhando com Banco de Dados Configurando o boilerplate do Airflow Pág. 54 Engenharia de Dados De 71 ➢ Para inserir os dados no Elasticsearch, você cria o objeto Elasticsearch conectando-se ao localhost ➢ Em seguida, leia o CSV da tarefa anterior em um DataFrame, itere por meio do DataFrame, convertendo cada linha em JSON e insira os dados usando o método de índice: def insertElasticsearch(): es = Elasticsearch() df=pd.read_csv('postgresqldata.csv') for i,r in df.iterrows(): doc=r.to_json() res=es.index(index="frompostgresql", doc_type="doc",body=doc) print(res) ➢ Agora você tem um pipeline de dados completo no Airflow. Em seguida você o executará e verá os resultados. Trabalhando com Banco de Dados Configurando o boilerplate do Airflow Pág. 55 Engenharia de Dados De 71 Trabalhando com Banco de Dados Código Python completo Airflow import datetime as dt from datetime import timedelta from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator import pandas as pd import psycopg2 as db from elasticsearch import Elasticsearch def queryPostgresql():conn_string="dbname='dataengineering' host='localhost' user='postgres' password='postgres'" conn=db.connect(conn_string) df=pd.read_sql("select name, city from users",conn) df.to_csv('postgresqldata.csv') print("-------Dados Salvos------") Pág. 56 Engenharia de Dados De 71 Trabalhando com Banco de Dados Código Python completo Airflow def insertElasticsearch(): es = Elasticsearch() df=pd.read_csv('postgresqldata.csv') for i,r in df.iterrows(): doc=r.to_json() res=es.index( index="frompostgresql", doc_type="doc",body=doc) print(res) default_args = { 'owner': 'dickvigarista', 'start_date': dt.datetime(2021, 8, 29), 'retries': 1, 'retry_delay': dt.timedelta(minutes=5), } with DAG('MyDBDAG', default_args=default_args, schedule_interval=timedelta(minutes=5), # '0 * * * *', ) as dag: getData = PythonOperator( task_id='QueryPostgreSQL', python_callable=queryPostgresql) insertData = PythonOperator( task_id='InsertDataElasticsearch', python_callable=insertElasticsearch) getData >> insertData Pág. 57 Engenharia de Dados De 71 ➢ Copie o código Python do MyDBDAG para o diretório das DAGs do Airflow (Ex: cp AirflowDB.py /opt/aiflow/dags) ➢ Reinicialize o Airflow e o Scheduler se a nova DAG não estiver aparecendo na páginas da DAG no Airflow (http://localhost:8080/) ➢ Ative e execute a MyDBDAG Trabalhando com Banco de Dados Executando o DAG http://localhost:8080/ Pág. 58 Engenharia de Dados De 71 ➢ Para verificar se o pipeline de dados foi bem-sucedido, você pode visualizar os dados no Elasticsearch usando Kibana ➢ Para ver os resultados, navegue até o Kibana em http://localhost:5601 ➢ Você precisará criar um novo índice no Kibana ➢ Você deve ver os registros conforme mostrado na seguinte captura de tela: Trabalhando com Banco de Dados Executando o DAG http://localhost:5601/ Pág. 59 Engenharia de Dados De 71 ➢ O processador mais usado para lidar com bancos de dados relacionais em NiFi é o processador ExecuteSQLRecord ➢ Entre no NiFi (https://localhost:9300/nifi/) e arraste o ícone do processador para a tela e procure o processador ExecuteSQLRecord ➢ Depois de adicionado à tela, você precisa configurá-lo Trabalhando com Banco de Dados Extraindo dados do PostgreSQL usando NiFi https://localhost:9300/nifi/ Pág. 60 Engenharia de Dados De 71 ➢ Para configurar o processador, você precisa criar um pool de conexão de banco de dados, conforme mostrado na seguinte captura de tela: Trabalhando com Banco de Dados Configurando o processador ExecuteSQLCommand Pág. 61 Engenharia de Dados De 71 ➢ Depois de selecionar Create new service…, você verá um pop-up para adicionar o serviço do controlador ➢ Selecione a opção padrão, DBCPConnectionPool, e para tornar mais fácil lembrar a qual banco de dados esse serviço se destina, você pode nomeá-lo com o nome do banco de dados – dataengineering ➢ Observe como eu não o chamei de PostgreSQL ➢ À medida que adiciona mais serviços, você adiciona mais conexões PostgreSQL para bancos de dados diferentes ➢ Assim, seria difícil lembrar para qual banco de dados PostgreSQL o serviço se destinava ➢ Para configurar o serviço, selecione a seta na configuração do processador ➢ A configuração do serviço deve ser semelhante à seguinte captura de tela: Trabalhando com Banco de Dados Configurando o processador ExecuteSQLCommand Pág. 62 Engenharia de Dados De 71 Trabalhando com Banco de Dados Configurando o processador ExecuteSQLCommand Pág. 63 Engenharia de Dados De 71 ➢ A configuração requer que você especifique o URL de conexão, que é uma string de conexão de banco de dados Java (Ex: jdbc:postgresql://localhost/dataengineering) ➢ A string especifica Java Database Connectivity (JDBC) e o tipo de banco de dados – PostgreSQL ➢ Em seguida, ele nomeia o host, localhost e o nome do banco de dados, dataengineering ➢ A classe do driver especifica o driver postgresql (org.postgresql.Driver) ➢ A localização do driver é onde você fez o download em aula anterior na construção da nossa infraestrutura de engenharia de dados ➢ Ele deve estar em seu diretório inicial na pasta nifi, em um subdiretório chamado drivers ➢ Por último, você precisa inserir o nome de usuário e a senha do banco de dados Trabalhando com Banco de Dados Configurando o processador ExecuteSQLCommand Pág. 64 Engenharia de Dados De 71 ➢ Em seguida, você precisa criar um serviço Record Writer ➢ Selecione Create new service … em Record Writer, escolha JSONRecordSetWriter e clique na seta para configurá-lo ➢ Há uma definição de configuração importante que você não pode ignorar – Output Grouping ➢ Você deve definir esta propriedade como One Line Per Object ➢ A configuração finalizada será semelhante à seguinte captura de tela: Trabalhando com Banco de Dados Configurando o processador ExecuteSQLCommand Pág. 65 Engenharia de Dados De 71 ➢ Agora que configurou os serviços para o processador, você precisa concluir a configuração do processo ➢ O último parâmetro que você precisa configurar é SQL Select Query ➢ É aqui que você pode especificar o comando SQL a ser executado no banco de dados. Por exemplo, você pode inserir o seguinte: select name, city from users ➢ Isso irá capturar todos os registros no banco de dados PostgreSQL, mas apenas os campos de nome e cidade ➢ Agora você pode passar para o próximo processador no pipeline Trabalhando com Banco de Dados Configurando o processador ExecuteSQLCommand Pág. 66 Engenharia de Dados De 71 ➢ Agora que configurou o processador ExecuteSQLRecord, você receberá uma matriz de registros ➢ Para processar esses dados, você precisa ter um arquivo de fluxo por registro ➢ Para fazer isso, você pode usar o processador SplitText ➢ Arraste-o para a tela e abra a guia Propriedades clicando duas vezes no processador - ou clique com o botão direito e selecione Propriedades ➢ Os padrões do processador funcionam, mas certifique-se de que Line Split Count está definido como 1, Header Line Count é 0 - seus dados não têm um cabeçalho quando vêm do processador ExecuteSQLRecord - e Remove Trailing Newlines deve ser true ➢ Essas configurações permitirão que o processador pegue cada linha do arquivo de fluxo e divida-o em um novo arquivo de fluxo ➢ Portanto, seu único arquivo de fluxo de entrada sairá deste processador como 1.000 arquivos de fluxo Trabalhando com Banco de Dados Configurando o processador SplitText Pág. 67 Engenharia de Dados De 71 ➢ A última etapa no pipeline de dados é inserir os arquivos de fluxo no Elasticsearch ➢ Você pode fazer isso usando o processador PutElasticsearchHttp ➢ Existem quatro processadores PutElasticsearch diferentes ➢ Apenas dois serão relevantes nesta disciplina - PutElasticsearchHttp e PutelasticSearchHttpRecord ➢ Esses são os processadores para inserir um único registro ou usar a API em massa ➢ Os outros dois processadores - Putelasticsearch e Putelasticsearch5 - são para versões mais antigas do Elasticsearch (2 e 5) ➢ Para configurar o processador, você deve especificar o URL e a porta ➢ Neste exemplo, você usará http://localhost:9200 ➢ O Index será fromnifi, mas você pode nomeá-lo como quiser ➢ O Type é doc e a Index Operation será index Trabalhando com Banco de Dados Configurando o processador PutElasticsearchHttp http://localhost:9200/ Pág. 68 Engenharia de Dados De 71 ➢ Agora que você configurou todos os processadores, pode conectá-los arrastando a seta de ExecuteSQLRecord para o processador SplitText para obter sucesso ➢ Em seguida, conecte o processador SplitText ao processador PutElasticsearchHttp para splits ➢ Por último, encerre o processador PutElasticsearchHttp para todos os relacionamentos.➢ Execute cada um dos processadores ou, no painel Operações, selecione Start para iniciar todos ➢ Você verá um arquivo de fluxo na primeira fila e, em seguida, ele será dividido em 1.000 arquivos de fluxo na segunda fila ➢ A fila ficará vazia em lotes de 100 conforme eles são inseridos no Elasticsearch ➢ Para verificar os resultados, você pode usar a API elasticsearch, e não Kibana ➢ Em seu navegador, acesse http://localhost:9200/_cat/indices ➢ Este é o ponto de extremidade REST para visualizar os índices em seu banco de dados Elasticsearch ➢ Você deverá ver seu novo índice, fromnifi, e o número total de documentos, conforme mostrado na seguinte captura de tela: Trabalhando com Banco de Dados Executar o pipeline de dados http://localhost:9200/_cat/indices Pág. 69 Engenharia de Dados De 71 ➢ O número de documentos no índice irá variar dependendo se você deixou o pipeline em execução ou não ➢ Assim como no exemplo do Airflow, este pipeline não é idempotente ➢ À medida que é executado, ele continuará adicionando os mesmos registros com uma ID diferente no Elasticsearch ➢ Este não é o comportamento que você deseja na produção e corrigiremos isso quando discutirmos a implementação de pipelines na produção Trabalhando com Banco de Dados Executar o pipeline de dados Pág. 70 Engenharia de Dados De 71 ➢ Aprendemos a usar Python para consultar e inserir dados em bancos de dados relacionais e NoSQL ➢ Você também aprendeu a usar o Airflow e o NiFi para criar pipelines de dados ➢ Habilidades de banco de dados são algumas das mais importantes para um engenheiro de dados ➢ Haverá muito poucos pipelines de dados que não os afetem de alguma forma ➢ As habilidades que você aprendeu fornecem a base para as outras habilidades que você precisará aprender - principalmente SQL ➢ Combinar fortes habilidades de SQL com as habilidades de pipeline de dados que você aprendeu permitirá que você realize a maioria das tarefas de engenharia de dados que encontrará ➢ Nos exemplos, os pipelines de dados não eram idempotentes ➢ Cada vez que eles eram executados, você obtinha novos resultados e resultados indesejados ➢ Corrigiremos isso quando implantarmos pipelines em produção ➢ Mas antes de chegar a isso, você precisará aprender como lidar com problemas comuns de dados e como enriquecer e transformar seus dados. ➢ Em seguida vamos aprender a usar Python para trabalhar com seus dados entre as fases de extração e carregamento de seus pipelines de dados Trabalhando com Banco de Dados Resumo Pág. 71 Engenharia de Dados De 71 Lembre-seLembre-se “Se você não consegue explicar de maneira simples, não entende bem o suficiente.” Albert Einstein Slide 1 Slide 2 Slide 3 Slide 4 Slide 5 Slide 6 Slide 7 Slide 8 Slide 9 Slide 10 Slide 11 Slide 12 Slide 13 Slide 14 Slide 15 Slide 16 Slide 17 Slide 18 Slide 19 Slide 20 Slide 21 Slide 22 Slide 23 Slide 24 Slide 25 Slide 26 Slide 27 Slide 28 Slide 29 Slide 30 Slide 31 Slide 32 Slide 33 Slide 34 Slide 35 Slide 36 Slide 37 Slide 38 Slide 39 Slide 40 Slide 41 Slide 42 Slide 43 Slide 44 Slide 45 Slide 46 Slide 47 Slide 48 Slide 49 Slide 50 Slide 51 Slide 52 Slide 53 Slide 54 Slide 55 Slide 56 Slide 57 Slide 58 Slide 59 Slide 60 Slide 61 Slide 62 Slide 63 Slide 64 Slide 65 Slide 66 Slide 67 Slide 68 Slide 69 Slide 70 Slide 71
Compartilhar