Udostępnij za pośrednictwem


Źródło pliku usługi Azure Blob Storage za pomocą usługi Azure Queue Storage (starsza wersja)

Ważne

Ta dokumentacja została wycofana i może nie zostać zaktualizowana. Produkty, usługi lub technologie wymienione w tej zawartości nie są już obsługiwane. Zobacz Co to jest moduł automatycznego ładowania?.

Łącznik ABS-AQS udostępnia zoptymalizowane źródło plików, które używa usługi Azure Queue Storage (AQS) do znajdowania nowych plików zapisanych w kontenerze usługi Azure Blob Storage (ABS) bez wielokrotnego wyświetlania listy wszystkich plików. Zapewnia to dwie zalety:

  • Mniejsze opóźnienie: nie ma potrzeby wyświetlania listy zagnieżdżonych struktur katalogów w abs, co jest powolne i intensywnie obciąża zasoby.
  • Niższe koszty: nie są już kosztowne żądania interfejsu API LIST wysyłane do abs.

Uwaga

Źródło ABS-AQS usuwa komunikaty z kolejki AQS, gdy zużywa zdarzenia. Jeśli chcesz, aby inne potoki używały komunikatów z tej kolejki, skonfiguruj oddzielną kolejkę AQS dla zoptymalizowanego czytnika. Istnieje możliwość skonfigurowania wielu subskrypcji usługi Event Grid w celu publikowania w różnych kolejkach.

Korzystanie ze źródła plików ABS-AQS

Aby użyć źródła pliku ABS-AQS, musisz:

  • Skonfiguruj powiadomienia o zdarzeniach ABS, korzystając z subskrypcji usługi Azure Event Grid i kierując je do usługi AQS. Zobacz Reagowanie na zdarzenia usługi Blob Storage.

  • fileFormat Określ opcje i queueUrl i i schemat. Na przykład:

    spark.readStream \
      .format("abs-aqs") \
      .option("fileFormat", "json") \
      .option("queueName", ...) \
      .option("connectionString", ...) \
      .schema(...) \
      .load()
    

Uwierzytelnianie za pomocą usługi Azure Queue Storage i usługi Blob Storage

Aby przeprowadzić uwierzytelnianie za pomocą usługi Azure Queue Storage i usługi Blob Storage, użyj tokenów sygnatury dostępu współdzielonego (SAS) lub kluczy konta magazynu. Musisz podać parametry połączenia dla konta magazynu, na którym wdrożono kolejkę, która zawiera token SAS lub klucze dostępu do konta magazynu. Aby uzyskać więcej informacji, zobacz Konfigurowanie parametry połączenia usługi Azure Storage.

Należy również zapewnić dostęp do kontenerów usługi Azure Blob Storage. Aby uzyskać informacje na temat konfigurowania dostępu do kontenera usługi Azure Blob Storage, zobacz Połączenie do usługi Azure Data Lake Storage Gen2 i Usługi Blob Storage.

Uwaga

Zdecydowanie zalecamy używanie wpisów tajnych do udostępniania parametry połączenia.

Konfigurowanie

Opcja Typ Domyślny opis
allowOverwrites Wartość logiczna true Czy obiekt blob, który zostanie zastąpiony, powinien zostać ponownie przetworzony.
Parametry połączenia String Brak (wymagany parametr) Parametry połączenia dostępu do kolejki.
fetchParallelism Integer 1 Liczba wątków do użycia podczas pobierania komunikatów z usługi kolejkowania.
fileFormat String Brak (wymagany parametr) Format plików, takich jak parquet, json, csv, texti tak dalej.
ignoreFileDeletion Wartość logiczna false Jeśli masz konfiguracje cyklu życia lub ręcznie usuniesz pliki źródłowe, musisz ustawić tę opcję na true.
maxFileAge Integer 604800 Określa, jak długo (w sekundach) powiadomienia o plikach są przechowywane jako stan, aby zapobiec zduplikowanemu przetwarzaniu.
pathRewrites Ciąg JSON. "{}" Jeśli używasz punktów instalacji, możesz ponownie napisać prefiks container@storageAccount/key ścieżki z punktem instalacji. Można przepisać tylko prefiksy. Na przykład w przypadku konfiguracji {"myContainer@myStorageAccount/path": "dbfs:/mnt/data-warehouse"}ścieżka wasbs://myContainer@myStorageAccount.blob.windows.core.net/path/2017/08/fileA.json zostanie przepisana do
dbfs:/mnt/data-warehouse/2017/08/fileA.json.
queueFetchInterval Ciąg czasu trwania, na przykład przez 2m 2 minuty. "5s" Jak długo czekać między pobieraniem, jeśli kolejka jest pusta. Opłaty za platformę Azure na żądanie interfejsu API do AQS. W związku z tym, jeśli dane nie docierają często, tę wartość można ustawić na długi czas trwania. Tak długo, jak kolejka nie jest pusta, będziemy pobierać w sposób ciągły. Jeśli nowe pliki są tworzone co 5 minut, możesz ustawić wysoki queueFetchInterval poziom, aby zmniejszyć koszty AQS.
Queuename String Brak (wymagany parametr) Nazwa kolejki AQS.

Jeśli obserwujesz wiele komunikatów w dziennikach sterowników, które wyglądają jak Fetched 0 new events and 3 old events., gdzie obserwujesz o wiele więcej starych zdarzeń niż nowe, należy zmniejszyć interwał wyzwalacza strumienia.

Jeśli korzystasz z plików z lokalizacji w usłudze Blob Storage, w której spodziewasz się, że niektóre pliki mogą zostać usunięte przed ich przetworzeniem, możesz ustawić następującą konfigurację, aby zignorować błąd i kontynuować przetwarzanie:

spark.sql("SET spark.sql.files.ignoreMissingFiles=true")

Często zadawane pytania

Jeśli ignoreFileDeletion wartość False (wartość domyślna) i obiekt został usunięty, czy cały potok zakończy się niepowodzeniem?

Tak, jeśli otrzymamy zdarzenie informujące o tym, że plik został usunięty, cały potok zakończy się niepowodzeniem.

Jak należy ustawić maxFileAge?

Usługa Azure Queue Storage zapewnia semantyka dostarczania komunikatów co najmniej raz, dlatego musimy zachować stan deduplikacji. Ustawieniem domyślnym dla maxFileAge parametru jest 7 dni, co jest równe maksymalnemu czasowi wygaśnięcia komunikatu w kolejce.