Bases de données SQL à l’aide de JDBC SQL Databases using JDBC

Databricks Runtime contient des pilotes JDBC pour Microsoft SQL Server et Azure SQL Database.Databricks Runtime contains JDBC drivers for Microsoft SQL Server and Azure SQL Database. Consultez les notes de publication du runtime Databricks pour obtenir la liste complète des bibliothèques JDBC incluses dans Databricks Runtime.See the Databricks runtime release notes for the complete list of JDBC libraries included in Databricks Runtime.

Cet article explique comment utiliser l’API tableau pour se connecter à des bases de données SQL à l’aide de JDBC et comment contrôler le parallélisme des lectures par le biais de l’interface JDBC.This article covers how to use the DataFrame API to connect to SQL databases using JDBC and how to control the parallelism of reads through the JDBC interface. Cet article fournit des exemples détaillés à l’aide de l’API Scala, avec des exemples abrégés de Python et Spark SQL à la fin.This article provides detailed examples using the Scala API, with abbreviated Python and Spark SQL examples at the end. Pour obtenir tous les arguments pris en charge pour la connexion aux bases de données SQL à l’aide de JDBC, consultez JDBC vers d’autres bases de données.For all of the supported arguments for connecting to SQL databases using JDBC, see JDBC To Other Databases.

Notes

Une autre option pour se connecter à SQL Server et Azure SQL Database est le connecteur Apache Spark.Another option for connecting to SQL Server and Azure SQL Database is the Apache Spark connector. Il peut fournir des insertions en bloc plus rapides et vous permet de vous connecter à l’aide de votre identité Azure Active Directory.It can provide faster bulk inserts and lets you connect using your Azure Active Directory identity.

Important

Les exemples de cet article n’incluent pas les noms d’utilisateur et les mots de passe dans les URL JDBC.The examples in this article do not include usernames and passwords in JDBC URLs. Au lieu de cela, il s’attend à ce que vous suiviez le Guide de l’utilisateur de la gestion des secrets pour stocker vos informations d’identification de base de données en tant que secrets, puis les utiliser dans un Notebook pour remplir vos informations d’identification dans un java.util.Properties objet.Instead it expects that you follow the Secret management user guide to store your database credentials as secrets, and then leverage them in a notebook to populate your credentials in a java.util.Properties object. Par exemple :For example:

val jdbcUsername = dbutils.secrets.get(scope = "jdbc", key = "username")
val jdbcPassword = dbutils.secrets.get(scope = "jdbc", key = "password")

Pour obtenir un exemple complet de la gestion des secrets, consultez exemple de flux de travail secret.For a full example of secret management, see Secret workflow example.

Établir la connectivité à SQL ServerEstablish connectivity to SQL Server

Cet exemple interroge SQL Server à l’aide de son pilote JDBC.This example queries SQL Server using its JDBC driver.

Étape 1 : vérifier que le pilote JDBC est disponibleStep 1: Check that the JDBC driver is available

Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver")

Étape 2 : créer l’URL JDBCStep 2: Create the JDBC URL

val jdbcHostname = "<hostname>"
val jdbcPort = 1433
val jdbcDatabase = "<database>"

// Create the JDBC URL without passing in the user and password parameters.
val jdbcUrl = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase}"

// Create a Properties() object to hold the parameters.
import java.util.Properties
val connectionProperties = new Properties()

connectionProperties.put("user", s"${jdbcUsername}")
connectionProperties.put("password", s"${jdbcPassword}")

Étape 3 : vérifier la connectivité à la base de données SQLServerStep 3: Check connectivity to the SQLServer database

val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
connectionProperties.setProperty("Driver", driverClass)

Lire des données à partir de JDBCRead data from JDBC

Cette section charge des données à partir d’une table de base de données.This section loads data from a database table. Cela utilise une seule connexion JDBC pour extraire la table dans l’environnement Spark.This uses a single JDBC connection to pull the table into the Spark environment. Pour obtenir des lectures parallèles, consultez gérer le parallélisme.For parallel reads, see Manage parallelism.

val employees_table = spark.read.jdbc(jdbcUrl, "employees", connectionProperties)

Spark lit automatiquement le schéma à partir de la table de base de données et remappe ses types à des types SQL Spark.Spark automatically reads the schema from the database table and maps its types back to Spark SQL types.

employees_table.printSchema

Vous pouvez exécuter des requêtes sur cette table JDBC :You can run queries against this JDBC table:

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

Écrire des données dans JDBCWrite data to JDBC

Cette section montre comment écrire des données dans une base de données à partir d’une table SQL Spark existante nommée diamonds .This section shows how to write data to a database from an existing Spark SQL table named diamonds.

select * from diamonds limit 5

Le code suivant enregistre les données dans une table de base de données nommée diamonds .The following code saves the data into a database table named diamonds. L’utilisation de noms de colonnes qui sont des mots clés réservés peut déclencher une exception.Using column names that are reserved keywords can trigger an exception. L’exemple de table possède une colonne nommée table , ce qui vous permet de la renommer withColumnRenamed() avant de l’envoyer à l’API JDBC.The example table has column named table, so you can rename it with withColumnRenamed() prior to pushing it to the JDBC API.

spark.table("diamonds").withColumnRenamed("table", "table_number")
     .write
     .jdbc(jdbcUrl, "diamonds", connectionProperties)

Spark crée automatiquement une table de base de données avec le schéma approprié déterminé à partir du schéma tableau.Spark automatically creates a database table with the appropriate schema determined from the DataFrame schema.

Le comportement par défaut consiste à créer une nouvelle table et à générer un message d’erreur s’il existe déjà une table portant le même nom.The default behavior is to create a new table and to throw an error message if a table with the same name already exists. Vous pouvez utiliser la fonctionnalité Spark SQL SaveMode pour modifier ce comportement.You can use the Spark SQL SaveMode feature to change this behavior. Par exemple, voici comment ajouter des lignes à la table :For example, here’s how to append more rows to the table:

import org.apache.spark.sql.SaveMode

spark.sql("select * from diamonds limit 10").withColumnRenamed("table", "table_number")
  .write
  .mode(SaveMode.Append) // <--- Append to the existing table
  .jdbc(jdbcUrl, "diamonds", connectionProperties)

Vous pouvez également remplacer une table existante :You can also overwrite an existing table:

spark.table("diamonds").withColumnRenamed("table", "table_number")
  .write
  .mode(SaveMode.Overwrite) // <--- Overwrite the existing table
  .jdbc(jdbcUrl, "diamonds", connectionProperties)

Pousser une requête vers le moteur de base de donnéesPush down a query to the database engine

Vous pouvez envoyer une requête entière à la base de données et retourner uniquement le résultat.You can push down an entire query to the database and return just the result. Le table paramètre identifie la table JDBC à lire.The table parameter identifies the JDBC table to read. Vous pouvez utiliser tout ce qui est valide dans une clause de requête SQL FROM .You can use anything that is valid in a SQL query FROM clause.

// Note: The parentheses are required.
val pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
val df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
display(df)

Optimisation PushPush down optimization

En plus de l’ingestion d’une table entière, vous pouvez envoyer une requête vers la base de données pour l’exploiter en vue de son traitement et retourner uniquement les résultats.In addition to ingesting an entire table, you can push down a query to the database to leverage it for processing, and return only the results.

// Explain plan with no column selection returns all columns
spark.read.jdbc(jdbcUrl, "diamonds", connectionProperties).explain(true)

Vous pouvez nettoyer les colonnes et dépiler les prédicats de requête dans la base de données avec des DataFrame méthodes.You can prune columns and pushdown query predicates to the database with DataFrame methods.

// Explain plan with column selection will prune columns and just return the ones specified
// Notice that only the 3 specified columns are in the explain plan
spark.read.jdbc(jdbcUrl, "diamonds", connectionProperties).select("carat", "cut", "price").explain(true)

// You can push query predicates down too
// Notice the filter at the top of the physical plan
spark.read.jdbc(jdbcUrl, "diamonds", connectionProperties).select("carat", "cut", "price").where("cut = 'Good'").explain(true)

Gérer le parallélisme Manage parallelism

Dans l’interface utilisateur Spark, vous pouvez voir que le numPartitions dicte le nombre de tâches qui sont lancées.In the Spark UI, you can see that the numPartitions dictate the number of tasks that are launched. Chaque tâche est répartie entre les exécuteurs, ce qui peut augmenter le parallélisme des lectures et des écritures par le biais de l’interface JDBC.Each task is spread across the executors, which can increase the parallelism of the reads and writes through the JDBC interface. Consultez le Guide de programmation de Spark SQL pour d’autres paramètres, tels que fetchsize , qui peuvent vous aider à améliorer les performances.See the Spark SQL programming guide for other parameters, such as fetchsize, that can help with performance.

Lectures JDBCJDBC reads

Vous pouvez fournir des limites de fractionnement en fonction des valeurs de colonne du jeu de données.You can provide split boundaries based on the dataset’s column values.

Ces options spécifient le parallélisme lors de la lecture.These options specify the parallelism on read. Ces options doivent toutes être spécifiées si l’une d’entre elles est spécifiée.These options must all be specified if any of them is specified. lowerBound et upperBound déterminent le Stride de la partition, mais ne filtrent pas les lignes dans la table.lowerBound and upperBound decide the partition stride, but do not filter the rows in table. Par conséquent, Spark partitionne et retourne toutes les lignes de la table.Therefore, Spark partitions and returns all rows in the table.

L’exemple suivant fractionne la table lue entre les exécuteurs de la emp_no colonne à l’aide des columnName lowerBound paramètres,, upperBound et numPartitions .The following example splits the table read across executors on the emp_no column using the columnName, lowerBound, upperBound, and numPartitions parameters.

val df = (spark.read.jdbc(url=jdbcUrl,
  table="employees",
  columnName="emp_no",
  lowerBound=1L,
  upperBound=100000L,
  numPartitions=100,
  connectionProperties=connectionProperties))
display(df)

Écritures JDBCJDBC writes

Les partitions de Spark déterminent le nombre de connexions utilisées pour transmettre des données via l’API JDBC.Spark’s partitions dictate the number of connections used to push data through the JDBC API. Vous pouvez contrôler le parallélisme en appelant coalesce(<N>) ou repartition(<N>) en fonction du nombre de partitions existant.You can control the parallelism by calling coalesce(<N>) or repartition(<N>) depending on the existing number of partitions. Appelez coalesce lorsque vous réduisez le nombre de partitions et repartition lorsque vous augmentez le nombre de partitions.Call coalesce when reducing the number of partitions, and repartition when increasing the number of partitions.

import org.apache.spark.sql.SaveMode

val df = spark.table("diamonds")
println(df.rdd.partitions.length)

// Given the number of partitions above, you can reduce the partition value by calling coalesce() or increase it by calling repartition() to manage the number of connections.
df.repartition(10).write.mode(SaveMode.Append).jdbc(jdbcUrl, "diamonds", connectionProperties)

Exemple PythonPython example

Les exemples python suivants couvrent certaines des mêmes tâches que celles fournies pour Scala.The following Python examples cover some of the same tasks as those provided for Scala.

Créer l’URL JDBC Create the JDBC URL

jdbcHostname = "<hostname>"
jdbcDatabase = "employees"
jdbcPort = 1433
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, username, password)

Vous pouvez transmettre un dictionnaire qui contient les informations d’identification et la classe de pilote, comme dans l’exemple Scala précédent.You can pass in a dictionary that contains the credentials and driver class similar to the preceding Scala example.

jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

Pousser une requête vers le moteur de base de donnéesPush down a query to the database engine

pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
display(df)

Lire à partir de connexions JDBC sur plusieurs WorkersRead from JDBC connections across multiple workers

df = spark.read.jdbc(url=jdbcUrl, table="employees", column="emp_no", lowerBound=1, upperBound=100000, numPartitions=100)
display(df)

Exemple Spark SQLSpark SQL example

Vous pouvez définir une vue ou une table SQL Spark qui utilise une connexion JDBC.You can define a Spark SQL table or view that uses a JDBC connection. Pour plus d’informations, consultezFor details, see

CREATE TABLE <jdbcTable>
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:<databaseServerType>://<jdbcHostname>:<jdbcPort>",
  dbtable "<jdbcDatabase>.atable",
  user "<jdbcUsername>",
  password "<jdbcPassword>"
)

Ajoutez des données dans la table de base de données à l’aide de Spark SQL :Append data into the database table using Spark SQL:

INSERT INTO diamonds
SELECT * FROM diamonds LIMIT 10 -- append 10 records to the table

SELECT count(*) record_count FROM diamonds --count increased by 10

Remplacez les données dans la table de base de données à l’aide de Spark SQL.Overwrite data in the database table using Spark SQL. Ainsi, la base de données supprime et crée la diamonds table :This causes the database to drop and create the diamonds table:

INSERT OVERWRITE TABLE diamonds
SELECT carat, cut, color, clarity, depth, TABLE AS table_number, price, x, y, z FROM diamonds

SELECT count(*) record_count FROM diamonds --count returned to original value (10 less)

Optimiser les performances lors de la lecture des donnéesOptimize performance when reading data

Si vous tentez de lire des données à partir d’une base de données JDBC externe, cette section contient des suggestions pour améliorer les performances.If you’re attempting to read data from an external JDBC database and it’s slow, this section contains some suggestions to improve performance.

Déterminer si le déchargement JDBC se produit en parallèleDetermine whether the JDBC unload is occurring in parallel

Pour charger des données en parallèle, la source de données JDBC Spark doit être configurée avec les informations de partitionnement appropriées afin de pouvoir émettre plusieurs requêtes simultanées à la base de données externe.In order to load data in parallel, the Spark JDBC data source must be configured with appropriate partitioning information so that it can issue multiple concurrent queries to the external database. Si vous négligez de configurer le partitionnement, toutes les données sont récupérées sur le pilote à l’aide d’une seule requête JDBC qui risque de provoquer la levée d’une exception insuffisance par le pilote.If you neglect to configure partitioning, then all data will be fetched on the driver using a single JDBC query which runs the risk of causing the driver to throw an OOM exception.

Voici un exemple de lecture JDBC sans partitionnement configuré :Here’s an example of a JDBC read without partitioning configured:

Lecture JDBC sans partitionnementJDBC read without partitioning

Il existe deux API pour spécifier le partitionnement, le niveau supérieur et le niveau bas.There are two APIs for specifying partitioning, high level and low level.

L' API de haut niveau prend le nom d’une colonne numérique ( columnName ), deux points de terminaison de plage ( lowerBound , upperBound ) et une cible numPartitions et génère des tâches Spark en fractionnant uniformément la plage spécifiée en numPartitions tâches.The high level API takes the name of a numeric column (columnName), two range endpoints (lowerBound, upperBound) and a target numPartitions and generates Spark tasks by evenly splitting the specified range into numPartitions tasks. Cela fonctionne bien si votre table de base de données a une colonne numérique indexée avec des valeurs équitablement distribuées, telles qu’une clé primaire à incrémentation automatique ; Cela fonctionne un peu moins bien si la colonne numérique est extrêmement inclinée, conduisant à des tâches déséquilibrées.This work well if your database table has an indexed numeric column with fairly evenly-distributed values, such as an auto-incrementing primary key; it works somewhat less well if the numeric column is extremely skewed, leading to imbalanced tasks.

L' API de bas niveau, accessible dans Scala, accepte un tableau de WHERE conditions qui peuvent être utilisées pour définir des partitions personnalisées : cela est utile pour le partitionnement sur des colonnes non numériques ou pour la gestion de l’inclinaison.The low level API, accessible in Scala, accepts an array of WHERE conditions that can be used to define custom partitions: this is useful for partitioning on non-numeric columns or for dealing with skew. Quand vous définissez des partitions personnalisées, n’oubliez pas de prendre en compte NULL le moment où les colonnes de partition acceptent les valeurs NULL.When defining custom partitions, do not forget to consider NULL when the partition columns are Nullable. Nous vous déconseillons de définir manuellement des partitions à l’aide de plus de deux colonnes, car l’écriture des prédicats de limite requiert une logique bien plus complexe.We do not suggest that you manually define partitions using more than two columns since writing the boundary predicates require much more complex logic.

Voici un exemple de lecture JDBC avec le partitionnement configuré.Here’s an example of a JDBC read with partitioning configured. Notez que l’ajout d’une colonne numérique ( partitionColumn qui est columnName passée en tant qu’option source JDBC), de deux points de terminaison de plage ( lowerBound , upperBound ) et du numPartitions paramètre spécifiant le nombre maximal de partitions.Note the addition of a numeric column (partitionColumn – which is how columnName is passed as a JDBC source option), two range endpoints (lowerBound, upperBound) and the numPartitions parameter specifying the maximum number of partitions.

Lecture JDBC avec partitionnementJDBC read with partitioning

Pour plus d’informations, consultez gérer le parallélisme.For more information, see Manage parallelism.

Réglage du fetchSize paramètre JDBCTune the JDBC fetchSize parameter

Les pilotes JDBC ont un fetchSize paramètre qui contrôle le nombre de lignes extraites à la fois à partir de la base de données JDBC distante.JDBC drivers have a fetchSize parameter that controls the number of rows fetched at a time from the remote JDBC database. Si cette valeur est trop faible, votre charge de travail peut devenir liée à la latence en raison d’un grand nombre de requêtes d’aller-retour entre Spark et la base de données externe afin d’extraire le jeu de résultats complet.If this value is set too low then your workload may become latency-bound due to a high number of roundtrip requests between Spark and the external database in order to fetch the full result set. Si cette valeur est trop élevée, vous risquez de insuffisances.If this value is too high then you risk OOMs. La valeur optimale dépend de la charge de travail (car elle dépend du schéma de résultat, de la taille des chaînes dans les résultats, etc.), mais l’augmentation de la valeur par défaut peut entraîner des gains de performances considérables.The optimal value will be workload dependent (since it depends on the result schema, sizes of strings in results, and so on), but increasing it even slightly from the default can result in huge performance gains.

La valeur par défaut d’Oracle fetchSize est 10.Oracle’s default fetchSize is 10. L’augmentation de la valeur, jusqu’à 100, permet d’obtenir des gains de performances considérables, et de passer à une valeur plus élevée, par exemple 2000, apporte une amélioration supplémentaire.Increasing it even slightly, to 100, gives massive performance gains, and going up to a higher value, like 2000, gives an additional improvement. Par exemple :For example:

PreparedStatement stmt = null;
ResultSet rs = null;

try {
  stmt = conn. prepareStatement("select a, b, c from table");
  stmt.setFetchSize(100);

  rs = stmt.executeQuery();
  while (rs.next()) {
    ...
  }
}

Pour plus d’informations sur ce paramètre de paramétrage pour les pilotes Oracle JDBC, consultez rendre votre exécution de Java plus rapide .See Make your java run faster for a more general discussion of this tuning parameter for Oracle JDBC drivers.

Tenez compte de l’impact des indexConsider the impact of indexes

Si vous lisez en parallèle (à l’aide de l’une des techniques de partitionnement), Spark émet des requêtes simultanées à la base de données JDBC.If you are reading in parallel (using one of the partitioning techniques) Spark issues concurrent queries to the JDBC database. Si ces requêtes finissent par nécessiter des analyses de tables complètes, cela risque d’affecter le goulot d’étranglement dans la base de données distante et de devenir extrêmement lent.If these queries end up requiring full table scans this could end up bottlenecking in the remote database and become extremely slow. Par conséquent, vous devez prendre en compte l’impact des index lors du choix d’une colonne de partitionnement et choisir une colonne de sorte que les requêtes de chaque partition puissent être exécutées raisonnablement efficacement en parallèle.Thus you should consider the impact of indexes when choosing a partitioning column and pick a column such that the individual partitions’ queries can be executed reasonably efficiently in parallel.

Important

Assurez-vous que la base de données a un index sur la colonne de partitionnement.Make sure that the database has an index on the partitioning column.

Lorsqu’un index à une seule colonne n’est pas défini sur la table source, vous pouvez toujours choisir la colonne de début (la plus à gauche) dans un index composite comme colonne de partitionnement.When a single-column index is not defined on the source table, you still can choose the leading(leftmost) column in a composite index as the partitioning column. Lorsque seuls les index composites sont disponibles, la plupart des bases de données peuvent utiliser un index concaténé lors de la recherche avec les colonnes de début (les plus à gauche).When only composite indexes are available, most databases can use a concatenated index when searching with the leading (leftmost) columns. Par conséquent, la colonne de début d’un index à plusieurs colonnes peut également être utilisée comme colonne de partitionnement.Thus, the leading column in a multi-column index can also be used as a partitioning column.

Déterminez si le nombre de partitions est appropriéConsider whether the number of partitions is appropriate

Utilisation d’un trop grand nombre de partitions lors de la lecture de la base de données externe risque de surcharger cette base de données avec un trop grand nombre de requêtes.Using too many partitions when reading from the external database risks overloading that database with too many queries. La plupart des systèmes SGBD ont des limites sur les connexions simultanées.Most DBMS systems have limits on the concurrent connections. Pour commencer, veillez à ce que le nombre de partitions soit proche du nombre de cœurs/emplacements de tâches dans votre cluster Spark afin d’optimiser le parallélisme, tout en conservant le nombre total de requêtes plafonnées à une limite raisonnable.As a starting point, aim to have the number of partitions be close to the number of cores / task slots in your Spark cluster in order to maximize parallelism but keep the total number of queries capped at a reasonable limit. Si vous avez besoin d’un grand nombre de parallélismes après avoir extrait les lignes JDBC (car vous effectuez une opération liée au processeur dans Spark), mais que vous ne voulez pas envoyer trop de requêtes simultanées à votre base de données, envisagez d’utiliser une valeur inférieure numPartitions pour la lecture de JDBC, puis d’effectuer une opération explicite repartition() dans Spark.If you need lots of parallelism after fetching the JDBC rows (because you’re doing something CPU-bound in Spark) but don’t want to issue too many concurrent queries to your database then consider using a lower numPartitions for the JDBC read and then doing an explicit repartition() in Spark.

Prendre en compte les techniques de paramétrage propres à la base de donnéesConsider database-specific tuning techniques

Le fournisseur de base de données peut avoir un guide sur le réglage des performances pour les charges de travail ETL et d’accès en bloc.The database vendor may have a guide on tuning performance for ETL and bulk access workloads.