Exercício – Criar e implementar uma dimensão de alteração lenta do Tipo 1 com fluxos de dados de mapeamento

Concluído

Neste exercício, você cria um fluxo de dados para uma SCD do Tipo 1 usando o pool de SQL dedicado do Azure Synapse como origem e destino. Esse fluxo de dados pode, então, ser adicionado a um pipeline do Synapse e executado como parte do processo de ETL (extração, transformação e carregamento).

Configurar a origem e a tabela de dimensões

Para este exercício, carregue uma tabela de dimensões no Azure Synapse de dados de origem que podem ser de vários tipos de sistema, como o SQL do Azure, o Armazenamento do Azure etc. Mantenha a simplicidade criando os dados de origem em seu banco de dados do Azure Synapse.

  1. No Synapse Studio, navegue até o hub Dados.

    Data hub.

  2. Selecione a guia Workspace(1), expanda Bancos de dados e clique com o botão direito do mouse em SQLPool01 (2). Selecione Novo script SQL (3) e escolha Script vazio (4).

    The data hub is displayed with the context menus to create a new SQL script.

  3. Cole o seguinte script na janela de script vazio e selecione Executar ou pressione F5 para executar a consulta:

    CREATE TABLE [dbo].[CustomerSource] (
        [CustomerID] [int] NOT NULL,
        [Title] [nvarchar](8),
        [FirstName] [nvarchar](50),
        [MiddleName] [nvarchar](50),
        [LastName] [nvarchar](50),
        [Suffix] [nvarchar](10),
        [CompanyName] [nvarchar](128),
        [SalesPerson] [nvarchar](256),
        [EmailAddress] [nvarchar](50),
        [Phone] [nvarchar](25)
    ) WITH ( HEAP )
    
    COPY INTO [dbo].[CustomerSource]
    FROM 'https://solliancepublicdata.blob.core.windows.net/dataengineering/dp-203/awdata/CustomerSource.csv'
    WITH (
        FILE_TYPE='CSV',
        FIELDTERMINATOR='|',
        FIELDQUOTE='',
        ROWTERMINATOR='0x0a',
        ENCODING = 'UTF16'
    )
    
    CREATE TABLE dbo.[DimCustomer](
        [CustomerID] [int] NOT NULL,
        [Title] [nvarchar](8) NULL,
        [FirstName] [nvarchar](50) NOT NULL,
        [MiddleName] [nvarchar](50) NULL,
        [LastName] [nvarchar](50) NOT NULL,
        [Suffix] [nvarchar](10) NULL,
        [CompanyName] [nvarchar](128) NULL,
        [SalesPerson] [nvarchar](256) NULL,
        [EmailAddress] [nvarchar](50) NULL,
        [Phone] [nvarchar](25) NULL,
        [InsertedDate] [datetime] NOT NULL,
        [ModifiedDate] [datetime] NOT NULL,
        [HashKey] [char](64)
    )
    WITH
    (
        DISTRIBUTION = REPLICATE,
        CLUSTERED COLUMNSTORE INDEX
    )
    

    The script and Run button are both highlighted.

Criar um fluxo de dados de mapeamento

Fluxos de dados de mapeamento são atividades de pipeline que fornecem uma forma visual de especificar como transformar dados por meio de uma experiência sem código. Em seguida, você criará um fluxo de dados de mapeamento para criar um SCD tipo 1.

  1. Navegue até o hub Desenvolver.

    Develop hub.

  2. Selecione + e escolha +.

    The plus button and data flow menu item are highlighted.

  3. No painel de propriedades do novo fluxo de dados, insira UpdateCustomerDimension no campo UpdateCustomerDimension(1) e selecione o botão Propriedades(2) para ocultar o painel de propriedades.

    The data flow properties pane is displayed.

  4. Selecione Adicionar origem na tela.

    The Add Source button is highlighted on the data flow canvas.

  5. Em Source settings, configure as seguintes propriedades:

    • Nome do fluxo de saída: insira SourceDB
    • Tipo de origem: selecione Dataset
    • Opções: marque Allow schema drift e deixe as outras opções desmarcadas
    • Amostragem: selecione Disable
    • Conjunto de dados: selecione + Novo para criar um conjunto de dados

    The New button is highlighted next to Dataset.

  6. Na caixa de diálogo do novo conjunto de dados da integração, selecione Azure Synapse Analytics e selecione Continuar.

    Azure SQL Database and the Continue button are highlighted.

  7. Nas propriedades do conjunto de dados, configure o seguinte:

    • Nome: insira CustomerSource
    • Serviço vinculado: selecione o serviço vinculado do workspace do Synapse
    • Nome da tabela: selecione o botão Atualizar ao lado do menu suspenso

    The form is configured as described and the refresh button is highlighted.

  8. No campo Valor, insira o nome do pool de SQL e selecione OK.

    The SQLPool01 parameter is highlighted.

  9. Selecione dbo.CustomerSource em dbo.CustomerSource, selecione From connection/store em From connection/store e clique em OK para criar o conjunto de dados.

    The form is completed as described.

  10. Selecione Abrir ao lado do conjunto de dados CustomerSource que você adicionou.

    The open button is highlighted next to the new dataset.

  11. Insira o nome de seu pool de SQL no campo Valor ao lado de DBName.

  12. No editor de fluxo de dados, selecione a caixa Adicionar origem abaixo da atividade SourceDB. Configure essa origem como a tabela DimCustomer seguindo as mesmas etapas usadas para CustomerSource.

    • Nome do fluxo de saída: insira DimCustomer
    • Tipo de origem: selecione Dataset
    • Opções: marque Allow schema drift e deixe as outras opções desmarcadas
    • Amostragem: selecione Disable
    • Conjunto de dados: selecione + Novo para criar um conjunto de dados. Use o serviço vinculado do Azure Synapse e escolha a tabela DimCustomer. Defina o DBName como o nome do pool de SQL.

    The Add Source, Output stream name, and Dataset name are highlighted in the Source settings.

Adicionar transformações ao fluxo de dados

  1. Selecione + à direita da origem SourceDB na tela e selecione +.

    The plus button and derived column menu item are highlighted.

  2. Em Derived column's settings, configure as seguintes propriedades:

    • Nome do fluxo de saída: insira CreateCustomerHash
    • Fluxo de entrada: selecione SourceDB
    • Colunas: insira o seguinte:
    Coluna Expression Descrição
    Digite HashKey sha2(256, iifNull(Title,'') +FirstName +iifNull(MiddleName,'') +LastName +iifNull(Suffix,'') +iifNull(CompanyName,'') +iifNull(SalesPerson,'') +iifNull(EmailAddress,'') +iifNull(Phone,'')) Cria um hash SHA256 dos valores da tabela. É usada para detectar alterações de linha comparando o hash dos registros de entrada com o valor de hash dos registros de destino, correspondendo ao valor CustomerID. A função iifNull substitui valores nulos por cadeias de caracteres vazias. Caso contrário, os valores de hash tendem a ser duplicados quando entradas nulas estão presentes.

    The Derived column's settings form is configured as described.

  3. Selecione + à direita da coluna derivada CreateCustomerHash na tela e selecione +.

    The plus button and exists menu item are both highlighted.

  4. Em Exists settings, configure as seguintes propriedades:

    • Nome do fluxo de saída: insira Exists
    • Fluxo à esquerda: selecione CreateCustomerHash
    • Fluxo à direita: selecione SynapseDimCustomer
    • Tipo de existe: selecione Doesn't exist
    • Condições de existe: defina o seguinte para Esquerda e Direita:
    Esquerda: coluna de CreateCustomerHash Direita: coluna de SynapseDimCustomer
    HashKey HashKey

    The Exists settings form is configured as described.

  5. Selecione + à direita de Exists na tela e selecione +.

    The plus button and lookup menu item are both highlighted.

  6. Em Lookup settings, configure as seguintes propriedades:

    • Nome do fluxo de saída: insira LookupCustomerID
    • Fluxo primário: selecione Exists
    • Fluxo de pesquisa: selecione SynapseDimCustomer
    • Corresponder várias linhas: desmarcado
    • Corresponder em: selecione Any row
    • Condições de pesquisa: defina o seguinte para Esquerda e Direita:
    Esquerda: coluna de Existe Direita: coluna de SynapseDimCustomer
    CustomerID CustomerID

    The Lookup settings form is configured as described.

  7. Selecione + à direita de LookupCustomerID na tela e selecione +.

    The plus button and derived column menu item are both highlighted.

  8. Em Derived column's settings, configure as seguintes propriedades:

    • Nome do fluxo de saída: insira SetDates
    • Fluxo de entrada: selecione LookupCustomerID
    • Colunas: insira o seguinte:
    Coluna Expression Descrição
    Selecione InsertedDate iif(isNull(InsertedDate), currentTimestamp(), {InsertedDate}) Se o valor de InsertedDate for nulo, insira o carimbo de data/hora atual. Caso contrário, use o valor InsertedDate.
    Selecione ModifiedDate currentTimestamp() Sempre atualize o valor de ModifiedDate com o carimbo de data/hora atual.

    Another Derived column's settings form is configured as described.

    Observação

    Para inserir a segunda coluna, selecione + Adicionar acima da lista Colunas e escolha Adicionar coluna.

  9. Selecione + à direita da etapa da coluna derivada SetDates na tela e selecione +.

    The plus button and alter row menu item are both highlighted.

  10. Em Alter row settings, configure as seguintes propriedades:

    • Nome do fluxo de saída: insira AllowUpserts
    • Fluxo de entrada: selecione SetDates
    • Condições de alterar linha: insira o seguinte:
    Condição Expression Descrição
    Selecione Upsert if true() Defina a condição como true() na condição Upsert if para permitir upserts. Isso garante que todos os dados que passam pelas etapas do fluxo de dados de mapeamento sejam inseridos ou atualizados no coletor.

    The alter row settings form is configured as described.

  11. Selecione + à direita da etapa de alterar linha AllowUpserts na tela e selecione +.

    The plus button and sink menu item are both highlighted.

  12. Em Sink, configure as seguintes propriedades:

    • Nome do fluxo de saída: insira Sink
    • Fluxo de entrada: selecione AllowUpserts
    • Tipo de coletor: selecione Dataset
    • Conjunto de dados: selecione DimCustomer
    • Opções: marque Allow schema drift e desmarque Validate schema

    The sink properties form is configured as described.

  13. Selecione a guia Configurações e configure as seguintes propriedades:

    • Método de atualização: marque Allow upsert e desmarque todas as outras opções
    • Colunas de chave: selecione List of columns e escolha CustomerID na lista
    • Ação da tabela: selecione None
    • Habilitar preparo: desmarcado

    The sink settings are configured as described.

  14. Selecione a guia Mapeamento e desmarque Mapeamento automático. Configure o mapeamento das colunas de entrada conforme descrito abaixo:

    Colunas de entrada Colunas de saída
    SourceDB@CustomerID CustomerID
    SourceDB@Title Title
    SourceDB@FirstName FirstName
    SourceDB@MiddleName MiddleName
    SourceDB@LastName LastName
    SourceDB@Suffix Suffix
    SourceDB@CompanyName CompanyName
    SourceDB@SalesPerson SalesPerson
    SourceDB@EmailAddress EmailAddress
    SourceDB@Phone Phone
    InsertedDate InsertedDate
    ModifiedDate ModifiedDate
    CreateCustomerHash@HashKey HashKey

    Mapping settings are configured as described.

  15. O fluxo de mapeamento completo deverá ter a aparência a seguir. Selecione Publicar tudo para salvar as alterações.

    The completed data flow is displayed and Publish all is highlighted.

  16. Selecione Publicar.

    The publish button is highlighted.

Como testar o fluxo de dados

Você concluiu um fluxo de dados de SCD do Tipo 1. Se optar por testá-lo, você poderá adicionar esse fluxo de dados a um pipeline de integração do Synapse. Em seguida, você pode executar o pipeline uma vez para fazer a carga inicial dos dados de origem do cliente para o destino DimCustomer.

Cada execução adicional do pipeline vai comparar os dados na tabela de origem com o que já está na tabela de dimensões (usando o HashKey) e apenas atualizar os registros que foram alterados. Para testar isso, você pode atualizar um registro na tabela de origem, executar o pipeline novamente e verificar as atualizações do registro na tabela de dimensões.

Vamos usar a cliente Janet Gates como exemplo. A carga inicial mostra que LastName é Gates e CustomerId é 4.

The script is displayed with the initial customer record.

Este é um exemplo de instrução que atualizaria o sobrenome do cliente na tabela de origem.

UPDATE [dbo].[CustomerSource]
SET LastName = 'Lopez'
WHERE [CustomerId] = 4

Depois de atualizar o registro e executar o pipeline novamente, DimCustomer mostraria esses dados atualizados.

The script is displayed with the updated customer record.

O registro do cliente atualizou com êxito o valor de LastName para corresponder ao registro de origem e atualizou o ModifiedDate, sem acompanhar o valor antigo de LastName. Esse é o comportamento esperado para uma SCD do Tipo 1. Se o histórico fosse necessário para o campo LastName, você modificaria a tabela e o fluxo de dados para um dos outros tipos de SCD descritos.