自动加载程序

自动加载程序会在新数据文件到达云存储空间时以增量方式高效地对其进行处理。 除了 Databricks File System (DBFS, dbfs:/) 之外,自动加载程序还可以从 AWS S3 (s3://)、Azure Data Lake Storage Gen2 (ADLS Gen2, abfss://)、Google Cloud Storage (GCS, gs://)、Azure Blob Storage (wasbs://) 和 ADLS Gen1 (adl://) 加载数据文件。 自动加载程序可以引入 JSONCSVPARQUETAVROORCTEXTBINARYFILE 文件格式。

自动加载程序提供了名为 cloudFiles 的结构化流式处理源。 给定云文件存储上的输入目录路径后,cloudFiles 源将在新文件到达时自动处理这些文件,你也可以选择处理该目录中的现有文件。

自动加载程序可以扩展到从包含数十亿个文件的存储帐户加载数据,这些文件需要回填到管道中,在管道中,一小时内可加载数百万个文件。

自动加载程序的工作方式

自动加载程序支持两种检测新文件的模式:目录列表和文件通知。

  • 目录列表:自动加载程序通过列出输入目录来识别新文件。 通过目录列表模式,你无需任何权限配置即可快速启动自动加载程序流,而无需访问云存储上的数据。 在 Databricks Runtime 9.1 及更高版本中,自动加载程序可以自动检测文件是否以词法顺序到达云存储,并显著减少检测新文件所需的 API 调用量。 有关更多详细信息,请参阅增量列表
  • 文件通知:自动加载程序可以自动设置从输入目录中订阅文件事件的通知服务和队列服务。 文件通知模式对于大型输入目录或大量文件具有更高的性能和可扩展性,但需要额外的云权限才能进行设置。 有关更多详细信息,请参阅利用文件通知

下面列出了这些模式的可用性。

云存储 目录列表 文件通知
AWS S3 所有版本 所有版本
ADLS Gen2 所有版本 所有版本
GCS 所有版本 Databricks Runtime 9.1 及更高版本
Azure Blob 存储 所有版本 所有版本
ADLS Gen1 Databricks Runtime 7.3 及更高版本 不支持
DBFS 所有版本 仅适用于装入点。

当文件被发现时,其元数据将保留在自动加载程序管道的检查点位置的可缩放键值存储 (RocksDB) 中。 这种键值存储可确保只处理一次数据。 你可以在流重启时切换文件发现模式,并且仍可获得精确一次的数据处理保证。 实际上,自动加载程序正是通过这种方式来保证既可以对包含现有文件的目录执行回填,又可以并发处理通过文件通知发现的新文件。

如果发生故障,自动加载程序可以通过存储在检查点位置的信息从中断的位置恢复,并在将数据写入 Delta Lake 时继续提供一次性保证。 你无需自己维护或管理任何状态即可实现容错或恰好一次性语义。

何时使用 COPY INTO 以及何时使用自动加载程序

COPY INTO 命令是另一种将数据增量加载到增量表中的便捷方法,可以保证“恰好一次”。 在自动加载程序和 COPY INTO 之间进行选择时,需要考虑以下几点:

  • 如果要以数千个顺序引入文件,则可以使用 COPY INTO。 如果随着时间的推移,文件的数量约为数百万或更多,请使用自动加载程序。 与 COPY INTO 相比,自动加载程序可以更便宜地发现文件,并且可以将处理拆分为多个批处理。
  • 如果你的数据架构将频繁演化,自动加载程序会围绕架构推理和演进提供更好的基元。 有关更多详细信息,请参阅架构推理和演变
  • 使用 COPY INTO 加载重新上传的文件的子集可以更容易地进行管理。 使用自动加载程序,很难重新处理选定的文件子集。 但是,你可以使用 COPY INTO 在自动加载程序流同时运行时重新加载文件子集。

相对于 Apache Spark FileStreamSource 的优势

在 Apache Spark 中,可以使用 spark.readStream.format(fileFormat).load(directory) 以增量方式读取文件。 相比于文件源,自动加载程序具有以下优势:

  • 可伸缩性:自动加载程序可以高效地发现数十亿个文件。 可以以异步方式执行回填,以避免浪费任何计算资源。
  • 性能:使用自动加载程序发现文件的成本会根据要引入的文件的数量(而不是文件可能登陆的目录的数量)进行调整。 请参阅经过优化的目录列表
  • 架构推理和演变支持:自动加载程序可以检测到架构偏移,在发生架构更改时通知你,并且会挽救那些原本会被忽略或丢失的数据。 请参阅架构推理和演变
  • 成本:自动加载程序使用本机云 API 来获取存储中存在的文件的列表。 另外,自动加载程序的文件通知模式可通过完全避免目录列表来帮助进一步降低云成本。 自动加载程序可在存储上自动设置文件通知服务,从而使文件发现成本大幅降低。

快速入门

下面的代码示例演示自动加载程序如何在新数据文件到达云存储时检测到这些文件。 可以从附加到 Azure Databricks 群集笔记本中运行示例代码。

  1. 创建文件上传目录,例如:

    Python

    user_dir = '<my-name>@<my-organization.com>'
    upload_path = "/FileStore/shared-uploads/" + user_dir + "/population_data_upload"
    
    dbutils.fs.mkdirs(upload_path)
    

    Scala

    val user_dir = "<my-name>@<my-organization.com>"
    val upload_path = "/FileStore/shared-uploads/" + user_dir + "/population_data_upload"
    
    dbutils.fs.mkdirs(upload_path)
    
  2. 创建以下示例 CSV 文件,然后通过使用 DBFS 文件浏览器将这些文件上传到文件上传目录:

    WA.csv:

    city,year,population
    Seattle metro,2019,3406000
    Seattle metro,2020,3433000
    

    OR.csv:

    city,year,population
    Portland metro,2019,2127000
    Portland metro,2020,2151000
    
  3. 运行以下代码以启动自动加载程序。

    Python

    checkpoint_path = '/tmp/delta/population_data/_checkpoints'
    write_path = '/tmp/delta/population_data'
    
    # Set up the stream to begin reading incoming files from the
    # upload_path location.
    df = spark.readStream.format('cloudFiles') \
      .option('cloudFiles.format', 'csv') \
      .option('header', 'true') \
      .schema('city string, year int, population long') \
      .load(upload_path)
    
    # Start the stream.
    # Use the checkpoint_path location to keep a record of all files that
    # have already been uploaded to the upload_path location.
    # For those that have been uploaded since the last check,
    # write the newly-uploaded files' data to the write_path location.
    df.writeStream.format('delta') \
      .option('checkpointLocation', checkpoint_path) \
      .start(write_path)
    

    Scala

    val checkpoint_path = "/tmp/delta/population_data/_checkpoints"
    val write_path = "/tmp/delta/population_data"
    
    // Set up the stream to begin reading incoming files from the
    // upload_path location.
    val df = spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("header", "true")
      .schema("city string, year int, population long")
      .load(upload_path)
    
    // Start the stream.
    // Use the checkpoint_path location to keep a record of all files that
    // have already been uploaded to the upload_path location.
    // For those that have been uploaded since the last check,
    // write the newly-uploaded files' data to the write_path location.
    df.writeStream.format("delta")
      .option("checkpointLocation", checkpoint_path)
      .start(write_path)
    
  4. 在步骤 3 中的代码仍在运行时,运行以下代码来查询写入目录中的数据:

    Python

    df_population = spark.read.format('delta').load(write_path)
    
    display(df_population)
    
    '''
    Result:
    +----------------+------+------------+
    | city           | year | population |
    +================+======+============+
    | Seattle metro  | 2019 | 3406000    |
    +----------------+------+------------+
    | Seattle metro  | 2020 | 3433000    |
    +----------------+------+------------+
    | Portland metro | 2019 | 2127000    |
    +----------------+------+------------+
    | Portland metro | 2020 | 2151000    |
    +----------------+------+------------+
    '''
    

    Scala

    val df_population = spark.read.format("delta").load(write_path)
    
    display(df_population)
    
    /* Result:
    +----------------+------+------------+
    | city           | year | population |
    +================+======+============+
    | Seattle metro  | 2019 | 3406000    |
    +----------------+------+------------+
    | Seattle metro  | 2020 | 3433000    |
    +----------------+------+------------+
    | Portland metro | 2019 | 2127000    |
    +----------------+------+------------+
    | Portland metro | 2020 | 2151000    |
    +----------------+------+------------+
    */
    
  5. 在步骤 3 中的代码仍在运行时,创建以下附加 CSV 文件,然后通过使用 DBFS 文件浏览器将它们上传到上传目录:

    ID.csv:

    city,year,population
    Boise,2019,438000
    Boise,2020,447000
    

    MT.csv:

    city,year,population
    Helena,2019,81653
    Helena,2020,82590
    

    Misc.csv:

    city,year,population
    Seattle metro,2021,3461000
    Portland metro,2021,2174000
    Boise,2021,455000
    Helena,2021,81653
    
  6. 在步骤 3 中的代码仍在运行时,运行以下代码来查询写入目录中的现有数据,此外还包括自动加载程序在上传目录中检测到然后写入到写入目录的文件中的新数据:

    Python

    df_population = spark.read.format('delta').load(write_path)
    
    display(df_population)
    
    '''
    Result:
    +----------------+------+------------+
    | city           | year | population |
    +================+======+============+
    | Seattle metro  | 2019 | 3406000    |
    +----------------+------+------------+
    | Seattle metro  | 2020 | 3433000    |
    +----------------+------+------------+
    | Helena         | 2019 | 81653      |
    +----------------+------+------------+
    | Helena         | 2020 | 82590      |
    +----------------+------+------------+
    | Boise          | 2019 | 438000     |
    +----------------+------+------------+
    | Boise          | 2020 | 447000     |
    +----------------+------+------------+
    | Portland metro | 2019 | 2127000    |
    +----------------+------+------------+
    | Portland metro | 2020 | 2151000    |
    +----------------+------+------------+
    | Seattle metro  | 2021 | 3461000    |
    +----------------+------+------------+
    | Portland metro | 2021 | 2174000    |
    +----------------+------+------------+
    | Boise          | 2021 | 455000     |
    +----------------+------+------------+
    | Helena         | 2021 | 81653      |
    +----------------+------+------------+
    '''
    

    Scala

    val df_population = spark.read.format("delta").load(write_path)
    
    display(df_population)
    
    /* Result
    +----------------+------+------------+
    | city           | year | population |
    +================+======+============+
    | Seattle metro  | 2019 | 3406000    |
    +----------------+------+------------+
    | Seattle metro  | 2020 | 3433000    |
    +----------------+------+------------+
    | Helena         | 2019 | 81653      |
    +----------------+------+------------+
    | Helena         | 2020 | 82590      |
    +----------------+------+------------+
    | Boise          | 2019 | 438000     |
    +----------------+------+------------+
    | Boise          | 2020 | 447000     |
    +----------------+------+------------+
    | Portland metro | 2019 | 2127000    |
    +----------------+------+------------+
    | Portland metro | 2020 | 2151000    |
    +----------------+------+------------+
    | Seattle metro  | 2021 | 3461000    |
    +----------------+------+------------+
    | Portland metro | 2021 | 2174000    |
    +----------------+------+------------+
    | Boise          | 2021 | 455000     |
    +----------------+------+------------+
    | Helena         | 2021 | 81653      |
    +----------------+------+------------+
    */
    
  7. 若要清理,请取消步骤 3 中正在运行的代码,然后运行以下代码来删除上传、检查点和写入目录:

    Python

    dbutils.fs.rm(write_path, True)
    dbutils.fs.rm(upload_path, True)
    

    Scala

    dbutils.fs.rm(write_path, true)
    dbutils.fs.rm(upload_path, true)
    

另请参阅教程:使用自动加载程序将数据连续引入 Delta Lake

架构推理和演变

注意

适用于 Databricks Runtime 8.2 及更高版本。

自动加载程序支持 CSV、JSON、二进制 (binaryFile) 和文本文件格式的架构推理和演变。 有关详细信息,请参阅自动加载程序中的架构推理和演变

将自动加载程序扩展到大量数据

使用 Trigger.AvailableNow 和速率限制

注意

仅适用于 Scala 的 Databricks Runtime 10.1。

适用于 Python 和 Scala 的 Databricks Runtime 10.2 及更高版本。

可以使用 Trigger.AvailableNow 将自动加载程序计划为在 Databricks 作业中作为批处理作业运行。 AvailableNow 触发器将指示自动加载程序处理在查询开始时间之前到达的所有文件。 在流开始后上传的新文件将被忽略,直到下一次触发。

使用 Trigger.AvailableNow,文件发现将与数据处理异步进行,并且可以通过速率限制跨多个微批处理数据。 默认情况下,自动加载程序每个微批处理最多处理 1000 个文件。 可以通过配置 cloudFiles.maxFilesPerTriggercloudFiles.maxBytesPerTrigger 来配置应在微批处理中处理的文件数或字节数。 文件限制是硬限制,但字节限制是软限制,这意味着可以处理的字节数多于 maxBytesPerTrigger 提供的字节数。 当这两个选项同时提供时,自动加载程序将处理达到其中一个限制所需的文件数量。

经过优化的目录列表

注意

可在 Databricks Runtime 9.0 及更高版本中使用。

自动加载程序可以比其他替代方式更高效地使用目录列表在云存储系统上发现文件。 例如,如果你每隔 5 分钟就有文件上传为 /some/path/YYYY/MM/DD/HH/fileName,那么,为了找到这些目录中的所有文件,Apache Spark 文件源会并行列出所有子目录,导致 1(基础目录)+ 365(每天)* 24(每小时)= 针对存储的 8761 次 LIST API 目录调用。 通过从存储接收平展响应,自动加载程序将 API 调用次数减少到存储中的文件数除以每次 API 调用返回的结果数(使用 S3 时为 1000,使用 ADLS Gen2 时为 5000,使用 GCS 时为 1024),从而大大降低了云成本。

增量列表

注意

Databricks Runtime 9.1 LTS 及更高版本中可用。

现在,对于按字典顺序生成的文件,自动加载程序可以利用字典顺序的文件排序和经过优化的列表 API,通过从最近引入的文件中列出而不是列出整个目录的内容,从而来提高目录列表的效率。

默认情况下,自动加载程序通过检查和比较以前完成的目录列表的文件路径,自动检测给定目录是否适用于增量列表。 为了保证 auto 模式中数据的最终完整性,自动加载程序会在完成 7 个连续的增量列表后自动触发一个完整的目录列表。 可以设置 cloudFiles.backfillInterval 以在给定的时间间隔触发异步回填,从而控制完整目录列表的频率。

你可以通过将 cloudFiles.useIncrementalListing 设置为 "true""false"(默认为 "auto")来显式启用或禁用增量列表。 显式启用后,自动加载程序将不会触发完整目录列表,除非设置了回填间隔。 AWS Kinesis Firehose、AWS DMS 和 Azure Data Factory 等服务可以配置为按词法顺序将文件上传到存储系统。 有关词汇目录结构的更多示例,请参见附录

利用文件通知

当文件未按词法顺序到达存储桶时,可以使用文件通知来缩放自动加载程序,以每小时引入数百万个文件。 如果将 cloudFiles.useNotifications 选项设置为 true 并提供创建云资源所需的权限时,自动加载程序可以自动为你设置文件通知。 此外,还可能需要提供以下附加选项,以提供创建这些资源所需要的自动加载程序授权。 下表汇总了自动加载程序创建的资源。

云存储 订阅服务 队列服务 前缀 (1) 限制 (2)
AWS S3 AWS SNS AWS SQS databricks-auto-ingest 每个 S3 存储桶 100 个
ADLS Gen2 Azure 事件网格 Azure Queue Storage databricks 每存储帐户 500
GCS Google Pub/Sub Google Pub/Sub databricks-auto-ingest 每个 GCS 存储桶有 100 个
Azure Blob 存储 Azure 事件网格 Azure Queue Storage databricks 每存储帐户 500
  1. 自动加载程序将使用此前缀命名资源
  2. 可以启动多少个并发文件通知管道

如果你无法为 Auto Loader 提供创建文件通知服务的必要权限,则可以要求云管理员使用 Databricks Scala 笔记本中下一节中的 setUpNotificationServices 方法为你创建文件通知服务。 或者,你的云管理员可以手动设置文件通知服务,并可以为你提供队列标识符以利用文件通知。 有关更多详细信息,请参阅文件通知选项

可以随时在文件通知和目录列表之间切换,并且只要数据处理保证,你仍然可以精确地维护。

注意

云提供商不保证在极少数情况下 100% 交付所有文件事件,也不对文件事件的延迟提供任何严格的 SLA。 Databricks 建议你使用自动加载程序触发定期回填,方法是使用 cloudFiles.backfillInterval 选项来保证在给定的 SLA 中发现所有文件(如果需要满足数据完整性的要求)。 触发定期回填不会导致重复。

如果需要为给定存储帐户运行超过有限数量的文件通知管道,则可以:

  • 考虑重新架构如何上传文件以利用增量列表来代替文件通知
  • 利用 AWS Lambda、Azure Functions 或 Google Cloud Functions 等服务,将来自单个队列的通知扇出(该队列侦听整个容器或存储桶),并放入特定于目录的队列

文件通知事件

当文件上传到 S3 存储桶时,AWS S3 都会提供一个 ObjectCreated 事件,无论该文件的上传方式是通过 put 上传还是通过多部分上传。

ADLS Gen2 为 Gen2 容器中显示的文件提供不同的事件通知。

  • 自动加载程序会侦听 FlushWithClose 事件,以便处理某个文件。
  • 使用 Databricks Runtime 8.3 及更高版本创建的自动加载程序流支持 RenameFile 操作,以便发现文件。 RenameFile 操作将会需要向存储系统发送的 API 请求,以便获取重命名的文件的大小。
  • 使用 Databricks Runtime 9.0 及更高版本创建的自动加载程序流支持 RenameDirectory 操作,以便发现文件。 RenameDirectory 操作将会需要向存储系统发送的 API 请求,以便列出重命名的目录的内容。

Google Cloud Storage 会在上传文件时提供一个 OBJECT_FINALIZE 事件,其中包括覆盖和文件副本。 失败的上传不会生成此事件。

管理文件通知资源

可以使用 Scala API 来管理自动加载程序创建的通知和排队服务。 使用此 API 之前,必须配置权限中所述的资源设置权限。

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by Auto Loader
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

使用 setUpNotificationServices(<resource-suffix>) 创建名为 <prefix>-<resource-suffix> 的队列和订阅(前缀取决于利用文件通知中总结的存储系统)。 如果已存在具有相同名称的资源,Azure Databricks 会重用已存在的资源,而不是创建新资源。 此函数返回一个队列标识符,可以使用文件通知选项中的标识符将该标识符传递给 cloudFiles 源。 这使得 cloudFiles 源用户拥有的权限少于创建资源的用户的权限。 请参阅权限

只有调用 setUpNotificationServices 时才需要提供 newManager"path" 选项;对于 listNotificationServicestearDownNotificationServices,则不需要提供。 这是你运行流式处理查询时使用的同一 path

云存储 安装程序 API 列出 API 拆解 API
AWS S3 所有版本 所有版本 所有版本
ADLS Gen2 所有版本 所有版本 所有版本
GCS Databricks Runtime 9.1 及更高版本 Databricks Runtime 9.1 及更高版本 Databricks Runtime 9.1 及更高版本
Azure Blob 存储 所有版本 所有版本 所有版本
ADLS Gen1 不支持 不支持 不支持

事件保留

注意

可在 Databricks Runtime 8.4 及更高版本中使用。

自动加载程序使用 RocksDB 跟踪检查点位置中的已发现文件,以提供恰好一次的引入保证。 对于高容量数据集,可以使用 cloudFiles.maxFileAge 选项使检查点位置的事件过期,以减少存储成本和自动加载程序启动时间。 可为 cloudFiles.maxFileAge 设置的最小值为 "14 days"。 RocksDB 中的删除项显示为逻辑删除条目,因此,在存储使用率稳定下来前,可以预见存储使用率会随着事件过期而临时增加。

警告

cloudFiles.maxFileAge 作为高容量数据集的成本控制机制提供,以每小时数百万个文件的速度引入数据。 未正确优化 cloudFiles.maxFileAge 可能会导致数据质量问题。 因此,Databricks 建议不要优化此参数,除非绝对必要。

尝试优化 cloudFiles.maxFileAge 选项可能会导致自动加载程序忽略未处理的文件,或导致已处理的文件过期并重新处理,从而导致重复数据。 下面是选择 cloudFiles.maxFileAge 时需要注意的几个事项:

  • 如果流在很长时间后重启,将忽略从队列中拉取的超过 cloudFiles.maxFileAge 的文件通知事件。 同样,如果使用目录列表,将忽略在停机期间可能出现的超过 cloudFiles.maxFileAge 的文件。
  • 如果你使用目录列表模式并使用 cloudFiles.maxFileAge(例如将其设置为 "1 month"),然后停止流并在 cloudFiles.maxFileAge 设置为 "2 months" 的情况下重启流,则所有超过 1 个月但未到 2 个月的文件都将重新处理。

优化 cloudFiles.maxFileAge 的最佳方法是从一个宽松的期限(例如 "1 year")开始,然后继续下调(例如调整为 "9 months")。 如果你在首次启动流时设置此选项,则不会引入超过 cloudFiles.maxFileAge 的数据。因此,如果你需要引入旧数据,则在启动流时不应设置此选项。

在生产环境中运行自动加载程序

Databricks 建议在生产环境中运行自动加载程序时遵循流式处理最佳做法

监视自动加载程序

侦听流更新

若要进一步监视自动加载程序流,Databricks 建议使用 Apache Spark 的流式处理查询侦听器接口

自动加载程序在每批次中向流式处理查询侦听器报告指标。 你可以在numBytesOutstanding的“原始数据”选项卡下的 numFilesOutstandingnumBytesOutstanding 指标中查看积压工作 (backlog) 中存在多少个文件以及积压工作 (backlog) 量有多大:

{
  "sources" : [
    {
      "description" : "CloudFilesSource[/path/to/source]",
      "metrics" : {
        "numFilesOutstanding" : "238",
        "numBytesOutstanding" : "163939124006"
      }
    }
  ]
}

在 Databricks Runtime 10.1 及更高版本中,对于 AWS 和 Azure,使用文件通知模式时,指标还将包括云队列中文件事件的近似数量(表示为 approximateQueueSize)。

在增量实时表中使用自动加载程序

自动加载程序可以在 SQL 和 Python 的增量实时表中使用。 在增量实时表中使用自动加载程序时,无需提供架构位置或检查点位置,因为这些位置将由管道的增量实时表管理。

以下示例使用自动加载程序从 CSV 和 JSON 文件创建数据集:

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING LIVE TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING LIVE TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

可以将支持的格式选项用于自动加载程序。 下面的示例从制表符分隔的 CSV 文件中读取数据:

CREATE OR REFRESH STREAMING LIVE TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t"))

注意

在使用自动加载程序读取文件时,增量实时表将自动配置和管理模式和检查点目录。 但是,如果你手动配置这些目录中的任何一个,执行完全刷新不会影响已配置目录的内容。 Databricks 建议使用自动配置的目录以避免在处理过程中出现意外的副作用。

成本注意事项

运行自动加载程序时,主要成本来源是计算资源和文件发现的成本。

为了降低计算成本,Databricks 建议使用 Databricks 作业将自动加载程序计划为使用 Trigger.AvailableNow(在 Databricks Runtime 10.1 及更高版本中)或 Trigger.Once 的批处理作业,而不是连续运行(前提是你对于低延迟没有要求)。

文件发现成本的形式可以是目录列表模式下存储帐户的 LIST 操作、订阅服务上的 API 请求,以及文件通知模式下的队列服务。 为了降低文件发现成本,Databricks 的建议是:

  • 在目录列表模式下连续运行自动加载程序时提供 ProcessingTime 触发器
  • 以词汇顺序构建文件上传到你的存储帐户,以尽可能利用增量列表
  • 在目录列表模式下使用 Databricks Runtime 9.0 或更高版本,特别是对于深度嵌套目录
  • 在无法增量列出时利用文件通知
  • 使用资源标记来标记自动加载程序创建的资源,以跟踪你的成本

配置自动加载程序

特定于 cloudFiles 源的配置选项以 cloudFiles 为前缀,因此它们位于与其他结构化流式处理源选项不同的命名空间中。

文件格式选项

使用自动加载程序,可引入 JSONCSVPARQUETAVROTEXTBINARYFILEORC 文件。 有关这些文件格式的选项,请参阅格式选项

常见自动加载程序选项

可为目录列表或文件通知模式配置以下选项。

选项
cloudFiles.allowOverwrites

类型:Boolean

是否允许输入目录文件更改替代现有数据。 适用于 Databricks Runtime 7.6 及更高版本。

默认值:30false
cloudFiles.backfillInterval

类型:Interval String

自动加载程序可在给定的时间间隔触发异步回填。
例如,1 day 表示每天回填一次,1 week 表示每周回填一次。 文件事件通知系统不保证 100% 交付已上传的所有文件,因此,可使用回填来保证所有文件最终都得到处理,此功能在 Databricks Runtime 8.4(不受支持)及更高版本中可用。 如果使用增量列表,还可以使用定期回填来保证最终的完成度,此功能在 Databricks Runtime 9.1 LTS 及更高版本中可用。

默认值: 无
cloudFiles.format

类型:String

源路径中的数据文件格式。 允许的值包括:

* avro:*
* binaryFile:*
* csv:*
* json:*
* orc:ORC 文件
* parquet:*
* text:文本文件

默认值:无(必需选项)
cloudFiles.includeExistingFiles

类型:Boolean

是包含流式处理输入路径中的现有文件,还是仅处理初始设置后到达的新文件。 仅在首次启动流时会评估此选项。 在重启流后更改此选项不起作用。

默认值:30true
cloudFiles.inferColumnTypes

类型:Boolean

在利用架构推理时是否推断确切的列类型。 默认情况下,在推断 JSON 数据集时,将列推断为字符串。 有关更多详细信息,请参阅架构推理

默认值:30false
cloudFiles.maxBytesPerTrigger

类型:Byte String

要在每个触发器中处理的最大新字节数。 你可以指定一个字节字符串(例如 10g),将每个微批限制为 10 GB 数据。 这个一个软性最大值。 如果每个文件为 3 GB,则 Azure Databricks 在一个微批中可以处理 12 GB。 与 cloudFiles.maxFilesPerTrigger 一起使用时,Azure Databricks 最多将消耗 cloudFiles.maxFilesPerTriggercloudFiles.maxBytesPerTrigger 的最低限制(以先达到者为准)。 与 Trigger.Once() 一起使用时,此选项不起作用。

默认值: 无
cloudFiles.maxFileAge

类型:Interval String

出于重复数据删除目的而跟踪文件事件的时长。 Databricks 建议不要优化此参数,除非你是在以每小时数百万个文件的速度引入数据。 有关更多详细信息,请参阅事件保留章节。

默认值: 无
cloudFiles.maxFilesPerTrigger

类型:Integer

要在每个触发器中处理的最大新文件数。 与 cloudFiles.maxBytesPerTrigger 一起使用时,Azure Databricks 最多将消耗 cloudFiles.maxFilesPerTriggercloudFiles.maxBytesPerTrigger 的最低限制(以先达到者为准)。 与 Trigger.Once() 一起使用时,此选项不起作用。

默认值:1000
cloudFiles.partitionColumns

类型:String

要从文件的目录结构推断出的 Hive 样式分区列的逗号分隔列表。 Hive 样式的分区列是由等号组合的键值对,例如
<base_path>/a=x/b=1/c=y/file.format. 在此示例中,分区列为 abc。 默认情况下,如果你使用的是架构推理,则这些列将自动添加到架构中,并提供从中加载数据的 <base_path>。 如果提供架构,则自动加载程序会期望这些列包含在架构中。 如果你不希望这些列成为架构的一部分,则可以指定 "" 以忽略这些列。 此外,当你希望推断出复杂目录结构中的文件路径时,可以使用此选项,如下面的示例所示:

<base_path>/year=2022/week=1/file1.csv
<base_path>/year=2022/month=2/day=3/file2.csv
<base_path>/year=2022/month=2/day=4/file3.csv

如果将 cloudFiles.partitionColumns 指定为 year,month,day,则
针对 file1.csv,将返回 year=2022,但 monthday 列将为 null
对于 file2.csvfile3.csv,将正确分析 monthday

默认值: 无
cloudFiles.schemaEvolutionMode

类型:String

在数据中发现新列时架构演变的模式。 默认情况下,在推断 JSON 数据集时,将列推断为字符串。 有关更多详细信息,请参阅架构演变

默认值:在未提供架构时为 "addNewColumns"
否则为 "none"
cloudFiles.schemaHints

类型:String

在架构推理期间向自动加载程序提供的架构信息。 有关更多详细信息,请参阅架构提示

默认值: 无
cloudFiles.schemaLocation

类型:String

存储推断出的架构和后续更改的位置。 有关更多详细信息,请参阅架构推理

默认值:无(推断架构时需要)
cloudFiles.validateOptions

类型:Boolean

是否验证自动加载程序选项,并为未知或不一致的选项返回错误。

默认值:30true

目录列表选项

以下选项与目录列表模式相关。

选项
cloudFiles.useIncrementalListing

类型:String

是否在目录列表模式下使用增量列表而不是完整列表。 在默认情况下,自动加载程序将尽最大努力自动检测指定的目录是否适用于递增列表。 可以显式使用增量列表,或者通过将完整目录列表分别设置为 truefalse 来使用该列表。

Databricks Runtime 9.1 LTS 及更高版本中可用。

默认值:30auto

可用值:autotruefalse

文件通知选项

以下选项与文件通知模式相关。

选项
cloudFiles.fetchParallelism

类型:Integer

从队列服务中提取消息时要使用的线程数。

默认值:1
cloudFiles.pathRewrites

类型:JSON 字符串

仅当指定从多个 S3 存储桶接收文件通知的 queueUrl,并且希望利用为访问这些容器中的数据而配置的装入点时,才需要此选项。 借助此选项可以使用装入点重写 bucket/key 路径的前缀。 只能重写前缀。 例如,对于配置
{"<databricks-mounted-bucket>/path": "dbfs:/mnt/data-warehouse"},路径
s3://<databricks-mounted-bucket>/path/2017/08/fileA.json 会重写为 dbfs:/mnt/data-warehouse/2017/08/fileA.json

默认值: 无
cloudFiles.resourceTags

类型:Map(String, String)

一系列键值标记对,可帮助关联和识别相关资源,例如:

cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue")
.option("cloudFiles.resourceTag.mySecondKey", "mySecondValue")

有关 AWS 的更多信息,请参阅 Amazon SQS 成本分配标记为 Amazon SNS 主题配置标记(1)

有关 Azure 的详细信息,请参阅命名队列和元数据以及事件订阅properties.labels 的覆盖范围。 自动加载程序将这些键值标记对以 JSON 格式存储为标签。 (1)

有关 GCP 的更多信息,请参阅使用标签报告使用情况(1)

默认值: 无
cloudFiles.useNotifications

类型:Boolean

是否使用文件通知模式来确定何时存在新文件。 如果为 false,则使用目录列表模式。 请参阅自动加载程序的工作方式

默认值:30false

(1) 默认情况下,自动加载程序会尽量添加以下键值标记对:

  • vendor: Databricks
  • path:从中加载数据的位置。 由于标签限制,在 GCP 中不可用。
  • checkpointLocation:流检查点的位置。 由于标签限制,在 GCP 中不可用。
  • streamId:流的全局唯一标识符。

这些键名称是预留的,无法覆盖其值。

特定于 AWS 的选项

只有选择了 cloudFiles.useNotifications = true 并且需要自动加载程序来设置通知服务时,才需要提供以下选项:

选项
cloudFiles.region

类型:String

源 S3 存储桶所在的、将创建 AWS SNS 和 SQS 服务的区域。

默认值:在 Databricks Runtime 9.0 及更高版本中,默认值为 EC2 实例所在的区域。 在 Databricks Runtime 8.4 及更低版本中,必须指定区域。

仅当你选择 cloudFiles.useNotifications = true 并希望自动加载程序使用已设置的队列时,才提供以下选项:

选项
cloudFiles.queueUrl

类型:String

SQS 队列的 URL。 如果提供了此项,则自动加载程序会直接使用此队列中的事件,而不是设置自己的 AWS SNS 和 SQS 服务。

默认值: 无

当 IAM 角色不可用或从不同云中引入数据时,可以使用以下选项提供用于访问 AWS SNS 和 SQS 的凭据。

选项
cloudFiles.awsAccessKey

类型:String

用户的 AWS 访问密钥 ID。 必须与以下选项一起提供:
cloudFiles.awsSecretKey.

默认值: 无
cloudFiles.awsSecretKey

类型:String

用户的 AWS 机密访问密钥。 必须与以下选项一起提供:
cloudFiles.awsAccessKey.

默认值: 无
cloudFiles.roleArn

类型:String

要担任的 IAM 角色的 ARN。 可以从群集的实例配置文件或通过使用以下选项提供凭据来担任该角色:
cloudFiles.awsAccessKeycloudFiles.awsSecretKey

默认值: 无
cloudFiles.roleExternalId

类型:String

使用 cloudFiles.roleArn 担任角色时提供的标识符。

默认值: 无
cloudFiles.roleSessionName

类型:String

使用以下选项担任角色时使用的可选会话名称:
cloudFiles.roleArn.

默认值: 无
cloudFiles.stsEndpoint

类型:String

一个可选终结点,用于在使用 cloudFiles.roleArn 担任角色时访问 AWS STS。

默认值: 无

特定于 Azure 的选项

如果指定 cloudFiles.useNotifications = true,并且希望自动加载程序设置通知服务,则必须为以下所有选项提供值:

选项
cloudFiles.clientId

类型:String

服务主体的客户端 ID 或应用程序 ID。

默认值: 无
cloudFiles.clientSecret

类型:String

服务主体的客户端密码。

默认值: 无
cloudFiles.connectionString

类型:String

存储帐户的连接字符串,基于帐户访问密钥或共享访问签名 (SAS)。

默认值: 无
cloudFiles.resourceGroup

类型:String

在其下创建了存储帐户的 Azure 资源组。

默认值: 无
cloudFiles.subscriptionId

类型:String

在其下创建了资源组的 Azure 订阅 ID。

默认值: 无
cloudFiles.tenantId

类型:String

在其下创建了服务主体的 Azure 租户 ID。

默认值: 无

重要

自动通知设置在 Azure 中国和具有 Databricks Runtime 9.1 及更高版本的政府区域中可用。 对于较低的 DBR 版本,在这些区域中,必须提供 queueName 才能将自动加载程序与文件通知配合使用。

仅当你选择 cloudFiles.useNotifications = true 并希望自动加载程序使用已设置的队列时,才提供以下选项:

选项
cloudFiles.queueName

类型:String

Azure 队列的名称。 如果提供了此项,则云文件源会直接使用此队列中的事件,而不是设置自己的 Azure 事件网格和队列存储服务。 在这种情况下,cloudFiles.connectionString 只需要对队列具有读取权限。

默认值: 无

特定于 Google 的选项

自动加载程序可以利用 Google 服务帐户自动为你设置通知服务。 你可以按照 Google 服务设置将群集配置为采用服务帐户。 设置文件通知资源所需的权限中说明了服务帐户所需的权限。 如果希望自动加载程序为你设置通知服务,则可以提供以下身份验证选项。

选项
cloudFiles.client

类型:String

Google 服务帐户的客户端 ID。

默认值: 无
cloudFiles.clientEmail

类型:String

Google 服务帐户的电子邮件地址。

默认值: 无
cloudFiles.privateKey

类型:String

为 Google 服务帐户生成的私钥。

默认值: 无
cloudFiles.privateKeyId

类型:String

为 Google 服务帐户生成的私钥的 ID。

默认值: 无
cloudFiles.projectId

类型:String

GCS 桶所在项目的 ID。 Google Cloud Pub/Sub 订阅也将在此项目中创建。

默认值: 无

仅当你选择 cloudFiles.useNotifications = true 并希望自动加载程序使用已设置的队列时,才提供以下选项:

选项
cloudFiles.subscription

类型:String

Google Cloud Pub/Sub 订阅的名称。 如果提供此选项,则云文件源将使用此队列中的事件,而不是设置自身的 GCS 通知和 Google Cloud Pub/Sub 服务。

默认值: 无

参考

有关自动加载程序的概述和演示,请观看此 YouTube 视频(59 分钟)。

有关如何使用自动加载程序的详细信息,请参阅:

常见数据加载模式

使用 glob 模式筛选目录或文件

在路径中提供 Glob 模式时,可用于筛选目录和文件。

模式 说明
? 匹配任何单一字符
* 与零个或多个字符匹配
[abc] 匹配字符集中的单个字符 {a,b,c}。
[a-z] 匹配字符范围 {a...z} 中的单个字符。
[^a] 匹配不是来自字符集或范围 {a} 的单个字符。 请注意,^ 字符必须立即出现在左括号的右侧。
{ab,cd} 匹配字符串集 {ab, cd} 中的字符串。
{ab,c{de, fh}} 匹配字符串集 {ab, cde, cfh} 中的字符串。

使用 path 来提供前缀模式,例如:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base_path>/*/files")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base_path>/*/files")

重要

你需要使用 pathGlobFilter 选项来显式提供后缀模式。 path 仅提供前缀筛选器。

例如,如果只想分析包含不同后缀的文件的目录中的 png 文件,则可以执行以下操作:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base_path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base_path>)

常见问题解答 (FAQ)

当文件被追加或覆盖时,自动加载程序是否会再次处理文件?

除非启用 cloudFiles.allowOverwrites,否则文件只会处理一次。 如果文件被追加或覆盖,Azure Databricks 不保证处理的是哪个版本的文件。 对于明确定义的行为,Databricks 建议使用自动加载程序来仅引入不可变的文件。 如果这不能满足你的要求,请联系你的 Databricks 代表。

如果我的数据文件没有连续送达,但会定期送达(例如,一天一次),那么我是否仍然可以使用此源,是否有任何好处?

是的,的确有好处。 在这种情况下,你可以设置一个 Trigger.OnceTrigger.AvailableNow(在 Databricks Runtime 10.2 和更高版本中可用)结构化流作业并计划在预期文件到达时间之后运行。 自动加载程序不但适用于不频繁的更新,也适用于频繁的更新。 即使最终更新非常大,自动加载程序也能够很好地根据输入大小进行缩放。 自动加载程序高效的文件发现技术和架构演进功能使自动加载程序成为增量数据引入的首选方法。

如果在重启流时更改了检查点位置,会发生什么情况?

检查点位置维护流的重要标识信息。 更改检查点位置实际上意味着已放弃上一个流并启动一个新流。

是否需要事先创建事件通知服务?

不是。 如果你选择文件通知模式并提供所需的权限,自动加载程序可以为你创建文件通知服务。 请参阅利用文件通知

如何清理自动加载程序创建的事件通知资源?

你可以使用云资源管理器列出和清除资源。 还可以使用云提供商的 UI 或 API 手动删除这些资源。

我可以从同一个存储桶/容器上的不同输入目录运行多个流式查询吗?

可以,前提是这些目录不是父子关系;例如,不能从 prod-logs/prod-logs/usage/ 运行这种查询,因为 /usage/prod-logs 的子目录。

当桶/容器中已存在文件通知时,我是否可以使用此功能?

可以,只要输入目录与现有通知前缀不冲突(例如上面的父子目录)。

疑难解答

错误:

java.lang.RuntimeException: Failed to create event grid subscription.

如果你第一次运行自动加载程序时看到此错误消息,可能是事件网格未在 Azure 订阅中注册为资源提供程序。 若要在 Azure 门户中注册它,请执行以下操作:

  1. 转到你的订阅。
  2. 单击“设置”部分的“资源提供程序”。
  3. 注册提供程序 Microsoft.EventGrid

错误:

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

如果你第一次运行自动加载程序时看到此错误消息,请确保已向你的事件网格和存储帐户的服务主体授予了“参与者”角色。

附录

设置文件通知资源所需的权限

ADLS Gen2 和 Azure Blob 存储

你必须具有对输入目录的读取权限。 请参阅 Azure Blob 存储Azure Data Lake Storage Gen2

若要使用文件通知模式,必须提供用于设置和访问事件通知服务的身份验证凭据。 在 Databricks Runtime 8.1 及更高版本中,只需要用于身份验证的服务主体。 对于 Databricks Runtime 8.0 及更低版本,则必须提供一个服务主体和一个连接字符串。

  • 服务主体 - 使用 Azure 内置角色

    以客户端 ID 和客户端密码的形式创建 Azure Active Directory 应用和服务主体

    为此应用分配输入路径所在的存储帐户的以下角色:

    • 参与者:此角色用于设置存储帐户中的资源,例如队列和事件订阅。
    • 存储队列数据参与者:此角色用于执行队列操作,例如检索和删除队列中的消息。 仅当在没有连接字符串的情况下提供服务主体时,Databricks Runtime 8.1 及更高版本中才需要此角色。

    为此应用分配相关资源组的以下角色:

    有关详细信息,请参阅使用 Azure 门户分配 Azure 角色

  • 服务主体 - 使用自定义角色

    如果担心上述角色所需的执行权限,则可以创建一个至少具有以下权限的自定义角色,下面以 Azure 角色 JSON 格式列出:

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    然后,可以将此自定义角色分配给应用。

    有关详细信息,请参阅使用 Azure 门户分配 Azure 角色

  • 连接字符串

    自动加载程序需要使用连接字符串对 Azure 队列存储操作(例如,创建队列以及在队列中读取和删除消息)进行身份验证。 队列是在输入目录路径所在的同一存储帐户中创建的。 你可以在帐户密钥共享访问签名 (SAS) 中找到连接字符串。

    如果使用 Databricks Runtime 8.1 或更高版本,则不需要连接字符串。

    如果使用 Databricks Runtime 8.0 或更低版本,则必须提供一个连接字符串,用于对 Azure 队列存储操作(例如创建队列以及在队列中检索和删除消息)进行身份验证。 队列是在输入目录路径所在的同一存储帐户中创建的。 你可以在帐户密钥共享访问签名 (SAS) 中找到连接字符串。 配置 SAS 令牌时,必须提供以下权限:

Auto loader permissions

AWS S3

你必须具有对输入目录的读取权限。 有关更多详细信息,请参阅 S3 连接详细信息。

若要使用文件通知模式,请将以下 JSON 策略文档附加到 IAM 用户或角色。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:DeleteMessageBatch",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility",
        "sqs:ChangeMessageVisibilityBatch"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

其中:

  • <bucket-name>:流将在其中读取文件的 S3 存储桶名称,例如,auto-logs。 可以使用 * 作为通配符,例如 databricks-*-logs。 若要找出 DBFS 路径的底层 S3 存储桶,可以通过运行 %fs mounts 列出笔记本中的所有 DBFS 装入点。
  • <region>:S3 存储桶所在的 AWS 区域,例如 us-west-2。 如果不想指定区域,请使用 *
  • <account-number>:拥有 S3 存储桶的 AWS 帐号,例如 123456789012。 如果不想指定帐号,请使用 *

SQS 和 SNS ARN 规范中的字符串 databricks-auto-ingest-*cloudFiles 源在创建 SQS 和 SNS 服务时使用的名称前缀。 由于 Azure Databricks 会在流的初始运行期间设置通知服务,因此你可以在初始运行后(例如,停止并重启流)使用权限降低的策略。

注意

上述策略仅涉及设置文件通知服务(即 S3 存储桶通知、SNS 和 SQS 服务)所需的权限,并假设你已经拥有对 S3 存储桶的读取访问权限。 如果你需要添加 S3 只读权限,请在 JSON 文档的 DatabricksAutoLoaderSetup 语句的 Action 列表中添加以下内容:

  • s3:ListBucket
  • s3:GetObject
初始设置后权限减少

上文中所述的资源设置权限仅在流的初始运行期间才需要。 首次运行后,可以切换到以下权限降低的 IAM 策略。

重要

权限降低后,无法在出现故障时(例如,SQS 队列被意外删除)启动新的流式处理查询或重新创建资源;也无法使用云资源管理 API 来列出或清除资源。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:DeleteMessageBatch",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility",
       "sqs:ChangeMessageVisibilityBatch"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}
将数据安全地引入其他 AWS 帐户

自动加载程序可以通过担任 IAM 角色,跨 AWS 帐户加载数据。 设置 AssumeRole 创建的临时安全凭据后,可以让自动加载程序跨帐户加载云文件。 若要为跨 AWS 帐户设置自动加载程序,请遵循文档:_。 确保:

  • 验证是否已将 AssumeRole 元角色分配给群集。

  • 配置群集的 Spark 配置,使其包含以下属性:

    fs.s3a.credentialsType AssumeRole
    fs.s3a.stsAssumeRole.arn arn:aws:iam::<bucket-owner-acct-id>:role/MyRoleB
    fs.s3a.acl.default BucketOwnerFullControl
    

GCS

你必须对你的 GCS Bucket 和所有对象具有 listget 权限。 有关详细信息,请参阅有关 IAM 权限的 Google 文档。

若要使用文件通知模式,需要为 GCS 服务帐户以及用于访问 Google Cloud Pub/Sub 资源的帐户添加权限。

Pub/Sub Publisher 角色添加到 GCS 服务帐户。 这样,该帐户就可以将事件通知消息从 GCS 桶发布到 Google Cloud Pub/Sub。

对于用于 Google Cloud Pub/Sub 资源的服务帐户,需要添加以下权限:

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

为此,可以创建拥有这些权限的 IAM 自定义角色,或分配预先存在的 GCP 角色来涵盖这些权限。

查找 GCS 服务帐户

在相应项目的“Google Cloud Console”(Google 云控制台)中,导航到 Cloud Storage > Settings。 在该页面上,应会看到标题为“Cloud Storage Service Account”(云存储服务帐户)的部分,其中包含 GCS 服务帐户的电子邮件地址。

GCS Service Account

为文件通知模式创建自定义 Google Cloud IAM 角色

在相应项目的“Google Cloud Console”(Google 云控制台)中,导航到 IAM & Admin > Roles。 然后,在顶部创建一个角色或更新现有角色。 在角色创建或编辑屏幕中,单击 Add Permissions。 然后应该会弹出一个菜单,你可以在其中向角色添加所需的权限。

GCP IAM Custom Roles

文件的词法排序

对于要按词法排序的文件,上载的新文件需要具有在字典上大于现有文件的前缀。 下面显示了词法排序目录的一些示例。

带版本控制的文件

Delta Lake 表按词法顺序提交到其事务日志。

<path_to_table>/_delta_log/00000000000000000000.json
<path_to_table>/_delta_log/00000000000000000001.json <- guaranteed to be written after version 0
<path_to_table>/_delta_log/00000000000000000002.json <- guaranteed to be written after version 1
...

AWS DMS 以版本控制方式将 CDC 文件上传到 AWS S3。

database_schema_name/table_name/LOAD00000001.csv
database_schema_name/table_name/LOAD00000002.csv
...

日期分区文件

文件可以按日期分区格式上传并利用增量列表。 这些功能的示例包括:

// <base_path>/yyyy/MM/dd/HH:mm:ss-randomString
<base_path>/2021/12/01/10:11:23-b1662ecd-e05e-4bb7-a125-ad81f6e859b4.json
<base_path>/2021/12/01/10:11:23-b9794cf3-3f60-4b8d-ae11-8ea320fad9d1.json
...

// <base_path>/year=yyyy/month=MM/day=dd/hour=HH/minute=mm/randomString
<base_path>/year=2021/month=12/day=04/hour=08/minute=22/442463e5-f6fe-458a-8f69-a06aa970fc69.csv
<base_path>/year=2021/month=12/day=04/hour=08/minute=22/8f00988b-46be-4112-808d-6a35aead0d44.csv <- this may be uploaded before the file above as long as processing happens less frequently than a minute

使用日期分区上传文件时,要记住的一些事项是:

  • 月份、天、小时、分钟需要用零填充,以确保词法排序(应上传为 hour=03,而不是 hour=32021/05/03,也不是 2021/5/3)。
  • 文件不一定必须按词法顺序上传到最深的目录中,只要处理发生的频率低于父目录的时间粒度

一些可以按日期分区词法排序上传文件的服务是:

格式选项

泛型选项

以下选项适用于所有文件格式。

选项
ModifiedAfter

类型:Timestamp String,例如 2021-01-01 00:00:00.000000 UTC+0

一个可选时间戳,指示引入其修改时间戳晚于所提供的时间戳的文件。

默认值: 无
modifiedBefore

类型:Timestamp String,例如 2021-01-01 00:00:00.000000 UTC+0

一个可选时间戳,指示引入其修改时间戳早于所提供的时间戳的文件。

默认值: 无
pathGlobFilter

类型:String

提供用来选择文件的一种潜在 glob 模式。 等效于
COPY INTO 中的 PATTERN

默认值: 无
recursiveFileLookup

类型:Boolean

是否在基目录中以递归方式加载数据并跳过分区推理。

默认值:30false

JSON 选项

选项
allowBackslashEscapingAnyCharacter

类型:Boolean

是否允许反斜杠对其后面的任何字符进行转义。 如果未启用,则只能对按 JSON 规范显式列出的字符进行转义。

默认值:30false
allowComments

类型:Boolean

是否允许在分析的内容中使用 Java、C 和 C++ 样式的注释('/''*''//' 变体)。

默认值:30false
allowNonNumericNumbers

类型:Boolean

是否允许将非数字 (NaN) 标记集用作合法浮点数字值。

默认值:30true
allowNumericLeadingZeros

类型:Boolean

是否允许整数以附加的(可忽略的)零开头(例如 000001)。

默认值:30false
allowSingleQuotes

类型:Boolean

是否允许使用单引号(撇号字符 '\')来引用字符串(名称和字符串值)。

默认值:30true
allowUnquotedControlChars

类型:Boolean

是否允许 JSON 字符串包含未转义的控制字符(值小于 32 的 ASCII 字符,包括制表符和换行符)。

默认值:30false
allowUnquotedFieldNames

类型:Boolean

是否允许使用不带引号的字段名称(JavaScript 允许,但 JSON 规范不允许)。

默认值:30false
badRecordsPath

类型:String

用于记录错误 JSON 记录相关信息的存储文件的路径。

默认值: 无
columnNameOfCorruptRecord

类型:String

用于存储因格式不正确而无法分析的记录的列。 如果用于分析的 mode 设置为 DROPMALFORMED,则此列将为空。

默认值:30_corrupt_record
dateFormat

类型:String

用于分析日期字符串的格式。

默认值:30yyyy-MM-dd
dropFieldIfAllNull

类型:Boolean

在进行架构推理期间是否忽略所有 null 值或空数组和结构的列。

默认值:30false
encoding 或 charset

类型:String

JSON 文件编码的名称。 有关选项列表,请参阅 java.nio.charset.Charset。 当 multilinetrue 时,不能使用 UTF-16UTF-32

默认值:30UTF-8
inferTimestamp

类型:Boolean

是否尝试将时间戳字符串推理为 TimestampType。 设置为
true 时,架构推理可能需要明显更长的时间。

默认值:30false
lineSep

类型:String

两个连续 JSON 记录之间的字符串。

默认值:None,涵盖 \r\r\n\n
locale

类型:String

一个 java.util.Locale 标识符。 影响 JSON 中的默认日期、时间戳和十进制分析。

默认值:30US
mode

类型:String

围绕处理格式错误的记录提供的分析程序模式。 'PERMISSIVE'
'DROPMALFORMED''FAILFAST' 中的一项。

默认值:30PERMISSIVE
multiLine

类型:Boolean

JSON 记录是否跨多行。

默认值:30false
prefersDecimal

类型:Boolean

在进行架构推理期间是否将浮点值和双精度值推理为 DecimalType

默认值:30false
primitivesAsString

类型:Boolean

是否将数字和布尔值等基元类型推理为 StringType

默认值:30false
rescuedDataColumn

类型:String

是否将因数据类型不匹配或架构不匹配(包括列大小写)而无法分析的所有数据收集到单独的列。 使用自动加载程序时,默认包含此列。 有关更多详细信息,请参考补救的数据列

默认值: 无
timestampFormat

类型:String

用于分析时间戳字符串的格式。

默认值:30yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]
timeZone

类型:String

分析时间戳和日期时要使用的 java.time.ZoneId

默认值: 无

CSV 选项

选项
badRecordsPath

类型:String

用于记录错误 CSV 记录相关信息的存储文件的路径。

默认值: 无
charToEscapeQuoteEscaping

类型:Char

用来对引号的转义字符进行转义的字符。 例如,对于以下记录:[ " a\\", b ]

* 如果未定义用来对 '\' 进行转义的字符,则不会分析记录。 分析程序会将字符读取为 [a],[\],["],[,],[ ],[b],并引发错误,因为它找不到右引号。
* 如果将用来对 '\' 转义的字符定义为 '\',则读取的记录会带有 2 个值:[a\][b]

默认值:30'\0'
columnNameOfCorruptRecord

类型:String

用于存储因格式不正确而无法分析的记录的列。 如果用于分析的 mode 设置为 DROPMALFORMED,则此列将为空。

默认值:30_corrupt_record
comment

类型:Char

定义表示行注释的字符(位于文本行的开头时)。 请使用 '\0' 来禁用注释跳过。

默认值:30'#'
dateFormat

类型:String

用于分析日期字符串的格式。

默认值:30yyyy-MM-dd
emptyValue

类型:String

空值的字符串表示形式。

默认值:30""
encoding 或 charset

类型:String

CSV 文件编码的名称。 有关选项列表,请参阅 java.nio.charset.Charset。 当 multilinetrue 时,不能使用 UTF-16UTF-32

默认值:30UTF-8
enforceSchema

类型:Boolean

是否将指定的或推理出的架构强制应用于 CSV 文件。 如果启用此选项,则会忽略 CSV 文件的标题。 默认情况下,当使用自动加载程序来补救数据并允许架构演变时,会忽略此选项。

默认值:30true
escape

类型:Char

分析数据时要使用的转义字符。

默认值:30'\'
标头

类型:Boolean

CSV 文件是否包含标题。 自动加载程序在推理架构时会假定文件具有标题。

默认值:30false
ignoreLeadingWhiteSpace

类型:Boolean

是否忽略每个所分析值的前导空格。

默认值:30false
ignoreTrailingWhiteSpace

类型:Boolean

是否忽略每个所分析值的尾随空格。

默认值:30false
inferSchema

类型:Boolean

是推理所分析 CSV 记录的数据类型,还是假定所有列都是 StringType 类型的。 如果设置为 true,则需要对数据进行另一轮操作。

默认值:30false
lineSep

类型:String

两个连续 CSV 记录之间的字符串。

默认值:None,涵盖 \r\r\n\n
locale

类型:String

一个 java.util.Locale 标识符。 影响 CSV 中的默认日期、时间戳和十进制分析。

默认值:30US
maxCharsPerColumn

类型:Int

要分析的值预期包含的最大字符数。 可用于避免内存错误。 默认为 -1,表示无限制。

默认值:30-1
maxColumns

类型:Int

记录可以包含的列数的硬限制。

默认值:3020480
mergeSchema

类型:Boolean

是否跨多个文件推理架构并合并每个文件的架构。 已默认在推理架构时为自动加载程序启用。

默认值:30false
mode

类型:String

围绕处理格式错误的记录提供的分析程序模式。 'PERMISSIVE'
'DROPMALFORMED''FAILFAST'

默认值:30PERMISSIVE
multiLine

类型:Boolean

CSV 记录是否跨多行。

默认值:30false
nanValue

类型:String

分析 FloatTypeDoubleType 列时非数字值的字符串表示形式。

默认值:30"NaN"
negativeInf

类型:String

分析 FloatTypeDoubleType 列时负无穷大的字符串表示形式。

默认值:30"-Inf"
nullValue

类型:String

null 值的字符串表示形式。

默认值:30""
parserCaseSensitive(不推荐使用)

类型:Boolean

读取文件时,将标题中声明的列与架构对齐时是否区分大小写。 对于自动加载程序,此项默认为 true。 如果启用,则会在 rescuedDataColumn 中补救大小写不同的列。 已不推荐使用此选项,推荐使用 readerCaseSensitive

默认值:30false
positiveInf

类型:String

分析 FloatTypeDoubleType 列时正无穷大的字符串表示形式。

默认值:30"Inf"
quote

类型:Char

当字段分隔符是值的一部分时用于对值进行转义的字符。

默认值:30'\'
rescuedDataColumn

类型:String

是否将因数据类型不匹配和架构不匹配(包括列大小写)而无法分析的所有数据收集到单独的列。 使用自动加载程序时,默认包含此列。 有关更多详细信息,请参考补救的数据列

默认值: 无
sep 或 delimiter

类型:String

列之间的分隔符字符串。

默认值:30","
timestampFormat

类型:String

用于分析时间戳字符串的格式。

默认值:30yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]
timeZone

类型:String

分析时间戳和日期时要使用的 java.time.ZoneId

默认值: 无
unescapedQuoteHandling

类型:String

用于处理未转义的引号的策略。 允许的选项:

* STOP_AT_CLOSING_QUOTE:如果在输入中发现了未转义的引号,则会累积引号字符并继续将值解析为带引号的值,直至找到右引号。
* BACK_TO_DELIMITER:如果在输入中发现了未转义的引号,则会将该值视为无引号的值。 这会让分析程序累积当前所分析值的所有字符,直至找到 sep 定义的分隔符。 如果在值中找不到分隔符,则分析程序会继续从输入中累积字符,直到找到分隔符或行尾。
* STOP_AT_DELIMITER:如果在输入中发现了未转义的引号,则会将该值视为无引号的值。 这会让分析程序累积所有字符,直至在输入中找到 sep 定义的分隔符或找到行尾。
* SKIP_VALUE:如果在输入中发现了未转义的引号,则将跳过针对给定值所解析的内容(直至找到下一个分隔符),并将改为生成 nullValue 中设置的值。
* RAISE_ERROR:如果在输入中发现了未转义的引号,则
会引发 TextParsingException

默认值:30STOP_AT_DELIMITER

PARQUET 选项

选项
datetimeRebaseMode

类型:String

控制 DATE 和 TIMESTAMP 值在儒略历与外推格里历之间的基本值重定。 允许的值:EXCEPTIONLEGACY
CORRECTED.

默认值:30LEGACY
int96RebaseMode

类型:String

控制 INT96 时间戳值在儒略历与外推格里历之间的基本值重定。 允许的值:EXCEPTIONLEGACY
CORRECTED.

默认值:30LEGACY
mergeSchema

类型:Boolean

是否跨多个文件推理架构并合并每个文件的架构。

默认值:30false

AVRO 选项

选项
avroSchema

类型:String

用户以 Avro 格式提供的可选架构。 读取 Avro 时,可以将此选项设置为一个演变的架构,该架构与实际 Avro 架构兼容但不同。 反序列化架构会与演变的架构保持一致。 例如,如果你设置的演变架构包含一个具有默认值的附加列,则读取结果也会包含该新列。

默认值: 无
datetimeRebaseMode

类型:String

控制 DATE 和 TIMESTAMP 值在儒略历与外推格里历之间的基本值重定。 允许的值:EXCEPTIONLEGACY
CORRECTED.

默认值:30LEGACY
mergeSchema

类型:Boolean

是否跨多个文件推理架构并合并每个文件的架构。
Avro 的 mergeSchema 不放宽数据类型。

默认值:30false

BINARYFILE 选项

二进制文件没有任何额外的配置选项。

TEXT 选项

选项
encoding

类型:String

TEXT 文件编码的名称。 有关选项列表,请参阅 java.nio.charset.Charset

默认值:30UTF-8
lineSep

类型:String

两个连续 TEXT 记录之间的字符串。

默认值:None,涵盖 \r\r\n\n
wholeText

类型:Boolean

是否将文件读取为单个记录。

默认值:30false

ORC 选项

选项
mergeSchema

类型:Boolean

是否跨多个文件推理架构并合并每个文件的架构。

默认值:30false