Tutorial: Extrair, transformar e carregar dados através do Azure Databricks

Neste tutorial, executa uma operação ETL (extrato, transformação e dados de carga) utilizando Azure Databricks. Extrai dados do Azure Data Lake Storage Gen2 em Azure Databricks, faz transformações nos dados em Azure Databricks e carrega os dados transformados em Azure Synapse Analytics.

Os passos neste tutorial utilizam o conector Azure Synapse para a Azure Databricks transferir dados para a Azure Databricks. Este conector, por sua vez, utiliza o Azure Blob Storage como armazenamento temporário para os dados que estão a ser transferidos entre um cluster Azure Databricks e Azure Synapse.

A ilustração seguinte mostra o fluxo da aplicação:

Azure Databricks com Data Lake Store e Azure Synapse

Este tutorial abrange as seguintes tarefas:

  • Crie um serviço Azure Databricks.
  • Crie um cluster Spark em Azure Databricks.
  • Crie um sistema de ficheiros na conta Desasissi, De armazenamento de Dados Gen2.
  • Faça upload de dados da amostra para a conta Azure Data Lake Storage Gen2.
  • Criar um diretor de serviço.
  • Extrair dados da conta Azure Data Lake Storage Gen2.
  • Transforme os dados em Azure Databricks.
  • Carregue os dados em Azure Synapse.

Se não tiver uma subscrição do Azure, crie uma conta gratuita antes de começar.

Nota

Este tutorial não pode ser realizado utilizando a assinatura de ensaio livre do Azure. Se tiver uma conta gratuita, vá ao seu perfil e altere a sua subscrição para pay-as-you-go. Para obter mais informações, veja Conta gratuita do Azure. Em seguida, retire o limite de gastose solicite um aumento de quota para vCPUs na sua região. Quando criar o seu espaço de trabalho Azure Databricks, pode selecionar o nível de preços Do Trial (Premium - 14 Dias Free DBUs) para dar ao espaço de trabalho acesso a DBUs premium Azure Databricks premium durante 14 dias.

Pré-requisitos

Complete estas tarefas antes de iniciar este tutorial:

Reúna a informação de que precisa

Certifique-se de que completa os pré-requisitos deste tutorial.

Antes de começar, deve ter estes itens de informação:

✔️ O nome da base de dados, o nome do servidor da base de dados, o nome do utilizador e a palavra-passe do seu Azure Synapse.

✔️ A chave de acesso da sua conta de armazenamento de bolhas.

✔️ O nome da sua conta de armazenamento de Data Lake Gen2.

✔️ A identificação do inquilino da sua assinatura.

✔️ O ID da aplicação que registou no Azure Ative Directory (Azure AD).

✔️ A chave de autenticação da aplicação que registou no Azure AD.

Criar um serviço Azure Databricks

Nesta secção, cria-se um serviço Azure Databricks utilizando o portal Azure.

  1. No menu do portal do Azure, selecione Criar um recurso.

    Criar um recurso no portal Azure

    Em seguida, selecione Analytics > Azure Databricks.

    Criar databricks Azure no portal Azure

  2. No serviço Azure Databricks, forneça os seguintes valores para criar um serviço Databricks:

    Propriedade Descrição
    Nome da área de trabalho Indique um nome para a sua área de trabalho do Databricks.
    Subscrição Na lista pendente, selecione a sua subscrição do Azure.
    Grupo de recursos Especifique se quer criar um novo grupo de recursos ou utilizar um existente. Um grupo de recursos é um contentor que detém recursos relacionados para uma solução do Azure. Para obter mais informações, veja Descrição geral do Grupo de Recursos do Azure.
    Localização Selecione E.U.A. Oeste 2. Para outras regiões disponíveis, veja Serviços do Azure disponíveis por região.
    Escalão de Preço Selecione Standard.
  3. A criação da conta demora alguns minutos. Para monitorizar o estado de funcionamento, veja a barra de progresso no topo.

  4. Selecione Afixar ao dashboard e, em seguida, selecione Criar.

Criar um cluster do Spark no Azure Databricks

  1. No portal Azure, aceda ao serviço Databricks que criou e selecione Launch Workspace.

  2. Está redirecionado para o portal Azure Databricks. No portal, selecione Cluster.

    Databricks em Azure

  3. Na página Novo cluster, indique os valores para criar um cluster.

    Criar conjunto de faíscas de dados no Azure

  4. Preencha os valores para os campos seguintes e aceite os valores predefinidos para os outros campos:

    • Introduza um nome para o cluster.

    • Certifique-se de que seleciona a Caixa de Verificação De Terminação após minutos de caixa _ _ de verificação de inatividade. Se o cluster não estiver a ser utilizado, forneça uma duração (em minutos) para terminar o cluster.

    • Selecione Criar cluster. Depois do cluster estar em funcionamento, pode anexar cadernos ao cluster e executar trabalhos de Faísca.

Criar um sistema de ficheiros na conta Azure Data Lake Storage Gen2

Nesta secção, você cria um caderno no espaço de trabalho Azure Databricks e, em seguida, executar código snippets para configurar a conta de armazenamento

  1. No portal Azure,vá ao serviço Azure Databricks que criou e selecione Launch Workspace.

  2. À esquerda, selecione Workspace. No menu pendente Área de Trabalho, selecione Criar > Bloco de Notas.

    Criar um caderno em Databricks

  3. Na caixa de diálogo Criar Bloco de Notas, introduza um nome para o bloco de notas. Selecione Scala como a linguagem e selecione o cluster do Spark que criou anteriormente.

    Forneça detalhes para um caderno em Databricks

  4. Selecione Criar.

  5. O bloco de código que se segue define as credenciais principais do serviço predefinido para qualquer conta ADLS Gen 2 acedida na sessão Spark. O segundo bloco de código anexa o nome da conta à definição para especificar credenciais para uma conta específica da ADLS Gen 2. Copie e cole qualquer um dos blocos de código na primeira célula do seu caderno Azure Databricks.

    Configuração de sessão

    val appID = "<appID>"
    val secret = "<secret>"
    val tenantID = "<tenant-id>"
    
    spark.conf.set("fs.azure.account.auth.type", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set("fs.azure.account.oauth2.client.id", "<appID>")
    spark.conf.set("fs.azure.account.oauth2.client.secret", "<secret>")
    spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant-id>/oauth2/token")
    spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
    

    Account configuration (Configuração da conta)

    val storageAccountName = "<storage-account-name>"
    val appID = "<app-id>"
    val secret = "<secret>"
    val fileSystemName = "<file-system-name>"
    val tenantID = "<tenant-id>"
    
    spark.conf.set("fs.azure.account.auth.type." + storageAccountName + ".dfs.core.windows.net", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type." + storageAccountName + ".dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set("fs.azure.account.oauth2.client.id." + storageAccountName + ".dfs.core.windows.net", "" + appID + "")
    spark.conf.set("fs.azure.account.oauth2.client.secret." + storageAccountName + ".dfs.core.windows.net", "" + secret + "")
    spark.conf.set("fs.azure.account.oauth2.client.endpoint." + storageAccountName + ".dfs.core.windows.net", "https://login.microsoftonline.com/" + tenantID + "/oauth2/token")
    spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
    dbutils.fs.ls("abfss://" + fileSystemName  + "@" + storageAccountName + ".dfs.core.windows.net/")
    spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "false")
    
  6. Neste bloco de códigos, substitua os valores , e espaços neste bloco de <app-id> <secret> <tenant-id> <storage-account-name> códigos pelos valores que recolheu ao completar os pré-requisitos deste tutorial. Substitua o <file-system-name> valor do espaço reservado por qualquer nome que pretenda dar ao sistema de ficheiros.

    • A <app-id> , e são da app que se <secret> registou com diretório ativo como parte da criação de um principal serviço.

    • É <tenant-id> da sua assinatura.

    • <storage-account-name>É o nome da sua conta de armazenamento Azure Data Lake Storage Gen2.

  7. Prima as teclas SHIFT + ENTER para executar o código neste bloco.

Ingerir dados da amostra na conta Azure Data Lake Storage Gen2

Antes de começar esta secção, tem de satisfazer os seguintes pré-requisitos:

Introduza o seguinte código numa célula do bloco de notas:

%sh wget -P /tmp https://raw.githubusercontent.com/Azure/usql/master/Examples/Samples/Data/json/radiowebsite/small_radio_json.json

Na célula, prima SHIFT + ENTER para executar o código.

Agora, numa nova célula abaixo desta, introduza o seguinte código e substitua os valores que aparecem nos parênteses pelos mesmos valores utilizados anteriormente:

dbutils.fs.cp("file:///tmp/small_radio_json.json", "abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/")

Na célula, prima SHIFT + ENTER para executar o código.

Extrair dados da conta Azure Data Lake Storage Gen2

  1. Pode agora carregar o ficheiro json da amostra como uma estrutura de dados em Azure Databricks. Cole o seguinte código numa nova célula. Substitua os espaços reservados indicados nos suportes pelos seus valores.

    val df = spark.read.json("abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/small_radio_json.json")
    
  2. Prima as teclas SHIFT + ENTER para executar o código neste bloco.

  3. Executar o seguinte código para ver o conteúdo do quadro de dados:

    df.show()
    

    Verá um resultado semelhante ao seguinte fragmento:

    +---------------------+---------+---------+------+-------------+----------+---------+-------+--------------------+------+--------+-------------+---------+--------------------+------+-------------+------+
    |               artist|     auth|firstName|gender|itemInSession|  lastName|   length|  level|            location|method|    page| registration|sessionId|                song|status|           ts|userId|
    +---------------------+---------+---------+------+-------------+----------+---------+-------+--------------------+------+--------+-------------+---------+--------------------+------+-------------+------+
    | El Arrebato         |Logged In| Annalyse|     F|            2|Montgomery|234.57914| free  |  Killeen-Temple, TX|   PUT|NextSong|1384448062332|     1879|Quiero Quererte Q...|   200|1409318650332|   309|
    | Creedence Clearwa...|Logged In|   Dylann|     M|            9|    Thomas|340.87138| paid  |       Anchorage, AK|   PUT|NextSong|1400723739332|       10|        Born To Move|   200|1409318653332|    11|
    | Gorillaz            |Logged In|     Liam|     M|           11|     Watts|246.17751| paid  |New York-Newark-J...|   PUT|NextSong|1406279422332|     2047|                DARE|   200|1409318685332|   201|
    ...
    ...
    

    Os dados foram agora extraídos do Armazenamento do Azure Data Lake Ger2 para o Azure Databricks.

Transformar dados no Azure Databricks

Os dados da amostra bruta small_radio_json.jsem ficheiro captura o público para uma estação de rádio e tem uma variedade de colunas. Nesta secção, transforma-se os dados apenas para obter colunas específicas a partir do conjunto de dados.

  1. Em primeiro lugar, recupere apenas as colunas primeiroName, último Nome, sexo, localização, e nivelar a partir do dataframe que criou.

    val specificColumnsDf = df.select("firstname", "lastname", "gender", "location", "level")
    specificColumnsDf.show()
    

    Recebe a saída como mostrado no seguinte corte:

    +---------+----------+------+--------------------+-----+
    |firstname|  lastname|gender|            location|level|
    +---------+----------+------+--------------------+-----+
    | Annalyse|Montgomery|     F|  Killeen-Temple, TX| free|
    |   Dylann|    Thomas|     M|       Anchorage, AK| paid|
    |     Liam|     Watts|     M|New York-Newark-J...| paid|
    |     Tess|  Townsend|     F|Nashville-Davidso...| free|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |Gabriella|   Shelton|     F|San Jose-Sunnyval...| free|
    |   Elijah|  Williams|     M|Detroit-Warren-De...| paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    |     Tess|  Townsend|     F|Nashville-Davidso...| free|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |     Liam|     Watts|     M|New York-Newark-J...| paid|
    |     Liam|     Watts|     M|New York-Newark-J...| paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK| paid|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |   Elijah|  Williams|     M|Detroit-Warren-De...| paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK| paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    +---------+----------+------+--------------------+-----+
    
  2. Pode transformar mais os dados para mudar o nome da coluna level para subscription_type.

    val renamedColumnsDF = specificColumnsDf.withColumnRenamed("level", "subscription_type")
    renamedColumnsDF.show()
    

    Recebe a saída como mostrado no seguinte corte.

    +---------+----------+------+--------------------+-----------------+
    |firstname|  lastname|gender|            location|subscription_type|
    +---------+----------+------+--------------------+-----------------+
    | Annalyse|Montgomery|     F|  Killeen-Temple, TX|             free|
    |   Dylann|    Thomas|     M|       Anchorage, AK|             paid|
    |     Liam|     Watts|     M|New York-Newark-J...|             paid|
    |     Tess|  Townsend|     F|Nashville-Davidso...|             free|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |Gabriella|   Shelton|     F|San Jose-Sunnyval...|             free|
    |   Elijah|  Williams|     M|Detroit-Warren-De...|             paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    |     Tess|  Townsend|     F|Nashville-Davidso...|             free|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |     Liam|     Watts|     M|New York-Newark-J...|             paid|
    |     Liam|     Watts|     M|New York-Newark-J...|             paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK|             paid|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |   Elijah|  Williams|     M|Detroit-Warren-De...|             paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK|             paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    +---------+----------+------+--------------------+-----------------+
    

Carregar dados em Azure Synapse

Nesta secção, você envia os dados transformados em Azure Synapse. Utilize o conector Azure Synapse para Azure Databricks para carregar diretamente um dataframe como tabela num pool de Faíscas synapse.

Como mencionado anteriormente, o conector Azure Synapse utiliza o armazenamento Azure Blob como armazenamento temporário para carregar dados entre Azure Databricks e Azure Synapse. Assim, comece por indicar a configuração para ligar à conta de armazenamento. Já deve ter criado a conta como parte dos pré-requisitos para este artigo.

  1. Indique a configuração para aceder à conta de Armazenamento do Azure a partir do Azure Databricks.

    val blobStorage = "<blob-storage-account-name>.blob.core.windows.net"
    val blobContainer = "<blob-container-name>"
    val blobAccessKey =  "<access-key>"
    
  2. Especifique uma pasta temporária para utilizar enquanto move dados entre Azure Databricks e Azure Synapse.

    val tempDir = "wasbs://" + blobContainer + "@" + blobStorage +"/tempDirs"
    
  3. Execute o seguinte fragmento para armazenar as chaves de acesso do armazenamento de Blobs do Azure na configuração. Esta ação garante que não é preciso manter a chave de acesso no caderno em texto simples.

    val acntInfo = "fs.azure.account.key."+ blobStorage
    sc.hadoopConfiguration.set(acntInfo, blobAccessKey)
    
  4. Forneça os valores para ligar à instância Azure Synapse. Deve ter criado um serviço Azure Synapse Analytics como pré-requisito. Utilize o nome de servidor totalmente qualificado para dwServer. Por exemplo, <servername>.database.windows.net.

    //Azure Synapse related settings
    val dwDatabase = "<database-name>"
    val dwServer = "<database-server-name>"
    val dwUser = "<user-name>"
    val dwPass = "<password>"
    val dwJdbcPort =  "1433"
    val dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
    val sqlDwUrl = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass + ";$dwJdbcExtraOptions"
    val sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass
    
  5. Executar o seguinte corte para carregar o dataframe transformado, renomeadoColumnsDF, como uma tabela em Azure Synapse. Este fragmento cria uma tabela chamada SampleTable na base de dados SQL.

    spark.conf.set(
        "spark.sql.parquet.writeLegacyFormat",
        "true")
    
    renamedColumnsDF.write.format("com.databricks.spark.sqldw").option("url", sqlDwUrlSmall).option("dbtable", "SampleTable")       .option( "forward_spark_azure_storage_credentials","True").option("tempdir", tempDir).mode("overwrite").save()
    

    Nota

    Esta amostra utiliza a forward_spark_azure_storage_credentials bandeira, o que faz com que a Azure Synapse aceda aos dados do armazenamento de bolhas utilizando uma Chave de Acesso. Este é o único método suportado de autenticação.

    Se o seu Azure Blob Storage estiver restrito a selecionar redes virtuais, o Azure Synapse requer identidade de serviço gerida em vez de Teclas de acesso. Isto causará o erro "Este pedido não está autorizado a executar esta operação."

  6. Ligue-se à base de dados SQL e verifique se vê uma base de dados chamada SampleTable.

    Verifique a tabela de amostras

  7. Execute uma consulta select para verificar os conteúdos da tabela. A tabela deve ter os mesmos dados que o renomeado Quadro de Dados DaColumnsDF.

    Verifique o conteúdo da tabela de amostras

Limpar os recursos

Depois de terminar o tutorial, pode terminar o agrupamento. A partir do espaço de trabalho Azure Databricks, selecione Clusters à esquerda. Para que o cluster termine, em Ações, aponte para a elipse (...) e selecione o ícone Terminate.

Pare um cluster Databricks

Se não encerrar manualmente o cluster, para automaticamente, desde que tenha selecionado a caixa de verificação Terminate after _ _ minutes of inactivity quando criou o cluster. Neste caso, o cluster para automaticamente se estiver inativo durante o tempo especificado.

Passos seguintes

Neste tutorial, ficou a saber como:

  • Criar um serviço Azure Databricks
  • Criar um cluster do Spark no Azure Databricks
  • Criar um bloco de notas no Azure Databricks
  • Extrair dados de uma conta Gen2 de armazenamento de data lake
  • Transformar dados no Azure Databricks
  • Carregar dados em Azure Synapse

Avance para o próximo tutorial para saber como transmitir dados em tempo real em fluxo para o Azure Databricks mediante a utilização dos Hubs de Eventos do Azure.