Aan de slag met COPY INTO om gegevens te laden

Met de COPY INTO SQL-opdracht kunt u gegevens van een bestandslocatie laden in een Delta-tabel. Dit is een re-triable en idempotente bewerking; bestanden in de bronlocatie die al zijn geladen, worden overgeslagen.

COPY INTO biedt de volgende mogelijkheden:

  • Eenvoudig configureerbare bestands- of mapfilters uit cloudopslag, waaronder S3-, ADLS Gen2-, ABFS-, GCS- en Unity Catalog-volumes.
  • Ondersteuning voor meerdere bronbestandsindelingen: CSV-, JSON-, XML-, Avro-, ORC-, Parquet-, tekst- en binaire bestanden
  • Exactly-once (idempotent) bestandsverwerking standaard
  • Schemadeductie van doeltabellen, toewijzing, samenvoegen en evolutie

Notitie

Voor een meer schaalbare en robuuste ervaring voor bestandsopname raadt Databricks aan dat SQL-gebruikers gebruikmaken van streamingtabellen. Zie Gegevens laden met behulp van streamingtabellen in Databricks SQL.

Waarschuwing

COPY INTO respecteert de werkruimte-instelling voor verwijderingsvectoren. Indien ingeschakeld, worden verwijderingsvectoren ingeschakeld in de doeltabel wanneer ze COPY INTO worden uitgevoerd op een SQL Warehouse of rekenproces waarop Databricks Runtime 14.0 of hoger wordt uitgevoerd. Als dit is ingeschakeld, blokkeren verwijderingsvectoren query's voor een tabel in Databricks Runtime 11.3 LTS en hieronder. Zie Wat zijn verwijderingsvectoren? en verwijdervectoren automatisch inschakelen.

Vereisten

Een accountbeheerder moet de stappen volgen in Gegevenstoegang configureren voor opname om de toegang tot gegevens in de opslag van cloudobjecten te configureren voordat gebruikers gegevens kunnen laden met behulp van COPY INTO.

Voorbeeld: Gegevens laden in een schemaloze Delta Lake-tabel

Notitie

Deze functie is beschikbaar in Databricks Runtime 11.3 LTS en hoger.

U kunt lege tijdelijke aanduidingen voor Delta-tabellen maken, zodat het schema later wordt afgeleid tijdens een COPY INTO opdracht door in te stellen mergeSchema op trueCOPY_OPTIONS:

CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

De bovenstaande SQL-instructie is idempotent en kan worden gepland om gegevens exact één keer op te nemen in een Delta-tabel.

Notitie

De lege Delta-tabel is niet bruikbaar buiten COPY INTO. INSERT INTO en MERGE INTO worden niet ondersteund voor het schrijven van gegevens in schemaloze Delta-tabellen. Nadat gegevens in de tabel zijn ingevoegd, COPY INTOkan de tabel query's uitvoeren.

Zie Doeltabellen maken voor COPY INTO.

Voorbeeld: Schema instellen en gegevens laden in een Delta Lake-tabel

In het volgende voorbeeld ziet u hoe u een Delta-tabel maakt en vervolgens de COPY INTO SQL-opdracht gebruikt om voorbeeldgegevens uit Databricks-gegevenssets in de tabel te laden. U kunt de voorbeeldcode Python, R, Scala of SQL uitvoeren vanuit een notebook dat is gekoppeld aan een Azure Databricks-cluster. U kunt de SQL-code ook uitvoeren vanuit een query die is gekoppeld aan een SQL-warehouse in Databricks SQL.

SQL

DROP TABLE IF EXISTS default.loan_risks_upload;

CREATE TABLE default.loan_risks_upload (
  loan_id BIGINT,
  funded_amnt INT,
  paid_amnt DOUBLE,
  addr_state STRING
);

COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;

SELECT * FROM default.loan_risks_upload;

-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    | CA         |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    | WA         |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...

Python

table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "loan_id BIGINT, " + \
  "funded_amnt INT, " + \
  "paid_amnt DOUBLE, " + \
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format
)

loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)

display(loan_risks_upload_data)

'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''

R

library(SparkR)
sparkR.session()

table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "loan_id BIGINT, ",
  "funded_amnt INT, ",
  "paid_amnt DOUBLE, ",
  "addr_state STRING)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  sep = ""
))

loan_risks_upload_data = tableToDF(table_name)

display(loan_risks_upload_data)

# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    | CA         |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    | WA         |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...

Scala

val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/

Voer de volgende code uit om de tabel op te schonen:

Python

spark.sql("DROP TABLE " + table_name)

R

sql(paste("DROP TABLE ", table_name, sep = ""))

Scala

spark.sql("DROP TABLE " + table_name)

SQL

DROP TABLE default.loan_risks_upload

Verwijzing

Aanvullende bronnen