Load files from Azure Blob storage and Azure Data Lake Storage Gen1 and Gen2 using Auto Loader

Auto Loader incrementally and efficiently processes new data files as they arrive in Azure Blob storage and Azure Data Lake Storage Gen1 and Gen2.

Auto Loader provides a Structured Streaming source called cloudFiles. Given an input directory path on the cloud file storage, the cloudFiles source automatically processes new files as they arrive, with the option of also processing existing files in that directory.

Auto Loader works with DBFS paths as well as direct paths to the data source.

Requirements

Databricks Runtime 7.2 or above.

If you created streams using Databricks Runtime 7.1 or below, see Changes in default option values and compatibility and Cloud resource management.

File discovery modes

Auto Loader supports two modes for detecting when there are new files: directory listing and file notification.

  • Directory listing: Identifies new files by parallel listing of the input directory. Directory listing mode allows you to quickly start Auto Loader streams without any permission configuration and is suitable for scenarios where only a few files need to be streamed in on a regular basis. Directory listing mode is the default for Auto Loader in Databricks Runtime 7.2 and above.

    In Databricks Runtime 7.3 LTS and above, Auto Loader supports Azure Data Lake Storage Gen1 only in directory listing mode.

  • File notification: Uses Azure Event Grid and Queue Storage services that subscribe to file events from the input directory. Auto Loader automatically sets up the Azure Event Grid and Queue Storage services. File notification mode is more performant and scalable for large input directories. To use this mode, you must configure permissions for the Azure Event Grid and Queue Storage services and specify .option("cloudFiles.useNotifications", "true").

You can change mode when you restart the stream. For example, you might switch to file notification mode when directory listing becomes too slow because the input directory has grown. In both modes, Auto Loader internally keeps track of which files have been processed to provide exactly-once semantics, so you do not need to manage any state information yourself.
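
For example, a minimal Python sketch of restarting a stream in file notification mode; the format, schema, and input path below are hypothetical placeholders, and the authentication options described in Configuration may also be required:

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.useNotifications", "true") \
  .schema("id INT, body STRING") \
  .load("abfss://input@mystorageaccount.dfs.core.windows.net/events/")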

Use cloudFiles source

To use the Auto Loader, create a cloudFiles source in the same way as other streaming sources:

Python

df = spark.readStream.format("cloudFiles") \
  .option(<cloudFiles-option>, <option-value>) \
  .schema(<schema>) \
  .load(<input-path>)

df.writeStream.format("delta") \
  .option("checkpointLocation", <checkpoint-path>) \
  .start(<output-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option(<cloudFiles-option>, <option-value>)
  .schema(<schema>)
  .load(<input-path>)

df.writeStream.format("delta")
  .option("checkpointLocation", <checkpoint-path>)
  .start(<output-path>)

where:

  • <cloudFiles-option> is one of the configuration options described in Configuration.

  • <schema> is the file schema.

    Note

    On Databricks Runtime 7.3 LTS and above, if the file format is text or binaryFile you don’t need to provide the schema.

  • <input-path> is the path in Azure Blob storage or Azure Data Lake Storage Gen1 or Gen2 that is monitored for new files. Subdirectories of <input-path> are also monitored. <input-path> can contain file glob patterns.

  • <checkpoint-path> is the output stream checkpoint location.

  • <output-path> is the output stream path.
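
For example, a minimal Python sketch that fills in the template above, assuming hypothetical CSV input, checkpoint, and output paths:

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .schema("id INT, event_time TIMESTAMP, payload STRING") \
  .load("abfss://input@mystorageaccount.dfs.core.windows.net/events/")

df.writeStream.format("delta") \
  .option("checkpointLocation", "/mnt/checkpoints/events") \
  .start("/mnt/delta/events")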

Schema inference and evolution

Note

Available in Databricks Runtime 8.2 and above.

Auto Loader supports schema inference and evolution with JSON, binary (binaryFile), and text file formats. See Schema inference and evolution in Auto Loader for more details.
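
For example, a minimal Python sketch of schema inference, assuming a hypothetical JSON input path; no schema is passed, and the inferred schema is stored at cloudFiles.schemaLocation:

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "/mnt/checkpoints/events/_schemas") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("abfss://input@mystorageaccount.dfs.core.windows.net/events/")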

Configuration

Configuration options specific to the cloudFiles source are prefixed with cloudFiles so that they are in a separate namespace from other Structured Streaming source options.

Important

Some default option values changed in Databricks Runtime 7.2. If you are using Auto Loader on Databricks Runtime 7.1 or below, see Changes in default option values and compatibility.

  • cloudFiles.allowOverwrites (Boolean, default: false): Whether to allow input directory file changes to overwrite existing data. Available in Databricks Runtime 7.6 and above.
  • cloudFiles.fetchParallelism (Integer, default: 1): Number of threads to use when fetching messages from the queueing service.
  • cloudFiles.format (String, no default; required): The data file format in the source path.
  • cloudFiles.includeExistingFiles (Boolean, default: true): Whether to include existing files in the input path in the streaming processing, versus processing only new files that arrive after the notifications are set up. This option is respected only when you start a stream for the first time; changing its value after a stream restart has no effect.
  • cloudFiles.inferColumnTypes (Boolean, default: false): Whether to infer exact column types when leveraging schema inference. By default, columns are inferred as strings when inferring JSON datasets. See schema inference for more details.
  • cloudFiles.maxBytesPerTrigger (Byte String, default: none): Maximum number of new bytes to be processed in every trigger. You can specify a byte string such as 10g to limit each microbatch to 10 GB of data. This is a soft maximum: if you have files that are 3 GB each, Azure Databricks processes 12 GB in a microbatch. When used together with cloudFiles.maxFilesPerTrigger, Azure Databricks consumes files until whichever limit is reached first. This option has no effect when used with Trigger.Once().
  • cloudFiles.maxFilesPerTrigger (Integer, default: 1000): Maximum number of new files to be processed in every trigger.
  • cloudFiles.partitionColumns (String, default: none): Partition columns that you want to explicitly parse from Hive-style directory structures, provided as a comma-separated list of column names.
  • cloudFiles.schemaEvolutionMode (String, default: "addNewColumns" when a schema is not provided, "none" otherwise): The mode for evolving the schema as new columns are discovered in the data. See schema evolution for more details.
  • cloudFiles.schemaHints (String, default: none): Schema information that you provide to Auto Loader during schema inference. See schema hints for more details.
  • cloudFiles.schemaLocation (String, no default; required when inferring the schema): A location to store the inferred schema and subsequent changes. See schema inference for more details.
  • cloudFiles.useNotifications (Boolean, default: false): Whether to use file notification mode to determine when there are new files. If false, use directory listing mode. See File discovery modes.
  • cloudFiles.validateOptions (Boolean, default: true): Whether to validate Auto Loader options and return an error for unknown or inconsistent options.
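
For example, a minimal Python sketch that combines several of these options to rate-limit a directory listing stream; the format, schema, and path are placeholders:

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "parquet") \
  .option("cloudFiles.maxFilesPerTrigger", "500") \
  .option("cloudFiles.maxBytesPerTrigger", "10g") \
  .option("cloudFiles.includeExistingFiles", "false") \
  .schema("id INT, payload STRING") \
  .load("abfss://input@mystorageaccount.dfs.core.windows.net/data/")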

Provide the following options only if you choose cloudFiles.useNotifications = true:

  • cloudFiles.connectionString (String, default: none): The connection string for the storage account, based on either an account access key or a shared access signature (SAS). (1)
  • cloudFiles.resourceGroup (String, default: none): The Azure resource group under which the storage account is created.
  • cloudFiles.subscriptionId (String, default: none): The Azure subscription ID under which the resource group is created.
  • cloudFiles.tenantId (String, default: none): The Azure tenant ID under which the service principal is created.
  • cloudFiles.clientId (String, default: none): The client ID or application ID of the service principal. (1)
  • cloudFiles.clientSecret (String, default: none): The client secret of the service principal.
  • cloudFiles.queueName (String, default: none): The URL of the Azure queue. If provided, the cloud files source directly consumes events from this queue instead of setting up its own Azure Event Grid and Queue Storage services. In that case, cloudFiles.connectionString requires only read permissions on the queue.

(1) See Permissions.
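
For example, a minimal Python sketch of a file notification stream authenticated with a service principal; replace the placeholders with your own values (on Databricks Runtime 8.0 and below, also provide cloudFiles.connectionString):

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.useNotifications", "true") \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .schema(<schema>) \
  .load(<input-path>)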

Note

Notifications are held in the Azure message queue for 7 days. If you stop the stream and restart it after more than 7 days, you lose the notifications in the message queue. In that case, Azure Databricks falls back to directory listing mode and processes files from the point where the stream stopped, so there is no data loss. However, this can take some time, and performance is slow until Azure Databricks catches up to the current state of the stream.

Changes in default option values and compatibility

The default values of the following Auto Loader options changed in Databricks Runtime 7.2 to the values listed in Configuration.

  • cloudFiles.useNotifications
  • cloudFiles.includeExistingFiles
  • cloudFiles.validateOptions

Auto Loader streams started on Databricks Runtime 7.1 and below have the following default option values:

  • cloudFiles.useNotifications is true
  • cloudFiles.includeExistingFiles is false
  • cloudFiles.validateOptions is false

To ensure compatibility with existing applications, these default option values do not change when you run your existing Auto Loader streams on Databricks Runtime 7.2 or above; the streams will have the same behavior after the upgrade.

Metrics

Auto Loader reports metrics at every batch. You can view how many files exist in the backlog and how large the backlog is in the numFilesOutstanding and numBytesOutstanding metrics under the Raw Data tab in the streaming query progress dashboard:

{
  "sources" : [
    {
      "description" : "CloudFilesSource[/path/to/source]",
      "metrics" : {
        "numFilesOutstanding" : "238",
        "numBytesOutstanding" : "163939124006"
      }
    }
  ]
}
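
You can also read the same metrics programmatically from the streaming query handle returned by writeStream.start(); a minimal Python sketch, where the query variable is assumed to hold that handle:

# lastProgress mirrors the JSON above; it is None until the first batch completes.
progress = query.lastProgress
if progress is not None:
    for source in progress["sources"]:
        metrics = source.get("metrics", {})
        print(metrics.get("numFilesOutstanding"), metrics.get("numBytesOutstanding"))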

Permissions

You must have read permissions for the input directory. See Azure Blob Storage and Azure Data Lake Storage Gen2.

To use file notification mode, you must provide authentication credentials for setting up and accessing the event notification services. In Databricks Runtime 8.1 and above, you only need a service principal for authentication. For Databricks Runtime 8.0 and below, you must provide both a service principal and a connection string.

  • Service principal

    Create an Azure Active Directory app and service principal in the form of a client ID and client secret. Assign the app the following roles on the storage account in which the input path resides:

    • Contributor: This role is for setting up resources in your storage account, such as queues and event subscriptions.
    • EventGrid EventSubscription Contributor: This role is for performing event grid subscription operations such as creating or listing event subscriptions.
    • Storage Queue Data Contributor: This role is for performing queue operations such as retrieving and deleting messages from the queues. This role is required in Databricks Runtime version 8.1 and above only when you provide a service principal without a connection string.
  • Connection string

    Auto Loader requires a connection string to authenticate for Azure Queue Storage operations, such as creating a queue and reading and deleting messages from the queue. The queue is created in the same storage account where the input directory path is located. The connection string is based on either the storage account access key or a shared access signature (SAS).

    • If you are using Databricks Runtime 8.1 or above, you do not need a connection string.

    • If you are using Databricks Runtime 8.0 or below, you must provide a connection string as described above. When configuring an SAS token, you must provide the following permissions:

      (Image: SAS token permissions required by Auto Loader)

      Note

      If you do not have the necessary permissions to create resources, you can ask an administrator to perform setup using the Cloud resource management Scala API. An administrator can provide you with the queue name, which you can specify directly as .option("cloudFiles.queueName", <queue-name>) to the cloudFiles source.
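
      For example, a minimal Python sketch of consuming from such an administrator-provided queue; <queue-name> and the other placeholders are supplied by the administrator, and the connection string needs only read permissions on the queue:

      df = spark.readStream.format("cloudFiles") \
        .option("cloudFiles.format", <format>) \
        .option("cloudFiles.useNotifications", "true") \
        .option("cloudFiles.queueName", <queue-name>) \
        .option("cloudFiles.connectionString", <read-only-connection-string>) \
        .schema(<schema>) \
        .load(<input-path>)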

Troubleshooting

Error:

java.lang.RuntimeException: Failed to create event grid subscription.

If you see this error message when you run Auto Loader for the first time, Event Grid is not registered as a resource provider in your Azure subscription. To register it in the Azure portal:

  1. Go to your subscription.
  2. Click Resource Providers under the Settings section.
  3. Register the provider Microsoft.EventGrid.

Error:

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

If you see this error message when you run Auto Loader for the first time, ensure you have given the Contributor role to your service principal for Event Grid as well as your storage account.

Cloud resource management

You can use a Scala API to manage the Azure Event Grid and Queue Storage services created by Auto Loader. You must configure the resource setup permissions described in Permissions before using this API.

import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

// Set up an Azure Queue Storage queue and an Event Grid subscription associated with the path used in the manager. Available in Databricks Runtime 7.4 and above.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by Auto Loader
manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Note

Available in Databricks Runtime 7.4 and above.

Use setUpNotificationServices(<resource-suffix>) to create a queue and an Event Grid subscription with the name <resource-prefix><resource-suffix>. If a queue or Event Grid subscription with the same name already exists, Azure Databricks reuses the existing resource instead of creating a new one. This function returns the name of the queue, which you can pass to the cloudFiles source using .option("cloudFiles.queueName", <queue-name>). This enables the cloudFiles source user to have fewer permissions than the user who creates the resources. See Permissions.

Provide the "path" option to newManager only if calling setUpNotificationServices; it is not needed for listNotificationServices or tearDownNotificationServices. This is the same path that you use when running a streaming query.

Frequently asked questions (FAQ)

Do I need to create Azure event notification services beforehand?

No. If you choose file notification mode, Auto Loader automatically creates the file event notification pipeline (Azure Blob storage or Azure Data Lake Storage Gen1 and Gen2 > Event Grid subscription > queue) when you start the stream.

How do I clean up the event notification resources, such as Event Grid Subscriptions and Queues, created by Auto Loader?

You can use the cloud resource manager to list and tear down resources. You can also delete these resources manually, either in the Azure portal or using Azure APIs. All resources created by Auto Loader have the prefix <resource-prefix>.

Does Auto Loader process the file again when the file gets appended or overwritten?

Files are processed exactly once unless cloudFiles.allowOverwrites is enabled. If a file is appended to or overwritten, Azure Databricks does not guarantee which version of the file is processed. For well-defined behavior, Databricks suggests that you use Auto Loader to ingest only immutable files. If this does not meet your requirements, contact your Databricks representative.

Can I run multiple streaming queries from the same input directory?

Yes. Each cloud files stream, as identified by a unique checkpoint directory, has its own queue, and the same Azure Blob storage or Azure Data Lake Storage file events can be sent to multiple queues.
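
For example, a minimal Python sketch of two independent streams reading the same input path, distinguished only by their checkpoint locations; all paths are placeholders:

raw = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .schema("id INT, body STRING") \
  .load("abfss://input@mystorageaccount.dfs.core.windows.net/events/")

# Stream 1: land the raw data
raw.writeStream.format("delta") \
  .option("checkpointLocation", "/mnt/checkpoints/raw") \
  .start("/mnt/delta/raw")

# Stream 2: a filtered copy; its separate checkpoint identifies a separate stream
raw.filter("body IS NOT NULL").writeStream.format("delta") \
  .option("checkpointLocation", "/mnt/checkpoints/filtered") \
  .start("/mnt/delta/filtered")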

If my data files do not arrive continuously, but at regular intervals, for example, once a day, should I still use this source and are there any benefits?

Yes and yes. In this case, you can set up a Trigger-Once Structured Streaming job and schedule it to run after the anticipated file arrival time. The first run sets up the event notification services, which remain on even when the streaming cluster is down. When you restart the stream, the cloudFiles source fetches and processes all file events backed up in the queue. The benefit of using Auto Loader in this case is that you don't need to determine yourself which files are new and need to be processed each time, which can be very expensive.
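
A minimal Python sketch of such a Trigger-Once job (paths are placeholders); each scheduled run processes whatever arrived since the previous run and then stops:

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("cloudFiles.useNotifications", "true") \
  .schema("id INT, payload STRING") \
  .load("abfss://input@mystorageaccount.dfs.core.windows.net/daily/")

df.writeStream.format("delta") \
  .trigger(once=True) \
  .option("checkpointLocation", "/mnt/checkpoints/daily") \
  .start("/mnt/delta/daily")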

What happens if I change the checkpoint location when restarting the stream?

A checkpoint location maintains important identifying information of a stream. Changing the checkpoint location effectively means that you have abandoned the previous stream and started a new one. The new stream creates new progress information and, if you are using file notification mode, new Azure Event Grid and Queue Storage services. You must manually clean up the checkpoint location and the Azure Event Grid and Queue Storage services for any abandoned streams.