Esercitazione: Trasmettere dati in Azure Databricks tramite Hub eventiTutorial: Stream data into Azure Databricks using Event Hubs

Importante

Questa esercitazione funziona con la versione di Azure Databricks Runtime 5.2.This tutorial works with the version of Azure Databricks runtime 5.2.

In questa esercitazione si connette un sistema di inserimento dati ad Azure Databricks per trasmettere dati a un cluster Apache Spark quasi in tempo reale.In this tutorial, you connect a data ingestion system with Azure Databricks to stream data into an Apache Spark cluster in near real-time. Il sistema di inserimento dati viene configurato con Hub eventi di Azure e quindi connesso ad Azure Databricks per l'elaborazione dei messaggi in arrivo.You set up data ingestion system using Azure Event Hubs and then connect it to Azure Databricks to process the messages coming through. Per accedere a un flusso di dati si usano API Twitter per inserire i tweet in Hub eventi.To access a stream of data, you use Twitter APIs to ingest tweets into Event Hubs. Quando i dati sono disponibili in Azure Databricks, è possibile eseguire processi di analisi per esaminarli.Once you have the data in Azure Databricks, you can run analytical jobs to further analyze the data.

Al termine di questa esercitazione saranno stati trasmessi tweet di Twitter contenenti il termine "Azure" e sarà stata eseguita la lettura dei tweet in Azure Databricks.By the end of this tutorial, you would have streamed tweets from Twitter (that have the term "Azure" in them) and read the tweets in Azure Databricks.

L'illustrazione seguente mostra il flusso dell'applicazione:The following illustration shows the application flow:

Azure Databricks con Hub eventiAzure Databricks with Event Hubs

Questa esercitazione illustra le attività seguenti:This tutorial covers the following tasks:

  • Creare un'area di lavoro di Azure DatabricksCreate an Azure Databricks workspace
  • Creare un cluster Spark in Azure DatabricksCreate a Spark cluster in Azure Databricks
  • Creare un'app Twitter per accedere a dati in streamingCreate a Twitter app to access streaming data
  • Creare notebook in Azure DatabricksCreate notebooks in Azure Databricks
  • Collegare librerie per Hub eventi e API TwitterAttach libraries for Event Hubs and Twitter API
  • Inviare tweet a Hub eventiSend tweets to Event Hubs
  • Leggere tweet da Hub eventiRead tweets from Event Hubs

Se non si ha una sottoscrizione di Azure, creare un account gratuito prima di iniziare.If you don't have an Azure subscription, create a free account before you begin.

Nota

Questa esercitazione non può essere eseguita usando una sottoscrizione di valutazione gratuita di Azure.This tutorial cannot be carried out using Azure Free Trial Subscription. Se l'utente ha un account gratuito, andare al proprio profilo e modificare la sottoscrizione a con pagamento in base al consumo.If you have a free account, go to your profile and change your subscription to pay-as-you-go. Per altre informazioni, vedere Account gratuito di Azure.For more information, see Azure free account. Quindi rimuovere il limite di spesa e richiedere un aumento della quota per le vCPU nell'area dell'utente.Then, remove the spending limit, and request a quota increase for vCPUs in your region. Quando si crea l'area di lavoro Azure Databricks, è possibile selezionare il piano tariffario Versione di valutazione (Premium - Unità Databricks gratuite per 14 giorni) per concedere l'accesso gratuito Premium per 14 giorni dell'area di lavoro alle Unità Databricks di Azure.When you create your Azure Databricks workspace, you can select the Trial (Premium - 14-Days Free DBUs) pricing tier to give the workspace access to free Premium Azure Databricks DBUs for 14 days.

PrerequisitiPrerequisites

Prima di iniziare l'esercitazione, verificare che siano soddisfatti i requisiti seguenti:Before you start with this tutorial, make sure to meet the following requirements:

  • Uno spazio dei nomi di Hub eventi in Azure.An Azure Event Hubs namespace.
  • Un hub eventi nello spazio dei nomi.An Event Hub within the namespace.
  • Stringa di connessione per l'accesso allo spazio dei nomi di Hub eventi.Connection string to access the Event Hubs namespace. La stringa di connessione deve avere un formato simile a Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<key name>;SharedAccessKey=<key value>.The connection string should have a format similar to Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<key name>;SharedAccessKey=<key value>.
  • Nome dei criteri di accesso condiviso e chiave dei criteri per Hub eventi.Shared access policy name and policy key for Event Hubs.

È possibile soddisfare questi requisiti completando la procedura descritta nell'articolo Creare uno spazio dei nomi di Hub eventi e un hub eventi.You can meet these requirements by completing the steps in the article, Create an Azure Event Hubs namespace and event hub.

Accedere al portale di AzureSign in to the Azure portal

Accedere al portale di Azure.Sign in to the Azure portal.

Creare un'area di lavoro di Azure DatabricksCreate an Azure Databricks workspace

In questa sezione viene creata un'area di lavoro di Azure Databricks usando il portale di Azure.In this section, you create an Azure Databricks workspace using the Azure portal.

  1. Nel portale di Azure selezionare Crea una risorsa > Dati e analisi > Azure Databricks.In the Azure portal, select Create a resource > Data + Analytics > Azure Databricks.

    Databricks nel portale di AzureDatabricks on Azure portal

  2. Nella pagina Servizio Azure Databricks specificare i valori per creare un'area di lavoro di Databricks.Under Azure Databricks Service, provide the values to create a Databricks workspace.

    Creare un'area di lavoro di Azure DatabricksCreate an Azure Databricks workspace

    Specificare i valori seguenti:Provide the following values:

    ProprietàProperty DescrizioneDescription
    Nome area di lavoroWorkspace name Specificare un nome per l'area di lavoro di DatabricksProvide a name for your Databricks workspace
    SottoscrizioneSubscription Selezionare la sottoscrizione di Azure nell'elenco a discesa.From the drop-down, select your Azure subscription.
    Gruppo di risorseResource group Specificare se si vuole creare un nuovo gruppo di risorse o usarne uno esistente.Specify whether you want to create a new resource group or use an existing one. Un gruppo di risorse è un contenitore con risorse correlate per una soluzione Azure.A resource group is a container that holds related resources for an Azure solution. Per altre informazioni, vedere Panoramica di Gestione risorse di Microsoft Azure.For more information, see Azure Resource Group overview.
    PosizioneLocation Selezionare Stati Uniti orientali 2.Select East US 2. Per le altre aree disponibili, vedere Prodotti disponibili in base all'area.For other available regions, see Azure services available by region.
    Piano tariffarioPricing Tier Scegliere tra Standard e Premium.Choose between Standard or Premium. Per altre informazioni su questi piani tariffari, vedere la pagina dei prezzi di Databricks.For more information on these tiers, see Databricks pricing page.

    Selezionare Aggiungi al dashboard e quindi Crea.Select Pin to dashboard and then select Create.

  3. La creazione dell'account richiede alcuni minuti,The account creation takes a few minutes. durante i quali il portale visualizza il riquadro Invio della distribuzione per Azure Databricks a destra.During account creation, the portal displays the Submitting deployment for Azure Databricks tile on the right side. Potrebbe essere necessario scorrere verso destra nel dashboard per visualizzare il riquadro.You may need to scroll right on your dashboard to see the tile. È presente anche un indicatore di stato nella parte superiore della schermata.There is also a progress bar displayed near the top of the screen. È possibile esaminare lo stato di avanzamento nelle due aree.You can watch either area for progress.

    Riquadro di distribuzione di DatabricksDatabricks deployment tile

Creare un cluster Spark in DatabricksCreate a Spark cluster in Databricks

  1. Nel portale di Azure passare all'area di lavoro di Databricks creata e quindi selezionare Avvia area di lavoro.In the Azure portal, go to the Databricks workspace that you created, and then select Launch Workspace.

  2. Si verrà reindirizzati al portale di Azure Databricks.You are redirected to the Azure Databricks portal. Nel portale selezionare Cluster.From the portal, select Cluster.

    Databricks in AzureDatabricks on Azure

  3. Nella pagina New cluster (Nuovo cluster) specificare i valori per creare un cluster.In the New cluster page, provide the values to create a cluster.

    Creare un cluster di Databricks Spark in AzureCreate Databricks Spark cluster on Azure

    Accettare tutti gli altri valori predefiniti tranne i seguenti:Accept all other default values other than the following:

    • Immettere un nome per il cluster.Enter a name for the cluster.
    • Per questo articolo creare un cluster con il runtime 5.2.For this article, create a cluster with 5.2 runtime.
    • Assicurarsi di selezionare la casella di controllo Terminate after __ minutes of inactivity (Termina dopo __ minuti di attività).Make sure you select the Terminate after __ minutes of inactivity checkbox. Specificare una durata in minuti per terminare il cluster, se questo non viene usato.Provide a duration (in minutes) to terminate the cluster, if the cluster is not being used.

    Selezionare le dimensioni del nodo di lavoro e di driver del cluster appropriate per i criteri tecnici e il budget disponibile.Select cluster worker and driver node size suitable for your technical criteria and budget.

    Selezionare Crea cluster.Select Create cluster. Quando il cluster è in esecuzione, è possibile collegare blocchi appunti al cluster ed eseguire processi Spark.Once the cluster is running, you can attach notebooks to the cluster and run Spark jobs.

Creare un'applicazione TwitterCreate a Twitter application

Per ricevere un flusso di tweet, si crea un'applicazione in Twitter.To receive a stream of tweets, you create an application in Twitter. Seguire le istruzioni per creare un'applicazione Twitter e registrare i valori necessari per completare questa esercitazione.Follow the instructions create a Twitter application and record the values that you need to complete this tutorial.

  1. In un Web browser passare a Twitter per sviluppatori e selezionare Create an app (Crea un'app).From a web browser, go to Twitter For Developers, and select Create an app. Potrebbe essere visualizzato un messaggio che indica che è necessario richiedere un account per sviluppatori Twitter.You might see a message saying that you need to apply for a Twitter developer account. Eseguire questa operazione e, dopo l'approvazione della richiesta, si dovrebbe ricevere un messaggio di posta elettronica di conferma.Feel free to do so, and after your application has been approved you should see a confirmation email. L'approvazione di un account di sviluppatore potrebbe richiedere diversi giorni.It could take several days to be approved for a developer account.

    Conferma dell'account sviluppatore TwitterTwitter developer account confirmation

  2. Nella pagina Create an application (Crea applicazione) specificare i dettagli per la nuova app e quindi selezionare Create your Twitter application (Crea applicazione Twitter).In the Create an application page, provide the details for the new app, and then select Create your Twitter application.

    Dettagli dell'applicazione TwitterTwitter application details

    Dettagli dell'applicazione TwitterTwitter application details

  3. Nella pagina dell'applicazione selezionare Keys and Tokens (Chiavi e token) e copiare i valori di Consumer API Key (Chiave API consumer) e Consumer API Secret Key (Chiave privata API consumer).In the application page, select the Keys and Tokens tab and copy the values for Consumer API Key and Consumer API Secret Key. Selezionare inoltre Create (Crea) in Access Token and Access Token Secret (Token di accesso e Segreto del token di accesso) per generare i token di accesso.Also, select Create under Access Token and Access Token Secret to generate the access tokens. Copiare i valori di Access Token (Token di accesso) e Access Token Secret (Segreto del token di accesso).Copy the values for Access Token and Access Token Secret.

    Dettagli dell'applicazione TwitterTwitter application details

Salvare i valori recuperati per l'applicazione Twitter.Save the values that you retrieved for the Twitter application. Sarà necessario usare questi valori più avanti nell'esercitazione.You need the values later in the tutorial.

Collegare librerie a un cluster SparkAttach libraries to Spark cluster

In questa esercitazione verranno usate le API Twitter per inviare tweet a Hub eventi.In this tutorial, you use the Twitter APIs to send tweets to Event Hubs. Verrà anche usato il connettore di Hub eventi per Apache Spark per leggere e scrivere dati in Hub eventi di Azure.You also use the Apache Spark Event Hubs connector to read and write data into Azure Event Hubs. Per usare queste API nell'ambito del cluster, aggiungerle come librerie ad Azure Databricks e associarle al cluster Spark.To use these APIs as part of your cluster, add them as libraries to Azure Databricks and associate them with your Spark cluster. Le istruzioni seguenti illustrano come aggiungere una libreria.The following instructions show how to add a library.

  1. Nell'area di lavoro di Azure Databricks selezionare Clusters e scegliere il cluster Spark esistente.In the Azure Databricks workspace, select Clusters, and choose your existing Spark cluster. Nel menu del cluster scegliere Libraries (Librerie), quindi fare clic su Install New (Installa nuova).Within the cluster menu, choose Libraries and click Install New.

    Finestra di dialogo Aggiungi libreriaAdd library dialog box

    Finestra di dialogo Aggiungi libreriaAdd library dialog box

  2. Nella pagina New Library (Nuova libreria) selezionare Maven in Source (Origine).In the New Library page, for Source select Maven. Per Coordinate (Coordinata) fare clic su Search Packages (Cerca pacchetti) per cercare il pacchetto da aggiungere.For Coordinate, click Search Packages for the package you want to add. Le coordinate di Maven per le librerie usate in questa esercitazione sono le seguenti:Here is the Maven coordinates for the libraries used in this tutorial:

    • Connettore di Hub eventi per Spark - com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.10Spark Event Hubs connector - com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.10

    • API Twitter - org.twitter4j:twitter4j-core:4.0.7Twitter API - org.twitter4j:twitter4j-core:4.0.7

      Specifica delle coordinate di MavenProvide Maven coordinates

      Specifica delle coordinate di MavenProvide Maven coordinates

  3. Selezionare Installa.Select Install.

  4. Nel menu dei cluster verificare che entrambe le librerie siano installate e collegate correttamente.In the cluster menu, make sure both libraries are installed and attached properly.

    Verifica delle librerieCheck libraries

  5. Ripetere questi passaggi per il pacchetto Twitter, twitter4j-core:4.0.7.Repeat these steps for the Twitter package, twitter4j-core:4.0.7.

Creare notebook in DatabricksCreate notebooks in Databricks

In questa sezione vengono creati due notebook nell'area di lavoro di Databricks con i nomi seguenti:In this section, you create two notebooks in Databricks workspace with the following names:

  • SendTweetsToEventHub: notebook producer usato per ottenere tweet da Twitter e trasmetterli a Hub eventi.SendTweetsToEventHub - A producer notebook you use to get tweets from Twitter and stream them to Event Hubs.
  • ReadTweetsFromEventHub: notebook consumer usato per leggere tweet da Hub eventi.ReadTweetsFromEventHub - A consumer notebook you use to read the tweets from Event Hubs.
  1. Nel riquadro a sinistra selezionare Workspace (Area di lavoro).In the left pane, select Workspace. Nell'elenco a discesa Workspace (Area di lavoro) selezionare Create (Crea) > Notebook.From the Workspace drop-down, select Create > Notebook.

    Creare un notebook in DatabricksCreate notebook in Databricks

  2. Nella finestra di dialogo Create Notebook (Crea notebook) immettere SendTweetsToEventHub, selezionare Scala come linguaggio e selezionare il cluster Spark creato in precedenza.In the Create Notebook dialog box, enter SendTweetsToEventHub, select Scala as the language, and select the Spark cluster that you created earlier.

    Creare un notebook in DatabricksCreate notebook in Databricks

    Selezionare Create (Crea).Select Create.

  3. Ripetere la procedura per creare il notebook ReadTweetsFromEventHub.Repeat the steps to create the ReadTweetsFromEventHub notebook.

Inviare tweet a Hub eventiSend tweets to Event Hubs

Nel notebook SendTweetsToEventHub incollare il codice seguente e sostituire i segnaposto con i valori per lo spazio dei nomi di Hub eventi e l'applicazione Twitter creata in precedenza.In the SendTweetsToEventHub notebook, paste the following code, and replace the placeholders with values for your Event Hubs namespace and Twitter application that you created earlier. Questo notebook trasmette i tweet con la parola chiave "Azure" a Hub eventi in tempo reale.This notebook streams tweets with the keyword "Azure" into Event Hubs in real time.

Nota

L'API Twitter presenta determinate restrizioni e quote per le richieste.Twitter API has certain request restrictions and quotas. Se non si è soddisfatti della limitazione della velocità standard nell'API Twitter, è possibile generare contenuto di testo senza usare l'API Twitter in questo esempio.If you are not satisfied with standard rate limiting in Twitter API, you can generate text content without using Twitter API in this example. A tale scopo, impostare la variabile dataSource su test invece che su twitter e popolare l'elenco testSource con l'input di testo preferito.To do that, set variable dataSource to test instead of twitter and populate the list testSource with preferred test input.

    import scala.collection.JavaConverters._
    import com.microsoft.azure.eventhubs._
    import java.util.concurrent._
    import scala.collection.immutable._
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    val namespaceName = "<EVENT HUBS NAMESPACE>"
    val eventHubName = "<EVENT HUB NAME>"
    val sasKeyName = "<POLICY NAME>"
    val sasKey = "<POLICY KEY>"
    val connStr = new ConnectionStringBuilder()
                .setNamespaceName(namespaceName)
                .setEventHubName(eventHubName)
                .setSasKeyName(sasKeyName)
                .setSasKey(sasKey)

    val pool = Executors.newScheduledThreadPool(1)
    val eventHubClient = EventHubClient.create(connStr.toString(), pool)

    def sleep(time: Long): Unit = Thread.sleep(time)

    def sendEvent(message: String, delay: Long) = {
      sleep(delay)
      val messageData = EventData.create(message.getBytes("UTF-8"))
      eventHubClient.get().send(messageData)
      System.out.println("Sent event: " + message + "\n")
    }

    // Add your own values to the list
    val testSource = List("Azure is the greatest!", "Azure isn't working :(", "Azure is okay.")

    // Specify 'test' if you prefer to not use Twitter API and loop through a list of values you define in `testSource`
    // Otherwise specify 'twitter'
    val dataSource = "test"

    if (dataSource == "twitter") {

      import twitter4j._
      import twitter4j.TwitterFactory
      import twitter4j.Twitter
      import twitter4j.conf.ConfigurationBuilder

      // Twitter configuration!
      // Replace values below with you

      val twitterConsumerKey = "<CONSUMER API KEY>"
      val twitterConsumerSecret = "<CONSUMER API SECRET>"
      val twitterOauthAccessToken = "<ACCESS TOKEN>"
      val twitterOauthTokenSecret = "<TOKEN SECRET>"

      val cb = new ConfigurationBuilder()
        cb.setDebugEnabled(true)
        .setOAuthConsumerKey(twitterConsumerKey)
        .setOAuthConsumerSecret(twitterConsumerSecret)
        .setOAuthAccessToken(twitterOauthAccessToken)
        .setOAuthAccessTokenSecret(twitterOauthTokenSecret)

      val twitterFactory = new TwitterFactory(cb.build())
      val twitter = twitterFactory.getInstance()

      // Getting tweets with keyword "Azure" and sending them to the Event Hub in realtime!
      val query = new Query(" #Azure ")
      query.setCount(100)
      query.lang("en")
      var finished = false
      while (!finished) {
        val result = twitter.search(query)
        val statuses = result.getTweets()
        var lowestStatusId = Long.MaxValue
        for (status <- statuses.asScala) {
          if(!status.isRetweet()){
            sendEvent(status.getText(), 5000)
          }
          lowestStatusId = Math.min(status.getId(), lowestStatusId)
        }
        query.setMaxId(lowestStatusId - 1)
      }

    } else if (dataSource == "test") {
      // Loop through the list of test input data
      while (true) {
        testSource.foreach {
          sendEvent(_,5000)
        }
      }

    } else {
      System.out.println("Unsupported Data Source. Set 'dataSource' to \"twitter\" or \"test\"")
    }

    // Closing connection to the Event Hub
    eventHubClient.get().close()

Premere MAIUSC+INVIO per eseguire il notebook.To run the notebook, press SHIFT + ENTER. Verrà visualizzato un output simile al frammento di codice seguente.You see an output like the snippet below. Ogni evento nell'output è un tweet contenente il termine "Azure" che viene inserito in Hub eventi.Each event in the output is a tweet that is ingested into the Event Hubs containing the term "Azure".

Sent event: @Microsoft and @Esri launch Geospatial AI on Azure https://t.co/VmLUCiPm6q via @geoworldmedia #geoai #azure #gis #ArtificialIntelligence

Sent event: Public preview of Java on App Service, built-in support for Tomcat and OpenJDK
https://t.co/7vs7cKtvah
#cloudcomputing #Azure

Sent event: 4 Killer #Azure Features for #Data #Performance https://t.co/kpIb7hFO2j by @RedPixie

Sent event: Migrate your databases to a fully managed service with Azure SQL Database Managed Instance | #Azure | #Cloud https://t.co/sJHXN4trDk

Sent event: Top 10 Tricks to #Save Money with #Azure Virtual Machines https://t.co/F2wshBXdoz #Cloud

...
...

Leggere tweet da Hub eventiRead tweets from Event Hubs

Nel notebook ReadTweetsFromEventHub incollare il codice seguente e sostituire il segnaposto con i valori dell'istanza di Hub eventi di Azure creata in precedenza.In the ReadTweetsFromEventHub notebook, paste the following code, and replace the placeholder with values for your Azure Event Hubs that you created earlier. Questo notebook legge i tweet trasmessi in precedenza a Hub eventi tramite il notebook SendTweetsToEventHub.This notebook reads the tweets that you earlier streamed into Event Hubs using the SendTweetsToEventHub notebook.


    import org.apache.spark.eventhubs._
    import com.microsoft.azure.eventhubs._

    // Build connection string with the above information
    val namespaceName = "<EVENT HUBS NAMESPACE>"
    val eventHubName = "<EVENT HUB NAME>"
    val sasKeyName = "<POLICY NAME>"
    val sasKey = "<POLICY KEY>"
    val connStr = new com.microsoft.azure.eventhubs.ConnectionStringBuilder()
                .setNamespaceName(namespaceName)
                .setEventHubName(eventHubName)
                .setSasKeyName(sasKeyName)
                .setSasKey(sasKey)

    val customEventhubParameters =
      EventHubsConf(connStr.toString())
      .setMaxEventsPerTrigger(5)

    val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()

    incomingStream.printSchema

    // Sending the incoming stream into the console.
    // Data comes in batches!
    incomingStream.writeStream.outputMode("append").format("console").option("truncate", false).start().awaitTermination()

Si ottiene l'output seguente:You get the following output:

root
 |-- body: binary (nullable = true)
 |-- offset: long (nullable = true)
 |-- seqNumber: long (nullable = true)
 |-- enqueuedTime: long (nullable = true)
 |-- publisher: string (nullable = true)
 |-- partitionKey: string (nullable = true)

-------------------------------------------
Batch: 0
-------------------------------------------
+------+------+--------------+---------------+---------+------------+
|body  |offset|sequenceNumber|enqueuedTime   |publisher|partitionKey|
+------+------+--------------+---------------+---------+------------+
|[50 75 62 6C 69 63 20 70 72 65 76 69 65 77 20 6F 66 20 4A 61 76 61 20 6F 6E 20 41 70 70 20 53 65 72 76 69 63 65 2C 20 62 75 69 6C 74 2D 69 6E 20 73 75 70 70 6F 72 74 20 66 6F 72 20 54 6F 6D 63 61 74 20 61 6E 64 20 4F 70 65 6E 4A 44 4B 0A 68 74 74 70 73 3A 2F 2F 74 2E 63 6F 2F 37 76 73 37 63 4B 74 76 61 68 20 0A 23 63 6C 6F 75 64 63 6F 6D 70 75 74 69 6E 67 20 23 41 7A 75 72 65]                              |0     |0             |2018-03-09 05:49:08.86 |null     |null        |
|[4D 69 67 72 61 74 65 20 79 6F 75 72 20 64 61 74 61 62 61 73 65 73 20 74 6F 20 61 20 66 75 6C 6C 79 20 6D 61 6E 61 67 65 64 20 73 65 72 76 69 63 65 20 77 69 74 68 20 41 7A 75 72 65 20 53 51 4C 20 44 61 74 61 62 61 73 65 20 4D 61 6E 61 67 65 64 20 49 6E 73 74 61 6E 63 65 20 7C 20 23 41 7A 75 72 65 20 7C 20 23 43 6C 6F 75 64 20 68 74 74 70 73 3A 2F 2F 74 2E 63 6F 2F 73 4A 48 58 4E 34 74 72 44 6B]            |168   |1             |2018-03-09 05:49:24.752|null     |null        |
+------+------+--------------+---------------+---------+------------+

-------------------------------------------
Batch: 1
-------------------------------------------
...
...

L'output è in modalità binaria, quindi usare il frammento di codice seguente per convertirlo in stringa.Because the output is in a binary mode, use the following snippet to convert it into string.

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._

    // Event Hub message format is JSON and contains "body" field
    // Body is binary, so we cast it to string to see the actual content of the message
    val messages =
      incomingStream
      .withColumn("Offset", $"offset".cast(LongType))
      .withColumn("Time (readable)", $"enqueuedTime".cast(TimestampType))
      .withColumn("Timestamp", $"enqueuedTime".cast(LongType))
      .withColumn("Body", $"body".cast(StringType))
      .select("Offset", "Time (readable)", "Timestamp", "Body")

    messages.printSchema

    messages.writeStream.outputMode("append").format("console").option("truncate", false).start().awaitTermination()

L'output sarà ora simile al seguente:The output now resembles the following snippet:

root
 |-- Offset: long (nullable = true)
 |-- Time (readable): timestamp (nullable = true)
 |-- Timestamp: long (nullable = true)
 |-- Body: string (nullable = true)

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----------------+----------+-------+
|Offset|Time (readable)  |Timestamp |Body
+------+-----------------+----------+-------+
|0     |2018-03-09 05:49:08.86 |1520574548|Public preview of Java on App Service, built-in support for Tomcat and OpenJDK
https://t.co/7vs7cKtvah
#cloudcomputing #Azure          |
|168   |2018-03-09 05:49:24.752|1520574564|Migrate your databases to a fully managed service with Azure SQL Database Managed Instance | #Azure | #Cloud https://t.co/sJHXN4trDk    |
|0     |2018-03-09 05:49:02.936|1520574542|@Microsoft and @Esri launch Geospatial AI on Azure https://t.co/VmLUCiPm6q via @geoworldmedia #geoai #azure #gis #ArtificialIntelligence|
|176   |2018-03-09 05:49:20.801|1520574560|4 Killer #Azure Features for #Data #Performance https://t.co/kpIb7hFO2j by @RedPixie                                                    |
+------+-----------------+----------+-------+
-------------------------------------------
Batch: 1
-------------------------------------------
...
...

L'operazione è terminata.That's it! Usando Azure Databricks sono stati trasmessi dati a Hub eventi di Azure quasi in tempo reale.Using Azure Databricks, you have successfully streamed data into Azure Event Hubs in near real-time. I dati trasmessi sono stati quindi usati tramite il connettore di Hub eventi per Apache Spark.You then consumed the stream data using the Event Hubs connector for Apache Spark. Per altre informazioni su come usare il connettore di Hub eventi per Spark, vedere la documentazione del connettore.For more information on how to use the Event Hubs connector for Spark, see the connector documentation.

Pulire le risorseClean up resources

Dopo aver concluso l'esecuzione per l'esercitazione è possibile terminare il cluster.After you have finished running the tutorial, you can terminate the cluster. A questo scopo, nel riquadro sinistro dell'area di lavoro di Azure Databricks fare clic su Clusters (Cluster).To do so, from the Azure Databricks workspace, from the left pane, select Clusters. Per il cluster che si vuole terminare, posizionare il cursore sui puntini di sospensione sotto la colonna Actions (Azioni) e fare clic sull'icona Terminate (Termina).For the cluster you want to terminate, move the cursor over the ellipsis under Actions column, and select the Terminate icon.

Arrestare un cluster di DatabricksStop a Databricks cluster

Se non viene terminato manualmente, il cluster si arresterà automaticamente se è stata selezionata la casella di controllo Terminate after __ minutes of inactivity (Termina dopo __ minuti di attività) durante la creazione del cluster.If you do not manually terminate the cluster it will automatically stop, provided you selected the Terminate after __ minutes of inactivity checkbox while creating the cluster. In tal caso, il cluster verrà automaticamente arrestato se è rimasto inattivo per il tempo specificato.In such a case, the cluster will automatically stop if it has been inactive for the specified time.

Passaggi successiviNext steps

Questa esercitazione ha illustrato come:In this tutorial, you learned how to:

  • Creare un'area di lavoro di Azure DatabricksCreate an Azure Databricks workspace
  • Creare un cluster Spark in Azure DatabricksCreate a Spark cluster in Azure Databricks
  • Creare un'app Twitter per generare dati in streamingCreate a Twitter app to generate streaming data
  • Creare notebook in Azure DatabricksCreate notebooks in Azure Databricks
  • Aggiungere librerie per Hub eventi e API TwitterAdd libraries for Event Hubs and Twitter API
  • Inviare tweet a Hub eventiSend tweets to Event Hubs
  • Leggere tweet da Hub eventiRead tweets from Event Hubs

Passare all'esercitazione successiva per informazioni su come eseguire l'analisi del sentiment sui dati trasmessi con Azure Databricks e l'API Servizi cognitivi.Advance to the next tutorial to learn about performing sentiment analysis on the streamed data using Azure Databricks and Cognitive Services API.