Azure SQL Database と SQL Server 用の Spark コネクタを使用してビッグ データのリアルタイム分析を高速化するAccelerate real-time big data analytics with Spark connector for Azure SQL Database and SQL Server

Azure SQL Database と SQL Server 用の Spark コネクタを使用して、Azure SQL Database と SQL Server を含む SQL データベースを Spark ジョブの入力データ ソースまたは出力データ シンクとして機能させることができます。The Spark connector for Azure SQL Database and SQL Server enables SQL databases, including Azure SQL Database and SQL Server, to act as input data source or output data sink for Spark jobs. ビッグ データ分析の中でリアルタイム トランザクション データを利用でき、アドホック クエリの結果やレポートを保持できます。It allows you to utilize real-time transactional data in big data analytics and persist results for adhoc queries or reporting. 組み込みの JDBC コネクタに比べて、このコネクタには、SQL データベースにデータを一括挿入する機能があります。Compared to the built-in JDBC connector, this connector provides the ability to bulk insert data into SQL databases. 行単位の挿入に比べ、パフォーマンスを 10 倍から 20 倍も向上させることができます。It can outperform row by row insertion with 10x to 20x faster performance. Azure SQL Database と SQL Server 用の Spark コネクタは、AAD 認証もサポートします。The Spark connector for Azure SQL Database and SQL Server also supports AAD authentication. AAD アカウントを使用して Azure Databricks から Azure SQL データベースに安全に接続できます。It allows you securely connecting to your Azure SQL database from Azure Databricks using your AAD account. 組み込みの JDBC コネクタと同様のインターフェイスを備えています。It provides similar interfaces with the built-in JDBC connector. この新しいコネクタを使用するための既存の Spark ジョブの移行は簡単に実行できます。It is easy to migrate your existing Spark jobs to use this new connector.

ダウンロードDownload

最初に、GitHub の azure-sqldb-spark リポジトリ から Spark to SQL DB コネクタをダウンロードします。To get started, download the Spark to SQL DB connector from the azure-sqldb-spark repository on GitHub.

正式にサポートされているバージョンOfficial Supported Versions

コンポーネントComponent VersionVersion
Apache SparkApache Spark 2.0.2 以降2.0.2 or later
ScalaScala 2.10 以降2.10 or later
SQL Server 用 Microsoft JDBC ドライバーMicrosoft JDBC Driver for SQL Server 6.2 以降6.2 or later
Microsoft SQL ServerMicrosoft SQL Server SQL Server 2008 以降SQL Server 2008 or later
Azure SQL DatabaseAzure SQL Database サポートされていますSupported

Azure SQL Database と SQL Server 用の Spark コネクタは、SQL Server 用の Microsoft JDBC ドライバーを使用して、Spark ワーカー ノードと SQL データベース間でデータを移動します。The Spark connector for Azure SQL Database and SQL Server utilizes the Microsoft JDBC Driver for SQL Server to move data between Spark worker nodes and SQL databases:

データフローは次のとおりです。The dataflow is as follows:

  1. Spark のマスター ノードが、SQL Server または Azure SQL Database に接続し、特定のテーブルまたは特定の SQL クエリを使用してデータを読み込みます。The Spark master node connects to SQL Server or Azure SQL Database and loads data from a specific table or using a specific SQL query
  2. Spark のマスター ノードが、変換のためにワーカー ノードにデータを分散します。The Spark master node distributes data to worker nodes for transformation.
  3. ワーカー ノードが、SQL Server または Azure SQL Database に接続し、データベースにデータを書き込みます。The Worker node connects to SQL Server or Azure SQL Database and writes data to the database. ユーザーは、使用する挿入方法 (行単位または一括) を選択できます。User can choose to use row-by-row insertion or bulk insert.

次の図にデータ フローを示します。The following diagram illustrates the data flow.

アーキテクチャ

Spark to SQL DB コネクタをビルドするBuild the Spark to SQL DB connector

現在、コネクタ プロジェクトでは Maven を使用します。Currently, the connector project uses maven. 依存関係なしにコネクタを構築するには、次のコードを実行します。To build the connector without dependencies, you can run:

  • mvn クリーン パッケージmvn clean package
  • releases フォルダーから最新バージョンの JAR をダウンロードしますDownload the latest versions of the JAR from the release folder
  • SQL DB Spark JAR をインクルードしますInclude the SQL DB Spark JAR

コネクタを使用して Spark to SQL DB に接続するConnect Spark to SQL DB using the connector

Spark ジョブから Azure SQL Database または SQL Server に接続して、データの読み取りまたは書き込みを実行できます。You can connect to Azure SQL Database or SQL Server from Spark jobs, read or write data. Azure SQL データベースまたは SQL Server で DML または DDL クエリを実行することもできます。You can also run a DML or DDL query in an Azure SQL database or SQL Server database.

Azure SQL Database または SQL Server からデータを読み取るRead data from Azure SQL Database or SQL Server

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"            -> "mysqlserver.database.windows.net",
  "databaseName"   -> "MyDatabase",
  "dbTable"        -> "dbo.Clients"
  "user"           -> "username",
  "password"       -> "*********",
  "connectTimeout" -> "5", //seconds
  "queryTimeout"   -> "5"  //seconds
))

val collection = sqlContext.read.sqlDB(config)
collection.show()

指定した SQL クエリによる Azure SQL Database または SQL Server からのデータの読み込みReading data from Azure SQL Database or SQL Server with specified SQL query

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "queryCustom"  -> "SELECT TOP 100 * FROM dbo.Clients WHERE PostalCode = 98074" //Sql query
  "user"         -> "username",
  "password"     -> "*********",
))

//Read all data in table dbo.Clients
val collection = sqlContext.read.sqlDb(config)
collection.show()

Azure SQL Database または SQL Server にデータを書き込むWrite data to Azure SQL Database or SQL Server

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
 
// Aquire a DataFrame collection (val collection)

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "dbTable"      -> "dbo.Clients"
  "user"         -> "username",
  "password"     -> "*********"
))

import org.apache.spark.sql.SaveMode
collection.write.mode(SaveMode.Append).sqlDB(config)

Azure SQL Database または SQL Server で DML または DDL クエリを実行するRun DML or DDL query in Azure SQL Database or SQL Server

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.query._
val query = """
              |UPDATE Customers
              |SET ContactName = 'Alfred Schmidt', City= 'Frankfurt'
              |WHERE CustomerID = 1;
            """.stripMargin

val config = Config(Map(
  "url"          -> "mysqlserver.database.windows.net",
  "databaseName" -> "MyDatabase",
  "user"         -> "username",
  "password"     -> "*********",
  "queryCustom"  -> query
))

sqlContext.SqlDBQuery(config)

AAD 認証を使用して Spark を Azure SQL Database に接続するConnect Spark to Azure SQL Database using AAD authentication

Azure Active Directory (AAD) 認証を使用して SQL Database に接続できます。You can connect to Azure SQL Database using Azure Active Directory (AAD) authentication. AAD 認証を使用して、SQL Server 認証の代わりとしてデータベース ユーザーの ID を一元的に管理します。Use AAD authentication to centrally manage identities of database users and as an alternative to SQL Server authentication.

ActiveDirectoryPassword 認証モードを使用した接続Connecting using ActiveDirectoryPassword Authentication Mode

設定要件Setup Requirement

ActiveDirectoryPassword 認証モードを使用する場合は、azure-activedirectory-library-for-java とその依存関係をダウンロードし、それらを Java ビルド パスに含める必要があります。If you are using the ActiveDirectoryPassword authentication mode, you need to download azure-activedirectory-library-for-java and its dependencies, and include them in the Java build path.

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"            -> "mysqlserver.database.windows.net",
  "databaseName"   -> "MyDatabase",
  "user"           -> "username ",
  "password"       -> "*********",
  "authentication" -> "ActiveDirectoryPassword",
  "encrypt"        -> "true"
))

val collection = sqlContext.read.SqlDB(config)
collection.show()

アクセス トークンを使用した接続Connecting using Access Token

設定要件Setup Requirement

アクセス トークンに基づく認証モードを使用する場合は、azure-activedirectory-library-for-java とその依存関係をダウンロードし、それらを Java ビルド パスに含める必要があります。If you are using the access token-based authentication mode, you need to download azure-activedirectory-library-for-java and its dependencies, and include them in the Java build path.

Azure SQL データベースのアクセス トークンを取得する方法については、Azure Active Directory 認証を使用した SQL Database の認証に関する記事を参照してください。See Use Azure Active Directory Authentication for authentication with SQL Database to learn how to get access token to your Azure SQL database.

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"                   -> "mysqlserver.database.windows.net",
  "databaseName"          -> "MyDatabase",
  "accessToken"           -> "access_token ",
  "hostNameInCertificate" -> "*.database.windows.net",
  "encrypt"               -> "true"
))

val collection = sqlContext.read.SqlDB(config)
collection.show()

一括挿入を使用して Azure SQL データベースまたは SQL Server にデータを書き込むWrite data to Azure SQL database or SQL Server using Bulk Insert

従来の JDBC コネクタは、行単位の挿入を使用して Azure SQL データベースまたは SQL Server にデータを書き込みます。The traditional jdbc connector writes data into Azure SQL database or SQL Server using row-by-row insertion. Spark to SQL DB コネクタでは、一括挿入を使用して SQL Database にデータを書き込むことができます。You can use Spark to SQL DB connector to write data to SQL database using bulk insert. 大きなデータ セットを読み込むとき、または列ストア インデックスが使用されているテーブルにデータを読み込むときの書き込みのパフォーマンスが大幅に向上します。It significantly improves the write performance when loading large data sets or loading data into tables where a column store index is used.

import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

/** 
  Add column Metadata.
  If not specified, metadata is automatically added
  from the destination table, which may suffer performance.
*/
var bulkCopyMetadata = new BulkCopyMetadata
bulkCopyMetadata.addColumnMetadata(1, "Title", java.sql.Types.NVARCHAR, 128, 0)
bulkCopyMetadata.addColumnMetadata(2, "FirstName", java.sql.Types.NVARCHAR, 50, 0)
bulkCopyMetadata.addColumnMetadata(3, "LastName", java.sql.Types.NVARCHAR, 50, 0)

val bulkCopyConfig = Config(Map(
  "url"               -> "mysqlserver.database.windows.net",
  "databaseName"      -> "MyDatabase",
  "user"              -> "username",
  "password"          -> "*********",
  "dbTable"           -> "dbo.Clients",
  "bulkCopyBatchSize" -> "2500",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

df.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)
//df.bulkCopyToSqlDB(bulkCopyConfig) if no metadata is specified.

次の手順Next steps

Azure SQL Database と SQL Server 用の Spark コネクタをまだダウンロードしていない場合は、GitHub の azure-sqldb-spark リポジトリからダウンロードし、リポジトリのその他のリソースを調べます。If you haven't already, download the Spark connector for Azure SQL Database and SQL Server from azure-sqldb-spark GitHub repository and explore the additional resources in the repo:

また、Apache Spark SQL、DataFrames、データセット ガイドAzure Databricks ドキュメントを確認することもできます。You might also want to review the Apache Spark SQL, DataFrames, and Datasets Guide and the Azure Databricks documentation.