Öğretici: Azure Data Lake Storage 2. Nesil, Azure Databricks ve Spark

Bu öğreticide, Azure Databricks kümenizi Azure Data Lake Storage 2. Nesil etkinleştirilmiş bir Azure depolama hesabında depolanan verilere nasıl bağlayacağınız gösterilmektedir. Bu bağlantı, kümenizden verilerinizdeki sorguları ve analizleri yerel olarak çalıştırmanıza olanak tanır.

Bu öğreticide şunları yapacaksınız:

  • Yapılandırılmamış verileri bir depolama hesabına alma
  • Blob depolamadaki verileriniz üzerinde analiz çalıştırma

Azure aboneliğiniz yoksa başlamadan önce ücretsiz bir hesap oluşturun.

Önkoşullar

  • Hiyerarşik ad alanına sahip bir depolama hesabı oluşturma (Azure Data Lake Storage 2. Nesil)

    Bkz. Azure Data Lake Storage 2. Nesil ile kullanmak için depolama hesabı oluşturma.

  • Kullanıcı hesabınızın kendisine atanmış Depolama Blob Verileri Katkıda Bulunanı rolüne sahip olduğundan emin olun.

  • AzCopy v10'u yükleyin. Bkz. AzCopy v10 ile veri aktarma

  • Bir hizmet sorumlusu oluşturun, bir istemci gizli dizisi oluşturun ve ardından hizmet sorumlusuna depolama hesabına erişim verin.

    Bkz. Öğretici: Azure Data Lake Storage 2. Nesil için Bağlan (1- 3. Adım). Bu adımları tamamladıktan sonra kiracı kimliği, uygulama kimliği ve istemci gizli anahtarı değerlerini bir metin dosyasına yapıştırdığınızdan emin olun. Daha sonra bu öğreticide kullanacaksınız.

Azure Databricks çalışma alanı, küme ve not defteri oluşturma

  1. Azure Databricks çalışma alanı oluşturun. Bkz. Azure Databricks çalışma alanı oluşturma.

  2. Küme oluşturma. Bkz. Küme oluşturma.

  3. Bir not defteri oluşturun. Bkz . Not defteri oluşturma. Not defterinin varsayılan dili olarak Python'ı seçin.

Not defterinizi açık tutun. Bunu aşağıdaki bölümlerde kullanırsınız.

Uçuş verilerini indirme

Bu öğreticide, BIR ETL işleminin nasıl gerçekleştirileceklerini göstermek için Ulaşım İstatistikleri Bürosu'ndan Ocak 2016'ya ait zamanında performans uçuş verileri kullanılır. Öğreticiyi tamamlamak için bu verileri indirmeniz gerekir.

  1. On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip dosyasını indirin. Bu dosya, uçuş verilerini içerir.

  2. Sıkıştırılmış dosyanın içeriğini açın ve dosya adını ve dosyanın yolunu not edin. Bu bilgilere sonraki bir adımda ihtiyacınız olacak.

Zamanında raporlama performansı verilerinde yakalanan bilgiler hakkında bilgi edinmek istiyorsanız, Ulaşım İstatistikleri Bürosu web sitesinde alan açıklamalarını görebilirsiniz.

Verileri alma

Bu bölümde,.csv uçuş verilerini Azure Data Lake Storage 2. Nesil hesabınıza yükleyecek ve depolama hesabını Databricks kümenize bağlayacaksınız. Son olarak Databricks'i kullanarak .csv uçuş verilerini okur ve Apache parquet biçiminde depolama alanına geri yazarsınız.

Uçuş verilerini depolama hesabınıza yükleme

.csv dosyanızı Azure Data Lake Storage 2. Nesil hesabınıza kopyalamak için AzCopy kullanın. komutunu kullanarak azcopy make depolama hesabınızda bir kapsayıcı oluşturursunuz. Ardından komutunu kullanarak azcopy copy az önce indirdiğiniz csv verilerini bu kapsayıcıdaki bir dizine kopyalarsınız.

Aşağıdaki adımlarda, oluşturmak istediğiniz kapsayıcının adlarını ve uçuş verilerini kapsayıcıya yüklemek istediğiniz dizini ve blobu girmeniz gerekir. Her adımda önerilen adları kullanabilir veya kapsayıcılar, dizinler ve bloblar için adlandırma kurallarını gözlemleyerek kendi adlarınızı belirtebilirsiniz.

  1. Bir komut istemi penceresi açın ve depolama hesabınıza erişmek için Azure Active Directory'de oturum açmak için aşağıdaki komutu girin.

    azcopy login
    

    Kullanıcı hesabınızın kimliğini doğrulamak için komut istemi penceresinde görüntülenen yönergeleri izleyin.

  2. Depolama hesabınızda uçuş verilerini depolamak üzere bir kapsayıcı oluşturmak için aşağıdaki komutu girin:

    azcopy make  "https://<storage-account-name>.dfs.core.windows.net/<container-name>" 
    
    • <storage-account-name> Yer tutucu değerini depolama hesabınızın adıyla değiştirin.

    • Yer tutucuyu <container-name> csv verilerini depolamak için oluşturmak istediğiniz kapsayıcının adıyla değiştirin; örneğin flight-data-container.

  3. Csv verilerini depolama hesabınıza yüklemek (kopyalamak) için aşağıdaki komutu girin.

    azcopy copy "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/<directory-name>/On_Time.csv
    
    • <csv-folder-path> Yer tutucu değerini .csv dosyasının yoluyla değiştirin.

    • <storage-account-name> Yer tutucu değerini depolama hesabınızın adıyla değiştirin.

    • Yer tutucuyu <container-name> depolama hesabınızdaki kapsayıcının adıyla değiştirin.

    • Yer tutucuyu <directory-name> , verilerinizi kapsayıcıda depolamak için bir dizinin adıyla değiştirin; örneğin, jan2016.

Depolama hesabınızı Databricks kümenize bağlama

Bu bölümde, Azure Data Lake Storage 2. Nesil bulut nesne depolama alanınızı Databricks Dosya Sistemi'ne (DBFS) bağlarsınız. Depolama hesabıyla kimlik doğrulaması için daha önce oluşturduğunuz Azure AD hizmet ilkesini kullanırsınız. Daha fazla bilgi için bkz . Azure Databricks'te bulut nesne depolamasını bağlama.

  1. Not defterinizi kümenize ekleyin.

    1. Daha önce oluşturduğunuz not defterinde, not defteri araç çubuğunun sağ üst köşesindeki Bağlan düğmesini seçin. Bu düğme işlem seçicisini açar. (Not defterinizi zaten bir kümeye bağladıysanız, bu kümenin adı düğme metninde Bağlan).

    2. Küme açılan menüsünde daha önce oluşturduğunuz kümeyi seçin.

    3. Küme seçicideki metnin başlatılıyor olarak değiştiğine dikkat edin. Devam etmeden önce kümenin başlatılmasını ve küme adının düğmede görünmesini bekleyin.

  2. Aşağıdaki kod bloğunu kopyalayıp ilk hücreye yapıştırın, ancak bu kodu henüz çalıştırmayın.

    configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<appId>",
           "fs.azure.account.oauth2.client.secret": "<clientSecret>",
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenantId>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}
    
    dbutils.fs.mount(
    source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<directory-name>",
    mount_point = "/mnt/flightdata",
    extra_configs = configs)
    
  3. Bu kod bloğunda:

    • içinde configs, <clientSecret>ve <tenantId> yer tutucu değerlerini, önkoşullarda hizmet sorumlusunu oluştururken kopyaladığınız uygulama kimliği, istemci gizli dizisi ve kiracı kimliğiyle değiştirin<appId>.

    • source URI'de , <container-name>ve <directory-name> yer tutucu değerlerini Azure Data Lake Storage 2. Nesil depolama hesabınızın adıyla ve uçuş verilerini depolama hesabına yüklerken belirttiğiniz kapsayıcı ve dizinin adıyla değiştirin<storage-account-name>.

      Dekont

      URI'deki düzen tanımlayıcısı Databricks'e abfssAktarım Katmanı Güvenliği (TLS) ile Azure Blob Dosya Sistemi sürücüsünü kullanmasını söyler. URI hakkında daha fazla bilgi edinmek için bkz. Azure Data Lake Storage 2. Nesil URI'sini kullanma.

  4. Devam etmeden önce kümenizin başlatılmasının tamamlandığından emin olun.

  5. Kodu bu blokta çalıştırmak için SHIFT + ENTER tuşlarına basın.

Depolama hesabınıza uçuş verilerini yüklediğiniz kapsayıcıya ve dizine artık not defterinizde /mnt/flightdata bağlama noktası üzerinden erişilebilir.

Databricks Not Defteri'ni kullanarak CSV'yi Parquet biçimine dönüştürme

Csv uçuş verilerine dbfs bağlama noktası üzerinden erişilebildiğine göre, apache Spark DataFrame kullanarak çalışma alanınıza yükleyebilir ve Azure Data Lake Storage 2. Nesil nesne depolama alanınıza Apache parquet biçiminde geri yazabilirsiniz.

  • Spark DataFrame, potansiyel olarak farklı türlerde sütunlar içeren iki boyutlu etiketli bir veri yapısıdır. DataFrame kullanarak desteklenen çeşitli biçimlerde verileri kolayca okuyabilir ve yazabilirsiniz. DataFrame ile bulut nesne depolamasından veri yükleyebilir ve bulut nesne depolamasında temel alınan verileri etkilemeden işlem kümenizin içinde analiz ve dönüşümler gerçekleştirebilirsiniz. Daha fazla bilgi edinmek için bkz . Azure Databricks'te PySpark DataFrames ile çalışma.

  • Apache parquet, sorguları hızlandıran iyileştirmelere sahip sütunlu bir dosya biçimidir. CSV veya JSON'dan daha verimli bir dosya biçimidir. Daha fazla bilgi edinmek için bkz . Parquet Dosyaları.

Not defterine yeni bir hücre ekleyin ve içine aşağıdaki kodu yapıştırın.

# Use the previously established DBFS mount point to read the data.
# Create a DataFrame to read the csv data.
# The header option specifies that the first row of data should be used as the DataFrame column names
# The inferschema option specifies that the column data types should be inferred from the data in the file
flight_df = spark.read.format('csv').options(
    header='true', inferschema='true').load("/mnt/flightdata/*.csv")

# Read the airline csv file and write the output to parquet format for easy query.
flight_df.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")

Kodu bu blokta çalıştırmak için SHIFT + ENTER tuşlarına basın.

Sonraki bölüme geçmeden önce, tüm parquet verilerinin yazıldığına ve çıktıda "Bitti" ifadesinin göründüğünden emin olun.

Verileri inceleme

Bu bölümde, önceki bölümde oluşturduğunuz DBFS bağlama noktasını kullanarak Azure Data Lake Storage 2. Nesil nesne depolama alanınızı keşfetmek için Databricks dosya sistemi yardımcı programını kullanacaksınız.

Bağlama noktasında dosyaların listesini almak için yeni bir hücreye aşağıdaki kodu yapıştırın. İlk komut bir dosya ve dizin listesi oluşturur. İkinci komut, daha kolay okunması için çıkışı tablo biçiminde görüntüler.

dbutils.fs.ls("/mnt/flightdata")
display(dbutils.fs.ls("/mnt/flightdata"))

Kodu bu blokta çalıştırmak için SHIFT + ENTER tuşlarına basın.

Parquet dizininin listede göründüğüne dikkat edin. .csv uçuş verilerini parquet biçiminde önceki bölümde parquet/flight dizinine kaydettiniz. Parquet/flights dizinindeki dosyaları listelemek için aşağıdaki kodu yeni bir hücreye yapıştırın ve çalıştırın:

display(dbutils.fs.ls("/mnt/flightdata/parquet/flights"))

Yeni bir dosya oluşturup listelemek için aşağıdaki kodu yeni bir hücreye yapıştırın ve çalıştırın:

dbutils.fs.put("/mnt/flightdata/mydirectory/mysubdirectory/1.txt", "Hello, World!", True)
display(dbutils.fs.ls("/mnt/flightdata/mydirectory/mysubdirectory"))

Bu öğreticide 1.txt dosyasına ihtiyacınız olmadığından, aşağıdaki kodu bir hücreye yapıştırabilir ve yinelemeli olarak mydirectory'yi silmek için çalıştırabilirsiniz. True parametresi özyinelemeli silme işlemini gösterir.

dbutils.fs.rm("/mnt/flightdata/mydirectory", True)

Kolaylık olması için yardım komutunu kullanarak diğer komutlar hakkında ayrıntılı bilgi edinebilirsiniz.

dbutils.fs.help("rm")

Bu kod örnekleriyle, Azure Data Lake Storage 2. Nesil etkin bir depolama hesabında depolanan verileri kullanarak HDFS'nin hiyerarşik doğasını keşfettiniz.

Verileri sorgulama

Bir sonraki adımda depolama hesabınıza yüklediğiniz verileri sorgulamaya başlayabilirsiniz. Aşağıdaki kod bloklarının her birini yeni bir hücreye girin ve Python betiğini çalıştırmak için SHIFT + ENTER tuşlarına basın.

DataFrame'ler, yaygın veri çözümleme sorunlarını verimli bir şekilde çözmenize olanak sağlayan zengin bir işlev kümesi (sütunları seçme, filtreleme, birleştirme, toplama) sağlar.

Daha önce kaydedilmiş parquet uçuş verilerinizden bir DataFrame yüklemek ve desteklenen işlevlerden bazılarını keşfetmek için bu betiği yeni bir hücreye girin ve çalıştırın.

# Read the existing parquet file for the flights database that was created earlier
flight_df = spark.read.parquet("/mnt/flightdata/parquet/flights")

# Print the schema of the dataframe
flight_df.printSchema()

# Print the flight database size
print("Number of flights in the database: ", flight_df.count())

# Show the first 25 rows (20 is the default)
# To show the first n rows, run: df.show(n)
# The second parameter indicates that column lengths shouldn't be truncated (default is 20 characters)
flight_df.show(25, False)

# You can also use the DataFrame to run simple queries. Results are returned in a DataFrame.
# Show the first 25 rows of the results of a query that returns selected colums for all flights originating from airports in Texas
flight_df.select("FlightDate", "Reporting_Airline", "Flight_Number_Reporting_Airline", "OriginCityName", "DepTime", "DestCityName", "ArrTime", "ArrDelay").filter("OriginState = 'TX'").show(258, False)

# Use display to run visualizations
# Preferably run this in a separate cmd cell
display(flight_df)

Verilere karşı bazı temel çözümleme sorguları çalıştırmak için bu betiği yeni bir hücreye girin. Betiğin tamamını çalıştırmayı (SHIFT + ENTER) seçebilir, her sorguyu vurgulayabilir ve CTRL + SHIFT + ENTER ile ayrı olarak çalıştırabilir veya her sorguyu ayrı bir hücreye girip orada çalıştırabilirsiniz.

# create a temporary sql view for querying flight information
flight_data = spark.read.parquet('/mnt/flightdata/parquet/flights')
flight_data.createOrReplaceTempView('FlightTable')

# Print the total number of flights in Jan 2016 (the number of rows in the flight data).
print("Number of flights in Jan 2016: ", flight_data.count())

# Using spark sql, query the parquet file to return the total flights of each airline
num_flights_by_airline=spark.sql("SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline ORDER BY NumFlights DESC")
num_flights_by_airline.show()

# List out all the airports in Texas
airports_in_texas = spark.sql(
    "SELECT DISTINCT(OriginCityName) FROM FlightTable WHERE OriginStateName = 'Texas'")
print('Airports in Texas: ', airports_in_texas.count())
airports_in_texas.show(100, False)

# Find all airlines that fly from Texas
airlines_flying_from_texas = spark.sql(
    "SELECT DISTINCT(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', airlines_flying_from_texas.count())
airlines_flying_from_texas.show(100, False)

# List airlines by average arrival delay (negative values indicate early flights)
avg_arrival_delay=spark.sql(
    "SELECT Reporting_Airline, count(*) AS NumFlights, avg(DepDelay) AS AverageDepDelay, avg(ArrDelay) AS AverageArrDelay FROM FlightTable GROUP BY Reporting_Airline ORDER BY AverageArrDelay DESC")
print("Airlines by average arrival delay")
avg_arrival_delay.show()

# List airlines by the highest percentage of delayed flights. A delayed flight is one with a  departure or arrival delay that is greater than 15 minutes
spark.sql("DROP VIEW IF EXISTS totalFlights")
spark.sql("DROP VIEW IF EXISTS delayedFlights")
spark.sql(
    "CREATE TEMPORARY VIEW totalFlights AS SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline")
spark.sql(
    "CREATE TEMPORARY VIEW delayedFlights AS SELECT Reporting_Airline, count(*) AS NumDelayedFlights FROM FlightTable WHERE DepDelay>15 or ArrDelay>15 GROUP BY Reporting_Airline")
percent_delayed_flights=spark.sql(
    "SELECT totalFlights.Reporting_Airline, totalFlights.NumFlights, delayedFlights.NumDelayedFlights, delayedFlights.NumDelayedFlights/totalFlights.NumFlights*100 AS PercentFlightsDelayed FROM totalFlights INNER JOIN delayedFlights ON totalFlights.Reporting_Airline = delayedFlights.Reporting_Airline ORDER BY PercentFlightsDelayed DESC")
print("Airlines by percentage of flights delayed")
percent_delayed_flights.show()

Özet

Bu öğreticide şunları yaptınız:

  • Azure Data Lake Storage 2. Nesil depolama hesabı ve Azure AD hizmet sorumlusu dahil olmak üzere Azure kaynakları ve depolama hesabına erişim izinleri oluşturuldu.

  • Azure Databricks çalışma alanı, not defteri ve işlem kümesi oluşturuldu.

  • Yapılandırılmamış .csv uçuş verilerini Azure Data Lake Storage 2. Nesil depolama hesabına yüklemek için AzCopy'yi kullandı.

  • Azure Data Lake Storage 2. Nesil depolama hesabınızı bağlamak ve hiyerarşik dosya sistemini keşfetmek için Databricks Dosya Sistemi yardımcı programı işlevlerini kullandınız.

  • .csv uçuş verilerinizi Apache parquet biçimine dönüştürmek ve Azure Data Lake Storage 2. Nesil depolama hesabınıza geri depolamak için Apache Spark DataFrames'i kullandınız.

  • DataFrame'leri kullanarak uçuş verilerini inceleyin ve basit bir sorgu gerçekleştirin.

  • Apache Spark SQL'i kullanarak Ocak 2016'da her havayolu için toplam uçuş sayısı, Texas'taki havalimanları, Texas'tan uçan hava yolları, ulusal olarak her havayolu için dakika cinsinden ortalama varış gecikmesi ve her havayolunun kalkış veya varış gecikmesi olan uçuşlarının yüzdesini sorgulamak için kullanılır.

Kaynakları temizleme

Not defterini korumak ve daha sonra geri dönmek istiyorsanız ücret ödememek için kümenizi kapatmanız (sonlandırmanız) iyi bir fikirdir. Kümenizi sonlandırmak için not defteri araç çubuğunun sağ üst kısmındaki işlem seçicide kümeyi seçin, menüden Sonlandır'ı seçin ve seçiminizi onaylayın. (Varsayılan olarak, küme 120 dakika etkinlik dışı kalma süresinden sonra otomatik olarak sonlandırılır.)

Not defterleri ve kümeler gibi tek tek çalışma alanı kaynaklarını silmek istiyorsanız, bunu çalışma alanının sol kenar çubuğundan yapabilirsiniz. Ayrıntılı yönergeler için bkz . Küme silme veya Not defterini silme.

Artık gerekli olmadığında kaynak grubunu ve tüm ilgili kaynakları silin. Azure portalında bunu yapmak için depolama hesabı ve çalışma alanı için kaynak grubunu seçin ve Sil'i seçin.

Sonraki adımlar