Sqlserver
 sql >> Base de Dados >  >> RDS >> Sqlserver

Implementando carga incremental usando o Change Data Capture no SQL Server


Este artigo será interessante para aqueles que muitas vezes têm que lidar com integração de dados.

Introdução


Suponha que exista um banco de dados onde os usuários sempre modificam os dados (atualizam ou removem). Talvez esse banco de dados seja usado por um grande aplicativo que não permite modificar a estrutura da tabela. A tarefa é carregar dados desse banco de dados para outro banco de dados em um servidor diferente de tempos em tempos. A maneira mais simples de resolver o problema é carregar os novos dados de um banco de dados de origem para um banco de dados de destino com a limpeza preliminar do banco de dados de destino. Você pode usar esse método desde que o tempo para carregar os dados seja aceitável e não exceda os prazos predefinidos. E se demorar vários dias para carregar os dados? Além disso, os canais de comunicação instáveis ​​levam à situação em que o carregamento de dados é interrompido e reiniciado. Se você enfrentar esses obstáculos, sugiro considerar um dos algoritmos de 'recarregamento de dados'. Isso significa que apenas as modificações de dados ocorreram desde que o carregamento mais recente foi carregado.


CDC


No SQL Server 2008, a Microsoft introduziu um mecanismo de rastreamento de dados chamado Change Data Capture (CDC). De um modo geral, o objetivo deste mecanismo é que habilitar o CDC para qualquer tabela de banco de dados criará uma tabela de sistema no mesmo banco de dados com um nome semelhante ao da tabela original (o esquema será o seguinte:'cdc' como prefixo mais o nome do esquema antigo mais ”_” e o final “_CT”. Por exemplo, a tabela original é dbo.Example, então a tabela do sistema será chamada cdc.dbo_Example_CT). Ele armazenará todos os dados que foram modificados.

Na verdade, para aprofundar o CDC, considere o exemplo. Mas primeiro, verifique se o SQL Agent que usa o CDC funciona na instância de teste do SQL Server.

Além disso, vamos considerar um script que cria um banco de dados e uma tabela de teste, preenche essa tabela com dados e habilita o CDC para essa tabela.

Para entender e simplificar a tarefa, usaremos uma instância do SQL Server sem distribuir os bancos de dados de origem e destino para servidores diferentes.
use mastergo-- crie um banco de dados de origem se não existir (selecione * de sys.databases onde nome ='db_src_cdc') crie banco de dados db_src_cdcgouse db_src_cdcgo-- habilite CDC se estiver desabilitado se não existir (selecione * de sys.databases onde nome =db_name() e is_cdc_enabled=1) exec sys.sp_cdc_enable_dbgo-- cria uma função para tabelas com CDCif não existir (selecione * de sys.sysusers onde name ='CDC_Reader' e issqlrole=1) crie uma função CDC_Readergo-- crie uma tableif object_id('dbo.Example','U') é nulo criar tabela dbo.Example ( ID int identity constraint PK_Example chave primária, Title varchar(200) not null )go-- preencher os valores tableinsert dbo.Example (Title)( 'One'),('Two'),('Three'),('Four'),('Five');go-- habilite o CDC para a tabela se não existir (selecione * de sys.tables onde is_tracked_by_cdc =1 e name ='Example') exec sys.sp_cdc_enable_table @source_schema ='dbo', @source_name ='Example', @role_name ='CDC_Reader'go-- preencha a tabela com alguns dados. Vamos alterar ou excluir algo atualizar dbo.Exampleset Title =reverse(Title)where ID in (2,3,4);delete from dbo.Example where ID in (1,2);set identity_insert dbo.Example on;insert dbo. Exemplo (ID, Título) valores(1,'Um'),(6,'Seis');set identity_insert dbo.Example off;go

Agora, vamos ver o que temos depois de executar este script nas tabelas dbo.Example e cdc.dbo_Example_CT (deve-se notar que o CDC é assíncrono. Os dados são preenchidos nas tabelas onde o controle de alterações é armazenado após um determinado período de tempo ).
selecione * de dbo.Example;
ID Title---- ---------------------- 1 One 3 eerhT 4 ruoF 5 Five 6 Six
selecione row_number() sobre ( partição por ordem de ID por __$start_lsn desc, __$seqval desc ) como __$rn, *de ​​cdc.dbo_Example_CT;
__$rn __$start_lsn __$end_lsn __$seqval __$operation __$update_mask ID Title------ ---------- - ----------- ---------------------- ------------ ---- ------------ --- ----------- 1 0x0000003A000000580005 NULL 0x0000003A000000580003 2 0x03 1 One 2 0x0000003A000000560006 NULL 0x0000003A000000560002 1 0x03 1 One 1 0x0000003A000000560006 NULL 0x0000003A000000560005 1 0x03 2 owT 2 0x0000003A000000540005 NULL 0x0000003A000000540002 3 0x02 2 Two 3 0x0000003A000000540005 NULL 0x0000003A000000540002 4 0x02 2 owT 1 0x0000003A000000540005 NULL 0x0000003A000000540003 3 0x02 3 Three 2 0x0000003A000000540005 NULL 0x0000003A000000540003 4 0x02 3 eerhT 1 0x0000003A000000540005 NULL 0x0000003A000000540004 3 0x02 4 Four 2 0x0000003A000000540005 NULL 0x0000003A000000540004 4 0x02 4 ruoF 1 0x0000003A000000580005 NULL 0x0000003A000000580004 2 0x0000003A000000580004 
Considere em detalhes a estrutura da tabela na qual o controle de alterações é armazenado. Os campos __ $start_lsn e __ $seqval são LSN (número de sequência de log no banco de dados) e o número da transação dentro da transação, respectivamente. Há uma propriedade importante nestes campos, a saber, podemos ter certeza de que o registro com maior LSN será realizado posteriormente. Devido a esta propriedade, podemos facilmente obter o estado mais recente de cada registro na consulta, filtrando nossa seleção pela condição – onde __ $ rn =1.

O campo __$operation contém o código da transação:
  • 1 – o registro é excluído
  • 2 – o registro é inserido
  • 3, 4 – o registro é atualizado. Os dados antigos antes da atualização são 3, os novos dados são 4.

Além dos campos de serviço com prefixo «__$», os campos da tabela original são completamente duplicados. Essas informações são suficientes para prosseguirmos com a carga incremental.

Configurando um banco de dados para carregamento de dados


Crie uma tabela em nosso banco de dados de destino de teste, na qual os dados serão carregados, bem como uma tabela adicional para armazenar dados sobre o log de carregamento.
use mastergo-- crie um banco de dados de destino se não existir (selecione * em sys.databases onde nome ='db_dst_cdc') crie banco de dados db_dst_cdcgouse db_dst_cdcgo-- crie uma tabela se object_id('dbo.Example','U') for null create table dbo.Example ( ID int constraint PK_Example chave primária, Title varchar(200) not null )go-- cria uma tabela para armazenar a carga logif object_id('dbo.log_cdc','U') is null create table dbo .log_cdc ( table_name nvarchar(512) not null, dt datetime not null default getdate(), lsn binary(10) not null default(0x0), restrição pk_log_cdc chave primária (table_name,dt desc) )go

Gostaria de chamar a atenção para os campos da tabela LOG_CDC:
  • TABLE_NAME armazena informações sobre qual tabela foi carregada (é possível carregar várias tabelas no futuro, de diferentes bancos de dados ou até mesmo de diferentes servidores; o formato da tabela é ‘SERVER_NAME.DB_NAME.SCHEMA_NAME.TABLE_NAME’
  • DT é um campo de data e hora de carregamento, que é opcional para o carregamento incremental. No entanto, será útil para auditar o carregamento.
  • LSN – após o carregamento de uma tabela, precisamos armazenar informações sobre o local onde iniciar o próximo carregamento, se necessário. Assim, após cada carregamento, adicionamos o último (máximo) __ $ start_lsn nesta coluna.

Algoritmo para carregamento de dados


Conforme descrito acima, usando a consulta, podemos obter o estado mais recente da tabela com a ajuda das funções da janela. Se soubermos o LSN do carregamento mais recente, na próxima vez que carregarmos, poderemos filtrar da origem todos os dados, cujas alterações são maiores que o LSN armazenado, se houver pelo menos um carregamento anterior completo:
com incr_Example as( selecione row_number() sobre ( partição por ordem de ID por __$start_lsn desc, __$seqval desc ) como __$rn, * de db_src_cdc.cdc.dbo_Example_CT onde __$operation <> 3 e __$ start_lsn> @lsn) selecione * de incr_Example

Então, podemos obter todos os registros para o carregamento completo, se o LSN de carregamento não estiver armazenado:
com incr_Example as( selecione row_number() sobre ( partição por ordem de ID por __$start_lsn desc, __$seqval desc ) como __$rn, * de db_src_cdc.cdc.dbo_Example_CT onde __$operation <> 3 e __$ start_lsn> @lsn), full_Example as(selecione * de db_src_cdc.dbo.Example onde @lsn é nulo)select ID, Title, __$operationfrom incr_Examplewhere __$rn =1union allselect ID, Title, 2 as __$operationfrom full_Example 
Assim, dependendo do valor @LSN, esta consulta mostrará todas as últimas alterações (ignorando as intermediárias) com o status Removido ou não, ou todos os dados da tabela original, adicionando o status 2 (novo registro) – este campo é usado apenas para unificar duas seleções. Com essa consulta, podemos implementar facilmente o carregamento completo ou o recarregamento usando o comando MERGE (começando com a versão SQL 2008).

Para evitar gargalos que podem criar processos alternativos e carregar dados correspondentes de tabelas diferentes (futuramente carregaremos várias tabelas e, possivelmente, pode haver relações relacionais entre elas), sugiro usar um snapshot de banco de dados no banco de dados de origem ( outro recurso do SQL 2008).

O texto completo da carga é o seguinte:

[expandir título=”Código”]
/* Algoritmo de carregamento de dados*/-- crie um instantâneo do banco de dados se existir (selecione * de sys.databases onde nome ='db_src_cdc_ss' ) solte o banco de dados db_src_cdc_ss;declare @query nvarchar(max);selecione @query =N' criar banco de dados db_src_cdc_ss em ( name =N'''+name+ ''', filename =N'''+[filename]+'.ss'' ) como instantâneo de db_src_cdc'from db_src_cdc.sys.sysfiles onde groupid =1; exec ( @query );-- lê o LSN do loaddeclare anterior @lsn binary(10) =(selecione max(lsn) de db_dst_cdc.dbo.log_cdc where table_name ='localhost.db_src_cdc.dbo.Example');-- clear uma tabela antes do carregamento completo se @lsn for nulo truncar a tabela db_dst_cdc.dbo.Example;-- carregar o processo com incr_Example as( selecione row_number() sobre ( partição por ordem de ID por __$start_lsn desc, __$seqval desc ) como __$rn , * de db_src_cdc_ss.cdc.dbo_Example_CT onde __$operation <> 3 e __$start_lsn> @lsn), full_Example as( selecione * de db_src_cdc_ss.dbo.Example onde @lsn é nulo), cte_Example as( selecione ID, Title, __$operation from incr_Example onde __$rn =1 union todos selecionam ID, Title, 2 como __$operation from full_Example)merge db_dst_cdc.dbo.Example como trg usando cte_Example como src em trg.ID=src.IDquando correspondido e __$operation =1 então delete quando correspondido e __$operation <> 1 então atualize set trg.Title =src.Title quando não for correspondido pelo destino e __$operation <> 1 então insira (ID, Title) valores (src.ID, src .Title);-- marca o fim do processo de carregamento e o último LSNinsert db_dst_cdc.dbo.log_cdc (table_name, lsn)values ​​('localhost.db_src_cdc.dbo.Example', isnull((select max(__$start_lsn) from db_src_cdc_ss.cdc.dbo_Example_CT),0))-- exclui o instantâneo do banco de dados se existir (selecione * de sys.databases onde name ='db_src_cdc_ss' ) descarte o banco de dados db_src_cdc_ss

[/expandir]