Fájlok betöltése az AWS S3-ból automatikus betöltő használatával

Az automatikus betöltő növekményesen és hatékonyan dolgozza fel az új adatfájlokat, amint megérkeznek az AWS S3-be ( s3:// ).

Az Automatikus betöltő egy nevű strukturált streamelési forrást cloudFiles biztosít. A felhőalapú fájltároló bemeneti könyvtárának elérési útja alapján a forrás automatikusan feldolgozza az új fájlokat, amint megérkeznek, és a könyvtárban lévő meglévő fájlokat is cloudFiles feldolgozza.

Az automatikus betöltő DBFS-elérési utakkal, valamint az adatforráshoz vezető közvetlen elérési utakkal is működik.

Követelmények

Databricks Runtime 7.2-es vagy magasabb.

Ha a 7.1-es vagy Databricks Runtime használatával hozott létre streameket, tekintse meg az alapértelmezett beállításértékek és kompatibilitás változásait és a felhőerőforrás-kezelésről készült cikkeket.

Fájlfelderítési módok

Az automatikus betöltő két módot támogat az új fájlok észlelésére: könyvtárlista és fájlértesítés.

  • Könyvtárlista:Az új fájlokat azonosítja a bemeneti könyvtár listázása alapján. A címtár-listázási mód lehetővé teszi az automatikus betöltőstreamek gyors, az AWS S3-on található adatokhoz való hozzáférésen kívül más engedélykonfigurációk nélküli gyors indítását, és olyan forgatókönyvekhez alkalmas, amelyekben csak néhány fájlt kell rendszeresen streamelni. A 7.2-es vagy újabb Databricks Runtime automatikus betöltési mód az alapértelmezett könyvtárlista mód.
  • Fájlértesítés:Olyan AWS SNS- és SQS-szolgáltatásokat használ, amelyek feliratkoznak a bemeneti könyvtárból származó fájleseményekre. Az automatikus betöltő automatikusan beállítja az AWS SNS- és SQS-szolgáltatásokat. A fájlértesítési mód nagyobb teljesítmény és méretezhetőség nagy bemeneti könyvtárak esetén. Ehhez a módhoz konfigurálnia kell az AWS SNS- és SQS-szolgáltatások engedélyét, és meg kell adnia a következőt: .

A stream újraindításakor módosíthatja a módot. Előfordulhat például, hogy fájlértesítési módra szeretne váltani, ha a könyvtárlista túl lassú a bemeneti könyvtár méretének növekedése miatt. Az automatikus betöltő mindkét mód esetében belsőleg nyomon követi, hogy a streamelési ellenőrzőpont helye milyen fájlokat lett feldolgozva, hogy pontosan egyszer szemantikát biztosítson, így önnek nem kell állapotinformációkat kezelnie.

Forrás cloudFiles használata

Az automatikus betöltő használata esetén a többi streamelési forráshoz hasonló módon hozzon létre cloudFiles egy forrást. Az alábbi kód elindít egy automatikus betöltőstreamet, amely könyvtárlista módban ír a Delta Lake-be:

Python

df = spark.readStream.format("cloudFiles") \
  .option(<cloudFiles-option>, <option-value>) \
  .schema(<schema>) \
  .load(<input-path>)

df.writeStream.format("delta") \
  .option("checkpointLocation", <checkpoint-path>) \
  .trigger(<trigger>) \
  .start(<output-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option(<cloudFiles-option>, <option-value>)
  .schema(<schema>)
  .load(<input-path>)

df.writeStream.format("delta")
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(<trigger>)
  .start(<output-path>)

ahol:

  • <cloudFiles-option>A egy konfigurációs beállítás a <cloudFiles-option>

  • <schema> A a fájlsémát. Az automatikus betöltő bizonyos fájlformátumokkal támogatja a séma következtetését és fejlődését is. További részletekért lásd: Séma-következtetés és -fejlődés

  • <input-path> A az új fájlokat figyelt tárolóban található elérési út. A gyermek-könyvtárát <input-path> is figyeli a rendszer. <input-path> fájl-glob mintákat tartalmazhatnak. A rendszer hozzáfűzi hozzá a glob mintát. Ha ez olyan fájlokat is tartalmaz, amelyek nem kívánják a behozni kívánt fájlokat, egy további szűrőt is be lehet foglalni a *pathGlobFilter beállításon keresztül. Ha üzenetsort biztosít a fájlértesítések számára, és nem kell adatokat visszatöltésre adnia, nem kell bemeneti útvonalat adnia.

  • <checkpoint-path> A a stream ellenőrzőpont-helye.

  • <trigger> A stream választható eseményindítója. Az alapértelmezett beállítás a következő mikrokötet lehető leggyorsabb végrehajtása. Ha rendszeres időközönként ( például naponta egyszer) érkeznek az adatok, a streamek végrehajtását egy Azure Databricks Trigger.Once ütemezheti. A Databricks Runtime 10.1 és Databricks Runtime 10.1 Photon és újabb esetén az Automatikus betöltő mostantól egy új típusú eseményindítót támogat: a könyvtárlista és a fájlértesítési mód esetében is. Trigger.AvailableNow A ugyanazokkal a garanciákkal rendelkezik, mint a , amely feldolgozza az összes rendelkezésre álló Trigger.Once adatot, majd leállítja a lekérdezést. De képes sebességkorlátozást végezni, és felosztani a munkát több kötegre, ezért ajánlott a Trigger.AvailableNowTrigger.Once helyett. A mindig bekapcsolt streamek számára a Databricks azt javasolja, hogy állítson be egy feldolgozásiidő-eseményindítót.

  • <output-path> A a kimeneti stream elérési útja.

Előnyök a Apache Spark FileStreamSource

A Apache Spark fájlok növekményesen olvashatók a spark.readStream.format(fileFormat).load(directory) használatával. Az automatikus betöltő a következő előnyöket biztosítja a fájlforráshoz valóhoz:

  • Méretezhetőség: Az automatikus betöltő több milliárd fájlt képes hatékonyan felderíteni. A háttérszűrők aszinkron módon is elvégezhetők a számítási erőforrások pazarlásának elkerülése érdekében.
  • Teljesítmény: Az automatikus betöltővel a fájlok felderítésének költsége a betöltetlen fájlok számával együtt skáláz a fájlok számára vonatkozó adatok helyett. Lásd: Optimalizált könyvtárlista.
  • Sémakövetkeztatás és fejlődés támogatása: Az automatikus betöltő képes észlelni a séma eltérését, értesítést küld a sémaváltozások bekövetkezéséről, valamint az olyan mentési adatokról, amelyek egyébként figyelmen kívül lett volna hagyva vagy elvesznek. Lásd: Séma-következtetés és -fejlődés.
  • Költség: Az automatikus betöltő natív felhőbeli API-kat használ a tárolóban található fájlok listáinak lekértéhez. Emellett az automatikus betöltő fájlértesítési módja segíthet a felhőköltségek további csökkentésében, ha teljesen elkerüli a könyvtárak listázását. Az automatikus betöltő automatikusan be tudja állítani a fájlértesítési szolgáltatásokat a tárolón, hogy a fájlfelderítés sokkal olcsóbb legyen.

Optimalizált könyvtárlista

Megjegyzés

A 9.0-s Databricks Runtime érhető el.

Az automatikus betöltő a más alternatíváknál hatékonyabban listázással képes felderíteni a felhőalapú tárolórendszerek fájljait. Ha például 5 percenként töltött fel fájlokat , hogy megtalálja az összes fájlt ezekben a könyvtárakban, a Apache Spark-fájlforrás párhuzamosan listaná az összes alkönyvtárat, ami /some/path/YYYY/MM/DD/HH/fileName 1 (alapkönyvtár) + 365 (naponta) * 24 (óránként) = 8761 LIST API könyvtárhívást adna vissza a tárolóhoz. Az automatikus betöltő a tárolóból származó simított válasz fogadása miatt csökkenti az API-hívások számát a tárterületen lévő fájlok számának és az egyes API-hívások által visszaadott eredmények számának osztva, ami jelentősen csökkenti a felhő költségeit.

Növekményes termékoldal

Fontos

Ez a funkció a nyilvános előzetes verzióban érhető el.

A lexikálisan generált fájlok esetén az automatikus betöltő mostantól a lexikális fájlrendezés és a meglévő optimalizált API-k segítségével javíthatja a könyvtárlista hatékonyságát, ha a teljes könyvtár listázása helyett a korábban betöltött fájlokból listázást ad meg.

Alapértelmezés szerint az automatikus betöltő automatikusan észleli, hogy egy adott könyvtár alkalmazható-e a növekményes listában a korábban befejezett teljes könyvtárlista fájlútvonalának ellenőrzéssel és összehasonlításával. Az automatikus betöltő 7 egymást követő növekményes lista befejezése után automatikusan elindítja a teljes könyvtároldalt, hogy biztosítsa a teljes teljességet ebben a auto módban. Ha gyakoribb vagy kevésbé gyakori szeretne lenni, beállíthatja, hogy egy adott időközönként aszinkron cloudFiles.backfillInterval háttérszűrőket indítson el.

Ha biztos a könyvtárban létrehozott fájlok sorrendjében, explicit módon be- és kikapcsolhatja a növekményes listázási módot a vagy (alapértelmezett) beállításával, például a partíciók szerint rendezett fájlok lexikálisan rendezettnek tekinthetők, ha naponta egyszer feldolgoznak adatokat, az időbélyegeket tartalmazó fájlútvonalak lexikálisan rendezettnek cloudFiles.useIncrementalListingtruefalseautodate=... tekinthetők. A használatával mindig gondoskodhat arról, hogy a növekményes lista bekapcsolása esetén minden adat be legyen cloudFiles.backfillInterval edve.

Séma következtetése és fejlődése

Megjegyzés

A 8.2-es Databricks Runtime és a magasabb 2-es Databricks Runtime érhető el.

Az automatikus betöltő támogatja a séma következtetését és fejlődését CSV-, JSON-, bináris ( ) és binaryFile szövegfájlformátumokkal. További részletekért lásd: Séma-következtetés és -fejlődés az automatikus betöltőben.

Automatikus betöltő futtatása éles környezetben

A Databricks azt javasolja, hogy kövesse az automatikus betöltő éles környezetben való futtatásához ajánlott streamelési eljárásokat.

Konfiguráció

A forrásra jellemző konfigurációs beállítások a előtaggal vannak előtaggal, így külön névtérben vannak, mint a cloudFilescloudFiles többi strukturált streamelési forrás beállításai.

Fontos

Egyes alapértelmezett beállításértékek a 7.2-es Databricks Runtime módosult. Ha a 7.1-es vagy újabb verzión használja az automatikus Databricks Runtime, tekintse meg az alapértelmezett beállításértékek és kompatibilitás változásait.

Fájlformátum beállításai

Az Automatikus betöltővel betöltheti a JSON , , , , , és CSVPARQUETAVROTEXTBINARYFILEORC fájlokat. A fájlformátumok beállításaiért lásd: Formátumbeállítások.

Az automatikus betöltő gyakori beállításai

A következő beállításokat konfigurálhatja a könyvtárlista vagy a fájlértesítési mód számára.

Beállítás
cloudFiles.allowOverwrites

Típus: Boolean

Engedélyezi-e a bemeneti könyvtárfájl módosításait a meglévő adatok felülírása érdekében. A 7.6-os Databricks Runtime és a feletti érhetők el.

Alapértelmezett érték: false
cloudFiles.format

Típus: String

Az adatfájl formátuma a forrás elérési útján. Az engedélyezett értékek a következők:

* avro: *
* binaryFile: *
* csv: *
* json: *
* orc: ORC-fájl
* parquet: *
* text: Szövegfájl

Alapértelmezett érték: Nincs (kötelező beállítás)
cloudFiles.includeExistingFiles

Típus: Boolean

Azt, hogy a streamfeldolgozási bemeneti útvonalba foglalja-e a meglévő fájlokat, vagy csak a kezdeti beállítás után érkező új fájlokat kell feldolgoznia. Ez a lehetőség csak akkor lesz kiértékelve, amikor először indít el egy streamet. Ennek a beállításnak a stream újraindítása utáni módosítása nem befolyásolja.

Alapértelmezett érték: true
cloudFiles.inferColumnTypes

Típus: Boolean

Pontos oszloptípusokat kell-e kihozni séma-dedíció használata esetén. Az oszlopok alapértelmezés szerint sztringekként vannak kikövetkeztetve JSON-adatkészletek kikövetkeztetésekor. További részletekért lásd a séma-következtetést.

Alapértelmezett érték: false
cloudFiles.maxBytesPerTrigger

Típus: Byte String

Az összes eseményindítóban feldolgozható új bájtok maximális száma. Megadhat egy bájtsringet, például az egyes 10g mikrobatchok 10 GB adatra való korlátozására. Ez egy soft maximum. Ha 3 GB-os fájlok vannak, a Azure Databricks 12 GB-ot dolgozza fel egy mikrobatchban. A és a együttes Azure Databricks legfeljebb a vagy a korlátját használja cloudFiles.maxFilesPerTriggercloudFiles.maxFilesPerTriggercloudFiles.maxBytesPerTrigger fel, amelyiket először eléri. Ennek a beállításnak nincs hatása a és a használata Trigger.Once() esetén.

Alapértelmezett érték: Nincs
cloudFiles.maxFileAge

Típus: Interval String

A fájlesemények deduplikáció céljából történő nyomon követésének ideje. A Databricks nem javasolja a paraméter finomhangolását, kivéve, ha óránként több millió fájlból állnak az adatok. További részleteket a How to choose maxFileAge (A maxFileAge kiválasztása) című szakaszban talál.

Alapértelmezett érték: Nincs
cloudFiles.resourceTags

Típus: Map(String, String)

Kulcs-érték címkepárok sorozata, amelyek segítenek a kapcsolódó erőforrások társításában és azonosításában, például:

cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue")
.option("cloudFiles.resourceTag.mySecondKey", "mySecondValue")

További információ: Naming Queues and Metadata (Üzenetsorok és metaadatok elnevezése és lefedettsége) az Event Subscriptions (Esemény-előfizetések) alatt. Az automatikus betöltő ezeket a kulcs-érték címkepárokat JSON-ban tárolja címkékként. (1)

Alapértelmezett érték: Nincs
cloudFiles.schemaEvolutionMode

Típus: String

A séma új oszlopokként való módosításának módja az adatokban. Az oszlopok alapértelmezés szerint sztringekként vannak kikövetkeztetve JSON-adatkészletek kikövetkeztetésekor. További részletekért tekintse meg a séma fejlődését.

Alapértelmezett érték: "addNewColumns" ha nincs séma megjelölve.
"none" Egyébként.
cloudFiles.schemaHints

Típus: String

Az automatikus betöltőnek a séma következtetése során meghozott sémainformációk. További részletekért tekintse meg a sémamutatókat.

Alapértelmezett érték: Nincs
cloudFiles.schemaLocation

Típus: String

A kikövetkezett séma és az azt követő módosítások tárolására vonatkozó hely. További részletekért lásd a séma-következtetést.

Alapértelmezett érték: Nincs (a séma ki következtetéséhez szükséges)
cloudFiles.validateOptions

Típus: Boolean

Ellenőrizze-e az automatikus betöltő beállításait, és hogy ismeretlen vagy inkonzisztens beállítások esetén hibát ad-e vissza.

Alapértelmezett érték: true
cloudFiles.backfillInterval

>[! FONTOS] >> Ez a funkció nyilvános előzetes >

Típus: Interval String

Az automatikus betöltő aszinkron visszatöltéseket aktiválhat egy adott időközönként,
például naponta egyszer, vagy hetente egyszer 1 day1 week visszatöltésre. A fájlesemény-értesítési rendszerek nem garantálják az összes feltöltött fájl 100%-os kézbesítését, ezért a visszatöltésekkel garantálhatja, hogy Databricks Runtime 8.4-es és 8. Databricks Runtime 4-es vagy magasabb Databricks Runtime-es és a 8.4-es vagy magasabb idő alatt minden fájl fel lesz feldolgozva. Ha növekményes listát használ, normál visszatöltésekkel is garantálhatja a teljességet, amely Databricks Runtime 9.1 LTS és Databricks Runtime 9.1 LTS Photon és az azt Databricks Runtime alatt érhető el.

Alapértelmezett érték: Nincs

(1) Az automatikus betöltő alapértelmezés szerint hozzáadja a következő kulcs-érték címkepárokat az ajánlott beállítások alapján:

  • vendor: Databricks
  • path: Az a hely, ahonnan az adatok betöltődnek.
  • checkpointLocation: A stream ellenőrzőpontjainak helye.
  • streamId: A stream globálisan egyedi azonosítója.

Ezek a kulcsnevek le vannak foglalva, és nem írhatók felül az értékük.

Címtárlista beállításai

Az alábbi beállítások a címtárlista módra vonatkozóak.

Beállítás
cloudFiles.useIncrementalListing

>[! FONTOS] >> Ez a funkció nyilvános előzetes >

Típus: String

Azt határozza meg, hogy a teljes lista helyett a növekményes listát használja-e könyvtárlista módban. Alapértelmezés szerint az automatikus betöltő mindent megtesz annak érdekében, hogy automatikusan észlelje, hogy egy adott könyvtár alkalmazható-e a növekményes listában. Explicit módon használhatja a növekményes listát, vagy használhatja a teljes könyvtároldalt, ha vagy értékként be van truefalse stb.

9.1 LTS és Databricks Runtime 9.1 LTS Photon és magasabb Databricks Runtime érhető el.

Alapértelmezett érték: auto

Elérhető értékek: auto , true , false

A választás maxFileAge

Megjegyzés

A 8.4-es Databricks Runtime és a feletti érhetők el.

Az automatikus betöltő az ellenőrzőpont-helyen található felderített fájlokat a RocksDB használatával követi nyomon, így pontosan egyszer biztosítja a betöltési garanciákat. Nagy mennyiségű adatkészlet esetén használhatja az események lejáratát maxFileAge az ellenőrzőpont helyéről. A minimálisan beállítható értéke maxFileAge"14 days" a következő: . A RocksDB törlései törlési bejegyzésként jelennek meg, ezért az események lejárata után várható a tárhelyhasználat növekedése, mielőtt megkezdődne a szint emelése.

Figyelmeztetés

maxFileAge A a nagy mennyiségű adatkészletek költségvezérlési mechanizmusa, amely óránként több millió fájlból áll. A maxFileAge helytelen hangolás adatminőségi problémákhoz vezethet. Ezért a Databricks nem javasolja a paraméter finomhangolását, hacsak nem feltétlenül szükséges.

A beállítás hangolása azt okozhatja, hogy az automatikus betöltő figyelmen kívül hagyja a feldolgozatlan fájlokat, vagy a már feldolgozott fájlok lejárnak, majd a feldolgozásuk ismétlődő adatokat maxFileAge okoz. Íme néhány dolog, amit érdemes figyelembe venni a maxFileAge kiválasztásakor:

  • Ha a stream hosszú idő után újraindul, a rendszer figyelmen kívül hagyja az üzenetsorból lekért fájlértesítési maxFileAge eseményeket. Hasonlóképpen, könyvtárlista használata esetén a rendszer figyelmen kívül hagyja azokat a fájlokat, amelyek a korábbiaknál régebbiek voltak a maxFileAge korábbiaknál.
  • Ha címtárlista-üzemmódot használ, és például a következőt használja: , állítsa le a streamet, és indítsa újra a streamet a következő beállítással: , az 1 hónapnál régebbi, de a 2 hónapnál újabb fájlok maxFileAge"1 month"maxFileAge"2 months" újrafeldolgozása.

A hangolás legjobb megközelítése az lenne, ha egy lejárati időtől indulna ki, és lefelé haladva a maxFileAge"1 year" következőt: "9 months" . Ha ezt a beállítást a stream első elindítanikor adja meg, nem fogja anél régebbi adatokat behozni, ezért ha régi adatokat szeretne behozni, ne állítsa be ezt a beállítást a stream maxFileAge eleinte.

Fájlértesítési beállítások

A fájlértesítési módhoz az alábbi beállítások használhatók.

Beállítás
cloudFiles.fetchParallelism

Típus: Integer

A várólistára kerülő szolgáltatás üzenetsorból való beolvasásához használt szálak száma.

Alapértelmezett érték: 1
cloudFiles.pathRewrites

Típus: JSON-sztring

Csak akkor szükséges, ha olyan értéket ad meg, amely több S3-gyűjtőből fogad fájlértesítéseket, és az ezekben a tárolókban található adatok elérésére konfigurált csatlakoztatási pontokat queueUrl szeretne használni. Ezzel a beállítással írhatja át az elérési út előtagját bucket/key a csatlakoztatási ponttal. Csak az előtagok írhatók át. Például a konfigurációhoz
{"<databricks-mounted-bucket>/path": "dbfs:/mnt/data-warehouse"}, az elérési út
s3://<databricks-mounted-bucket>/path/2017/08/fileA.json A át lesz írva a dbfs:/mnt/data-warehouse/2017/08/fileA.json szövegre.

Alapértelmezett érték: Nincs
cloudFiles.queueUrl

Típus: String

Az SQS-üzenetsor URL-címe. Ha meg van téve, a felhőfájlok forrása közvetlenül ebből az üzenetsorból használja fel az eseményeket ahelyett, hogy saját AWS SNS- és SQS-szolgáltatásokat adna meg.

Alapértelmezett érték: Nincs
cloudFiles.useNotifications

Típus: Boolean

A fájlértesítési mód használata annak meghatározásához, hogy mikor vannak új fájlok. Ha false , használja a könyvtárlista módot. Lásd: Fájlfelderítési módok.

Alapértelmezett érték: false

Csak akkor adja meg a következő beállítást, ha úgy dönt, és azt szeretné, hogy az automatikus betöltő beállítsa önnek az cloudFiles.useNotifications = true értesítési szolgáltatásokat:

Beállítás
cloudFiles.region

Típus: String

A régió, ahol a forrás S3-gyűjtő található, és ahol az AWS SNS- és SQS-szolgáltatások létrejönnek.

Alapértelmezett érték: Databricks Runtime EC2-példány 9.0-s vagy magasabb régiójában. A Databricks Runtime 8.4-es és az alattiakban meg kell adnia a régiót.

Az AWS SNS és az SQS eléréséhez a következő beállításokat használhatja hitelesítő adatok megadásához, ha az IAM-szerepkörök nem érhetők el, vagy ha különböző felhőkből származó adatokat használ.

Beállítás
cloudFiles.awsAccessKey

Típus: String

A felhasználó AWS hozzáférési kulcsának azonosítója. Meg kell adni a
cloudFiles.awsSecretKey.

Alapértelmezett érték: Nincs
cloudFiles.awsSecretKey

Típus: String

A felhasználó AWS titkos hozzáférési kulcsa. Meg kell adni a
cloudFiles.awsAccessKey.

Alapértelmezett érték: Nincs
cloudFiles.roleArn

Típus: String

Egy IAM-szerepkör ARN-nek a feltételezése. Ez a szerepkör a fürt példányprofilja alapján vagy a következővel feltételezhető:
cloudFiles.awsAccessKey és cloudFiles.awsSecretKey.

Alapértelmezett érték: Nincs
cloudFiles.roleExternalId

Típus: String

Egy azonosító, amely akkor lesz megtéve, amikor szerepkört feltételez a cloudFiles.roleArn használatával.

Alapértelmezett érték: Nincs
cloudFiles.roleSessionName

Típus: String

Egy választható munkamenetnév, amely a szerepkör használata során használható
cloudFiles.roleArn.

Alapértelmezett érték: Nincs
cloudFiles.stsEndpoint

Típus: String

Egy választható végpont, amely az AWS STS eléréséhez szükséges, amikor szerepkört feltételez a cloudFiles.roleArn használatával.

Alapértelmezett érték: Nincs

Az alapértelmezett beállításértékek és kompatibilitás változásai

Az alábbi automatikus betöltési beállítások alapértelmezett értékei a 7.2-es Databricks Runtime a Konfigurációban felsorolt értékekre módosulnak.

  • cloudFiles.useNotifications
  • cloudFiles.includeExistingFiles
  • cloudFiles.validateOptions

Az automatikus betöltőstreamek a 7.1-es Databricks Runtime és az alatt elindított streamek az alábbi alapértelmezett beállításértékekkel vannak megjelölve:

  • A cloudFiles.useNotifications értéke true
  • A cloudFiles.includeExistingFiles értéke false
  • A cloudFiles.validateOptions értéke false

A meglévő alkalmazásokkal való kompatibilitás biztosítása érdekében ezek az alapértelmezett beállításértékek nem változnak, amikor a meglévő automatikus betöltőstreameket a 7.2-es vagy újabb Databricks Runtime futtatja; A streamek működése a frissítés után is ugyanaz lesz.

Engedélyek

A bemeneti könyvtárhoz olvasási engedélyekkel kell rendelkeznie. További részletekért lásd: S3-kapcsolat részletei.

A fájlértesítési módhoz csatolja a következő JSON-szabályzatdokumentumot az IAM-felhasználóhoz vagy -szerepkörhöz.

Ha nem tudja beállítani a JSON-szabályzat dokumentumában megadott engedélyeket, megkérhet egy rendszergazdát, hogy végezze el a beállítást a Felhőerőforrás-kezelési Scala API használatával. A rendszergazda meg tudja adni az üzenetsor URL-címét, amelyet közvetlenül meg tud adni a .option("queueUrl", <queue-url>)cloudFiles forrásról. Ezzel a konfigurációval csak korlátozott engedélyekre van szükség. A részleteket lásd: Függelék: Csökkentett engedélyek a kezdeti beállítás után.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:DeleteMessageBatch",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility",
        "sqs:ChangeMessageVisibilityBatch"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

ahol:

  • <bucket-name>: Az S3-gyűjtő neve, ahol a stream beolvassa a fájlokat, auto-logs például: . A helyettesítő * karakterként is használható, databricks-*-logs például: . A DBFS-útvonal mögöttes S3-gyűjtőjéhez a jegyzetfüzet összes DBFS csatlakoztatási pontja kilistálhatja a %fs mounts futtatásával.
  • <region>: Az AWS-régió, ahol az S3-gyűjtő található, például us-west-2 . Ha nem szeretné megadni a régiót, használja a következőt: * .
  • <account-number>: Az S3-gyűjtőt tulajdonó AWS-fiók száma, például 123456789012 : . Ha nem szeretné megadni a fiók számát, használja a * következőt: .

Az SQS és az SNS ARN specifikációban található sztring a forrás által az SQS- és SNS-szolgáltatások létrehozásakor használt databricks-auto-ingest-*cloudFiles névelőtag. Mivel Azure Databricks beállítja az értesítési szolgáltatásokat a stream kezdeti futtatásakor, a kezdeti futtatás után korlátozott engedélyekkel rendelkező szabályzatot használhat (például leállíthatja a streamet, majd újraindíthatja). A részleteket lásd: Függelék: Csökkentett engedélyek a kezdeti beállítás után.

Megjegyzés

A fenti szabályzat csak a fájlértesítési szolgáltatások ( S3 bucket notification, SNS és SQS szolgáltatások) beállításához szükséges engedélyekkel kapcsolatos, és feltételezi, hogy már rendelkezik olvasási hozzáféréssel az S3-gyűjtőhez. Ha írásra vonatkozó S3-engedélyeket kell hozzáadnia, adja hozzá a következőt a ActionDatabricksAutoLoaderSetup JSON-dokumentum utasításában található listához:

  • s3:ListBucket
  • s3:GetObject

Adatok biztonságos bemenő adatok egy másik AWS-fiókba

Az automatikus betöltő IAM-szerepkört feltételezve képes adatokat betölteni az AWS-fiókok között. Miután beállította a által létrehozott ideiglenes biztonsági hitelesítő adatokat, az automatikus betöltővel több fiókba is betöltheti a AssumeRole felhőbeli fájlokat. Az AWS-fiókok közötti automatikus betöltő beállításához kövesse a _. Győződjön meg arról, hogy:

  • Ellenőrizze, hogy hozzá van-e rendelve az AssumeRole metaszereppont a fürthöz.

  • Konfigurálja a fürt Spark-konfigurációját úgy, hogy tartalmazza a következő tulajdonságokat:

    fs.s3a.credentialsType AssumeRole
    fs.s3a.stsAssumeRole.arn arn:aws:iam::<bucket-owner-acct-id>:role/MyRoleB
    fs.s3a.acl.default BucketOwnerFullControl
    

Mérőszámok

Az automatikus betöltő minden kötegben jelenti a metrikákat. A streamelési lekérdezési folyamat irányítópultjának Nyers adatok lapján megtekintheti, hogy hány fájl létezik a hátralékban, és mekkora a hátralék numFilesOutstandingnumBytesOutstandingnumBytesOutstandingnumFilesOutstanding

{
  "sources" : [
    {
      "description" : "CloudFilesSource[/path/to/source]",
      "metrics" : {
        "numFilesOutstanding" : "238",
        "numBytesOutstanding" : "163939124006"
      }
    }
  ]
}

Felhőerőforrás-kezelés

A Scala API-val kezelheti az automatikus betöltő által létrehozott AWS SNS- és SQS-szolgáltatásokat. Az API használata előtt konfigurálnia kell az Engedélyekben leírt erőforrás-beállítási engedélyeket.

Fontos

Ha a 7.1-es vagy újabb Databricks Runtime használt automatikus betöltőt, frissítse az IAM-szabályzatot az Engedélyek dokumentum JSON-házirenddokumentumával. A 7.2-es és Databricks Runtime szabályzatában új utasítások határozzák meg a Scala API-hoz szükséges további DatabricksAutoLoaderListDatabricksAutoLoaderTeardown engedélyeket.

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>)
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

// Set up an SQS queue and a topic subscribed to the path provided in the manager. Available in Databricks Runtime 7.4 and above.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by Auto Loader
manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Megjegyzés

A 7.4-es Databricks Runtime és a feletti érhetők el.

A setUpNotificationServices(<resource-suffix>) használatával hozzon létre egy SQS-üzenetsort és egy nevű SNS-témakört. databricks-auto-ingest-<resource-suffix> Ha már létezik azonos nevű SQS-üzenetsor vagy SNS-témakör, a Azure Databricks a már létező erőforrást használja egy új létrehozása helyett. Ez a függvény egy SQS-üzenetsort ad vissza, amely a használatával átadhat a cloudFiles.option("cloudFiles.queueUrl", <queue-url>) forrásnak. Ez lehetővé teszi, hogy a forrásfelhasználó kevesebb engedéllyel rendelkezik, mint az cloudFiles erőforrásokat létrehozó felhasználó. Lásd: Engedélyek.

Csak "path" akkor adja meg a beállítást, ha a hívása ; a vagy a newManagersetUpNotificationServices parancshoz nem listNotificationServicestearDownNotificationServices szükséges. Ez ugyanaz, path mint amit a streamelési lekérdezések futtatásakor használ.

Gyakori kérdések (GYIK)

Előzetesen létre kell hoznom AWS eseményértesítési szolgáltatásokat?

Nem. Ha a fájlértesítési módot választja, az automatikus betöltő automatikusan létrehoz egy AWS S3 > SNS-témakör SQS-üzenetsor-eseményértesítési folyamatot a > stream elindítani.

Hogyan az automatikus betöltő által létrehozott eseményértesítési erőforrásokat, például SNS-témaköröket és SQS-üzenetsorokat?

A felhőerőforrás-kezelővel listába sorolhatja és lebonthatja az erőforrásokat. Ezeket az erőforrásokat manuálisan is törölheti a webkonzolon vagy AWS API-k használatával. Az automatikus betöltő által létrehozott összes erőforrás előtagja a következő: databricks-auto-ingest- .

Az automatikus betöltő ismét feldolgozta a fájlt a fájl hozzáfűzése vagy felülírása után?

A fájlok feldolgozása pontosan egyszer, kivéve, ha engedélyezi cloudFiles.allowOverwrites az engedélyezését. Ha hozzáfűz vagy felülír egy fájlt, a Databricks nem garantálja, hogy a rendszer a fájl melyik verzióját feldolgozta. A jól meghatározott viselkedés érdekében a Databricks azt javasolja, hogy csak a nem módosítható fájlok betöltéséhez használja az automatikus betöltőt. Ha ez nem felel meg a követelményeknek, forduljon a Databricks képviselőjéhez.

Futtathatok több streamelési lekérdezést ugyanattól a bemeneti könyvtártól?

Igen. Minden felhőbeli fájlstream egy egyedi ellenőrzőpont-könyvtár alapján saját SQS-üzenetsorsal rendelkezik, és ugyanazok az AWS S3-események több SQS-üzenetsorba is küldhetőek.

Ha az adatfájljaim nem érkeznek meg folyamatosan, de rendszeres időközönként, például naponta egyszer, akkor is ezt a forrást használjam, és van-e valamilyen előnyöm?

Igen és igen. Ebben az esetben beállíthat egy strukturált streamelési feladatot, és ütemezheti, hogy a fájl várható érkezési időpontja után Trigger-Once fusson. Az első futtatás beállítja az eseményértesítési szolgáltatásokat, amelyek mindig be fognak indulni, még akkor is, ha a streamelési fürt nem fut. A stream újraindításakor a forrás lekéri és feldolgozza az SQS-üzenetsorban lévő összes cloudFiles fájleseményt. Ebben az esetben az automatikus betöltő használatának előnye, hogy nem kell meghatároznia, hogy mely fájlok újak, és minden alkalommal fel kell őket feldolgozni, ami nagyon költséges lehet.

Mi történik, ha módosítom az ellenőrzőpont helyét a stream újraindításakor?

Az ellenőrzőpont-hely a stream fontos azonosító információit tartja fenn. Az ellenőrzőpont helyének módosítása gyakorlatilag azt jelenti, hogy felhagyt az előző streamel, és elindított egy új streamet. Az új stream új folyamatinformációkat hoz létre, és ha fájlértesítési módot használ, az új AWS SNS- és SQS-szolgáltatásokat. Manuálisan kell eltávolítania az ellenőrzőpont helyét, valamint az AWS SNS- és SQS-szolgáltatásokat minden elhagyott streamhez.

Futtatok több streamelési lekérdezést különböző bemeneti könyvtárakból ugyanazon az S3-gyűjtőn?

Igen, ha nem szülő-gyermek könyvtárak, például prod-logs/ és prod-logs/usage/ .

Használhatom ezt a funkciót, ha vannak meglévő fájlértesítések az S3-gyűjtőben?

Igen, ha a bemeneti könyvtár nem ütközik a meglévő értesítési előtaggal (például a fenti szülő-gyermek könyvtárakkal).

Függelék: Csökkentett engedélyek a kezdeti beállítás után

Az Engedélyek szakaszban leírt erőforrás-beállítási engedélyekre csak a stream kezdeti futtatásakor van szükség. Az első futtatás után a következő, korlátozott engedélyekkel rendelkező IAM-szabályzatra válthat.

Fontos

A csökkentett engedélyekkel nem tud új streamelési lekérdezéseket elindítani vagy erőforrásokat létrehozni hibák esetén (például az SQS-üzenetsor véletlenül törölve lett); A felhőbeli erőforrás-kezelési API-t sem fogja tudni használni az erőforrások listához vagy lebontához.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:DeleteMessageBatch",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility",
       "sqs:ChangeMessageVisibilityBatch"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}