Share via


Azure Databricks를 사용하여 Amazon Redshift 쿼리

Azure Databricks를 사용하여 Amazon Redshift에서 테이블을 읽고 쓸 수 있습니다.

참고 항목

Redshift에 대한 쿼리를 관리하기 위해 Lakehouse 페더레이션을 선호할 수 있습니다. 레이크하우스 페더레이션이란?

Databricks Redshift 데이터 원본은 Amazon S3을 사용하여 Redshift 안팎으로 데이터를 효율적으로 전송하며, JDBC를 사용하여 Redshift에서 적절한 COPYUNLOAD 명령을 자동으로 트리거합니다.

참고 항목

Databricks Runtime 11.3 LTS 이상에서 Databricks Runtime에는 형식 옵션에 대한 키워드(keyword) 사용하여 redshift 액세스할 수 있는 Redshift JDBC 드라이버가 포함되어 있습니다. 각 Databricks 런타임에 포함된 드라이버 버전에 대한 Databricks 런타임 릴리스 정보 버전 및 호환성을 참조하세요. 사용자 제공 드라이버는 계속 지원되며 번들 JDBC 드라이버보다 우선합니다.

Databricks Runtime 10.4 LTS 이하에서는 Redshift JDBC 드라이버를 수동으로 설치해야 하며 쿼리는 형식에 드라이버(com.databricks.spark.redshift)를 사용해야 합니다. Redshift 드라이버 설치를 참조하세요.

사용

다음 예제에서는 Redshift 드라이버 연결을 보여줍니다. PostgreSQL JDBC 드라이버를 사용하는 경우 url 매개 변수 값을 바꿉니다.

AWS 자격 증명을 구성한 후에는 Python, SQL, R 또는 Scala에서 Spark 데이터 원본 API와 함께 데이터 원본을 사용할 수 있습니다.

Important

Unity 카탈로그 에 정의된 외부 위치는 위치로 tempdir 지원되지 않습니다.

Python

# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") # Optional - will use default port 5439 if not specified.
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a query
df = (spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table

# Write back to a table
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()
)

# Write back to a table using IAM Role based authentication
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()
)

SQL

Databricks Runtime 10.4 LTS 이하에서 SQL을 사용하여 데이터를 읽습니다.

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  dbtable '<table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

Databricks Runtime 11.3 LTS 이상에서 SQL을 사용하여 데이터를 읽습니다.


DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  host '<hostname>',
  port '<port>', /* Optional - will use default port 5439 if not specified. *./
  user '<username>',
  password '<password>',
  database '<database-name>'
  dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
  tempdir 's3a://<bucket>/<directory-path>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

SQL을 사용하여 데이터 쓰기:

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
  dbtable '<new-table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;

SQL API는 덮어쓰거나 추가하지 않고 새 테이블 만들기만 지원합니다.

R

Databricks Runtime 10.4 LTS 이하에서 R을 사용하여 데이터를 읽습니다.

df <- read.df(
   NULL,
   "com.databricks.spark.redshift",
   tempdir = "s3a://<your-bucket>/<your-directory-path>",
   dbtable = "<your-table-name>",
   url = "jdbc:redshift://<the-rest-of-the-connection-string>")

Databricks Runtime 11.3 LTS 이상에서 R을 사용하여 데이터를 읽습니다.

df <- read.df(
  NULL,
  "redshift",
  host = "hostname",
  port = "port",
  user = "username",
  password = "password",
  database = "database-name",
  dbtable = "schema-name.table-name",
  tempdir = "s3a://<your-bucket>/<your-directory-path>",
  forward_spark_s3_credentials = "true",
  dbtable = "<your-table-name>")

Scala

// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") /* Optional - will use default port 5439 if not specified. */
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", true)
  .load()

// Read data from a query
val df = spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table

// Write back to a table
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()

// Write back to a table using IAM Role based authentication
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()

Redshift 작업 시 권장 사항

쿼리 실행은 다량 데이터를 S3으로 추출할 수 있습니다. Redshift에서 동일한 데이터에 대해 여러 쿼리를 수행할 경우 Databricks는 Delta Lake를 사용하여 추출된 데이터를 저장할 것을 권합니다.

구성

S3 및 Redshift에 인증

데이터 원본에는 다음 다이어그램에 나오는 여러 네트워크 연결이 포함됩니다.

                            ┌───────┐
       ┌───────────────────>│  S3   │<─────────────────┐
       │    IAM or keys     └───────┘    IAM or keys   │
       │                        ^                      │
       │                        │ IAM or keys          │
       v                        v               ┌──────v────┐
┌────────────┐            ┌───────────┐         │┌──────────┴┐
│  Redshift  │            │  Spark    │         ││   Spark   │
│            │<──────────>│  Driver   │<────────>| Executors │
└────────────┘            └───────────┘          └───────────┘
               JDBC with                  Configured
               username /                     in
               password                     Spark
        (SSL enabled by default)

데이터 원본은 Redshift와 데이터를 주고받을 때 S3에 데이터를 읽고 씁니다. 따라서 S3 버킷에 대한 읽기 및 쓰기 액세스 권한이 있는 AWS 자격 증명이 필요합니다(tempdir 구성 매개 변수를 사용하여 지정).

참고 항목

데이터 원본은 S3에서 만드는 임시 파일을 정리하지 않습니다. 따라서 개체 수명 주기 구성과 함께 전용 임시 S3 버킷을 사용하여 지정된 만료 기간 후에 임시 파일이 자동으로 삭제되도록 하는 것이 좋습니다. 이러한 파일을 암호화하는 방법은 이 문서의 암호화 섹션을 참조하세요. Unity 카탈로그정의된 외부 위치를 위치로 tempdir 사용할 수 없습니다.

다음 섹션에서는 각 연결의 인증 구성 옵션에 대해 설명합니다.

Spark 드라이버에서 Redshift로

Spark 드라이버는 사용자 이름과 암호를 사용하여 JDBC를 통해 Redshift에 연결합니다. Redshift는 IAM 역할을 사용하여 이 연결을 인증하는 것을 지원하지 않습니다. 기본적으로 이 연결은 SSL 암호화를 사용합니다. 자세한 내용은 암호화를 참조하세요.

Spark에서 S3으로

S3은 Redshift에서 읽거나 쓸 때 대량 데이터를 저장하는 중간 매개 역할을 합니다. Spark는 Hadoop FileSystem 인터페이스를 모두 사용하고 Amazon Java SDK의 S3 클라이언트를 직접 사용하여 S3에 연결합니다.

참고 항목

DBFS 탑재를 사용하여 Redshift용 S3에 대한 액세스를 구성할 수 없습니다.

  • Hadoop conf에서 키 설정: Hadoop 구성 속성을 사용하여 AWS 키를 지정할 수 있습니다. tempdir 구성이 s3a:// 파일 시스템을 가리키는 경우 Hadoop XML 구성 파일에서 속성 및 fs.s3a.secret.key 속성을 설정 fs.s3a.access.key 하거나 호출 sc.hadoopConfiguration.set() 하여 Spark의 전역 Hadoop 구성을 구성할 수 있습니다. 파일 시스템을 사용하는 s3n:// 경우 다음 예제와 같이 레거시 구성 키를 제공할 수 있습니다.

    Scala

    예를 들어 파일 시스템을 사용하는 s3a 경우 다음을 추가합니다.

    sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
    

    레거시 s3n 파일 시스템의 경우 다음을 추가합니다.

    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
    
    Python

    다음 명령은 일부 Spark 내부를 기반으로 하지만 모든 PySpark 버전에서 작동해야 하며 나중에 변경될 가능성이 낮습니다.

      sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>")
      sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
    

Redshift에서 S3으로

Spark가 forward_spark_s3_credentials JDBC를 true 통해 S3에 연결하는 데 사용하는 AWS 키 자격 증명을 Redshift에 자동으로 전달하는 옵션을 설정합니다. JDBC 쿼리는 이러한 자격 증명을 포함하므로 Databricks는 JDBC 연결의 SSL 암호화를 사용하도록 설정하는 것이 좋습니다.

  • JDBC 보안: SSL 관련 설정이 JDBC URL에 없는 한 데이터 원본은 기본적으로 SSL 암호화를 사용하도록 설정하고 Redshift 서버가 신뢰할 수 있는지도 확인합니다(즉, sslmode=verify-full). 이를 위해 서버 인증서는 처음 필요할 때 Amazon 서버에서 자동으로 다운로드됩니다. 이에 실패하는 경우 미리 번들된 인증서 파일이 대체 파일로 사용됩니다. 이는 Redshift 및 PostgreSQL JDBC 드라이버 모두에 적용됩니다.

    이 기능에 문제가 있거나 SSL을 사용하지 않도록 설정하려는 경우에는 DataFrameReader 또는 DataFrameWriter에서 .option("autoenablessl", "false")을 호출하면 됩니다.

    사용자 지정 SSL 관련 설정을 지정할 경우에는 Redshift 설명서Java에서 SSL 및 서버 인증서 사용JDBC 드라이버 구성 옵션의 지침대로 하면 됩니다. 데이터 원본과 함께 사용되는 JDBC url에 있는 모든 SSL 관련 옵션이 우선합니다(즉, 자동 구성이 트리거되지 않음).

  • S3에 저장된 UNLOAD 데이터 암호화(Redshift에서 읽을 때 저장된 데이터): S3으로 데이터 언로드에 관한 Redshift 설명서에 따르면 "UNLOAD는 Amazon S3 서버 쪽 암호화(SSE-S3)를 사용하여 데이터 파일을 자동으로 암호화합니다."

    Redshift는 사용자 지정 키로 클라이언트 쪽 암호화도 지원하지만(참조: 암호화된 데이터 파일 언로드) 데이터 원본에는 필요한 대칭 키를 지정할 수 있는 기능이 없습니다.

  • S3에 저장된 COPY 데이터 암호화(Redshift에 쓸 때 저장된 데이터): Amazon S3에서 암호화된 데이터 파일 로드에 관한 Redshift 설명서에 따르면:

COPY 명령을 사용하여 AWS 관리형 암호화 키(SSE-S3 또는 SSE-KMS), 클라이언트 쪽 암호화 또는 둘 다로 서버 쪽 암호화를 사용하여 Amazon S3에 업로드된 데이터 파일을 로드할 수 있습니다. COPY는 SSE-C(고객 제공 키)를 사용하는 Amazon S3 서버 쪽 암호화를 지원하지 않습니다.

매개 변수

Spark SQL에 제공된 매개 변수 맵 또는 OPTIONS는 다음 설정을 지원합니다.

매개 변수 Required
dbtable 예, 쿼리가 지정되지 않는 경우.
query 예, dbtable이 지정되지 않는 경우.
user 아니요
password 아니요
URL
search_path 아니요
aws_iam_role IAM 역할을 사용해 권한을 부여하는 경우에만.
forward_spark_s3_credentials 아니요
temporary_aws_access_key_id 아니요
temporary_aws_secret_access_key 아니요
temporary_aws_session_token 아니요
tempdir
jdbcdriver 아니요
diststyle 아니요
distkey 아니요, DISTSTYLE KEY를 사용하지 않는 경우
sortkeyspec 아니요
usestagingtable(사용되지 않음) 아니요
description 아니요
preactions 아니요
postactions 아니요
extracopyoptions 아니요
tempformat 아니요
csvnullstring 아니요
csvseparator 아니요 , tempformat이 CSV로 설정된 임시 파일을 작성할 때 사용할 구분 기호
CSV GZIP. 이것은 유효한 ASCII 문자(예: “,” 또는 “|”)여야 합니다.
csvignoreleadingwhitespace 아니요
csvignoretrailingwhitespace 아니요
infer_timestamp_ntz_type 아니요

추가 구성 옵션

문자열 열의 최대 크기 구성

Redshift 테이블을 만들 때 기본 동작은 문자열 열에 대한 TEXT 열을 만드는 것입니다. Redshift는 TEXT 열을 VARCHAR(256)로 저장하므로 이러한 열의 최대 크기는 256자(원본)입니다.

더 큰 열을 지원하려면 maxlength 열 메타데이터 필드를 사용하여 개별 문자열 열의 최대 길이를 지정하면 됩니다. 이는 최대 길이가 기본값보다 작은 열을 선언하여 공간 절약 성능을 최적화하는 데에도 유용합니다.

참고 항목

Spark의 제한 사항 때문에 SQL 및 R 언어 API는 열 메타데이터 수정을 지원하지 않습니다.

Python

df = ... # the dataframe you'll want to write to Redshift

# Specify the custom width of each column
columnLengthMap = {
  "language_code": 2,
  "country_code": 2,
  "url": 2083,
}

# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
  metadata = {'maxlength': length}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", jdbcURL) \
  .option("tempdir", s3TempDirectory) \
  .option("dbtable", sessionTable) \
  .save()

Scala

다음은 Spark의 Scala API를 사용하여 여러 열의 메타데이터 필드를 업데이트하는 예입니다.

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom width of each column
val columnLengthMap = Map(
  "language_code" -> 2,
  "country_code" -> 2,
  "url" -> 2083
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
  val metadata = new MetadataBuilder().putLong("maxlength", length).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

df.write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcURL)
  .option("tempdir", s3TempDirectory)
  .option("dbtable", sessionTable)
.save()

사용자 지정 열 형식 설정

열 형식을 수동으로 설정해야 하는 경우에는 redshift_type 열 메타데이터를 사용하면 됩니다. 예를 들어, 사용자 정의 열 형식을 할당하기 위해 Spark SQL Schema -> Redshift SQL 형식 일치자를 재정의해야 하는 경우에는 다음을 수행하면 됩니다.

Python

# Specify the custom type of each column
columnTypeMap = {
  "language_code": "CHAR(2)",
  "country_code": "CHAR(2)",
  "url": "BPCHAR(111)",
}

df = ... # the dataframe you'll want to write to Redshift

# Apply each column metadata customization
for (colName, colType) in columnTypeMap.iteritems():
  metadata = {'redshift_type': colType}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

Scala

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom type of each column
val columnTypeMap = Map(
  "language_code" -> "CHAR(2)",
  "country_code" -> "CHAR(2)",
  "url" -> "BPCHAR(111)"
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
  val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

열 인코딩 구성

테이블을 만들 때 encoding 열 메타데이터 필드를 사용하여 각 열에 대한 압축 인코딩을 지정합니다(사용 가능한 인코딩은 Amazon 문서 참조).

열에 대한 설명 설정

Redshift를 사용하면 열에 대부분의 쿼리 도구에 표시되어야 하는 설명을 첨부할 수 있습니다(COMMENT 명령 사용). description 열 메타데이터 필드를 설정하여 개별 열에 대한 설명을 지정할 수 있습니다.

Redshift로의 쿼리 푸시다운

Spark 최적화 프로그램은 다음 연산자를 Redshift로 푸시합니다.

  • Filter
  • Project
  • Sort
  • Limit
  • Aggregation
  • Join

ProjectFilter 내에서 다음 식을 지원합니다.

  • 대부분의 부울 논리 연산자
  • 비교
  • 기본 산술 연산
  • 숫자 및 문자열 캐스트
  • 대부분의 문자열 함수
  • 스칼라 하위 쿼리(Redshift로 완전히 푸시될 수 있는 경우).

참고 항목

이 푸시다운은 날짜 및 타임스탬프에서 작동하는 식을 지원하지 않습니다.

Aggregation 내에서 다음 집계 함수를 지원합니다.

  • AVG
  • COUNT
  • MAX
  • MIN
  • SUM
  • STDDEV_SAMP
  • STDDEV_POP
  • VAR_SAMP
  • VAR_POP

DISTINCT 절과 결합(해당되는 경우).

Join 내에서 다음 유형의 조인을 지원합니다.

  • INNER JOIN
  • LEFT OUTER JOIN
  • RIGHT OUTER JOIN
  • LEFT SEMI JOIN
  • LEFT ANTI JOIN
  • 최적화 프로그램에 의해 Join로 다시 작성되는 하위 쿼리(예: WHERE EXISTS, WHERE NOT EXISTS)

참고 항목

조인 푸시다운은 FULL OUTER JOIN를 지원하지 않습니다.

푸시다운은 LIMIT를 사용하는 쿼리에서 가장 도움이 될 수 있습니다. SELECT * FROM large_redshift_table LIMIT 10와 같은 쿼리는 전체 테이블이 먼저 중간 결과로 S3에 UNLOADed되기 때문에 매우 오래 걸릴 수 있습니다. 푸시다운을 사용하면 LIMIT가 Redshift에서 실행됩니다. 집계가 있는 쿼리에서 집계를 Redshift로 푸시하면 전송해야 하는 데이터의 양을 줄이는 데에도 도움이 됩니다.

Redshift로의 쿼리 푸시다운은 기본적으로 사용하도록 설정됩니다. 이 기능은 spark.databricks.redshift.pushdownfalse로 설정하여 사용하지 않도록 설정할 수 있습니다. 사용하지 않도록 설정된 경우에도 Spark는 필터를 푸시다운하고 Redshift로 열 제거를 수행합니다.

Redshift 드라이버 설치

Redshift 데이터 원본에는 Redshift 호환 JDBC 드라이버도 필요합니다. Redshift는 PostgreSQL 데이터베이스 시스템을 기반으로 하므로 Databricks Runtime 또는 Amazon 권장 Redshift JDBC 드라이버에 포함된 PostgreSQL JDBC 드라이버를 사용할 수 있습니다. 설치 없이도 PostgreSQL JDBC 드라이버를 사용할 수 있습니다. 각 Databricks Runtime 릴리스에 포함된 PostgreSQL JDBC 드라이버의 버전은 Databricks Runtime 릴리스 정보에 나열됩니다.

Redshift JDBC 드라이버를 수동으로 설치하려면

  1. Amazon에서 드라이버를 다운로드합니다.
  2. Azure Databricks 작업 영역에 드라이버를 업로드합니다. 라이브러리를 참조 하세요.
  3. 클러스터에 라이브러리를 설치합니다.

참고 항목

Databricks는 최신 버전의 Redshift JDBC 드라이버를 사용할 것을 권합니다. 1.2.41 미만의 Redshift JDBC 드라이버 버전에는 다음과 같은 제한 사항이 있습니다.

  • 드라이버 버전 1.2.16은 SQL 쿼리에서 where 절을 사용할 때 빈 데이터를 반환합니다.
  • 1.2.41 미만의 드라이버 버전은 열의 null 허용 여부가 "알 수 없음"이 아닌 "Null 허용 안 함"으로 잘못 보고되기 때문에 잘못된 결과를 반환할 수 있습니다.

트랜잭션 보장

이 섹션에서는 Spark에 대한 Redshift 데이터 원본의 트랜잭션 보장에 대해 설명합니다.

Redshift 및 S3 속성에 대한 일반적인 배경

Redshift 트랜잭션 보장에 대한 일반적인 내용은 Redshift 설명서의 동시 쓰기 작업 관리 장을 참조하세요. 간단히 말해서, Redshift는 Redshift BEGIN 명령에 대한 설명서에 따라 직렬화 가능한 격리를 제공합니다.

4개의 트랜잭션 격리 수준 중에서 하나를 사용할 수 있지만 Amazon Redshift는 모든 격리 수준을 직렬화 가능으로 처리합니다.

Redshift 설명서에 따라 다음을 수행합니다.

Amazon Redshift는 개별적으로 실행된 각 SQL 명령이 개별적으로 커밋되는 기본 자동 커밋 동작을 지원합니다.

따라서 COPYUNLOAD 같은 개별 명령은 개별 명령은 원자성과 트랜잭션의 속성을 갖지만, 여러 명령이나 쿼리의 원자성을 적용할 때는 명시적인 BEGINEND만 필요합니다.

Redshift에서 읽고 쓸 때 데이터 원본은 S3에서 데이터를 읽고 씁니다. Spark와 Redshift는 모두 분할된 출력을 생성하고 S3의 여러 파일에 저장합니다. Amazon S3 데이터 일관성 모델 설명서에 따르면 S3 버킷 목록 작업은 결국 일관성이 있으므로 최종 일관성의 원본으로 인해 데이터가 누락되거나 불완전하지 않도록 파일이 특수 길이로 이동해야 합니다.

Spark에 대한 Redshift 데이터 원본 보장

기존 테이블에 추가

Redshift에 행을 삽입할 때 데이터 원본은 COPY 명령을 사용하고 매니페스트를 지정하여 최종적으로 일관된 특정 S3 작업을 방지합니다. 따라서 spark-redshift를 기존 테이블에 추가하면 일반 Redshift COPY 명령과 동일한 원자성 및 트랜잭션 속성이 있습니다.

새 테이블 만들기(SaveMode.CreateIfNotExists).

새 테이블을 만드는 작업은 2단계 프로세스이며, CREATE TABLE 명령과 COPY 명령으로 구성되어 초기 행 집합을 추가합니다. 두 작업 모두 동일한 트랜잭션에서 수행됩니다.

기존 테이블 덮어쓰기

기본적으로 데이터 원본은 트랜잭션을 사용하여 대상 테이블을 삭제하고, 빈 테이블을 새로 만들고, 행을 추가하여 구현되는 덮어쓰기를 수행합니다.

사용되지 않는 usestagingtable 설정이 false로 설정된 경우 데이터 원본은 새 테이블에 행을 추가하기 전에 DELETE TABLE 명령을 커밋하여 덮어쓰기 작업의 원자성을 희생하지만 덮어쓰는 동안 Redshift에 필요한 준비 공간의 양을 줄입니다.

Redshift 테이블 쿼리

쿼리는 Redshift UNLOAD 명령을 사용하여 쿼리를 실행하고 결과를 S3에 저장하고 매니페스트를 사용하여 최종적으로 일관된 특정 S3 작업을 방지합니다. 따라서 Spark에 대한 Redshift 데이터 원본의 쿼리는 일반 Redshift 쿼리와 동일한 일관성 속성을 가져야 합니다.

일반적인 문제 및 해결 방법

S3 버킷 및 Redshift 클러스터는 서로 다른 AWS 지역에 있습니다.

S3 버킷 및 Redshift 클러스터가 서로 다른 AWS 지역에 있는 경우 기본적으로 S3 <-> Redshift 복사본이 작동하지 않습니다.

S3 버킷이 다른 지역에 있을 때 Redshift 테이블을 읽으려고 하면 다음과 같은 오류가 표시될 수 있습니다.

ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.

마찬가지로 다른 지역의 S3 버킷을 사용하여 Redshift에 쓰려고 하면 다음 오류가 발생할 수 있습니다.

error:  Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
  • 기: Redshift COPY 명령은 S3 버킷 영역의 명시적 사양을 지원하므로 이러한 경우 extracopyoptions 설정에 region 'the-region-name'를 추가하면 Redshift에 대한 쓰기가 제대로 작동할 수 있습니다. 예를 들어, 미국 동부(버지니아) 지역의 버킷과 Scala API를 사용하는 경우 다음을 사용합니다.

    .option("extracopyoptions", "region 'us-east-1'")
    

    또는 awsregion 설정을 사용해도 됩니다.

    .option("awsregion", "us-east-1")
    
  • 읽기: Redshift UNLOAD 명령은 S3 버킷 지역의 명시적 사양도 지원합니다. awsregion 설정에 지역을 추가하여 읽기가 제대로 작동하게 할 수 있습니다.

    .option("awsregion", "us-east-1")
    

JDBC URL에 특수 문자가 있는 암호를 사용하는 경우에 발생하는 인증 오류

JDBC URL의 일부로 사용자 이름 및 암호를 제공하는데 암호에 ;, ? 또는 & 같은 특수 문자가 포함된 경우 다음 예외가 표시될 수 있습니다.

java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'

이는 사용자 이름 또는 암호의 특수 문자가 JDBC 드라이버에 의해 올바르게 이스케이프되지 않아 발생합니다. 해당 DataFrame 옵션 userpassword를 사용하여 사용자 이름과 암호를 지정해야 합니다. 자세한 내용은 매개 변수를 참조하세요.

해당 Redshift 작업이 수행되더라도 장기 실행 Spark 쿼리가 무기한 중단됩니다.

Redshift에서 Redshift로 대량의 데이터를 읽거나 쓰는 경우 AWS Redshift 모니터링 페이지에 해당 LOAD 또는 UNLOAD 작업이 완료되었으며 클러스터가 유휴 상태로 표시되더라도 Spark 쿼리가 무기한 중단될 수 있습니다. 이는 Redshift와 Spark 간의 연결 시간 초과로 인해 발생합니다. 이를 방지하려면 tcpKeepAlive JDBC 플래그가 사용하도록 설정되어 있고 TCPKeepAliveMinutes가 낮은 값(예: 1)으로 설정되어 있어야 합니다.

자세한 내용은 Amazon Redshift JDBC 드라이버 구성을 참조하세요.

표준 시간대 의미 체계가 있는 타임스탬프

데이터를 읽을 때 Redshift TIMESTAMPTIMESTAMPTZ 데이터 형식은 모두 Spark TimestampType에 매핑되며, 값은 UTC(협정 세계시)로 변환되고 UTC 타임스탬프로 저장됩니다. Redshift TIMESTAMP의 경우 현지 표준 시간대는 값에 표준 시간대 정보가 없는 것으로 간주됩니다. Redshift 테이블에 데이터를 쓸 때 Spark TimestampType는 Redshift TIMESTAMP 데이터 형식에 매핑됩니다.

마이그레이션 가이드

이제 데이터 원본에서 Spark S3 자격 증명을 Redshift로 전달하기 전에 명시적으로 forward_spark_s3_credentials을 설정해야 합니다. aws_iam_role 또는 temporary_aws_* 인증 메커니즘을 사용하는 경우 이 변경 내용은 영향을 주지 않습니다. 그러나 이전 기본 동작을 사용하는 경우 이전 Redshift를 S3 인증 메커니즘으로 계속 사용하려면 명시적으로 forward_spark_s3_credentialstrue로 설정해야 합니다. 세 가지 인증 메커니즘 및 해당 보안 트레이드 오프에 대한 자세한 내용은 이 문서의 S3 인증 및 Redshift 섹션을 참조하세요.