Usar o cluster do HDInsight Spark para leitura e gravação dos dados no Banco de Dados SQL do Azure

Saiba como conectar um cluster Apache Spark no Azure HDInsight com o Banco de Dados SQL do Azure. Em seguida, leia, grave e transmita dados para o banco de dados SQL. As instruções neste artigo usam um Jupyter Notebook para executar os trechos de código do Scala. No entanto, você pode criar um aplicativo autônomo em Scala ou Python e realizar as mesmas tarefas.

Pré-requisitos

  • Cluster do Azure HDInsight Spark. Siga as instruções em Criar um cluster do Apache Spark no HDInsight .

  • Banco de Dados SQL do Azure. Siga as instruções em Criar um banco de dados no Banco de Dados SQL do Azure. Certifique-se de criar um banco de dados com o esquema e dados AdventureWorksLT de exemplo. Além disso, certifique-se de criar uma regra de firewall no nível do servidor para permitir que o endereço IP do seu cliente acesse o banco de dados SQL. As instruções para adicionar a regra de firewall estão disponíveis no mesmo artigo. Após criar seu banco de dados SQL, certifique-se de manter os seguintes valores acessíveis. Esses valores serão necessários para se conectar ao banco de dados de um cluster Spark.

    • Nome de servidor.
    • nome do banco de dados.
    • Nome de usuário/senha do administrador do Banco de Dados SQL do Azure.
  • SSMS (SQL Server Management Studio). Siga as instruções em Usar SSMS para conectar e consultar dados.

Criará um Jupyter Notebook

Comece criando um Jupyter Notebook associado ao cluster do Spark. Você usa esse notebook para executar os snippets de código usados neste artigo.

  1. Do portal do Azure, abra o seu cluster.

  2. Selecione o Jupyter Notebook abaixo dos Painéis do cluster no lado direito. Se você não ver Painéis do cluster, selecione Visão geral no menu à esquerda. Se você receber uma solicitação, insira as credenciais de administrador para o cluster.

    Jupyter Notebook no Apache Spark

    Observação

    Também é possível acessar o Jupyter Notebook no cluster Spark, abrindo a seguinte URL no seu navegador. Substitua CLUSTERNAME pelo nome do cluster:

    https://CLUSTERNAME.azurehdinsight.net/jupyter

  3. No Jupyter Notebook, no canto superior direito, clique em Novo e em Spark para criar um notebook Scala. Os Jupyter Notebooks no cluster do HDInsight Spark também fornecem o kernel PySpark para aplicativos Python2 e o kernel PySpark3 para aplicativos Python3. Para este artigo, criamos um notebook Scala.

    Kernels para o Jupyter Notebook no Spark

    Para saber mais sobre os kernels, confira Usar kernels do Jupyter Notebook com clusters do Apache Spark no HDInsight.

    Observação

    Neste artigo, usamos um kernel (Scala) Spark porque atualmente os dados de streaming do Spark no Banco de Dados SQL têm suporte apenas em Scala e Java. Embora a leitura e gravação em SQL possam ser feitas usando o Python, visando manter a consistência neste artigo nós usamos o Scala para todas as três operações.

  4. Um novo notebook abre com o nome padrão, Sem título. Clique no nome do notebook e insira o nome de sua escolha.

    Fornecer um nome para o bloco de anotações

Agora, você pode iniciar a criação do seu aplicativo.

Leitura dos dados do Banco de Dados SQL do Azure

Nesta seção, você faz a leitura dos dados de uma tabela (por exemplo, SalesLT.Address) que existe no banco de dados AdventureWorks.

  1. Em um novo Jupyter Notebook, em uma célula de código, cole o seguinte trecho de código e substitua os valores de espaço reservados pelos valores para seu banco de dados.

    // Declare the values for your database
    
    val jdbcUsername = "<SQL DB ADMIN USER>"
    val jdbcPassword = "<SQL DB ADMIN PWD>"
    val jdbcHostname = "<SQL SERVER NAME HOSTING SDL DB>" //typically, this is in the form or servername.database.windows.net
    val jdbcPort = 1433
    val jdbcDatabase ="<AZURE SQL DB NAME>"
    

    Pressione SHIFT + ENTER para executar a célula de código.

  2. Use o trecho abaixo para compilar uma URL JDBC que você pode passar para as APIs do dataframe do Spark. O código cria um objeto Properties para manter os parâmetros. Cole o snippet de código a seguir em uma célula de código e pressione SHIFT+ENTER para executar.

    import java.util.Properties
    
    val jdbc_url = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=60;"
    val connectionProperties = new Properties()
    connectionProperties.put("user", s"${jdbcUsername}")
    connectionProperties.put("password", s"${jdbcPassword}")
    
  3. Use o trecho de código abaixo para criar um dataframe com os dados de uma tabela no seu banco de dados. Neste trecho, usamos uma tabela SalesLT.Address que está disponível como parte do banco de dados AdventureWorksLT. Cole o snippet de código a seguir em uma célula de código e pressione SHIFT+ENTER para executar.

    val sqlTableDF = spark.read.jdbc(jdbc_url, "SalesLT.Address", connectionProperties)
    
  4. Agora, é possível executar operações no dataframe, como para obter o esquema de dados:

    sqlTableDF.printSchema
    

    Você verá uma saída semelhante à seguinte imagem:

    schema output

  5. Também é possível executar operações como recuperar as 10 primeiras filas.

    sqlTableDF.show(10)
    
  6. Ou, recuperar colunas específicas do conjunto de dados.

    sqlTableDF.select("AddressLine1", "City").show(10)
    

Gravação de dados no Banco de Dados SQL do Azure

Nesta seção, usamos um arquivo CSV de exemplo disponível no cluster para criar uma tabela no banco de dados e preenchê-lo com dados. O arquivo CSV de exemplo (HVAC.csv) está disponível em todos os clusters HDInsight em HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv.

  1. Em um novo Jupyter Notebook, em uma célula de código, cole o seguinte trecho de código e substitua os valores de espaço reservados pelos valores para seu banco de dados.

    // Declare the values for your database
    
    val jdbcUsername = "<SQL DB ADMIN USER>"
    val jdbcPassword = "<SQL DB ADMIN PWD>"
    val jdbcHostname = "<SQL SERVER NAME HOSTING SDL DB>" //typically, this is in the form or servername.database.windows.net
    val jdbcPort = 1433
    val jdbcDatabase ="<AZURE SQL DB NAME>"
    

    Pressione SHIFT + ENTER para executar a célula de código.

  2. O trecho a seguir compila uma URL JDBC que você pode passar para as APIs do dataframe do Spark. O código cria um objeto Properties para manter os parâmetros. Cole o snippet de código a seguir em uma célula de código e pressione SHIFT+ENTER para executar.

    import java.util.Properties
    
    val jdbc_url = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=60;"
    val connectionProperties = new Properties()
    connectionProperties.put("user", s"${jdbcUsername}")
    connectionProperties.put("password", s"${jdbcPassword}")
    
  3. Use o snippet de código abaixo para extrair o esquema dos dados em HVAC.csv e usar o esquema para carregar os dados do CSV em um dataframe, readDf. Cole o snippet de código a seguir em uma célula de código e pressione SHIFT+ENTER para executar.

    val userSchema = spark.read.option("header", "true").csv("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv").schema
    val readDf = spark.read.format("csv").schema(userSchema).load("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")
    
  4. Use o dataframe readDf para criar uma tabela temporária, temphvactable. Em seguida, use a tabela temporária para criar uma tabela de hive, hvactable_hive.

    readDf.createOrReplaceTempView("temphvactable")
    spark.sql("create table hvactable_hive as select * from temphvactable")
    
  5. Por fim, use a tabela de hive para criar uma tabela no banco de dados. O trecho a seguir cria hvactable no Banco de Dados SQL do Azure.

    spark.table("hvactable_hive").write.jdbc(jdbc_url, "hvactable", connectionProperties)
    
  6. Conecte-se ao Banco de Dados SQL do Azure usando SSMS e verifique se há um dbo.hvactable.

    a. Inicie o SSMS e conecte-se ao Banco de Dados SQL do Azure, fornecendo os detalhes de conexão, como mostrado na captura de tela abaixo.

    Conectar-se ao Banco de Dados SQL usando o SSMS1

    b. No Pesquisador de Objetos, expanda o banco de dados e o nó da tabela para ver o dbo.hvactable criado.

    Conectar-se ao Banco de Dados SQL usando o SSMS2

  7. Execute uma consulta no SSMS para ver as colunas na tabela.

    SELECT * from hvactable
    

Transmitir dados no Banco de Dados SQL do Azure

Nesta seção, transmitimos dados para o hvactable que você criou na seção anterior.

  1. Como primeira etapa, verifique se não há registros no hvactable. Usando SSMS, execute a seguinte consulta na tabela.

    TRUNCATE TABLE [dbo].[hvactable]
    
  2. Crie um novo Jupyter Notebook no cluster do HDInsight Spark. Em uma célula de código, cole o seguinte snippet de código e, em seguida, pressione SHIFT + ENTER:

    import org.apache.spark.sql._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.streaming._
    import java.sql.{Connection,DriverManager,ResultSet}
    
  3. Transmitimos os dados do HVAC.csv para o hvactable. O arquivo HVAC.csv está disponível no cluster em /HdiSamples/HdiSamples/SensorSampleData/HVAC/. No snippet de código a seguir, primeiro recebemos o esquema dos dados a serem transmitidos. Em seguida, criamos um dataframe de transmissão usando esse esquema. Cole o snippet de código a seguir em uma célula de código e pressione SHIFT+ENTER para executar.

    val userSchema = spark.read.option("header", "true").csv("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv").schema
    val readStreamDf = spark.readStream.schema(userSchema).csv("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/") 
    readStreamDf.printSchema
    
  4. A saída mostra o esquema de HVAC.csv. O hvactable também tem o mesmo esquema. A saída lista as colunas na tabela.

    “tabela de esquema do hdinsight Apache Spark”

  5. Por fim, use o seguinte trecho para a leitura dos dados do HVAC.csv e transmita-o para o hvactable no seu banco de dados. Cole o trecho em uma célula de código, substitua os valores de espaço reservado pelos valores do seu banco de dados e pressione SHIFT + ENTER para executar.

    val WriteToSQLQuery  = readStreamDf.writeStream.foreach(new ForeachWriter[Row] {
        var connection:java.sql.Connection = _
        var statement:java.sql.Statement = _
    
        val jdbcUsername = "<SQL DB ADMIN USER>"
        val jdbcPassword = "<SQL DB ADMIN PWD>"
        val jdbcHostname = "<SQL SERVER NAME HOSTING SDL DB>" //typically, this is in the form or servername.database.windows.net
        val jdbcPort = 1433
        val jdbcDatabase ="<AZURE SQL DB NAME>"
        val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        val jdbc_url = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
    
        def open(partitionId: Long, version: Long):Boolean = {
        Class.forName(driver)
        connection = DriverManager.getConnection(jdbc_url, jdbcUsername, jdbcPassword)
        statement = connection.createStatement
        true
        }
    
        def process(value: Row): Unit = {
        val Date  = value(0)
        val Time = value(1)
        val TargetTemp = value(2)
        val ActualTemp = value(3)
        val System = value(4)
        val SystemAge = value(5)
        val BuildingID = value(6)  
    
        val valueStr = "'" + Date + "'," + "'" + Time + "'," + "'" + TargetTemp + "'," + "'" + ActualTemp + "'," + "'" + System + "'," + "'" + SystemAge + "'," + "'" + BuildingID + "'"
        statement.execute("INSERT INTO " + "dbo.hvactable" + " VALUES (" + valueStr + ")")
        }
    
        def close(errorOrNull: Throwable): Unit = {
        connection.close
        }
        })
    
    var streamingQuery = WriteToSQLQuery.start()
    
  6. Verifique se os dados estão sendo transmitidos para o hvactable executando a seguinte consulta no SSMS (SQL Server Management Studio). Toda vez que você executa a consulta, ele mostra o número de linhas na tabela aumentando.

    SELECT COUNT(*) FROM hvactable
    

Próximas etapas