Introdução ao uso de COPY INTO para carregar dados

O comando SQL COPY INTO permite carregar dados de um local de arquivo para uma tabela do Delta. Esta é uma operação repetível e idempotente. Os arquivos no local de origem que já foram carregados são ignorados.

COPY INTO oferece as seguintes funcionalidades:

  • Filtros de arquivo ou diretório facilmente configuráveis do armazenamento em nuvem, incluindo volumes S3, ADLS Gen2, ABFS, GCS e Catálogo do Unity.
  • Suporte para vários formatos de arquivo de origem: CSV, JSON, XML, Avro, ORC, Parquet, texto e arquivos binários
  • Processamento de arquivos exatamente uma vez (idempotente) por padrão
  • Inferência do esquema da tabela de destino, mapeamento, mesclagem e evolução

Observação

Para obter uma experiência de ingestão de arquivos mais escalonável e robusta, a Databricks recomenda que os usuários de SQL aproveitem as tabelas de streaming. Confira Carregar dados usando tabelas de streaming no Databricks SQL.

Aviso

COPY INTO respeita a configuração do espaço de trabalho para vetores de exclusão. Se habilitado, os vetores de exclusão serão habilitados na tabela de destino quando COPY INTO for executado em uma computação ou SQL warehouse que esteja executando o Databricks Runtime 14.0 ou superior. Uma vez habilitados, os vetores de exclusão bloqueiam as consultas em uma tabela no Databricks Runtime 11.3 LTS e versões anteriores. Consulte O que são vetores de exclusão? e Habilitar automaticamente vetores de exclusão.

Requisitos

Um administrador de conta deve seguir as etapas em Configurar o acesso a dados para ingestão para configurar o acesso aos dados no armazenamento de objetos de nuvem antes que os usuários possam carregar dados usando COPY INTO.

Exemplo: carregar dados em uma tabela do Delta Lake sem esquema

Observação

Este recurso está disponível no Databricks Runtime 11.3 LTS e versões superiores.

Você pode criar tabelas delta de espaço reservado vazias para que o esquema seja posteriormente inferido durante um comando COPY INTO definindo mergeSchema como true em COPY_OPTIONS:

CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

A instrução de SQL acima é idempotente e pode ser agendada para ser executada para ingerir dados exatamente uma vez em uma tabela Delta.

Observação

A tabela Delta vazia não é utilizável fora de COPY INTO. INSERT INTO e MERGE INTO não há suporte para gravar dados em tabelas Delta sem esquema. Depois que os dados são inseridos na tabela com COPY INTO, a tabela se torna consultável.

Confira Criar tabelas de destino para o COPY INTO.

Exemplo: definir o esquema e carregar dados em uma tabela do Delta Lake

O exemplo a seguir mostra como criar uma tabela Delta e usar o comando SQL COPY INTO para carregar dados de exemplo dos conjuntos de dados do Databricks nela. Execute o código Python, R, Scala ou SQL de exemplo em um notebook anexado a um cluster do Azure Databricks. Também é possível executar o código SQL em uma consulta associada a um SQL warehouse no SQL do Databricks.

SQL

DROP TABLE IF EXISTS default.loan_risks_upload;

CREATE TABLE default.loan_risks_upload (
  loan_id BIGINT,
  funded_amnt INT,
  paid_amnt DOUBLE,
  addr_state STRING
);

COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;

SELECT * FROM default.loan_risks_upload;

-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    | CA         |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    | WA         |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...

Python

table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "loan_id BIGINT, " + \
  "funded_amnt INT, " + \
  "paid_amnt DOUBLE, " + \
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format
)

loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)

display(loan_risks_upload_data)

'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''

R

library(SparkR)
sparkR.session()

table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "loan_id BIGINT, ",
  "funded_amnt INT, ",
  "paid_amnt DOUBLE, ",
  "addr_state STRING)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  sep = ""
))

loan_risks_upload_data = tableToDF(table_name)

display(loan_risks_upload_data)

# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    | CA         |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    | WA         |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...

Scala

val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/

Para limpar, execute o seguinte código, que exclui a tabela:

Python

spark.sql("DROP TABLE " + table_name)

R

sql(paste("DROP TABLE ", table_name, sep = ""))

Scala

spark.sql("DROP TABLE " + table_name)

SQL

DROP TABLE default.loan_risks_upload

Referência

  • Databricks Runtime 7.x e posteriores: COPY INTO

Recursos adicionais