Consulta de bases de datos con JDBC

Azure Databricks admite la conexión a bases de datos externas mediante JDBC. En este artículo se proporciona la sintaxis básica para configurar y usar estas conexiones con ejemplos de Python, SQL y Scala.

Nota:

Es posible que prefiera la federación de Lakehouse para administrar consultas en sistemas de bases de datos externos. Consulte Qué es la Federación Lakehouse.

Partner Connect proporciona integraciones optimizadas para sincronizar datos con muchos orígenes de datos externos. Consulte ¿Qué es Databricks Partner Connect?

Importante

Los ejemplos de este artículo no incluyen nombres de usuario y contraseñas en las direcciones URL de JDBC. Databricks recomienda usar secretos para almacenar las credenciales de la base de datos. Por ejemplo:

Python

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

Scala

val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

Para hacer referencia a secretos de Databricks con SQL, debe configurar una propiedad de configuración de Spark durante la inicialización del clúster.

Para obtener un ejemplo completo de administración de secretos, consulte Ejemplo de flujo de trabajo secreto.

Lectura de datos con JDBC

Debe configurar una serie de opciones para leer datos mediante JDBC. Tenga en cuenta que cada base de datos usa un formato diferente para <jdbc-url>.

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Spark lee automáticamente el esquema de la tabla de base de datos y asigna sus tipos de nuevo a los tipos de Spark SQL.

Python

employees_table.printSchema

SQL

DESCRIBE employees_table_vw

Scala

employees_table.printSchema

Puede ejecutar consultas en esta tabla JDBC:

Python

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

SQL

SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age

Scala

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

Escritura de datos con JDBC

Para guardar datos en tablas con JDBC, se usan configuraciones similares a las que se emplean para leer. Observe el ejemplo siguiente:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

El comportamiento predeterminado intenta crear una tabla y genera un error si ya existe una tabla con ese nombre.

Puede anexar datos a una tabla existente mediante la sintaxis siguiente:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)

SQL

CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

Puede sobrescribir una tabla existente mediante la sintaxis siguiente:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)

SQL

CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

Control del paralelismo para las consultas de JDBC

De forma predeterminada, el controlador de JDBC consulta la base de datos de origen con un único subproceso. Para mejorar el rendimiento de las lecturas, debe especificar una serie de opciones para controlar cuántas consultas simultáneas realiza Azure Databricks en la base de datos. En el caso de los clústeres pequeños, si establece la opción numPartitions en el número de núcleos de los ejecutores del clúster, garantizará que todos los nodos consulten los datos en paralelo.

Advertencia

Si establece numPartitions en un valor alto en un clúster grande, puede producirse un rendimiento negativo para la base de datos remota, ya que la realización de demasiadas consultas simultáneas puede sobrecargar el servicio. Esto es especialmente problemático para las bases de datos de aplicaciones. Tenga cuidado si establece este valor por encima de 50.

Nota:

Puede acelerar las consultas si selecciona una columna con un índice calculado en la base de datos de origen para partitionColumn.

En el ejemplo de código siguiente se muestra cómo configurar el paralelismo para un clúster con ocho núcleos:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

Nota:

Azure Databricks admite todas las opciones de Apache Spark para configurar JDBC.

Al escribir en bases de datos mediante JDBC, Apache Spark usa el número de particiones en memoria para controlar el paralelismo. Puede volver a particionar los datos antes de escribir para controlar el paralelismo. Evite un gran número de particiones en clústeres grandes para evitar sobrecargar la base de datos remota. En el ejemplo siguiente se muestra cómo volver a particionar en ocho particiones antes de escribir:

Python

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw

Scala

employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Inserción de una consulta en el motor de base de datos

Puede insertar una consulta completa en la base de datos y devolver solo el resultado. El parámetro table identifica la tabla JDBC que se va a leer. Puede usar cualquier cosa que sea válida en una cláusula FROM de consulta SQL.

Python

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)

Scala

val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Control del número de filas capturadas por consulta

Los controladores de JDBC tienen un parámetro fetchSize que controla el número de filas que se capturan a la vez de la base de datos remota.

Configuración Resultado
Demasiado bajo Latencia alta debido a muchos recorridos de ida y vuelta (pocas filas devueltas por consulta)
Demasiado alto Error de memoria insuficiente (demasiados datos devueltos en una consulta)

El valor óptimo depende de la carga de trabajo. Entre las consideraciones se incluyen las siguientes:

  • ¿Cuántas columnas devuelve la consulta?
  • ¿Qué tipos de datos se devuelven?
  • ¿Qué longitud tienen las cadenas de cada columna devuelta?

Los sistemas podrían tener un valor predeterminado muy pequeño y beneficiarse de su ajuste. Por ejemplo, el valor predeterminado de fetchSize de Oracle es 10. Si se aumenta a 100, se reduce el número de consultas totales que deben ejecutarse en un factor de 10. Los resultados de JDBC son tráfico de red, por lo que conviene evitar números muy grandes, aunque los valores óptimos podrían ascender a millares para numerosos conjuntos de datos.

Use la opción fetchSize, como en el ejemplo siguiente:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()