Cópia delta de um banco de dados com uma tabela de controle

APLICA-SE A: Azure Data Factory Azure Synapse Analytics

Gorjeta

Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange tudo, desde a movimentação de dados até ciência de dados, análises em tempo real, business intelligence e relatórios. Saiba como iniciar uma nova avaliação gratuitamente!

Este artigo descreve um modelo que está disponível para carregar incrementalmente linhas novas ou atualizadas de uma tabela de banco de dados para o Azure usando uma tabela de controle externo que armazena um valor de marca d'água alto.

Este modelo requer que o esquema do banco de dados de origem contenha uma coluna de carimbo de data/hora ou uma chave de incremento para identificar linhas novas ou atualizadas.

Nota

Se você tiver uma coluna de carimbo de data/hora em seu banco de dados de origem para identificar linhas novas ou atualizadas, mas não quiser criar uma tabela de controle externo para usar para cópia delta, poderá usar a ferramenta Dados de Cópia do Azure Data Factory para obter um pipeline. Essa ferramenta usa um tempo agendado por gatilho como uma variável para ler novas linhas do banco de dados de origem.

Sobre este modelo de solução

Este modelo primeiro recupera o valor da marca d'água antiga e o compara com o valor da marca d'água atual. Depois disso, ele copia apenas as alterações do banco de dados de origem, com base em uma comparação entre os dois valores de marca d'água. Finalmente, ele armazena o novo valor de marca d'água alta em uma tabela de controle externo para carregamento de dados delta na próxima vez.

O modelo contém quatro atividades:

  • A pesquisa recupera o valor antigo de marca d'água alta, que é armazenado em uma tabela de controle externa.
  • Outra atividade Pesquisa recupera o valor atual de marca d'água alta do banco de dados de origem.
  • Copiar copia apenas as alterações do banco de dados de origem para o armazenamento de destino. A consulta que identifica as alterações no banco de dados de origem é semelhante a 'SELECT * FROM Data_Source_Table WHERE TIMESTAMP_Column > "last high-mark" e TIMESTAMP_Column <= "current high-watermark"'.
  • SqlServerStoredProcedure grava o valor atual de marca d'água alta em uma tabela de controle externo para cópia delta na próxima vez.

O modelo define os seguintes parâmetros:

  • Data_Source_Table_Name é a tabela no banco de dados de origem a partir da qual você deseja carregar dados.
  • Data_Source_WaterMarkColumn é o nome da coluna na tabela de origem usada para identificar linhas novas ou atualizadas. O tipo desta coluna é normalmente datetime, INT ou similar.
  • Data_Destination_Container é o caminho raiz do local para onde os dados são copiados no armazenamento de destino.
  • Data_Destination_Directory é o caminho do diretório sob a raiz do local para onde os dados são copiados no armazenamento de destino.
  • Data_Destination_Table_Name é o local para onde os dados são copiados em seu armazenamento de destino (aplicável quando "Azure Synapse Analytics" é selecionado como Destino de Dados).
  • Data_Destination_Folder_Path é o local para onde os dados são copiados em seu armazenamento de destino (aplicável quando "Sistema de arquivos" ou "Azure Data Lake Storage Gen1" é selecionado como Destino de dados).
  • Control_Table_Table_Name é a tabela de controle externo que armazena o valor de marca d'água alta.
  • Control_Table_Column_Name é a coluna na tabela de controle externo que armazena o valor de marca d'água alta.

Como usar este modelo de solução

  1. Explore a tabela de origem que você deseja carregar e defina a coluna de marca d'água alta que pode ser usada para identificar linhas novas ou atualizadas. O tipo desta coluna pode ser datetime, INT ou similar. O valor desta coluna aumenta à medida que novas linhas são adicionadas. Na tabela de origem de exemplo (data_source_table) a seguir, podemos usar a coluna LastModifytime como a coluna de marca d'água alta.

    PersonID	Name            LastModifytime
    1           aaaa            2017-09-01 00:56:00.000
    2           bbbb            2017-09-02 05:23:00.000
    3           cccc            2017-09-03 02:36:00.000
    4           dddd            2017-09-04 03:21:00.000
    5           eeee            2017-09-05 08:06:00.000
    6           fffffff         2017-09-06 02:23:00.000
    7           gggg            2017-09-07 09:01:00.000
    8           hhhh            2017-09-08 09:01:00.000
    9           iiiiiiiii       2017-09-09 09:01:00.000
    
  2. Crie uma tabela de controle no SQL Server ou no Banco de Dados SQL do Azure para armazenar o valor de marca d'água alta para carregamento de dados delta. No exemplo a seguir, o nome da tabela de controle é watermarktable. Nesta tabela, WatermarkValue é a coluna que armazena o valor de marca d'água alta e seu tipo é datetime.

    create table watermarktable
    (
    WatermarkValue datetime,
    );
    INSERT INTO watermarktable
    VALUES ('1/1/2010 12:00:00 AM')
    
  3. Crie um procedimento armazenado na mesma instância do SQL Server ou do Banco de Dados SQL do Azure que você usou para criar a tabela de controle. O procedimento armazenado é usado para gravar o novo valor de marca d'água alta na tabela de controle externo para carregamento de dados delta na próxima vez.

    CREATE PROCEDURE update_watermark @LastModifiedtime datetime
    AS
    
    BEGIN
    
        UPDATE watermarktable
        SET [WatermarkValue] = @LastModifiedtime 
    
    END
    
  4. Vá para a cópia Delta do modelo de banco de dados . Crie uma Nova conexão com o banco de dados de origem do qual você deseja copiar os dados.

    Screenshot showing the creation of a new connection to the source table.

  5. Crie uma Nova conexão com o armazenamento de dados de destino para o qual você deseja copiar os dados.

    Screenshot showing the creation of a new connection to the destination table.

  6. Crie uma Nova conexão com a tabela de controle externo e o procedimento armazenado que você criou nas etapas 2 e 3.

    Screenshot showing the creation of a new connection to the control table data store.

  7. Selecione Utilizar este modelo.

  8. Você vê o pipeline disponível, conforme mostrado no exemplo a seguir:

    Screenshot showing the pipeline.

  9. Selecione Procedimento armazenado. Em Nome do procedimento armazenado, escolha [dbo].[ update_watermark]. Selecione Importar parâmetro e, em seguida, selecione Adicionar conteúdo dinâmico.

    Screenshot showing where to set the stored procedure activity.

  10. Escreva o conteúdo @{activity('LookupCurrentWaterMark').output.firstRow.NewWatermarkValue} e selecione Concluir.

    Screenshot showing where to write the content for the parameters of the stored procedure.

  11. Selecione Depurar, insira os Parâmetros e selecione Concluir.

    Screenshot showing the Debug button.

  12. Resultados semelhantes ao exemplo a seguir são exibidos:

    Sreenshot showing the result of the pipeline run.

  13. Você pode criar novas linhas na tabela de origem. Aqui está um exemplo de linguagem SQL para criar novas linhas:

    INSERT INTO data_source_table
    VALUES (10, 'newdata','9/10/2017 2:23:00 AM')
    
    INSERT INTO data_source_table
    VALUES (11, 'newdata','9/11/2017 9:01:00 AM')
    
  14. Para executar o pipeline novamente, selecione Depurar, insira os Parâmetros e selecione Concluir.

    Você verá que apenas novas linhas foram copiadas para o destino.

  15. (Opcional:) Se você selecionar o Azure Synapse Analytics como o destino dos dados, também deverá fornecer uma conexão com o armazenamento de Blob do Azure para preparação, o que é exigido pelo Azure Synapse Analytics Polybase. O modelo gerará um caminho de contêiner para você. Após a execução do pipeline, verifique se o contêiner foi criado no armazenamento de Blob.

    Screenshot showing where to configure Polybase.