Azure SQL veritabanı 'na veri okumak ve yazmak için HDInsight Spark kümesi kullanma

Azure SQL veritabanı ile Azure HDInsight 'ta bir Apache Spark kümesini bağlamayı öğrenin. Sonra verileri SQL veritabanına okuyun, yazın ve veri akışını yapın. Bu makaledeki yönergeler, Scala kod parçacıklarını çalıştırmak için bir Jupyter Notebook kullanır. Ancak, Scala veya Python 'da tek başına bir uygulama oluşturabilir ve aynı görevleri gerçekleştirebilirsiniz.

Önkoşullar

  • Azure HDInsight Spark küme. HDInsight 'ta Apache Spark kümesi oluşturma bölümündekiyönergeleri izleyin.

  • Azure SQL Veritabanı. Azure SQL veritabanı 'nda veritabanı oluşturma bölümündekiyönergeleri izleyin. Örnek AdventureWorksLT şeması ve verilerle bir veritabanı oluşturduğunuzdan emin olun. Ayrıca, istemcinizin IP adresinin SQL veritabanına erişmesine izin vermek için sunucu düzeyinde bir güvenlik duvarı kuralı oluşturduğunuzdan emin olun. Güvenlik duvarı kuralını ekleme yönergeleri aynı makalede bulunur. SQL veritabanınızı oluşturduktan sonra, aşağıdaki değerleri yararlı tutmanız gerekir. Bir Spark kümesinden veritabanına bağlanmanız gerekir.

    • Sunucu adı.
    • Veritabanı adı.
    • Azure SQL veritabanı yönetici Kullanıcı adı/parolası.
  • SQL Server Management Studio (SSMS). Verileri bağlanmak ve sorgulamak IÇIN SSMS kullanmayönergelerini izleyin.

Jupyter Notebook oluşturma

Spark kümesiyle ilişkili bir Jupyter Notebook oluşturarak başlayın. Bu makalede kullanılan kod parçacıklarını çalıştırmak için bu not defterini kullanırsınız.

  1. Azure Portal, kümenizi açın.

  2. Sağ tarafta küme panoları altında Jupyter Notebook seçin. Küme panoları görmüyorsanız, sol menüden genel bakış ' ı seçin. İstenirse, küme için yönetici kimlik bilgilerini girin.

    Apache Spark Jupyter Notebook

    Not

    Ayrıca, tarayıcınızda aşağıdaki URL 'YI açarak Spark kümesindeki Jupyter Notebook de erişebilirsiniz. CLUSTERNAME değerini kümenizin adıyla değiştirin:

    https://CLUSTERNAME.azurehdinsight.net/jupyter

  3. Jupyter Notebook, sağ üst köşedeki Yeni' ye ve ardından Spark ' a tıklayarak bir Scala Not defteri oluşturun. HDInsight Spark kümesindeki jupi Not defterleri, Python2 uygulamaları için Pyspark çekirdeği ve Python3 uygulamaları için PySpark3 çekirdeğini de sağlar. Bu makalede bir Scala Not defteri oluşturacağız.

    Spark üzerinde Jupyter Notebook için kernels

    Çekirdekler hakkında daha fazla bilgi için bkz. HDInsight 'ta Apache Spark kümeleriyle Jupyter Notebook çekirdekler kullanma.

    Not

    Bu makalede Spark (Scala) çekirdeği kullanıyoruz çünkü Spark 'tan SQL veritabanı 'na veri akışı yalnızca Scala ve Java 'da desteklenmektedir. SQL 'e okuma ve yazma işlemi, bu makalede tutarlılık için Python kullanılarak yapılabilir, ancak üç işlem için de Scala kullanırız.

  4. Varsayılan adla yeni bir not defteri açılır. Not defteri adına tıklayın ve seçtiğiniz bir adı girin.

    Not defteri adını belirtme

Artık uygulamanızı oluşturmaya başlayabilirsiniz.

Azure SQL veritabanı 'ndan veri okuma

Bu bölümde, AdventureWorks veritabanında bulunan bir tablodan (örneğin, SalesLT. Address) verileri okuyabilirsiniz.

  1. Yeni bir Jupyter Notebook, bir kod hücresinde aşağıdaki kod parçacığını yapıştırın ve yer tutucu değerlerini veritabanınızın değerleriyle değiştirin.

    // Declare the values for your database
    
    val jdbcUsername = "<SQL DB ADMIN USER>"
    val jdbcPassword = "<SQL DB ADMIN PWD>"
    val jdbcHostname = "<SQL SERVER NAME HOSTING SDL DB>" //typically, this is in the form or servername.database.windows.net
    val jdbcPort = 1433
    val jdbcDatabase ="<AZURE SQL DB NAME>"
    

    Kod hücresini çalıştırmak için SHIFT + ENTER tuşlarına basın.

  2. Spark dataframe API 'Lerine geçirebilmeniz için bir JDBC URL 'SI oluşturmak üzere aşağıdaki kod parçacığını kullanın. Kod, Properties parametreleri tutacak bir nesne oluşturur. Parçacığı bir kod hücresine yapıştırın ve çalıştırmak için SHIFT + enter tuşlarına basın.

    import java.util.Properties
    
    val jdbc_url = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=60;"
    val connectionProperties = new Properties()
    connectionProperties.put("user", s"${jdbcUsername}")
    connectionProperties.put("password", s"${jdbcPassword}")
    
  3. Veritabanınızdaki bir tablodaki verilerle bir dataframe oluşturmak için aşağıdaki kod parçacığını kullanın. Bu kod parçacığında, SalesLT.Address AdventureWorksLT veritabanının bir parçası olarak kullanılabilen bir tablo kullanıyoruz. Parçacığı bir kod hücresine yapıştırın ve çalıştırmak için SHIFT + enter tuşlarına basın.

    val sqlTableDF = spark.read.jdbc(jdbc_url, "SalesLT.Address", connectionProperties)
    
  4. Artık dataframe üzerinde, veri şemasını alma gibi işlemler yapabilirsiniz:

    sqlTableDF.printSchema
    

    Aşağıdaki görüntüye benzer bir çıktı görürsünüz:

    şema çıkışı

  5. Ayrıca, ilk 10 satırı alma gibi işlemleri de yapabilirsiniz.

    sqlTableDF.show(10)
    
  6. Ya da veri kümesinden belirli sütunları alın.

    sqlTableDF.select("AddressLine1", "City").show(10)
    

Azure SQL veritabanı 'na veri yazma

Bu bölümde, veritabanında bir tablo oluşturmak ve verileri veriyle doldurmak için kümede bulunan örnek bir CSV dosyası kullanıyoruz. Örnek CSV dosyası (HVAC.csv), adresindeki tüm HDInsight kümelerinde kullanılabilir HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv .

  1. Yeni bir Jupyter Notebook, bir kod hücresinde aşağıdaki kod parçacığını yapıştırın ve yer tutucu değerlerini veritabanınızın değerleriyle değiştirin.

    // Declare the values for your database
    
    val jdbcUsername = "<SQL DB ADMIN USER>"
    val jdbcPassword = "<SQL DB ADMIN PWD>"
    val jdbcHostname = "<SQL SERVER NAME HOSTING SDL DB>" //typically, this is in the form or servername.database.windows.net
    val jdbcPort = 1433
    val jdbcDatabase ="<AZURE SQL DB NAME>"
    

    Kod hücresini çalıştırmak için SHIFT + ENTER tuşlarına basın.

  2. Aşağıdaki kod parçacığı Spark dataframe API 'Lerine geçirebilmeniz için bir JDBC URL 'SI oluşturur. Kod, Properties parametreleri tutacak bir nesne oluşturur. Parçacığı bir kod hücresine yapıştırın ve çalıştırmak için SHIFT + enter tuşlarına basın.

    import java.util.Properties
    
    val jdbc_url = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=60;"
    val connectionProperties = new Properties()
    connectionProperties.put("user", s"${jdbcUsername}")
    connectionProperties.put("password", s"${jdbcPassword}")
    
  3. Aşağıdaki kod parçacığını kullanarak HVAC.csv verilerin şemasını ayıklayın ve verileri bir dataframe içindeki CSV 'den yüklemek için şemayı kullanın readDf . Parçacığı bir kod hücresine yapıştırın ve çalıştırmak için SHIFT + enter tuşlarına basın.

    val userSchema = spark.read.option("header", "true").csv("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv").schema
    val readDf = spark.read.format("csv").schema(userSchema).load("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")
    
  4. readDfGeçici bir tablo oluşturmak için veri çerçevesini kullanın temphvactable . Ardından, bir Hive tablosu oluşturmak için geçici tabloyu kullanın hvactable_hive .

    readDf.createOrReplaceTempView("temphvactable")
    spark.sql("create table hvactable_hive as select * from temphvactable")
    
  5. Son olarak, veritabanınızda tablo oluşturmak için Hive tablosunu kullanın. Aşağıdaki kod parçacığı hvactable Azure SQL veritabanı 'nda oluşturulur.

    spark.table("hvactable_hive").write.jdbc(jdbc_url, "hvactable", connectionProperties)
    
  6. SSMS kullanarak Azure SQL veritabanı 'na bağlanın ve burada görmediğinizi doğrulayın dbo.hvactable .

    a. SSMS 'yi başlatın ve aşağıdaki ekran görüntüsünde gösterildiği gibi bağlantı ayrıntılarını sağlayarak Azure SQL veritabanına bağlanın.

    SSMS1 kullanarak SQL veritabanı 'na bağlanma

    b. Nesne Gezgini, dbo. hboş tablosunun oluşturulduğunu görmek için veritabanını ve tablo düğümünü genişletin.

    SSMS2 kullanarak SQL veritabanı 'na bağlanma

  7. Tablodaki sütunları görmek için SSMS 'de bir sorgu çalıştırın.

    SELECT * from hvactable
    

Azure SQL veritabanı 'na veri akışı

Bu bölümde, önceki bölümde oluşturduğunuz içine veri akışı yaptık hvactable .

  1. İlk adım olarak, içinde hiçbir kayıt bulunmadığından emin olun hvactable . SSMS 'yi kullanarak tabloda aşağıdaki sorguyu çalıştırın.

    TRUNCATE TABLE [dbo].[hvactable]
    
  2. HDInsight Spark kümesinde yeni bir Jupyter Notebook oluşturun. Bir kod hücresinde aşağıdaki kod parçacığını yapıştırın ve ardından SHIFT + enter tuşlarına basın:

    import org.apache.spark.sql._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.streaming._
    import java.sql.{Connection,DriverManager,ResultSet}
    
  3. HVAC.csv verileri içine akışımız hvactable . HVAC.csv dosya üzerinde kümede kullanılabilir /HdiSamples/HdiSamples/SensorSampleData/HVAC/ . Aşağıdaki kod parçacığında, ilk olarak akışa alınacak verilerin şemasını alırız. Daha sonra, bu şemayı kullanarak bir akış veri çerçevesi oluşturacağız. Parçacığı bir kod hücresine yapıştırın ve çalıştırmak için SHIFT + enter tuşlarına basın.

    val userSchema = spark.read.option("header", "true").csv("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv").schema
    val readStreamDf = spark.readStream.schema(userSchema).csv("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/") 
    readStreamDf.printSchema
    
  4. Çıktıda HVAC.csv şeması gösterilmektedir. hvactableAynı şemaya da sahiptir. Çıktı, tablodaki sütunları listeler.

    ' HDInsight Apache Spark şeması tablosu '

  5. Son olarak, HVAC.csv verileri okumak ve veritabanınızdaki içine aktarmak için aşağıdaki kod parçacığını kullanın hvactable . Parçacığı bir kod hücresine yapıştırın, yer tutucu değerlerini veritabanınızın değerleriyle değiştirin ve ardından çalıştırmak için SHIFT + enter tuşlarına basın.

    val WriteToSQLQuery  = readStreamDf.writeStream.foreach(new ForeachWriter[Row] {
        var connection:java.sql.Connection = _
        var statement:java.sql.Statement = _
    
        val jdbcUsername = "<SQL DB ADMIN USER>"
        val jdbcPassword = "<SQL DB ADMIN PWD>"
        val jdbcHostname = "<SQL SERVER NAME HOSTING SDL DB>" //typically, this is in the form or servername.database.windows.net
        val jdbcPort = 1433
        val jdbcDatabase ="<AZURE SQL DB NAME>"
        val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        val jdbc_url = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
    
        def open(partitionId: Long, version: Long):Boolean = {
        Class.forName(driver)
        connection = DriverManager.getConnection(jdbc_url, jdbcUsername, jdbcPassword)
        statement = connection.createStatement
        true
        }
    
        def process(value: Row): Unit = {
        val Date  = value(0)
        val Time = value(1)
        val TargetTemp = value(2)
        val ActualTemp = value(3)
        val System = value(4)
        val SystemAge = value(5)
        val BuildingID = value(6)  
    
        val valueStr = "'" + Date + "'," + "'" + Time + "'," + "'" + TargetTemp + "'," + "'" + ActualTemp + "'," + "'" + System + "'," + "'" + SystemAge + "'," + "'" + BuildingID + "'"
        statement.execute("INSERT INTO " + "dbo.hvactable" + " VALUES (" + valueStr + ")")
        }
    
        def close(errorOrNull: Throwable): Unit = {
        connection.close
        }
        })
    
    var streamingQuery = WriteToSQLQuery.start()
    
  6. hvactableSQL Server Management Studio (SSMS) ' de aşağıdaki sorguyu çalıştırarak verilerin öğesine aktarılmakta olduğunu doğrulayın. Sorguyu her çalıştırdığınızda, tablodaki satır sayısını artan şekilde gösterir.

    SELECT COUNT(*) FROM hvactable
    

Sonraki adımlar