Co je režim oznámení souboru automatického zavaděče?

V režimu oznámení souborů automaticky zavaděč nastaví službu oznámení a službu fronty, která se přihlásí k odběru událostí souborů ze vstupního adresáře. Oznámení o souborech můžete použít ke škálování automatického zavaděče na ingestování milionů souborů za hodinu. Ve srovnání s režimem výpisu adresářů je režim oznámení souboru výkonnější a škálovatelný pro velké vstupní adresáře nebo velký objem souborů, ale vyžaduje další cloudová oprávnění.

Mezi oznámeními o souborech a výpisem adresářů můžete kdykoli přepínat a přitom zachovat záruky zpracování dat přesně jednou.

Upozorňující

Změna zdrojové cesty pro automatický zavaděč není podporována v režimu oznámení souboru. Pokud se použije režim oznámení souboru a cesta se změní, může se stát, že se nepodaří ingestovat soubory, které jsou již v novém adresáři v době aktualizace adresáře.

Cloudové prostředky používané v režimu oznámení souborů automatického zavaděče

Důležité

K automatické konfiguraci cloudové infrastruktury pro režim oznámení souborů potřebujete zvýšená oprávnění. Obraťte se na správce cloudu nebo správce pracovního prostoru. Viz:

Automatický zavaděč vám může automaticky nastavit oznámení o souborech, když nastavíte možnost cloudFiles.useNotificationstrue a poskytnete potřebná oprávnění k vytváření cloudových prostředků. Kromě toho možná budete muset poskytnout další možnosti , jak udělit autorizaci automatického zavaděče pro vytvoření těchto prostředků.

Následující tabulka shrnuje, které prostředky jsou vytvořeny automatickým zavaděčem.

Cloudové úložiště Předplatná služba Služba front Předponu* Limit**
AWS S3 AWS SNS AWS SQS Automatické ingestování databricks 100 na kbelík S3
ADLS Gen2 Azure Event Grid Azure Queue Storage databricks 500 na účet úložiště
GCS Google Pub/Sub Google Pub/Sub Automatické ingestování databricks 100 na kbelík GCS
Azure Blob Storage Azure Event Grid Azure Queue Storage databricks 500 na účet úložiště
  • Automaticky zavaděč pojmenuje prostředky s touto předponou.

** Kolik souběžných kanálů oznámení o souborech je možné spustit

Pokud potřebujete pro daný účet úložiště spustit více než omezený počet kanálů oznámení o souborech, můžete:

  • Využijte službu, jako je AWS Lambda, Azure Functions nebo Google Cloud Functions, k rozdlouchejte oznámení z jedné fronty, která naslouchá celému kontejneru nebo kontejneru do front specifických pro adresář.

Události oznámení souboru

AWS S3 poskytuje ObjectCreated událost při nahrání souboru do kontejneru S3 bez ohledu na to, jestli byl nahraný vložením nebo vícedílným nahráním.

ADLS Gen2 poskytuje různá oznámení událostí pro soubory, které se zobrazují v kontejneru Gen2.

  • Automatické zavaděče naslouchá FlushWithClose události pro zpracování souboru.
  • Streamy automatického zavaděče RenameFile podporují akci zjišťování souborů. RenameFile Akce vyžadují požadavek rozhraní API na systém úložiště, aby získaly velikost přejmenovaného souboru.
  • Automatické streamy zavaděče vytvořené pomocí Databricks Runtime 9.0 a po podpoře RenameDirectory akce zjišťování souborů. RenameDirectory Akce vyžadují, aby požadavky rozhraní API na systém úložiště vypsaly obsah přejmenovaného adresáře.

Google Cloud Storage poskytuje OBJECT_FINALIZE událost při nahrání souboru, který zahrnuje přepsání a kopie souborů. Neúspěšné nahrání negenerují tuto událost.

Poznámka:

Poskytovatelé cloudu nezaručují 100% doručení všech událostí souborů za velmi vzácných podmínek a neposkytují přísné smlouvy SLA na latenci událostí souborů. Databricks doporučuje aktivovat pravidelné zavaděče automatického zavaděče pomocí cloudFiles.backfillInterval možnosti zaručit, že se všechny soubory v rámci dané smlouvy SLA zjistí, pokud je požadavek na dokončení dat. Aktivace pravidelných backfillů nezpůsobí duplicity.

Požadovaná oprávnění pro konfiguraci oznámení o souboru pro ADLS Gen2 a Azure Blob Storage

Pro vstupní adresář musíte mít oprávnění ke čtení. Viz Azure Blob Storage.

Pokud chcete použít režim oznámení souboru, musíte zadat přihlašovací údaje pro ověřování pro nastavení a přístup ke službám oznámení událostí. K ověřování potřebujete pouze instanční objekt.

  • Instanční objekt – použití předdefinovaných rolí Azure

    Vytvořte aplikaci a instanční objekt Microsoft Entra ID (dříve Azure Active Directory) ve formě ID klienta a tajného klíče klienta.

    Přiřaďte této aplikaci následující role k účtu úložiště, ve kterém se nachází vstupní cesta:

    • Přispěvatel: Tato role slouží k nastavení prostředků ve vašem účtu úložiště, jako jsou fronty a odběry událostí.
    • Přispěvatel dat fronty úložiště: Tato role slouží k provádění operací front, jako je načítání a odstraňování zpráv z front. Tato role se vyžaduje pouze v případě, že zadáte instanční objekt bez připojovací řetězec.

    Přiřaďte této aplikaci následující roli ke související skupině prostředků:

    • Přispěvatel EventGrid EventSubscription: Tato role slouží k provádění operací odběru event gridu, jako je vytváření nebo výpis odběrů událostí.

    Další informace viz Přiřazení rolí Azure pomocí webu Azure Portal.

  • Instanční objekt – použití vlastní role

    Pokud máte obavy o nadměrná oprávnění požadovaná pro předchozí role, můžete vytvořit vlastní roli s alespoň následujícími oprávněními, která jsou uvedená níže ve formátu JSON role Azure:

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    Potom můžete této vlastní roli přiřadit k aplikaci.

    Další informace viz Přiřazení rolí Azure pomocí webu Azure Portal.

Oprávnění automatického zavaděče

Řešení běžných chyb

Chyba:

java.lang.RuntimeException: Failed to create event grid subscription.

Pokud se tato chybová zpráva zobrazí při prvním spuštění automatického zavaděče, služba Event Grid není ve vašem předplatném Azure zaregistrovaná jako poskytovatel prostředků. Postup registrace na webu Azure Portal:

  1. Přejděte k předplatnému.
  2. V části Nastavení klikněte na Poskytovatele prostředků.
  3. Zaregistrujte poskytovatele Microsoft.EventGrid.

Chyba:

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

Pokud se tato chybová zpráva zobrazí při prvním spuštění automatického zavaděče, ujistěte se, že jste svému instančnímu objektu pro Event Grid i vašemu účtu úložiště udělili roli Přispěvatel.

Požadovaná oprávnění pro konfiguraci oznámení o souboru pro AWS S3

Pro vstupní adresář musíte mít oprávnění ke čtení. Další podrobnosti najdete v podrobnostech o připojení S3.

Pokud chcete použít režim oznámení souboru, připojte k uživateli nebo roli IAM následující dokument zásad JSON.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

kde:

  • <bucket-name>: Název kontejneru S3, kde stream bude číst soubory, auto-logsnapříklad . Můžete použít * například databricks-*-logsjako zástupný znak . Pokud chcete zjistit základní kontejner S3 pro cestu DBFS, můžete zobrazit seznam všech přípojných bodů DBFS v poznámkovém bloku spuštěním %fs mounts.
  • <region>: Oblast AWS, ve které se nachází kontejner S3, us-west-2například . Pokud nechcete zadávat oblast, použijte *.
  • <account-number>: Číslo účtu AWS, které vlastní kbelík S3, 123456789012například . Pokud nechcete zadat číslo účtu, použijte *.

Řetězec databricks-auto-ingest-* ve specifikaci SQS a SNS ARN je předpona názvu, kterou cloudFiles zdroj používá při vytváření služeb SQS a SNS. Vzhledem k tomu, že Azure Databricks nastaví služby oznámení v počátečním spuštění streamu, můžete po počátečním spuštění použít zásadu s omezenými oprávněními (například zastavit stream a poté ho restartovat).

Poznámka:

Předchozí zásada se zabývá pouze oprávněními potřebnými k nastavení služeb oznámení souborů, konkrétně služby S3 bucket notification, SNS a SQS a předpokládá, že už máte přístup pro čtení do kontejneru S3. Pokud potřebujete přidat oprávnění jen pro čtení S3, přidejte do seznamu v příkazu v DatabricksAutoLoaderSetup dokumentu JSON následujícíAction:

  • s3:ListBucket
  • s3:GetObject

Omezená oprávnění po počátečním nastavení

Výše popsaná oprávnění k nastavení prostředků se vyžadují pouze při počátečním spuštění datového proudu. Po prvním spuštění můžete přepnout na následující zásady IAM s omezenými oprávněními.

Důležité

S omezenými oprávněními nemůžete v případě selhání spustit nové dotazy streamování ani znovu vytvořit prostředky (například fronta SQS byla omylem odstraněna); Nemůžete také použít rozhraní API pro správu cloudových prostředků k výpisu nebo odstraňování prostředků.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

Požadovaná oprávnění ke konfiguraci oznámení o souboru pro službu GCS

Ke kontejneru GCS a ke všem objektům musíte mít list oprávnění get . Podrobnosti najdete v dokumentaci Google k oprávněním IAM.

Pokud chcete použít režim oznámení souborů, musíte přidat oprávnění pro účet služby GCS a účet použitý pro přístup k prostředkům Google Cloud Pub/Sub.

Pub/Sub Publisher Přidejte roli do účtu služby GCS. Účet tak může publikovat zprávy oznámení o událostech z kontejnerů GCS do Google Cloud Pub/Sub.

Pokud jde o účet služby používaný pro prostředky Google Cloud Pub/Sub, musíte přidat následující oprávnění:

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

K tomu můžete buď vytvořit vlastní roli IAM s těmito oprávněními, nebo přiřadit existující role GCP k pokrytí těchto oprávnění.

Vyhledání účtu služby GCS

V konzole Google Cloud Console pro odpovídající projekt přejděte na Cloud Storage > Settings. Část "Účet služby cloudového úložiště" obsahuje e-mail účtu služby GCS.

Účet služby GCS

Vytvoření vlastní role IAM cloudu Google pro režim oznámení souborů

V konzole Google Cloud pro odpovídající projekt přejděte na IAM & Admin > Roles. Pak buď vytvořte roli v horní části, nebo aktualizujte existující roli. Na obrazovce pro vytvoření nebo úpravu role klikněte na Add Permissions. Zobrazí se nabídka, ve které můžete přidat požadovaná oprávnění k roli.

Vlastní role GCP IAM

Ruční konfigurace nebo správa prostředků oznámení o souborech

Privilegovaní uživatelé můžou ručně konfigurovat nebo spravovat prostředky oznámení o souborech.

  • Službu oznámení souborů nastavte ručně prostřednictvím poskytovatele cloudu a ručně zadejte identifikátor fronty. Další podrobnosti najdete v tématu Možnosti oznámení o souboru.
  • Rozhraní SCALA API slouží k vytvoření nebo správě oznámení a služeb řazení do front, jak je znázorněno v následujícím příkladu:

Poznámka:

Ke konfiguraci nebo úpravě cloudové infrastruktury musíte mít příslušná oprávnění. Prohlédni si dokumentaci k oprávněním pro Azure, S3 nebo GCS.

Python

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices())

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Scala

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Slouží setUpNotificationServices(<resource-suffix>) k vytvoření fronty a předplatného s názvem <prefix>-<resource-suffix> (předpona závisí na systému úložiště shrnutého v cloudových prostředcích používaných v režimu oznámení souboru automatického zavaděče. Pokud existuje existující prostředek se stejným názvem, Azure Databricks znovu použije existující prostředek místo vytvoření nového prostředku. Tato funkce vrátí identifikátor fronty, který můžete předat cloudFiles zdroji pomocí identifikátoru v možnostech oznámení souboru. To umožňuje zdrojovému cloudFiles uživateli mít méně oprávnění než uživatel, který prostředky vytvoří.

Zadejte možnost newManager pouze v případě, že volání setUpNotificationServicesnení nutné nebo listNotificationServicestearDownNotificationServices."path" To je stejné path , jaké používáte při spouštění streamovacího dotazu.

Následující matice označuje, které metody rozhraní API jsou podporované v tom, ve kterých databricks Runtime pro každý typ úložiště:

Cloudové úložiště Nastavení rozhraní API Rozhraní API pro výpis Roztrhání rozhraní API
AWS S3 Všechny verze Všechny verze Všechny verze
ADLS Gen2 Všechny verze Všechny verze Všechny verze
GCS Databricks Runtime 9.1 a novější Databricks Runtime 9.1 a novější Databricks Runtime 9.1 a novější
Azure Blob Storage Všechny verze Všechny verze Všechny verze
ADLS Gen1 Nepodporované Nepodporované Nepodporované