JDBC를 사용하는 데이터베이스 쿼리

Azure Databricks는 JDBC를 사용하여 외부 데이터베이스에 연결할 수 있도록 지원합니다. 이 문서에서는 Python, SQL, Scala의 예제와 함께 이러한 연결을 구성하고 사용하기 위한 기본 구문을 제공합니다.

참고 항목

외부 데이터베이스 시스템에 대한 쿼리를 관리하기 위해 Lakehouse Federation을 선호할 수 있습니다. 레이크하우스 페더레이션이란?

Partner Connect는 많은 외부 데이터 원본과 데이터를 동기화하기 위한 최적화된 통합을 제공합니다. Databricks Partner Connect란?을 참조하세요.

Important

이 문서의 예제에는 JDBC URL의 사용자 이름 및 암호가 포함되지 않습니다. Databricks는 비밀을 사용하여 데이터베이스 자격 증명을 저장하는 것이 좋습니다. 예시:

Python

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

Scala

val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

SQL을 사용하여 Databricks 비밀을 참조하려면 클러스터를 초기화하는 동안 Spark 구성 속성을 구성해야 합니다.

비밀 관리의 전체 예제는 비밀 워크플로 예제를 참조하세요.

JDBC를 사용하여 데이터 읽기

JDBC를 사용하여 데이터를 읽으려면 여러 설정을 구성해야 합니다. 각 데이터베이스는 <jdbc-url>에 다른 형식을 사용합니다.

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Spark는 데이터베이스 테이블에서 스키마를 자동으로 읽고 해당 형식을 Spark SQL 형식에 다시 매핑합니다.

Python

employees_table.printSchema

SQL

DESCRIBE employees_table_vw

Scala

employees_table.printSchema

이 JDBC 테이블에 대해 쿼리를 실행할 수 있습니다.

Python

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

SQL

SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age

Scala

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

JDBC를 사용하여 데이터 쓰기

JDBC를 사용하여 테이블에 데이터를 저장하는 것은 읽기와 유사한 구성을 사용합니다. 다음 예제를 참조하십시오.

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

기본 동작은 새 테이블을 만들고 이름이 같은 테이블이 이미 있는 경우 오류를 throw하는 것입니다.

다음 구문을 사용하여 기존 테이블에 데이터를 추가할 수 있습니다.

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)

SQL

CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

다음 구문을 사용하여 기존 테이블을 덮어쓸 수 있습니다.

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)

SQL

CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

JDBC 쿼리에 대한 병렬 처리 제어

기본적으로 JDBC 드라이버는 단일 스레드로만 원본 데이터베이스를 쿼리합니다. 읽기 성능을 향상하려면 Azure Databricks에서 데이터베이스에 대해 수행하는 동시 쿼리 수를 제어하는 여러 옵션을 지정해야 합니다. 작은 클러스터의 경우 numPartitions 옵션을 클러스터의 실행기 코어 수와 동일하게 설정하면 모든 노드가 데이터를 병렬로 쿼리할 수 있습니다.

Warning

대규모 클러스터에서 numPartitions를 높은 값으로 설정할 경우 동시에 너무 많이 쿼리하면 서비스에 과부하가 발생할 수 있으므로 원격 데이터베이스의 성능이 저하될 수 있습니다. 이는 애플리케이션 데이터베이스에서 특히 번거롭습니다. 이 값을 50 이상으로 설정하지 않는 것이 좋습니다.

참고 항목

partitionColumn의 원본 데이터베이스에서 계산된 인덱스가 있는 열을 선택하여 쿼리 속도를 향상합니다.

다음 코드 예제에서는 8개의 코어가 있는 클러스터에 대한 병렬 처리를 구성하는 방법을 보여 줍니다.

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

참고 항목

Azure Databricks는 JDBC를 구성하기 위한 모든 Apache Spark 옵션을 지원합니다.

JDBC를 사용하여 데이터베이스에 쓸 때 Apache Spark는 메모리의 파티션 수를 사용하여 병렬 처리를 제어합니다. 병렬 처리를 제어하기 위해 쓰기 전에 데이터를 다시 분할할 수 있습니다. 원격 데이터베이스를 압도하지 않도록 대규모 클러스터에서 많은 수의 파티션을 방지합니다. 다음 예제에서는 쓰기 전에 8개의 파티션으로 다시 분할하는 방법을 보여 줍니다.

Python

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw

Scala

employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

데이터베이스 엔진에 쿼리 푸시 다운

전체 쿼리를 데이터베이스로 푸시 다운하고 결과만 반환할 수 있습니다. table 매개 변수는 읽을 JDBC 테이블을 식별합니다. SQL 쿼리 FROM 절에서 유효한 모든 항목을 사용할 수 있습니다.

Python

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)

Scala

val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

쿼리당 페치된 행 수 제어

JDBC 드라이버에는 원격 데이터베이스에서 한 번에 가져온 행 수를 제어하는 fetchSize 매개 변수가 있습니다.

설정 결과
너무 낮음 많은 왕복으로 인한 높은 대기 시간(쿼리당 반환되는 행은 거의 없음)
너무 높음 메모리 부족 오류(한 쿼리에서 반환된 데이터가 너무 많음)

최적의 값은 워크로드에 따라 달라집니다. 고려해야 할 사항은 다음과 같습니다.

  • 쿼리에서 반환되는 열은 몇 개인가요?
  • 반환되는 데이터 형식은 무엇인가요?
  • 각 열에서 반환되는 문자열 길이는 얼마나 되나요?

시스템은 매우 작은 기본값을 가지며 튜닝의 이점을 누릴 수 있습니다. 예를 들어 Oracle의 기본 fetchSize는 10입니다. 100으로 늘리면 실행해야 하는 총 쿼리 수가 10배로 줄어듭니다. JDBC 결과는 네트워크 트래픽이므로 매우 많은 수를 방지하지만 최적의 값은 많은 데이터 세트에 대해 수천 개에 있을 수 있습니다.

다음 예제와 같이 fetchSize 옵션을 사용합니다.

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()