Interroger des bases de données en utilisant JDBC

Azure Databricks prend en charge la connexion à des bases de données externes en utilisant JDBC. Cet article fournit la syntaxe de base pour la configuration et l’utilisation de ces connexions avec des exemples dans Python, SQL et Scala.

Notes

Vous pouvez préférer Lakehouse Federation pour la gestion des requêtes vers les systèmes de base de données externes. Consultez Qu’est-ce que la Fédération Lakehouse.

Partner Connect fournit des intégrations optimisées pour la synchronisation des données avec de nombreuses sources de données externes. Voir Qu’est-ce que Databricks Partner Connect ?.

Important

Les exemples de cet article n’incluent pas les noms d’utilisateur et les mots de passe dans les URL JDBC. Databricks recommande d’utiliser des secrets pour stocker vos informations d’identification de base de données. Par exemple :

Python

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

Scala

val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

Pour référencer des secrets Databricks avec SQL, vous devez configurer une propriété de configuration Spark lors de l’initialisation du cluster.

Pour obtenir un exemple complet de la gestion des secrets, consultez Exemple de workflow des secrets.

Lecture des données avec JDBC

Vous devez configurer un certain nombre de paramètres pour lire des données à l’aide de JDBC. Notez que chaque base de données utilise un format différent pour le <jdbc-url>.

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Spark lit automatiquement le schéma de la table de base de données et, en retour, il mappe ses types à des types Spark SQL.

Python

employees_table.printSchema

SQL

DESCRIBE employees_table_vw

Scala

employees_table.printSchema

Vous pouvez exécuter des requêtes sur cette table JDBC :

Python

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

SQL

SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age

Scala

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

Écriture des données avec JDBC

L’enregistrement de données dans des tableaux avec JDBC utilise des configurations similaires à la lecture. Voir l’exemple suivant :

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Le comportement par défaut consiste à créer un nouveau tableau et à générer un message d’erreur si un tableau du même nom existe déjà.

Vous pouvez ajouter des données à un tableau existant à l’aide de la syntaxe suivante :

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)

SQL

CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

Vous pouvez remplacer un tableau existant en utilisant la syntaxe suivante :

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)

SQL

CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

Parallélisme de contrôle pour les requêtes JDBC

Par défaut, le pilote JDBC interroge la base de données source avec un seul thread. Pour améliorer les performances des lectures, vous devez spécifier un certain nombre d’options pour contrôler le nombre de requêtes simultanées effectuées par Azure Databricks dans votre base de données. Pour les petits clusters, paramétrer que l’option numPartitions égale au nombre de cœurs d’exécuteur de votre cluster garantit que tous les nœuds interrogent des données en parallèle.

Avertissement

Paramétrer numPartitions à une valeur élevée sur un cluster volumineux peut entraîner des performances négatives pour la base de données distante, car trop de requêtes simultanées peuvent surcharger le service. Cela est particulièrement problématique pour les bases de données d’application. Soyez prudent de paramétrer cette valeur au delà de 50.

Notes

Accélérez les requêtes en sélectionnant une colonne avec un index calculé dans la base de données source pour partitionColumn.

L’exemple de code suivant illustre la configuration du parallélisme pour un cluster avec huit cœurs :

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

Notes

Azure Databricks prend en charge toutes les options Apache Spark pour la configuration de JDBC.

Lors de l’écriture dans des bases de données à l’aide de JDBC, Apache Spark utilise le nombre de partitions en mémoire pour contrôler le parallélisme. Vous pouvez répartir les données avant d’écrire pour contrôler le parallélisme. Évitez un nombre élevé de partitions sur des clusters volumineux pour éviter d’surcharger votre base de données distante. L’exemple suivant illustre le repartitionnement sur huit partitions avant d’écrire :

Python

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw

Scala

employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Pousser une requête vers le moteur de base de données

Vous pouvez pousser une requête entière vers la base de données et retourner uniquement le résultat. Le paramètre table identifie la table JDBC à lire. Vous pouvez utiliser tout élément valide dans la clause FROM d’une requête SQL.

Python

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)

Scala

val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Nombre de lignes extraites par requête

Les pilotes JDBC ont un paramètre fetchSize qui contrôle le nombre de lignes pouvant être récupérées simultanément de la base de données distante.

Paramètre Résultats
Trop faible Latence élevée due à de nombreux allers-retours (quelques lignes renvoyées par requête)
Trop élevé Erreur de mémoire insuffisante (trop de données renvoyées dans une requête)

La valeur optimale dépend de la charge de travail. Éléments à prendre en compte :

  • Combien de colonnes sont renvoyées par la requête ?
  • Quels types de données sont retournés ?
  • Combien de temps les chaînes de chaque colonne sont-elles renvoyées ?

Les systèmes peuvent avoir un tout petit défaut et bénéficier du réglage. Par exemple : La valeur par défaut fetchSize d’Oracle est 10. L’augmentation à 100 réduit le nombre total de requêtes qui doivent être exécutées par un facteur de 10. Les résultats JDBC sont le trafic réseau, donc évitez de très grands nombres, mais les valeurs optimales peuvent atteindre les milliers pour de nombreux jeux de données.

Utilisez l’option fetchSize, comme dans l’exemple suivant :

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()