Charger des fichiers à partir d’AWS S3 avec le chargeur automatique

Le chargeur automatique traite de façon incrémentielle et efficace les nouveaux fichiers de données à mesure qu’ils arrivent dans AWS S3 ( s3:// ).

Le chargeur automatique fournit une source de diffusion en continu structurée appelée cloudFiles . À partir du chemin d’accès du répertoire d’entrée sur le stockage de fichiers Cloud, la cloudFiles source traite automatiquement les nouveaux fichiers à mesure qu’ils arrivent, avec la possibilité de traiter également les fichiers existants dans ce répertoire.

Le chargeur automatique fonctionne avec les chemins d’accès DBFS et les chemins d’accès directs à la source de données.

Configuration requise

Databricks Runtime 7,2 ou version ultérieure.

Si vous avez créé des flux à l’aide de Databricks Runtime 7,1 ou des versions antérieures, consultez modifications des valeurs d’option par défaut et compatibilité et gestion des ressources de Cloud.

Modes de détection de fichiers

Le chargeur automatique prend en charge deux modes de détection de nouveaux fichiers : liste de répertoires et notification de fichier.

  • Liste des répertoires: identifie les nouveaux fichiers en répertoriant le répertoire d’entrée. Le mode de liste de répertoires vous permet de démarrer rapidement des flux de chargement automatique sans aucune configuration d’autorisation autre que l’accès à vos données sur AWS S3 et convient aux scénarios où seuls quelques fichiers doivent être diffusés sur une base régulière. Mode de liste de répertoires par défaut pour le chargeur automatique dans Databricks Runtime 7,2 et versions ultérieures.
  • Notification de fichier: utilise les services AWS SNS et SQS qui s’abonnent aux événements de fichier à partir du répertoire d’entrée. Le chargeur automatique configure automatiquement les services AWS SNS et SQS. Le mode de notification de fichier est plus performant et évolutif pour les répertoires d’entrée volumineux. Pour utiliser ce mode, vous devez configurer des autorisations pour les services AWS SNS et SQS et spécifier .option("cloudFiles.useNotifications","true") .

Vous pouvez modifier le mode lorsque vous redémarrez le flux. Par exemple, vous pouvez choisir de basculer vers le mode de notification de fichier lorsque la liste des répertoires est trop lente en raison de l’augmentation de la taille du répertoire d’entrée. Pour les deux modes, le chargeur automatique conserve en interne les pistes des fichiers qui ont été traités dans votre emplacement de point de contrôle de diffusion en continu pour fournir une sémantique exacte, de sorte que vous n’avez pas besoin de gérer vous-même les informations d’État.

Utiliser la cloudFiles source

Pour utiliser le chargeur automatique, créez une cloudFiles source de la même façon que les autres sources de streaming. Le code ci-dessous démarre un chargement automatique de flux d’écriture dans Delta Lake en mode de liste de répertoires :

Python

df = spark.readStream.format("cloudFiles") \
  .option(<cloudFiles-option>, <option-value>) \
  .schema(<schema>) \
  .load(<input-path>)

df.writeStream.format("delta") \
  .option("checkpointLocation", <checkpoint-path>) \
  .trigger(<trigger>) \
  .start(<output-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option(<cloudFiles-option>, <option-value>)
  .schema(<schema>)
  .load(<input-path>)

df.writeStream.format("delta")
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(<trigger>)
  .start(<output-path>)

où :

  • <cloudFiles-option> est une option de configuration dans la configuration.

  • <schema> est le schéma de fichier. Le chargeur automatique prend également en charge l’inférence de schéma et l’évolution avec certains formats de fichier. Pour plus d’informations , consultez inférence et évolution du schéma .

  • <input-path> est le chemin d’accès dans le stockage qui est surveillé pour les nouveaux fichiers. Les répertoires enfants de <input-path> sont également analysés. <input-path> peut contenir des modèles de glob de fichier. Le modèle glob y est * ajouté. si cela inclut des fichiers que vous ne souhaitez pas recevoir, vous pouvez inclure un filtre supplémentaire à l’aide de l' pathGlobFilter option. Si vous fournissez une file d’attente pour les notifications de fichiers et que vous n’avez pas besoin de renvoyer des données, vous n’avez pas besoin de fournir un chemin d’accès d’entrée.

  • <checkpoint-path> emplacement du point de contrôle de flux.

  • <trigger> Déclencheur facultatif pour le flux. La valeur par défaut consiste à exécuter le microbatch suivant le plus rapidement possible. Si vous avez des données arrivant à intervalles réguliers, par exemple une fois par jour, vous pouvez utiliser le déclencheur une fois et planifier l’exécution de vos flux dans un travail de Azure Databricks. Pour les flux Always on, Databricks recommande de définir un déclencheur de temps de traitement.

  • <output-path> est le chemin d’accès du flux de sortie.

Avantages par rapport à Apache Spark FileStreamSource

Dans Apache Spark, vous pouvez lire les fichiers de façon incrémentielle à l’aide de spark.readStream.format(fileFormat).load(directory) . Le chargeur automatique offre les avantages suivants par rapport à la source du fichier :

  • Évolutivité : le chargeur automatique peut découvrir efficacement des milliards de fichiers. Les remplissages peuvent être exécutés de façon asynchrone afin d’éviter de gaspiller des ressources de calcul.
  • Performances : le coût de la découverte de fichiers avec le chargeur automatique est mis à l’échelle avec le nombre de fichiers qui sont ingérés à la place du nombre de répertoires dans lesquels les fichiers peuvent se trouver. Consultez la liste des répertoires optimisés.
  • Inférence de schéma et prise en charge de l’évolution : le chargeur automatique peut détecter les dérives de schéma, vous avertir lorsque les modifications de schéma se produisent et récupérer les données qui auraient autrement été ignorées ou perdues. Consultez inférence et évolution du schéma.
  • Coût : le chargeur automatique utilise des API Cloud natives pour récupérer les listes des fichiers qui existent dans le stockage. En outre, le mode de notification de fichier du chargeur automatique peut contribuer à réduire les coûts liés au Cloud en évitant l’ensemble des listes de répertoires. Le chargeur automatique peut configurer automatiquement les services de notification de fichiers sur le stockage pour rendre la découverte des fichiers plus économique.

Liste des répertoires optimisés

Notes

Disponible dans Databricks Runtime 9,0 et versions ultérieures.

Le chargeur automatique peut détecter des fichiers sur des systèmes de stockage cloud à l’aide d’une liste de répertoires plus efficace que d’autres alternatives. Par exemple, si des fichiers ont été téléchargés toutes les 5 minutes en tant que /some/path/YYYY/MM/DD/HH/fileName , pour rechercher tous les fichiers dans ces répertoires, la source de fichier Apache Spark répertorie tous les sous-répertoires en parallèle, ce qui provoque 1 (répertoire de base) + 365 (par jour) * 24 (par heure) = 8761 liste des appels de répertoire API au stockage. En recevant une réponse aplatie du stockage, le chargeur automatique réduit le nombre d’appels d’API vers le nombre de fichiers dans le stockage divisé par le nombre de résultats retournés par chaque appel d’API, ce qui réduit considérablement les coûts du Cloud.

Liste incrémentielle

Important

Cette fonctionnalité est disponible en préversion publique.

Notes

Disponible dans Databricks Runtime 9,1 et Databricks Runtime 9,1 photonique et versions ultérieures.

Pour les fichiers générés par vue lexicographique, le chargeur automatique peut désormais tirer parti de l’ordonnancement lexical des fichiers et des API optimisées existantes pour améliorer l’efficacité de la liste de répertoires en répertoriant les fichiers ingérés précédents au lieu de répertorier l’intégralité du répertoire.

Par défaut, le chargeur automatique détecte automatiquement si un répertoire donné s’applique à la liste incrémentielle en vérifiant et en comparant les chemins d’accès des fichiers des listes de répertoires complets terminées précédentes. Pour garantir un achèvement éventuel dans ce auto mode, le chargeur automatique déclenchera automatiquement la liste complète des répertoires une fois que vous aurez terminé 7 listes incrémentielles consécutives. Si vous souhaitez être plus fréquent ou moins fréquent, vous pouvez définir cloudFiles.backfillInterval pour déclencher des remplissages asynchrones à un intervalle donné.

Si vous avez confiance dans l’ordre des fichiers générés dans le répertoire, vous pouvez activer ou désactiver explicitement le mode de mise en forme incrémentielle en affectant à la valeur cloudFiles.useIncrementalListing true ou false (par défaut auto ), par exemple, les fichiers classés par date=... partitions peuvent être considérés comme étant classés lexicalement si les données sont traitées une fois par jour, les chemins de fichiers contenant des horodateurs peuvent être considérés comme Vous pouvez toujours utiliser cloudFiles.backfillInterval pour vous assurer que toutes les données sont ingérées lorsque vous activez la liste incrémentielle.

Inférence et évolution du schéma

Notes

Disponible dans Databricks Runtime 8,2 et versions ultérieures.

Le chargeur automatique prend en charge l’inférence de schéma et l’évolution avec les formats de fichiers CSV, JSON, Binary ( binaryFile ) et texte. Pour plus d’informations, consultez inférence de schéma et évolution du chargeur automatique .

Exécuter le chargeur automatique en production

Databricks vous recommande de suivre les meilleures pratiques en matière de streaming pour l' exécution du chargeur automatique en production.

Configuration

Les options de configuration propres à la cloudFiles source sont précédées de cloudFiles façon à ce qu’elles se trouvent dans un espace de noms distinct des autres options de source de streaming structurées.

Important

Certaines valeurs d’option par défaut ont été modifiées dans Databricks Runtime 7,2. Si vous utilisez le chargeur automatique sur Databricks Runtime 7,1 ou les versions antérieures, consultez modifications des valeurs d’option par défaut et compatibilité.

Options de format de fichier

Le chargeur automatique vous permet de recevoir les fichiers,,,,, JSON CSV PARQUET AVRO TEXT BINARYFILE et ORC . Consultez options de format pour les options de ces formats de fichier.

Options courantes du chargeur automatique

Vous pouvez configurer les options suivantes pour la liste de répertoires ou le mode de notification de fichier.

Option
cloudFiles.allowOverwrites

Entrez : Boolean

Indique s’il faut autoriser les modifications du fichier du répertoire d’entrée pour remplacer les données existantes. Disponible dans Databricks Runtime 7,6 et versions ultérieures.

Valeur par défaut : true
cloudFiles. format

Entrez : String

Format du fichier de données dans le chemin source. Les valeurs autorisées sont les suivantes :

* avro: Fichier Avro
* binaryFile: Fichier binaire
* csv: Fichier CSV
* json: Fichier JSON
* orc: Fichier ORC
* parquet: Fichier parquet
* text: Fichier texte

Valeur par défaut : aucune (option obligatoire)
cloudFiles.includeExistingFiles

Entrez : Boolean

Indique s’il faut inclure les fichiers existants dans le chemin d’entrée de traitement du flux ou uniquement les nouveaux fichiers arrivant après l’installation initiale. Cette option est évaluée uniquement lorsque vous démarrez un flux pour la première fois. La modification de cette option après le redémarrage du flux n’a aucun effet.

Valeur par défaut : true
cloudFiles.inferColumnTypes

Entrez : Boolean

Indique s’il faut déduire les types de colonnes exacts lors de l’utilisation de l’inférence de schéma. Par défaut, les colonnes sont déduites sous forme de chaînes lors de l’inférence de datasets JSON. Pour plus d’informations, consultez inférence de schéma .

Valeur par défaut : false
cloudFiles.maxBytesPerTrigger

Entrez : Byte String

Nombre maximal de nouveaux octets à traiter dans chaque déclencheur. Vous pouvez spécifier une chaîne d’octets comme 10g pour limiter chaque microlot à 10 Go de données. Il s’agit d’une valeur maximale. Si vous avez des fichiers de 3 Go chacun, Azure Databricks traite 12 Go dans un microbatch. En cas d’utilisation avec cloudFiles.maxFilesPerTrigger , Azure Databricks consomme jusqu’à la limite inférieure de cloudFiles.maxFilesPerTrigger ou, selon ce qui cloudFiles.maxBytesPerTrigger est atteint en premier. Cette option n’a aucun effet lorsqu’elle est utilisée avec Trigger.Once() .

Valeur par défaut : Aucun
cloudFiles.maxFileAge

Entrez : Interval String

Durée de suivi d’un événement de fichier à des fins de déduplication. Databricks ne recommande pas de régler ce paramètre, sauf si vous ingestionz des données à l’ordre de millions de fichiers par heure. Pour plus d’informations, consultez la section How to Choose maxFileAge .

Valeur par défaut : Aucun
cloudFiles.resourceTags

Entrez : Map(String, String)

Série de paires clé-valeur pour aider à associer et à identifier les ressources associées, par exemple :

cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue")
.option("cloudFiles.resourceTag.mySecondKey", "mySecondValue")

Pour plus d’informations, consultez affectation de noms aux files d’attente et aux métadonnées et couverture de properties.labels dans les abonnements aux événements. Le chargeur automatique stocke les paires de balises clé-valeur dans JSON en tant qu’étiquettes. (1)

Valeur par défaut : Aucun
cloudFiles.schemaEvolutionMode

Entrez : String

Le mode d’évolution du schéma à mesure que de nouvelles colonnes sont découvertes dans les données. Par défaut, les colonnes sont déduites sous forme de chaînes lors de l’inférence de datasets JSON. Pour plus d’informations, consultez évolution du schéma .

Valeur par défaut : "addNewColumns" lorsqu’un schéma n’est pas fourni.
Sinon, "none".
cloudFiles.schemaHints

Entrez : String

Informations de schéma que vous fournissez au chargeur automatique lors de l’inférence du schéma. Pour plus d’informations, consultez les indications de schéma .

Valeur par défaut : Aucun
cloudFiles. schemaLocation

Entrez : String

Emplacement dans lequel stocker le schéma inféré et les modifications ultérieures. Pour plus d’informations, consultez inférence de schéma .

Valeur par défaut : aucun (requis lors de l’inférence du schéma)
cloudFiles.validateOptions

Entrez : Boolean

Indique s’il faut valider les options du chargeur automatique et retourner une erreur pour les options inconnues ou incohérentes.

Valeur par défaut : true
cloudFiles.backfillInterval

> [!IMPORTANT] > > cette fonctionnalité est en préversion publique.

Entrez : Interval String

Le chargeur automatique peut déclencher des remplissages asynchrones à un intervalle donné.
par exemple 1 day , pour le renvoi une fois par jour ou 1 week pour le renvoi une fois par semaine. Les systèmes de notification d’événements de fichier ne garantissent pas la remise de 100% de tous les fichiers qui ont été téléchargés. par conséquent, vous pouvez utiliser les remplissages pour garantir que tous les fichiers finissent par être traités, disponibles dans Databricks Runtime 8,4 et Databricks Runtime 8,4 photons et versions ultérieures. Si vous utilisez la liste incrémentielle, vous pouvez également utiliser des remplissages standard pour garantir l’achèvement éventuel, disponible dans Databricks Runtime 9,1 et Databricks Runtime 9,1 photonique et versions ultérieures.

Valeur par défaut : Aucun

(1) le chargeur automatique ajoute les paires de balises clé-valeur suivantes par défaut, en fonction du meilleur effort :

  • vendor: Databricks
  • path: Emplacement à partir duquel les données sont chargées.
  • checkpointLocation: Emplacement du point de contrôle du flux.
  • streamId: Identificateur global unique pour le flux.

Ces noms de clé sont réservés et vous ne pouvez pas remplacer leurs valeurs.

Options de la liste de répertoires

Les options suivantes s’appliquent au mode de liste des répertoires.

Option
cloudFiles.useIncrementalListing

> [!IMPORTANT] > > cette fonctionnalité est en préversion publique.

Entrez : String

Indique s’il faut utiliser la liste incrémentielle plutôt que la liste complète en mode de liste de répertoires. Par défaut, le chargeur automatique fera le meilleur effort pour détecter automatiquement si un répertoire donné s’applique à la liste incrémentielle. Vous pouvez utiliser explicitement la liste incrémentielle ou utiliser la liste de répertoires complète en la définissant comme true ou false respectivement.

Disponible dans Databricks Runtime 9,1 et Databricks Runtime 9,1 photonique et versions ultérieures.

Valeur par défaut : auto

Valeurs disponibles : auto , true , false

Comment choisirmaxFileAge

Notes

Disponible dans Databricks Runtime 8,4 et versions ultérieures.

Le chargeur automatique assure le suivi des fichiers découverts à l’emplacement du point de contrôle à l’aide de RocksDB pour fournir des garanties d’ingestion exactement une fois. Pour les jeux de données volumineux, vous pouvez utiliser l' maxFileAge option pour faire expirer les événements à partir de l’emplacement du point de contrôle. La valeur minimale que vous pouvez définir pour maxFileAge est "14 days" . Dans RocksDB, les suppressions apparaissent comme des entrées tombstone. par conséquent, vous devez vous attendre à ce que l’utilisation du stockage augmente à mesure que les événements expirent avant qu’il ne commence à se déformer.

Avertissement

maxFileAge est fourni comme un mécanisme de contrôle des coûts pour les jeux de données volumineux, en informant dans l’ordre des millions de fichiers toutes les heures. Le paramétrage maxFileAge de manière incorrecte peut entraîner des problèmes de qualité des données. Par conséquent, Databricks ne recommande pas de régler ce paramètre, sauf si cela est absolument requis.

Toute tentative de réglage de l' maxFileAge option peut entraîner l’ignorance des fichiers non traités par le chargeur automatique ou l’expiration des fichiers traités, puis le retraitement des données en double. Voici quelques éléments à prendre en compte lors du choix d’un maxFileAge :

  • Si votre flux redémarre après un long moment, les événements de notification de fichier extraits de la file d’attente qui sont antérieurs à maxFileAge sont ignorés. De même, si vous utilisez la liste de répertoires, les fichiers qui peuvent avoir été affichés pendant une période d’indisponibilité antérieure à maxFileAge sont ignorés.
  • Si vous utilisez le mode de liste des répertoires et que vous utilisez maxFileAge , par exemple "1 month" , la valeur, vous arrêtez votre flux et redémarrez le flux avec la maxFileAge valeur "2 months" , tous les fichiers datant de plus de 1 mois, mais plus récents que deux mois, sont retraités.

La meilleure approche en matière de paramétrage maxFileAge consiste à démarrer à partir d’un délai d’expiration généreux, par exemple, "1 year" et à utiliser des éléments tels que "9 months" . Si vous définissez cette option la première fois que vous démarrez le flux, vous n’inrecevrez pas les données plus anciennes que maxFileAge , par conséquent, si vous voulez ingérer les anciennes données, vous ne devez pas définir cette option lorsque vous démarrez votre flux.

Options de notification de fichier

Les options suivantes s’appliquent au mode de notification fichier.

Option
cloudFiles.fetchParallelism

Entrez : Integer

Nombre de threads à utiliser lors de la récupération de messages à partir du service de mise en file d’attente.

Valeur par défaut : 1
cloudFiles.pathRewrites

Type : une chaîne JSON

Obligatoire uniquement si vous spécifiez un queueUrl qui reçoit des notifications de fichiers de plusieurs compartiments S3 et que vous souhaitez tirer parti des points de montage configurés pour accéder aux données de ces conteneurs. Utilisez cette option pour réécrire le préfixe du bucket/key chemin d’accès avec le point de montage. Seuls les préfixes peuvent être réécrits. Par exemple, pour la configuration
{"<databricks-mounted-bucket>/path": "dbfs:/mnt/data-warehouse"}, le chemin d’accès
s3://<databricks-mounted-bucket>/path/2017/08/fileA.json est réécrite dans dbfs:/mnt/data-warehouse/2017/08/fileA.json .

Valeur par défaut : Aucun
cloudFiles.queueUrl

Entrez : String

URL de la file d’attente SQS. Si elle est fournie, la source de fichiers Cloud consomme directement les événements de cette file d’attente au lieu de configurer ses propres services SNS et SQS AWS.

Valeur par défaut : Aucun
cloudFiles.useNotifications

Entrez : Boolean

Indique si le mode de notification de fichier doit être utilisé pour déterminer le moment où de nouveaux fichiers sont utilisés. Si false , utilisez le mode de liste des répertoires. Consultez modes de détection de fichiers.

Valeur par défaut : false

Fournissez l’option suivante uniquement si vous choisissez cloudFiles.useNotifications = true et que le chargeur automatique doit configurer les services de notification pour vous :

Option
cloudFiles. Region

Entrez : String

La région où se trouve le compartiment S3 source et où les services AWS SNS et SQS seront créés.

Valeur par défaut : dans Databricks Runtime 9,0 et au-dessus de la région de l’instance EC2. Dans Databricks Runtime 8,4 et versions antérieures, vous devez spécifier la région.

Vous pouvez utiliser les options suivantes pour fournir des informations d’identification pour accéder à AWS SNS et SQS lorsque les rôles IAM ne sont pas disponibles ou lorsque vous ingestionz des données de différents Clouds.

Option
cloudFiles. awsAccessKey

Entrez : String

ID de la clé d’accès AWS pour l’utilisateur. Doit être fourni avec
cloudFiles.awsSecretKey.

Valeur par défaut : Aucun
cloudFiles.awsSecretKey

Entrez : String

Clé d’accès au secret AWS pour l’utilisateur. Doit être fourni avec
cloudFiles.awsAccessKey.

Valeur par défaut : Aucun
cloudFiles.roleArn

Entrez : String

ARN d’un rôle IAM à supposer. Le rôle peut être utilisé à partir du profil d’instance de votre cluster ou en fournissant des informations d’identification avec
Voir cloudFiles.awsAccessKey et cloudFiles.awsSecretKey.

Valeur par défaut : Aucun
cloudFiles.roleExternalId

Entrez : String

Identificateur à fournir en supposant qu’un rôle utilise cloudFiles.roleArn .

Valeur par défaut : Aucun
cloudFiles.roleSessionName

Entrez : String

Nom de session facultatif à utiliser en supposant qu’un rôle utilise
cloudFiles.roleArn.

Valeur par défaut : Aucun
cloudFiles. stsEndpoint

Entrez : String

Point de terminaison facultatif à fournir pour accéder au STS AWS en supposant un rôle à l’aide de cloudFiles.roleArn .

Valeur par défaut : Aucun

Modifications des valeurs d’option par défaut et compatibilité

Les valeurs par défaut des options de chargeur automatique suivantes ont été modifiées dans Databricks Runtime 7,2 sur les valeurs indiquées dans configuration.

  • cloudFiles.useNotifications
  • cloudFiles.includeExistingFiles
  • cloudFiles.validateOptions

Les flux de chargement automatique démarrés sur Databricks Runtime 7,1 et les valeurs d’option par défaut suivantes sont les suivants :

  • cloudFiles.useNotifications a la valeur true.
  • cloudFiles.includeExistingFiles a la valeur false.
  • cloudFiles.validateOptions a la valeur false.

Pour garantir la compatibilité avec les applications existantes, ces valeurs d’option par défaut ne changent pas lorsque vous exécutez vos flux de chargeur automatique existants sur Databricks Runtime 7,2 ou version ultérieure ; les flux auront le même comportement après la mise à niveau.

Autorisations

Vous devez disposer d’autorisations de lecture pour le répertoire d’entrée. Pour plus d’informations, consultez détails de la connexion S3.

Pour utiliser le mode de notification de fichier, joignez le document de stratégie JSON suivant à votre utilisateur ou rôle IAM.

Si vous ne parvenez pas à configurer les autorisations spécifiées dans le document de stratégie JSON, vous pouvez éventuellement demander à un administrateur d’effectuer l’installation à l’aide de l' API Scala Resource Management Scala. Un administrateur peut vous fournir l’URL de la file d’attente, que vous pouvez fournir directement en tant que .option("queueUrl", <queue-url>) cloudFiles source. Avec cette configuration, vous n’avez besoin que d’autorisations réduites. Pour plus d’informations, consultez annexe : autorisations réduites après l’installation initiale .

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:DeleteMessageBatch",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility",
        "sqs:ChangeMessageVisibilityBatch"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

où :

  • <bucket-name>: Nom du compartiment S3 dans lequel votre flux lira des fichiers, par exemple auto-logs . Vous pouvez utiliser * comme caractère générique, par exemple databricks-*-logs . Pour déterminer le compartiment S3 sous-jacent de votre chemin DBFS, vous pouvez répertorier tous les points de montage DBFS d’un bloc-notes en exécutant %fs mounts .
  • <region>: Région AWS dans laquelle réside le compartiment S3, par exemple us-west-2 . Si vous ne souhaitez pas spécifier la région, utilisez * .
  • <account-number>: Numéro de compte AWS qui possède le compartiment S3, par exemple 123456789012 . Si vous ne souhaitez pas spécifier le numéro de compte, utilisez * .

La chaîne databricks-auto-ingest-* dans la spécification SQS et SNS ARN est le préfixe de nom cloudFiles utilisé par la source lors de la création de services SQS et SNS. Étant donné que Azure Databricks configure les services de notification lors de l’exécution initiale du flux, vous pouvez utiliser une stratégie avec des autorisations réduites après l’exécution initiale (par exemple, arrêter le flux, puis le redémarrer). Pour plus d’informations, consultez annexe : autorisations réduites après l’installation initiale .

Notes

La stratégie précédente concerne uniquement les autorisations nécessaires à la configuration des services de notification de fichiers, à savoir les services de notification de compartiment S3, SNS et SQS, et suppose que vous disposez déjà d’un accès en lecture au compartiment S3. Si vous devez ajouter des autorisations de lecture seule S3, ajoutez ce qui suit à la Action liste dans l' DatabricksAutoLoaderSetup instruction dans le document JSON :

  • s3:ListBucket
  • s3:GetObject

Réception sécurisée des données dans un autre compte AWS

Le chargeur automatique peut charger des données entre des comptes AWS en supposant un rôle IAM. Après avoir défini les informations d’identification de sécurité temporaires créées par AssumeRole , vous pouvez faire en sorte que le chargeur automatique charge des fichiers Cloud entre les comptes. Pour configurer le chargeur automatique pour les comptes inter-AWS, suivez le document : _. Assurez-vous d’effectuer les tâches suivantes :

  • Vérifiez que vous avez le rôle méta AssumeRole affecté au cluster.

  • Configurez la configuration Spark du cluster pour inclure les propriétés suivantes :

    fs.s3a.credentialsType AssumeRole
    fs.s3a.stsAssumeRole.arn arn:aws:iam::<bucket-owner-acct-id>:role/MyRoleB
    fs.s3a.acl.default BucketOwnerFullControl
    

Mesures

Le chargeur automatique signale les métriques à chaque lot. Vous pouvez afficher le nombre de fichiers présents dans le backlog et la taille du backlog dans les numFilesOutstanding numBytesOutstanding métriques et sous l’onglet données brutes dans le tableau de bord progression des requêtes de streaming:

{
  "sources" : [
    {
      "description" : "CloudFilesSource[/path/to/source]",
      "metrics" : {
        "numFilesOutstanding" : "238",
        "numBytesOutstanding" : "163939124006"
      }
    }
  ]
}

Gestion des ressources de Cloud

Vous pouvez utiliser une API Scala pour gérer les services AWS SNS et SQS créés par le chargeur automatique. Vous devez configurer les autorisations de configuration des ressources décrites dans autorisations avant d’utiliser cette API.

Important

Si vous avez utilisé le chargeur automatique dans Databricks Runtime 7,1 et les versions antérieures, mettez à jour votre stratégie IAM à l’aide du document de stratégie JSON dans autorisations. Il existe de nouvelles instructions dans la stratégie pour Databricks Runtime 7,2, DatabricksAutoLoaderList et DatabricksAutoLoaderTeardown , qui spécifient les autorisations supplémentaires requises par l’API Scala.

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>)
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

// Set up an SQS queue and a topic subscribed to the path provided in the manager. Available in Databricks Runtime 7.4 and above.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by Auto Loader
manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Notes

Disponible dans Databricks Runtime 7,4 et versions ultérieures.

Utilisez setUpNotificationServices(<resource-suffix>) pour créer une file d’attente SQS et une rubrique SNS avec le nom databricks-auto-ingest-<resource-suffix> . S’il existe une SQS file d’attente ou une rubrique SNS portant le même nom, Azure Databricks réutilise la ressource qui existe déjà au lieu d’en créer une nouvelle. Cette fonction retourne une file d’attente SQS que vous pouvez passer à la cloudFiles source à l’aide de .option("cloudFiles.queueUrl", <queue-url>) . Cela permet cloudFiles à l’utilisateur source d’avoir moins d’autorisations que l’utilisateur qui crée les ressources. Consultez autorisations.

Fournissez l' "path" option à newManager uniquement en cas d’appel de setUpNotificationServices ; il n’est pas nécessaire pour listNotificationServices ou tearDownNotificationServices . Il s’agit du même path que celui utilisé lors de l’exécution d’une requête de streaming.

Forum Aux Questions (FAQ)

Dois-je créer des services de notification d’événements AWS au préalable ?

Non. Si vous choisissez le mode de notification fichier, le chargeur automatique crée une rubrique AWS S3 > SNS > pipeline de notification d’événements du fichier de file d’attente SQS automatiquement lorsque vous démarrez le flux.

Comment faire nettoyer les ressources de notification d’événements, telles que les rubriques SNS et les files d’attente SQS, créées par le chargeur automatique ?

Vous pouvez utiliser le Gestionnaire de ressources cloud pour répertorier et supprimer des ressources. Vous pouvez également supprimer ces ressources manuellement, soit dans la console Web, soit à l’aide des API AWS. Toutes les ressources créées par le chargeur automatique ont le préfixe : databricks-auto-ingest- .

Le chargeur automatique traite-t-il le fichier une fois que le fichier est ajouté ou remplacé ?

Les fichiers sont traités une seule fois, sauf si vous activez cloudFiles.allowOverwrites . Si un fichier est ajouté à ou remplacé, Databricks ne garantit pas la version du fichier qui est traitée. Pour un comportement bien défini, Databricks recommande d’utiliser le chargeur automatique pour ingérer uniquement les fichiers immuables. Si cela ne répond pas à vos besoins, contactez votre représentant Databricks.

Puis-je exécuter plusieurs requêtes de streaming à partir du même répertoire d’entrée ?

Oui. Chaque flux de fichiers Cloud, tel qu’identifié par un répertoire de point de contrôle unique, possède sa propre file d’attente SQS et les mêmes événements de l’AWS S3 peuvent être envoyés à plusieurs files d’attente SQS.

Si mes fichiers de données n’arrivent pas en permanence, mais à intervalles réguliers, par exemple une fois par jour, dois-je toujours utiliser cette source et quels sont les avantages ?

Oui et oui. Dans ce cas, vous pouvez configurer une Trigger-Once tâche de diffusion en continu structurée et planifier l’exécution après l’heure d’arrivée prévue du fichier. La première exécution définit les services de notification d’événements, qui seront toujours activés, même lorsque le cluster de streaming est en service. Lorsque vous redémarrez le flux, la cloudFiles source récupère et traite tous les fichiers événements sauvegardés dans la file d’attente SQS. L’avantage de l’utilisation du chargeur automatique dans ce cas est que vous n’avez pas besoin de déterminer les fichiers qui sont nouveaux et à traiter à chaque fois, ce qui peut être très coûteux.

Que se passe-t-il si je modifie l’emplacement du point de contrôle lors du redémarrage du flux ?

Un emplacement de point de contrôle conserve les informations d’identification importantes d’un flux. La modification efficace de l’emplacement du point de contrôle signifie que vous avez abandonné le flux précédent et lancé un nouveau flux. Le nouveau flux crée des informations de progression et, si vous utilisez le mode de notification de fichier, les nouveaux services AWS SNS et SQS. Vous devez nettoyer manuellement l’emplacement du point de contrôle et les services AWS SNS et SQS pour tous les flux abandonnés.

Puis-je exécuter plusieurs requêtes de streaming à partir de différents répertoires d’entrée sur le même compartiment S3 ?

Oui, tant qu’il ne s’agit pas de répertoires de type parent-enfant, par exemple prod-logs/ et prod-logs/usage/ .

Puis-je utiliser cette fonctionnalité quand des notifications de fichier existent sur mon compartiment S3 ?

Oui, tant que votre répertoire d’entrée n’est pas en conflit avec le préfixe de notification existant (par exemple, les répertoires parent-enfant ci-dessus).

Annexe : autorisations réduites après l' installation initiale

Les autorisations d’installation des ressources décrites dans autorisations sont requises uniquement lors de l’exécution initiale du flux. Après la première exécution, vous pouvez basculer vers la stratégie IAM suivante avec des autorisations réduites.

Important

Avec les autorisations réduites, vous ne pouvez pas démarrer de nouvelles requêtes de streaming ou recréer des ressources en cas d’échec (par exemple, la file d’attente SQS a été supprimée par inadvertance). vous ne pouvez pas non plus utiliser l’API de gestion des ressources de Cloud pour répertorier ou détruire des ressources.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:DeleteMessageBatch",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility",
       "sqs:ChangeMessageVisibilityBatch"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}