Tutorial: Azure Data Lake Storage Gen2, Azure Databricks e Spark

Este tutorial mostra como conectar seu cluster do Azure Databricks aos dados armazenados em uma conta de Armazenamento do Azure que tenha o Azure Data Lake Storage Gen2 habilitado. Essa conexão permite que você execute nativamente consultas e análises nos dados por meio do cluster.

Neste tutorial, você irá:

  • Ingerir dados não estruturados em uma conta de armazenamento
  • Executar a análise em seus dados no Armazenamento de Blobs

Se você não tiver uma assinatura do Azure, crie uma conta gratuita antes de começar.

Pré-requisitos

Crie um workspace do Azure Databricks, grupo e notebook

  1. Criar um workspace do Azure Databricks. Consulte Criar um workspace do Azure Databricks.

  2. Criar um cluster. Consulte Criar um cluster.

  3. Crie um notebook. Consulte Criar um notebook. Escolha Python como o idioma padrão do notebook.

Mantenha seu notebook aberto. Use-o nas seções a seguir.

Baixar os dados de voos

Este tutorial usa os dados de desempenho de pontualidade de voo de janeiro de 2016 da Agência de Estatísticas de Transporte, para demonstrar como executar uma operação de ETL. Você precisa baixar esses dados para concluir o tutorial.

  1. Baixe o arquivo On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip. Ele tem os dados de voo.

  2. Descompacte o conteúdo do arquivo compactado e anote o nome e o caminho do arquivo. Você precisará dessas informações em uma etapa posterior.

Caso queira saber mais sobre as informações capturadas nos dados de desempenho de relatórios de pontualidade, você poderá visualizar as descrições de campo no site da Agência de Estatísticas de Transporte.

Ingerir dados

Nesta seção, você carregará os dados de voo .csv na conta do Azure Data Lake Storage Gen2 e, em seguida, montará a conta de armazenamento no cluster do Databricks. Por fim, usará o Databricks para ler os dados de voo .csv e gravá-los de volta ao armazenamento no formato Apache parquet.

Carregue os dados de voo na conta de armazenamento

Use o AzCopy para copiar o arquivo .csv para a conta do Data Lake Storage Gen2. Use o comando azcopy make para criar um contêiner na conta de armazenamento. Em seguida, use o comando azcopy copy para copiar os dados .csv, que você acabou de baixar para um diretório nesse contêiner.

Nas etapas a seguir, você precisará inserir nomes para o contêiner que deseja criar e o diretório e o blob para o qual deseja carregar os dados de voo no contêiner. Use os nomes sugeridos em cada etapa ou especifique suas próprias convenções de nomenclatura para contêineres, diretórios e blobs.

  1. Abra uma janela de prompt de comando e insira o comando a seguir para entrar no Azure Active Directory a fim de acessar sua conta de armazenamento.

    azcopy login
    

    Siga as instruções exibidas na janela do prompt de comando para autenticar sua conta de usuário.

  2. Para criar um contêiner na conta de armazenamento a fim de armazenar os dados de voo, insira o seguinte comando:

    azcopy make  "https://<storage-account-name>.dfs.core.windows.net/<container-name>" 
    
    • Substitua o valor de espaço reservado <storage-account-name> pelo nome da sua conta de armazenamento.

    • Substitua o espaço reservado <container-name> por um nome para o contêiner a ser criado para armazenar os dados .csv, por exemplo, flight-data-container.

  3. Para carregar (copiar) os dados .csv em sua conta de armazenamento, insira o comando a seguir.

    azcopy copy "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/<directory-name>/On_Time.csv
    
    • Substitua o valor de espaço reservado <csv-folder-path> pelo nome caminho no arquivo .csv.

    • Substitua o valor de espaço reservado <storage-account-name> pelo nome da sua conta de armazenamento.

    • Substitua o espaço reservado <container-name> pelo nome do contêiner em sua conta de armazenamento.

    • Substitua o espaço reservado <directory-name> pelo nome de um diretório para armazenar os dados no contêiner, por exemplo, jan2016.

Montar sua conta de armazenamento no cluster do Databricks

Nesta seção, você montará o armazenamento de objetos na nuvem do Azure Data Lake Storage Gen2 no DBFS (Sistema de Arquivos do Databricks). Use o princípio de serviço do Azure AD criado anteriormente para autenticação com a conta de armazenamento. Para obter mais informações, confira Montagem do armazenamento de objetos de nuvem no Azure Databricks.

  1. Anexe seu notebook ao cluster.

    1. No notebook criado anteriormente, selecione o botão Conectar no canto superior direito da barra de ferramentas do notebook. Este botão abre o seletor de computação. (Se você já conectou o notebook a um cluster, o nome desse cluster será mostrado no texto do botão em vez de Conecte-se).

    2. No menu suspenso do cluster, selecione o cluster criado anteriormente.

    3. Observe que o texto no seletor de cluster é alterado para iniciar. Aguarde a conclusão da inicialização do cluster e o nome do cluster apareça no botão antes de continuar.

  2. Copie e cole o bloco de código a seguir na primeira célula, mas não execute esse código ainda.

    configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<appId>",
           "fs.azure.account.oauth2.client.secret": "<clientSecret>",
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenantId>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}
    
    dbutils.fs.mount(
    source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<directory-name>",
    mount_point = "/mnt/flightdata",
    extra_configs = configs)
    
  3. Neste bloco de código:

    • Em configs, substitua os valores de espaço reservado <appId>, <clientSecret> e <tenantId>, pela ID do aplicativo, pelo segredo do cliente e pela ID do locatário copiado quando criou a entidade de serviço nos pré-requisitos.

    • No URI source, substitua os valores de espaço reservado <storage-account-name>, <container-name> e <directory-name>, pelo nome da conta de armazenamento do Azure Data Lake Storage Gen2 e pelo nome do contêiner e diretório especificados quando carregou os dados de voo na conta de armazenamento.

      Observação

      O identificador de esquema no URI, abfss, informa ao Databricks para usar o driver do Azure Blob File System com o TLS (protocolo TLS). Para saber mais sobre o URI, consulte Usar o URI do Azure Data Lake Storage Gen2.

  4. Verifique se o cluster terminou de iniciar antes de continuar.

  5. Pressione as teclas SHIFT+ENTER para executar o código nesse bloco.

O contêiner e o diretório em que carregou os dados de voo na conta de armazenamento, agora está acessível no notebook por meio do ponto de montagem, /mnt/flightdata.

Use o Databricks Notebook para converter CSV em Parquet

Agora que os dados de voo .csv estão acessíveis por meio de um ponto de montagem do DBFS, use um DataFrame do Apache Spark para carregá-los no workspace e gravá-los novamente no formato Apache parquet no armazenamento de objetos do Azure Data Lake Storage Gen2.

  • Um DataFrame do Spark é uma estrutura de dados rotulada bidimensional com colunas de tipos potencialmente diferentes. Use um DataFrame para ler e gravar dados facilmente em vários formatos com suporte. Com um DataFrame, é possível carregar dados do armazenamento de objetos na nuvem e executar análises e transformações neles dentro do cluster de computação, sem afetar os dados subjacentes no armazenamento de objetos na nuvem. Para saber mais, confira Trabalhar com o DataFrames do PySpark no Azure Databricks.

  • O Apache parquet é um formato de arquivo em coluna com otimizações para acelerar as consultas. É um formato de arquivo mais eficiente do que CSV ou JSON. Para saber mais, confira Arquivos Parquet.

No notebook, adicione uma nova célula e cole o código a seguir nele.

# Use the previously established DBFS mount point to read the data.
# Create a DataFrame to read the csv data.
# The header option specifies that the first row of data should be used as the DataFrame column names
# The inferschema option specifies that the column data types should be inferred from the data in the file
flight_df = spark.read.format('csv').options(
    header='true', inferschema='true').load("/mnt/flightdata/*.csv")

# Read the airline csv file and write the output to parquet format for easy query.
flight_df.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")

Pressione as teclas SHIFT+ENTER para executar o código nesse bloco.

Antes de prosseguir para a próxima seção, verifique se todos os dados parquet foram gravados e que "Concluído" aparece na saída.

Explorar dados

Nesta seção, você usará o utilitário do sistema de arquivos do Databricks para explorar o armazenamento de objetos do Azure Data Lake Storage Gen2 usando o ponto de montagem do DBFS criado na seção anterior.

Em uma nova célula, cole o código a seguir para obter uma lista dos arquivos no ponto de montagem. O primeiro comando gera uma lista de arquivos e diretórios. O segundo comando exibe a saída no formato tabular para facilitar a leitura.

dbutils.fs.ls("/mnt/flightdata")
display(dbutils.fs.ls("/mnt/flightdata"))

Pressione as teclas SHIFT+ENTER para executar o código nesse bloco.

Observe que o diretório parquet aparece na listagem. Você salvou os dados de voo .csv no formato parquet no diretório parquet/flights na seção anterior. Para listar arquivos no diretório parquet/voos, cole o seguinte código em uma nova célula e execute-o:

display(dbutils.fs.ls("/mnt/flightdata/parquet/flights"))

Para criar um novo arquivo e listá-lo, cole o seguinte código em uma nova célula e execute-o:

dbutils.fs.put("/mnt/flightdata/mydirectory/mysubdirectory/1.txt", "Hello, World!", True)
display(dbutils.fs.ls("/mnt/flightdata/mydirectory/mysubdirectory"))

Como arquivo 1.txt não é necessário nesse tutorial, é possível colar o código a seguir em uma célula e executá-lo para excluir o mydirectory de forma recursiva. O parâmetro True indica uma exclusão recursiva.

dbutils.fs.rm("/mnt/flightdata/mydirectory", True)

Como conveniência, use o comando de ajuda para aprender detalhes sobre outros comandos.

dbutils.fs.help("rm")

Com esses exemplos de código, você explorou a natureza hierárquica do HDFS usando dados armazenados em uma conta de armazenamento com o Azure Data Lake Storage Gen2 habilitado.

Consultar os dados

Em seguida, você pode começar a consultar os dados carregados na sua conta de armazenamento. Insira cada um dos blocos de código a seguir em uma nova célula e pressione SHIFT + ENTER para executar o script de Python.

Os DataFrames contém um amplo conjunto de funções (selecionar colunas, filtrar, unir, agregar) para resolução de problemas comuns de análise de dados com eficiência.

Para carregar um DataFrame a partir dos dados de voo do parquet salvos anteriormente e explorar algumas das funcionalidades com suporte, insira esse script em uma nova célula e execute-o.

# Read the existing parquet file for the flights database that was created earlier
flight_df = spark.read.parquet("/mnt/flightdata/parquet/flights")

# Print the schema of the dataframe
flight_df.printSchema()

# Print the flight database size
print("Number of flights in the database: ", flight_df.count())

# Show the first 25 rows (20 is the default)
# To show the first n rows, run: df.show(n)
# The second parameter indicates that column lengths shouldn't be truncated (default is 20 characters)
flight_df.show(25, False)

# You can also use the DataFrame to run simple queries. Results are returned in a DataFrame.
# Show the first 25 rows of the results of a query that returns selected colums for all flights originating from airports in Texas
flight_df.select("FlightDate", "Reporting_Airline", "Flight_Number_Reporting_Airline", "OriginCityName", "DepTime", "DestCityName", "ArrTime", "ArrDelay").filter("OriginState = 'TX'").show(258, False)

# Use display to run visualizations
# Preferably run this in a separate cmd cell
display(flight_df)

Insira esse script em uma nova célula para executar algumas consultas básicas de análise nos dados. É possível optar por executar o script inteiro (SHIFT + ENTER), realçar cada consulta e executá-la separadamente com CTRL + SHIFT + ENTER, ou inserir cada consulta em uma célula separada e executá-la lá.

# create a temporary sql view for querying flight information
flight_data = spark.read.parquet('/mnt/flightdata/parquet/flights')
flight_data.createOrReplaceTempView('FlightTable')

# Print the total number of flights in Jan 2016 (the number of rows in the flight data).
print("Number of flights in Jan 2016: ", flight_data.count())

# Using spark sql, query the parquet file to return the total flights of each airline
num_flights_by_airline=spark.sql("SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline ORDER BY NumFlights DESC")
num_flights_by_airline.show()

# List out all the airports in Texas
airports_in_texas = spark.sql(
    "SELECT DISTINCT(OriginCityName) FROM FlightTable WHERE OriginStateName = 'Texas'")
print('Airports in Texas: ', airports_in_texas.count())
airports_in_texas.show(100, False)

# Find all airlines that fly from Texas
airlines_flying_from_texas = spark.sql(
    "SELECT DISTINCT(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', airlines_flying_from_texas.count())
airlines_flying_from_texas.show(100, False)

# List airlines by average arrival delay (negative values indicate early flights)
avg_arrival_delay=spark.sql(
    "SELECT Reporting_Airline, count(*) AS NumFlights, avg(DepDelay) AS AverageDepDelay, avg(ArrDelay) AS AverageArrDelay FROM FlightTable GROUP BY Reporting_Airline ORDER BY AverageArrDelay DESC")
print("Airlines by average arrival delay")
avg_arrival_delay.show()

# List airlines by the highest percentage of delayed flights. A delayed flight is one with a  departure or arrival delay that is greater than 15 minutes
spark.sql("DROP VIEW IF EXISTS totalFlights")
spark.sql("DROP VIEW IF EXISTS delayedFlights")
spark.sql(
    "CREATE TEMPORARY VIEW totalFlights AS SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline")
spark.sql(
    "CREATE TEMPORARY VIEW delayedFlights AS SELECT Reporting_Airline, count(*) AS NumDelayedFlights FROM FlightTable WHERE DepDelay>15 or ArrDelay>15 GROUP BY Reporting_Airline")
percent_delayed_flights=spark.sql(
    "SELECT totalFlights.Reporting_Airline, totalFlights.NumFlights, delayedFlights.NumDelayedFlights, delayedFlights.NumDelayedFlights/totalFlights.NumFlights*100 AS PercentFlightsDelayed FROM totalFlights INNER JOIN delayedFlights ON totalFlights.Reporting_Airline = delayedFlights.Reporting_Airline ORDER BY PercentFlightsDelayed DESC")
print("Airlines by percentage of flights delayed")
percent_delayed_flights.show()

Resumo

Neste tutorial, você:

  • Foram criados recursos do Azure, incluindo uma conta de armazenamento do Azure Data Lake Storage Gen2 e uma entidade de serviço do Azure AD e permissões atribuídas para acessar a conta de armazenamento.

  • Criou um workspace, notebook e cluster de computação do Azure Databricks.

  • Usou o AzCopy para carregar os dados de voo .csv não estruturado, para a conta de armazenamento do Azure Data Lake Storage Gen2.

  • Usou funções do utilitário do Sistema de Arquivos do Databricks para montar sua conta de armazenamento do Azure Data Lake Storage Gen2 e explorar o sistema de arquivos hierárquico.

  • Usou os DataFrames do Apache Spark para transformar os dados de voo .csv, no formato do Apache parquet e armazená-los de volta à conta de armazenamento do Azure Data Lake Storage Gen2.

  • Usou o DataFrames para os dados de voo e executar uma consulta simples.

  • Usou o Apache Spark do SQL para consultar os dados de voo para o número total de voos de cada companhia aérea em janeiro de 2016, os aeroportos do Texas, as companhias aéreas que voam do Texas, o atraso médio à chegada em minutos para cada companhia aérea nacionalmente e a porcentagem de voos de cada companhia aérea que atrasaram partidas ou chegadas.

Limpar os recursos

Caso queira preservar o notebook e voltar para ele mais tarde, é uma boa ideia desligar (encerrar) o cluster para evitar encargos. Para encerrar o cluster, selecione-o no seletor de computação localizado no canto superior direito da barra de ferramentas do notebook, selecione Encerrar no menu e confirme sua seleção. (Por padrão, o cluster será encerrado automaticamente após 120 minutos de inatividade.)

Caso queira excluir recursos individuais do workspace, como notebooks e clusters, poderá fazer isso na barra lateral esquerda do workspace. Para obter instruções detalhadas, consulte Excluir um cluster ou Excluir um notebook.

Quando não forem mais necessários, exclua o grupo de recursos e todos os recursos relacionados. Para fazer isso no portal do Azure, selecione o grupo de recursos da conta de armazenamento e o workspace e selecione Excluir.

Próximas etapas