Optimalizovaný zdroj souborů Azure Blob Storage se službou Azure Queue Storage

Konektor datacihly-AQS používá k poskytnutí optimalizovaného zdroje souborů Azure Queue Storage (AQS), který umožňuje najít nové soubory zapsané do kontejneru služby Azure Blob Storage (ABS) bez opakovaného výpisu všech souborů. To poskytuje dvě hlavní výhody:

  • Nižší latence: není nutné zobrazovat výpis struktur vnořených adresářů na ABS, což je pomalé a náročné na prostředky.
  • Nižší náklady: žádné obecnější požadavky rozhraní API seznamů na ABS.

Důležité

  • Zdroj ABS-AQS je zastaralý. Pro nové streamy doporučujeme místo toho použít Automatický zavaděč .
  • Zdroj ABS-AQS odstraní zprávy z fronty AQS, protože se potvrdí události. Pokud chcete, aby jiné kanály byly ze stejné fronty náročné, nastavte pro optimalizované čtecí zařízení samostatnou AQS frontu. Můžete nastavit několik předplatných Event Grid pro publikování do různých front.

Použití zdroje souborů ABS-AQS

Pokud chcete použít zdroj souborů ABS-AQS, musíte:

  • Nastavte oznámení události ABS pomocí Azure Event Grid předplatných a směrovat je na AQS. Přečtěte si téma reakce na události služby Blob Storage.

  • Zadejte fileFormat Možnosti a queueUrl a schéma. Například:

    spark.readStream \
      .format("abs-aqs") \
      .option("fileFormat", "json") \
      .option("queueName", ...) \
      .option("connectionString", ...) \
      .schema(...) \
      .load()
    

Ověřování pomocí služby Azure Queue Storage a služby Blob Storage

Aby bylo možné provést ověření pomocí služby Azure Queue Storage a služby Blob Storage, budete muset použít tokeny sdíleného přístupového podpisu (SAS) nebo klíče účtu úložiště. Budete muset zadat připojovací řetězec pro účet úložiště, ve kterém je nainstalovaná vaše fronta, který bude obsahovat buď token SAS, nebo přístupové klíče k vašemu účtu úložiště. Další informace najdete v tématu Konfigurace připojovacích řetězců Azure Storage .

Budete taky muset poskytnout přístup ke kontejnerům úložiště objektů BLOB v Azure. Informace o tom, jak nakonfigurovat přístup ke kontejneru úložiště objektů BLOB v Azure, najdete v tématu Azure Blob Storage .

Poznámka

Důrazně doporučujeme, abyste používali tajné klíče k poskytování připojovacích řetězců.

Konfigurace

Možnost Typ Výchozí Popis
allowOverwrites Logická hodnota true Určuje, zda má být znovu zpracován objekt blob, který se má přepsat.
připojovací řetězec Řetězec Žádný (povinný parametr) Připojovací řetězec pro přístup k vaší frontě.
fetchParallelism Integer 1 Počet vláken, která se mají použít při načítání zpráv ze služby Queueing
Formát formátu. Řetězec Žádný (povinný parametr) Formát souborů, například,,, parquet json csv text a tak dále.
ignoreFileDeletion Logická hodnota false Pokud máte konfigurace životního cyklu nebo zdrojové soubory odstraníte ručně, musíte tuto možnost nastavit na true .
maxFileAge Integer 604800 Určuje, jak dlouho (v sekundách) se mají ukládat oznámení o souborech jako stav, aby se zabránilo duplicitnímu zpracování.
pathRewrites Řetězec JSON. "{}" Použijete-li přípojné body, můžete předponu container@storageAccount/key cesty přepsat přípojným bodem. Přepsat lze pouze předpony. Například pro konfiguraci se {"myContainer@myStorageAccount/path": "dbfs:/mnt/data-warehouse"} cesta wasbs://myContainer@myStorageAccount.blob.windows.core.net/path/2017/08/fileA.json přepíše na
dbfs:/mnt/data-warehouse/2017/08/fileA.json.
queueFetchInterval Řetězec doby trvání, například 2m 2 minuty. "5s" Doba, po kterou se má čekat, než se načte, pokud je fronta prázdná Za požadavek na AQS se účtují poplatky za Azure. Proto pokud data nepřicházejí často, tato hodnota může být nastavena na dlouhou dobu. Dokud není fronta prázdná, budeme načítat průběžně. Pokud se nové soubory vytvoří každých 5 minut, možná budete chtít nastavit vysokou úroveň queueFetchInterval , aby se snížily náklady na AQS.
Proměnné QueueName Řetězec Žádný (povinný parametr) Název fronty AQS

Pokud si vyberete spoustu zpráv v protokolech ovladačů, které vypadají jako Fetched 0 new events and 3 old events. , kde byste měli v úmyslu sledovat mnohem staré události, než je nového, měli byste snížit interval triggeru vašeho datového proudu.

Pokud nepoužíváte soubory z umístění v úložišti objektů blob, kde očekáváte, že některé soubory mohou být před zpracováním odstraněny, můžete nastavit následující konfiguraci tak, aby ignorovala chybu a pokračovala v zpracování:

spark.sql("SET spark.sql.files.ignoreMissingFiles=true")

Nejčastější dotazy

Pokud ignoreFileDeletion má hodnotu false (výchozí) a objekt byl odstraněn, dojde k selhání celého kanálu?

Ano, pokud se zobrazí událost s oznámením, že soubor byl odstraněn, dojde k selhání celého kanálu.

Jak mám nastavit maxFileAge ?

Azure Queue Storage poskytuje sémantiku doručování zpráv nejméně jednou, proto musíme pro odstranění duplicitních dat zachovat stav. Výchozí nastavení maxFileAge je 7 dní, které se rovná maximální hodnotě TTL zprávy ve frontě.