Query's uitvoeren op Amazon Redshift met behulp van Azure Databricks

U kunt tabellen lezen en schrijven vanuit Amazon Redshift met Azure Databricks.

Notitie

U kunt de voorkeur geven aan Lakehouse Federation voor het beheren van query's naar Redshift. Zie Wat is Lakehouse Federation.

De Databricks Redshift-gegevensbron maakt gebruik van Amazon S3 om gegevens efficiënt over te dragen naar en uit Redshift en gebruikt JDBC om automatisch de juiste COPY en UNLOAD opdrachten op Redshift te activeren.

Notitie

In Databricks Runtime 11.3 LTS en hoger bevat Databricks Runtime het Redshift JDBC-stuurprogramma dat toegankelijk is met behulp van het redshift trefwoord voor de indelingsoptie. Zie releaseopmerkingen voor Databricks Runtime en compatibiliteit voor stuurprogrammaversies die zijn opgenomen in elke Databricks Runtime. Door de gebruiker geleverde stuurprogramma's worden nog steeds ondersteund en hebben voorrang op het gebundelde JDBC-stuurprogramma.

In Databricks Runtime 10.4 LTS en hieronder is handmatige installatie van het Redshift JDBC-stuurprogramma vereist en moeten query's het stuurprogramma (com.databricks.spark.redshift) gebruiken voor de indeling. Zie de installatie van het Redshift-stuurprogramma.

Gebruik

In de volgende voorbeelden ziet u hoe u verbinding maakt met het Redshift-stuurprogramma. Vervang de url parameterwaarden als u het JDBC-stuurprogramma Van PostgreSQL gebruikt.

Zodra u uw AWS-referenties hebt geconfigureerd, kunt u de gegevensbron gebruiken met de Spark-gegevensbron-API in Python, SQL, R of Scala.

Belangrijk

Externe locaties die zijn gedefinieerd in Unity Catalog worden niet ondersteund als tempdir locaties.

Python

# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") # Optional - will use default port 5439 if not specified.
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a query
df = (spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table

# Write back to a table
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()
)

# Write back to a table using IAM Role based authentication
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()
)

SQL

Gegevens lezen met SQL op Databricks Runtime 10.4 LTS en hieronder:

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  dbtable '<table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

Gegevens lezen met SQL op Databricks Runtime 11.3 LTS en hoger:


DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  host '<hostname>',
  port '<port>', /* Optional - will use default port 5439 if not specified. *./
  user '<username>',
  password '<password>',
  database '<database-name>'
  dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
  tempdir 's3a://<bucket>/<directory-path>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

Gegevens schrijven met behulp van SQL:

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
  dbtable '<new-table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;

De SQL-API ondersteunt alleen het maken van nieuwe tabellen en het niet overschrijven of toevoegen van tabellen.

R

Gegevens lezen met R in Databricks Runtime 10.4 LTS en hieronder:

df <- read.df(
   NULL,
   "com.databricks.spark.redshift",
   tempdir = "s3a://<your-bucket>/<your-directory-path>",
   dbtable = "<your-table-name>",
   url = "jdbc:redshift://<the-rest-of-the-connection-string>")

Gegevens lezen met R in Databricks Runtime 11.3 LTS en hoger:

df <- read.df(
  NULL,
  "redshift",
  host = "hostname",
  port = "port",
  user = "username",
  password = "password",
  database = "database-name",
  dbtable = "schema-name.table-name",
  tempdir = "s3a://<your-bucket>/<your-directory-path>",
  forward_spark_s3_credentials = "true",
  dbtable = "<your-table-name>")

Scala

// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") /* Optional - will use default port 5439 if not specified. */
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", true)
  .load()

// Read data from a query
val df = spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table

// Write back to a table
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()

// Write back to a table using IAM Role based authentication
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()

Aanbevelingen voor het werken met Redshift

Bij het uitvoeren van query's kunnen grote hoeveelheden gegevens worden geëxtraheerd naar S3. Als u van plan bent om verschillende query's uit te voeren op dezelfde gegevens in Redshift, raadt Databricks aan om de geëxtraheerde gegevens op te slaan met Delta Lake.

Configuratie

Verifiëren bij S3 en Redshift

De gegevensbron omvat verschillende netwerkverbindingen, geïllustreerd in het volgende diagram:

                            ┌───────┐
       ┌───────────────────>│  S3   │<─────────────────┐
       │    IAM or keys     └───────┘    IAM or keys   │
       │                        ^                      │
       │                        │ IAM or keys          │
       v                        v               ┌──────v────┐
┌────────────┐            ┌───────────┐         │┌──────────┴┐
│  Redshift  │            │  Spark    │         ││   Spark   │
│            │<──────────>│  Driver   │<────────>| Executors │
└────────────┘            └───────────┘          └───────────┘
               JDBC with                  Configured
               username /                     in
               password                     Spark
        (SSL enabled by default)

De gegevensbron leest en schrijft gegevens naar S3 bij het overdragen van gegevens naar/van Redshift. Als gevolg hiervan zijn AWS-referenties met lees- en schrijftoegang tot een S3-bucket vereist (opgegeven met behulp van de tempdir configuratieparameter).

Notitie

De gegevensbron schoont de tijdelijke bestanden die worden gemaakt in S3 niet op. Als gevolg hiervan raden we u aan een toegewezen tijdelijke S3-bucket te gebruiken met een configuratie voor de levenscyclus van objecten om ervoor te zorgen dat tijdelijke bestanden automatisch worden verwijderd na een opgegeven verloopperiode. Zie de sectie Versleuteling van dit document voor een bespreking van het versleutelen van deze bestanden. U kunt geen externe locatie gebruiken die is gedefinieerd in Unity Catalog als een tempdir locatie.

In de volgende secties worden de verificatieconfiguratieopties van elke verbinding beschreven:

Spark-stuurprogramma naar Redshift

Het Spark-stuurprogramma maakt verbinding met Redshift via JDBC met behulp van een gebruikersnaam en wachtwoord. Redshift biedt geen ondersteuning voor het gebruik van IAM-rollen om deze verbinding te verifiëren. Deze verbinding maakt standaard gebruik van SSL-versleuteling; Zie Versleuteling voor meer informatie.

Spark naar S3

S3 fungeert als intermediair voor het opslaan van bulkgegevens bij het lezen van of schrijven naar Redshift. Spark maakt verbinding met S3 met behulp van zowel de Hadoop FileSystem-interfaces als rechtstreeks met behulp van de S3-client van de Amazon Java SDK.

Notitie

U kunt geen DBFS-koppelingen gebruiken om de toegang tot S3 voor Redshift te configureren.

  • Sleutels instellen in Hadoop conf: U kunt AWS-sleutels opgeven met behulp van Hadoop-configuratie-eigenschappen. Als uw tempdir configuratie verwijst naar een s3a:// bestandssysteem, kunt u de fs.s3a.access.key en fs.s3a.secret.key eigenschappen instellen in een Hadoop XML-configuratiebestand of aanroep sc.hadoopConfiguration.set() om de globale Hadoop-configuratie van Spark te configureren. Als u een s3n:// bestandssysteem gebruikt, kunt u de verouderde configuratiesleutels opgeven, zoals wordt weergegeven in het volgende voorbeeld.

    Scala

    Als u bijvoorbeeld het bestandssysteem gebruikt, voegt u het s3a volgende toe:

    sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
    

    Voor het verouderde bestandssysteem voegt u het volgende s3n toe:

    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
    
    Python

    De volgende opdracht is afhankelijk van bepaalde interne Spark-functies, maar moet met alle PySpark-versies werken en is in de toekomst waarschijnlijk niet gewijzigd:

      sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>")
      sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
    

Redshift naar S3

Stel de forward_spark_s3_credentials optie in om true automatisch de AWS-sleutelreferenties door te sturen die Spark gebruikt om verbinding te maken met S3 via JDBC naar Redshift. De JDBC-query sluit deze referenties in, dus Databricks raadt u ten zeerste aan SSL-versleuteling van de JDBC-verbinding in te schakelen.

Codering

  • JDBC beveiligen: tenzij er SSL-gerelateerde instellingen aanwezig zijn in de JDBC-URL, schakelt de gegevensbron standaard SSL-versleuteling in en controleert ook of de Redshift-server betrouwbaar is (dat wil sslmode=verify-fullgezegd). Hiervoor wordt automatisch een servercertificaat gedownload van de Amazon-servers de eerste keer dat het nodig is. In het geval dat dit mislukt, wordt een vooraf gebundeld certificaatbestand gebruikt als een terugval. Dit geldt voor zowel de Redshift- als de PostgreSQL JDBC-stuurprogramma's.

    Als er problemen zijn met deze functie, of als u gewoon SSL wilt uitschakelen, kunt .option("autoenablessl", "false") u uw DataFrameReader of DataFrameWriter.

    Als u aangepaste SSL-gerelateerde instellingen wilt opgeven, kunt u de instructies volgen in de Redshift-documentatie: SSL- en servercertificaten gebruiken in Java en JDBC-stuurprogrammaconfiguratieopties Eventuele SSL-gerelateerde opties die aanwezig zijn in de JDBC url die wordt gebruikt met de gegevensbron, hebben voorrang (de automatische configuratie wordt dus niet geactiveerd).

  • UNLOAD-gegevens versleutelen die zijn opgeslagen in S3 (gegevens die zijn opgeslagen bij het lezen vanuit Redshift): Volgens de Redshift-documentatie over het lossen van gegevens naar S3 worden gegevensbestanden automatisch versleuteld met behulp van Amazon S3-versleuteling (SSE-S3)."

    Redshift biedt ook ondersteuning voor versleuteling aan de clientzijde met een aangepaste sleutel (zie: Versleutelde gegevensbestanden lossen) maar de gegevensbron beschikt niet over de mogelijkheid om de vereiste symmetrische sleutel op te geven.

  • COPY-gegevens versleutelen die zijn opgeslagen in S3 (gegevens die zijn opgeslagen bij het schrijven naar Redshift): volgens de Redshift-documentatie over het laden van versleutelde gegevensbestanden van Amazon S3:

U kunt de COPY opdracht gebruiken om gegevensbestanden te laden die zijn geüpload naar Amazon S3 met behulp van versleuteling aan de serverzijde met door AWS beheerde versleutelingssleutels (SSE-S3 of SSE-KMS), versleuteling aan de clientzijde of beide. COPY biedt geen ondersteuning voor Versleuteling aan de serverzijde van Amazon S3 met een door de klant geleverde sleutel (SSE-C).

Parameters

De parametertoewijzing of OPTIES in Spark SQL ondersteunen de volgende instellingen:

Parameter Vereist
dbtable Ja, tenzij de query is opgegeven.
query Ja, tenzij dbtable is opgegeven.
Gebruiker Nee
password Nee
URL Ja
search_path Nee
aws_iam_role Alleen als u IAM-rollen gebruikt om te autoriseren.
forward_spark_s3_credentials Nee
temporary_aws_access_key_id Nee
temporary_aws_secret_access_key Nee
temporary_aws_session_token Nee
Tempdir Ja
jdbcdriver Nee
diststyle Nee
distkey Nee, tenzij u DISTSTYLE KEY
sortkeyspec Nee
usestagingtable (afgeschaft) Nee
beschrijving Nee
preactions Nee
postactions Nee
extracopyoptions Nee
tempformat Nee
csvnullstring Nee
CSVSeparator Nee , Scheidingsteken dat moet worden gebruikt bij het schrijven van tijdelijke bestanden met tempformat ingesteld op CSV of
CSV GZIP. Dit moet een geldig ASCII-teken zijn, bijvoorbeeld ',' of '|.'
csvignoreleadingwhitespace Nee
csvignoretrailingwhitespace Nee
infer_timestamp_ntz_type Nee

Extra configuratieopties

De maximale grootte van tekenreekskolommen configureren

Bij het maken van Redshift-tabellen is het standaardgedrag het maken TEXT van kolommen voor tekenreekskolommen. Redshift slaat TEXT kolommen op als VARCHAR(256), zodat deze kolommen een maximale grootte hebben van 256 tekens (bron).

Als u grotere kolommen wilt ondersteunen, kunt u het maxlength veld met kolommetagegevens gebruiken om de maximale lengte van afzonderlijke tekenreekskolommen op te geven. Dit is ook handig voor het implementeren van ruimtebesparende prestatieoptimalisaties door kolommen met een kleinere maximale lengte te declareren dan de standaardwaarde.

Notitie

Vanwege beperkingen in Spark bieden de SQL- en R-taal-API's geen ondersteuning voor het wijzigen van kolommetagegevens.

Python

df = ... # the dataframe you'll want to write to Redshift

# Specify the custom width of each column
columnLengthMap = {
  "language_code": 2,
  "country_code": 2,
  "url": 2083,
}

# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
  metadata = {'maxlength': length}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", jdbcURL) \
  .option("tempdir", s3TempDirectory) \
  .option("dbtable", sessionTable) \
  .save()

Scala

Hier volgt een voorbeeld van het bijwerken van metagegevensvelden van meerdere kolommen met behulp van de Scala-API van Spark:

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom width of each column
val columnLengthMap = Map(
  "language_code" -> 2,
  "country_code" -> 2,
  "url" -> 2083
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
  val metadata = new MetadataBuilder().putLong("maxlength", length).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

df.write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcURL)
  .option("tempdir", s3TempDirectory)
  .option("dbtable", sessionTable)
.save()

Een aangepast kolomtype instellen

Als u handmatig een kolomtype wilt instellen, kunt u de metagegevens van de redshift_type kolom gebruiken. Als u bijvoorbeeld de typematcher wilt overschrijven om een door de Spark SQL Schema -> Redshift SQL gebruiker gedefinieerd kolomtype toe te wijzen, kunt u het volgende doen:

Python

# Specify the custom type of each column
columnTypeMap = {
  "language_code": "CHAR(2)",
  "country_code": "CHAR(2)",
  "url": "BPCHAR(111)",
}

df = ... # the dataframe you'll want to write to Redshift

# Apply each column metadata customization
for (colName, colType) in columnTypeMap.iteritems():
  metadata = {'redshift_type': colType}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

Scala

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom type of each column
val columnTypeMap = Map(
  "language_code" -> "CHAR(2)",
  "country_code" -> "CHAR(2)",
  "url" -> "BPCHAR(111)"
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
  val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

Kolomcodering configureren

Wanneer u een tabel maakt, gebruikt u het encoding veld met kolommetagegevens om een compressiecodering voor elke kolom op te geven (zie Amazon-documenten voor beschikbare coderingen).

Beschrijvingen instellen voor kolommen

Met Redshift kunnen kolommen beschrijvingen bevatten die moeten worden weergegeven in de meeste queryhulpprogramma's (met behulp van de COMMENT opdracht). U kunt het description veld voor kolommetagegevens instellen om een beschrijving voor afzonderlijke kolommen op te geven.

Querypushdown naar Redshift

De Spark Optimizer pusht de volgende operators omlaag naar Redshift:

  • Filter
  • Project
  • Sort
  • Limit
  • Aggregation
  • Join

Binnen Project en Filterondersteunt het de volgende expressies:

  • De meeste Booleaanse logische operators
  • Vergelijkingen
  • Eenvoudige rekenkundige bewerkingen
  • Numerieke en tekenreekscasts
  • De meeste tekenreeksfuncties
  • Scalaire subquery's, als ze volledig naar Redshift kunnen worden gepusht.

Notitie

Deze pushdown biedt geen ondersteuning voor expressies die werken op datums en tijdstempels.

Binnen Aggregationondersteunt het de volgende aggregatiefuncties:

  • AVG
  • COUNT
  • MAX
  • MIN
  • SUM
  • STDDEV_SAMP
  • STDDEV_POP
  • VAR_SAMP
  • VAR_POP

gecombineerd met de DISTINCT component, indien van toepassing.

Binnen Joinondersteunt het de volgende typen joins:

  • INNER JOIN
  • LEFT OUTER JOIN
  • RIGHT OUTER JOIN
  • LEFT SEMI JOIN
  • LEFT ANTI JOIN
  • Subquery's die opnieuw worden geschreven Join door de optimizer, bijvoorbeeld WHERE EXISTS, WHERE NOT EXISTS

Notitie

Join pushdown biedt geen ondersteuning FULL OUTER JOINvoor .

De pushdown kan het nuttigst zijn in query's met LIMIT. Een query zoals SELECT * FROM large_redshift_table LIMIT 10 kan erg lang duren, omdat de hele tabel eerst UNLOADed naar S3 zou zijn als tussenliggend resultaat. Met pushdown wordt de LIMIT uitvoering uitgevoerd in Redshift. In query's met aggregaties helpt het pushen van de aggregatie naar Redshift ook om de hoeveelheid gegevens te verminderen die moet worden overgedragen.

Querypushdown naar Redshift is standaard ingeschakeld. Deze kan worden uitgeschakeld door de instelling in te stellen spark.databricks.redshift.pushdown op false. Zelfs als deze instelling is uitgeschakeld, pusht Spark nog steeds filters omlaag en voert kolomuitschakeling uit in Redshift.

Installatie van Redshift-stuurprogramma

De Redshift-gegevensbron vereist ook een met Redshift compatibel JDBC-stuurprogramma. Omdat Redshift is gebaseerd op het PostgreSQL-databasesysteem, kunt u het PostgreSQL JDBC-stuurprogramma gebruiken dat is opgenomen in Databricks Runtime of het Door Amazon aanbevolen Redshift JDBC-stuurprogramma. Er is geen installatie vereist om het PostgreSQL JDBC-stuurprogramma te gebruiken. De versie van het PostgreSQL JDBC-stuurprogramma dat is opgenomen in elke Databricks Runtime-release, wordt vermeld in de releaseopmerkingen van Databricks Runtime.

Het Redshift JDBC-stuurprogramma handmatig installeren:

  1. Download het stuurprogramma van Amazon.
  2. Upload het stuurprogramma naar uw Azure Databricks-werkruimte. Zie Bibliotheken.
  3. Installeer de bibliotheek op uw cluster.

Notitie

Databricks raadt aan de nieuwste versie van het Redshift JDBC-stuurprogramma te gebruiken. Versies van het Redshift JDBC-stuurprogramma lager dan 1.2.41 hebben de volgende beperkingen:

  • Versie 1.2.16 van het stuurprogramma retourneert lege gegevens wanneer u een where component in een SQL-query gebruikt.
  • Versies van het stuurprogramma onder 1.2.41 kunnen ongeldige resultaten retourneren omdat de null-waarde van een kolom onjuist is gerapporteerd als 'Niet nullable' in plaats van 'Onbekend'.

Transactionele garanties

In deze sectie worden de transactionele garanties van de Redshift-gegevensbron voor Spark beschreven.

Algemene achtergrond op redshift- en S3-eigenschappen

Zie het hoofdstuk Gelijktijdige schrijfbewerkingen beheren in de redshift-documentatie voor algemene informatie over transactionele garanties van Redshift. In een notendop biedt Redshift serialiseerbare isolatie volgens de documentatie voor de opdracht Redshift BEGIN :

[hoewel] u elk van de vier transactieisolatieniveaus kunt gebruiken, verwerkt Amazon Redshift alle isolatieniveaus als serialiseerbaar.

Volgens de Redshift-documentatie:

Amazon Redshift ondersteunt een standaardgedrag voor automatische doorvoer , waarbij elke afzonderlijk uitgevoerde SQL-opdracht afzonderlijk wordt doorgevoerd.

Afzonderlijke opdrachten zoals COPY en UNLOAD zijn dus atomisch en transactioneel, terwijl ze expliciet BEGIN zijn en END alleen nodig moeten zijn om de atomiciteit van meerdere opdrachten of query's af te dwingen.

Bij het lezen van en schrijven naar Redshift leest en schrijft de gegevensbron gegevens in S3. Spark en Redshift produceren gepartitioneerde uitvoer en slaan deze op in meerdere bestanden in S3. Volgens de documentatie van het Amazon S3-gegevensconsistentiemodel zijn S3 bucket-vermeldingsbewerkingen uiteindelijk consistent, dus moeten de bestanden naar speciale lengten gaan om ontbrekende of onvolledige gegevens te voorkomen vanwege deze bron van uiteindelijke consistentie.

Garanties van de Redshift-gegevensbron voor Spark

Toevoegen aan een bestaande tabel

Wanneer u rijen in Redshift invoegt, gebruikt de gegevensbron de opdracht COPY en geeft deze manifesten op om te voorkomen dat bepaalde uiteindelijk consistente S3-bewerkingen worden uitgevoerd. Als gevolg hiervan spark-redshift hebben toevoegt aan bestaande tabellen dezelfde atomische en transactionele eigenschappen als reguliere Redshift-opdrachten COPY .

Een nieuwe tabel maken (SaveMode.CreateIfNotExists)

Het maken van een nieuwe tabel is een proces in twee stappen, bestaande uit een CREATE TABLE opdracht gevolgd door een COPY-opdracht om de eerste set rijen toe te voegen. Beide bewerkingen worden uitgevoerd in dezelfde transactie.

Een bestaande tabel overschrijven

De gegevensbron maakt standaard gebruik van transacties voor het uitvoeren van overschrijven, die worden geïmplementeerd door de doeltabel te verwijderen, een nieuwe lege tabel te maken en er rijen aan toe te voegen.

Als de afgeschafte usestagingtable instelling is ingesteld falseop, voert de gegevensbron de DELETE TABLE opdracht door voordat er rijen aan de nieuwe tabel worden toegevoegd, waardoor de atomiciteit van de overschrijfbewerking wordt opgeofferd, maar de hoeveelheid faseringsruimte die Redshift nodig heeft tijdens het overschrijven vermindert.

Query's uitvoeren op redshift-tabel

Query's gebruiken de opdracht Redshift UNLOAD om een query uit te voeren en de resultaten ervan op te slaan in S3 en manifesten te gebruiken om te voorkomen dat bepaalde uiteindelijk consistente S3-bewerkingen worden uitgevoerd. Als gevolg hiervan moeten query's uit de Redshift-gegevensbron voor Spark dezelfde consistentie-eigenschappen hebben als gewone Redshift-query's.

Veelvoorkomende problemen en oplossingen

S3-bucket en Redshift-cluster bevinden zich in verschillende AWS-regio's

Standaard werken S3 <-> Redshift-kopieën niet als de S3-bucket en het Redshift-cluster zich in verschillende AWS-regio's bevinden.

Als u probeert een Redshift-tabel te lezen wanneer de S3-bucket zich in een andere regio bevindt, ziet u mogelijk een fout zoals:

ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.

Op dezelfde manier kan het schrijven naar Redshift met behulp van een S3-bucket in een andere regio de volgende fout veroorzaken:

error:  Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
  • Schrijfbewerkingen: De opdracht Redshift COPY ondersteunt expliciete specificatie van de S3-bucketregio, zodat u schrijfbewerkingen naar Redshift correct kunt laten werken in deze gevallen door deze toe te voegen region 'the-region-name' aan de extracopyoptions instelling. Gebruik bijvoorbeeld een bucket in de regio US - oost (Virginia) en de Scala-API:

    .option("extracopyoptions", "region 'us-east-1'")
    

    U kunt de awsregion instelling ook gebruiken:

    .option("awsregion", "us-east-1")
    
  • Leesbewerkingen: De opdracht Redshift UNLOAD ondersteunt ook expliciete specificatie van de S3-bucketregio. U kunt leesbewerkingen goed laten werken door de regio toe te voegen aan de awsregion instelling:

    .option("awsregion", "us-east-1")
    

Verificatiefout bij het gebruik van een wachtwoord met speciale tekens in de JDBC-URL

Als u de gebruikersnaam en het wachtwoord opgeeft als onderdeel van de JDBC-URL en het wachtwoord speciale tekens bevat, zoals ;, ?of &, ziet u mogelijk de volgende uitzondering:

java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'

Dit wordt veroorzaakt doordat speciale tekens in de gebruikersnaam of het wachtwoord niet correct worden ontsnapt door het JDBC-stuurprogramma. Zorg ervoor dat u de gebruikersnaam en het wachtwoord opgeeft met behulp van de bijbehorende DataFrame-opties user en password. Zie Parameters voor meer informatie.

Langlopende Spark-query loopt voor onbepaalde tijd vast, ook al wordt de bijbehorende Redshift-bewerking uitgevoerd

Als u grote hoeveelheden gegevens van en naar Redshift leest of schrijft, kan uw Spark-query voor onbepaalde tijd vastlopen, zelfs als op de pagina AWS Redshift Monitoring wordt aangegeven dat de bijbehorende LOAD of UNLOAD bewerking is voltooid en dat het cluster niet actief is. Dit wordt veroorzaakt door de verbinding tussen Redshift en Spark-time-out. Om dit te voorkomen, moet u ervoor zorgen dat de tcpKeepAlive JDBC-vlag is ingeschakeld en TCPKeepAliveMinutes is ingesteld op een lage waarde (bijvoorbeeld 1).

Zie Amazon Redshift JDBC Driver Configuration voor meer informatie.

Tijdstempel met tijdzone-semantiek

Bij het lezen van gegevens worden zowel Redshift TIMESTAMP als TIMESTAMPTZ gegevenstypen toegewezen aan Spark TimestampTypeen wordt een waarde geconverteerd naar Coordinated Universal Time (UTC) en wordt deze opgeslagen als de UTC-tijdstempel. Voor een Redshift TIMESTAMPwordt ervan uitgegaan dat de lokale tijdzone geen tijdzonegegevens bevat. Wanneer u gegevens naar een Redshift-tabel schrijft, wordt een Spark TimestampType toegewezen aan het gegevenstype Redshift TIMESTAMP .

Migratiehandleiding

Voor de gegevensbron moet u nu expliciet instellen forward_spark_s3_credentials voordat Spark S3-referenties worden doorgestuurd naar Redshift. Deze wijziging heeft geen invloed als u de aws_iam_role mechanismen of temporary_aws_* verificatiemechanismen gebruikt. Als u echter afhankelijk bent van het oude standaardgedrag, moet u nu expliciet instellen forward_spark_s3_credentials om true uw vorige Redshift te blijven gebruiken naar het S3-verificatiemechanisme. Zie de sectie Verifiëren bij S3 en Redshift van dit document voor een bespreking van de drie verificatiemechanismen en hun beveiligingsproblemen.