Buscar

ED0401_TrabalhandoComBancoDeDados

Prévia do material em texto

Prof. Giovane Barcelos 
giovane_barcelos@uniritter.edu.br 
Engenharia de Dados
ED0401
mailto:giovane_barcelos@uniritter.edu.br
Pág. 2 Engenharia de Dados De 71
Plano de Ensino
Conteúdo programático
N2N2
N1N1
1. Construindo os pipelines de dados: extrair, transformar e carregar
1.1 O que é engenharia de dados? (a)
1.2 Construindo nossa infraestrutura de engenharia de dados (b)
1.3 Leitura e gravação de arquivos
1.4 Trabalhando com bancos de dados (c, d)
1.5 Limpeza, transformação e enriquecimento de dados
1.6 Construindo o pipeline de dados (h)
2. Implantação de pipelines de dados na produção
2.1 Recursos de um pipeline em produção (f, g)
2.2 Controle de versão com o NiFi Registry (e)
2.3 Monitorando pipelines de dados
2.4 Implantando pipelines de dados
2.5 Construindo um pipeline de dados em produção (i)
3 Além do Lote (batch) - criando pipelines de dados em tempo real
3.1 Construindo um cluster Kafka (b)
3.2 Streaming de dados com Apache Kafka (e)
3.3 Processamento de dados com Apache Spark (e)
3.4 Dados de borda (edge) em tempo real com MiNiFi, Kafka e Spark
3.5 Construção, implantação e gerenciamento de pipelines: uma revisão geral
Pág. 3 Engenharia de Dados De 71
➢ Inserir e extrair dados relacionais com Python
➢ Inserir e extrair dados do banco de dados NoSQL em Python
➢ Criação de pipelines de banco de dados no Airflow
➢ Criação de pipelines de banco de dados em NiFi
Trabalhando com Banco de Dados
O que vamos aprender?
Pág. 4 Engenharia de Dados De 71
➢ Quando você ouve a palavra banco de dados, provavelmente está 
imaginando um banco de dados relacional - isto é, um banco de dados 
feito de tabelas contendo colunas e linhas com relacionamentos entre as 
tabelas
➢ Por exemplo, um sistema de pedido de compra que tem estoque, 
compras e informações do cliente
➢ Os bancos de dados relacionais existem há mais de 40 anos e vêm do 
modelo de dados relacionais desenvolvido por E. F. Codd no final dos 
anos 1970
➢ Existem vários fornecedores de bancos de dados relacionais - incluindo 
IBM, Oracle e Microsoft - mas todos esses bancos de dados usam um 
dialeto semelhante de SQL, que significa Structured Query Language
➢ Vamos trabalhar com um banco de dados de código aberto popular - 
PostgreSQL
Trabalhando com Banco de Dados
Inserir e extrair dados relacionais com Python
Pág. 5 Engenharia de Dados De 71
➢ Anteriormente instalamos o PgAdmin 3 que funciona bem até a versão 11
➢ No entanto a partir da versão 11 do Postgres ocorrem incompatibilidades em 
relação as funções (“column "proisagg" does not exist”), neste caso seria 
necessário instalar o PgAdmin 4
➢ Vamos instalar o PgAdmin 4 executando o seguinte na linha de comando do 
Linux:
 sudo apt install curl gnupg2 -y 
 sudo curl https://www.pgadmin.org/static/packages_pgadmin_org.pub | apt-
key add
 sudo echo "deb https://ftp.postgresql.org/pub/pgadmin/pgadmin4/apt/buster 
pgadmin4 main" >> /etc/apt/sources.list.d/pgdg.list
 sudo apt update
 sudo apt install pgadmin4 -y
 sudo /usr/pgadmin4/bin/setup-web.sh
 # Email e senha sugeridos: admin@localhost.go / 123456
 http://localhost/pgadmin4 
 
Trabalhando com Banco de Dados
Criação de um banco de dados e tabelas PostgreSQL
mailto:admin@localhost.go
http://localhost/pgadmin4
Pág. 6 Engenharia de Dados De 71
➢ Crie uma conexão do servidor (Menu Object Create Server) com nome → → →
(Name) Localhost e Hostname no Connection localhost
➢ Como pode-se verificar o banco de dados dataengineering criado esta 
disponível:
Trabalhando com Banco de Dados
Criação de um banco de dados e tabelas PostgreSQL
Pág. 7 Engenharia de Dados De 71
➢ Existem várias bibliotecas e maneiras de se conectar a um banco de 
dados em Python - pyodbc, sqlalchemy, psycopg2 e usando uma API e 
solicitações
➢ Usaremos a biblioteca psycopg2 para se conectar ao PostgreSQL porque 
ela foi construída especificamente para se conectar ao PostgreSQL
➢ Conforme suas habilidades progridem, você pode querer olhar para 
ferramentas como SQLAlchemy
➢ SQLAlchemy é um kit de ferramentas e um mapeador relacional de 
objetos para Python
➢ Ela permite que você execute consultas de uma forma mais Pythônica - 
sem SQL - e mapeie classes Python para tabelas de banco de dados
Trabalhando com Banco de Dados
Inserindo dados no PostgreSQL
Pág. 8 Engenharia de Dados De 71
➢ Pode-se verificar se tem psycopg2 instalado executando o seguinte 
comando:
 python3 -c "import psycopg2; print(psycopg2.__version__)"
➢ Se não estiver instalado, pode-se adicioná-lo com o seguinte comando:
 pip3 install psycopg2
➢ O uso de pip requer que existam dependências adicionais para que ele 
funcione. Se encontrar problemas, também pode instalar uma versão 
binária pré-compilada usando o seguinte comando:
 pip3 install psycopg2-binary
 
Trabalhando com Banco de Dados
Instalando psycopg2
Pág. 9 Engenharia de Dados De 71
➢ Pode-se verificar se tem psycopg2 instalado executando o seguinte 
comando:
 python3 -c "import psycopg2; print(psycopg2.__version__)"
➢ Se não estiver instalado, pode-se adicioná-lo com o seguinte comando:
 pip3 install psycopg2
➢ O uso de pip requer que existam dependências adicionais para que ele 
funcione. Se encontrar problemas, também pode instalar uma versão 
binária pré-compilada usando o seguinte comando:
 pip3 install psycopg2-binary
 
Trabalhando com Banco de Dados
Instalando psycopg2
Pág. 10 Engenharia de Dados De 71
➢ Para se conectar ao seu banco de dados usando psycopg2, é necessário criar 
uma conexão, criar um cursor, executar um comando e obter os resultados
➢ Pode-se executar essas mesmas etapas, quer esteja consultando ou inserindo 
dados. Vamos percorrer as etapas da seguinte forma:
1) Importe a biblioteca e faça referência a ela como banco de dados:
import psycopg2 as db
2) Crie uma string de conexão que contenha o host, o banco de dados, o nome de 
usuário e a senha:
conn_string="dbname='dataengineering' host='localhost' user='postgres' 
password='postgres'"
3) Crie o objeto de conexão passando a string de conexão para o método connect 
():
conn=db.connect(conn_string)
4) Em seguida, crie o cursor a partir da conexão:
cur=conn.cursor()
Trabalhando com Banco de Dados
Conectando-se ao PostgreSQL com Python
Pág. 11 Engenharia de Dados De 71
➢ Agora que você tem uma conexão aberta, pode inserir dados usando SQL
➢ Para inserir uma única pessoa, você precisa formatar uma instrução de inserção 
SQL, conforme mostrado:
 query = "insert into users (id,name,street,city,zip)
 values({},'{}','{}','{}','{}')".format(1,'Big Bird','Sesame
 Street','Fakeville','12345')
➢ Para ver a aparência dessa consulta, você pode usar o método mogrify(): 
cur.mogrify(query)
➢ O código anterior criará uma instrução de inserção SQL adequada; no entanto, à 
medida que avança, você adicionará vários registros em uma única instrução. 
Para fazer isso, você criará uma tupla de tuplas. Para criar a mesma instrução 
SQL, você pode usar o seguinte código:
 query2 = "insert into users (id,name,street,city,zip)
 values(%s,%s,%s,%s,%s)" 
 data=(1,'Big Bird','Sesame Street','Fakeville','12345')
 cur.mogrify(query2,data)
 
Trabalhando com Banco de Dados
Inserindo dados
Pág. 12 Engenharia de Dados De 71
➢ Os resultados do mogrify na consulta e na consulta2 devem ser 
idênticos. Agora, você pode executar a consulta para adicioná-la ao 
banco de dados:
 cur.execute(query2,data)
➢ Para tornar a inserção permanente confirme a transação usando o 
seguinte código:
 conn.commit()
Trabalhando com Banco de Dados
Inserindo dados
Pág. 13 Engenharia de Dados De 71
Trabalhando com Banco de Dados
Código Python para inserção
import psycopg2 as db
conn_string="dbname='dataengineering' host='localhost' user='postgres' 
password='postgres'"
conn=db.connect(conn_string)
cur=conn.cursor()
query = 'create table users (id integer primary key, name varchar not null, street varchar, 
city varchar, zip varchar)'cur.execute(query)
query = "insert into users (id,name,street,city,zip) 
values({},'{}','{}','{}','{}')".format(1,'Big Bird','Sesame Street','Fakeville','12345')
print(cur.mogrify(query))
query2 = "insert into users (id,name,street,city,zip) values(%s,%s,%s,%s,%s)"
data=(1,'Big Bird','Sesame Street','Fakeville','12345')
print(cur.mogrify(query2,data))
cur.execute(query2,data)
conn.commit()
Pág. 14 Engenharia de Dados De 71
Trabalhando com Banco de Dados
Consultando inserção no Postgresql
Pág. 15 Engenharia de Dados De 71
➢ Para inserir vários registros, pode-se percorrer os dados e usar o mesmo código 
mostrado na seção anterior, mas isso exigiria uma transação por registro no 
banco de dados.
➢ Uma maneira melhor seria usar uma única transação e enviar todos os dados, 
permitindo que psycopg2 cuide da inserção em massa usando o método 
executemany
➢ O código a seguir usará Faker para criar os registros e, em seguida, 
executemany() para inseri-los:
1) Importe as bibliotecas necessárias:
import psycopg2 as db
from faker import Faker
2) Crie o objeto faker e um array para conter todos os dados. Inicialize uma 
variável, i, para conter um ID:
fake=Faker()
data=[]
i=2
Trabalhando com Banco de Dados
Inserindo múltiplos registros
Pág. 16 Engenharia de Dados De 71
3) Agora, pode-se olhar, iterar e anexar uma tupla fake ao array criado na etapa 
anterior. Incremente i para o próximo registro. Lembre-se que anteriormente 
criamos um registro para o Big Bird com um ID de 1. É por isso iniciará com 2 
neste exemplo. Não podemos ter a mesma chave primária na mesma tabela:
for r in range(1000):
 data.append((i,fake.name(),fake.street_address(),
 fake.city(),fake.zipcode()))
 i+=1
4) Converta a matriz em uma tupla de tuplas:
data_for_db=tuple(data)
5) Agora, voltaremos ao código psycopg, que será semelhante ao exemplo 
anterior:
6)
Trabalhando com Banco de Dados
Inserindo múltiplos registros
Pág. 17 Engenharia de Dados De 71
5) Agora, voltaremos ao código psycopg, que será semelhante ao exemplo 
anterior:
conn_string="dbname='dataengineering' host='localhost'
user='postgres' password='postgres'"
conn=db.connect(conn_string)
cur=conn.cursor()
query = "insert into users(id,name,street,city,zip)
 values(%s,%s,%s,%s,%s)"
6) Pode-se imprimir o que o código enviará ao banco de dados usando um único 
registro da variável data_for_db:
print(cur.mogrify(query,data_for_db[1]))
7) Por último, use executemany() em vez de execute() para permitir que a 
biblioteca manipule as várias inserções. Em seguida, confirme a transação:
cur.executemany(query,data_for_db)
conn.commit()
Trabalhando com Banco de Dados
Inserindo múltiplos registros
Pág. 18 Engenharia de Dados De 71
Trabalhando com Banco de Dados
Código Python para inserção múltipla
import psycopg2 as db
from faker import Faker
fake=Faker()
data=[]
i=2
for r in range(1000):
 data.append((i,fake.name(),
 fake.street_address(), fake.city(),
 fake.zipcode()))
 i+=1
data_for_db=tuple(data)
print(data_for_db)
conn_string="dbname='dataengineering' 
host='localhost' user='postgres' 
password='postgres'"
conn=db.connect(conn_string)
cur=conn.cursor()
query = "insert into users 
(id,name,street,city,zip) values(%s,%s,%s,
%s,%s)"
print(cur.mogrify(query,data_for_db[1]))
cur.executemany(query,data_for_db)
conn.commit()
query2 = "select * from users"
cur.execute(query2)
print(cur.fetchall())
Pág. 19 Engenharia de Dados De 71
Trabalhando com Banco de Dados
Consultando inserção múltipla no Postgresql
Pág. 20 Engenharia de Dados De 71
➢ A extração de dados usando psycopgs segue exatamente o mesmo 
procedimento da inserção, a única diferença é o uso da instrução select 
em vez de insert. As etapas a seguir mostram como extrair dados:
1) Importe a biblioteca e configure sua conexão e cursor:
import psycopg2 as db
conn_string="dbname='dataengineering' host='localhost'
user='postgres' password='postgres'"
conn=db.connect(conn_string)
cur=conn.cursor()
2) Agora, pode executar uma consulta. Neste exemplo, selecionaremos 
todos os registros da tabela de usuários:
query = "select * from users"
cur.execute(query)
Trabalhando com Banco de Dados
Extraindo dados do PostgreSQL
Pág. 21 Engenharia de Dados De 71
3) Agora, teremos um objeto iterável com os resultados. Pode-se iterar 
sobre o cursor, conforme demonstrado:
for record in cur:
 print(record)
4) Como alternativa, pode usar um dos métodos de busca:
cur.fetchall ()
cur.fetchmany (howmany) # onde howmany é igual ao número
 # de registros que se deseja devolver
cur.fetchone ()
5) Para obter um único registro, pode atribuí-lo a uma variável e examiná-
lo. Observe que mesmo quando seleciona um registro, o cursor retorna 
uma matriz:
data=cur.fetchone()
print(data[0])
Trabalhando com Banco de Dados
Extraindo dados do PostgreSQL
Pág. 22 Engenharia de Dados De 71
6) Independentemente de estar buscando um ou vários, precisa-se saber 
onde está e quantos registros existem. Pode-se obter a contagem de 
linhas da consulta usando o seguinte código:
cur.rowcount
# 1001
7) Pode obter o número da linha atual usando rownumber. Se usar 
fetchone() e, em seguida, chamar rownumber novamente, ele deve ser 
incrementado com sua nova posição:
cur.rownumber
# A última coisa a mencionar é que também pode consultar uma tabela 
e gravá-la em um arquivo CSV usando o método copy_to()
8) Crie a conexão e o cursor:
conn=db.connect(conn_string)
cur=conn.cursor()
Trabalhando com Banco de Dados
Extraindo dados do PostgreSQL
Pág. 23 Engenharia de Dados De 71
9) Abra um arquivo para gravar a tabela:
f=open('fromdb.csv','w')
10) Em seguida, chame copy_to e passe o arquivo, o nome da tabela e o 
separador (que terá como padrão tabs se não for definido). Feche o 
arquivo e terá todas as linhas como um CSV:
cur.copy_to(f,'users',sep=',')
f.close()
11) Pode-se verificar os resultados abrindo o arquivo e imprimindo o 
conteúdo:
f=open('fromdb.csv','r')
f.read()
Trabalhando com Banco de Dados
Extraindo dados do PostgreSQL
Pág. 24 Engenharia de Dados De 71
Trabalhando com Banco de Dados
Código Python para extração de dados
import psycopg3 as db
conn_string="dbname='dataengineering' 
host='localhost' user='postgres' 
password='postgres'"
conn=db.connect(conn_string)
cur=conn.cursor()
query = "select * from users"
cur.execute(query)
print(cur.fetchone())
print(cur.rowcount)
print(cur.rownumber)
print(cur.fetchmany(3))
print(cur.rownumber)
f=open('fromdb.csv','w')
conn=db.connect(conn_string)
cur=conn.cursor()
cur.copy_to(f,'users',sep=',')
f.close()
f=open('fromdb.csv','r')
print(f.read())
Pág. 25 Engenharia de Dados De 71
➢ Também é possível extrair dados usando o pandas DataFrames
➢ Para fazer isso, precisa estabelecer uma conexão usando psycopg2 e, em 
seguida, pular o cursor e ir direto para a consulta
➢ Os DataFrames oferecem muito poder na filtragem, análise e 
transformação de dados
➢ As etapas a seguir orientarão o uso de DataFrames:
1) Configure a conexão:
import psycopg2 as db
import pandas as pd
conn_string="dbname='dataengineering' host='localhost'
user='postgres' password='postgres'"
conn=db.connect(conn_string)
Trabalhando com Banco de Dados
Extraindo dados com DataFrames
Pág. 26 Engenharia de Dados De 71
2) Agora, pode-se executar a consulta em um DataFrame usando o método 
read_sql() do pandas. O método leva uma consulta e uma conexão:
df=pd.read_sql("select * from users", conn)
3) O resultado é um DataFrame, df, com todos os usuários da tabela. 
Agora tem acesso total a todas as ferramentas DataFrame para 
trabalhar com os dados - por exemplo, pode exportar para JSON usando 
o seguinte:
df.to_json(orient='records')
Trabalhando com Banco de Dados
Extraindo dados com DataFrames
Pág. 27 Engenharia de Dados De 71
Trabalhando com Banco de Dados
Código Python para extração com pandas
import psycopg2 as db
import pandas aspd
conn_string="dbname='dataengineering' host='localhost' user='postgres' 
password='postgres'"
conn=db.connect(conn_string)
df=pd.read_sql("select * from users", conn)
print(df.head())
print(df['city'].value_counts())
Pág. 28 Engenharia de Dados De 71
➢ Bancos de dados relacionais podem ser o que você pensa quando ouve o 
termo banco de dados, mas existem vários outros tipos de bancos de 
dados, como em colunas, documentos, chave valores e séries temporais.
➢ Vamos trabalhar com o Elasticsearch, que é um banco de dados NoSQL
➢ NoSQL é um termo genérico que se refere a bancos de dados que não 
armazenam dados em linhas e colunas
➢ Os bancos de dados NoSQL geralmente armazenam seus dados como 
documentos JSON e usam uma linguagem de consulta diferente de SQL
➢ Aprenderemos em seguida a carregar dados no Elasticsearch
Trabalhando com Banco de Dados
Inserindo e extraindo dados do banco de dados NoSQL em Python
Pág. 29 Engenharia de Dados De 71
➢ Para instalar a biblioteca elasticsearch, pode usar pip3, conforme 
mostrado: 
pip3 install elasticsearch
➢ Para verificar a instalação e verificar a versão, você utilize o seguinte 
código 
import elasticsearch
elasticsearch.__version__
➢ O código anterior deve imprimir algo como o seguinte:
(7, 14, 0)
➢ Se tiver a versão certa para sua versão do Elasticsearch, está pronto 
para começar a importar dados.
Trabalhando com Banco de Dados
Instalando a biblioteca do Elasticsearch
Pág. 30 Engenharia de Dados De 71
➢ Antes de consultar o Elasticsearch, é necessário carregar alguns dados 
em um índice
➢ Anteriormente utilizamos a biblioteca, psycopg2, para acessar o 
PostgreSQL
➢ Para acessar o Elasticsearch utilizaremos a biblioteca do elasticsearch
➢ Para carregar dados, precisa criar a conexão e, em seguida, emitir 
comandos para o Elasticsearch
➢ Siga as etapas fornecidas para adicionar um registro ao Elasticsearch:
1) Importe as bibliotecas. Podemos criar o objeto Faker para gerar dados 
aleatórios:
from elasticsearch import Elasticsearch
from faker import Faker
fake=Faker()
Trabalhando com Banco de Dados
Inserindo dados no Elasticsearch
Pág. 31 Engenharia de Dados De 71
2) Crie uma conexão com o Elasticsearch:
es = Elasticsearch()
3) O código anterior pressupõe que a instância do Elasticsearch está sendo 
executada no localhost. Se não for, pode-se especificar o endereço IP, 
conforme mostrado:
es=Elasticsearch({'127.0.0.1'})
➢ Agora, pode emitir comandos para a instância Elasticsearch
➢ O método de índice permitirá adicionar dados
➢ O método recebe um nome de índice, o tipo de documento e um corpo
➢ O corpo que é enviado ao Elasticsearch e é um objeto JSON: 
 doc={"name": fake.name(),"street": fake.street_address(),
 "city": fake.city(),"zip":fake.zipcode()}
res=es.index(index="users",doc_type="doc",body=doc)
print(res['result']) #criado
Trabalhando com Banco de Dados
Inserindo dados no Elasticsearch
Pág. 32 Engenharia de Dados De 71
➢ Usando o método bulk, pode-se inserir muitos documentos de uma vez
➢ O processo é semelhante ao de inserir um único registro, exceto que irá 
gerar todos os dados e, em seguida, inseri-los
Trabalhando com Banco de Dados
Inserindo dados com helpers
Pág. 33 Engenharia de Dados De 71
1) Importe a biblioteca helpers para acessar o método bulk:
from elasticsearch import helpers
2) Os dados precisam ser uma matriz de objetos JSON. Precisamos 
especificar o índice e o tipo. Os sublinhados nos nomes são usados para 
os campos do Elasticsearch. O campo _source é onde colocaríamos o 
documento JSON que desejamos inserir no banco de dados. Fora do 
JSON existe um loop for. Este loop cria 999 documentos (já foi 
adicionado um e indexa de 0 a 998):
actions = [{ "_index": "users", "_type": "doc", 
 "_source": { "name": fake.name(),
 "street": fake.street_address(),
 "city": fake.city(), "zip":fake.zipcode()}}
 for x in range(998) # or for i,r in df.iterrows()
 ]
Trabalhando com Banco de Dados
Etapas para inserir dados com helpers
Pág. 34 Engenharia de Dados De 71
3) Agora, pode-se chamar o método bulk e passá-lo para a instância 
elasticsearch e a matriz de dados. Pode imprimir os resultados para 
verificar se funcionou:
res = helpers.bulk(es, actions)
print(res['result'])
Trabalhando com Banco de Dados
Etapas para inserir dados com helpers
Pág. 35 Engenharia de Dados De 71
Trabalhando com Banco de Dados
Código Python para inserção de dados/índice no Elasticsearch
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from faker import Faker
fake=Faker()
es = Elasticsearch() #ou ip {127.0.0.1}
actions = [
 {
 "_index": "users",
 "_type": "doc",
 "_source": {
 "name": fake.name(),
 "street": fake.street_address(),
 "city": fake.city(),
 "zip":fake.zipcode()}
 }
 for x in range(998) # ou for i,r in 
df.iterrows()
]
response = helpers.bulk(es, actions)
print(response)
Pág. 36 Engenharia de Dados De 71
➢ Agora deve-se ter 1.000 registros em um índice Elasticsearch nomeando 
usuários
➢ Podemos verificar isso em Kibana
➢ Para adicionar o novo índice ao Kibana, navegue até o painel do Kibana 
em http://localhost:5601 
➢ No menu selecione Management Stack Management Kibana Index → → →
Patterns Create index pattern e insira “users*” no Index pattern name →
e click no botão Next Step, por fim pressione o botão Create index 
pattern, no menu escolha Analytics Discover para ver a tela a seguir→
Trabalhando com Banco de Dados
Etapas para inserir dados com helpers
http://localhost:5601/
Pág. 37 Engenharia de Dados De 71
Trabalhando com Banco de Dados
Kibana - Discover
Pág. 38 Engenharia de Dados De 71
➢ A consulta no Elasticsearch segue exatamente as mesmas etapas da 
inserção de dados
➢ A única diferença é que você usa um método diferente – search - para 
enviar um objeto de corpo diferente
➢ Vamos examinar uma consulta simples com todos os dados:
1) Importe a biblioteca e crie sua instância de elasticsearch:
from elasticsearch import Elasticsearch
es = Elasticsearch()
2) Crie o objeto JSON para enviar ao Elasticsearch. O objeto é uma 
consulta, usando a pesquisa match_all:
doc={"query":{"match_all":{}}}
Trabalhando com Banco de Dados
Pesquisando no Elasticsearch
Pág. 39 Engenharia de Dados De 71
3) Passe o objeto para o Elasticsearch usando o método de pesquisa. 
Passe o índice e o tamanho do retorno. Nesse caso, você retornará 
apenas 10 registros. O tamanho máximo de retorno é 10.000 
documentos:
res=es.search(index="users",body=doc,size=10)
4) Por último, você pode imprimir os documentos:
print(res['hits']['hits'])
# ou você pode iterar capturando apenas _source:
for doc in res['hits']['hits']:
 print(doc['_source'])
Trabalhando com Banco de Dados
Pesquisando no Elasticsearch
Pág. 40 Engenharia de Dados De 71
Trabalhando com Banco de Dados
Código Python para pesquisa de dados/índice no Elasticsearch
from elasticsearch import Elasticsearch
es = Elasticsearch()
# Pesquisa todos os documentos
doc={"query":{"match_all":{}}}
res=es.search(index="users",body=doc,size=10)
print(res['hits']['hits'][9]['_source'])
# Obtem Muttley
doc={"query":{"match":{"name":"Muttley"}}}
res=es.search(index="users",body=doc,size=10)
print(res['hits']['hits'][0]['_source'])
# Obtem Muttley com sintaxe Lucene
res=es.search(index="users",q="name:Muttley",
size=10)
print(res['hits']['hits'][0]['_source'])
# Obtem City Gottan 
doc={"query":{"match":{"city":"Gottan"}}}
res=es.search(index="users",body=doc,size=10)
print(res['hits']['hits'])
# Obtem Gottan e filtra zip 
doc={"query":{"bool":{"must":{"match":
{"city":"Gottan"}},"filter":{"term":
{"zip":"63792"}}}}}
res=es.search(index="users",body=doc,size=10)
print(res['hits']['hits'])
Pág. 41 Engenharia de Dados De 71
➢ Você pode carregar os resultados da consulta em um DataFrame do 
pandas 
➢Para carregar os resultados em um DataFrame, importe json_normalize 
da biblioteca pandas json e use-o (json_normalize) nos resultados JSON, 
conforme mostrado no código a seguir:
from pandas.io.json import json_normalize
df=json_normalize(res['hits']['hits']) 
Trabalhando com Banco de Dados
Pesquisando no Elasticsearch com pandas Dataframe
Pág. 42 Engenharia de Dados De 71
➢ No primeiro exemplo, você usou um tamanho 10 para sua pesquisa
➢ Você poderia ter obtido todos os 1.000 registros, mas o que você faz 
quando tem mais de 10.000 e precisa de todos eles? 
➢ O Elasticsearch possui um método de rolagem que permite iterar os 
resultados até obter todos eles
➢ Para percorrer os dados, siga as etapas fornecidas:
1) Importe a biblioteca e crie sua instância Elasticsearch:
from elasticsearch import Elasticsearch
es = Elasticsearch()
Trabalhando com Banco de Dados
Usando a scroll para lidar com resultados maiores
Pág. 43 Engenharia de Dados De 71
2) Pesquise seus dados. Como você não tem mais de 10.000 registros, 
definirá o tamanho para 500. Isso significa que perderão 500 registros 
em sua pesquisa inicial. Você passará um novo parâmetro para o método 
de pesquisa - scroll. Este parâmetro especifica por quanto tempo você 
deseja disponibilizar os resultados. Estou usando 20 milissegundos. 
Ajuste este número para garantir que você tenha tempo suficiente para 
obter os dados - isso dependerá do tamanho do documento e da 
velocidade da rede:
res = es.search(
 index = 'users',
 doc_type = 'doc',
 scroll = '20m',
 size = 500,
 body = {"query":{"match_all":{}}}
)
Trabalhando com Banco de Dados
Usando a scroll para lidar com resultados maiores
Pág. 44 Engenharia de Dados De 71
3) Os resultados incluirão _scroll_id, que você precisará passar para o 
método de scroll mais tarde. Salve o ID de rolagem e o tamanho do 
conjunto de resultados:
sid = res['_scroll_id']
size = res['hits']['total']['value']
4) Para começar a rolar, use um loop while para obter os registros até que 
o tamanho seja 0, o que significa que não há mais dados. Dentro do loop, 
você chamará o método scroll e passará _scroll_id e por quanto tempo 
rolar. Isso pegará mais resultados da consulta original:
while (size > 0):
 res = es.scroll(scroll_id = sid, scroll = '20m')
Trabalhando com Banco de Dados
Usando a scroll para lidar com resultados maiores
Pág. 45 Engenharia de Dados De 71
5) Em seguida, obtenha o novo ID de rolagem e o tamanho para que você 
possa fazer um loop novamente se os dados ainda existirem:
sid = res['_scroll_id']
size = len(res['hits']['hits'])
6) Por último, você pode fazer algo com os resultados dos scrolls. No 
código a seguir, você imprimirá a fonte para cada registro:
for doc in res['hits']['hits']:
 print(doc['_source'])
Trabalhando com Banco de Dados
Usando a scroll para lidar com resultados maiores
Pág. 46 Engenharia de Dados De 71
➢ No último encontro, você construiu seu primeiro pipeline de dados do 
Airflow usando um operador Bash e Python
➢ Desta vez, você combinará dois operadores Python para extrair dados do 
PostgreSQL, salvá-los como um arquivo CSV e, em seguida, lê-los e 
gravá-los em um índice Elasticsearch
Trabalhando com Banco de Dados
Criação de pipelines de dados no Apache Airflow
Pág. 47 Engenharia de Dados De 71
➢ No último encontro, você construiu seu primeiro pipeline de dados do 
Airflow usando um operador Bash e Python
➢ Desta vez, você combinará dois operadores Python para extrair dados do 
PostgreSQL, salvá-los como um arquivo CSV e, em seguida, lê-los e 
gravá-los em um índice Elasticsearch
Trabalhando com Banco de Dados
Criação de pipelines de dados no Apache Airflow
Pág. 48 Engenharia de Dados De 71
➢ Cada DAG terá algum código padrão e clichê para executá-lo no Airflow
➢ Você sempre importará as bibliotecas necessárias e, a seguir, quaisquer outras 
bibliotecas necessárias para suas tarefas
➢ No código a seguir, você importa o operador, DAG e as bibliotecas de tempo 
para o Airflow
➢ Para suas tarefas, você importa as bibliotecas pandas, psycopg2 e 
elasticsearch:
import datetime as dt
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import pandas as pd
import psycopg2 as db
from elasticsearch import Elasticsearch
Trabalhando com Banco de Dados
Configurando o boilerplate do Airflow
Pág. 49 Engenharia de Dados De 71
➢ Em seguida, você especificará os argumentos para seu DAG
➢ Lembre-se de que o horário de início deve ser um dia antes se você 
programar a tarefa para ser executada diariamente:
default_args = {
 'owner': 'dickvigarista',
 'start_date': dt.datetime(2021, 8, 30),
 'retries': 1,
 'retry_delay': dt.timedelta(minutes=5),
}
Trabalhando com Banco de Dados
Configurando o boilerplate do Airflow
Pág. 50 Engenharia de Dados De 71
➢ Agora, você pode passar os argumentos para o DAG, nomeá-lo e definir o 
intervalo de execução
➢ Você definirá seus operadores aqui também
➢ Neste exemplo, você criará dois operadores Python - um para obter 
dados do PostgreSQL e outro para inserir dados no Elasticsearch
➢ A tarefa getData será upstream e a tarefa insertData downstream, então 
você usará o operador >> bit shift para especificar isto:
Trabalhando com Banco de Dados
Configurando o boilerplate do Airflow
Pág. 51 Engenharia de Dados De 71
with DAG('MyDBDAG',
 default_args=default_args,
 schedule_interval=timedelta(minutes=5),
 # '0 * * * *',
 ) as dag:
 getData = PythonOperator(task_id='QueryPostgreSQL',
 python_callable=queryPostgresql)
 insertData = PythonOperator( task_id='InsertDataElasticsearch',
 python_callable=insertElasticsearch)
 getData >> insertData
Trabalhando com Banco de Dados
Configurando o boilerplate do Airflow
Pág. 52 Engenharia de Dados De 71
➢ Por último, você definirá as tarefas
➢ Nos operadores anteriores, você os nomeou queryPostgresql e 
insertElasticsearch
➢ O código nessas tarefas deve parecer muito familiar; é quase idêntico ao 
código das seções anteriores 
➢ Para consultar o PostgreSQL, você cria a conexão, executa a consulta sql 
usando o método pandas read_sql() e, em seguida, usa o método pandas 
to_csv() para gravar os dados no disco:
Trabalhando com Banco de Dados
Configurando o boilerplate do Airflow
Pág. 53 Engenharia de Dados De 71
def queryPostgresql():
 conn_string="dbname='dataengineering' host='localhost'
 user='postgres' password='postgres'"
 conn=db.connect(conn_string)
 df=pd.read_sql("select name,city from users",conn)
 df.to_csv('postgresqldata.csv')
 print("-------Dados Salvos------")
Trabalhando com Banco de Dados
Configurando o boilerplate do Airflow
Pág. 54 Engenharia de Dados De 71
➢ Para inserir os dados no Elasticsearch, você cria o objeto Elasticsearch 
conectando-se ao localhost
➢ Em seguida, leia o CSV da tarefa anterior em um DataFrame, itere por meio do 
DataFrame, convertendo cada linha em JSON e insira os dados usando o método 
de índice:
def insertElasticsearch():
 es = Elasticsearch()
 df=pd.read_csv('postgresqldata.csv')
 for i,r in df.iterrows():
 doc=r.to_json()
 res=es.index(index="frompostgresql",
 doc_type="doc",body=doc)
 print(res)
➢ Agora você tem um pipeline de dados completo no Airflow. Em seguida você o 
executará e verá os resultados.
Trabalhando com Banco de Dados
Configurando o boilerplate do Airflow
Pág. 55 Engenharia de Dados De 71
Trabalhando com Banco de Dados
Código Python completo Airflow
import datetime as dt
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import 
BashOperator
from airflow.operators.python import 
PythonOperator
import pandas as pd
import psycopg2 as db
from elasticsearch import Elasticsearch
def queryPostgresql():conn_string="dbname='dataengineering' 
 host='localhost' user='postgres'
 password='postgres'"
 conn=db.connect(conn_string)
 df=pd.read_sql("select name,
 city from users",conn)
 df.to_csv('postgresqldata.csv')
 print("-------Dados Salvos------")
 
Pág. 56 Engenharia de Dados De 71
Trabalhando com Banco de Dados
Código Python completo Airflow
def insertElasticsearch():
 es = Elasticsearch()
 df=pd.read_csv('postgresqldata.csv')
 for i,r in df.iterrows():
 doc=r.to_json()
 res=es.index(
 index="frompostgresql",
 doc_type="doc",body=doc)
 print(res)
default_args = {
 'owner': 'dickvigarista',
 'start_date': dt.datetime(2021, 8, 29),
 'retries': 1,
 'retry_delay': dt.timedelta(minutes=5),
}
with DAG('MyDBDAG',
 default_args=default_args,
 schedule_interval=timedelta(minutes=5),
 # '0 * * * *',
 ) as dag:
 getData = PythonOperator(
 task_id='QueryPostgreSQL',
 python_callable=queryPostgresql)
 insertData = PythonOperator(
 task_id='InsertDataElasticsearch',
 python_callable=insertElasticsearch)
getData >> insertData
Pág. 57 Engenharia de Dados De 71
➢ Copie o código Python do MyDBDAG para o diretório das DAGs do Airflow (Ex: cp 
 AirflowDB.py /opt/aiflow/dags) 
➢ Reinicialize o Airflow e o Scheduler se a nova DAG não estiver aparecendo na 
páginas da DAG no Airflow (http://localhost:8080/) 
 
➢ Ative e execute a MyDBDAG
Trabalhando com Banco de Dados
Executando o DAG
http://localhost:8080/
Pág. 58 Engenharia de Dados De 71
➢ Para verificar se o pipeline de dados foi bem-sucedido, você pode visualizar os 
dados no Elasticsearch usando Kibana
➢ Para ver os resultados, navegue até o Kibana em http://localhost:5601
➢ Você precisará criar um novo índice no Kibana
➢ Você deve ver os registros conforme mostrado na seguinte captura de tela:
Trabalhando com Banco de Dados
Executando o DAG
http://localhost:5601/
Pág. 59 Engenharia de Dados De 71
➢ O processador mais usado para lidar com bancos de dados relacionais 
em NiFi é o processador ExecuteSQLRecord
➢ Entre no NiFi (https://localhost:9300/nifi/) e arraste o ícone do 
processador para a tela e procure o processador ExecuteSQLRecord
➢ Depois de adicionado à tela, você precisa configurá-lo
Trabalhando com Banco de Dados
Extraindo dados do PostgreSQL usando NiFi
https://localhost:9300/nifi/
Pág. 60 Engenharia de Dados De 71
➢ Para configurar o processador, você precisa criar um pool de conexão de 
banco de dados, conforme mostrado na seguinte captura de tela:
Trabalhando com Banco de Dados
Configurando o processador ExecuteSQLCommand
Pág. 61 Engenharia de Dados De 71
➢ Depois de selecionar Create new service…, você verá um pop-up para 
adicionar o serviço do controlador
➢ Selecione a opção padrão, DBCPConnectionPool, e para tornar mais fácil 
lembrar a qual banco de dados esse serviço se destina, você pode 
nomeá-lo com o nome do banco de dados – dataengineering
➢ Observe como eu não o chamei de PostgreSQL
➢ À medida que adiciona mais serviços, você adiciona mais conexões 
PostgreSQL para bancos de dados diferentes
➢ Assim, seria difícil lembrar para qual banco de dados PostgreSQL o 
serviço se destinava
➢ Para configurar o serviço, selecione a seta na configuração do 
processador
➢ A configuração do serviço deve ser semelhante à seguinte captura de 
tela:
Trabalhando com Banco de Dados
Configurando o processador ExecuteSQLCommand
Pág. 62 Engenharia de Dados De 71
Trabalhando com Banco de Dados
Configurando o processador ExecuteSQLCommand
Pág. 63 Engenharia de Dados De 71
➢ A configuração requer que você especifique o URL de conexão, que é 
uma string de conexão de banco de dados Java (Ex: 
jdbc:postgresql://localhost/dataengineering)
➢ A string especifica Java Database Connectivity (JDBC) e o tipo de banco 
de dados – PostgreSQL
➢ Em seguida, ele nomeia o host, localhost e o nome do banco de dados, 
dataengineering
➢ A classe do driver especifica o driver postgresql (org.postgresql.Driver)
➢ A localização do driver é onde você fez o download em aula anterior na 
construção da nossa infraestrutura de engenharia de dados
➢ Ele deve estar em seu diretório inicial na pasta nifi, em um subdiretório 
chamado drivers
➢ Por último, você precisa inserir o nome de usuário e a senha do banco de 
dados
Trabalhando com Banco de Dados
Configurando o processador ExecuteSQLCommand
Pág. 64 Engenharia de Dados De 71
➢ Em seguida, você precisa criar um serviço Record Writer
➢ Selecione Create new service … em Record Writer, escolha 
JSONRecordSetWriter e clique na seta para configurá-lo
➢ Há uma definição de configuração importante que você não pode ignorar – 
Output Grouping
➢ Você deve definir esta propriedade como One Line Per Object
➢ A configuração finalizada será semelhante à seguinte captura de tela:
Trabalhando com Banco de Dados
Configurando o processador ExecuteSQLCommand
Pág. 65 Engenharia de Dados De 71
➢ Agora que configurou os serviços para o processador, você precisa 
concluir a configuração do processo
➢ O último parâmetro que você precisa configurar é SQL Select Query
➢ É aqui que você pode especificar o comando SQL a ser executado no 
banco de dados. Por exemplo, você pode inserir o seguinte:
select name, city from users
➢ Isso irá capturar todos os registros no banco de dados PostgreSQL, mas 
apenas os campos de nome e cidade
➢ Agora você pode passar para o próximo processador no pipeline
Trabalhando com Banco de Dados
Configurando o processador ExecuteSQLCommand
Pág. 66 Engenharia de Dados De 71
➢ Agora que configurou o processador ExecuteSQLRecord, você receberá 
uma matriz de registros
➢ Para processar esses dados, você precisa ter um arquivo de fluxo por 
registro
➢ Para fazer isso, você pode usar o processador SplitText
➢ Arraste-o para a tela e abra a guia Propriedades clicando duas vezes no 
processador - ou clique com o botão direito e selecione Propriedades
➢ Os padrões do processador funcionam, mas certifique-se de que Line 
Split Count está definido como 1, Header Line Count é 0 - seus dados não 
têm um cabeçalho quando vêm do processador ExecuteSQLRecord - e 
Remove Trailing Newlines deve ser true
➢ Essas configurações permitirão que o processador pegue cada linha do 
arquivo de fluxo e divida-o em um novo arquivo de fluxo
➢ Portanto, seu único arquivo de fluxo de entrada sairá deste processador 
como 1.000 arquivos de fluxo
Trabalhando com Banco de Dados
Configurando o processador SplitText
Pág. 67 Engenharia de Dados De 71
➢ A última etapa no pipeline de dados é inserir os arquivos de fluxo no 
Elasticsearch
➢ Você pode fazer isso usando o processador PutElasticsearchHttp
➢ Existem quatro processadores PutElasticsearch diferentes
➢ Apenas dois serão relevantes nesta disciplina - PutElasticsearchHttp e 
PutelasticSearchHttpRecord
➢ Esses são os processadores para inserir um único registro ou usar a API 
em massa
➢ Os outros dois processadores - Putelasticsearch e Putelasticsearch5 - 
são para versões mais antigas do Elasticsearch (2 e 5)
➢ Para configurar o processador, você deve especificar o URL e a porta
➢ Neste exemplo, você usará http://localhost:9200 
➢ O Index será fromnifi, mas você pode nomeá-lo como quiser
➢ O Type é doc e a Index Operation será index
Trabalhando com Banco de Dados
Configurando o processador PutElasticsearchHttp
http://localhost:9200/
Pág. 68 Engenharia de Dados De 71
➢ Agora que você configurou todos os processadores, pode conectá-los arrastando a 
seta de ExecuteSQLRecord para o processador SplitText para obter sucesso
➢ Em seguida, conecte o processador SplitText ao processador PutElasticsearchHttp 
para splits
➢ Por último, encerre o processador PutElasticsearchHttp para todos os 
relacionamentos.➢ Execute cada um dos processadores ou, no painel Operações, selecione Start para 
iniciar todos
➢ Você verá um arquivo de fluxo na primeira fila e, em seguida, ele será dividido em 
1.000 arquivos de fluxo na segunda fila
➢ A fila ficará vazia em lotes de 100 conforme eles são inseridos no Elasticsearch
➢ Para verificar os resultados, você pode usar a API elasticsearch, e não Kibana
➢ Em seu navegador, acesse http://localhost:9200/_cat/indices 
➢ Este é o ponto de extremidade REST para visualizar os índices em seu banco de 
dados Elasticsearch
➢ Você deverá ver seu novo índice, fromnifi, e o número total de documentos, 
conforme mostrado na seguinte captura de tela:
Trabalhando com Banco de Dados
Executar o pipeline de dados
http://localhost:9200/_cat/indices
Pág. 69 Engenharia de Dados De 71
➢ O número de documentos no índice irá variar dependendo se você deixou o 
pipeline em execução ou não
➢ Assim como no exemplo do Airflow, este pipeline não é idempotente
➢ À medida que é executado, ele continuará adicionando os mesmos registros com 
uma ID diferente no Elasticsearch
➢ Este não é o comportamento que você deseja na produção e corrigiremos isso 
quando discutirmos a implementação de pipelines na produção 
Trabalhando com Banco de Dados
Executar o pipeline de dados
Pág. 70 Engenharia de Dados De 71
➢ Aprendemos a usar Python para consultar e inserir dados em bancos de dados relacionais 
e NoSQL
➢ Você também aprendeu a usar o Airflow e o NiFi para criar pipelines de dados
➢ Habilidades de banco de dados são algumas das mais importantes para um engenheiro de 
dados
➢ Haverá muito poucos pipelines de dados que não os afetem de alguma forma
➢ As habilidades que você aprendeu fornecem a base para as outras habilidades que você 
precisará aprender - principalmente SQL
➢ Combinar fortes habilidades de SQL com as habilidades de pipeline de dados que você 
aprendeu permitirá que você realize a maioria das tarefas de engenharia de dados que 
encontrará
➢ Nos exemplos, os pipelines de dados não eram idempotentes
➢ Cada vez que eles eram executados, você obtinha novos resultados e resultados 
indesejados
➢ Corrigiremos isso quando implantarmos pipelines em produção
➢ Mas antes de chegar a isso, você precisará aprender como lidar com problemas comuns 
de dados e como enriquecer e transformar seus dados.
➢ Em seguida vamos aprender a usar Python para trabalhar com seus dados entre as fases 
de extração e carregamento de seus pipelines de dados
Trabalhando com Banco de Dados
Resumo
Pág. 71 Engenharia de Dados De 71
Lembre-seLembre-se
“Se você não consegue explicar de maneira simples, não 
entende bem o suficiente.”
Albert Einstein
	Slide 1
	Slide 2
	Slide 3
	Slide 4
	Slide 5
	Slide 6
	Slide 7
	Slide 8
	Slide 9
	Slide 10
	Slide 11
	Slide 12
	Slide 13
	Slide 14
	Slide 15
	Slide 16
	Slide 17
	Slide 18
	Slide 19
	Slide 20
	Slide 21
	Slide 22
	Slide 23
	Slide 24
	Slide 25
	Slide 26
	Slide 27
	Slide 28
	Slide 29
	Slide 30
	Slide 31
	Slide 32
	Slide 33
	Slide 34
	Slide 35
	Slide 36
	Slide 37
	Slide 38
	Slide 39
	Slide 40
	Slide 41
	Slide 42
	Slide 43
	Slide 44
	Slide 45
	Slide 46
	Slide 47
	Slide 48
	Slide 49
	Slide 50
	Slide 51
	Slide 52
	Slide 53
	Slide 54
	Slide 55
	Slide 56
	Slide 57
	Slide 58
	Slide 59
	Slide 60
	Slide 61
	Slide 62
	Slide 63
	Slide 64
	Slide 65
	Slide 66
	Slide 67
	Slide 68
	Slide 69
	Slide 70
	Slide 71

Continue navegando