Consultar bancos de dados usando JDBC

O Azure Databricks dá suporte à conexão com bancos de dados externos usando JDBC. Este artigo fornece a sintaxe básica para configurar e usar essas conexões com exemplos em Python, SQL e Scala.

Nota

Você pode preferir a Lakehouse Federation para gerenciar consultas a sistemas de banco de dados externos. Veja O que é Lakehouse Federation.

O Partner Connect fornece integrações otimizadas para sincronizar dados com muitas fontes de dados externas externas. Consulte O que é o Databricks Partner Connect?.

Importante

Os exemplos neste artigo não incluem nomes de usuário e senhas em URLs JDBC. O Databricks recomenda o uso de segredos para armazenar suas credenciais de banco de dados. Por exemplo:

Python

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

Scala

val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

Para fazer referência a segredos do Databricks com SQL, você deve configurar uma propriedade de configuração do Spark durante a iniciação do cluster.

Para obter um exemplo completo de gerenciamento secreto, consulte Exemplo de fluxo de trabalho secreto.

Ler dados com JDBC

Você deve definir várias configurações para ler dados usando JDBC. Observe que cada banco de dados usa um formato diferente para o <jdbc-url>.

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

O Spark lê automaticamente o esquema da tabela do banco de dados e mapeia seus tipos de volta para os tipos SQL do Spark.

Python

employees_table.printSchema

SQL

DESCRIBE employees_table_vw

Scala

employees_table.printSchema

Você pode executar consultas nesta tabela JDBC:

Python

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

SQL

SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age

Scala

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

Gravar dados com JDBC

Salvar dados em tabelas com JDBC usa configurações semelhantes à leitura. Veja o seguinte exemplo:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

O comportamento padrão tenta criar uma nova tabela e lança um erro se uma tabela com esse nome já existir.

Você pode acrescentar dados a uma tabela existente usando a seguinte sintaxe:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)

SQL

CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

Você pode substituir uma tabela existente usando a seguinte sintaxe:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)

SQL

CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

Controlar paralelismo para consultas JDBC

Por padrão, o driver JDBC consulta o banco de dados de origem com apenas um único thread. Para melhorar o desempenho de leituras, você precisa especificar várias opções para controlar quantas consultas simultâneas o Azure Databricks faz ao seu banco de dados. Para clusters pequenos, definir a numPartitions opção igual ao número de núcleos executores no cluster garante que todos os nós consultem dados em paralelo.

Aviso

A configuração numPartitions de um valor alto em um cluster grande pode resultar em desempenho negativo para o banco de dados remoto, pois muitas consultas simultâneas podem sobrecarregar o serviço. Isso é especialmente problemático para bancos de dados de aplicativos. Tenha cuidado ao definir este valor acima de 50.

Nota

Acelere as consultas selecionando uma coluna com um índice calculado no banco de dados de origem para o partitionColumn.

O exemplo de código a seguir demonstra a configuração do paralelismo para um cluster com oito núcleos:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

Nota

O Azure Databricks suporta todas as opções do Apache Spark para configurar o JDBC.

Ao gravar em bancos de dados usando JDBC, o Apache Spark usa o número de partições na memória para controlar o paralelismo. Você pode reparticionar dados antes de gravar para controlar o paralelismo. Evite um grande número de partições em clusters grandes para evitar sobrecarregar seu banco de dados remoto. O exemplo a seguir demonstra o reparticionamento para oito partições antes de escrever:

Python

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw

Scala

employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Empurrar uma consulta para o mecanismo de banco de dados

Você pode enviar uma consulta inteira para o banco de dados e retornar apenas o resultado. O table parâmetro identifica a tabela JDBC a ser lida. Você pode usar qualquer coisa que seja válida em uma cláusula de consulta FROM SQL.

Python

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)

Scala

val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Controlar o número de linhas obtidas por consulta

Os drivers JDBC têm um fetchSize parâmetro que controla o número de linhas buscadas por vez no banco de dados remoto.

Definição Result
Muito baixo Alta latência devido a muitas viagens de ida e volta (poucas linhas retornadas por consulta)
Muito alto Erro de falta de memória (muitos dados retornados em uma consulta)

O valor ideal depende da carga de trabalho. As considerações incluem:

  • Quantas colunas são retornadas pela consulta?
  • Que tipos de dados são retornados?
  • Por quanto tempo as cadeias de caracteres em cada coluna são retornadas?

Os sistemas podem ter um padrão muito pequeno e se beneficiar do ajuste. Por exemplo: o padrão fetchSize da Oracle é 10. Aumentar para 100 reduz o número total de consultas que precisam ser executadas por um fator de 10. Os resultados JDBC são tráfego de rede, portanto, evite números muito grandes, mas os valores ideais podem estar na casa dos milhares para muitos conjuntos de dados.

Use a opção, como no exemplo a fetchSize seguir:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()