Samouczek: Azure Data Lake Storage Gen2, Azure Databricks i Spark

W tym samouczku pokazano, jak połączyć klaster usługi Azure Databricks z danymi przechowywanymi na koncie magazynu platformy Azure z włączoną usługą Azure Data Lake Storage Gen2. Takie połączenie umożliwia natywne wykonywanie w klastrze zapytań i analiz dotyczących tych danych.

Ten samouczek obejmuje następujące kroki:

  • Pozyskiwanie danych bez struktury na koncie magazynu
  • Uruchamianie analiz dotyczących danych w magazynie obiektów blob

Jeśli nie masz subskrypcji platformy Azure, przed rozpoczęciem utwórz bezpłatne konto.

Wymagania wstępne

Tworzenie obszaru roboczego, klastra i notesu usługi Azure Databricks

  1. Tworzenie obszaru roboczego usługi Azure Databricks. Zobacz Tworzenie obszaru roboczego usługi Azure Databricks.

  2. Tworzenie klastra. Zobacz Tworzenie klastra.

  3. Utwórz notes. Zobacz Tworzenie notesu. Wybierz język Python jako domyślny język notesu.

Pozostaw otwarty notes. Należy go użyć w poniższych sekcjach.

Pobieranie danych lotów

W tym samouczku używane są dane dotyczące lotów o wydajności w czasie dla stycznia 2016 r. z Biura Statystyki Transportu, aby zademonstrować sposób wykonywania operacji ETL. Aby ukończyć samouczek, musisz pobrać te dane.

  1. Pobierz plik On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip. Ten plik zawiera dane lotu.

  2. Rozpakuj zawartość pliku zip i zanotuj nazwę pliku oraz jego ścieżkę. Te informacje będą potrzebne w późniejszym kroku.

Jeśli chcesz dowiedzieć się więcej o informacjach przechwyconych w danych dotyczących wydajności raportowania czasu, możesz zobaczyć opisy pól w witrynie internetowej Bureau of Transportation Statistics.

Pozyskiwanie danych

W tej sekcji przekażesz dane lotu csv do konta usługi Azure Data Lake Storage Gen2, a następnie zainstalujesz konto magazynu w klastrze usługi Databricks. Na koniec użyjesz usługi Databricks, aby odczytać dane lotu csv i zapisać je z powrotem do magazynu w formacie Apache parquet.

Przekazywanie danych lotu do konta magazynu

Użyj narzędzia AzCopy, aby skopiować plik CSV na konto usługi Azure Data Lake Storage Gen2. Polecenie służy azcopy make do tworzenia kontenera na koncie magazynu. Następnie użyjesz polecenia , azcopy copy aby skopiować właśnie pobrane dane csv do katalogu w tym kontenerze.

W poniższych krokach należy wprowadzić nazwy kontenera, który chcesz utworzyć, oraz katalog i obiekt blob, do którego chcesz przekazać dane lotu do kontenera. Sugerowane nazwy można użyć w każdym kroku lub określić własne konwencje nazewnictwa kontenerów, katalogów i obiektów blob.

  1. Otwórz okno wiersza polecenia i wprowadź następujące polecenie, aby zalogować się do usługi Azure Active Directory w celu uzyskania dostępu do konta magazynu.

    azcopy login
    

    Postępuj zgodnie z instrukcjami wyświetlanymi w oknie wiersza polecenia, aby uwierzytelnić konto użytkownika.

  2. Aby utworzyć kontener na koncie magazynu do przechowywania danych lotu, wprowadź następujące polecenie:

    azcopy make  "https://<storage-account-name>.dfs.core.windows.net/<container-name>" 
    
    • Zastąp wartość symbolu zastępczego <storage-account-name> nazwą konta magazynu.

    • <container-name> Zastąp symbol zastępczy nazwą kontenera, który chcesz utworzyć, aby przechowywać dane csv, na przykład flight-data-container.

  3. Aby przekazać (skopiować) dane csv na konto magazynu, wprowadź następujące polecenie.

    azcopy copy "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/<directory-name>/On_Time.csv
    
    • Zastąp wartość symbolu <csv-folder-path> zastępczego ścieżką do pliku CSV .

    • Zastąp wartość symbolu zastępczego <storage-account-name> nazwą konta magazynu.

    • <container-name> Zastąp symbol zastępczy nazwą kontenera na koncie magazynu.

    • <directory-name> Zastąp symbol zastępczy nazwą katalogu do przechowywania danych w kontenerze, na przykład jan2016.

Instalowanie konta magazynu w klastrze usługi Databricks

W tej sekcji zainstalujesz magazyn obiektów w chmurze usługi Azure Data Lake Storage Gen2 w systemie plików usługi Databricks (DBFS). Używasz wcześniej utworzonej jednostki usługi Azure AD do uwierzytelniania przy użyciu konta magazynu. Aby uzyskać więcej informacji, zobacz Instalowanie magazynu obiektów w chmurze w usłudze Azure Databricks.

  1. Dołącz notes do klastra.

    1. W utworzonym wcześniej notesie wybierz przycisk Połączenie w prawym górnym rogu paska narzędzi notesu. Ten przycisk otwiera selektor obliczeniowy. (Jeśli notes został już połączony z klastrem, nazwa tego klastra jest wyświetlana w tekście przycisku zamiast Połączenie).

    2. W menu rozwijanym klastra wybierz utworzony wcześniej klaster.

    3. Zwróć uwagę, że tekst w selektorze klastra zmienia się na rozpoczęcie. Przed kontynuowaniem poczekaj na zakończenie uruchamiania klastra i zaczekaj na wyświetlenie nazwy klastra w przycisku.

  2. Skopiuj i wklej następujący blok kodu do pierwszej komórki, ale jeszcze nie uruchamiaj kodu.

    configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<appId>",
           "fs.azure.account.oauth2.client.secret": "<clientSecret>",
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenantId>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}
    
    dbutils.fs.mount(
    source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<directory-name>",
    mount_point = "/mnt/flightdata",
    extra_configs = configs)
    
  3. W tym bloku kodu:

    • W configspliku zastąp <appId>wartości zastępcze , <clientSecret>i <tenantId> identyfikatorem aplikacji, kluczem tajnym klienta i identyfikatorem dzierżawy skopiowanymi podczas tworzenia jednostki usługi w wymaganiach wstępnych.

    • W identyfikatorze source URI zastąp <storage-account-name>wartości zastępcze , <container-name>i <directory-name> nazwą konta magazynu usługi Azure Data Lake Storage Gen2 oraz nazwą kontenera i katalogu określonego podczas przekazywania danych lotu do konta magazynu.

      Uwaga

      Identyfikator schematu w identyfikatorze URI abfssinformuje usługę Databricks o użyciu sterownika systemu plików obiektów blob platformy Azure z protokołem Transport Layer Security (TLS). Aby dowiedzieć się więcej na temat identyfikatora URI, zobacz Use the Azure Data Lake Storage Gen2 URI (Korzystanie z identyfikatora URI usługi Azure Data Lake Storage Gen2).

  4. Przed kontynuowaniem upewnij się, że klaster został ukończony.

  5. Naciśnij klawisze SHIFT+ENTER, aby uruchomić kod w tym bloku.

Kontener i katalog, w którym przekazano dane lotu na koncie magazynu, jest teraz dostępny w notesie za pośrednictwem punktu instalacji /mnt/flightdata.

Konwertowanie formatu CSV na format Parquet za pomocą notesu usługi Databricks

Teraz, gdy dane lotu csv są dostępne za pośrednictwem punktu instalacji systemu plików DBFS, możesz użyć ramki danych platformy Apache Spark, aby załadować je do obszaru roboczego i zapisać je z powrotem w formacie Apache parquet do magazynu obiektów usługi Azure Data Lake Storage Gen2.

  • Ramka danych platformy Spark to dwuwymiarowa struktura danych z kolumnami potencjalnie różnych typów. Za pomocą ramki danych można łatwo odczytywać i zapisywać dane w różnych obsługiwanych formatach. Za pomocą ramki danych można ładować dane z magazynu obiektów w chmurze i wykonywać na nim analizy i przekształcenia wewnątrz klastra obliczeniowego bez wpływu na dane bazowe w magazynie obiektów w chmurze. Aby dowiedzieć się więcej, zobacz Praca z ramkami danych PySpark w usłudze Azure Databricks.

  • Apache parquet to format pliku kolumnowego z optymalizacjami, które przyspieszają zapytania. Jest to bardziej wydajny format pliku niż CSV lub JSON. Aby dowiedzieć się więcej, zobacz Parquet Files.

W notesie dodaj nową komórkę i wklej do niego następujący kod.

# Use the previously established DBFS mount point to read the data.
# Create a DataFrame to read the csv data.
# The header option specifies that the first row of data should be used as the DataFrame column names
# The inferschema option specifies that the column data types should be inferred from the data in the file
flight_df = spark.read.format('csv').options(
    header='true', inferschema='true').load("/mnt/flightdata/*.csv")

# Read the airline csv file and write the output to parquet format for easy query.
flight_df.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")

Naciśnij klawisze SHIFT+ENTER, aby uruchomić kod w tym bloku.

Przed przejściem do następnej sekcji upewnij się, że wszystkie dane parquet zostały zapisane, a komunikat "Done" (Gotowe) pojawia się w danych wyjściowych.

Eksplorowanie danych

W tej sekcji użyjesz narzędzia systemu plików usługi Databricks do eksplorowania magazynu obiektów usługi Azure Data Lake Storage Gen2 przy użyciu punktu instalacji systemu plików DBFS utworzonego w poprzedniej sekcji.

W nowej komórce wklej następujący kod, aby uzyskać listę plików w punkcie instalacji. Pierwsze polecenie zwraca listę plików i katalogów. Drugie polecenie wyświetla dane wyjściowe w formacie tabelarycznym, aby ułatwić odczytywanie.

dbutils.fs.ls("/mnt/flightdata")
display(dbutils.fs.ls("/mnt/flightdata"))

Naciśnij klawisze SHIFT+ENTER, aby uruchomić kod w tym bloku.

Zwróć uwagę, że katalog parquet pojawia się na liście. Dane lotu csv zostały zapisane w formacie parquet w katalogu parquet/flights w poprzedniej sekcji. Aby wyświetlić listę plików w katalogu parquet/flights , wklej następujący kod do nowej komórki i uruchom go:

display(dbutils.fs.ls("/mnt/flightdata/parquet/flights"))

Aby utworzyć nowy plik i wyświetlić go, wklej następujący kod do nowej komórki i uruchom go:

dbutils.fs.put("/mnt/flightdata/mydirectory/mysubdirectory/1.txt", "Hello, World!", True)
display(dbutils.fs.ls("/mnt/flightdata/mydirectory/mysubdirectory"))

Ponieważ w tym samouczku nie potrzebujesz pliku 1.txt , możesz wkleić następujący kod do komórki i uruchomić go, aby rekursywnie usunąć element mydirectory. Parametr True wskazuje cykliczne usuwanie.

dbutils.fs.rm("/mnt/flightdata/mydirectory", True)

Dla wygody możesz użyć polecenia pomocy, aby dowiedzieć się więcej o innych poleceniach.

dbutils.fs.help("rm")

Korzystając z tych przykładów kodu, zapoznaliśmy się z hierarchicznym charakterem systemu plików HDFS przy użyciu danych przechowywanych na koncie magazynu z włączoną usługą Azure Data Lake Storage Gen2.

Wykonywanie zapytań na danych

Następnie możesz rozpocząć wykonywanie zapytań dotyczących danych przekazanych na swoje konto magazynu. Wprowadź każdy z poniższych bloków kodu w nowej komórce i naciśnij klawisze SHIFT + ENTER , aby uruchomić skrypt języka Python.

Ramki danych zapewniają bogaty zestaw funkcji (wybieranie kolumn, filtrowanie, sprzężenie, agregowanie), które umożliwiają efektywne rozwiązywanie typowych problemów z analizą danych.

Aby załadować ramkę danych z wcześniej zapisanych danych lotu parquet i zapoznać się z niektórymi obsługiwanymi funkcjami, wprowadź ten skrypt w nowej komórce i uruchom go.

# Read the existing parquet file for the flights database that was created earlier
flight_df = spark.read.parquet("/mnt/flightdata/parquet/flights")

# Print the schema of the dataframe
flight_df.printSchema()

# Print the flight database size
print("Number of flights in the database: ", flight_df.count())

# Show the first 25 rows (20 is the default)
# To show the first n rows, run: df.show(n)
# The second parameter indicates that column lengths shouldn't be truncated (default is 20 characters)
flight_df.show(25, False)

# You can also use the DataFrame to run simple queries. Results are returned in a DataFrame.
# Show the first 25 rows of the results of a query that returns selected colums for all flights originating from airports in Texas
flight_df.select("FlightDate", "Reporting_Airline", "Flight_Number_Reporting_Airline", "OriginCityName", "DepTime", "DestCityName", "ArrTime", "ArrDelay").filter("OriginState = 'TX'").show(258, False)

# Use display to run visualizations
# Preferably run this in a separate cmd cell
display(flight_df)

Wprowadź ten skrypt w nowej komórce, aby uruchomić kilka podstawowych zapytań analizy względem danych. Możesz uruchomić cały skrypt (SHIFT + ENTER), wyróżnić każde zapytanie i uruchomić je oddzielnie przy użyciu klawiszy CTRL + SHIFT + ENTER lub wprowadzić każde zapytanie w osobnej komórce i uruchomić je tam.

# create a temporary sql view for querying flight information
flight_data = spark.read.parquet('/mnt/flightdata/parquet/flights')
flight_data.createOrReplaceTempView('FlightTable')

# Print the total number of flights in Jan 2016 (the number of rows in the flight data).
print("Number of flights in Jan 2016: ", flight_data.count())

# Using spark sql, query the parquet file to return the total flights of each airline
num_flights_by_airline=spark.sql("SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline ORDER BY NumFlights DESC")
num_flights_by_airline.show()

# List out all the airports in Texas
airports_in_texas = spark.sql(
    "SELECT DISTINCT(OriginCityName) FROM FlightTable WHERE OriginStateName = 'Texas'")
print('Airports in Texas: ', airports_in_texas.count())
airports_in_texas.show(100, False)

# Find all airlines that fly from Texas
airlines_flying_from_texas = spark.sql(
    "SELECT DISTINCT(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', airlines_flying_from_texas.count())
airlines_flying_from_texas.show(100, False)

# List airlines by average arrival delay (negative values indicate early flights)
avg_arrival_delay=spark.sql(
    "SELECT Reporting_Airline, count(*) AS NumFlights, avg(DepDelay) AS AverageDepDelay, avg(ArrDelay) AS AverageArrDelay FROM FlightTable GROUP BY Reporting_Airline ORDER BY AverageArrDelay DESC")
print("Airlines by average arrival delay")
avg_arrival_delay.show()

# List airlines by the highest percentage of delayed flights. A delayed flight is one with a  departure or arrival delay that is greater than 15 minutes
spark.sql("DROP VIEW IF EXISTS totalFlights")
spark.sql("DROP VIEW IF EXISTS delayedFlights")
spark.sql(
    "CREATE TEMPORARY VIEW totalFlights AS SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline")
spark.sql(
    "CREATE TEMPORARY VIEW delayedFlights AS SELECT Reporting_Airline, count(*) AS NumDelayedFlights FROM FlightTable WHERE DepDelay>15 or ArrDelay>15 GROUP BY Reporting_Airline")
percent_delayed_flights=spark.sql(
    "SELECT totalFlights.Reporting_Airline, totalFlights.NumFlights, delayedFlights.NumDelayedFlights, delayedFlights.NumDelayedFlights/totalFlights.NumFlights*100 AS PercentFlightsDelayed FROM totalFlights INNER JOIN delayedFlights ON totalFlights.Reporting_Airline = delayedFlights.Reporting_Airline ORDER BY PercentFlightsDelayed DESC")
print("Airlines by percentage of flights delayed")
percent_delayed_flights.show()

Podsumowanie

W tym samouczku zostały wykonane następujące czynności:

  • Utworzono zasoby platformy Azure, w tym konto magazynu usługi Azure Data Lake Storage Gen2 i jednostkę usługi Azure AD oraz przypisano uprawnienia dostępu do konta magazynu.

  • Utworzono obszar roboczy, notes i klaster obliczeniowy usługi Azure Databricks.

  • Za pomocą narzędzia AzCopy można przekazać dane lotów bez struktury csv do konta magazynu usługi Azure Data Lake Storage Gen2.

  • Używane funkcje narzędzi systemu plików usługi Databricks do instalowania konta magazynu usługi Azure Data Lake Storage Gen2 i eksplorowania jego hierarchicznego systemu plików.

  • Użyto ramek danych platformy Apache Spark, aby przekształcić dane lotu csv w format apache parquet i zapisać je z powrotem na koncie magazynu usługi Azure Data Lake Storage Gen2.

  • Używane ramki danych do eksplorowania danych lotu i wykonywania prostego zapytania.

  • Usługa Apache Spark SQL umożliwia wykonywanie zapytań dotyczących danych lotów dotyczących łącznej liczby lotów dla każdej linii lotniczej w styczniu 2016 r., lotnisk w Teksasie, linii lotniczych, które latają z Teksasu, średniego opóźnienia przylotu w minutach dla każdej linii lotniczej na szczeblu krajowym oraz procent lotów każdej linii lotniczej, które opóźniły loty lub przyloty.

Czyszczenie zasobów

Jeśli chcesz zachować notes i wrócić do niego później, warto zamknąć (zakończyć) klaster, aby uniknąć naliczania opłat. Aby zakończyć działanie klastra, wybierz go w selektorze obliczeniowym znajdującym się w prawym górnym rogu paska narzędzi notesu, wybierz pozycję Zakończ z menu i potwierdź wybór. (Domyślnie klaster zostanie automatycznie zakończony po 120 minutach braku aktywności).

Jeśli chcesz usunąć poszczególne zasoby obszaru roboczego, takie jak notesy i klastry, możesz to zrobić na lewym pasku bocznym obszaru roboczego. Aby uzyskać szczegółowe instrukcje, zobacz Usuwanie klastra lub Usuwanie notesu.

Gdy grupa zasobów i wszystkie pokrewne zasoby nie będą już potrzebne, usuń je. Aby to zrobić w witrynie Azure Portal, wybierz grupę zasobów dla konta magazynu i obszaru roboczego, a następnie wybierz pozycję Usuń.

Następne kroki