Query's uitvoeren op Amazon Redshift met behulp van Azure Databricks
U kunt tabellen lezen en schrijven vanuit Amazon Redshift met Azure Databricks.
Notitie
U kunt de voorkeur geven aan Lakehouse Federation voor het beheren van query's naar Redshift. Zie Wat is Lakehouse Federation.
De Databricks Redshift-gegevensbron maakt gebruik van Amazon S3 om gegevens efficiënt over te dragen naar en uit Redshift en gebruikt JDBC om automatisch de juiste COPY
en UNLOAD
opdrachten op Redshift te activeren.
Notitie
In Databricks Runtime 11.3 LTS en hoger bevat Databricks Runtime het Redshift JDBC-stuurprogramma dat toegankelijk is met behulp van het redshift
trefwoord voor de indelingsoptie. Zie releaseopmerkingen voor Databricks Runtime en compatibiliteit voor stuurprogrammaversies die zijn opgenomen in elke Databricks Runtime. Door de gebruiker geleverde stuurprogramma's worden nog steeds ondersteund en hebben voorrang op het gebundelde JDBC-stuurprogramma.
In Databricks Runtime 10.4 LTS en hieronder is handmatige installatie van het Redshift JDBC-stuurprogramma vereist en moeten query's het stuurprogramma (com.databricks.spark.redshift
) gebruiken voor de indeling. Zie de installatie van het Redshift-stuurprogramma.
Gebruik
In de volgende voorbeelden ziet u hoe u verbinding maakt met het Redshift-stuurprogramma. Vervang de url
parameterwaarden als u het JDBC-stuurprogramma Van PostgreSQL gebruikt.
Zodra u uw AWS-referenties hebt geconfigureerd, kunt u de gegevensbron gebruiken met de Spark-gegevensbron-API in Python, SQL, R of Scala.
Belangrijk
Externe locaties die zijn gedefinieerd in Unity Catalog worden niet ondersteund als tempdir
locaties.
Python
# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") # Optional - will use default port 5439 if not specified.
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a query
df = (spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table
# Write back to a table
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
)
# Write back to a table using IAM Role based authentication
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
)
SQL
Gegevens lezen met SQL op Databricks Runtime 10.4 LTS en hieronder:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
dbtable '<table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
Gegevens lezen met SQL op Databricks Runtime 11.3 LTS en hoger:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
host '<hostname>',
port '<port>', /* Optional - will use default port 5439 if not specified. *./
user '<username>',
password '<password>',
database '<database-name>'
dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
tempdir 's3a://<bucket>/<directory-path>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
Gegevens schrijven met behulp van SQL:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
dbtable '<new-table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;
De SQL-API ondersteunt alleen het maken van nieuwe tabellen en het niet overschrijven of toevoegen van tabellen.
R
Gegevens lezen met R in Databricks Runtime 10.4 LTS en hieronder:
df <- read.df(
NULL,
"com.databricks.spark.redshift",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
dbtable = "<your-table-name>",
url = "jdbc:redshift://<the-rest-of-the-connection-string>")
Gegevens lezen met R in Databricks Runtime 11.3 LTS en hoger:
df <- read.df(
NULL,
"redshift",
host = "hostname",
port = "port",
user = "username",
password = "password",
database = "database-name",
dbtable = "schema-name.table-name",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
forward_spark_s3_credentials = "true",
dbtable = "<your-table-name>")
Scala
// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") /* Optional - will use default port 5439 if not specified. */
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", true)
.load()
// Read data from a query
val df = spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table
// Write back to a table
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
// Write back to a table using IAM Role based authentication
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
Aanbevelingen voor het werken met Redshift
Bij het uitvoeren van query's kunnen grote hoeveelheden gegevens worden geëxtraheerd naar S3. Als u van plan bent om verschillende query's uit te voeren op dezelfde gegevens in Redshift, raadt Databricks aan om de geëxtraheerde gegevens op te slaan met Delta Lake.
Configuratie
Verifiëren bij S3 en Redshift
De gegevensbron omvat verschillende netwerkverbindingen, geïllustreerd in het volgende diagram:
┌───────┐
┌───────────────────>│ S3 │<─────────────────┐
│ IAM or keys └───────┘ IAM or keys │
│ ^ │
│ │ IAM or keys │
v v ┌──────v────┐
┌────────────┐ ┌───────────┐ │┌──────────┴┐
│ Redshift │ │ Spark │ ││ Spark │
│ │<──────────>│ Driver │<────────>| Executors │
└────────────┘ └───────────┘ └───────────┘
JDBC with Configured
username / in
password Spark
(SSL enabled by default)
De gegevensbron leest en schrijft gegevens naar S3 bij het overdragen van gegevens naar/van Redshift. Als gevolg hiervan zijn AWS-referenties met lees- en schrijftoegang tot een S3-bucket vereist (opgegeven met behulp van de tempdir
configuratieparameter).
Notitie
De gegevensbron schoont de tijdelijke bestanden die worden gemaakt in S3 niet op. Als gevolg hiervan raden we u aan een toegewezen tijdelijke S3-bucket te gebruiken met een configuratie voor de levenscyclus van objecten om ervoor te zorgen dat tijdelijke bestanden automatisch worden verwijderd na een opgegeven verloopperiode. Zie de sectie Versleuteling van dit document voor een bespreking van het versleutelen van deze bestanden. U kunt geen externe locatie gebruiken die is gedefinieerd in Unity Catalog als een tempdir
locatie.
In de volgende secties worden de verificatieconfiguratieopties van elke verbinding beschreven:
Spark-stuurprogramma naar Redshift
Het Spark-stuurprogramma maakt verbinding met Redshift via JDBC met behulp van een gebruikersnaam en wachtwoord. Redshift biedt geen ondersteuning voor het gebruik van IAM-rollen om deze verbinding te verifiëren. Deze verbinding maakt standaard gebruik van SSL-versleuteling; Zie Versleuteling voor meer informatie.
Spark naar S3
S3 fungeert als intermediair voor het opslaan van bulkgegevens bij het lezen van of schrijven naar Redshift. Spark maakt verbinding met S3 met behulp van zowel de Hadoop FileSystem-interfaces als rechtstreeks met behulp van de S3-client van de Amazon Java SDK.
Notitie
U kunt geen DBFS-koppelingen gebruiken om de toegang tot S3 voor Redshift te configureren.
Sleutels instellen in Hadoop conf: U kunt AWS-sleutels opgeven met behulp van Hadoop-configuratie-eigenschappen. Als uw
tempdir
configuratie verwijst naar eens3a://
bestandssysteem, kunt u defs.s3a.access.key
enfs.s3a.secret.key
eigenschappen instellen in een Hadoop XML-configuratiebestand of aanroepsc.hadoopConfiguration.set()
om de globale Hadoop-configuratie van Spark te configureren. Als u eens3n://
bestandssysteem gebruikt, kunt u de verouderde configuratiesleutels opgeven, zoals wordt weergegeven in het volgende voorbeeld.Scala
Als u bijvoorbeeld het bestandssysteem gebruikt, voegt u het
s3a
volgende toe:sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
Voor het verouderde bestandssysteem voegt u het volgende
s3n
toe:sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
Python
De volgende opdracht is afhankelijk van bepaalde interne Spark-functies, maar moet met alle PySpark-versies werken en is in de toekomst waarschijnlijk niet gewijzigd:
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>") sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
Redshift naar S3
Stel de forward_spark_s3_credentials
optie in om true
automatisch de AWS-sleutelreferenties door te sturen die Spark gebruikt om verbinding te maken met S3 via JDBC naar Redshift. De JDBC-query sluit deze referenties in, dus Databricks raadt u ten zeerste aan SSL-versleuteling van de JDBC-verbinding in te schakelen.
Codering
JDBC beveiligen: tenzij er SSL-gerelateerde instellingen aanwezig zijn in de JDBC-URL, schakelt de gegevensbron standaard SSL-versleuteling in en controleert ook of de Redshift-server betrouwbaar is (dat wil
sslmode=verify-full
gezegd). Hiervoor wordt automatisch een servercertificaat gedownload van de Amazon-servers de eerste keer dat het nodig is. In het geval dat dit mislukt, wordt een vooraf gebundeld certificaatbestand gebruikt als een terugval. Dit geldt voor zowel de Redshift- als de PostgreSQL JDBC-stuurprogramma's.Als er problemen zijn met deze functie, of als u gewoon SSL wilt uitschakelen, kunt
.option("autoenablessl", "false")
u uwDataFrameReader
ofDataFrameWriter
.Als u aangepaste SSL-gerelateerde instellingen wilt opgeven, kunt u de instructies volgen in de Redshift-documentatie: SSL- en servercertificaten gebruiken in Java en JDBC-stuurprogrammaconfiguratieopties Eventuele SSL-gerelateerde opties die aanwezig zijn in de JDBC
url
die wordt gebruikt met de gegevensbron, hebben voorrang (de automatische configuratie wordt dus niet geactiveerd).UNLOAD-gegevens versleutelen die zijn opgeslagen in S3 (gegevens die zijn opgeslagen bij het lezen vanuit Redshift): Volgens de Redshift-documentatie over het lossen van gegevens naar S3 worden gegevensbestanden automatisch versleuteld met behulp van Amazon S3-versleuteling (SSE-S3)."
Redshift biedt ook ondersteuning voor versleuteling aan de clientzijde met een aangepaste sleutel (zie: Versleutelde gegevensbestanden lossen) maar de gegevensbron beschikt niet over de mogelijkheid om de vereiste symmetrische sleutel op te geven.
COPY-gegevens versleutelen die zijn opgeslagen in S3 (gegevens die zijn opgeslagen bij het schrijven naar Redshift): volgens de Redshift-documentatie over het laden van versleutelde gegevensbestanden van Amazon S3:
U kunt de COPY
opdracht gebruiken om gegevensbestanden te laden die zijn geüpload naar Amazon S3 met behulp van versleuteling aan de serverzijde met door AWS beheerde versleutelingssleutels (SSE-S3 of SSE-KMS), versleuteling aan de clientzijde of beide. COPY biedt geen ondersteuning voor Versleuteling aan de serverzijde van Amazon S3 met een door de klant geleverde sleutel (SSE-C).
Parameters
De parametertoewijzing of OPTIES in Spark SQL ondersteunen de volgende instellingen:
Parameter | Vereist | ||
---|---|---|---|
dbtable | Ja, tenzij de query is opgegeven. | ||
query | Ja, tenzij dbtable is opgegeven. | ||
Gebruiker | Nee | ||
password | Nee | ||
URL | Ja | ||
search_path | Nee | ||
aws_iam_role | Alleen als u IAM-rollen gebruikt om te autoriseren. | ||
forward_spark_s3_credentials | Nee | ||
temporary_aws_access_key_id | Nee | ||
temporary_aws_secret_access_key | Nee | ||
temporary_aws_session_token | Nee | ||
Tempdir | Ja | ||
jdbcdriver | Nee | ||
diststyle | Nee | ||
distkey | Nee, tenzij u DISTSTYLE KEY |
||
sortkeyspec | Nee | ||
usestagingtable (afgeschaft) | Nee | ||
beschrijving | Nee | ||
preactions | Nee | ||
postactions | Nee | ||
extracopyoptions | Nee | ||
tempformat | Nee | ||
csvnullstring | Nee | ||
CSVSeparator | Nee | , |
Scheidingsteken dat moet worden gebruikt bij het schrijven van tijdelijke bestanden met tempformat ingesteld op CSV ofCSV GZIP . Dit moet een geldig ASCII-teken zijn, bijvoorbeeld ', ' of '| .' |
csvignoreleadingwhitespace | Nee | ||
csvignoretrailingwhitespace | Nee | ||
infer_timestamp_ntz_type | Nee |
Extra configuratieopties
De maximale grootte van tekenreekskolommen configureren
Bij het maken van Redshift-tabellen is het standaardgedrag het maken TEXT
van kolommen voor tekenreekskolommen. Redshift slaat TEXT
kolommen op als VARCHAR(256)
, zodat deze kolommen een maximale grootte hebben van 256 tekens (bron).
Als u grotere kolommen wilt ondersteunen, kunt u het maxlength
veld met kolommetagegevens gebruiken om de maximale lengte van afzonderlijke tekenreekskolommen op te geven. Dit is ook handig voor het implementeren van ruimtebesparende prestatieoptimalisaties door kolommen met een kleinere maximale lengte te declareren dan de standaardwaarde.
Notitie
Vanwege beperkingen in Spark bieden de SQL- en R-taal-API's geen ondersteuning voor het wijzigen van kolommetagegevens.
Python
df = ... # the dataframe you'll want to write to Redshift
# Specify the custom width of each column
columnLengthMap = {
"language_code": 2,
"country_code": 2,
"url": 2083,
}
# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
metadata = {'maxlength': length}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
df.write \
.format("com.databricks.spark.redshift") \
.option("url", jdbcURL) \
.option("tempdir", s3TempDirectory) \
.option("dbtable", sessionTable) \
.save()
Scala
Hier volgt een voorbeeld van het bijwerken van metagegevensvelden van meerdere kolommen met behulp van de Scala-API van Spark:
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom width of each column
val columnLengthMap = Map(
"language_code" -> 2,
"country_code" -> 2,
"url" -> 2083
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
val metadata = new MetadataBuilder().putLong("maxlength", length).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
df.write
.format("com.databricks.spark.redshift")
.option("url", jdbcURL)
.option("tempdir", s3TempDirectory)
.option("dbtable", sessionTable)
.save()
Een aangepast kolomtype instellen
Als u handmatig een kolomtype wilt instellen, kunt u de metagegevens van de redshift_type
kolom gebruiken. Als u bijvoorbeeld de typematcher wilt overschrijven om een door de Spark SQL Schema -> Redshift SQL
gebruiker gedefinieerd kolomtype toe te wijzen, kunt u het volgende doen:
Python
# Specify the custom type of each column
columnTypeMap = {
"language_code": "CHAR(2)",
"country_code": "CHAR(2)",
"url": "BPCHAR(111)",
}
df = ... # the dataframe you'll want to write to Redshift
# Apply each column metadata customization
for (colName, colType) in columnTypeMap.iteritems():
metadata = {'redshift_type': colType}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
Scala
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom type of each column
val columnTypeMap = Map(
"language_code" -> "CHAR(2)",
"country_code" -> "CHAR(2)",
"url" -> "BPCHAR(111)"
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
Kolomcodering configureren
Wanneer u een tabel maakt, gebruikt u het encoding
veld met kolommetagegevens om een compressiecodering voor elke kolom op te geven (zie Amazon-documenten voor beschikbare coderingen).
Beschrijvingen instellen voor kolommen
Met Redshift kunnen kolommen beschrijvingen bevatten die moeten worden weergegeven in de meeste queryhulpprogramma's (met behulp van de COMMENT
opdracht). U kunt het description
veld voor kolommetagegevens instellen om een beschrijving voor afzonderlijke kolommen op te geven.
Querypushdown naar Redshift
De Spark Optimizer pusht de volgende operators omlaag naar Redshift:
Filter
Project
Sort
Limit
Aggregation
Join
Binnen Project
en Filter
ondersteunt het de volgende expressies:
- De meeste Booleaanse logische operators
- Vergelijkingen
- Eenvoudige rekenkundige bewerkingen
- Numerieke en tekenreekscasts
- De meeste tekenreeksfuncties
- Scalaire subquery's, als ze volledig naar Redshift kunnen worden gepusht.
Notitie
Deze pushdown biedt geen ondersteuning voor expressies die werken op datums en tijdstempels.
Binnen Aggregation
ondersteunt het de volgende aggregatiefuncties:
AVG
COUNT
MAX
MIN
SUM
STDDEV_SAMP
STDDEV_POP
VAR_SAMP
VAR_POP
gecombineerd met de DISTINCT
component, indien van toepassing.
Binnen Join
ondersteunt het de volgende typen joins:
INNER JOIN
LEFT OUTER JOIN
RIGHT OUTER JOIN
LEFT SEMI JOIN
LEFT ANTI JOIN
- Subquery's die opnieuw worden geschreven
Join
door de optimizer, bijvoorbeeldWHERE EXISTS
,WHERE NOT EXISTS
Notitie
Join pushdown biedt geen ondersteuning FULL OUTER JOIN
voor .
De pushdown kan het nuttigst zijn in query's met LIMIT
. Een query zoals SELECT * FROM large_redshift_table LIMIT 10
kan erg lang duren, omdat de hele tabel eerst UNLOADed naar S3 zou zijn als tussenliggend resultaat. Met pushdown wordt de LIMIT
uitvoering uitgevoerd in Redshift. In query's met aggregaties helpt het pushen van de aggregatie naar Redshift ook om de hoeveelheid gegevens te verminderen die moet worden overgedragen.
Querypushdown naar Redshift is standaard ingeschakeld. Deze kan worden uitgeschakeld door de instelling in te stellen spark.databricks.redshift.pushdown
op false
. Zelfs als deze instelling is uitgeschakeld, pusht Spark nog steeds filters omlaag en voert kolomuitschakeling uit in Redshift.
Installatie van Redshift-stuurprogramma
De Redshift-gegevensbron vereist ook een met Redshift compatibel JDBC-stuurprogramma. Omdat Redshift is gebaseerd op het PostgreSQL-databasesysteem, kunt u het PostgreSQL JDBC-stuurprogramma gebruiken dat is opgenomen in Databricks Runtime of het Door Amazon aanbevolen Redshift JDBC-stuurprogramma. Er is geen installatie vereist om het PostgreSQL JDBC-stuurprogramma te gebruiken. De versie van het PostgreSQL JDBC-stuurprogramma dat is opgenomen in elke Databricks Runtime-release, wordt vermeld in de releaseopmerkingen van Databricks Runtime.
Het Redshift JDBC-stuurprogramma handmatig installeren:
- Download het stuurprogramma van Amazon.
- Upload het stuurprogramma naar uw Azure Databricks-werkruimte. Zie Bibliotheken.
- Installeer de bibliotheek op uw cluster.
Notitie
Databricks raadt aan de nieuwste versie van het Redshift JDBC-stuurprogramma te gebruiken. Versies van het Redshift JDBC-stuurprogramma lager dan 1.2.41 hebben de volgende beperkingen:
- Versie 1.2.16 van het stuurprogramma retourneert lege gegevens wanneer u een
where
component in een SQL-query gebruikt. - Versies van het stuurprogramma onder 1.2.41 kunnen ongeldige resultaten retourneren omdat de null-waarde van een kolom onjuist is gerapporteerd als 'Niet nullable' in plaats van 'Onbekend'.
Transactionele garanties
In deze sectie worden de transactionele garanties van de Redshift-gegevensbron voor Spark beschreven.
Algemene achtergrond op redshift- en S3-eigenschappen
Zie het hoofdstuk Gelijktijdige schrijfbewerkingen beheren in de redshift-documentatie voor algemene informatie over transactionele garanties van Redshift. In een notendop biedt Redshift serialiseerbare isolatie volgens de documentatie voor de opdracht Redshift BEGIN :
[hoewel] u elk van de vier transactieisolatieniveaus kunt gebruiken, verwerkt Amazon Redshift alle isolatieniveaus als serialiseerbaar.
Volgens de Redshift-documentatie:
Amazon Redshift ondersteunt een standaardgedrag voor automatische doorvoer , waarbij elke afzonderlijk uitgevoerde SQL-opdracht afzonderlijk wordt doorgevoerd.
Afzonderlijke opdrachten zoals COPY
en UNLOAD
zijn dus atomisch en transactioneel, terwijl ze expliciet BEGIN
zijn en END
alleen nodig moeten zijn om de atomiciteit van meerdere opdrachten of query's af te dwingen.
Bij het lezen van en schrijven naar Redshift leest en schrijft de gegevensbron gegevens in S3. Spark en Redshift produceren gepartitioneerde uitvoer en slaan deze op in meerdere bestanden in S3. Volgens de documentatie van het Amazon S3-gegevensconsistentiemodel zijn S3 bucket-vermeldingsbewerkingen uiteindelijk consistent, dus moeten de bestanden naar speciale lengten gaan om ontbrekende of onvolledige gegevens te voorkomen vanwege deze bron van uiteindelijke consistentie.
Garanties van de Redshift-gegevensbron voor Spark
Toevoegen aan een bestaande tabel
Wanneer u rijen in Redshift invoegt, gebruikt de gegevensbron de opdracht COPY en geeft deze manifesten op om te voorkomen dat bepaalde uiteindelijk consistente S3-bewerkingen worden uitgevoerd. Als gevolg hiervan spark-redshift
hebben toevoegt aan bestaande tabellen dezelfde atomische en transactionele eigenschappen als reguliere Redshift-opdrachten COPY
.
Een nieuwe tabel maken (SaveMode.CreateIfNotExists
)
Het maken van een nieuwe tabel is een proces in twee stappen, bestaande uit een CREATE TABLE
opdracht gevolgd door een COPY-opdracht om de eerste set rijen toe te voegen. Beide bewerkingen worden uitgevoerd in dezelfde transactie.
Een bestaande tabel overschrijven
De gegevensbron maakt standaard gebruik van transacties voor het uitvoeren van overschrijven, die worden geïmplementeerd door de doeltabel te verwijderen, een nieuwe lege tabel te maken en er rijen aan toe te voegen.
Als de afgeschafte usestagingtable
instelling is ingesteld false
op, voert de gegevensbron de DELETE TABLE
opdracht door voordat er rijen aan de nieuwe tabel worden toegevoegd, waardoor de atomiciteit van de overschrijfbewerking wordt opgeofferd, maar de hoeveelheid faseringsruimte die Redshift nodig heeft tijdens het overschrijven vermindert.
Query's uitvoeren op redshift-tabel
Query's gebruiken de opdracht Redshift UNLOAD om een query uit te voeren en de resultaten ervan op te slaan in S3 en manifesten te gebruiken om te voorkomen dat bepaalde uiteindelijk consistente S3-bewerkingen worden uitgevoerd. Als gevolg hiervan moeten query's uit de Redshift-gegevensbron voor Spark dezelfde consistentie-eigenschappen hebben als gewone Redshift-query's.
Veelvoorkomende problemen en oplossingen
S3-bucket en Redshift-cluster bevinden zich in verschillende AWS-regio's
Standaard werken S3 <-> Redshift-kopieën niet als de S3-bucket en het Redshift-cluster zich in verschillende AWS-regio's bevinden.
Als u probeert een Redshift-tabel te lezen wanneer de S3-bucket zich in een andere regio bevindt, ziet u mogelijk een fout zoals:
ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.
Op dezelfde manier kan het schrijven naar Redshift met behulp van een S3-bucket in een andere regio de volgende fout veroorzaken:
error: Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
Schrijfbewerkingen: De opdracht Redshift COPY ondersteunt expliciete specificatie van de S3-bucketregio, zodat u schrijfbewerkingen naar Redshift correct kunt laten werken in deze gevallen door deze toe te voegen
region 'the-region-name'
aan deextracopyoptions
instelling. Gebruik bijvoorbeeld een bucket in de regio US - oost (Virginia) en de Scala-API:.option("extracopyoptions", "region 'us-east-1'")
U kunt de
awsregion
instelling ook gebruiken:.option("awsregion", "us-east-1")
Leesbewerkingen: De opdracht Redshift UNLOAD ondersteunt ook expliciete specificatie van de S3-bucketregio. U kunt leesbewerkingen goed laten werken door de regio toe te voegen aan de
awsregion
instelling:.option("awsregion", "us-east-1")
Verificatiefout bij het gebruik van een wachtwoord met speciale tekens in de JDBC-URL
Als u de gebruikersnaam en het wachtwoord opgeeft als onderdeel van de JDBC-URL en het wachtwoord speciale tekens bevat, zoals ;
, ?
of &
, ziet u mogelijk de volgende uitzondering:
java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'
Dit wordt veroorzaakt doordat speciale tekens in de gebruikersnaam of het wachtwoord niet correct worden ontsnapt door het JDBC-stuurprogramma. Zorg ervoor dat u de gebruikersnaam en het wachtwoord opgeeft met behulp van de bijbehorende DataFrame-opties user
en password
. Zie Parameters voor meer informatie.
Langlopende Spark-query loopt voor onbepaalde tijd vast, ook al wordt de bijbehorende Redshift-bewerking uitgevoerd
Als u grote hoeveelheden gegevens van en naar Redshift leest of schrijft, kan uw Spark-query voor onbepaalde tijd vastlopen, zelfs als op de pagina AWS Redshift Monitoring wordt aangegeven dat de bijbehorende LOAD
of UNLOAD
bewerking is voltooid en dat het cluster niet actief is. Dit wordt veroorzaakt door de verbinding tussen Redshift en Spark-time-out. Om dit te voorkomen, moet u ervoor zorgen dat de tcpKeepAlive
JDBC-vlag is ingeschakeld en TCPKeepAliveMinutes
is ingesteld op een lage waarde (bijvoorbeeld 1).
Zie Amazon Redshift JDBC Driver Configuration voor meer informatie.
Tijdstempel met tijdzone-semantiek
Bij het lezen van gegevens worden zowel Redshift TIMESTAMP
als TIMESTAMPTZ
gegevenstypen toegewezen aan Spark TimestampType
en wordt een waarde geconverteerd naar Coordinated Universal Time (UTC) en wordt deze opgeslagen als de UTC-tijdstempel. Voor een Redshift TIMESTAMP
wordt ervan uitgegaan dat de lokale tijdzone geen tijdzonegegevens bevat. Wanneer u gegevens naar een Redshift-tabel schrijft, wordt een Spark TimestampType
toegewezen aan het gegevenstype Redshift TIMESTAMP
.
Migratiehandleiding
Voor de gegevensbron moet u nu expliciet instellen forward_spark_s3_credentials
voordat Spark S3-referenties worden doorgestuurd naar Redshift. Deze wijziging heeft geen invloed als u de aws_iam_role
mechanismen of temporary_aws_*
verificatiemechanismen gebruikt. Als u echter afhankelijk bent van het oude standaardgedrag, moet u nu expliciet instellen forward_spark_s3_credentials
om true
uw vorige Redshift te blijven gebruiken naar het S3-verificatiemechanisme. Zie de sectie Verifiëren bij S3 en Redshift van dit document voor een bespreking van de drie verificatiemechanismen en hun beveiligingsproblemen.