SQL Server と Azure SQL 用の Apache Spark コネクタを使用する

SQL Server と Azure SQL 用の Apache Spark コネクタは、パフォーマンスの高いコネクタです。これを使用すると、ビッグ データ分析でトランザクション データを使用し、アドホック クエリやレポートの結果を保持できます。 コネクタを使用すると、オンプレミスまたはクラウド内のあらゆる SQL データベースを、Spark ジョブの入力データ ソースまたは出力データ シンクとして使用できます。 このコネクタでは、SQL Server 一括書き込み API が使用されます。 任意の一括書き込みパラメーターは、任意のパラメーターとして渡される場合があり、コネクタによって基になる API にそのまま渡されます。 一括書き込み操作の詳細については、「JDBC ドライバーでの一括コピーの使用」を参照してください。

コネクタは SQL Server ビッグ データ クラスターに既定で含まれています。

コネクタの詳細については、オープン ソース リポジトリで確認してください。 例については、サンプルを参照してください。

新しい SQL テーブルに書き込む

注意事項

overwrite モードでは、テーブルが既定でデータベースに既に存在する場合は、コネクタによって最初に削除されます。 予期しないデータ損失を避けるため、このオプションは十分な注意を払って使用してください。

モード overwrite を使用している場合、テーブルの再作成時にオプション truncate を使用しないと、インデックスは失われます。 たとえば、列ストア テーブルがヒープになります。 既存のインデックス作成を維持したい場合は、オプション truncate も値 true で指定してください。 例: .option("truncate",true)

server_name = "jdbc:sqlserver://{SERVER_ADDR}"
database_name = "database_name"
url = server_name + ";" + "databaseName=" + database_name + ";"

table_name = "table_name"
username = "username"
password = "password123!#" # Please specify password here

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("overwrite") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

SQL テーブルに追加する

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("append") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

分離レベルを指定する

このコネクタでは、データベースへの一括挿入を実行するときに、既定で READ_COMMITTED 分離レベルが使用されます。 これを別の分離レベルでオーバーライドしたい場合は、次に示すように mssqlIsolationLevel オプションを使用します。

    .option("mssqlIsolationLevel", "READ_UNCOMMITTED") \

SQL テーブルから読み取る

jdbcDF = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("user", username) \
        .option("password", password).load()

非 Active Directory モード

非 Active Directory モードのセキュリティでは、各ユーザーがユーザー名とパスワードを持っています。これは、読み取りと書き込みを実行するためのコネクタのインスタンス化時にパラメーターとして指定する必要があります。

非 Active Directory モードでのコネクタのインスタンス化の例を次に示します。 スクリプトを実行する前に、? をご自分のアカウントの値に置き換えます。

# Note: '?' is a placeholder for a necessary user-specified value
connector_type = "com.microsoft.sqlserver.jdbc.spark" 

url = "jdbc:sqlserver://master-p-svc;databaseName=?;"
writer = df.write \ 
   .format(connector_type)\ 
   .mode("overwrite") 
   .option("url", url) \ 
   .option("user", ?) \ 
   .option("password",?) 
writer.save() 

Active Directory モード

Active Directory モードのセキュリティでは、ユーザーがキー タブ ファイルを生成した後、コネクタのインスタンス化時に、ユーザーが principalkeytab をパラメーターとして指定する必要があります。

このモードでは、ドライバーがそれぞれの実行プログラム コンテナーに、keytab ファイルを読み込みます。 次に、この実行プログラムが、プリンシパル名と keytab を使用して、読み取り/書き込み用の JDBC コネクタの作成に使用されるトークンを生成します。

Active Directory モードでのコネクタのインスタンス化の例を次に示します。 スクリプトを実行する前に、? をご自分のアカウントの値に置き換えます。

# Note: '?' is a placeholder for a necessary user-specified value
connector_type = "com.microsoft.sqlserver.jdbc.spark"

url = "jdbc:sqlserver://master-p-svc;databaseName=?;integratedSecurity=true;authenticationScheme=JavaKerberos;" 
writer = df.write \ 
   .format(connector_type)\ 
   .mode("overwrite") 
   .option("url", url) \ 
   .option("principal", ?) \ 
   .option("keytab", ?)   

writer.save() 

次のステップ

ビッグ データ クラスターの詳細については、Kubernetes に SQL Server ビッグ データ クラスター を展開する方法に関するページを参照してください

SQL Server ビッグ データ クラスターのフィードバックまたは機能に関する推奨事項がありますか? SQL Server ビッグ データ クラスターのフィードバックにメモを残してください。