JDBC kullanarak veritabanlarını sorgulama

Azure Databricks, JDBC kullanarak dış veritabanlarına bağlanmayı destekler. Bu makalede Python, SQL ve Scala'daki örneklerle bu bağlantıları yapılandırmak ve kullanmak için temel söz dizimi sağlanır.

Not

Sorguları dış veritabanı sistemlerine yönetmek için Lakehouse Federasyonu'nu tercih edebilirsiniz. Bkz . Lakehouse Federasyonu nedir?

İş ortağı Bağlan, verileri birçok dış dış veri kaynağıyla eşitlemek için iyileştirilmiş tümleştirmeler sağlar. Bkz. Databricks İş Ortağı Bağlan nedir?.

Önemli

Bu makaledeki örnekler, JDBC URL'lerindeki kullanıcı adlarını ve parolaları içermez. Databricks, veritabanı kimlik bilgilerinizi depolamak için gizli dizilerin kullanılmasını önerir. Örneğin:

Python

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

Scala

val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

SQL ile Databricks gizli dizilerine başvurmak için küme başlatma sırasında bir Spark yapılandırma özelliği yapılandırmanız gerekir.

Gizli dizi yönetiminin tam örneği için bkz . Gizli dizi iş akışı örneği.

JDBC ile verileri okuma

JDBC kullanarak verileri okumak için bir dizi ayar yapılandırmanız gerekir. Her veritabanının için <jdbc-url>farklı bir biçim kullandığını unutmayın.

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Spark, veritabanı tablosundaki şemayı otomatik olarak okur ve türlerini Spark SQL türlerine geri eşler.

Python

employees_table.printSchema

SQL

DESCRIBE employees_table_vw

Scala

employees_table.printSchema

Bu JDBC tablosunda sorgu çalıştırabilirsiniz:

Python

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

SQL

SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age

Scala

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

JDBC ile veri yazma

JDBC ile tablolara veri kaydetmek, okumak için benzer yapılandırmalar kullanır. Aşağıdaki örneğe bakın:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Varsayılan davranış yeni bir tablo oluşturmayı dener ve bu ada sahip bir tablo zaten varsa hata oluşturur.

Aşağıdaki söz dizimini kullanarak var olan bir tabloya veri ekleyebilirsiniz:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)

SQL

CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

Aşağıdaki söz dizimini kullanarak varolan bir tablonun üzerine yazabilirsiniz:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)

SQL

CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

JDBC sorguları için paralelliği denetleme

Varsayılan olarak, JDBC sürücüsü kaynak veritabanını tek bir iş parçacığıyla sorgular. Okumaların performansını artırmak için Azure Databricks'in veritabanınızda kaç eşzamanlı sorgu yaptığını denetlemek için bir dizi seçenek belirtmeniz gerekir. Küçük kümeler için, seçeneğini kümenizdeki yürütücü çekirdeği sayısına eşit olarak ayarlamak numPartitions , tüm düğümlerin verileri paralel olarak sorgulamasını sağlar.

Uyarı

Büyük bir kümede yüksek bir değere ayar numPartitions yapmak uzak veritabanı için olumsuz performansa neden olabilir, çok fazla eşzamanlı sorgu hizmeti bunaltabilir. Bu özellikle uygulama veritabanları için zahmetlidir. Bu değeri 50'nin üzerine ayarlamaya karşı tedbirli olun.

Not

için kaynak veritabanında hesaplanmış dizini olan bir sütun seçerek sorguları hızlandırın partitionColumn.

Aşağıdaki kod örneği sekiz çekirdekli bir küme için paralellik yapılandırmayı gösterir:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

Not

Azure Databricks, JDBC'yi yapılandırmak için tüm Apache Spark seçeneklerini destekler.

JDBC kullanarak veritabanlarına yazarken Apache Spark, paralelliği denetlemek için bellekteki bölüm sayısını kullanır. Paralelliği denetlemek için yazmadan önce verileri yeniden bölümleyebilirsiniz. Uzak veritabanınızı bunaltmamak için büyük kümelerde yüksek sayıda bölüm kullanmaktan kaçının. Aşağıdaki örnekte, yazmadan önce sekiz bölüme yeniden bölümleme gösterilmektedir:

Python

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw

Scala

employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Sorguyu veritabanı altyapısına gönderme

Sorgunun tamamını veritabanına göndererek yalnızca sonucu döndürebilirsiniz. table parametresi okunacak JDBC tablosunu tanımlar. SQL sorgu FROM yan tümcesinde geçerli olan her şeyi kullanabilirsiniz.

Python

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)

Scala

val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Sorgu başına getirilen satır sayısını denetleme

JDBC sürücüleri, uzak veritabanından bir fetchSize kerede getirilen satır sayısını denetleen bir parametreye sahiptir.

Ayar Sonuç
Çok düşük Çok sayıda gidiş dönüş nedeniyle yüksek gecikme süresi (sorgu başına birkaç satır döndürülür)
Çok yüksek Yetersiz bellek hatası (bir sorguda çok fazla veri döndürüldü)

En uygun değer iş yüküne bağımlıdır. Dikkat edilmesi gerekenler şunlardır:

  • Sorgu tarafından kaç sütun döndürülür?
  • Hangi veri türleri döndürülür?
  • Her sütundaki dizeler ne kadar süreyle döndürülür?

Sistemler çok küçük bir varsayılan değere sahip olabilir ve ayarlamadan yararlanabilir. Örneğin: Oracle'ın varsayılan değeri fetchSize 10'dur. Bu değerin 100'e yükseltilmesi, 10 faktörü tarafından yürütülmesi gereken toplam sorgu sayısını azaltır. JDBC sonuçları ağ trafiğidir, bu nedenle çok büyük sayılardan kaçının, ancak en uygun değerler birçok veri kümesi için binlerde olabilir.

fetchSize Aşağıdaki örnekte olduğu gibi seçeneğini kullanın:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()