使用 JDBC 查詢資料庫
Azure Databricks 支援使用 JDBC 連線到外部資料庫。 本文提供基本語法,以搭配 Python、SQL 和 Scala 中的範例來設定和使用這些連線。
注意
您可能偏好使用 Lakehouse 同盟來管理外部資料庫系統的查詢。 請參閱 什麼是 Lakehouse 同盟。
合作夥伴 連線 提供優化整合,以便與許多外部數據源同步處理數據。 請參閱什麼是 Databricks 合作夥伴 連線?。
重要
本文中的範例不包含 JDBC URL 中的使用者名稱和密碼。 Databricks 建議使用 秘密 來儲存資料庫認證。 例如:
Python
username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
Scala
val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")
若要使用 SQL 參考 Databricks 秘密,您必須 在叢集初始化期間設定 Spark 組態屬性。
如需秘密管理的完整範例,請參閱 秘密工作流程範例。
使用 JDBC 讀取數據
您必須設定一些設定,以使用 JDBC 讀取數據。 請注意,每個資料庫都會針對 <jdbc-url>
使用不同的格式。
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
Spark 會自動從資料庫數據表讀取架構,並將其類型對應回 Spark SQL 類型。
Python
employees_table.printSchema
SQL
DESCRIBE employees_table_vw
Scala
employees_table.printSchema
您可以針對此 JDBC 資料表執行查詢:
Python
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
SQL
SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age
Scala
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
使用 JDBC 寫入數據
使用 JDBC 將資料儲存至資料表時,會使用類似的設定來讀取。 請參閱下列範例:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
默認行為會嘗試建立新的數據表,並在具有該名稱的數據表已經存在時擲回錯誤。
您可以使用下列語法將資料附加至現有的資料表:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
)
SQL
CREATE TABLE IF NOT EXISTS new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
);
INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
您可以使用下列語法覆寫現有的資料表:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
)
SQL
CREATE OR REPLACE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
控制 JDBC 查詢的平行處理原則
根據預設,JDBC 驅動程式只會使用單一線程查詢源資料庫。 若要改善讀取的效能,您必須指定一些選項,以控制 Azure Databricks 對資料庫進行多少同時查詢。 針對小型叢集,將 選項設定 numPartitions
為等於叢集中的執行程式核心數目,可確保所有節點都會平行查詢數據。
警告
將 設定 numPartitions
為大型叢集上的高值可能會導致遠端資料庫的負面效能,因為太多同時查詢可能會使服務不堪重負。 對於應用程式資料庫來說,這特別麻煩。 請謹慎設定此值高於 50。
注意
選取在源資料庫中 partitionColumn
計算之索引的數據行,以加速查詢。
下列程式代碼範例示範如何為具有八個核心的叢集設定平行處理原則:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
# a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
# lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
# max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
# number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>',
partitionColumn "<partition-key>",
lowerBound "<min-value>",
upperBound "<max-value>",
numPartitions 8
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
// a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
// lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
// max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
// number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
注意
Azure Databricks 支援所有 Apache Spark 選項來設定 JDBC。
使用 JDBC 寫入資料庫時,Apache Spark 會使用記憶體中的數據分割數目來控制平行處理原則。 您可以先重新分割數據,再寫入以控制平行處理原則。 避免大型叢集上的大量分割區,以避免讓遠端資料庫壓倒性。 下列範例示範在寫入之前重新分割至八個分割區:
Python
(employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw
Scala
employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
將查詢向下推送至資料庫引擎
您可以將整個查詢向下推送至資料庫,並只傳回結果。 參數 table
會識別要讀取的 JDBC 資料表。 您可以使用任何在 SQL 查詢 FROM
子句中有效的專案。
Python
pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "(select * from employees where emp_no < 10008) as emp_alias",
user '<username>',
password '<password>'
)
Scala
val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
控制每個查詢擷取的數據列數目
JDBC 驅動程式具有 fetchSize
參數,可控制一次從遠端資料庫擷取的數據列數目。
設定 | 結果 |
---|---|
太低 | 由於往返次數過多而造成高延遲(每個查詢傳回的數據列很少) |
太高 | 記憶體不足錯誤(在一個查詢中傳回太多資料) |
最佳值取決於工作負載。 考慮包括:
- 查詢會傳回多少個數據行?
- 傳回哪些數據類型?
- 每個數據行中的字串傳回多久時間?
系統可能會有非常小的預設值,並受益於微調。 例如:Oracle 的預設值 fetchSize
為 10。 將它增加至 100,可減少需要以 10 乘以 10 執行的總查詢數目。 JDBC 結果為網路流量,因此請避免非常大量,但最佳值可能位於數千個數據集中。
fetchSize
使用 選項,如下列範例所示:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'.
fetchSize 100
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()