
Tutorial: Access Data Lake Storage Gen2 data with Azure Databricks using Spark

This tutorial shows you how to connect your Azure Databricks cluster to data stored in an Azure storage account that has Azure Data Lake Storage Gen2 enabled. This connection enables you to natively run queries and analytics from your cluster on your data.

In this tutorial, you will:

  • Create a Databricks cluster
  • Ingest unstructured data into a storage account
  • Run analytics on your data in Blob storage

If you don't have an Azure subscription, create a free account before you begin.

Prerequisites

Download the flight data

This tutorial uses flight data from the Bureau of Transportation Statistics to demonstrate how to perform an ETL operation. You must download this data to complete the tutorial.

  1. Go to Research and Innovative Technology Administration, Bureau of Transportation Statistics.

  2. Select the Prezipped File check box to select all data fields.

  3. Select the Download button and save the results to your computer.

  4. Unzip the contents of the zipped file and make a note of the file name and the path of the file. You need this information in a later step.

Create an Azure Databricks service

In this section, you create an Azure Databricks service by using the Azure portal.

  1. In the Azure portal, select Create a resource > Analytics > Azure Databricks.

    Databricks on Azure portal

  2. Under Azure Databricks Service, provide the following values to create a Databricks service:

    • Workspace name: Provide a name for your Databricks workspace.
    • Subscription: From the drop-down, select your Azure subscription.
    • Resource group: Specify whether you want to create a new resource group or use an existing one. A resource group is a container that holds related resources for an Azure solution. For more information, see Azure Resource Group overview.
    • Location: Select West US 2. For other available regions, see Azure services available by region.
    • Pricing Tier: Select Standard.

    Create an Azure Databricks workspace

  3. Select Pin to dashboard and then select Create.

  4. The account creation takes a few minutes. To monitor the operation status, view the progress bar at the top.

Create a Spark cluster in Azure Databricks

  1. In the Azure portal, go to the Databricks service that you created, and select Launch Workspace.

  2. You're redirected to the Azure Databricks portal. From the portal, select Cluster.

    Databricks on Azure

  3. In the New cluster page, provide the values to create a cluster.

    Create a Databricks Spark cluster on Azure

  4. Fill in values for the following fields, and accept the default values for the other fields:

    • Enter a name for the cluster.

    • For this article, create a cluster with the 5.1 runtime.

    • Make sure you select the Terminate after __ minutes of inactivity check box. If the cluster isn't being used, provide a duration (in minutes) after which the cluster is terminated.

    • Select Create cluster. After the cluster is running, you can attach notebooks to the cluster and run Spark jobs.

Ingest data

Copy source data into the storage account

Use AzCopy to copy data from your .csv file into your Data Lake Storage Gen2 account.

  1. Open a command prompt window, and enter the following command to log in to your storage account.

    azcopy login
    

    Follow the instructions that appear in the command prompt window to authenticate your user account.

  2. To copy data from the .csv file, enter the following command.

    azcopy cp "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<file-system-name>/folder1/On_Time.csv
    
    • Replace the <csv-folder-path> placeholder value with the path to the .csv file.

    • Replace the <storage-account-name> placeholder value with the name of your storage account.

    • Replace the <file-system-name> placeholder with any name that you want to give your file system.

Create a file system and mount it

In this section, you'll create a file system and a folder in your storage account.

  1. In the Azure portal, go to the Azure Databricks service that you created, and select Launch Workspace.

  2. On the left, select Workspace. From the Workspace drop-down, select Create > Notebook.

    Create a notebook in Databricks

  3. In the Create Notebook dialog box, enter a name for the notebook. Select Python as the language, and then select the Spark cluster that you created earlier.

  4. Select Create.

  5. Copy and paste the following code block into the first cell, but don't run this code yet.

    configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<appId>",
           "fs.azure.account.oauth2.client.secret": "<password>",
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenant>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}
    
    dbutils.fs.mount(
    source = "abfss://<file-system-name>@<storage-account-name>.dfs.core.windows.net/folder1",
    mount_point = "/mnt/flightdata",
    extra_configs = configs)
    
  6. In this code block, replace the appId, password, tenant, and storage-account-name placeholder values with the values that you collected while completing the prerequisites of this tutorial. Replace the file-system-name placeholder value with the name that you gave your ADLS file system in the previous step.

Use these values to replace the mentioned placeholders.

  • The appId and password are from the app that you registered with Active Directory as part of creating a service principal.

  • The tenant-id is from your subscription.

  • The storage-account-name is the name of your Azure Data Lake Storage Gen2 storage account.

  • Replace the file-system-name placeholder with any name that you want to give your file system.

Note

In a production setting, consider storing your password in Azure Databricks. Then, add a lookup key to your code block instead of the password. After you've completed this quickstart, see the Azure Data Lake Storage Gen2 article on the Azure Databricks website for examples of this approach.
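For example, the following sketch shows one way to do that with a Databricks secret scope. It assumes you have already created a secret scope named application-secrets containing a key named service-credential; both names are illustrative placeholders, not values from this tutorial.

# Read the service principal's client secret from a Databricks secret scope
# instead of embedding it in the notebook. The scope and key names below are
# examples only; substitute the names you actually created.
client_secret = dbutils.secrets.get(scope="application-secrets", key="service-credential")

configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<appId>",
           "fs.azure.account.oauth2.client.secret": client_secret,
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenant>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}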

  7. Press the SHIFT + ENTER keys to run the code in the first cell.

Keep this notebook open, as you will add commands to it later.
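Optionally, you can confirm that the mount succeeded with the standard dbutils file system utilities; no names are assumed here beyond the /mnt/flightdata mount point used above.

# List all mounts in the workspace; /mnt/flightdata should appear once the mount cell has run.
display(dbutils.fs.mounts())

# If you need to mount again with different settings, remove the existing mount point first.
# dbutils.fs.unmount("/mnt/flightdata")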

Use a Databricks notebook to convert CSV to Parquet

In the notebook that you previously created, add a new cell, and paste the following code into that cell.

# Use the previously established DBFS mount point to read the data.
# create a data frame to read data.

flightDF = spark.read.format('csv').options(header='true', inferschema='true').load("/mnt/flightdata/*.csv")

# read the airline csv file and write the output to parquet format for easy query.
flightDF.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")

Explore data

In a new cell, paste the following code to get a list of the CSV files uploaded via AzCopy.

import os.path
import IPython
from pyspark.sql import SQLContext
display(dbutils.fs.ls("/mnt/flightdata"))

To create a new file and list files in the parquet/flights folder, run this script:

dbutils.fs.put("/mnt/flightdata/1.txt", "Hello, World!", True)
dbutils.fs.ls("/mnt/flightdata/parquet/flights")

With these code samples, you have explored the hierarchical nature of HDFS using data stored in a storage account with Data Lake Storage Gen2 enabled.

Query the data

Next, you can begin to query the data you uploaded into your storage account. Enter each of the following code blocks into Cmd 1 and press Cmd + Enter to run the Python script.

To create data frames for your data sources, run the following script:

  • Replace the <csv-folder-path> placeholder value with the path to the .csv file.

#Copy this into a Cmd cell in your notebook.
acDF = spark.read.format('csv').options(header='true', inferschema='true').load("/mnt/flightdata/On_Time.csv")
acDF.write.parquet('/mnt/flightdata/parquet/airlinecodes')

#read the existing parquet file for the flights database that was created earlier
flightDF = spark.read.format('parquet').options(header='true', inferschema='true').load("/mnt/flightdata/parquet/flights")

#print the schema of the dataframes
acDF.printSchema()
flightDF.printSchema()

#print the flight database size
print("Number of flights in the database: ", flightDF.count())

#show the first 20 rows (20 is the default)
#to show the first n rows, run: df.show(n)
acDF.show(100, False)
flightDF.show(20, False)

#Display to run visualizations
#preferably run this in a separate cmd cell
display(flightDF)

Enter this script to run some basic analysis queries against the data.

#Run each of these queries, preferably in a separate cmd cell for separate analysis
#create a temporary sql view for querying flight information
FlightTable = spark.read.parquet('/mnt/flightdata/parquet/flights')
FlightTable.createOrReplaceTempView('FlightTable')

#create a temporary sql view for querying airline code information
AirlineCodes = spark.read.parquet('/mnt/flightdata/parquet/airlinecodes')
AirlineCodes.createOrReplaceTempView('AirlineCodes')

#using spark sql, query the parquet file to return total flights in January and February 2016
out1 = spark.sql("SELECT * FROM FlightTable WHERE Month=1 and Year= 2016")
NumJan2016Flights = out1.count()
out2 = spark.sql("SELECT * FROM FlightTable WHERE Month=2 and Year= 2016")
NumFeb2016Flights=out2.count()
print("Jan 2016: ", NumJan2016Flights," Feb 2016: ",NumFeb2016Flights)
Total= NumJan2016Flights+NumFeb2016Flights
print("Total flights combined: ", Total)

# List out all the airports in Texas
out = spark.sql("SELECT distinct(OriginCityName) FROM FlightTable where OriginStateName = 'Texas'") 
print('Airports in Texas: ', out.show(100))

#find all airlines that fly from Texas
out1 = spark.sql("SELECT distinct(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', out1.show(100, False))
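As one more optional example, reusing the FlightTable temporary view and the Reporting_Airline column from the queries above, the following Spark SQL query summarizes flight counts per airline (the perAirline name is only illustrative):

#count the number of flights per airline, busiest airlines first
perAirline = spark.sql("SELECT Reporting_Airline, count(*) AS TotalFlights FROM FlightTable GROUP BY Reporting_Airline ORDER BY TotalFlights DESC")
perAirline.show(20, False)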

Clean up resources

When the resources created in this article are no longer needed, delete the resource group and all related resources. To do so, select the resource group for the storage account and select Delete.

Next steps