Źródło pliku usługi Azure Blob Storage za pomocą usługi Azure Queue Storage (starsza wersja)
Ważne
Ta dokumentacja została wycofana i może nie zostać zaktualizowana. Produkty, usługi lub technologie wymienione w tej zawartości nie są już obsługiwane. Zobacz Co to jest moduł automatycznego ładowania?.
Łącznik ABS-AQS udostępnia zoptymalizowane źródło plików, które używa usługi Azure Queue Storage (AQS) do znajdowania nowych plików zapisanych w kontenerze usługi Azure Blob Storage (ABS) bez wielokrotnego wyświetlania listy wszystkich plików. Zapewnia to dwie zalety:
- Mniejsze opóźnienie: nie ma potrzeby wyświetlania listy zagnieżdżonych struktur katalogów w abs, co jest powolne i intensywnie obciąża zasoby.
- Niższe koszty: nie są już kosztowne żądania interfejsu API LIST wysyłane do abs.
Uwaga
Źródło ABS-AQS usuwa komunikaty z kolejki AQS, gdy zużywa zdarzenia. Jeśli chcesz, aby inne potoki używały komunikatów z tej kolejki, skonfiguruj oddzielną kolejkę AQS dla zoptymalizowanego czytnika. Istnieje możliwość skonfigurowania wielu subskrypcji usługi Event Grid w celu publikowania w różnych kolejkach.
Korzystanie ze źródła plików ABS-AQS
Aby użyć źródła pliku ABS-AQS, musisz:
Skonfiguruj powiadomienia o zdarzeniach ABS, korzystając z subskrypcji usługi Azure Event Grid i kierując je do usługi AQS. Zobacz Reagowanie na zdarzenia usługi Blob Storage.
fileFormat
Określ opcje iqueueUrl
i i schemat. Na przykład:spark.readStream \ .format("abs-aqs") \ .option("fileFormat", "json") \ .option("queueName", ...) \ .option("connectionString", ...) \ .schema(...) \ .load()
Uwierzytelnianie za pomocą usługi Azure Queue Storage i usługi Blob Storage
Aby przeprowadzić uwierzytelnianie za pomocą usługi Azure Queue Storage i usługi Blob Storage, użyj tokenów sygnatury dostępu współdzielonego (SAS) lub kluczy konta magazynu. Musisz podać parametry połączenia dla konta magazynu, na którym wdrożono kolejkę, która zawiera token SAS lub klucze dostępu do konta magazynu. Aby uzyskać więcej informacji, zobacz Konfigurowanie parametry połączenia usługi Azure Storage.
Należy również zapewnić dostęp do kontenerów usługi Azure Blob Storage. Aby uzyskać informacje na temat konfigurowania dostępu do kontenera usługi Azure Blob Storage, zobacz Połączenie do usługi Azure Data Lake Storage Gen2 i Usługi Blob Storage.
Uwaga
Zdecydowanie zalecamy używanie wpisów tajnych do udostępniania parametry połączenia.
Konfigurowanie
Jeśli obserwujesz wiele komunikatów w dziennikach sterowników, które wyglądają jak Fetched 0 new events and 3 old events.
, gdzie obserwujesz o wiele więcej starych zdarzeń niż nowe, należy zmniejszyć interwał wyzwalacza strumienia.
Jeśli korzystasz z plików z lokalizacji w usłudze Blob Storage, w której spodziewasz się, że niektóre pliki mogą zostać usunięte przed ich przetworzeniem, możesz ustawić następującą konfigurację, aby zignorować błąd i kontynuować przetwarzanie:
spark.sql("SET spark.sql.files.ignoreMissingFiles=true")
Często zadawane pytania
Jeśli ignoreFileDeletion
wartość False (wartość domyślna) i obiekt został usunięty, czy cały potok zakończy się niepowodzeniem?
Tak, jeśli otrzymamy zdarzenie informujące o tym, że plik został usunięty, cały potok zakończy się niepowodzeniem.
Jak należy ustawić maxFileAge
?
Usługa Azure Queue Storage zapewnia semantyka dostarczania komunikatów co najmniej raz, dlatego musimy zachować stan deduplikacji. Ustawieniem domyślnym dla maxFileAge
parametru jest 7 dni, co jest równe maksymalnemu czasowi wygaśnięcia komunikatu w kolejce.