Wykonywanie zapytań dotyczących baz danych przy użyciu JDBC

Usługa Azure Databricks obsługuje nawiązywanie połączeń z zewnętrznymi bazami danych przy użyciu protokołu JDBC. Ten artykuł zawiera podstawową składnię konfigurowania i używania tych połączeń z przykładami w językach Python, SQL i Scala.

Uwaga

Możesz preferować federację Lakehouse do zarządzania zapytaniami w zewnętrznych systemach baz danych. Zobacz Co to jest Federacja Lakehouse.

Partner Połączenie zapewnia zoptymalizowane integracje na potrzeby synchronizowania danych z wieloma zewnętrznymi źródłami danych. Zobacz Co to jest Połączenie partner usługi Databricks?.

Ważne

Przykłady w tym artykule nie zawierają nazw użytkowników i haseł w adresach URL JDBC. Usługa Databricks zaleca używanie wpisów tajnych do przechowywania poświadczeń bazy danych. Na przykład:

Python

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

Scala

val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

Aby odwołać się do wpisów tajnych usługi Databricks w języku SQL, należy skonfigurować właściwość konfiguracji platformy Spark podczas inicjowania klastra.

Pełny przykład zarządzania wpisami tajnymi można znaleźć w temacie Przykład przepływu pracy wpisu tajnego.

Odczytywanie danych za pomocą JDBC

Należy skonfigurować wiele ustawień do odczytywania danych przy użyciu JDBC. Należy pamiętać, że każda baza danych używa innego formatu dla elementu <jdbc-url>.

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Platforma Spark automatycznie odczytuje schemat z tabeli bazy danych i mapuje jej typy z powrotem na typy Spark SQL.

Python

employees_table.printSchema

SQL

DESCRIBE employees_table_vw

Scala

employees_table.printSchema

Zapytania można uruchamiać względem tej tabeli JDBC:

Python

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

SQL

SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age

Scala

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

Zapisywanie danych za pomocą JDBC

Zapisywanie danych w tabelach za pomocą JDBC używa podobnych konfiguracji do odczytywania. Zobacz poniższy przykład:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Domyślne zachowanie próbuje utworzyć nową tabelę i zgłasza błąd, jeśli tabela o tej nazwie już istnieje.

Dane można dołączać do istniejącej tabeli przy użyciu następującej składni:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)

SQL

CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

Istniejącą tabelę można zastąpić przy użyciu następującej składni:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)

SQL

CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

Sterowanie równoległością zapytań JDBC

Domyślnie sterownik JDBC wysyła zapytanie do źródłowej bazy danych tylko z jednym wątkiem. Aby zwiększyć wydajność operacji odczytu, należy określić szereg opcji kontrolowania liczby równoczesnych zapytań usługi Azure Databricks do bazy danych. W przypadku małych klastrów ustawienie numPartitions opcji równej liczbie rdzeni funkcji wykonawczej w klastrze gwarantuje, że wszystkie węzły wykonują zapytania dotyczące danych równolegle.

Ostrzeżenie

Ustawienie numPartitions na dużą wartość w dużym klastrze może spowodować ujemną wydajność zdalnej bazy danych, ponieważ zbyt wiele równoczesnych zapytań może przeciążyć usługę. Jest to szczególnie kłopotliwe w przypadku baz danych aplikacji. Należy uważać, aby ustawić tę wartość powyżej 50.

Uwaga

Przyspieszanie zapytań przez wybranie kolumny z indeksem obliczonym w źródłowej bazie danych dla elementu partitionColumn.

W poniższym przykładzie kodu pokazano konfigurowanie równoległości dla klastra z ośmioma rdzeniami:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

Uwaga

Usługa Azure Databricks obsługuje wszystkie opcje platformy Apache Spark do konfigurowania JDBC.

Podczas zapisywania w bazach danych przy użyciu JDBC platforma Apache Spark używa liczby partycji w pamięci do kontrolowania równoległości. Dane można ponownie partycjonować przed zapisaniem w celu kontrolowania równoległości. Unikaj dużej liczby partycji w dużych klastrach, aby uniknąć przeciążenia zdalnej bazy danych. W poniższym przykładzie pokazano ponowne partycjonowanie do ośmiu partycji przed zapisaniem:

Python

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw

Scala

employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Wypychanie zapytania do aparatu bazy danych

Możesz wypchnąć całe zapytanie do bazy danych i zwrócić tylko wynik. Parametr table identyfikuje tabelę JDBC do odczytania. Możesz użyć dowolnego elementu prawidłowego w klauzuli zapytania FROM SQL.

Python

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)

Scala

val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Kontrolowanie liczby wierszy pobranych na zapytanie

Sterowniki JDBC mają fetchSize parametr, który kontroluje liczbę wierszy pobranych w czasie z zdalnej bazy danych.

Ustawienie Result
Za mało Duże opóźnienie ze względu na wiele rund (kilka wierszy zwracanych na zapytanie)
Zbyt wysoka Błąd braku pamięci (za dużo danych zwracanych w jednym zapytaniu)

Optymalna wartość jest zależna od obciążenia. Zagadnienia obejmują:

  • Ile kolumn jest zwracanych przez zapytanie?
  • Jakie typy danych są zwracane?
  • Jak długo są zwracane ciągi w każdej kolumnie?

Systemy mogą mieć bardzo małe wartości domyślne i korzystać z dostrajania. Na przykład: wartość domyślna fetchSize oracle to 10. Zwiększenie go do 100 zmniejsza liczbę całkowitych zapytań, które muszą być wykonywane przez współczynnik 10. Wyniki JDBC są ruchem sieciowym, dlatego unikaj bardzo dużych liczb, ale optymalne wartości mogą znajdować się w tysiącach dla wielu zestawów danych.

fetchSize Użyj opcji , jak w poniższym przykładzie:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()