チュートリアル:Spark を使用して Azure Databricks で Data Lake Storage Gen2 のデータにアクセスするTutorial: Access Data Lake Storage Gen2 data with Azure Databricks using Spark

このチュートリアルでは、Azure Data Lake Storage Gen2 対応の Azure ストレージ アカウント内の格納データに Azure Databricks クラスターを接続する方法を説明します。This tutorial shows you how to connect your Azure Databricks cluster to data stored in an Azure storage account that has Azure Data Lake Storage Gen2 enabled. この接続を使用することで、必要なデータに関するクエリや分析をクラスターからネイティブに実行することができます。This connection enables you to natively run queries and analytics from your cluster on your data.

このチュートリアルでは、次のことについて説明します。In this tutorial, you will:

  • Databricks クラスターを作成するCreate a Databricks cluster
  • 非構造化データをストレージ アカウントに取り込むIngest unstructured data into a storage account
  • Blob Storage 内のデータに対して分析を実行するRun analytics on your data in Blob storage

Azure サブスクリプションがない場合は、開始する前に無料アカウントを作成してください。If you don’t have an Azure subscription, create a free account before you begin.

前提条件Prerequisites

フライト データのダウンロードDownload the flight data

このチュートリアルでは、運輸統計局からのフライト データを使用して ETL 操作を実行する方法を示します。This tutorial uses flight data from the Bureau of Transportation Statistics to demonstrate how to perform an ETL operation. チュートリアルを完了するには、このデータをダウンロードする必要があります。You must download this data to complete the tutorial.

  1. 米国運輸省研究・革新技術庁/運輸統計局のページに移動します。Go to Research and Innovative Technology Administration, Bureau of Transportation Statistics.

  2. [Prezipped file](事前に圧縮されたファイル) チェックボックスをオンにして、すべてのデータ フィールドを選択します。Select the Prezipped File check box to select all data fields.

  3. [ダウンロード] ボタンを選択して、ご使用のコンピューターに結果を保存します。Select the Download button and save the results to your computer.

  4. ZIP ファイルの内容を解凍し、ファイル名とファイル パスをメモします。Unzip the contents of the zipped file and make a note of the file name and the path of the file. この情報は後の手順で必要になります。You need this information in a later step.

Azure Databricks サービスを作成するCreate an Azure Databricks service

このセクションでは、Azure portal を使用して Azure Databricks サービスを作成します。In this section, you create an Azure Databricks service by using the Azure portal.

  1. Azure portal で、 [リソースの作成] > [分析] > [Azure Databricks] の順に選択します。In the Azure portal, select Create a resource > Analytics > Azure Databricks.

    Azure Portal の DatabricksDatabricks on Azure portal

  2. [Azure Databricks サービス] で次の値を指定して、Databricks サービスを作成します。Under Azure Databricks Service, provide the following values to create a Databricks service:

    プロパティProperty 説明Description
    ワークスペース名Workspace name Databricks ワークスペースの名前を指定します。Provide a name for your Databricks workspace.
    サブスクリプションSubscription ドロップダウンから Azure サブスクリプションを選択します。From the drop-down, select your Azure subscription.
    リソース グループResource group 新しいリソース グループを作成するか、既存のリソース グループを使用するかを指定します。Specify whether you want to create a new resource group or use an existing one. リソース グループは、Azure ソリューションの関連するリソースを保持するコンテナーです。A resource group is a container that holds related resources for an Azure solution. 詳しくは、Azure リソース グループの概要に関するページをご覧ください。For more information, see Azure Resource Group overview.
    場所Location [米国西部 2] を選択します。Select West US 2. 使用可能な他のリージョンについては、「リージョン別の利用可能な製品」をご覧ください。For other available regions, see Azure services available by region.
    価格レベルPricing Tier [Standard] を選択します。Select Standard.

    Azure Databricks ワークスペースを作成するCreate an Azure Databricks workspace

  3. アカウントの作成には数分かかります。The account creation takes a few minutes. 操作の状態を監視するには、上部の進行状況バーを確認します。To monitor the operation status, view the progress bar at the top.

  4. [ダッシュボードにピン留めする] チェック ボックスをオンにして、 [作成] を選択します。Select Pin to dashboard and then select Create.

Azure Databricks で Spark クラスターを作成するCreate a Spark cluster in Azure Databricks

  1. Azure portal で、作成した Databricks サービスに移動し、 [Launch Workspace](ワークスペースの起動) を選択します。In the Azure portal, go to the Databricks service that you created, and select Launch Workspace.

  2. Azure Databricks ポータルにリダイレクトされます。You're redirected to the Azure Databricks portal. ポータルで [クラスター] を選択します。From the portal, select Cluster.

    Azure の DatabricksDatabricks on Azure

  3. [New cluster](新しいクラスター) ページで、クラスターを作成するための値を指定します。In the New cluster page, provide the values to create a cluster.

    Azure で Databricks Spark クラスターを作成するCreate Databricks Spark cluster on Azure

  4. 次のフィールドに値を入力し、他のフィールドの既定値はそのまま使用します。Fill in values for the following fields, and accept the default values for the other fields:

    • クラスターの名前を入力します。Enter a name for the cluster.

    • この記事では、5.1 ランタイムを使用してクラスターを作成します。For this article, create a cluster with the 5.1 runtime.

    • [Terminate after __ minutes of inactivity](アクティビティが __ 分ない場合は終了する) チェック ボックスを必ずオンにします。Make sure you select the Terminate after __ minutes of inactivity check box. クラスターが使われていない場合は、クラスターを終了するまでの時間 (分単位) を指定します。If the cluster isn't being used, provide a duration (in minutes) to terminate the cluster.

    • [クラスターの作成] を選択します。Select Create cluster. クラスターが実行されたら、ノートブックをクラスターにアタッチして、Spark ジョブを実行できます。After the cluster is running, you can attach notebooks to the cluster and run Spark jobs.

データの取り込みIngest data

ソース データをストレージ アカウントにコピーするCopy source data into the storage account

AzCopy を使用して .csv ファイルから Data Lake Storage Gen2 アカウントにデータをコピーします。Use AzCopy to copy data from your .csv file into your Data Lake Storage Gen2 account.

  1. コマンド プロンプト ウィンドウを開き、次のコマンドを入力してストレージ アカウントにログインします。Open a command prompt window, and enter the following command to log into your storage account.

    azcopy login
    

    コマンド プロンプト ウィンドウに表示される指示に従って、ユーザー アカウントを認証します。Follow the instructions that appear in the command prompt window to authenticate your user account.

  2. .csv アカウントからデータをコピーするには、次のコマンドを入力します。To copy data from the .csv account, enter the following command.

    azcopy cp "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<file-system-name>/folder1/On_Time.csv
    
    • プレースホルダー <csv-folder-path> の値は、 .csv ファイルへのパスに置き換えます。Replace the <csv-folder-path> placeholder value with the path to the .csv file.

    • <storage-account-name> プレースホルダーの値は、実際のストレージ アカウントの名前に置き換えます。Replace the <storage-account-name> placeholder value with the name of your storage account.

    • <file-system-name> プレースホルダーを、ファイル システムに付ける任意の名前に置き換えます。Replace the <file-system-name> placeholder with any name that you want to give your file system.

ファイル システムを作成してマウントするCreate a file system and mount it

このセクションでは、ストレージ アカウントにファイル システムとフォルダーを作成します。In this section, you'll create a file system and a folder in your storage account.

  1. Azure portal で、作成した Azure Databricks サービスに移動し、 [Launch Workspace](ワークスペースの起動) を選択します。In the Azure portal, go to the Azure Databricks service that you created, and select Launch Workspace.

  2. 左側の [ワークスペース] を選択します。On the left, select Workspace. [ワークスペース] ドロップダウンで、 [作成] > [ノートブック] の順に選択します。From the Workspace drop-down, select Create > Notebook.

    Databricks でノートブックを作成するCreate a notebook in Databricks

  3. [ノートブックの作成] ダイアログ ボックスでノートブックの名前を入力します。In the Create Notebook dialog box, enter a name for the notebook. 言語として [Python] を選んで、前に作成した Spark クラスターを選びます。Select Python as the language, and then select the Spark cluster that you created earlier.

  4. 作成 を選択します。Select Create.

  5. 次のコード ブロックをコピーして最初のセルに貼り付けます。ただし、このコードはまだ実行しないでください。Copy and paste the following code block into the first cell, but don't run this code yet.

    configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<appId>",
           "fs.azure.account.oauth2.client.secret": "<password>",
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenant>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}
    
    dbutils.fs.mount(
    source = "abfss://<file-system-name>@<storage-account-name>.dfs.core.windows.net/folder1",
    mount_point = "/mnt/flightdata",
    extra_configs = configs)
    
  6. このコード ブロックでは、appIdpasswordtenant、および storage-account-name のプレースホルダー値を、このチュートリアルの前提条件の実行中に収集した値で置き換えます。In this code block, replace the appId, password, tenant, and storage-account-name placeholder values in this code block with the values that you collected while completing the prerequisites of this tutorial. file-system-name プレースホルダーの値は、前の手順で ADLS ファイル システムに付けた名前に置き換えます。Replace the file-system-name placeholder value with the name that you gave to the ADLS File System on the previous step.

これらの値で、説明したプレース ホルダーを置き換えます。Use these values to replace the mentioned placeholders.

  • appId および password は、サービス プリンシパルの作成の一環として Active Directory に登録したアプリのものです。The appId, and password are from the app that you registered with active directory as part of creating a service principal.

  • tenant-id は、自分のサブスクリプションのものです。The tenant-id is from your subscription.

  • storage-account-name は、Azure Data Lake Storage Gen2 ストレージ アカウントの名前です。The storage-account-name is the name of your Azure Data Lake Storage Gen2 storage account.

  • file-system-name プレースホルダーを、ファイル システムに付ける任意の名前に置き換えます。Replace the file-system-name placeholder with any name that you want to give your file system.

注意

運用設定では、パスワードを Azure Databricks に格納することを検討してください。In a production setting, consider storing your password in Azure Databricks. 次に、パスワードではなくルック アップ キーをコード ブロックに追加します。Then, add a look up key to your code block instead of the password. このクイック スタートの完了後、Azure Databricks Web サイトの記事「Azure Data Lake Storage Gen2」で、このアプローチの例を参照してください。After you've completed this quickstart, see the Azure Data Lake Storage Gen2 article on the Azure Databricks Website to see examples of this approach.

  1. Shift + Enter キーを押して、このブロック内のコードを実行します。Press the SHIFT + ENTER keys to run the code in this block.

このノートブックは開いたままにしておいてください。後でコマンドを追加します。Keep this notebook open as you will add commands to it later.

Databricks Notebook を使用して CSV を Parquet に変換するUse Databricks Notebook to convert CSV to Parquet

前もって作成しておいたノートブックに新しいセルを追加し、そこに次のコードを貼り付けます。In the notebook that you previously created, add a new cell, and paste the following code into that cell.

# Use the previously established DBFS mount point to read the data.
# create a data frame to read data.

flightDF = spark.read.format('csv').options(header='true', inferschema='true').load("/mnt/flightdata/*.csv")

# read the airline csv file and write the output to parquet format for easy query.
flightDF.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")

データを調査するExplore data

AzCopy 経由でアップロードされた CSV ファイルの一覧を取得するために、次のコードを新しいセルに貼り付けます。In a new cell, paste the following code to get a list of CSV files uploaded via AzCopy.

import os.path
import IPython
from pyspark.sql import SQLContext
display(dbutils.fs.ls("/mnt/flightdata"))

新しいファイルを作成して parquet/flights フォルダー内のファイルの一覧を作成するには、次のスクリプトを実行します。To create a new file and list files in the parquet/flights folder, run this script:

dbutils.fs.put("/mnt/flightdata/1.txt", "Hello, World!", True)
dbutils.fs.ls("/mnt/flightdata/parquet/flights")

上記のコード サンプルでは、Data Lake Storage Gen2 対応のストレージ アカウントに格納されたデータを使って HDFS の階層的な性質を調査しました。With these code samples, you have explored the hierarchical nature of HDFS using data stored in a storage account with Data Lake Storage Gen2 enabled.

データを照会するQuery the data

これで、ストレージ アカウントにアップロードしたデータの照会を開始できます。Next, you can begin to query the data you uploaded into your storage account. 次のコード ブロックをそれぞれ [Cmd 1] に入力し、Cmd を押しながら Enter キーを押して Python スクリプトを実行します。Enter each of the following code blocks into Cmd 1 and press Cmd + Enter to run the Python script.

データ ソースのデータフレームを作成するには、次のスクリプトを実行します。To create data frames for your data sources, run the following script:

  • プレースホルダー <csv-folder-path> の値は、 .csv ファイルへのパスに置き換えます。Replace the <csv-folder-path> placeholder value with the path to the .csv file.
#Copy this into a Cmd cell in your notebook.
acDF = spark.read.format('csv').options(header='true', inferschema='true').load("/mnt/flightdata/On_Time.csv")
acDF.write.parquet('/mnt/flightdata/parquet/airlinecodes')

#read the existing parquet file for the flights database that was created earlier
flightDF = spark.read.format('parquet').options(header='true', inferschema='true').load("/mnt/flightdata/parquet/flights")

#print the schema of the dataframes
acDF.printSchema()
flightDF.printSchema()

#print the flight database size
print("Number of flights in the database: ", flightDF.count())

#show the first 20 rows (20 is the default)
#to show the first n rows, run: df.show(n)
acDF.show(100, False)
flightDF.show(20, False)

#Display to run visualizations
#preferably run this in a separate cmd cell
display(flightDF)

いくつかの基本的な分析クエリをデータに対して実行するために、次のスクリプトを入力します。Enter this script to run some basic analysis queries against the data.

#Run each of these queries, preferably in a separate cmd cell for separate analysis
#create a temporary sql view for querying flight information
FlightTable = spark.read.parquet('/mnt/flightdata/parquet/flights')
FlightTable.createOrReplaceTempView('FlightTable')

#create a temporary sql view for querying airline code information
AirlineCodes = spark.read.parquet('/mnt/flightdata/parquet/airlinecodes')
AirlineCodes.createOrReplaceTempView('AirlineCodes')

#using spark sql, query the parquet file to return total flights in January and February 2016
out1 = spark.sql("SELECT * FROM FlightTable WHERE Month=1 and Year= 2016")
NumJan2016Flights = out1.count()
out2 = spark.sql("SELECT * FROM FlightTable WHERE Month=2 and Year= 2016")
NumFeb2016Flights=out2.count()
print("Jan 2016: ", NumJan2016Flights," Feb 2016: ",NumFeb2016Flights)
Total= NumJan2016Flights+NumFeb2016Flights
print("Total flights combined: ", Total)

# List out all the airports in Texas
out = spark.sql("SELECT distinct(OriginCityName) FROM FlightTable where OriginStateName = 'Texas'") 
print('Airports in Texas: ', out.show(100))

#find all airlines that fly from Texas
out1 = spark.sql("SELECT distinct(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', out1.show(100, False))

リソースのクリーンアップClean up resources

リソース グループおよび関連するすべてのリソースは、不要になったら削除します。When they're no longer needed, delete the resource group and all related resources. これを行うには、ストレージ アカウントのリソース グループを選択し、 [削除] を選択してください。To do so, select the resource group for the storage account and select Delete.

次の手順Next steps