チュートリアル:Azure HDInsight を使用してデータの抽出、変換、読み込みを行う

このチュートリアルでは、ETL (データの抽出、変換、読み込み) 操作を実行します。 生の CSV データ ファイルを取得して Azure HDInsight クラスターにインポートした後、Apache Hive を使用して変換し、Apache Sqoop を使用して Azure SQL Database に読み込みます。

このチュートリアルでは、以下の内容を学習します。

  • データを抽出し、HDInsight クラスターにアップロードする。
  • Apache Hive を使用してデータを変換する。
  • Sqoop を使用して Azure SQL Database にデータを読み込む。

Azure サブスクリプションをお持ちでない場合は、開始する前に無料アカウントを作成してください。

前提条件

データをダウンロードおよび抽出してアップロードする

このセクションでは、サンプル フライト データをダウンロードします。 その後、HDInsight クラスターにデータをアップロードしてから、そのデータを自分の Data Lake Storage Gen2 アカウントにコピーします。

  1. On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip ファイルをダウンロードします。 このファイルには、フライト データが含まれています。

  2. コマンド プロンプトを開き、次の Secure Copy (Scp) コマンドを使用して HDInsight クラスターのヘッド ノードに .ZIP ファイルをアップロードします。

    scp On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip <ssh-user-name>@<cluster-name>-ssh.azurehdinsight.net:
    
    • <ssh-user-name> プレースホルダーを HDInsight クラスターの SSH ユーザー名に置き換えます。
    • <cluster-name> プレースホルダーを HDInsight クラスターの名前に置き換えます。

    パスワードを使用して SSH ユーザー名を認証する場合は、パスワードを入力するよう求められます。

    公開キーを使用している場合は、-i パラメーターを使用して、対応する秘密キーへのパスを指定することが必要な場合があります。 たとえば、「 scp -i ~/.ssh/id_rsa <file_name>.zip <user-name>@<cluster-name>-ssh.azurehdinsight.net: 」のように入力します。

  3. アップロードが完了したら、SSH を使用してクラスターに接続します。 コマンド プロンプトで次のコマンドを入力します。

    ssh <ssh-user-name>@<cluster-name>-ssh.azurehdinsight.net
    
  4. 次のコマンドを使用して .zip ファイルを解凍します。

    unzip <file-name>.zip
    

    このコマンドにより .csv ファイルが抽出されます。

  5. 次のコマンドを使用して、Data Lake Storage Gen2 コンテナーを作成します。

    hadoop fs -D "fs.azure.createRemoteFileSystemDuringInitialization=true" -ls abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/
    

    <container-name> プレースホルダーを、ご自身のコンテナーに付ける名前に置き換えます。

    <storage-account-name> プレースホルダーは、実際のストレージ アカウントの名前に置き換えます。

  6. 次のコマンドを使用して、ディレクトリを作成します。

    hdfs dfs -mkdir -p abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/tutorials/flightdelays/data
    
  7. 次のコマンドを使用して .csv ファイルをディレクトリにコピーします。

    hdfs dfs -put "On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2016_1.csv" abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/tutorials/flightdelays/data/
    

    ファイル名にスペースや特殊文字が含まれる場合は、ファイル名を引用符で囲んでください。

データの変換

このセクションでは、Beeline を使用して Apache Hive ジョブを実行します。

Apache Hive ジョブの一環として、.csv ファイルから delays という名前の Apache Hive テーブルにデータをインポートします。

  1. HDInsight クラスター用に既に開いている SSH プロンプトから、次のコマンドを使用して flightdelays.hql という名前の新しいファイルを作成して編集します。

    nano flightdelays.hql
    
  2. 次のテキストに変更を加えます。<container-name><storage-account-name> のプレースホルダーを実際のコンテナーとストレージ アカウントの名前に置き換えてください。 マウスの右ボタンを選択したまま Shift キーを押して、そのテキストを nano コンソールにコピーして貼り付けます。

      DROP TABLE delays_raw;
      -- Creates an external table over the csv file
      CREATE EXTERNAL TABLE delays_raw (
         YEAR string,
         FL_DATE string,
         UNIQUE_CARRIER string,
         CARRIER string,
         FL_NUM string,
         ORIGIN_AIRPORT_ID string,
         ORIGIN string,
         ORIGIN_CITY_NAME string,
         ORIGIN_CITY_NAME_TEMP string,
         ORIGIN_STATE_ABR string,
         DEST_AIRPORT_ID string,
         DEST string,
         DEST_CITY_NAME string,
         DEST_CITY_NAME_TEMP string,
         DEST_STATE_ABR string,
         DEP_DELAY_NEW float,
         ARR_DELAY_NEW float,
         CARRIER_DELAY float,
         WEATHER_DELAY float,
         NAS_DELAY float,
         SECURITY_DELAY float,
         LATE_AIRCRAFT_DELAY float)
      -- The following lines describe the format and location of the file
      ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
      LINES TERMINATED BY '\n'
      STORED AS TEXTFILE
      LOCATION 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/tutorials/flightdelays/data';
    
      -- Drop the delays table if it exists
      DROP TABLE delays;
      -- Create the delays table and populate it with data
      -- pulled in from the CSV file (via the external table defined previously)
      CREATE TABLE delays
      LOCATION 'abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/tutorials/flightdelays/processed'
      AS
      SELECT YEAR AS year,
         FL_DATE AS FlightDate, 
         substring(UNIQUE_CARRIER, 2, length(UNIQUE_CARRIER) -1) AS IATA_CODE_Reporting_Airline,
         substring(CARRIER, 2, length(CARRIER) -1) AS Reporting_Airline, 
         substring(FL_NUM, 2, length(FL_NUM) -1) AS Flight_Number_Reporting_Airline,
         ORIGIN_AIRPORT_ID AS OriginAirportID,
         substring(ORIGIN, 2, length(ORIGIN) -1) AS OriginAirportSeqID,
         substring(ORIGIN_CITY_NAME, 2) AS OriginCityName,
         substring(ORIGIN_STATE_ABR, 2, length(ORIGIN_STATE_ABR) -1)  AS OriginState,
         DEST_AIRPORT_ID AS DestAirportID,
         substring(DEST, 2, length(DEST) -1) AS DestAirportSeqID,
         substring(DEST_CITY_NAME,2) AS DestCityName,
         substring(DEST_STATE_ABR, 2, length(DEST_STATE_ABR) -1) AS DestState,
         DEP_DELAY_NEW AS DepDelay,
         ARR_DELAY_NEW AS ArrDelay,
         CARRIER_DELAY AS CarrierDelay,
         WEATHER_DELAY AS WeatherDelay,
         NAS_DELAY AS NASDelay,
         SECURITY_DELAY AS SecurityDelay,
         LATE_AIRCRAFT_DELAY AS LateAircraftDelay
      FROM delays_raw;
    
  3. Ctrl キーを押しながら X キーを押し、確認を求められたら「Y」と入力してファイルを保存します。

  4. Hive を起動して flightdelays.hql ファイルを実行するには、次のコマンドを使います。

    beeline -u 'jdbc:hive2://localhost:10001/;transportMode=http' -f flightdelays.hql
    
  5. flightdelays.hql スクリプトの実行が終わったら、次のコマンドで対話型 Beeline セッションを開始します。

    beeline -u 'jdbc:hive2://localhost:10001/;transportMode=http'
    
  6. jdbc:hive2://localhost:10001/> プロンプトが表示されたら、次のクエリを使用してインポートされたフライト遅延データからデータを取得します。

    INSERT OVERWRITE DIRECTORY '/tutorials/flightdelays/output'
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    SELECT regexp_replace(OriginCityName, '''', ''),
      avg(WeatherDelay)
    FROM delays
    WHERE WeatherDelay IS NOT NULL
    GROUP BY OriginCityName;
    

    このクエリにより、悪天候による遅延が発生した都市の一覧と平均遅延時間が取得され、abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/tutorials/flightdelays/output に保存されます。 その後、Sqoop がこの場所からデータを読み取り、Azure SQL Database にエクスポートします。

  7. Beeline を終了するには、プロンプトで「 !quit 」と入力します。

SQL データベース テーブルの作成

この操作には、SQL Database のサーバー名が必要になります。 サーバー名を確認するには、次の手順に従います。

  1. Azure ポータルにアクセスします。

  2. [SQL データベース] を選択します。

  3. 使用するデータベースの名前でフィルター処理します。 サーバー名は [サーバー名] 列に表示されます。

  4. 使用するデータベースの名前でフィルター処理します。 サーバー名は [サーバー名] 列に表示されます。

    Azure SQL Server の詳細を取得

    SQL Database に接続してテーブルを作成するには、多くの方法があります。 次の手順では、HDInsight クラスターから FreeTDS を使用します。

  5. FreeTDS をインストールするには、クラスターへの SSH 接続から次のコマンドを使用します。

    sudo apt-get --assume-yes install freetds-dev freetds-bin
    
  6. インストールが完了したら、次のコマンドを使用して SQL Database に接続します。

    TDSVER=8.0 tsql -H '<server-name>.database.windows.net' -U '<admin-login>' -p 1433 -D '<database-name>'
    
    • <server-name> プレースホルダーを論理 SQL サーバーの名前に置き換えます。

    • <admin-login> プレースホルダーを SQL Database の管理者ユーザー名に置き換えます。

    • <database-name> プレースホルダーをデータベース名に置き換えます。

    メッセージが表示されたら、SQL Database 管理者ユーザー名のパスワードを入力します。

    次のテキストのような出力が返されます。

    locale is "en_US.UTF-8"
    locale charset is "UTF-8"
    using default charset "UTF-8"
    Default database being set to sqooptest
    1>
    
  7. 1> プロンプトで、次のステートメントを入力します。

    CREATE TABLE [dbo].[delays](
    [OriginCityName] [nvarchar](50) NOT NULL,
    [WeatherDelay] float,
    CONSTRAINT [PK_delays] PRIMARY KEY CLUSTERED
    ([OriginCityName] ASC))
    GO
    
  8. GO ステートメントを入力すると、前のステートメントが評価されます。

    このクエリにより、クラスター化インデックス付きの、delays という名前のテーブルが作成されます。

  9. 次のクエリを使用して、テーブルが作成されたことを確認します。

    SELECT * FROM information_schema.tables
    GO
    

    出力は次のテキストのようになります。

    TABLE_CATALOG   TABLE_SCHEMA    TABLE_NAME      TABLE_TYPE
    databaseName       dbo             delays        BASE TABLE
    
  10. Enter exit at the 1>」と入力して、tsql ユーティリティを終了します。

データのエクスポートと読み込み

上のセクションで、変換したデータを abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/tutorials/flightdelays/output にコピーしました。 このセクションでは、Sqoop を使用して、abfs://<container-name>@<storage-account-name>.dfs.core.windows.net/tutorials/flightdelays/output のデータを、Azure SQL Database に作成したテーブルにエクスポートします。

  1. 次のコマンドを使用して、Sqoop が SQL データベースを認識できることを確認します。

    sqoop list-databases --connect jdbc:sqlserver://<SERVER_NAME>.database.windows.net:1433 --username <ADMIN_LOGIN> --password <ADMIN_PASSWORD>
    

    このコマンドにより、delays テーブルを作成したデータベースを含む、データベースの一覧が返されます。

  2. 次のコマンドを使って、hivesampletable テーブルから delays テーブルにデータをエクスポートします。

    sqoop export --connect 'jdbc:sqlserver://<SERVER_NAME>.database.windows.net:1433;database=<DATABASE_NAME>' --username <ADMIN_LOGIN> --password <ADMIN_PASSWORD> --table 'delays' --export-dir 'abfs://<container-name>@.dfs.core.windows.net/tutorials/flightdelays/output' --fields-terminated-by '\t' -m 1
    

    Sqoop は delays テーブルを含むデータベースに接続して、/tutorials/flightdelays/output ディレクトリから delays テーブルにデータをエクスポートします。

  3. sqoop コマンドが完了したら、tsql ユーティリティを使用してデータベースに接続します。

    TDSVER=8.0 tsql -H <SERVER_NAME>.database.windows.net -U <ADMIN_LOGIN> -P <ADMIN_PASSWORD> -p 1433 -D <DATABASE_NAME>
    
  4. 次のステートメントを使用して、データが delays テーブルにエクスポートされたことを確認します。

    SELECT * FROM delays
    GO
    

    テーブル内のデータの一覧が表示されます。 テーブルには、都市の名前と、その都市のフライトの平均遅延時間が含まれます。

  5. exit」と入力して、tsql ユーティリティを終了します。

リソースをクリーンアップする

このチュートリアルで使用されるすべてのリソースは、既存のものです。 クリーンアップは必要ありません。

次のステップ

HDInsight でのデータ操作の詳細については、次の記事を参照してください。