Läsa in filer från AWS S3 med Auto Loader

Auto Loader inkrementellt och effektivt bearbetar nya datafiler när de tas emot i AWS S3 ( s3:// ).

Auto Loader tillhandahåller en Structured Streaming-källa med namnet cloudFiles . Med en sökväg till indatakatalogen i molnfillagringen bearbetar källan automatiskt nya filer när de tas emot, med möjlighet att även bearbeta befintliga cloudFiles filer i katalogen.

Auto Loader fungerar med DBFS-sökvägar samt direkta sökvägar till datakällan.

Krav

Databricks Runtime 7.2 eller högre.

Om du har skapat strömmar med Databricks Runtime 7.1 eller lägre kan du se Ändringar i standardalternativvärden och kompatibilitet och Molnresurshantering.

Lägen för filidentifiering

Auto Loader stöder två lägen för att identifiera när det finns nya filer: kataloglista och filavisering.

  • Kataloglista:Identifierar nya filer genom att visa en lista över indatakatalogen. Kataloglistningsläge gör att du snabbt kan starta Auto Loader-strömmar utan några andra behörighetskonfigurationer än åtkomst till dina data på AWS S3 och är lämpligt för scenarier där endast ett fåtal filer behöver strömmas regelbundet. Kataloglistningsläge är standard för Auto Loader i Databricks Runtime 7.2 och högre.
  • Filmeddelande:Använder AWS SNS- och SQS-tjänster som prenumererar på filhändelser från indatakatalogen. Auto Loader uppsättningar automatiskt AWS SNS- och SQS-tjänster. Filmeddelandeläget är mer prestanda- och skalbart för stora indatakataloger. Om du vill använda det här läget måste du konfigurera behörigheter för AWS SNS- och SQS-tjänsterna och ange .

Du kan ändra läge när du startar om dataströmmen. Du kanske till exempel vill växla till filmeddelandeläge när kataloglistan blir för långsam på grund av den ökade storleken på indatakatalogen. För båda lägena håller Auto Loader internt reda på vilka filer som har bearbetats på platsen för strömningskontrollpunkten för att tillhandahålla exakt en gång-semantik, så du behöver inte hantera någon tillståndsinformation själv.

Använda cloudFiles källa

Om du vill använda Auto Loader skapar du cloudFiles en källa på samma sätt som andra strömmande källor. Koden nedan startar en autoinläsareström som skrivs till Delta Lake i kataloglistningsläge:

Python

df = spark.readStream.format("cloudFiles") \
  .option(<cloudFiles-option>, <option-value>) \
  .schema(<schema>) \
  .load(<input-path>)

df.writeStream.format("delta") \
  .option("checkpointLocation", <checkpoint-path>) \
  .trigger(<trigger>) \
  .start(<output-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option(<cloudFiles-option>, <option-value>)
  .schema(<schema>)
  .load(<input-path>)

df.writeStream.format("delta")
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(<trigger>)
  .start(<output-path>)

där:

  • <cloudFiles-option> är ett konfigurationsalternativ i <cloudFiles-option>.

  • <schema> är filschemat. Auto Loader stöder även schemainferens och utveckling med vissa filformat. Mer information finns i Schemaferens och utveckling

  • <input-path> är sökvägen i lagringen som övervakas för nya filer. Underordnade kataloger i <input-path> övervakas också. <input-path> kan innehålla filglobmönster. Glob-mönstret har lagts till i det. Om det omfattar filer som du inte vill mata in kan du inkludera ytterligare * ett filter via alternativet pathGlobFilter . Om du tillhandahåller en kö för filmeddelanden och inte behöver fylla i några data igen behöver du inte ange en sökväg för indata.

  • <checkpoint-path> är dataströmmens kontrollpunktsplats.

  • <trigger> En valfri utlösare för dataströmmen. Standardvärdet är att köra nästa mikrobatch så snabbt som möjligt. Om du har data som kommer med jämna mellanrum, till exempel en gång om dagen, kan du använda och schemalägga körningen av dina dataströmmar i Trigger.Once ett Azure Databricks jobb. För Databricks Runtime 10.1 och Databricks Runtime 10.1 Photon och högre har Auto Loader nu stöd för en ny typ av utlösare: för både kataloglistning och filmeddelandelägen. Trigger.AvailableNow ger samma garantier som Trigger.Once , som bearbetar alla tillgängliga data och stoppar sedan frågan. Men Trigger.AvailableNow kan utföra hastighetsbegränsning och dela upp arbetet över flera batchar, därför rekommenderas i stället för Trigger.Once . För dataströmmar som alltid är igång rekommenderar Databricks att du ställer in en utlösare för bearbetningstid.

  • <output-path> är sökvägen till utdataströmmen.

Fördelar med Apache Spark FileStreamSource

I Apache Spark kan du läsa filer inkrementellt med hjälp av spark.readStream.format(fileFormat).load(directory) . Auto Loader ger följande fördelar över filkällan:

  • Skalbarhet: Auto Loader kan effektivt identifiera miljarder filer. Återfyllningar kan utföras asynkront för att undvika att slösa beräkningsresurser.
  • Prestanda: Kostnaden för att identifiera filer med Auto Loader skalas med antalet filer som matas in i stället för det antal kataloger som filerna kan komma att finnas i. Se Optimerad kataloglista.
  • Stöd för schemainferens och utveckling: Auto Loader kan identifiera schemaförändringar, meddela dig när schemaändringar sker och hjälpdata som annars skulle ha ignorerats eller gått förlorade. Se Schemaferens och utveckling.
  • Kostnad: Auto Loader använder interna moln-API:er för att hämta listor över filer som finns i lagringen. Dessutom kan autoinläsarens filmeddelandeläge hjälpa dig att minska dina molnkostnader ytterligare genom att undvika kataloglistor helt och hållet. Auto Loader kan automatiskt konfigurera filaviseringstjänster på lagring för att göra filidentifiering mycket billigare.

Optimerad kataloglista

Anteckning

Tillgänglig i Databricks Runtime 9.0 och högre.

Auto Loader kan identifiera filer i molnlagringssystem med hjälp av kataloglistor effektivare än andra alternativ. Om du till exempel har filer som laddas upp var 5:e minut som , för att hitta alla filer i dessa kataloger, skulle Apache Spark-filkällan lista alla underkataloger parallellt, vilket orsakar /some/path/YYYY/MM/DD/HH/fileName 1 (baskatalog) + 365 (per dag) * 24 (per timme) = 8761 LISTA API-katalog-anrop till lagring. Genom att ta emot ett utplattat svar från lagringen minskar Auto Loader antalet API-anrop till antalet filer i lagringen dividerat med antalet resultat som returneras av varje API-anrop, vilket avsevärt minskar dina molnkostnader.

Inkrementell lista

Viktigt

Den här funktionen finns som allmänt tillgänglig förhandsversion.

För lexikographically genererade filer kan Auto Loader nu använda lexikal filordning och befintliga optimerade API:er för att förbättra effektiviteten i kataloglistan genom att lista från tidigare inmatade filer i stället för att visa hela katalogen.

Som standard identifierar Auto Loader automatiskt om en viss katalog är tillämplig för den inkrementella listan genom att kontrollera och jämföra sökvägar för tidigare slutförda fullständiga kataloglistor. För att säkerställa eventuell fullständighet i det här läget utlöser Auto Loader automatiskt den fullständiga kataloglistan efter att ha slutfört 7 på auto varandra följande inkrementella listor. Om du vill vara mer frekvent eller mindre frekvent kan du ange att cloudFiles.backfillInterval utlösa asynkrona återfyllningar vid ett visst intervall.

Om du har förtroende för ordningen på filer som genereras i katalogen kan du uttryckligen aktivera eller inaktivera läget för inkrementell lista genom att ange eller cloudFiles.useIncrementalListingtruefalse (standard auto ), t.ex. filer som sorteras efter partitioner kan anses vara lexikalt sorterade om data bearbetas en gång om dagen, filsökvägar som innehåller date=... tidsstämplar kan anses vara lexikalt sorterade. Du kan alltid använda cloudFiles.backfillInterval för att säkerställa att alla data matas in när du aktiverar den inkrementella listan.

Schemaferens och utveckling

Anteckning

Tillgänglig i Databricks Runtime 8.2 och högre.

Auto Loader stöder schemainferens och utveckling med filformaten CSV, JSON, binary ( binaryFile ) och text. Mer information finns i Schemainferens och utveckling i Auto Loader.

Köra Auto Loader i produktion

Databricks rekommenderar att du följer strömningsmetoderna för att köra Auto Loader i produktion.

Konfiguration

Konfigurationsalternativ som är specifika för källan föregås av så att de finns cloudFiles i ett separat cloudFiles namnområde från andra alternativ för Structured Streaming-källor.

Viktigt

Vissa standardalternativvärden ändrades i Databricks Runtime 7.2. Om du använder Auto Loader på Databricks Runtime 7.1 eller lägre, se Ändringar i standardalternativvärden och kompatibilitet.

Filformatsalternativ

Med Auto Loader kan du mata in JSON , , , , , och CSVPARQUETAVROTEXTBINARYFILEORC filer. Se Formatalternativ för alternativen för dessa filformat.

Vanliga alternativ för automatisk inläsare

Du kan konfigurera följande alternativ för kataloglista eller filmeddelandeläge.

Alternativ
cloudFiles.allowOverwrites

Typ: Boolean

Om du vill tillåta ändringar i indatakatalogfilen för att skriva över befintliga data. Tillgänglig i Databricks Runtime 7.6 och högre.

Standardvärdet: false
cloudFiles.format

Typ: String

Datafilformatet i källsökvägen. Tillåtna värden är:

* avro: *
* binaryFile: *
* csv: *
* json: *
* orc: ORC-fil
* parquet: *
* text: Textfil

Standardvärde: Inget (obligatoriskt alternativ)
cloudFiles.includeExistingFiles

Typ: Boolean

Om du vill inkludera befintliga filer i indatasökvägen för dataströmbearbetning eller endast bearbeta nya filer som kommer efter den första installationen. Det här alternativet utvärderas bara när du startar en dataström för första gången. Det har ingen effekt att ändra det här alternativet när du har startat om dataströmmen.

Standardvärdet: true
cloudFiles.inferColumnTypes

Typ: Boolean

Om du vill dra slutsatser om exakta kolumntyper när du använder schemaferens. Som standard härförs kolumner som strängar när JSON-datauppsättningar härförs. Se schemaferens för mer information.

Standardvärdet: false
cloudFiles.maxBytesPerTrigger

Typ: Byte String

Det maximala antalet nya byte som ska bearbetas i varje utlösare. Du kan ange en bytesträng, till 10g exempel för att begränsa varje mikrobatch till 10 GB data. Det här är ett mjukt maxbelopp. Om du har filer på 3 GB vardera Azure Databricks 12 GB i en mikrobatch. När den används tillsammans cloudFiles.maxFilesPerTrigger med Azure Databricks upp till den nedre gränsen på eller , beroende på vilket som nås cloudFiles.maxFilesPerTriggercloudFiles.maxBytesPerTrigger först. Det här alternativet har ingen effekt när det används med Trigger.Once() .

Standardvärde: Ingen
cloudFiles.maxFileAge

Typ: Interval String

Hur länge en filhändelse spåras i dedupliceringssyfte. Databricks rekommenderar inte justering av den här parametern om du inte matar in data i miljontals filer i timmen. Mer information finns i avsnittet Så här väljer du maxFileAge.

Standardvärde: Ingen
cloudFiles.resourceTags

Typ: Map(String, String)

En serie med nyckel/värde-taggpar som hjälper dig att associera och identifiera relaterade resurser, till exempel:

cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue")
.option("cloudFiles.resourceTag.mySecondKey", "mySecondValue")

Mer information finns i Namngivning av köer och metadata och täckningen för i Händelseprenumerationer. Auto Loader lagrar dessa nyckel/värde-taggpar i JSON som etiketter. (1)

Standardvärde: Ingen
cloudFiles.schemaEvolutionMode

Typ: String

Läget för att utveckla schemat när nya kolumner identifieras i data. Som standard härförs kolumner som strängar när JSON-datauppsättningar härförs. Se schemautveckling för mer information.

Standardvärde: "addNewColumns" när ett schema inte anges.
"none" Annars.
cloudFiles.schemaHints

Typ: String

Schemainformation som du anger för Auto Loader under schemainferens. Se schematips för mer information.

Standardvärde: Ingen
cloudFiles.schemaLocation

Typ: String

Platsen för att lagra det härdade schemat och efterföljande ändringar. Se schemaferens för mer information.

Standardvärde: Inget (krävs vid härring av schemat)
cloudFiles.validateOptions

Typ: Boolean

Om du vill validera alternativen för Auto Loader och returnera ett fel för okända eller inkonsekventa alternativ.

Standardvärdet: true
cloudFiles.backfillInterval

>[! VIKTIGT] >> Den här funktionen är i >

Typ: Interval String

Auto Loader kan utlösa asynkrona återfyllningar vid ett visst intervall,
Det kan till exempel 1 day vara att återfylla en gång om dagen eller 1 week att återfylla en gång i veckan. Filhändelsemeddelandesystem garanterar inte 100 % leverans av alla filer som har laddats upp. Därför kan du använda återfyllningar för att garantera att alla filer så småningom bearbetas, tillgängliga i Databricks Runtime 8.4 och Databricks Runtime 8.4 Photon och senare. Om du använder den inkrementella listan kan du också använda vanliga återfyllningar för att garantera slutlig fullständighet, som finns i Databricks Runtime 9.1 LTS och Databricks Runtime 9.1 LTS Photon och senare.

Standardvärde: Ingen

(1) Auto Loader lägger till följande nyckel/värde-taggpar som standard efter bästa möjliga resultat:

  • vendor: Databricks
  • path: Den plats som data läses in från.
  • checkpointLocation: Platsen för dataströmmens kontrollpunkt.
  • streamId: En globalt unik identifierare för dataströmmen.

Dessa nyckelnamn är reserverade och du kan inte skriva över deras värden.

Alternativ för kataloglistor

Följande alternativ är relevanta för kataloglistningsläge.

Alternativ
cloudFiles.useIncrementalListing

>[! VIKTIGT] >> Den här funktionen är i >

Typ: String

Om du vill använda den inkrementella listan i stället för den fullständiga listan i kataloglistningsläge. Som standard gör Auto Loader bästa möjliga försök att automatiskt identifiera om en viss katalog är tillämplig för den inkrementella listan. Du kan uttryckligen använda den inkrementella listan eller använda den fullständiga kataloglistan genom att ange true den false som eller .

Tillgänglig i Databricks Runtime 9.1 LTS och Databricks Runtime 9.1 LTS Photon och högre.

Standardvärdet: auto

Tillgängliga värden: auto , true , false

Så här väljer du maxFileAge

Anteckning

Tillgängligt i Databricks Runtime 8.4 och högre.

Auto Loader håller reda på identifierade filer på kontrollpunktsplatsen med hjälp av RocksDB för att tillhandahålla inmatningsgarantier exakt en gång. För datauppsättningar med stora volymer kan du använda maxFileAge alternativet för att förfalla händelser från kontrollpunktsplatsen. Det minsta värde som du kan ange för maxFileAge är "14 days" . Borttagningar i RocksDB visas som tombstone-poster. Därför bör du förvänta dig att lagringsanvändningen ökar när händelser upphör att gälla innan de börjar plana ut.

Varning

maxFileAge tillhandahålls som en kostnadskontrollmekanism för datauppsättningar med stora volymer och matas in i miljontals filer varje timme. Felaktig maxFileAge justering kan leda till problem med datakvaliteten. Därför rekommenderar Databricks inte att du finjusterar den här parametern om det inte är absolut nödvändigt.

Om du försöker finjustera alternativet kan det leda till att obearbetade filer ignoreras av Auto Loader eller att redan bearbetade filer upphör att gälla och sedan bearbetas på samma sätt, vilket maxFileAge orsakar duplicerade data. Här är några saker att tänka på när du väljer en maxFileAge :

  • Om dataströmmen startas om efter lång tid ignoreras filaviseringshändelser som hämtas från maxFileAge kön. På samma sätt, om du använder kataloglista, kommer filer som kan ha visats under driftstiden som är äldre maxFileAge än ignoreras.
  • Om du använder kataloglistningsläge och använder , till exempel inställt på , stoppar du dataströmmen och startar om dataströmmen med inställt på , alla filer som är äldre än 1 månad, men senare än 2 månader maxFileAge"1 month"maxFileAge"2 months" bearbetas om.

Det bästa sättet att justera är att till exempel utgå från en förfallotid och maxFileAge att arbeta nedåt till något som liknar "1 year""9 months" . Om du anger det här alternativet första gången du startar dataströmmen matar du därför inte in data som är äldre än . Om du vill mata in gamla data ska du därför inte ange det här alternativet när du startar maxFileAge strömmen.

Alternativ för filavisering

Följande alternativ är relevanta för filmeddelandeläge.

Alternativ
cloudFiles.fetchParallelism

Typ: Integer

Antal trådar som ska användas vid hämtning av meddelanden från kötjänsten.

Standardvärde: 1
cloudFiles.pathRewrites

Typ: En JSON-sträng

Krävs endast om du anger en som tar emot filmeddelanden från flera S3-buckets och du vill utnyttja monteringspunkter som konfigurerats för åtkomst till queueUrl data i dessa containrar. Använd det här alternativet för att skriva om prefixet för bucket/key sökvägen med monteringspunkten. Endast prefix kan skrivas om. Till exempel för konfigurationen
{"<databricks-mounted-bucket>/path": "dbfs:/mnt/data-warehouse"}, sökvägen
s3://<databricks-mounted-bucket>/path/2017/08/fileA.json skrivs om till dbfs:/mnt/data-warehouse/2017/08/fileA.json .

Standardvärde: Ingen
cloudFiles.queueUrl

Typ: String

URL:en för SQS-kön. Om det tillhandahålls använder molnfilkällan direkt händelser från den här kön i stället för att konfigurera egna AWS SNS- och SQS-tjänster.

Standardvärde: Ingen
cloudFiles.useNotifications

Typ: Boolean

Om du vill använda filmeddelandeläge för att avgöra när det finns nya filer. Om false använder du kataloglistningsläget. Se Filidentifieringslägen.

Standardvärdet: false

Ange endast följande alternativ om du väljer cloudFiles.useNotifications = true och du vill att Auto Loader ska konfigurera meddelandetjänsterna åt dig:

Alternativ
cloudFiles.region

Typ: String

Den region där käll-S3-bucketen finns och där AWS SNS- och SQS-tjänsterna kommer att skapas.

Standardvärde: I Databricks Runtime 9.0 och över regionen för EC2-instansen. I Databricks Runtime 8.4 och lägre måste du ange regionen.

Du kan använda följande alternativ för att ange autentiseringsuppgifter för åtkomst till AWS SNS och SQS när IAM-roller inte är tillgängliga eller när du matar in data från olika moln.

Alternativ
cloudFiles.awsAccessKey

Typ: String

AWS-åtkomstnyckelns ID för användaren. Måste anges med
cloudFiles.awsSecretKey.

Standardvärde: Ingen
cloudFiles.awsSecretKey

Typ: String

Den hemliga åtkomstnyckeln för AWS för användaren. Måste anges med
cloudFiles.awsAccessKey.

Standardvärde: Ingen
cloudFiles.roleArn

Typ: String

ARN för en IAM-roll som ska antas. Rollen kan antas från klustrets instansprofil eller genom att ange autentiseringsuppgifter med
cloudFiles.awsAccessKey och cloudFiles.awsSecretKey.

Standardvärde: Ingen
cloudFiles.roleExternalId

Typ: String

En identifierare som ska anges när du antar en roll med hjälp av cloudFiles.roleArn .

Standardvärde: Ingen
cloudFiles.roleSessionName

Typ: String

Ett valfritt sessionsnamn som ska användas när en roll används
cloudFiles.roleArn.

Standardvärde: Ingen
cloudFiles.stsEndpoint

Typ: String

En valfri slutpunkt för åtkomst till AWS STS när du antar en roll med hjälp av cloudFiles.roleArn .

Standardvärde: Ingen

Ändringar i standardalternativvärden och kompatibilitet

Standardvärdena för följande alternativ för Automatisk inläsare har ändrats i Databricks Runtime 7.2 till de värden som anges i Konfiguration.

  • cloudFiles.useNotifications
  • cloudFiles.includeExistingFiles
  • cloudFiles.validateOptions

Auto Loader-strömmar som startats Databricks Runtime 7.1 och lägre har följande standardalternativvärden:

  • cloudFiles.useNotifications är true
  • cloudFiles.includeExistingFiles är false
  • cloudFiles.validateOptions är false

För att säkerställa kompatibilitet med befintliga program ändras inte dessa standardalternativvärden när du kör dina befintliga Auto Loader-strömmar på Databricks Runtime 7.2 eller högre. dataströmmarna har samma beteende efter uppgraderingen.

Behörigheter

Du måste ha läsbehörighet för indatakatalogen. Mer information finns i S3-anslutningsinformation.

Om du vill använda filmeddelandeläge bifogar du följande JSON-principdokument till din IAM-användare eller -roll.

Om du inte kan konfigurera de behörigheter som anges i JSON-principdokumentet kan du även be en administratör att utföra installationen åt dig med hjälp av Scala API för molnresurshantering. En administratör kan ge dig kö-URL:en, som du kan ange .option("queueUrl", <queue-url>) direkt för cloudFiles källan. Med den här konfigurationen behöver du bara minskade behörigheter. Mer information finns i Bilaga: Minskade behörigheter efter den första installationen.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:DeleteMessageBatch",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility",
        "sqs:ChangeMessageVisibilityBatch"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

där:

  • <bucket-name>: S3-bucketnamnet där dataströmmen ska läsa filer, till exempel auto-logs . Du kan använda * som jokertecken, till exempel databricks-*-logs . Om du vill ta reda på den underliggande S3-bucketen för DBFS-sökvägen kan du visa en lista över alla DBFS-monteringspunkter i en notebook-fil genom att köra %fs mounts .
  • <region>: AWS-regionen där S3-bucketen finns, till exempel us-west-2 . Om du inte vill ange regionen använder du * .
  • <account-number>: Det AWS-kontonummer som äger S3-bucketen, till exempel 123456789012 . Om du inte vill ange kontonumret använder du * .

Strängen i SQS- och SNS ARN-specifikationen är det namnprefix som källan använder databricks-auto-ingest-* när cloudFiles SQS- och SNS-tjänster skapas. Eftersom Azure Databricks uppsättningar av aviseringstjänster i den första körningen av dataströmmen kan du använda en princip med minskade behörigheter efter den första körningen (till exempel stoppa dataströmmen och sedan starta om den). Mer information finns i Bilaga: Minskade behörigheter efter den första installationen.

Anteckning

Föregående princip gäller endast de behörigheter som krävs för att konfigurera filmeddelandetjänster, nämligen S3-bucketmeddelande, SNS- och SQS-tjänster och förutsätter att du redan har läsbehörighet till S3-bucketen. Om du behöver lägga till skrivskyddade S3-behörigheter lägger du till följande Action i listan i DatabricksAutoLoaderSetup -instruktionen i JSON-dokumentet:

  • s3:ListBucket
  • s3:GetObject

Mata in data säkert i ett annat AWS-konto

Auto Loader kan läsa in data över AWS-konton genom att anta en IAM-roll. När du har skapat de tillfälliga säkerhetsautentiseringsuppgifterna som skapats av kan du använda AssumeRole Auto Loader för att läsa in molnfiler mellan konton. Om du vill konfigurera Auto Loader för konton mellan AWS följer du dokumentet: _. Kontrollera att du:

  • Kontrollera att du har tilldelats metarollen AssumeRole till klustret.

  • Konfigurera klustrets Spark-konfiguration så att den innehåller följande egenskaper:

    fs.s3a.credentialsType AssumeRole
    fs.s3a.stsAssumeRole.arn arn:aws:iam::<bucket-owner-acct-id>:role/MyRoleB
    fs.s3a.acl.default BucketOwnerFullControl
    

Mått

Auto Loader rapporterar mått i varje batch. Du kan visa hur många filer som finns i uppgifterna och hur stora uppgifterna är i måtten och under fliken Rådata på instrumentpanelen för numFilesOutstandingnumBytesOutstandingnumBytesOutstandingnumFilesOutstanding

{
  "sources" : [
    {
      "description" : "CloudFilesSource[/path/to/source]",
      "metrics" : {
        "numFilesOutstanding" : "238",
        "numBytesOutstanding" : "163939124006"
      }
    }
  ]
}

Molnresurshantering

Du kan använda ett Scala-API för att hantera AWS SNS- och SQS-tjänster som skapats av Auto Loader. Du måste konfigurera de behörigheter för resursinstallation som beskrivs i Behörigheter innan du använder det här API:et.

Viktigt

Om du har använt Auto Loader i Databricks Runtime 7.1 och tidigare uppdaterar du IAM-principen med hjälp av JSON-principdokumentet i Behörigheter. Det finns nya instruktioner i principen för Databricks Runtime 7.2– och – som anger de ytterligare behörigheter som krävs DatabricksAutoLoaderListDatabricksAutoLoaderTeardown av Scala-API:et.

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>)
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

// Set up an SQS queue and a topic subscribed to the path provided in the manager. Available in Databricks Runtime 7.4 and above.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by Auto Loader
manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Anteckning

Tillgängligt i Databricks Runtime 7.4 och högre.

Använd setUpNotificationServices(<resource-suffix>) för att skapa en SQS-kö och ett SNS-ämne med namnet databricks-auto-ingest-<resource-suffix> . Om det finns en befintlig SQS-kö eller ett SNS-ämne med samma namn Azure Databricks resurs som redan finns i stället för att skapa en ny. Den här funktionen returnerar en SQS-kö som du kan skicka till cloudFiles källan med hjälp av .option("cloudFiles.queueUrl", <queue-url>) . Detta gör det cloudFiles möjligt för källanvändaren att ha färre behörigheter än den användare som skapar resurserna. Se Behörigheter.

Ange alternativet "path" till endast om du newManagersetUpNotificationServices anropar ; det behövs inte för listNotificationServices eller tearDownNotificationServices . Det här är samma som path du använder när du kör en strömningsfråga.

Vanliga frågor och svar (FAQ)

Måste jag skapa aviseringstjänster för AWS-händelser i förväg?

Nej. Om du väljer filmeddelandeläge skapar Auto Loader automatiskt en pipeline för AWS > S3 SNS Topic SQS-köfilsaviseringar automatiskt när > du startar dataströmmen.

Hur gör jag för att rensa resurserna för händelseaviseringar, till exempel SNS-ämnen och SQS-köer, som skapats av Auto Loader?

Du kan använda molnresurshanteraren för att lista och ta bort resurser. Du kan också ta bort dessa resurser manuellt, antingen i webbkonsolen eller med AWS-API:er. Alla resurser som skapas av Auto Loader har prefixet : databricks-auto-ingest- .

Bearbetar Auto Loader filen igen när filen läggs till eller skrivs över?

Filer bearbetas exakt en gång om du inte aktiverar cloudFiles.allowOverwrites . Om en fil läggs till eller skrivs över garanterar Databricks inte vilken version av filen som bearbetas. För väldefinierat beteende rekommenderar Databricks att du använder Auto Loader för att endast mata in oföränderliga filer. Om detta inte uppfyller dina krav kontaktar du din Databricks-representant.

Kan jag köra flera strömningsfrågor från samma indatakatalog?

Ja. Varje molnfilström, som identifieras av en unik kontrollpunktskatalog, har sin egen SQS-kö, och samma AWS S3-händelser kan skickas till flera SQS-köer.

Om mina datafiler inte tas emot kontinuerligt, men med jämna mellanrum, till exempel en gång om dagen, bör jag fortfarande använda den här källan och finns det några fördelar?

Ja och ja. I det här fallet kan du konfigurera ett Trigger-Once Structured Streaming-jobb och schemalägga att köras efter den förväntade filuppkomsttiden. Den första körningen uppsättningar händelsemeddelandetjänster, som alltid är på, även när strömningsklustret är nere. När du startar om dataströmmen hämtar cloudFiles och bearbetar källan alla filhändelser som säkerhetskopieras i SQS-kön. Fördelen med att använda Auto Loader i det här fallet är att du inte behöver avgöra vilka filer som är nya och som ska bearbetas varje gång, vilket kan vara väldigt dyrt.

Vad händer om jag ändrar kontrollpunktsplatsen när jag startar om dataströmmen?

En kontrollpunktsplats har viktig identifieringsinformation för en dataström. Om du ändrar kontrollpunktsplatsen innebär det att du har övergett den tidigare dataströmmen och startat en ny dataström. Den nya dataströmmen skapar ny förloppsinformation och om du använder filmeddelandeläge, nya AWS SNS- och SQS-tjänster. Du måste manuellt rensa kontrollpunktsplatsen och AWS SNS- och SQS-tjänsterna för alla övergivna strömmar.

Kan jag köra flera strömningsfrågor från olika indatakataloger i samma S3-bucket?

Ja, så länge de inte är överordnade-underordnade kataloger, till exempel prod-logs/ och prod-logs/usage/ .

Kan jag använda den här funktionen när det finns befintliga filmeddelanden på min S3-bucket?

Ja, så länge din indatakatalog inte står i konflikt med det befintliga meddelandeprefixet (till exempel ovanstående överordnade-underordnade kataloger).

Bilaga: Minskade behörigheter efter den första installationen

De behörigheter för resursinstallation som beskrivs i Behörigheter krävs endast under den första körningen av dataströmmen. Efter den första körningen kan du växla till följande IAM-princip med minskade behörigheter.

Viktigt

Med de minskade behörigheterna kan du inte starta nya strömmande frågor eller återskapa resurser vid fel (till exempel om SQS-kön har tagits bort av misstag). du kommer inte heller att kunna använda API:et för hantering av molnresurser för att lista eller ta bort resurser.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:DeleteMessageBatch",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility",
       "sqs:ChangeMessageVisibilityBatch"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}