Mysql
 sql >> Base de Dados >  >> RDS >> Mysql

Usando Python e MySQL no Processo ETL:Usando Python e SQLAlchemy




Nos dois artigos anteriores desta série, discutimos como usar Python e SQLAlchemy para realizar o processo de ETL. Hoje faremos o mesmo, mas desta vez usando Python e SQL Alchemy sem comandos SQL em formato textual. Isso nos permitirá usar o SQLAlchemy independentemente do mecanismo de banco de dados ao qual estamos conectados. Então vamos começar.

Hoje vamos discutir como realizar o processo ETL usando Python e SQLAlchemy. Criaremos um script para extrair dados diários de nosso banco de dados operacional, transformá-los e depois carregá-los em nosso data warehouse.

Este é o terceiro artigo da série. Se você não leu os dois primeiros artigos (Usando Python e MySQL no Processo ETL e SQLAlchemy), recomendo fortemente que você o faça antes de continuar.

Esta série inteira é uma continuação de nossa série de data warehouse:
  • Criando um DWH, parte um:um modelo de dados de negócios de assinatura
  • Criando um DWH, parte dois:um modelo de dados de negócios de assinatura
  • Criando um Data Warehouse, Parte 3:Um Modelo de Dados Corporativos de Assinatura

Ok, agora vamos começar no tópico de hoje. Primeiro, vamos olhar para os modelos de dados.

Os modelos de dados



Modelo de dados de banco de dados operacional (ao vivo)



Modelo de dados DWH


Esses são os dois modelos de dados que usaremos. Para obter mais informações sobre data warehouses (DWHs), confira estes artigos:

  • O esquema em estrela
  • O esquema de floco de neve
  • Esquema em estrela vs. Esquema em floco de neve

Por que SQLAlchemy?


A ideia por trás do SQLAlchemy é que, depois de importarmos bancos de dados, não precisamos de código SQL específico para o mecanismo de banco de dados relacionado. Em vez disso, podemos importar objetos para SQLAlchemy e usar a sintaxe SQLAlchemy para instruções. Isso nos permitirá usar a mesma linguagem, independentemente do mecanismo de banco de dados ao qual estamos conectados. A principal vantagem aqui é que um desenvolvedor não precisa cuidar das diferenças entre os diferentes mecanismos de banco de dados. Seu programa SQLAlchemy funcionará exatamente da mesma forma (com pequenas alterações) se você migrar para um mecanismo de banco de dados diferente.

Decidi usar apenas comandos SQLAlchemy e listas Python para comunicar com o armazenamento temporário e entre diferentes bancos de dados. As principais razões por trás dessa decisão são que 1) as listas Python são bem conhecidas e 2) o código seria legível para aqueles sem habilidades em Python.

Isso não quer dizer que o SQLAlchemy seja perfeito. Tem certas limitações, que discutiremos mais tarde. Por enquanto, vamos apenas dar uma olhada no código abaixo:


Executando o script e o resultado

Este é o comando Python usado para chamar nosso script. O script verifica os dados no banco de dados operacional, compara os valores com o DWH e importa os novos valores. Neste exemplo, estamos atualizando valores em duas tabelas de dimensão e uma tabela de fatos; o script retorna a saída apropriada. Todo o script é escrito para que você possa executá-lo várias vezes ao dia. Ele excluirá os dados "antigos" desse dia e os substituirá por novos.

Vamos analisar todo o roteiro, começando de cima.

Importando SQLAlchemy


A primeira coisa que precisamos fazer é importar os módulos que usaremos no script. Normalmente, você importará seus módulos enquanto escreve o script. Na maioria dos casos, você não saberá exatamente de quais módulos precisará desde o início.

from datetime import date

# import SQLAlchemy
from sqlalchemy import create_engine, select, MetaData, Table, and_, func, case

Importamos o datetime do Python módulo, que nos fornece classes que trabalham com datas.

Em seguida, temos o sqlalchemy módulo. Não importaremos o módulo inteiro, apenas as coisas que precisamos – algumas específicas para SQLAlchemy (create_engine , MetaData , Table ), algumas partes da instrução SQL (select , and_ , case ) e func , que nos permite usar funções como count() e soma() .

Conectando aos bancos de dados


Precisaremos nos conectar a dois bancos de dados em nosso servidor. Poderíamos nos conectar a mais bancos de dados (MySQL, SQL Server ou qualquer outro) de diferentes servidores, se necessário. Nesse caso, ambos os bancos de dados são bancos de dados MySQL e são armazenados na minha máquina local.

# connect to databases
engine_live = sqlalchemy.create_engine('mysql+pymysql://:@localhost:3306/subscription_live')
connection_live = engine_live.connect()
engine_dwh = sqlalchemy.create_engine('mysql+pymysql://:@localhost:3306/subscription_dwh')
connection_dwh = engine_dwh.connect()

metadata = MetaData(bind=None)

Criamos dois mecanismos e duas conexões. Não vou entrar em detalhes aqui porque já explicamos isso no artigo anterior.

Atualizando o dim_time Dimensão


Meta:inserir a data de ontem se ainda não estiver inserida na tabela.

Em nosso script, atualizaremos duas tabelas de dimensão com novos valores. O resto deles segue o mesmo padrão, então vamos passar por isso apenas uma vez; não precisamos escrever códigos quase idênticos mais algumas vezes.

A ideia é muito simples. Sempre executaremos o script para inserir novos dados para ontem. Portanto, precisamos verificar se essa data foi inserida na tabela de dimensões. Se já estiver lá, não faremos nada; se não for, vamos adicioná-lo. Vamos dar uma olhada no código para atualizar o dim_time tabela.

Primeiro, vamos verificar se a data existe. Se não existir, vamos adicioná-lo. Começamos armazenando a data de ontem em uma variável. Em Python, você faz assim:

yesterday = date.fromordinal(date.today().toordinal()-1)
yesterday_str = str(yesterday)

A primeira linha pega uma data atual, converte em um valor numérico, subtrai 1 desse valor e converte esse valor numérico de volta para uma data (ontem =hoje – 1 ). A segunda linha armazena a data em formato textual.

Em seguida, testaremos se a data já está no banco de dados:

table_dim_time = Table('dim_time', metadata, autoload = True, autoload_with = engine_dwh)
stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday_str)
result = connection_dwh.execute(stmt).fetchall()
date_exists = len(result)

Após carregar a tabela, executaremos uma consulta que deve retornar todas as linhas da tabela de dimensões em que o valor de data/hora é igual a ontem. O resultado pode ter 0 (nenhuma data na tabela) ou 1 linha (a data já está na tabela).

Se a data ainda não estiver na tabela, usaremos o comando insert() para adicioná-la:

if date_exists == 0:
  print("New value added.")
  stmt = table_dim_time.insert().values(time_date=yesterday, time_year=yesterday.year, time_month=yesterday.month, time_week=yesterday.isocalendar()[1], time_weekday=yesterday.weekday())
  connection_dwh.execute(stmt)
else:
  print("No new values.")

Uma coisa nova aqui que eu gostaria de destacar é o uso de. .year , .month , .isocalendar()[1] e .weekday para obter dateparts.

Atualizando a dim_city Dimensão


Objetivo:inserir novas cidades se houver alguma (ou seja, comparar a lista de cidades no banco de dados ao vivo com a lista de cidades no DWH e adicionar as que faltam).

Atualizando o dim_time dimensão era bastante simples. Nós simplesmente testamos se uma data estava na tabela e a inserimos se já não estivesse lá. Para testar um valor no banco de dados DWH, usamos uma variável Python (ontem ). Usaremos esse processo novamente, mas desta vez com listas.

Como não há uma maneira fácil de combinar tabelas de bancos de dados diferentes em uma única consulta SQLAlchemy, não podemos usar a abordagem descrita na Parte 1 desta série. Portanto, precisaremos de um objeto para armazenar os valores necessários para a comunicação entre esses dois bancos de dados. Eu decidi usar listas, porque elas são comuns e fazem o trabalho.

Primeiro, carregaremos o country e city tabelas de um banco de dados ativo para os objetos relevantes.

# dim_city
print("\nUpdating... dim_city")
table_city = Table('city', metadata, autoload = True, autoload_with = engine_live)
table_country = Table('country', metadata, autoload = True, autoload_with = engine_live)
table_dim_city = Table('dim_city', metadata, autoload = True, autoload_with = engine_dwh)

A seguir, carregaremos o dim_city tabela do DWH em uma lista:

# load whole dwh table in the list
stmt = select([table_dim_city]);
table_dim_city_list = connection_dwh.execute(stmt).fetchall()

Então faremos o mesmo para os valores do banco de dados ativo. Juntaremos as tabelas country e city então temos todos os dados necessários nesta lista:

# load all live values in the list
stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name])\
	.select_from(table_city\
	.join(table_country))
table_city_list = connection_live.execute(stmt).fetchall()

Agora vamos percorrer a lista que contém os dados do banco de dados ativo. Para cada registro, compararemos valores (city_name , postal_code e country_name ). Se não encontrarmos esses valores, adicionaremos um novo registro ao dim_city tabela.

# loop through live_db table
# for each record test if it is missing in the dwh table
new_values_added = 0
for city in table_city_list:
	id = -1;
	for dim_city in table_dim_city_list:
		if city[0] == dim_city[1] and city[1] == dim_city[2] and city[2] == dim_city[3]:
			id = dim_city[0]
	if id == -1:
		stmt = table_dim_city.insert().values(city_name=city[0], postal_code=city[1], country_name=city[2])
		connection_dwh.execute(stmt)
		new_values_added = 1
if new_values_added == 0:
	print("No new values.")
else:
	print("New value(s) added.")

Para determinar se o valor já está no DWH, testamos uma combinação de atributos que devem ser exclusivos. (A chave primária do banco de dados ativo não nos ajuda muito aqui.) Podemos usar um código semelhante para atualizar outros dicionários. Não é a solução mais legal, mas ainda é bastante elegante. E fará exatamente o que precisamos.

Atualizando o fact_customer_subscribed Tabela


Objetivo:Se tivermos dados antigos para a data de ontem, exclua-os primeiro. Adicione os dados de ontem ao DWH, independentemente de excluirmos algo na etapa anterior ou não.

Após atualizar todas as tabelas de dimensões, devemos atualizar as tabelas de fatos. Em nosso script, atualizaremos apenas uma tabela de fatos. O raciocínio é o mesmo da seção anterior:a atualização de outras tabelas seguiria o mesmo padrão, então repetiríamos principalmente o código.

Antes de inserir valores na tabela de fatos, precisamos conhecer os valores das chaves relacionadas das tabelas de dimensão. Para fazer isso, carregaremos novamente as dimensões nas listas e as compararemos com os valores do banco de dados ativo.

A primeira coisa que faremos é carregar o cliente e fact_customer_subscribed tabelas em objetos:

# fact_customer_subscribed
print("\nUpdating... fact_customer_subscribed")

table_customer = Table('customer', metadata, autoload = True, autoload_with = engine_live)
table_fact_customer_subscribed = Table('fact_customer_subscribed', metadata, autoload = True, autoload_with = engine_dwh)

Agora, precisamos encontrar chaves para a dimensão de tempo relacionada. Como estamos sempre inserindo dados de ontem, pesquisaremos essa data no dim_time table e use seu ID. A consulta retorna 1 linha e o ID está na primeira posição (o índice começa em 0, então é result[0][0] ):

# find key for the dim_time dimension
stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday)
result = connection_dwh.execute(stmt).fetchall()
dim_time_id = result[0][0]

Por enquanto, excluiremos todos os registros associados da tabela de fatos:

# delete any existing data in the fact table for that time dimension value
stmt = table_fact_customer_subscribed.delete().where(table_fact_customer_subscribed.columns.dim_time_id == dim_time_id)
connection_dwh.execute(stmt)

Ok, agora temos o ID da dimensão de tempo armazenado no dim_time_id variável. Isso foi fácil porque podemos ter apenas um valor de dimensão de tempo. A história será diferente para a dimensão da cidade. Primeiro, carregaremos todos os valores de que precisamos – valores que descrevem exclusivamente a cidade (não o ID) e valores agregados:

# prepare data for insert
stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name, func.sum(case([(table_customer.columns.active == 1, 1)], else_=0)).label('total_active'), func.sum(case([(table_customer.columns.active == 0, 1)], else_=0)).label('total_inactive'), func.sum(case([(and_(table_customer.columns.active == 1, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_new'), func.sum(case([(and_(table_customer.columns.active == 0, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_canceled')])\
	.select_from(table_customer\
	.join(table_city)\
	.join(table_country))\
	.group_by(table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name)

Há algumas coisas que eu gostaria de enfatizar sobre a consulta acima:

  • func.sum(...) é SOMA(...) do "SQL padrão".
  • O case(...) sintaxe usa and_ antes das condições, não entre elas.
  • .label(...) funciona como um alias SQL AS.
  • Estamos usando \ para passar para a próxima linha e aumentar a legibilidade da consulta. (Confie em mim, é praticamente ilegível sem a barra – eu tentei :) )
  • .group_by(...) desempenha o papel de GROUP BY do SQL.

Em seguida, percorreremos todos os registros retornados usando a consulta anterior. Para cada registro, compararemos valores que definem exclusivamente uma cidade (city_name , postal_code , country_name ) com os valores armazenados na lista criada a partir do DWH dim_city tabela. Se todos os três valores corresponderem, armazenaremos o ID da lista e o usaremos ao inserir novos dados. Dessa forma, para cada registro, teremos IDs para ambas as dimensões:

# loop through all new records
# use time dimension
# for each record find key for city dimension
# insert row
new_values = connection_live.execute(stmt).fetchall()
for new_value in new_values:
	dim_city_id = -1;
	for dim_city in table_dim_city_list:
		if new_value[0] == dim_city[1] and new_value[1] == dim_city[2] and new_value[2] == dim_city[3]:
			dim_city_id = dim_city[0]
	if dim_city_id > 0:	
		stmt_insert = table_fact_customer_subscribed.insert().values(dim_city_id=dim_city_id, dim_time_id=dim_time_id, total_active=new_value[3], total_inactive=new_value[4], daily_new=new_value[5], daily_canceled=new_value[6])
		connection_dwh.execute(stmt_insert)
		dim_city_id = -1
print("Completed.")

E é isso. Atualizamos nosso DWH. O script seria muito mais longo se atualizássemos todas as tabelas de dimensões e fatos. A complexidade também seria maior quando uma tabela de fatos estivesse relacionada a mais tabelas de dimensão. Nesse caso, precisaríamos de um for loop para cada tabela de dimensão.

Isso não funciona!


Fiquei muito desapontado quando escrevi esse script e descobri que algo assim não funcionaria:

stmt = select([table_city.columns.city_name])\
	.select_from(table_city\
	.outerjoin(table_dim_city, table_city.columns.city_name == table_dim_city.columns.city_name))\
	.where(table_dim_city.columns.id.is_(None))

Neste exemplo, estou tentando usar tabelas de dois bancos de dados diferentes. Se estabelecermos duas conexões separadas, a primeira conexão não “verá” tabelas de outra conexão. Se nos conectarmos diretamente ao servidor, e não a um banco de dados, não poderemos carregar tabelas.

Até que isso mude (espero que em breve), você precisará usar algum tipo de estrutura (por exemplo, o que fizemos hoje) para se comunicar entre os dois bancos de dados. Isso complica o código, porque você precisa substituir uma única consulta por duas listas e para aninhadas rotações.

Compartilhe seus pensamentos sobre SQLAlchemy e Python


Este foi o último artigo desta série. Mas quem sabe? Talvez tentemos outra abordagem nos próximos artigos, então fique atento. Enquanto isso, compartilhe seus pensamentos sobre SQLAlchemy e Python em combinação com bancos de dados. O que você acha que falta neste artigo? O que você acrescentaria? Conte-nos nos comentários abaixo.

Você pode baixar o script completo que usamos neste artigo aqui.

E um agradecimento especial a Dirk J Bosman (@dirkjobosman), que recomendou esta série de artigos.