チュートリアル: PySpark DataFrames でデータを読み込んで変換する

このチュートリアルでは、Azure Databricks で Apache Spark Python (PySpark) DataFrame API を使用して米国の都市のデータを読み込んで変換する方法を示します。

このチュートリアルを最後まで進めると、DataFrame とは何であるかを理解し、以下のタスクを快適に実行できます。

Apache Spark PySpark API リファレンスも参照してください。

DataFrame とは

DataFrame は、型が異なる可能性のある列を持つ、2次元のラベルの付いたデータ構造です。 DataFrame は、スプレッドシート、SQL テーブル、または一連のオブジェクトのディクショナリのようなものと考えることができます。 Apache Spark DataFrames には、一般的なデータ分析の問題を効率的に解決できるようにする豊富な機能セット (列の選択、フィルター、結合、集計) が用意されています。

Apache Spark DataFrames は、Resilient Distributed Datasets (RDD) に基づいて構築された抽象化です。 Spark DataFrames と Spark SQL では、統合された計画と最適化エンジンが使用されるため、Azure Databricks でサポートされているすべての言語 (Python、SQL、Scala、R) でほぼ同じパフォーマンスを得ることができます。

要件

次のチュートリアルを完了するには、次の要件を満たす必要があります。

Note

クラスター制御の特権がなくても、以降の手順の大部分はクラスターへのアクセス権があれば実行できます。

ホーム ページのサイド バーから、Azure Databricks エンティティ (ワークスペース ブラウザー、カタログ、エクスプローラー、ワークフロー、コンピューティング) にアクセスします。 ワークスペースは、ノートブックやライブラリなどの Azure Databricks 資産を保存するルート フォルダーです。

手順 1: Python を使用して DataFrame を作成する

Azure Databricks ノートブック内を移動する方法を学習するには、「Databricks ノートブックのインターフェイスとコントロール」を参照してください。

  1. 新しいノートブックを開き、New Icon アイコンをクリックして新しいセルを挿入します。
  2. 次のコードをコピーしてノートブックの空のセルに貼り付け、Shift+Enter キーを押してセルを実行します。 次のコード例では、都市の人口データを含む df1 という名前の DataFrame を作成し、その内容を表示します。
data = [[295, "South Bend", "Indiana", "IN", 101190, 112.9]]
columns = ["rank", "city", "state", "code", "population", "price"]

df1 = spark.createDataFrame(data, schema="rank LONG, city STRING, state STRING, code STRING, population LONG, price DOUBLE")
display(df1)

手順 2: ファイルから DataFrame にデータを読み込む

/databricks-datasets ディレクトリから df2 に都市の人口データを追加します。

data_geo.csv ファイルから DataFrame df2 にデータを読み込むには、次のようにします。

  1. ノートブックに新しいセルを作成します。
  2. 次のコードをコピーしてノートブックの空のセルに貼り付け、Shift+Enter キーを押してセルを実行します。

サポートされている多くのファイル形式からデータを読み込むことができます。 次の例では、ほとんどのワークスペースからアクセスできる /databricks-datasets ディレクトリで使用できるデータセットを使用します。 「サンプル データセット」を参照してください。

df2 = (spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)

手順 3: DataFrame を表示して操作する

次の方法を使って、都市の人口の DataFrame を表示して操作します。

DataFrame を結合する

最初の DataFrame の内容を、data_geo.csv の内容を含む DataFrame と結合します。

ノートブックで、次のコード例を利用し、和集合操作を使用して 1 つの DataFrame の行を別の DataFrame に追加する、新しい DataFrame を作成します。

# Returns a DataFrame that combines the rows of df1 and df2
df = df1.union(df2)

DataFrame を表示する

米国の都市のデータを表形式で表示するには、ノートブック セルで Azure Databricks の display() コマンドを使用します。

display(df)

Spark では、スキーマ という用語を使用して、DataFrame 内の列の名前とデータ型を参照します。

ノートブックで次の .printSchema() メソッドを使用して、DataFrame のスキーマを出力します。 結果のメタデータを使用して、DataFrame の内容を操作します。

df.printSchema()

Note

Azure Databricks では、カタログに登録されているテーブルのコレクションも "スキーマ" という用語を使用して説明します。

DataFrame で行をフィルター処理する

.filter() または .where() を使用して行をフィルター処理し、データセット内で最も人口の多い 5 つの都市を見つけます。 フィルター処理を使用して、DataFrame で返す行または変更する行のサブセットを選択します。 以下の例に示すように、パフォーマンスや構文に違いはありません。

# Filter rows using .filter()
filtered_df = df.filter(df["rank"] < 6)
display(filtered_df)

# Filter rows using .where()
filtered_df = df.where(df["rank"] < 6)
display(filtered_df)

DataFrame から列を選択する

select() メソッドを使用して、都市がどの州にあるかを確認します。 次の例のように、1 つ以上の列名を .select() に渡して列を選択します。

select_df = df.select("City", "State")
display(select_df)

サブセットの DataFrame を作成する

人口が最も多い 10 都市を含むサブセットの DataFrame を作成し、結果のデータを表示します。 ノートブックの次のコードを使用して、select および filter クエリを結合し、返される行と列を制限します。

subset_df = df.filter(df["rank"] < 11).select("City")
display(subset_df)

手順 4: DataFrameを保存する

DataFrame をテーブルに保存することも、DataFrame を 1 つのファイルまたは複数のファイルに書き込むこともできます。

DataFrame をテーブルに保存する

Azure Databricks では、既定ですべてのテーブルに Delta Lake 形式が使用されます。 DataFrame を保存するには、カタログとスキーマに対する CREATE テーブル権限が必要です。 次の例では、DataFrame の内容を us_cities という名前のテーブルに保存します。

df.write.saveAsTable("us_cities")

ほとんどの Spark アプリケーションは、大規模なデータ セットに対して分散方式で動作します。 Spark は、1 つのファイルではなく、ファイルのディレクトリを書き出します。 Delta Lake では Parquet フォルダーとファイルを分割します。 多くのデータ システムでは、これらのファイルのディレクトリを読み取ることができます。 Azure Databricks では、ほとんどのアプリケーションでファイル パスよりもテーブルを使用することをお勧めします。

DataFrame を JSON ファイルに保存する

次の例では、JSON ファイルのディレクトリを保存します。

# Write a DataFrame to a collection of files
df.write.format("json").save("/tmp/json_data")

DataFrame を JSON ファイルから読み取る

# Read a DataFrame from a JSON file
df3 = spark.read.format("json").json("/tmp/json_data")
display(df3)

追加のタスク: PySpark で SQL クエリを実行する

Spark DataFrame には、SQL と Python を結合するために次のオプションが用意されています。 このチュートリアル用に作成したものと同じノートブックで、次のコードを実行できます。

列を SQL クエリとして指定する

selectExpr() メソッドを使用すると、次の例のように、各列を SQL クエリとして指定できます。

display(df.selectExpr("`rank`", "upper(city) as big_name"))

expr() をインポートする

次の例のように、expr() 関数を pyspark.sql.functions からインポートして、列を指定する任意の場所で SQL 構文を使用することができます。

from pyspark.sql.functions import expr

display(df.select("rank", expr("lower(city) as little_name")))

任意の SQL クエリを実行する

次の例のように、任意の SQL クエリを実行する spark.sql() を使用できます。

query_df = spark.sql("SELECT * FROM us_cities")

SQL クエリをパラメーター化する

次の例のように、Python フォーマットを使用して SQL クエリをパラメーター化できます。

table_name = "us_cities"

query_df = spark.sql(f"SELECT * FROM {table_name}")

その他のリソース