¿Qué es el modo de notificación de archivos de Auto Loader?

En el modo de notificación de archivos, el cargador automático configura automáticamente un servicio de notificación y un servicio de cola que se suscriben a eventos de archivo desde el directorio de entrada. Puede usar notificaciones de archivos para escalar Auto Loader para que ingiera millones de archivos por hora. En comparación con el modo de listas de directorios, el modo de notificación de archivos es más eficaz y escalable para directorios de entrada grandes o para un gran volumen de archivos, pero requiere permisos de nube adicionales.

Puede cambiar entre las notificaciones de archivo y la lista de directorios en cualquier momento y seguir manteniendo las garantías de que los datos se procesan exactamente una vez.

Advertencia

El cambio de la ruta de origen para el cargador automático no es compatible con el modo de notificación de archivos. Si se usa el modo de notificación de archivos y se cambia la ruta de acceso, es posible que no pueda ingerir archivos que ya están presentes en el nuevo directorio en el momento de la actualización del directorio.

Recursos en la nube usados en el modo de notificación de archivos de Auto Loader

Importante

Necesita permisos elevados para configurar automáticamente la infraestructura en la nube para el modo de notificación de archivos. Póngase en contacto con el administrador de la nube o el administrador del área de trabajo. Consulte:

El cargador automático puede configurar notificaciones de archivos automáticamente al establecer la opción cloudFiles.useNotifications en true y proporcionar los permisos para crear recursos en la nube. Además, es posible que tenga que proporcionar opciones adicionales para conceder autorización a Auto Loader para crear estos recursos.

En la tabla siguiente se resumen los recursos creados por el cargador automático.

Almacenamiento en la nube Servicio de suscripción Queue service Prefijo * Límite **
AWS S3 AWS SNS AWS SQS databricks-auto-ingest 100 por cubo S3
ADLS Gen2 Azure Event Grid Azure Queue Storage databricks 500 por cuenta de almacenamiento
GCS Google Pub/Sub Google Pub/Sub databricks-auto-ingest 100 por cubo GCS
Azure Blob Storage Azure Event Grid Azure Queue Storage databricks 500 por cuenta de almacenamiento
  • El cargador automático nombra los recursos con este prefijo.

** Cuántas canalizaciones de notificación de archivos simultáneas se pueden iniciar

Si necesita ejecutar más del número limitado de canalizaciones de notificación de archivos para una cuenta de almacenamiento determinada, puede:

  • Aproveche un servicio como AWS Lambda, Azure Functions o Google Cloud Functions para enviar notificaciones de una sola cola que escucha todo un contenedor o cubo en colas específicas del directorio.

Eventos de notificación de archivos

AWS S3 proporciona un evento ObjectCreated cuando se carga un archivo en un cubo de S3, independientemente de si se cargó mediante una carga put o de varias partes.

ADLS Gen2 proporciona notificaciones de eventos diferentes para los archivos que aparecen en el contenedor de Gen2.

  • Auto Loader escucha el evento FlushWithClose para procesar un archivo.
  • Los flujos del cargador automático admiten la RenameFile acción para detectar archivos. Las acciones RenameFile requieren una solicitud de API al sistema de almacenamiento para obtener el tamaño del archivo renombrado.
  • Las secuencia de Auto Loader creadas con Databricks Runtime 9.0 y versiones posteriores admiten la acción RenameDirectory para detectar archivos. Las acciones RenameDirectory requieren solicitudes de API al sistema de almacenamiento para enumerar los contenidos del directorio renombrado.

Google Cloud Storage proporciona un evento OBJECT_FINALIZE cuando se carga un archivo, lo que incluye sobreescrituras y copias de archivos. Las cargas con errores no generan este evento.

Nota:

Los proveedores de nube no garantizan la entrega al 100 % de todos los eventos de archivo en condiciones muy poco frecuentes y no proporcionan un Acuerdo de Nivel de Servicio estricto sobre la latencia de los eventos de archivo. Databricks recomienda desencadenar reposiciones periódicas con el cargador automático mediante la opción cloudFiles.backfillInterval para garantizar que todos los archivos se detectan dentro de un Acuerdo de Nivel de Servicio determinado si la integridad de los datos es un requisito. Desencadenar reposiciones periódicas no provoca duplicados.

Permisos necesarios para configurar la notificación de archivos para ADLS Gen2 y Azure Blob Storage

Debe tener permisos de lectura para el directorio de entrada. Consulte Azure Blob Storage.

Para usar el modo de notificación de archivos, debe proporcionar credenciales de autenticación para configurar y acceder a los servicios de notificación de eventos. Solo necesita una entidad de servicio para la autenticación.

  • Entidad de servicio: Uso de roles integrados de Azure

    Cree una aplicación de Microsoft Entra ID (anteriormente Azure Active Directory) y una entidad de servicio en forma de Id. de cliente y secreto de cliente.

    Asigne a esta aplicación los siguientes roles para la cuenta de almacenamiento en la que reside la ruta de acceso de entrada:

    • Colaborador: este rol es para configurar recursos en la cuenta de almacenamiento, como colas y suscripciones de eventos.
    • Colaborador de datos de la cola de Storage: este rol sirve para realizar operaciones de colas, como recuperar y eliminar mensajes de las colas. Este rol solo es necesario cuando se proporciona una entidad de servicio sin una cadena de conexión.

    Asigne a esta aplicación el siguiente rol para el grupo de recursos relacionado:

    Para más información, consulte Asignación de roles de Azure mediante Azure Portal.

  • Entidad de servicio: Uso de un rol personalizado

    Si le preocupan los permisos excesivos necesarios para los roles anteriores, puede crear un rol personalizado con al menos los permisos siguientes, que se indican a continuación en el formato JSON de rol de Azure:

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    A continuación, puede asignar este rol personalizado a la aplicación.

    Para más información, consulte Asignación de roles de Azure mediante Azure Portal.

Permisos de la característica Auto Loader

Solución de problemas de errores comunes

Error:

java.lang.RuntimeException: Failed to create event grid subscription.

Si ve este mensaje de error al ejecutar Auto Loader por primera vez, la instancia de Event Grid no está registrada como proveedor de recursos en su suscripción de Azure. Para registrarla en Azure Portal:

  1. Vaya a su suscripción.
  2. Haga clic en Proveedores de recursos en la sección Configuración.
  3. Registre el proveedor Microsoft.EventGrid.

Error:

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

Si ve este mensaje de error al ejecutar Auto Loader por primera vez, asegúrese de que ha asignado el rol Colaborador a la entidad de servicio para Event Grid, así como para la cuenta de almacenamiento.

Permisos necesarios para configurar la notificación de archivos para AWS S3

Debe tener permisos de lectura para el directorio de entrada. Consulte los detalles de conexión S3 para obtener más información.

Para usar el modo de notificación de archivos, adjunte el siguiente documento de directiva JSON al usuario o rol IAM.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

donde:

  • <bucket-name>: nombre del cubo S3 donde la secuencia leerá archivos, por ejemplo, auto-logs. Puede usar * como carácter comodín, por ejemplo, databricks-*-logs. Para averiguar el cubo S3 subyacente de la ruta de acceso de DBFS, puede enumerar todos los puntos de montaje de DBFS en un cuaderno mediante la ejecución de %fs mounts.
  • <region>: región de AWS donde reside el cubo S3, por ejemplo, us-west-2. Si no desea especificar la región, use *.
  • <account-number>: número de cuenta de AWS que posee el cubo S3, por ejemplo, 123456789012. Si no desea especificar el número de cuenta, use *.

La cadena databricks-auto-ingest-* de la especificación de SQS y SNS ARN es el prefijo de nombre que utiliza el origen cloudFiles al crear los servicios SQS y SNS. Dado que Azure Databricks configura los servicios de notificación en la ejecución inicial de la secuencia, puede utilizar una directiva con permisos reducidos después de la ejecución inicial (por ejemplo, detener la secuencia y reiniciarla).

Nota:

La directiva anterior solo se refiere a los permisos necesarios para configurar los servicios de notificación de archivos, es decir, los servicios de notificación de cubos S3, SNS y SQS, y asume que ya tiene acceso de lectura al cubo de S3. Si necesita agregar permisos de solo lectura S3, agregue lo siguiente a la lista Action en la instrucción DatabricksAutoLoaderSetup del documento JSON:

  • s3:ListBucket
  • s3:GetObject

Permisos reducidos después de la configuración inicial

Los permisos de configuración de recursos descritos anteriormente solo son necesarios durante la ejecución inicial de la secuencia. Después de la primera ejecución, puede cambiar a la siguiente directiva IAM con permisos reducidos.

Importante

Con los permisos reducidos, no puede iniciar nuevas consultas de streaming ni volver a crear recursos en caso de errores (por ejemplo, la cola de SQS se ha eliminado accidentalmente); tampoco puede usar la API de administración de recursos en la nube para enumerar o eliminar recursos.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

Permisos necesarios para configurar la notificación de archivos para GCS

Debe tener los permisos list y get en el cubo de GCS y en todos los objetos. Para obtener más información, consulte la documentación de Google sobre los permisos IAM.

Para usar el modo de notificación de archivos, debe agregar permisos para la cuenta de servicio GCS y para la cuenta que se usa para acceder a los recursos de Google Cloud Pub/Sub.

Agregue el rol Pub/Sub Publisher a la cuenta de servicio de GCS. Esto permite que la cuenta publique mensajes de notificación de eventos desde los cubos de GCS en Google Cloud Pub/Sub.

En cuanto a la cuenta de servicio utilizada para los recursos de Google Cloud Pub/Sub, debe agregar los siguientes permisos:

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

Para ello, puede crear un rol personalizado IAM con estos permisos o asignar roles de GCP preexistentes para cubrir estos permisos.

Búsqueda de la cuenta de servicio de GCS

En la consola en la nube de Google para el proyecto correspondiente, vaya a Cloud Storage > Settings. La sección «Cuenta de servicio de almacenamiento en nube» contiene el correo electrónico de la cuenta de servicio de GCS.

Cuenta de servicio GCS

Creación de un rol IAM personalizado de Google Cloud para el modo de notificación de archivos

En la consola de Google Cloud del proyecto correspondiente, vaya a IAM & Admin > Roles. A continuación, cree un rol en la parte superior o actualice un rol existente. En la pantalla de creación o edición de roles, haga clic en Add Permissions. A continuación, aparece un menú en el que puede agregar los permisos deseados al rol.

Roles personalizados para la característica IAM de GCP

Configuración o administración manual de recursos de notificación de archivos

Los usuarios con privilegios pueden configurar o administrar manualmente los recursos de notificación de archivos.

  • Configure los servicios de notificación de archivos manualmente a través del proveedor de nube y especifique manualmente el identificador de cola. Consulte Opciones de notificación de archivos para obtener más detalles.
  • Use las API de Scala para crear o administrar los servicios de notificaciones y colas, como se muestra en el ejemplo siguiente:

Nota:

Debe tener los permisos adecuados para configurar o modificar la infraestructura en la nube. Consulte la documentación de permisos para Azure, S3 o GCS.

Python

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices())

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Scala

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Use setUpNotificationServices(<resource-suffix>) para crear una cola y una suscripción con el nombre <prefix>-<resource-suffix> (el prefijo depende del sistema de almacenamiento resumido en Recursos de nube usados en el modo de notificación de archivos de Auto Loader. Si hay un recurso existente con el mismo nombre, Azure Databricks reutiliza el recurso ya existente en lugar de crear uno nuevo. Esta función devuelve un identificador de cola que se puede pasar al origen cloudFiles mediante el identificador en Opciones de notificación de archivos. Esto permite que el usuario de origen cloudFiles tenga menos permisos que el usuario que crea los recursos.

Proporcione la opción "path" a newManager solo si llama a setUpNotificationServices; no es necesario para listNotificationServices o tearDownNotificationServices. Es el mismo valor path que se usa al ejecutar una consulta de streaming.

La siguiente matriz indica qué métodos de API se admiten en qué Databricks Runtime para cada tipo de almacenamiento:

Almacenamiento en la nube Configuración de API API de lista API de anulación
AWS S3 Todas las versiones Todas las versiones Todas las versiones
ADLS Gen2 Todas las versiones Todas las versiones Todas las versiones
GCS Databricks Runtime 9.1 y versiones superiores Databricks Runtime 9.1 y versiones superiores Databricks Runtime 9.1 y versiones superiores
Azure Blob Storage Todas las versiones Todas las versiones Todas las versiones
ADLS Gen1 No compatible No compatible No compatible