增量实时表数据源

可以使用以下外部数据源创建数据集:

  • Databricks Runtime 直接支持的数据源
  • 云存储中的任何文件,例如 Azure Data Lake Storage Gen2 (ADLS Gen2)、AWS S3 或 Google Cloud Storage (GCS)。
  • 存储在 DBFS 中的任何文件。

对于从支持的文件格式读取数据的管道,Databricks 建议使用自动加载程序,特别是对于处理持续传入数据的流式实时表。 自动加载程序可缩放、高效,且支持架构推断。

Python 数据集可以使用 Apache Spark 内置文件数据源从自动加载程序不支持的文件格式读取批处理操作中的数据。

SQL 数据集可以使用增量实时表文件源从自动加载程序不支持的文件格式读取批处理操作中的数据。

自动加载程序

以下示例使用自动加载程序从 CSV 和 JSON 文件创建数据集:

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING LIVE TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING LIVE TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

可以将支持的格式选项用于自动加载程序。 使用 map() 函数,可将任意数量的选项传递给 cloud_files() 方法。 选项是键值对,其中键和值是字符串。 下面介绍了在 SQL 中使用自动加载程序的语法:

CREATE OR REFRESH STREAMING LIVE TABLE <table_name>
AS SELECT *
  FROM cloud_files(
    "<file_path>",
    "<file_format>",
    map(
      "<option_key>", "<option_value",
      "<option_key>", "<option_value",
      ...
    )
  )

下面的示例从带有标头的制表符分隔的 CSV 文件中读取数据:

CREATE OR REFRESH STREAMING LIVE TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))

可使用 schema 手动指定格式;必须为不支持架构推理的格式指定 schema

Python

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

SQL

CREATE OR REFRESH STREAMING LIVE TABLE wiki_raw
AS SELECT *
  FROM cloud_files(
    "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
    "parquet",
    map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
  )

注意

在使用自动加载程序读取文件时,增量实时表将自动配置和管理模式和检查点目录。 但是,如果你手动配置这些目录中的任何一个,执行完全刷新不会影响已配置目录的内容。 Databricks 建议使用自动配置的目录以避免在处理过程中出现意外的副作用。

Apache Spark 文件源

若要在 Python 中定义数据集时读取批处理操作中的文件,可以使用标准的 PySpark 函数。 下面的示例使用 PySpark spark.read.format("parquet").load() 函数从文件读取 Parquet 数据:

@dlt.table
def lendingclub_raw_data():
  return (
    spark.read.format("parquet").load("/databricks-datasets/samples/lending_club/parquet/")
  )

Spark SQL 文件源

若要在 SQL 中定义数据集时读取批处理操作中的文件,可以使用Spark SQL 语法。 以下示例从文件读取 Parquet 数据:

CREATE OR REFRESH LIVE TABLE customers
AS SELECT * FROM parquet.`/databricks-datasets/samples/lending_club/parquet/`