Руководство по Потоковая передача данных в Azure Databricks при помощи Центров событийTutorial: Stream data into Azure Databricks using Event Hubs

Важно!

В этом руководстве используется версия среды выполнения Azure Databricks 5.2.This tutorial works with the version of Azure Databricks runtime 5.2.

Из этого руководства вы узнаете, как подключить систему приема данных к Azure Databricks для потоковой передачи данных в кластер Apache Spark практически в реальном времени.In this tutorial, you connect a data ingestion system with Azure Databricks to stream data into an Apache Spark cluster in near real-time. Настройте систему приема данных с помощью Центров событий Azure, а затем подключите ее к Azure Databricks для обработки поступающих сообщений.You set up data ingestion system using Azure Event Hubs and then connect it to Azure Databricks to process the messages coming through. Для получения доступа к потоку данных используйте API-интерфейсы Twitter, чтобы получать твиты в Центрах событий.To access a stream of data, you use Twitter APIs to ingest tweets into Event Hubs. При наличии данных в Azure Databricks вы можете выполнять аналитические задания для дальнейшего анализа данных.Once you have the data in Azure Databricks, you can run analytical jobs to further analyze the data.

Заканчивая работу с этим руководством, вы получите потоковые твиты из Twitter, содержащие слово "Azure", и прочтете твиты в Azure Databricks.By the end of this tutorial, you would have streamed tweets from Twitter (that have the term "Azure" in them) and read the tweets in Azure Databricks.

На следующем рисунке показан поток в приложении.The following illustration shows the application flow:

Azure Databricks с Центрами событийAzure Databricks with Event Hubs

В рамках этого руководства рассматриваются следующие задачи:This tutorial covers the following tasks:

  • Создание рабочей области Azure DatabricksCreate an Azure Databricks workspace
  • Создание кластера Spark в Azure Databricks.Create a Spark cluster in Azure Databricks
  • Создание приложения Twitter для доступа к потоковым данныхCreate a Twitter app to access streaming data
  • Создание записных книжек в Azure Databricks.Create notebooks in Azure Databricks
  • Подключение библиотек к Центрам событий и API Twitter.Attach libraries for Event Hubs and Twitter API
  • Отправка твитов в Центры событий.Send tweets to Event Hubs
  • Чтение твитов из Центров событий.Read tweets from Event Hubs

Если у вас еще нет подписки Azure, создайте бесплатную учетную запись Azure, прежде чем начинать работу.If you don't have an Azure subscription, create a free account before you begin.

Примечание

Инструкции из этого руководство нельзя выполнять с бесплатной пробной версией подписки.This tutorial cannot be carried out using Azure Free Trial Subscription. Если у вас есть бесплатная учетная запись, перейдите к профилю и измените подписку на подписку с оплатой по мере использования.If you have a free account, go to your profile and change your subscription to pay-as-you-go. Дополнительные сведения см. на странице создания бесплатной учетной записи Azure.For more information, see Azure free account. Затем удалите предельную сумму расходов и запросите увеличение квоты на ЦП в своем регионе.Then, remove the spending limit, and request a quota increase for vCPUs in your region. При создании рабочей области Azure Databricks можно выбрать ценовую категорию Пробная версия ("Премиум" — 14 дней бесплатно (DBU)) для предоставления рабочей области доступа к бесплатным DBU Azure Databricks уровня "Премиум" на 14 дней.When you create your Azure Databricks workspace, you can select the Trial (Premium - 14-Days Free DBUs) pricing tier to give the workspace access to free Premium Azure Databricks DBUs for 14 days.

Предварительные требованияPrerequisites

Прежде чем приступить к изучению этого руководства, убедитесь, что выполнены следующие требования.Before you start with this tutorial, make sure to meet the following requirements:

  • Пространство имен Центров событий Azure.An Azure Event Hubs namespace.
  • В пространстве имен имеется концентратор событий.An Event Hub within the namespace.
  • Имеется строка подключения для получения доступа к пространству имен Центров событий.Connection string to access the Event Hubs namespace. Строка подключения должна иметь формат Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<key name>;SharedAccessKey=<key value>.The connection string should have a format similar to Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<key name>;SharedAccessKey=<key value>.
  • Имя политики общего доступа и ключ политики для Центров событий.Shared access policy name and policy key for Event Hubs.

Соответствие этим требованиям можно проверить, выполнив шаги из статьи Создание пространства имен Центров событий и концентратора событий с помощью портала Azure.You can meet these requirements by completing the steps in the article, Create an Azure Event Hubs namespace and event hub.

Вход на портал AzureSign in to the Azure portal

Войдите на портале Azure.Sign in to the Azure portal.

Создание рабочей области Azure DatabricksCreate an Azure Databricks workspace

В этом разделе вы создадите рабочую область Azure Databricks с помощью портала Azure.In this section, you create an Azure Databricks workspace using the Azure portal.

  1. На портале Azure выберите Создать ресурс > Данные и аналитика > Azure Databricks.In the Azure portal, select Create a resource > Data + Analytics > Azure Databricks.

    Databricks на портале AzureDatabricks on Azure portal

  2. В разделе службы Azure Databricks укажите значения для создания рабочей области Databricks.Under Azure Databricks Service, provide the values to create a Databricks workspace.

    Создание рабочей области Azure DatabricksCreate an Azure Databricks workspace

    Укажите следующие значения.Provide the following values:

    СвойствоProperty ОПИСАНИЕDescription
    Имя рабочей областиWorkspace name Укажите имя рабочей области Databricks.Provide a name for your Databricks workspace
    подпискаSubscription Выберите подписку Azure в раскрывающемся списке.From the drop-down, select your Azure subscription.
    группа ресурсовResource group Укажите, следует ли создать новую группу ресурсов или использовать имеющуюся.Specify whether you want to create a new resource group or use an existing one. Группа ресурсов — это контейнер, содержащий связанные ресурсы для решения Azure.A resource group is a container that holds related resources for an Azure solution. Дополнительные сведения см. в обзоре группы ресурсов Azure.For more information, see Azure Resource Group overview.
    Местоположение.Location Выберите регион Восточная часть США 2.Select East US 2. Другие доступные регионы см. в статье о доступности служб Azure по регионам.For other available regions, see Azure services available by region.
    Ценовая категорияPricing Tier Вы можете выбрать уровень Стандартный или Премиум.Choose between Standard or Premium. Дополнительные сведения об этих ценовых категориях см. на странице цен на Databricks.For more information on these tiers, see Databricks pricing page.

    Установите флажок Закрепить на панели мониторинга и щелкните Создать.Select Pin to dashboard and then select Create.

  3. Создание учетной записи займет несколько минут.The account creation takes a few minutes. Во время создания учетной записи на портале с правой стороны отображается плитка Submitting deployment for Azure Databricks (Идет отправка развертывания для Databricks).During account creation, the portal displays the Submitting deployment for Azure Databricks tile on the right side. Возможно, вам потребуется прокрутить панель мониторинга, чтобы увидеть эту плитку.You may need to scroll right on your dashboard to see the tile. В верхней части экрана также будет отображаться индикатор хода выполнения.There is also a progress bar displayed near the top of the screen. Следить за выполнением можно с помощью любого из этих элементов.You can watch either area for progress.

    Плитка развертывания DatabricksDatabricks deployment tile

Создание кластера Spark в DatabricksCreate a Spark cluster in Databricks

  1. На портале Azure перейдите к созданной рабочей области Databricks, а затем выберите Launch Workspace (Запуск рабочей области).In the Azure portal, go to the Databricks workspace that you created, and then select Launch Workspace.

  2. Вы будете перенаправлены на портал Azure Databricks.You are redirected to the Azure Databricks portal. На портале выберите Кластер.From the portal, select Cluster.

    Databricks в AzureDatabricks on Azure

  3. На странице создания кластера укажите значения для создания кластера.In the New cluster page, provide the values to create a cluster.

    Создание кластера Databricks Spark в AzureCreate Databricks Spark cluster on Azure

    Для всех остальных параметров, кроме следующих, примите значения по умолчанию:Accept all other default values other than the following:

    • Введите имя кластера.Enter a name for the cluster.
    • В рамках этой статьи создайте кластер со средой выполнения 5.2.For this article, create a cluster with 5.2 runtime.
    • Убедитесь, что установлен флажок Terminate after __ minutes of activity (Завершить через __ минут бездействия).Make sure you select the Terminate after __ minutes of inactivity checkbox. Укажите длительность (в минутах) для завершения работы кластера, если тот не используется.Provide a duration (in minutes) to terminate the cluster, if the cluster is not being used.

    Выберите рабочий кластер и размер узла драйвера, соответствующие своим техническим критериям и бюджету.Select cluster worker and driver node size suitable for your technical criteria and budget.

    Выберите Create cluster (Создать кластер).Select Create cluster. После запуска кластера можно вложить записные книжки в кластер и запустить задания Spark.Once the cluster is running, you can attach notebooks to the cluster and run Spark jobs.

Создание приложения TwitterCreate a Twitter application

Для получения потока твитов создайте приложение в Twitter.To receive a stream of tweets, you create an application in Twitter. Следуйте инструкциям по созданию приложения Twitter и запишите значения, необходимые для выполнения заданий из этого руководства.Follow the instructions create a Twitter application and record the values that you need to complete this tutorial.

  1. В веб-браузере перейдите на Twitter For Developers (Twitter для разработчиков) и выберите Create an app (Создать приложение).From a web browser, go to Twitter For Developers, and select Create an app. Может появиться сообщение о том, что требуется подать заявку на учетную запись разработчика Twitter.You might see a message saying that you need to apply for a Twitter developer account. После того, как ваша заявка будет одобрена, вы должны увидеть подтверждающее сообщение электронной почты.Feel free to do so, and after your application has been approved you should see a confirmation email. Чтобы получить утверждения учетной записи разработчика, может потребоваться несколько дней.It could take several days to be approved for a developer account.

    Подтверждение учетной записи разработчика в ТвиттереTwitter developer account confirmation

  2. На странице Create an application (Создание приложения) укажите сведения для нового приложения, а затем выберите Create your Twitter application (Создать приложение Twitter).In the Create an application page, provide the details for the new app, and then select Create your Twitter application.

    Подробные сведения о приложении TwitterTwitter application details

    Подробные сведения о приложении TwitterTwitter application details

  3. На странице приложения перейдите на вкладку Keys and Tokens (Ключи и маркеры) и скопируйте значения ключа API потребителя и секретного ключа API потребителя.In the application page, select the Keys and Tokens tab and copy the values for Consumer API Key and Consumer API Secret Key. Также выберите Создать в маркере доступа и секрете маркера доступа, чтобы сгенерировать маркеры доступа.Also, select Create under Access Token and Access Token Secret to generate the access tokens. Скопируйте значения маркера доступа и секрета маркера доступа.Copy the values for Access Token and Access Token Secret.

    Подробные сведения о приложении TwitterTwitter application details

Сохраните значения, полученные для приложения Twitter.Save the values that you retrieved for the Twitter application. Они понадобятся вам позже при работе с этим руководством.You need the values later in the tutorial.

Подключение библиотек к кластеру SparkAttach libraries to Spark cluster

В этом руководстве для отправки твитов в Центры событий используются API-интерфейсы Twitter.In this tutorial, you use the Twitter APIs to send tweets to Event Hubs. Для чтения и записи данных в Центрах событий Azure используется соединитель Центров событий Apache Spark.You also use the Apache Spark Event Hubs connector to read and write data into Azure Event Hubs. Чтобы использовать эти API-интерфейсы в рамках кластера, добавьте их в Azure Databricks в качестве библиотек и свяжите с кластером Spark.To use these APIs as part of your cluster, add them as libraries to Azure Databricks and associate them with your Spark cluster. Ниже показано, как добавить библиотеку.The following instructions show how to add a library.

  1. В рабочей области Azure Databricks выберите Кластеры, а затем — существующий кластер Spark.In the Azure Databricks workspace, select Clusters, and choose your existing Spark cluster. В меню кластера выберите Библиотеки и нажмите Установить новую.Within the cluster menu, choose Libraries and click Install New.

    Диалоговое окно добавления библиотекиAdd library dialog box

    Диалоговое окно добавления библиотекиAdd library dialog box

  2. На странице новой библиотеки для параметра Источник выберите Maven.In the New Library page, for Source select Maven. В поле Coordinate (Координата) щелкните Search Packages (Поиск пакетов), который требуется добавить.For Coordinate, click Search Packages for the package you want to add. Ниже указаны координаты Maven для библиотек, используемых в рамках этого руководства.Here is the Maven coordinates for the libraries used in this tutorial:

    • Соединитель Центров событий Spark — com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.10Spark Event Hubs connector - com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.10

    • API Twitter — org.twitter4j:twitter4j-core:4.0.7Twitter API - org.twitter4j:twitter4j-core:4.0.7

      Предоставление координат MavenProvide Maven coordinates

      Предоставление координат MavenProvide Maven coordinates

  3. Щелкните Установить.Select Install.

  4. В меню кластера убедитесь, что обе библиотеки установлены и подключены правильно.In the cluster menu, make sure both libraries are installed and attached properly.

    Проверка библиотекCheck libraries

  5. Повторите эти действия для пакета Twitter twitter4j-core:4.0.7.Repeat these steps for the Twitter package, twitter4j-core:4.0.7.

Создание записных книжек в DatabricksCreate notebooks in Databricks

В этом разделе в рабочей области Databricks создается две записные книжки со следующими именами.In this section, you create two notebooks in Databricks workspace with the following names:

  • SendTweetsToEventHub — записная книжка производителя, которая используется для получения твитов из приложения Twitter и для их потоковой передачи в Центры событий.SendTweetsToEventHub - A producer notebook you use to get tweets from Twitter and stream them to Event Hubs.
  • ReadTweetsFromEventHub — записная книжка потребителя, которая используется для считывания твитов из Центров событий.ReadTweetsFromEventHub - A consumer notebook you use to read the tweets from Event Hubs.
  1. В левой области выберите Рабочая область.In the left pane, select Workspace. В раскрывающемся списке Рабочая область выберите Создать > Notebook (Записная книжка).From the Workspace drop-down, select Create > Notebook.

    Создание записной книжки в DatabricksCreate notebook in Databricks

  2. В диалоговом окне Create Notebook (Создание записной книжки) введите SendTweetsToEventHub, а затем выберите Scala в качестве языка и выберите созданный ранее кластер Spark.In the Create Notebook dialog box, enter SendTweetsToEventHub, select Scala as the language, and select the Spark cluster that you created earlier.

    Создание записной книжки в DatabricksCreate notebook in Databricks

    Нажмите кнопку Создать.Select Create.

  3. Повторите эти шаги, чтобы создать записную книжку ReadTweetsFromEventHub.Repeat the steps to create the ReadTweetsFromEventHub notebook.

Отправка твитов в Центры событий.Send tweets to Event Hubs

В записной книжке SendTweetsToEventHub вставьте приведенный ниже код и замените заполнители значениями вашего пространства имен Центров событий и созданного ранее приложения Twitter.In the SendTweetsToEventHub notebook, paste the following code, and replace the placeholders with values for your Event Hubs namespace and Twitter application that you created earlier. Эта записная книжка выполняет потоковую передачу твитов с ключевым словом Azure в Центры событий в режиме реального времени.This notebook streams tweets with the keyword "Azure" into Event Hubs in real time.

Примечание

API Twitter имеет определенные ограничения запроса и квоты.Twitter API has certain request restrictions and quotas. Если вас не удовлетворяет ограничение стандартной частоты в API Twitter, вы можете создавать содержимое текста без использования API Twitter в этом примере.If you are not satisfied with standard rate limiting in Twitter API, you can generate text content without using Twitter API in this example. Для этого установите переменную dataSource testвместоtwitter и заполните список testSource предпочтительными вводными данными теста.To do that, set variable dataSource to test instead of twitter and populate the list testSource with preferred test input.

    import scala.collection.JavaConverters._
    import com.microsoft.azure.eventhubs._
    import java.util.concurrent._
    import scala.collection.immutable._
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    val namespaceName = "<EVENT HUBS NAMESPACE>"
    val eventHubName = "<EVENT HUB NAME>"
    val sasKeyName = "<POLICY NAME>"
    val sasKey = "<POLICY KEY>"
    val connStr = new ConnectionStringBuilder()
                .setNamespaceName(namespaceName)
                .setEventHubName(eventHubName)
                .setSasKeyName(sasKeyName)
                .setSasKey(sasKey)

    val pool = Executors.newScheduledThreadPool(1)
    val eventHubClient = EventHubClient.create(connStr.toString(), pool)

    def sleep(time: Long): Unit = Thread.sleep(time)

    def sendEvent(message: String, delay: Long) = {
      sleep(delay)
      val messageData = EventData.create(message.getBytes("UTF-8"))
      eventHubClient.get().send(messageData)
      System.out.println("Sent event: " + message + "\n")
    }

    // Add your own values to the list
    val testSource = List("Azure is the greatest!", "Azure isn't working :(", "Azure is okay.")

    // Specify 'test' if you prefer to not use Twitter API and loop through a list of values you define in `testSource`
    // Otherwise specify 'twitter'
    val dataSource = "test"

    if (dataSource == "twitter") {

      import twitter4j._
      import twitter4j.TwitterFactory
      import twitter4j.Twitter
      import twitter4j.conf.ConfigurationBuilder

      // Twitter configuration!
      // Replace values below with you

      val twitterConsumerKey = "<CONSUMER API KEY>"
      val twitterConsumerSecret = "<CONSUMER API SECRET>"
      val twitterOauthAccessToken = "<ACCESS TOKEN>"
      val twitterOauthTokenSecret = "<TOKEN SECRET>"

      val cb = new ConfigurationBuilder()
        cb.setDebugEnabled(true)
        .setOAuthConsumerKey(twitterConsumerKey)
        .setOAuthConsumerSecret(twitterConsumerSecret)
        .setOAuthAccessToken(twitterOauthAccessToken)
        .setOAuthAccessTokenSecret(twitterOauthTokenSecret)

      val twitterFactory = new TwitterFactory(cb.build())
      val twitter = twitterFactory.getInstance()

      // Getting tweets with keyword "Azure" and sending them to the Event Hub in realtime!
      val query = new Query(" #Azure ")
      query.setCount(100)
      query.lang("en")
      var finished = false
      while (!finished) {
        val result = twitter.search(query)
        val statuses = result.getTweets()
        var lowestStatusId = Long.MaxValue
        for (status <- statuses.asScala) {
          if(!status.isRetweet()){
            sendEvent(status.getText(), 5000)
          }
          lowestStatusId = Math.min(status.getId(), lowestStatusId)
        }
        query.setMaxId(lowestStatusId - 1)
      }

    } else if (dataSource == "test") {
      // Loop through the list of test input data
      while (true) {
        testSource.foreach {
          sendEvent(_,5000)
        }
      }

    } else {
      System.out.println("Unsupported Data Source. Set 'dataSource' to \"twitter\" or \"test\"")
    }

    // Closing connection to the Event Hub
    eventHubClient.get().close()

Чтобы запустить записную книжку, нажмите клавиши SHIFT+ВВОД.To run the notebook, press SHIFT + ENTER. Вы увидите выходные данные, как в следующем фрагменте кода.You see an output like the snippet below. Каждое событие в выходных данных — это твит, полученный в Центрах событий и содержащий слово "Azure".Each event in the output is a tweet that is ingested into the Event Hubs containing the term "Azure".

Sent event: @Microsoft and @Esri launch Geospatial AI on Azure https://t.co/VmLUCiPm6q via @geoworldmedia #geoai #azure #gis #ArtificialIntelligence

Sent event: Public preview of Java on App Service, built-in support for Tomcat and OpenJDK
https://t.co/7vs7cKtvah
#cloudcomputing #Azure

Sent event: 4 Killer #Azure Features for #Data #Performance https://t.co/kpIb7hFO2j by @RedPixie

Sent event: Migrate your databases to a fully managed service with Azure SQL Database Managed Instance | #Azure | #Cloud https://t.co/sJHXN4trDk

Sent event: Top 10 Tricks to #Save Money with #Azure Virtual Machines https://t.co/F2wshBXdoz #Cloud

...
...

Чтение твитов из Центров событий.Read tweets from Event Hubs

В записной книжке ReadTweetsFromEventHub вставьте следующий код и замените заполнители значениями созданных ранее Центров событий Azure.In the ReadTweetsFromEventHub notebook, paste the following code, and replace the placeholder with values for your Azure Event Hubs that you created earlier. Эта записная книжка считывает твиты, переданные ранее в Центры событий с помощью записной книжки SendTweetsToEventHub.This notebook reads the tweets that you earlier streamed into Event Hubs using the SendTweetsToEventHub notebook.


    import org.apache.spark.eventhubs._
    import com.microsoft.azure.eventhubs._

    // Build connection string with the above information
    val namespaceName = "<EVENT HUBS NAMESPACE>"
    val eventHubName = "<EVENT HUB NAME>"
    val sasKeyName = "<POLICY NAME>"
    val sasKey = "<POLICY KEY>"
    val connStr = new com.microsoft.azure.eventhubs.ConnectionStringBuilder()
                .setNamespaceName(namespaceName)
                .setEventHubName(eventHubName)
                .setSasKeyName(sasKeyName)
                .setSasKey(sasKey)

    val customEventhubParameters =
      EventHubsConf(connStr.toString())
      .setMaxEventsPerTrigger(5)

    val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()

    incomingStream.printSchema

    // Sending the incoming stream into the console.
    // Data comes in batches!
    incomingStream.writeStream.outputMode("append").format("console").option("truncate", false).start().awaitTermination()

Вы получите следующие выходные данные:You get the following output:

root
 |-- body: binary (nullable = true)
 |-- offset: long (nullable = true)
 |-- seqNumber: long (nullable = true)
 |-- enqueuedTime: long (nullable = true)
 |-- publisher: string (nullable = true)
 |-- partitionKey: string (nullable = true)

-------------------------------------------
Batch: 0
-------------------------------------------
+------+------+--------------+---------------+---------+------------+
|body  |offset|sequenceNumber|enqueuedTime   |publisher|partitionKey|
+------+------+--------------+---------------+---------+------------+
|[50 75 62 6C 69 63 20 70 72 65 76 69 65 77 20 6F 66 20 4A 61 76 61 20 6F 6E 20 41 70 70 20 53 65 72 76 69 63 65 2C 20 62 75 69 6C 74 2D 69 6E 20 73 75 70 70 6F 72 74 20 66 6F 72 20 54 6F 6D 63 61 74 20 61 6E 64 20 4F 70 65 6E 4A 44 4B 0A 68 74 74 70 73 3A 2F 2F 74 2E 63 6F 2F 37 76 73 37 63 4B 74 76 61 68 20 0A 23 63 6C 6F 75 64 63 6F 6D 70 75 74 69 6E 67 20 23 41 7A 75 72 65]                              |0     |0             |2018-03-09 05:49:08.86 |null     |null        |
|[4D 69 67 72 61 74 65 20 79 6F 75 72 20 64 61 74 61 62 61 73 65 73 20 74 6F 20 61 20 66 75 6C 6C 79 20 6D 61 6E 61 67 65 64 20 73 65 72 76 69 63 65 20 77 69 74 68 20 41 7A 75 72 65 20 53 51 4C 20 44 61 74 61 62 61 73 65 20 4D 61 6E 61 67 65 64 20 49 6E 73 74 61 6E 63 65 20 7C 20 23 41 7A 75 72 65 20 7C 20 23 43 6C 6F 75 64 20 68 74 74 70 73 3A 2F 2F 74 2E 63 6F 2F 73 4A 48 58 4E 34 74 72 44 6B]            |168   |1             |2018-03-09 05:49:24.752|null     |null        |
+------+------+--------------+---------------+---------+------------+

-------------------------------------------
Batch: 1
-------------------------------------------
...
...

Так как выходные данные отображаются в двоичном режиме, используйте следующий фрагмент кода, чтобы преобразовать их в строку.Because the output is in a binary mode, use the following snippet to convert it into string.

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._

    // Event Hub message format is JSON and contains "body" field
    // Body is binary, so we cast it to string to see the actual content of the message
    val messages =
      incomingStream
      .withColumn("Offset", $"offset".cast(LongType))
      .withColumn("Time (readable)", $"enqueuedTime".cast(TimestampType))
      .withColumn("Timestamp", $"enqueuedTime".cast(LongType))
      .withColumn("Body", $"body".cast(StringType))
      .select("Offset", "Time (readable)", "Timestamp", "Body")

    messages.printSchema

    messages.writeStream.outputMode("append").format("console").option("truncate", false).start().awaitTermination()

Выходные данные будут выглядеть, как следующий фрагмент кода:The output now resembles the following snippet:

root
 |-- Offset: long (nullable = true)
 |-- Time (readable): timestamp (nullable = true)
 |-- Timestamp: long (nullable = true)
 |-- Body: string (nullable = true)

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----------------+----------+-------+
|Offset|Time (readable)  |Timestamp |Body
+------+-----------------+----------+-------+
|0     |2018-03-09 05:49:08.86 |1520574548|Public preview of Java on App Service, built-in support for Tomcat and OpenJDK
https://t.co/7vs7cKtvah
#cloudcomputing #Azure          |
|168   |2018-03-09 05:49:24.752|1520574564|Migrate your databases to a fully managed service with Azure SQL Database Managed Instance | #Azure | #Cloud https://t.co/sJHXN4trDk    |
|0     |2018-03-09 05:49:02.936|1520574542|@Microsoft and @Esri launch Geospatial AI on Azure https://t.co/VmLUCiPm6q via @geoworldmedia #geoai #azure #gis #ArtificialIntelligence|
|176   |2018-03-09 05:49:20.801|1520574560|4 Killer #Azure Features for #Data #Performance https://t.co/kpIb7hFO2j by @RedPixie                                                    |
+------+-----------------+----------+-------+
-------------------------------------------
Batch: 1
-------------------------------------------
...
...

Вот и все!That's it! Вы успешно выполнили потоковую передачу данных в Центры событий Azure с помощью Azure Databricks практически в реальном времени.Using Azure Databricks, you have successfully streamed data into Azure Event Hubs in near real-time. Затем вы применили данные потоковой передачи для Apache Spark, используя соединитель Центров событий.You then consumed the stream data using the Event Hubs connector for Apache Spark. Дополнительные сведения о том, как использовать соединитель Центров событий для Spark, см. в документации соединителя.For more information on how to use the Event Hubs connector for Spark, see the connector documentation.

Очистка ресурсовClean up resources

После выполнения заданий из этого руководства вы можете завершить работу кластера.After you have finished running the tutorial, you can terminate the cluster. Для этого в рабочей области Azure Databricks на левой панели выберите Кластеры.To do so, from the Azure Databricks workspace, from the left pane, select Clusters. Для кластера, работу которого необходимо завершить, переместите указатель мыши на многоточие в столбце Actions (Действия) и выберите значок Завершить.For the cluster you want to terminate, move the cursor over the ellipsis under Actions column, and select the Terminate icon.

Завершение работы кластера DatabricksStop a Databricks cluster

Если не завершить работу кластера вручную, она завершится автоматически, если во время создания кластера вы установили флажок Terminate after __ minutes of inactivity (Завершать работу после __ мин бездействия).If you do not manually terminate the cluster it will automatically stop, provided you selected the Terminate after __ minutes of inactivity checkbox while creating the cluster. В этом случае работа кластера должна завершиться автоматически, если кластер был неактивным в течение определенного времени.In such a case, the cluster will automatically stop if it has been inactive for the specified time.

Дополнительная информацияNext steps

Из этого руководства вы узнали, как выполнить следующие задачи:In this tutorial, you learned how to:

  • Создание рабочей области Azure DatabricksCreate an Azure Databricks workspace
  • Создание кластера Spark в Azure Databricks.Create a Spark cluster in Azure Databricks
  • Создание приложения Twitter для генерирования потоковой передачи данных.Create a Twitter app to generate streaming data
  • Создание записных книжек в Azure Databricks.Create notebooks in Azure Databricks
  • Добавление библиотеки для Центров событий и программного интерфейса Twitter.Add libraries for Event Hubs and Twitter API
  • Отправка твитов в Центры событий.Send tweets to Event Hubs
  • Чтение твитов из Центров событий.Read tweets from Event Hubs

Перейдите к изучению следующего руководства, чтобы узнать о выполнении анализа тональности в переданных данных с помощью Azure Databricks и программного интерфейса API Cognitive Services.Advance to the next tutorial to learn about performing sentiment analysis on the streamed data using Azure Databricks and Cognitive Services API.