Apache Spark を使用して分析を行う

このチュートリアルでは、Apache Spark for Azure Synapse でデータを読み込んで分析する基本的な手順について説明します。

サーバーレス Apache Spark プールを作成する

  1. Synapse Studio の左側のペインで、 [管理]>[Apache Spark プール] を選択します。
  2. [新規] を選択します。
  3. [Apache Spark プール名] に「Spark1」と入力します。
  4. [ノード サイズ] に「Small」と入力します。
  5. [ノード数] で、最小数を 3 に、最大数を 3 に設定します。
  6. [確認と作成]>[作成] の順に選択します。 Apache Spark プールの準備は数秒で完了します。

サーバーレス Apache Spark プールについて

サーバーレス Spark プールは、ユーザーが Spark の操作方法を示すための手段です。 プールの使用を開始すると、必要に応じて Spark セッションが作成されます。 プールでは、そのセッションで使用される Spark リソースの数と、セッションが自動的に一時停止するまでの継続時間を制御します。 課金は、プール自体ではなく、そのセッション中に使用された Spark リソースに対して発生します。 このように、Spark プールを使用すると、クラスターを管理せずに Apache Spark を使用できます。 これは、サーバーレス SQL プールの動作に似ています。

Spark プールで NYC タクシーのデータを分析する

  1. Synapse Studio で、 [開発] ハブに移動します。

  2. 新しい Notebook を作成します。

  3. 新しいコード セルを作成し、以下のコードをそのセルに貼り付けます。

    %%pyspark
    df = spark.read.load('abfss://users@contosolake.dfs.core.windows.net/NYCTripSmall.parquet', format='parquet')
    display(df.limit(10))
    
  4. abfss URI スキームに従ってストレージ アカウント内のサンプル ファイルを参照するように、読み込み URI を変更します。

  5. ノートブックの [アタッチ先] メニューで、前に作成した Spark1 サーバーレス Spark プールを選択します。

  6. セルで [実行] を選択します。 このセルを実行するために必要であれば、Synapse によって新しい Spark セッションが開始されます。 新しい Spark セッションが必要な場合、最初の作成に約 2 から 5 分かかります。 セッションが一旦作成されると、約 2 秒でセルが実行されます。

  7. データフレームのスキーマを表示するだけの場合は、次のコードを使用してセルを実行します。

    %%pyspark
    df.printSchema()
    

NYC タクシーのデータを Spark nyctaxi データベースに読み込む

データは、df という名前のデータフレームを使って取得できます。 それを nyctaxi という名前の Spark データベースに読み込みます。

  1. ノートブックに新しいコード セルを追加し、次のコードを入力します。

    %%pyspark
    spark.sql("CREATE DATABASE IF NOT EXISTS nyctaxi")
    df.write.mode("overwrite").saveAsTable("nyctaxi.trip")
    

Spark とノートブックを使用して NYC タクシーのデータを分析する

  1. 新しいコード セルを作成し、次のコードを入力します。

    %%pyspark
    df = spark.sql("SELECT * FROM nyctaxi.trip") 
    display(df)
    
  2. このセルを実行して、nyctaxi Spark データベースに読み込んだ NYC Taxi データを表示します。

  3. 新しいコード セルを作成し、次のコードを入力します。 このデータを分析し、nyctaxi.passengercountstats というテーブルに結果を保存します。

    %%pyspark
    df = spark.sql("""
       SELECT passenger_count,
           SUM(trip_distance) as SumTripDistance,
           AVG(trip_distance) as AvgTripDistance
       FROM nyctaxi.trip
       WHERE trip_distance > 0 AND passenger_count > 0
       GROUP BY passenger_count
       ORDER BY passenger_count
    """) 
    display(df)
    df.write.saveAsTable("nyctaxi.passengercountstats")
    
  4. セルの結果で [グラフ] を選択し、視覚化されたデータを確認します。

次の手順