Začínáme používat funkci COPY INTO k načtení dat
Příkaz COPY INTO
SQL umožňuje načíst data z umístění souboru do tabulky Delta. Jedná se o re-triable a idempotentní operace; soubory ve zdrojovém umístění, které již byly načteny, se přeskočí.
COPY INTO
nabízí následující možnosti:
- Snadno konfigurovatelné filtry souborů nebo adresářů z cloudového úložiště, včetně svazků S3, ADLS Gen2, ABFS, GCS a Unity Catalog.
- Podpora více zdrojových formátů souborů: CSV, JSON, XML, Avro, ORC, Parquet, text a binární soubory
- Zpracování souboru přesně jednou (idempotentní) ve výchozím nastavení
- Odvození schématu cílové tabulky, mapování, sloučení a vývoj
Poznámka:
Pro škálovatelné a robustnější prostředí pro příjem souborů doporučuje Databricks, aby uživatelé SQL využívali streamované tabulky. Viz Načtení dat pomocí streamovaných tabulek v Databricks SQL.
Upozorňující
COPY INTO
respektuje nastavení pracovního prostoru pro vektory odstranění. Pokud je tato možnost povolená, jsou v cílové tabulce povoleny vektory odstranění, pokud COPY INTO
běží na SQL Warehouse nebo ve výpočetním prostředí Databricks Runtime 14.0 nebo novějším. Po povolení blokují vektory odstranění dotazy na tabulku v Databricks Runtime 11.3 LTS a níže. Podívejte se , co jsou vektory odstranění? a vektory automatického povolení odstranění.
Požadavky
Správce účtu musí postupovat podle kroků v části Konfigurace přístupu k datům pro příjem dat a nakonfigurovat přístup k datům v cloudovém úložišti objektů, aby uživatelé mohli načítat data pomocí COPY INTO
.
Příklad: Načtení dat do tabulky Delta Lake bez schématu
Poznámka:
Tato funkce je dostupná ve verzi Databricks Runtime 11.3 LTS a vyšší.
Prázdné zástupné tabulky Delta můžete vytvořit tak, aby se schéma později odvodilo během COPY INTO
příkazu nastavením na :mergeSchema
true
COPY_OPTIONS
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
Výše uvedený příkaz SQL je idempotentní a dá se naplánovat tak, aby se data spouštěla přesně jednou do tabulky Delta.
Poznámka:
Prázdná tabulka Delta není použitelná mimo COPY INTO
. INSERT INTO
a MERGE INTO
nejsou podporovány pro zápis dat do tabulek Delta bez schématu. Po vložení dat do tabulky s COPY INTO
tabulkou se tabulka stane dotazovatelnou.
Viz Vytvoření cílových tabulek pro COPY INTO.
Příklad: Nastavení schématu a načtení dat do tabulky Delta Lake
Následující příklad ukazuje, jak vytvořit tabulku Delta a pak pomocí COPY INTO
příkazu SQL načíst ukázková data z datových sad Databricks do tabulky. Ukázkový kód Pythonu, R, Scaly nebo SQL můžete spustit z poznámkového bloku připojeného ke clusteru Azure Databricks. Kód SQL můžete také spustit z dotazu přidruženého k SQL Warehouse v Databricks SQL.
SQL
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
Python
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
R
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
Scala
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
Pokud chcete tabulku vyčistit, spusťte následující kód, který odstraní tabulku:
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
SQL
DROP TABLE default.loan_risks_upload
Reference
- Databricks Runtime 7.x a vyšší: COPY INTO
Další materiály
Načtení dat pomocí funkce COPY INTO se svazky katalogu Unity nebo externími umístěními
Běžné vzory použití, včetně příkladů více
COPY INTO
operací se stejnou tabulkou Delta, najdete v tématu Běžné vzory načítání dat pomocí funkce COPY INTO.
Váš názor
https://aka.ms/ContentUserFeedback.
Připravujeme: V průběhu roku 2024 budeme postupně vyřazovat problémy z GitHub coby mechanismus zpětné vazby pro obsah a nahrazovat ho novým systémem zpětné vazby. Další informace naleznete v tématu:Odeslat a zobrazit názory pro