HDInsight Spark クラスターを使用して Azure SQL Database のデータを読み書きする

Azure HDInsight の Apache Spark クラスターを Azure SQL Database と接続する方法について説明します。 次に、SQL データベースに対するデータの読み取り、書き込み、ストリーミングを行います。 この記事の説明では、Jupyter Notebook を使って Scala コード スニペットを実行します。 ただし、Scala または Python でスタンドアロン アプリケーションを作成して、同じタスクを実行することもできます。

前提条件

  • Azure HDInsight Spark クラスター。 手順については、「Azure HDInsight での Apache Spark クラスターの作成」をご覧ください。

  • Azure SQL Database。 Azure SQL Database でのデータベースの作成に関するページの手順に従ってください。 AdventureWorksLT サンプルのスキーマとデータを使って、データベースを作成します。 また、サーバー レベルのファイアウォール規則を作成し、クライアントの IP アドレスが SQL Database にアクセスすることを許可します。 ファイアウォール規則を追加する方法についても、同じ記事をご覧ください。 SQL データベースを作成したら、以下の値がすぐにわかるようにしておきます。 Spark クラスターからデータベースに接続するときに必要です。

    • サーバー名。
    • データベース名。
    • Azure SQL Database の管理者のユーザー名 / パスワード。
  • SQL Server Management Studio (SSMS)。 「SQL Server Management Studio を使って接続とデータの照会を行う」の手順に従います。

Jupyter Notebook の作成

最初に、Spark クラスターに関連付けられた Jupyter Notebook を作成します。 この Notebook を使って、この記事で使われているコード スニペットを実行します。

  1. Azure Portal でクラスターを開きます。
  2. 右側のクラスター ダッシュボードの下で、 [Jupyter Notebook] を選択します。 クラスター ダッシュボードが表示されない場合は、左側のメニューの [概要] を選択します。 入力を求められたら、クラスターの管理者資格情報を入力します。

Jupyter Notebook on Apache Spark.

Note

ブラウザーで次の URL を開き、Spark クラスターの Jupyter Notebook にアクセスすることもできます。 CLUSTERNAME をクラスターの名前に置き換えます。

https://CLUSTERNAME.azurehdinsight.net/jupyter

  1. Jupyter Notebook で、右上隅の [New](新規) をクリックし、 [Spark] をクリックして Scala ノートブックを作成します。 HDInsight Spark クラスター上の Jupyter Notebook では、Python2 アプリケーション用の PySpark カーネル、Python3 アプリケーション用の PySpark3 カーネルも提供されます。 この記事では、Scala Notebook を作成します。

Kernels for Jupyter Notebook on Spark.

これらのカーネルの詳細については、HDInsight での Apache Spark クラスターと Jupyter Notebook カーネルの使用に関する記事を参照してください。

Note

Spark から SQL Database へのデータのストリーミングは、現在 Scala と Java でのみサポートされているため、この記事では Spark (Scala) カーネルを使います。 SQL からの読み取りや SQL への書き込みは Python を使って行うこともできますが、この記事での一貫性を保つため、3 つの操作すべてに Scala を使います。

  1. Untitled という既定の名前を持つ新しいノートブックが開かれます。 Notebook の名前をクリックし、任意の名前を入力します。

    Provide a name for the notebook.

    アプリケーションの作成を始められるようになります。

Azure SQL Database からデータを読み取る

このセクションでは、AdventureWorks データベースに存在するテーブル (たとえば、SalesLT.Address) からデータを読み取ります。

  1. 新しい Jupyter Notebook で、コード セルに次のスニペットを貼り付けて、プレースホルダーの値をデータベースの値で置き換えます。

    // Declare the values for your database
    
    val jdbcUsername = "<SQL DB ADMIN USER>"
    val jdbcPassword = "<SQL DB ADMIN PWD>"
    val jdbcHostname = "<SQL SERVER NAME HOSTING SDL DB>" //typically, this is in the form or servername.database.windows.net
    val jdbcPort = 1433
    val jdbcDatabase ="<AZURE SQL DB NAME>"
    

    Shift + Enter キーを押して、コード セルを実行します。

  2. 次のスニペットを使用して JDBC URL を作成し、Spark データフレーム API に渡すことができます。 このコードでは、パラメーターを保持する Properties オブジェクトを作成します。 次のスニペットをコード セルに貼り付け、Shift + Enter キーを押して実行します。

    import java.util.Properties
    
    val jdbc_url = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=60;"
    val connectionProperties = new Properties()
    connectionProperties.put("user", s"${jdbcUsername}")
    connectionProperties.put("password", s"${jdbcPassword}")
    
  3. 次のスニペットを使用して、データベース内のテーブルのデータでデータフレームを作成します。 このスニペットでは、AdventureWorksLT データベースの一部として利用可能な SalesLT.Address テーブルを使います。 次のスニペットをコード セルに貼り付け、Shift + Enter キーを押して実行します。

    val sqlTableDF = spark.read.jdbc(jdbc_url, "SalesLT.Address", connectionProperties)
    
  4. データ スキーマの取得など、データフレームに対する操作を実行できるようになります。

    sqlTableDF.printSchema
    

    次の図のような出力が表示されます。

    schema output.

  5. 上位 10 行の取得のような操作も実行できます。

    sqlTableDF.show(10)
    
  6. または、特定の列をデータセットから取得します。

    sqlTableDF.select("AddressLine1", "City").show(10)
    

Azure SQL Database にデータを書き込む

このセクションでは、クラスターで利用可能なサンプル CSV ファイルを使ってデータベース内にテーブルを作成し、そこにデータを格納します。 HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv にあるサンプルの CSV ファイル (HVAC.csv) は、すべての HDInsight クラスターで使うことができます。

  1. 新しい Jupyter Notebook で、コード セルに次のスニペットを貼り付けて、プレースホルダーの値をデータベースの値で置き換えます。

    // Declare the values for your database
    
    val jdbcUsername = "<SQL DB ADMIN USER>"
    val jdbcPassword = "<SQL DB ADMIN PWD>"
    val jdbcHostname = "<SQL SERVER NAME HOSTING SDL DB>" //typically, this is in the form or servername.database.windows.net
    val jdbcPort = 1433
    val jdbcDatabase ="<AZURE SQL DB NAME>"
    

    Shift + Enter キーを押して、コード セルを実行します。

  2. 下のスニペットを使用して作成される JDBC URL を Spark データフレーム API に渡すことができます。 このコードでは、パラメーターを保持する Properties オブジェクトを作成します。 次のスニペットをコード セルに貼り付け、Shift + Enter キーを押して実行します。

    import java.util.Properties
    
    val jdbc_url = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=60;"
    val connectionProperties = new Properties()
    connectionProperties.put("user", s"${jdbcUsername}")
    connectionProperties.put("password", s"${jdbcPassword}")
    
  3. 次のスニペットを使用して、HVAC.csv のデータのスキーマを抽出し、そのスキーマを使って、CSV からデータフレーム readDf にデータを読み込みます。 次のスニペットをコード セルに貼り付け、Shift + Enter キーを押して実行します。

    val userSchema = spark.read.option("header", "true").csv("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv").schema
    val readDf = spark.read.format("csv").schema(userSchema).load("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")
    
  4. readDf データフレームを使って、一時テーブル temphvactable を作成します。 次に、一時テーブルを使って、Hive テーブル hvactable_hive を作成します。

    readDf.createOrReplaceTempView("temphvactable")
    spark.sql("create table hvactable_hive as select * from temphvactable")
    
  5. 最後に、Hive テーブルを使ってデータベース内にテーブルを作成します。 次のスニペットは、Azure SQL Database に hvactable を作成します。

    spark.table("hvactable_hive").write.jdbc(jdbc_url, "hvactable", connectionProperties)
    
  6. SSMS を使って Azure SQL Database に接続し、dbo.hvactable がそこにあることを確認します。

    a. SSMS を開始し、次のスクリーンショットに示すように、接続の詳細を指定して Azure SQL Database に接続します。

    Connect to SQL Database using SSMS1.

    b. オブジェクト エクスプローラーでデータベースとテーブル ノードを展開して、dbo.hvactable が作成されていることを確認します。

    Connect to SQL Database using SSMS2.

  7. SSMS でクエリを実行して、テーブルの列を表示します。

    SELECT * from hvactable
    

Azure SQL Database にデータをストリーミングする

このセクションでは、前のセクションで作成した hvactable にデータをストリーミングします。

  1. 最初のステップとして、hvactable 内にレコードがないことを確認します。 SSMS を使い、テーブルに対して次のクエリを実行します。

    TRUNCATE TABLE [dbo].[hvactable]
    
  2. HDInsight Spark クラスターに新しい Jupyter Notebook を作成します。 コード セルに次のスニペットを貼り付けてから、Shift + Enter キーを押します。

    import org.apache.spark.sql._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.streaming._
    import java.sql.{Connection,DriverManager,ResultSet}
    
  3. HVAC.csv から hvactable にデータをストリーミングします。 HVAC.csv ファイルは、クラスター (/HdiSamples/HdiSamples/SensorSampleData/HVAC/) で入手できます。 次のスニペットでは最初に、ストリーミングするデータのスキーマを取得します。 次に、そのスキーマを使ってストリーミング データフレームを作成します。 次のスニペットをコード セルに貼り付け、Shift + Enter キーを押して実行します。

    val userSchema = spark.read.option("header", "true").csv("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv").schema
    val readStreamDf = spark.readStream.schema(userSchema).csv("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/") 
    readStreamDf.printSchema
    
  4. 出力に、HVAC.csv のスキーマが表示されます。 hvactable にも同じスキーマがあります。 出力にテーブルの列が一覧表示されます。

    `hdinsight Apache Spark schema table`.

  5. 最後に、次のスニペットを使って HVAC.csv からデータを読み取り、それをデータベース内の hvactable にストリーミングします。 コード セルにスニペットを貼り付けて、プレースホルダーの値をデータベースの値に置き換えてから、Shift キーを押しながら Enter キーを押して実行します。

    val WriteToSQLQuery  = readStreamDf.writeStream.foreach(new ForeachWriter[Row] {
        var connection:java.sql.Connection = _
        var statement:java.sql.Statement = _
    
        val jdbcUsername = "<SQL DB ADMIN USER>"
        val jdbcPassword = "<SQL DB ADMIN PWD>"
        val jdbcHostname = "<SQL SERVER NAME HOSTING SDL DB>" //typically, this is in the form or servername.database.windows.net
        val jdbcPort = 1433
        val jdbcDatabase ="<AZURE SQL DB NAME>"
        val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        val jdbc_url = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
    
        def open(partitionId: Long, version: Long):Boolean = {
        Class.forName(driver)
        connection = DriverManager.getConnection(jdbc_url, jdbcUsername, jdbcPassword)
        statement = connection.createStatement
        true
        }
    
        def process(value: Row): Unit = {
        val Date  = value(0)
        val Time = value(1)
        val TargetTemp = value(2)
        val ActualTemp = value(3)
        val System = value(4)
        val SystemAge = value(5)
        val BuildingID = value(6)  
    
        val valueStr = "'" + Date + "'," + "'" + Time + "'," + "'" + TargetTemp + "'," + "'" + ActualTemp + "'," + "'" + System + "'," + "'" + SystemAge + "'," + "'" + BuildingID + "'"
        statement.execute("INSERT INTO " + "dbo.hvactable" + " VALUES (" + valueStr + ")")
        }
    
        def close(errorOrNull: Throwable): Unit = {
        connection.close
        }
        })
    
    var streamingQuery = WriteToSQLQuery.start()
    
  6. SQL Server Management Studio (SSMS) で、次のクエリを実行し、hvactable にデータがストリーミングされていることを確認します。 クエリを実行するたびに、表示されるテーブル行数の値が増加します。

    SELECT COUNT(*) FROM hvactable
    

次のステップ