Bestanden laden vanuit AWS S3 met behulp van Auto Loader

Automatisch laden verwerkt incrementeel en efficiënt nieuwe gegevensbestanden wanneer deze binnenkomen in AWS S3 ( s3:// ).

Auto Loader biedt een structured streaming-bron met de naam cloudFiles . Op basis van een pad naar de invoermap in de cloudbestandsopslag verwerkt de bron automatisch nieuwe bestanden wanneer deze binnenkomen, met de optie om ook bestaande bestanden in die cloudFiles map te verwerken.

Automatisch laden werkt met DBFS-paden en directe paden naar de gegevensbron.

Vereisten

Databricks Runtime 7.2 of hoger.

Als u streams hebt gemaakt met Databricks Runtime 7.1 of lager, zie Wijzigingen in standaardoptiewaarden en compatibiliteit en Cloudresourcebeheer.

Bestandsdetectiemodi

Auto Loader ondersteunt twee modi voor het detecteren van wanneer er nieuwe bestanden zijn: mapvermelding en bestandsmelding.

  • Mapvermelding: identificeert nieuwe bestanden door de invoermap weer te geven. Met de modus Directoryvermelding kunt u automatisch laden snel stromen starten zonder andere machtigingsconfiguraties dan toegang tot uw gegevens op AWS S3. Dit is geschikt voor scenario's waarin slechts enkele bestanden regelmatig moeten worden gestreamd. De modus Directoryvermelding is de standaardinstelling voor Automatisch laden in Databricks Runtime 7.2 en hoger.
  • Bestandsmelding: maakt gebruik van AWS SNS- en SQS-services die zich abonneren op bestandsgebeurtenissen uit de invoermap. Auto Loader stelt automatisch de AWS SNS- en SQS-services in. De bestandsmeldingsmodus is beter presterend en schaalbaar voor grote invoerdirecties. Als u deze modus wilt gebruiken, moet u machtigingen configureren voor de AWS SNS- en SQS-services en .option("cloudFiles.useNotifications","true") opgeven.

U kunt de modus wijzigen wanneer u de stream opnieuw start. U kunt bijvoorbeeld overschakelen naar de bestandsmeldingsmodus wanneer de mapvermelding te langzaam wordt vanwege de toename van de grootte van de invoermap. Voor beide modi houdt Automatisch laden intern bij welke bestanden zijn verwerkt op de locatie van uw streamingcontrolepunt om exactly-once-semantiek te bieden, zodat u geen statusinformatie zelf hoeft te beheren.

Bron cloudFiles gebruiken

Als u het automatisch laadplatform wilt gebruiken, maakt u een bron op dezelfde cloudFiles manier als andere streamingbronnen. Met de onderstaande code wordt een stream voor automatisch laden naar Delta Lake geschreven in de mapvermeldingsmodus:

Python

df = spark.readStream.format("cloudFiles") \
  .option(<cloudFiles-option>, <option-value>) \
  .schema(<schema>) \
  .load(<input-path>)

df.writeStream.format("delta") \
  .option("checkpointLocation", <checkpoint-path>) \
  .trigger(<trigger>) \
  .start(<output-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option(<cloudFiles-option>, <option-value>)
  .schema(<schema>)
  .load(<input-path>)

df.writeStream.format("delta")
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(<trigger>)
  .start(<output-path>)

Hierbij

  • <cloudFiles-option> is een configuratieoptie in Configuratie.

  • <schema> is het bestandsschema. Auto Loader ondersteunt ook schemadeferentie en ontwikkeling met sommige bestandsindelingen. Zie Schemadeferentie en ontwikkeling voor meer informatie

  • <input-path> is het pad in de opslag dat wordt bewaakt op nieuwe bestanden. Onderliggende directories <input-path> van worden ook bewaakt. <input-path> kan bestands glob-patronen bevatten. Het glob-patroon wordt toegevoegd. Als dit bestanden bevat die u niet wilt opnemen, kunt u een extra filter opnemen * via de pathGlobFilter optie . Als u een wachtrij voor bestandsmeldingen verstrekt en geen gegevens hoeft in te vullen, hoeft u geen invoerpad op te geven.

  • <checkpoint-path> is de locatie van het stroomcontrolepunt.

  • <trigger> Een optionele trigger voor de stream. De standaardinstelling is om de volgende microbatch zo snel mogelijk uit te voeren. Als u gegevens binnenkomt met een vast interval, bijvoorbeeld eenmaal per dag, kunt u Eenmaal activeren gebruiken en de uitvoering van uw streams in een Azure Databricks plannen. Voor Always On-streams raadt Databricks aan om een verwerkingstijdtrigger in te stellen.

  • <output-path> is het pad naar de uitvoerstroom.

Voordelen ten opzichte van Apache Spark FileStreamSource

In Apache Spark kunt u bestanden incrementeel lezen met behulp van spark.readStream.format(fileFormat).load(directory) . Automatisch laden biedt de volgende voordelen ten opzichte van de bestandsbron:

  • Schaalbaarheid: Met automatisch laden kunnen miljarden bestanden efficiënt worden ontdekt. Backfills kunnen asynchroon worden uitgevoerd om te voorkomen dat rekenbronnen worden verspild.
  • Prestaties: de kosten voor het detecteren van bestanden met automatische laadmogelijkheden worden geschaald met het aantal bestanden dat wordt opgenomen in plaats van het aantal mappen waarin de bestanden kunnen beland. Zie Geoptimaliseerde directory met.
  • Ondersteuning voor schemadeferentie en ontwikkeling: Met automatische loader kunt u schemadrifts detecteren, u op de hoogte stellen wanneer schemawijzigingen plaatsvinden en gegevens die anders zouden zijn genegeerd of verloren, verwijderen. Zie Schemadeferentie en evolutie.
  • Kosten: Auto Loader maakt gebruik van systeemeigen cloud-API's om lijsten op te halen met bestanden die in de opslag aanwezig zijn. Bovendien kan de modus voor bestandsmeldingen van het automatisch laadplatform helpen uw cloudkosten verder te verlagen door een directorylijst helemaal te vermijden. Automatisch laden kan automatisch services voor bestandsmeldingen instellen op opslag om bestandsdetectie veel goedkoper te maken.

Geoptimaliseerde mapvermelding

Notitie

Beschikbaar in Databricks Runtime 9.0 en hoger.

Met automatisch laden kunnen bestanden in cloudopslagsystemen worden ontdekt met behulp van mapvermeldingen efficiënter dan andere alternatieven. Als u bijvoorbeeld bestanden hebt die elke vijf minuten worden geüpload als , om alle bestanden in deze mappen te vinden, worden in de Apache Spark-bestandsbron alle subdirectory's parallel weergegeven, waardoor /some/path/YYYY/MM/DD/HH/fileName er 1 (basismap) + 365 (per dag) * 24 (per uur) = 8761 LIJST API-directory-aanroepen naar opslag worden veroorzaakt. Door een afgevlakt antwoord uit de opslag te ontvangen, vermindert Automatisch laden het aantal API-aanroepen naar het aantal bestanden in de opslag gedeeld door het aantal resultaten dat door elke API-aanroep wordt geretourneerd, waardoor de cloudkosten aanzienlijk worden teruggebracht.

Schemadeferentie en ontwikkeling

Notitie

Beschikbaar in Databricks Runtime 8.2 en hoger.

Auto Loader ondersteunt schemadeferentie en ontwikkeling met CSV-, JSON-, binaire ( binaryFile ) en tekstbestandsindelingen. Zie Schemadeferentie en ontwikkeling in Auto Loader voor meer informatie.

Automatisch laden uitvoeren in productie

Databricks raadt u aan de aanbevolen procedures voor streaming te volgen voor het uitvoeren van automatisch laadplatform in productie.

Configuratie

Configuratieopties die specifiek zijn voor de bron, worden vooraf voorzien van , zodat ze zich in een afzonderlijke naamruimte van cloudFiles cloudFiles andere gestructureerde streaming-bronopties.

Belangrijk

Sommige standaardoptiewaarden zijn gewijzigd in Databricks Runtime 7.2. Zie Wijzigingen in standaardoptiewaarden en compatibiliteit als u Databricks Runtime 7.1 of lager gebruikt.

Opties voor bestandsindelingen

Met Automatisch laden kunt u JSON , , , , , en bestanden CSV PARQUET AVRO TEXT BINARYFILE ORC opnemen. Zie Opmaakopties voor de opties voor deze bestandsindelingen.

Algemene opties voor automatisch laden

U kunt de volgende opties configureren voor mapvermelding of bestandsmeldingsmodus.

Optie
cloudFiles.allowOverwrites

Type: Boolean

Of wijzigingen in het invoermapbestand bestaande gegevens mogen overschrijven. Beschikbaar in Databricks Runtime 7.6 en hoger.

Standaardwaarde: true
cloudFiles.format

Type: String

De indeling van het gegevensbestand in het bronpad. Toegestane waarden zijn onder andere:

* avro: Avro-bestand
* binaryFile: Binair bestand
* csv: CSV-bestand
* json: JSON-bestand
* orc: ORC-bestand
* parquet: Parquet-bestand
* text: Tekstbestand

Standaardwaarde: Geen (vereiste optie)
cloudFiles.includeExistingFiles

Type: Boolean

Of u bestaande bestanden wilt opnemen in het invoerpad voor stroomverwerking of alleen nieuwe bestanden wilt verwerken die aankomen na de eerste installatie. Deze optie wordt alleen geëvalueerd wanneer u een stream voor de eerste keer start. Het wijzigen van deze optie na het opnieuw opstarten van de stream heeft geen effect.

Standaardwaarde: true
cloudFiles.inferColumnTypes

Type: Boolean

Of u exacte kolomtypen wilt afleiden bij het gebruik van schemadeferentie. Standaard worden kolommen afgeleid als tekenreeksen bij het afleiden van JSON-gegevenssets. Zie schemadeferentie voor meer informatie.

Standaardwaarde: false
cloudFiles.maxBytesPerTrigger

Type: Byte String

Het maximum aantal nieuwe bytes dat in elke trigger moet worden verwerkt. U kunt een bytereeks opgeven, bijvoorbeeld om 10g elke microbatch te beperken tot 10 GB aan gegevens. Dit is een zachte maximum. Als u bestanden van elk 3 GB hebt, Azure Databricks 12 GB in een microbatch verwerkt. Wanneer u deze samen met gebruikt, Azure Databricks tot de lagere cloudFiles.maxFilesPerTrigger limiet van of , wat het eerst wordt cloudFiles.maxFilesPerTrigger cloudFiles.maxBytesPerTrigger bereikt. Deze optie heeft geen effect wanneer u gebruikt met Trigger.Once() .

Standaardwaarde: Geen
cloudFiles.maxFileAge

Type: Interval String

Hoe lang een bestandsgebeurtenis wordt bijgeslagen voor ontdubbelingsdoeleinden. Databricks raadt het afstemmen van deze parameter niet aan, tenzij u gegevens in de volgorde van miljoenen bestanden per uur opgeeft. Zie de sectie maxFileAge kiezen voor meer informatie.

Standaardwaarde: Geen
cloudFiles.resourceTags

Type: Map(String, String)

Een reeks sleutel-waardetagparen om gerelateerde resources te koppelen en te identificeren, bijvoorbeeld:

cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue")
.option("cloudFiles.resourceTag.mySecondKey", "mySecondValue")

Zie Naamgeving van wachtrijen en metagegevens en de dekking van properties.labels in Gebeurtenisabonnementen voor meer informatie. Auto Loader slaat deze sleutel-waardetagparen op in JSON als labels. (1)

Standaardwaarde: Geen
cloudFiles.schemaEvolutionMode

Type: String

De modus voor het ontwikkelen van het schema als nieuwe kolommen worden in de gegevens ontdekt. Standaard worden kolommen afgeleid als tekenreeksen bij het afleiden van JSON-gegevenssets. Zie schemaontwikkeling voor meer informatie.

Standaardwaarde: "addNewColumns" wanneer er geen schema wordt opgegeven.
"none" Anders.
cloudFiles.schemaHints

Type: String

Schema-informatie die u tijdens schemadeferentie aan automatisch laadt. Zie schemahints voor meer informatie.

Standaardwaarde: Geen
cloudFiles.schemaLocation

Type: String

De locatie voor het opslaan van het afgeleide schema en latere wijzigingen. Zie schemadeferentie voor meer informatie.

Standaardwaarde: Geen (vereist bij het afleiden van het schema)
cloudFiles.validateOptions

Type: Boolean

Of u opties voor automatisch laden wilt valideren en een fout retourneert voor onbekende of inconsistente opties.

Standaardwaarde: true

(1) Auto Loader voegt standaard de volgende sleutel-waardetagparen toe op basis van best-effort:

  • vendor: Databricks
  • path: De locatie van waar de gegevens worden geladen.
  • checkpointLocation: De locatie van het controlepunt van de stream.
  • streamId: Een wereldwijd unieke id voor de stroom.

Deze sleutelnamen zijn gereserveerd en u kunt hun waarden niet overschrijven.

Kiezen maxFileAge

Notitie

Beschikbaar in Databricks Runtime 8.4 en hoger.

Auto Loader houdt de gevonden bestanden op de controlepuntlocatie bij met behulp van RocksDB om exactly-once opnamegaranties te bieden. Voor gegevenssets met een groot volume kunt u de optie maxFileAge gebruiken om gebeurtenissen te laten verlopen vanaf de locatie van het controlepunt. De minimumwaarde die u kunt instellen maxFileAge voor is "14 days" . Verwijderen in RocksDB worden weergegeven als tombstone-vermeldingen. Daarom moet u ervan uit gaan dat het opslaggebruik toeneemt naarmate gebeurtenissen verlopen voordat het niveau ervan afneemt.

Waarschuwing

maxFileAge wordt geleverd als een mechanisme voor kostenbeheer voor gegevenssets met een hoog volume, die elk uur in de orde van miljoenen bestanden worden opgenomen. Onjuiste maxFileAge afstemming kan leiden tot problemen met de kwaliteit van de gegevens. Daarom raadt Databricks het afstemmen van deze parameter aan, tenzij dit absoluut vereist is.

Het afstemmen van de optie kan ertoe leiden dat niet-verwerkte bestanden worden genegeerd door automatisch laden of dat al verwerkte bestanden verlopen en vervolgens opnieuw worden verwerkt, waardoor dubbele maxFileAge gegevens ontstaan. Hier zijn enkele dingen om rekening mee te houden bij het kiezen van een maxFileAge :

  • Als uw stream na lange tijd opnieuw wordt opgestart, worden gebeurtenissen voor bestandsmeldingen die ouder zijn dan uit de wachtrij maxFileAge gehaald, genegeerd. Als u directoryvermelding gebruikt, worden bestanden die mogelijk zijn verschenen tijdens de down time die ouder zijn dan maxFileAge genegeerd.
  • Als u de directorylijstmodus gebruikt en gebruikt, bijvoorbeeld ingesteld op , stopt u de stream en start u de stream opnieuw op met ingesteld op , alle bestanden die ouder zijn dan maxFileAge "1 month" maxFileAge 1 maand, maar die meer recent zijn dan 2 maanden, worden opnieuw "2 months" verwerkt.

De beste manier om af te stemmen is om te beginnen met een verlopen verloopdatum, bijvoorbeeld en omlaag maxFileAge te werken naar iets als "1 year" "9 months" . Als u deze optie de eerste keer dat u de stream start in stelt, neemt u geen gegevens op die ouder zijn dan . Als u dus oude gegevens wilt opnemen, moet u deze optie niet instellen wanneer u de maxFileAge stream start.

Opties voor bestandsmeldingen

De volgende opties zijn relevant voor de bestandsmeldingsmodus.

Optie
cloudFiles.backfillInterval

Type: Interval String

Automatisch laden kan asynchrone backfills activeren met een bepaald interval,
bijvoorbeeld één keer per dag invullen of één 1 day 1 week keer per week invullen. Meldingssystemen voor bestandsgebeurtenissen bieden geen garantie voor 100% levering van alle bestanden die zijn geüpload. Daarom kunt u backfills gebruiken om ervoor te zorgen dat alle bestanden uiteindelijk worden verwerkt.

Standaardwaarde: Geen
cloudFiles.fetchParallelism

Type: Integer

Het aantal threads dat moet worden gebruikt bij het ophalen van berichten uit de wachtrijservice.

Standaardwaarde: 1
cloudFiles.pathRewrites

Type: een JSON-tekenreeks

Alleen vereist als u een opgeeft die bestandsmeldingen ontvangt van meerdere S3-buckets en u gebruik wilt maken van de bevestigingspunten die zijn geconfigureerd voor toegang tot gegevens queueUrl in deze containers. Gebruik deze optie om het voorvoegsel van het pad met bucket/key het bevestigingspunt te herschrijven. Alleen voorvoegsels kunnen worden herschreven. Bijvoorbeeld voor de configuratie
{"<databricks-mounted-bucket>/path": "dbfs:/mnt/data-warehouse"}, het pad
s3://<databricks-mounted-bucket>/path/2017/08/fileA.json wordt herschreven naar dbfs:/mnt/data-warehouse/2017/08/fileA.json .

Standaardwaarde: Geen
cloudFiles.queueUrl

Type: String

De URL van de SQS-wachtrij. Indien opgegeven, verbruikt de bron van cloudbestanden rechtstreeks gebeurtenissen uit deze wachtrij in plaats van zijn eigen AWS SNS- en SQS-services in te stellen.

Standaardwaarde: Geen
cloudFiles.useNotifications

Type: Boolean

Bepaalt of de modus voor bestandsmeldingen moet worden gebruikt om te bepalen wanneer er nieuwe bestanden zijn. Als false , gebruikt u de mapvermeldingsmodus. Zie Bestandsdetectiemodi.

Standaardwaarde: false

Geef de volgende optie alleen op als u wilt dat Automatisch laden de cloudFiles.useNotifications = true meldingsservices voor u in stelt:

Optie
cloudFiles.region

Type: String

De regio waar de bron-S3-bucket zich bevindt en waar de AWS SNS- en SQS-services worden gemaakt.

Standaardwaarde: In Databricks Runtime 9.0 en hoger dan de regio van de EC2-instantie. In Databricks Runtime 8.4 en lager moet u de regio opgeven.

U kunt de volgende opties gebruiken om referenties op te geven voor toegang tot AWS SNS en SQS wanneer IAM-rollen niet beschikbaar zijn of wanneer u gegevens van verschillende clouds op neemt.

Optie
cloudFiles.awsAccessKey

Type: String

De AWS-toegangssleutel-id voor de gebruiker. Moet worden opgegeven met
cloudFiles.awsSecretKey.

Standaardwaarde: Geen
cloudFiles.awsSecretKey

Type: String

De geheime toegangssleutel van AWS voor de gebruiker. Moet worden opgegeven met
cloudFiles.awsAccessKey.

Standaardwaarde: Geen
cloudFiles.roleArn

Type: String

De ARN van een IAM-rol die moet worden aangenomen. De rol kan worden aangenomen uit het exemplaarprofiel van uw cluster of door referenties op te geven met
cloudFiles.awsAccessKey en cloudFiles.awsSecretKey .

Standaardwaarde: Geen
cloudFiles.roleExternalId

Type: String

Een id die moet worden verstrekt bij het aannemen van een rol met behulp van cloudFiles.roleArn .

Standaardwaarde: Geen
cloudFiles.roleSessionName

Type: String

Een optionele sessienaam die moet worden gebruikt als een rol wordt gebruikt
cloudFiles.roleArn.

Standaardwaarde: Geen
cloudFiles.stsEndpoint

Type: String

Een optioneel eindpunt voor toegang tot AWS STS wanneer een rol wordt aangenomen met behulp van cloudFiles.roleArn .

Standaardwaarde: Geen

Wijzigingen in standaardoptiewaarden en compatibiliteit

De standaardwaarden van de volgende opties voor automatisch laden zijn in Databricks Runtime 7.2 gewijzigd in de waarden die worden vermeld in Configuratie.

  • cloudFiles.useNotifications
  • cloudFiles.includeExistingFiles
  • cloudFiles.validateOptions

Stromen voor automatisch laden die zijn gestart op Databricks Runtime 7.1 en lager hebben de volgende standaardoptiewaarden:

  • cloudFiles.useNotifications is true
  • cloudFiles.includeExistingFiles is false
  • cloudFiles.validateOptions is false

Om compatibiliteit met bestaande toepassingen te garanderen, worden deze standaardoptiewaarden niet gewijzigd wanneer u uw bestaande stromen voor automatisch laden op Databricks Runtime 7.2 of hoger; de streams hebben hetzelfde gedrag na de upgrade.

Machtigingen

U moet leesmachtigingen hebben voor de invoermap. Zie S3-verbindingsgegevens voor meer informatie.

Als u de bestandsmeldingsmodus wilt gebruiken, koppelt u het volgende JSON-beleidsdocument aan uw IAM-gebruiker of -rol.

Als u de machtigingen die zijn opgegeven in het JSON-beleidsdocument niet kunt instellen, kunt u eventueel een beheerder vragen om de installatie voor u uit te voeren met behulp van de Scala API voor cloudresourcebeheer. Een beheerder kan u voorzien van de wachtrij-URL, die u rechtstreeks kunt opgeven .option("queueUrl", <queue-url>) voor de cloudFiles bron. Met deze configuratie hebt u alleen beperkte machtigingen nodig. Zie bijlage: beperkte machtigingen na de eerste installatie voor meer informatie.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:DeleteMessageBatch",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility",
        "sqs:ChangeMessageVisibilityBatch"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

Hierbij

  • <bucket-name>: De naam van de S3-bucket waarin uw stream bestanden leest, bijvoorbeeld auto-logs . U kunt gebruiken * als jokerteken, bijvoorbeeld databricks-*-logs . Als u de onderliggende S3-bucket voor uw DBFS-pad wilt weten, kunt u alle DBFS-bevestigingspunten in een notebook in een lijst zien door uit te %fs mounts gaan.
  • <region>: de AWS-regio waarin de S3-bucket zich bevindt, bijvoorbeeld us-west-2 . Als u de regio niet wilt opgeven, gebruikt u * .
  • <account-number>: Het AWS-accountnummer dat eigenaar is van de S3-bucket, bijvoorbeeld 123456789012 . Als u het accountnummer niet wilt opgeven, gebruikt u * .

De tekenreeks in de SQS- en SNS ARN-specificatie is het naamsprefix dat de bron gebruikt bij het maken van databricks-auto-ingest-* cloudFiles SQS- en SNS-services. Aangezien Azure Databricks notification services in de eerste keer dat de stream wordt uitgevoerd in stelt, kunt u een beleid met beperkte machtigingen gebruiken na de eerste run (bijvoorbeeld de stream stoppen en opnieuw starten). Zie bijlage: beperkte machtigingen na de eerste installatie voor meer informatie.

Notitie

Het voorgaande beleid heeft alleen betrekking op de machtigingen die nodig zijn voor het instellen van bestandsmeldingsservices, namelijk S3-bucketmeldingen, SNS- en SQS-services, en gaat ervan uit dat u al leestoegang tot de S3-bucket hebt. Als u alleen-lezen machtigingen voor S3 wilt toevoegen, voegt u het volgende toe aan de lijst in de Action DatabricksAutoLoaderSetup instructie in het JSON-document:

  • s3:ListBucket
  • s3:GetObject

Gegevens veilig opnemen in een ander AWS-account

Automatisch laden kan gegevens laden tussen AWS-accounts door een IAM-rol aan te nemen. Nadat u de tijdelijke beveiligingsreferenties hebt gemaakt door , kunt u cloudbestanden automatisch laden AssumeRole voor meerdere accounts laten laden. Volg het document voor het instellen van de automatische loader voor cross-AWS-accounts: _. Zorg ervoor dat u:

  • Controleer of de metarol AssumeRole is toegewezen aan het cluster.

  • Configureer de Spark-configuratie van het cluster om de volgende eigenschappen op te nemen:

    fs.s3a.credentialsType AssumeRole
    fs.s3a.stsAssumeRole.arn arn:aws:iam::<bucket-owner-acct-id>:role/MyRoleB
    fs.s3a.acl.default BucketOwnerFullControl
    

Metrische gegevens

Auto Loader rapporteert metrische gegevens voor elke batch. U kunt op het tabblad Onbewerkte gegevens in het voortgangsdashboard van de streamingquery zien hoeveel bestanden er in de achterstand staan en hoe groot de achterstand numFilesOutstanding numBytesOutstanding is:

{
  "sources" : [
    {
      "description" : "CloudFilesSource[/path/to/source]",
      "metrics" : {
        "numFilesOutstanding" : "238",
        "numBytesOutstanding" : "163939124006"
      }
    }
  ]
}

Cloudresourcebeheer

U kunt een Scala-API gebruiken voor het beheren van de AWS SNS- en SQS-services die zijn gemaakt door Auto Loader. U moet de machtigingen voor het instellen van resources configureren die worden beschreven in Machtigingen voordat u deze API gebruikt.

Belangrijk

Als u Auto Loader in Databricks Runtime 7.1 en lager hebt gebruikt, moet u uw IAM-beleid bijwerken met behulp van het JSON-beleidsdocument in Machtigingen. Er zijn nieuwe instructies in het beleid voor Databricks Runtime 7.2 en , die de aanvullende machtigingen opgeven die vereist zijn DatabricksAutoLoaderList DatabricksAutoLoaderTeardown voor de Scala-API.

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>)
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

// Set up an SQS queue and a topic subscribed to the path provided in the manager. Available in Databricks Runtime 7.4 and above.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by Auto Loader
manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Notitie

Beschikbaar in Databricks Runtime 7.4 en hoger.

Gebruik setUpNotificationServices(<resource-suffix>) om een SQS-wachtrij en een SNS-onderwerp met de naam te databricks-auto-ingest-<resource-suffix> maken. Als er een bestaande SQS-wachtrij of SNS-onderwerp met dezelfde naam is, gebruikt Azure Databricks de resource die al bestaat, opnieuw in plaats van een nieuwe te maken. Deze functie retourneert een SQS-wachtrij die u kunt doorgeven aan de cloudFiles bron met behulp van .option("cloudFiles.queueUrl", <queue-url>) . Hierdoor heeft de cloudFiles brongebruiker minder machtigingen dan de gebruiker die de resources maakt. Zie Machtigingen.

Geef de "path" optie alleen op als u newManager setUpNotificationServices aanroept; dit is niet nodig voor listNotificationServices of tearDownNotificationServices . Dit is hetzelfde dat path u gebruikt bij het uitvoeren van een streamingquery.

Veelgestelde vragen

Moet ik van tevoren AWS-services voor gebeurtenismeldingen maken?

Nee. Als u de bestandsmeldingsmodus kiest, wordt er automatisch een meldingspijplijn voor de gebeurtenismelding AWS S3 > SNS Topic > SQS Queue file gemaakt wanneer u de stream start.

Hoe kan ik resources voor gebeurtenismeldingen, zoals SNS-onderwerpen en SQS-wachtrijen, die zijn gemaakt door auto loader op te schonen?

U kunt cloudresourcebeheer gebruiken om resources weer te maken en te afbreken. U kunt deze resources ook handmatig verwijderen, in de webconsole of met behulp van AWS-API's. Alle resources die zijn gemaakt door auto loader hebben het voorvoegsel : databricks-auto-ingest- .

Verwerkt het automatisch laadproces het bestand opnieuw wanneer het bestand wordt toegevoegd of overschreven?

Bestanden worden exact één keer verwerkt, tenzij u cloudFiles.allowOverwrites inschakelen. Als een bestand wordt toegevoegd aan of overschreven, garandeert Databricks niet welke versie van het bestand wordt verwerkt. Voor goed gedefinieerd gedrag raadt Databricks u aan om Automatisch laden te gebruiken om alleen onveranderbare bestanden op te nemen. Als dit niet aan uw vereisten voldoet, neemt u contact op met uw Databricks-vertegenwoordiger.

Kan ik meerdere streamingquery's uitvoeren vanuit dezelfde invoermap?

Ja. Elke cloudbestandenstroom, zoals geïdentificeerd door een unieke controlepuntmap, heeft een eigen SQS-wachtrij en dezelfde AWS S3-gebeurtenissen kunnen worden verzonden naar meerdere SQS-wachtrijen.

Als mijn gegevensbestanden niet continu binnenkomen, maar met regelmatige tussenpozen, bijvoorbeeld eenmaal per dag, moet ik deze bron dan nog steeds gebruiken en zijn er voordelen?

Ja en ja. In dit geval kunt u een Structured Streaming-taak instellen en plannen om te worden uitgevoerd na de verwachte aankomsttijd Trigger-Once van het bestand. Bij de eerste run worden de services voor gebeurtenismeldingen, die altijd worden uitgevoerd, ook als het streamingcluster niet beschikbaar is, in gebruik. Wanneer u de stream opnieuw opstart, haalt de bron alle bestandsgebeurtenissen op en verwerkt deze die in cloudFiles de SQS-wachtrij worden gemaakt. Het voordeel van het gebruik van automatische loader in dit geval is dat u niet hoeft te bepalen welke bestanden nieuw zijn en elke keer moeten worden verwerkt, wat erg duur kan zijn.

Wat gebeurt er als ik de locatie van het controlepunt wijzig bij het opnieuw starten van de stream?

Een controlepuntlocatie onderhoudt belangrijke identificatiegegevens van een stroom. Als u de locatie van het controlepunt effectief verandert, betekent dit dat u de vorige stroom hebt verlaten en een nieuwe stroom hebt gestart. De nieuwe stream maakt nieuwe voortgangsinformatie en als u de bestandsmeldingsmodus gebruikt, worden er nieuwe AWS SNS- en SQS-services gemaakt. U moet de controlepuntlocatie en AWS SNS- en SQS-services handmatig ops schonen voor alle verlaten stromen.

Kan ik meerdere streamingquery's uitvoeren vanuit verschillende invoerdirecties in dezelfde S3-bucket?

Ja, zolang het geen bovenliggende/onderliggende directories zijn, prod-logs/ bijvoorbeeld en prod-logs/usage/ .

Kan ik deze functie gebruiken wanneer er bestaande bestandsmeldingen in mijn S3-bucket staan?

Ja, zolang uw invoermap geen conflict veroorzaakt met het bestaande voorvoegsel voor meldingen (bijvoorbeeld de bovenstaande bovenliggende en onderliggende mappen).

Bijlage: beperkte machtigingen na de eerste installatie

De machtigingen voor het instellen van resources die worden beschreven in Machtigingen zijn alleen vereist tijdens de eerste keer dat de stream wordt uitgevoerd. Na de eerste keer uitvoeren kunt u overschakelen naar het volgende IAM-beleid met beperkte machtigingen.

Belangrijk

Met de beperkte machtigingen kunt u geen nieuwe streamingquery's starten of resources opnieuw maken in het geval van fouten (de SQS-wachtrij is bijvoorbeeld per ongeluk verwijderd); U kunt ook de API voor cloudresourcebeheer niet gebruiken om resources weer te bieden of te afbreken.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:DeleteMessageBatch",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility",
       "sqs:ChangeMessageVisibilityBatch"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}