チュートリアル:Event Hubs を使用してデータを Azure Databricks にストリーム配信するTutorial: Stream data into Azure Databricks using Event Hubs

重要

このチュートリアルでは、Azure Databricks Runtime のバージョン 5.2 を使用します。This tutorial works with the version of Azure Databricks runtime 5.2.

このチュートリアルでは、データ インジェスト システムを Azure Databricks に接続し、ほぼリアルタイムで Apache Spark クラスターにデータをストリーム配信します。In this tutorial, you connect a data ingestion system with Azure Databricks to stream data into an Apache Spark cluster in near real-time. Azure Event Hubs を使用してデータ インジェスト システムを設定し、それを Azure Databricks に接続して、届いたメッセージを処理します。You set up data ingestion system using Azure Event Hubs and then connect it to Azure Databricks to process the messages coming through. データ ストリームにアクセスするために、Twitter API を使用してツイートを Event Hubs に取り込みます。To access a stream of data, you use Twitter APIs to ingest tweets into Event Hubs. Azure Databricks にデータを用意したら、分析ジョブを実行してデータをさらに分析できます。Once you have the data in Azure Databricks, you can run analytical jobs to further analyze the data.

このチュートリアルの完了時には、内容に用語 "Azure" が含まれたツイートを Twitter からストリーム配信し、そのツイートを Azure Databricks で読み取ることができるようになります。By the end of this tutorial, you would have streamed tweets from Twitter (that have the term "Azure" in them) and read the tweets in Azure Databricks.

次の図に、アプリケーション フローを示します。The following illustration shows the application flow:

Azure Databricks と Event HubsAzure Databricks with Event Hubs

このチュートリアルに含まれるタスクは次のとおりです。This tutorial covers the following tasks:

  • Azure Databricks ワークスペースを作成するCreate an Azure Databricks workspace
  • Azure Databricks で Spark クラスターを作成するCreate a Spark cluster in Azure Databricks
  • ストリーミング データにアクセスする Twitter アプリを作成するCreate a Twitter app to access streaming data
  • Azure Databricks でノートブックを作成するCreate notebooks in Azure Databricks
  • Event Hubs と Twitter API のライブラリをアタッチするAttach libraries for Event Hubs and Twitter API
  • ツイートを Event Hubs に送信するSend tweets to Event Hubs
  • Event Hubs からツイートを読み取るRead tweets from Event Hubs

Azure サブスクリプションをお持ちでない場合は、開始する前に無料アカウントを作成してください。If you don't have an Azure subscription, create a free account before you begin.

注意

Azure 無料試用版サブスクリプションを使用してこのチュートリアルを実行することはできません。This tutorial cannot be carried out using Azure Free Trial Subscription. 無料アカウントをお持ちの場合は、お使いのプロファイルにアクセスし、サブスクリプションを [従量課金制] に変更します。If you have a free account, go to your profile and change your subscription to pay-as-you-go. 詳細については、Azure 無料アカウントに関するページをご覧ください。For more information, see Azure free account. 次に、リージョン内の vCPU について使用制限を削除しクォータの増加を依頼します。Then, remove the spending limit, and request a quota increase for vCPUs in your region. Azure Databricks ワークスペースを作成するときに、 [Trial (Premium - 14-Days Free DBUs)](試用版 (Premium - 14 日間の無料 DBU)) の価格レベルを選択し、ワークスペースから 14 日間無料の Premium Azure Databricks DBU にアクセスできるようにします。When you create your Azure Databricks workspace, you can select the Trial (Premium - 14-Days Free DBUs) pricing tier to give the workspace access to free Premium Azure Databricks DBUs for 14 days.

前提条件Prerequisites

このチュートリアルを開始する前に、次の要件を満たしてください。Before you start with this tutorial, make sure to meet the following requirements:

  • Azure Event Hubs 名前空間。An Azure Event Hubs namespace.
  • 名前空間内のイベント ハブ。An Event Hub within the namespace.
  • Event Hubs 名前空間にアクセスするための接続文字列。Connection string to access the Event Hubs namespace. 接続文字列は次のような形式になります。Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<key name>;SharedAccessKey=<key value>The connection string should have a format similar to Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<key name>;SharedAccessKey=<key value>.
  • Event Hubs の共有アクセス ポリシー名とポリシー キー。Shared access policy name and policy key for Event Hubs.

これらの要件は、Azure Event Hubs 名前空間とイベント ハブの作成に関する記事の手順を完了することで満たせます。You can meet these requirements by completing the steps in the article, Create an Azure Event Hubs namespace and event hub.

Azure portal にサインインしますSign in to the Azure portal

Azure Portal にサインインします。Sign in to the Azure portal.

Azure Databricks ワークスペースを作成するCreate an Azure Databricks workspace

このセクションでは、Azure Portal を使って Azure Databricks ワークスペースを作成します。In this section, you create an Azure Databricks workspace using the Azure portal.

  1. Azure Portal で、 [リソースの作成] > [データ + 分析] > [Azure Databricks] の順に選択します。In the Azure portal, select Create a resource > Data + Analytics > Azure Databricks.

    Azure Portal の DatabricksDatabricks on Azure portal

  2. [Azure Databricks サービス] で値を指定して、Databricks ワークスペースを作成します。Under Azure Databricks Service, provide the values to create a Databricks workspace.

    Azure Databricks ワークスペースを作成するCreate an Azure Databricks workspace

    次の値を指定します。Provide the following values:

    プロパティProperty 説明Description
    ワークスペース名Workspace name Databricks ワークスペースの名前を指定しますProvide a name for your Databricks workspace
    サブスクリプションSubscription ドロップダウンから Azure サブスクリプションを選択します。From the drop-down, select your Azure subscription.
    リソース グループResource group 新しいリソース グループを作成するか、既存のリソース グループを使用するかを指定します。Specify whether you want to create a new resource group or use an existing one. リソース グループは、Azure ソリューションの関連するリソースを保持するコンテナーです。A resource group is a container that holds related resources for an Azure solution. 詳しくは、Azure リソース グループの概要に関するページをご覧ください。For more information, see Azure Resource Group overview.
    場所Location [米国東部 2] を選択します。Select East US 2. 使用可能な他のリージョンについては、「リージョン別の利用可能な製品」をご覧ください。For other available regions, see Azure services available by region.
    価格レベルPricing Tier StandardPremium のいずれかを選択します。Choose between Standard or Premium. これらのレベルの詳細については、Databricks の価格に関するページを参照してください。For more information on these tiers, see Databricks pricing page.

    [ダッシュボードにピン留めする] チェック ボックスをオンにして、 [作成] を選択します。Select Pin to dashboard and then select Create.

  3. アカウントの作成には数分かかります。The account creation takes a few minutes. アカウント作成時に、ポータルの右側に [Submitting deployment for Azure Databricks](Azure Databricks のデプロイを送信しています) タイルが表示されます。During account creation, the portal displays the Submitting deployment for Azure Databricks tile on the right side. このタイルを表示するために、ダッシュボードを右へスクロールしなければならない場合があります。You may need to scroll right on your dashboard to see the tile. スクリーンの上部に進行状況バーも表示されます。There is also a progress bar displayed near the top of the screen. いずれかの領域で進行状況を確認できます。You can watch either area for progress.

    Databricks のデプロイのタイルDatabricks deployment tile

Databricks に Spark クラスターを作成するCreate a Spark cluster in Databricks

  1. Azure Portal で、作成した Databricks ワークスペースに移動して、 [Launch Workspace](ワークスペースの起動) を選択します。In the Azure portal, go to the Databricks workspace that you created, and then select Launch Workspace.

  2. Azure Databricks ポータルにリダイレクトされます。You are redirected to the Azure Databricks portal. ポータルで [クラスター] を選択します。From the portal, select Cluster.

    Azure の DatabricksDatabricks on Azure

  3. [New cluster](新しいクラスター) ページで、クラスターを作成するための値を指定します。In the New cluster page, provide the values to create a cluster.

    Azure で Databricks Spark クラスターを作成するCreate Databricks Spark cluster on Azure

    以下を除くすべての値は、既定値のままにします。Accept all other default values other than the following:

    • クラスターの名前を入力します。Enter a name for the cluster.
    • この記事では、5.2 ランタイムを使用してクラスターを作成します。For this article, create a cluster with 5.2 runtime.
    • [Terminate after __ minutes of inactivity] (アクティビティが __ 分ない場合は終了する) チェック ボックスをオンにします。Make sure you select the Terminate after __ minutes of inactivity checkbox. クラスターが使われていない場合にクラスターを終了するまでの時間 (分単位) を指定します。Provide a duration (in minutes) to terminate the cluster, if the cluster is not being used.

    技術的条件と予算に適したクラスター ワーカーとドライバー ノードのサイズを選択します。Select cluster worker and driver node size suitable for your technical criteria and budget.

    [クラスターの作成] を選択します。Select Create cluster. クラスターが実行されたら、ノートブックをクラスターにアタッチして、Spark ジョブを実行できます。Once the cluster is running, you can attach notebooks to the cluster and run Spark jobs.

Twitter アプリケーションを作成するCreate a Twitter application

ツイートのストリームを受け取るには、Twitter でアプリケーションを作成します。To receive a stream of tweets, you create an application in Twitter. 手順に従って Twitter アプリケーションを作成し、このチュートリアルの完了に必要な値を記録します。Follow the instructions create a Twitter application and record the values that you need to complete this tutorial.

  1. Web ブラウザーで Twitter の開発者用ページに移動して、 [Create an app](アプリの作成) を選択します。From a web browser, go to Twitter For Developers, and select Create an app. Twitter 開発者アカウントを申請する必要があるというメッセージが表示される場合があります。You might see a message saying that you need to apply for a Twitter developer account. お気軽にお申し込みください。申請が承認されると、確認メールを受け取ります。Feel free to do so, and after your application has been approved you should see a confirmation email. 開発者アカウントの承認には、数日かかることがあります。It could take several days to be approved for a developer account.

    Twitter 開発者アカウントの確認Twitter developer account confirmation

  2. [Create an application](アプリケーションの作成) ページで新しいアプリの詳細を入力し、 [Create your Twitter application](Twitter アプリケーションの作成) を選択します。In the Create an application page, provide the details for the new app, and then select Create your Twitter application.

    Twitter アプリケーションの詳細Twitter application details

    Twitter アプリケーションの詳細Twitter application details

  3. アプリケーションのページで [Keys and Tokens](キーとトークン) タブを選択し、 [Consumer API Key](コンシューマー API キー)[Consumer API Secret Key](コンシューマー API シークレット キー) の値をコピーします。In the application page, select the Keys and Tokens tab and copy the values for Consumer API Key and Consumer API Secret Key. さらに、 [Access Token and Access Token Secret](アクセス トークンとアクセス トークン シークレット) の下の [Create](作成) を選択して、アクセス トークンを生成します。Also, select Create under Access Token and Access Token Secret to generate the access tokens. [Access Token](アクセス トークン)[Access Token Secret](アクセス トークン シークレット) の値をコピーします。Copy the values for Access Token and Access Token Secret.

    Twitter アプリケーションの詳細Twitter application details

Twitter アプリケーションについて取得した値を保存します。Save the values that you retrieved for the Twitter application. 値は、このチュートリアルの後の方で必要になります。You need the values later in the tutorial.

ライブラリを Spark クラスターにアタッチするAttach libraries to Spark cluster

このチュートリアルでは、Twitter API を使用してツイートを Event Hubs に送信します。In this tutorial, you use the Twitter APIs to send tweets to Event Hubs. さらに、Apache Spark Event Hubs コネクタを使用して、Azure Event Hubs に対するデータの読み取りと書き込みを行います。You also use the Apache Spark Event Hubs connector to read and write data into Azure Event Hubs. これらの API をクラスターの一部として使用するには、それらをライブラリとして Azure Databricks に追加し、Spark クラスターに関連付けます。To use these APIs as part of your cluster, add them as libraries to Azure Databricks and associate them with your Spark cluster. 次の手順は、ライブラリを追加する方法を示しています。The following instructions show how to add a library.

  1. Azure Databricks ワークスペースで [クラスター] を選択し、既存の Spark クラスターを選択します。In the Azure Databricks workspace, select Clusters, and choose your existing Spark cluster. クラスター メニューで、 [ライブラリ] を選択し、 [新規インストール] をクリックします。Within the cluster menu, choose Libraries and click Install New.

    [ライブラリの追加] ダイアログ ボックスAdd library dialog box

    [ライブラリの追加] ダイアログ ボックスAdd library dialog box

  2. [新しいライブラリ] ページの [ソース] で、 [Maven] を選択します。In the New Library page, for Source select Maven. [座標] では、追加したいパッケージの [パッケージの検索] をクリックします。For Coordinate, click Search Packages for the package you want to add. ここでは、このチュートリアルで使用するライブラリの Maven 座標です。Here is the Maven coordinates for the libraries used in this tutorial:

    • Spark Event Hubs コネクタ - com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.10Spark Event Hubs connector - com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.10

    • Twitter API - org.twitter4j:twitter4j-core:4.0.7Twitter API - org.twitter4j:twitter4j-core:4.0.7

      Maven 座標を入力するProvide Maven coordinates

      Maven 座標を入力するProvide Maven coordinates

  3. [インストール] を選択します。Select Install.

  4. クラスター メニューで、両方のライブラリが正しくインストールされ、アタッチされていることを確認します。In the cluster menu, make sure both libraries are installed and attached properly.

    ライブラリを確認するCheck libraries

  5. Twitter パッケージ twitter4j-core:4.0.7 について、これらの手順を繰り返します。Repeat these steps for the Twitter package, twitter4j-core:4.0.7.

Databricks でノートブックを作成するCreate notebooks in Databricks

このセクションでは、次の名前を使用して、Databricks ワークスペース内に 2 つのノートブックを作成します。In this section, you create two notebooks in Databricks workspace with the following names:

  • SendTweetsToEventHub - Twitter からツイートを取得し、Event Hubs にそれらをストリーム配信するために使用されるプロデューサー ノートブック。SendTweetsToEventHub - A producer notebook you use to get tweets from Twitter and stream them to Event Hubs.
  • ReadTweetsFromEventHub - Event Hubs からツイートを読み取るために使用されるコンシューマー ノートブック。ReadTweetsFromEventHub - A consumer notebook you use to read the tweets from Event Hubs.
  1. 左側のウィンドウで、 [ワークスペース] を選択します。In the left pane, select Workspace. [ワークスペース] ドロップダウンで、 [作成] > [ノートブック] の順に選択します。From the Workspace drop-down, select Create > Notebook.

    Databricks でノートブックを作成するCreate notebook in Databricks

  2. [Create Notebook](ノートブックの作成) ダイアログ ボックスに「SendTweetsToEventHub」と入力し、言語として [Scala] を選んで、前に作成した Spark クラスターを選びます。In the Create Notebook dialog box, enter SendTweetsToEventHub, select Scala as the language, and select the Spark cluster that you created earlier.

    Databricks でノートブックを作成するCreate notebook in Databricks

    作成 を選択します。Select Create.

  3. 手順を繰り返して ReadTweetsFromEventHub ノートブックを作成します。Repeat the steps to create the ReadTweetsFromEventHub notebook.

ツイートを Event Hubs に送信するSend tweets to Event Hubs

SendTweetsToEventHub ノートブックで次のコードを貼り付けて、プレースホルダーを、先ほど作成した Event Hubs 名前空間と Twitter アプリケーションの値に置き換えます。In the SendTweetsToEventHub notebook, paste the following code, and replace the placeholders with values for your Event Hubs namespace and Twitter application that you created earlier. このノートブックによって、キーワード "Azure" が含まれたツイートがリアルタイムで Event Hubs にストリーム配信されます。This notebook streams tweets with the keyword "Azure" into Event Hubs in real time.

注意

Twitter API には、特定の要求の制限とクォータがあります。Twitter API has certain request restrictions and quotas. Twitter API の標準のレート制限では不足の場合、この例では Twitter API を使用せずにテキスト コンテンツを生成できます。If you are not satisfied with standard rate limiting in Twitter API, you can generate text content without using Twitter API in this example. そうするには、変数 dataSourcetwitter ではなく test に設定し、リスト testSource に適切なテスト入力を設定します。To do that, set variable dataSource to test instead of twitter and populate the list testSource with preferred test input.

    import scala.collection.JavaConverters._
    import com.microsoft.azure.eventhubs._
    import java.util.concurrent._
    import scala.collection.immutable._
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    val namespaceName = "<EVENT HUBS NAMESPACE>"
    val eventHubName = "<EVENT HUB NAME>"
    val sasKeyName = "<POLICY NAME>"
    val sasKey = "<POLICY KEY>"
    val connStr = new ConnectionStringBuilder()
                .setNamespaceName(namespaceName)
                .setEventHubName(eventHubName)
                .setSasKeyName(sasKeyName)
                .setSasKey(sasKey)

    val pool = Executors.newScheduledThreadPool(1)
    val eventHubClient = EventHubClient.create(connStr.toString(), pool)

    def sleep(time: Long): Unit = Thread.sleep(time)

    def sendEvent(message: String, delay: Long) = {
      sleep(delay)
      val messageData = EventData.create(message.getBytes("UTF-8"))
      eventHubClient.get().send(messageData)
      System.out.println("Sent event: " + message + "\n")
    }

    // Add your own values to the list
    val testSource = List("Azure is the greatest!", "Azure isn't working :(", "Azure is okay.")

    // Specify 'test' if you prefer to not use Twitter API and loop through a list of values you define in `testSource`
    // Otherwise specify 'twitter'
    val dataSource = "test"

    if (dataSource == "twitter") {

      import twitter4j._
      import twitter4j.TwitterFactory
      import twitter4j.Twitter
      import twitter4j.conf.ConfigurationBuilder

      // Twitter configuration!
      // Replace values below with you

      val twitterConsumerKey = "<CONSUMER API KEY>"
      val twitterConsumerSecret = "<CONSUMER API SECRET>"
      val twitterOauthAccessToken = "<ACCESS TOKEN>"
      val twitterOauthTokenSecret = "<TOKEN SECRET>"

      val cb = new ConfigurationBuilder()
        cb.setDebugEnabled(true)
        .setOAuthConsumerKey(twitterConsumerKey)
        .setOAuthConsumerSecret(twitterConsumerSecret)
        .setOAuthAccessToken(twitterOauthAccessToken)
        .setOAuthAccessTokenSecret(twitterOauthTokenSecret)

      val twitterFactory = new TwitterFactory(cb.build())
      val twitter = twitterFactory.getInstance()

      // Getting tweets with keyword "Azure" and sending them to the Event Hub in realtime!
      val query = new Query(" #Azure ")
      query.setCount(100)
      query.lang("en")
      var finished = false
      while (!finished) {
        val result = twitter.search(query)
        val statuses = result.getTweets()
        var lowestStatusId = Long.MaxValue
        for (status <- statuses.asScala) {
          if(!status.isRetweet()){
            sendEvent(status.getText(), 5000)
          }
          lowestStatusId = Math.min(status.getId(), lowestStatusId)
        }
        query.setMaxId(lowestStatusId - 1)
      }

    } else if (dataSource == "test") {
      // Loop through the list of test input data
      while (true) {
        testSource.foreach {
          sendEvent(_,5000)
        }
      }

    } else {
      System.out.println("Unsupported Data Source. Set 'dataSource' to \"twitter\" or \"test\"")
    }

    // Closing connection to the Event Hub
    eventHubClient.get().close()

ノートブックを実行するには、Shift + Enter キーを押します。To run the notebook, press SHIFT + ENTER. 以下のようなスニペットが出力されます。You see an output like the snippet below. 出力の各イベントは、用語 "Azure" が含まれているため Event Hubs に取り込まれたツイートです。Each event in the output is a tweet that is ingested into the Event Hubs containing the term "Azure".

Sent event: @Microsoft and @Esri launch Geospatial AI on Azure https://t.co/VmLUCiPm6q via @geoworldmedia #geoai #azure #gis #ArtificialIntelligence

Sent event: Public preview of Java on App Service, built-in support for Tomcat and OpenJDK
https://t.co/7vs7cKtvah
#cloudcomputing #Azure

Sent event: 4 Killer #Azure Features for #Data #Performance https://t.co/kpIb7hFO2j by @RedPixie

Sent event: Migrate your databases to a fully managed service with Azure SQL Database Managed Instance | #Azure | #Cloud https://t.co/sJHXN4trDk

Sent event: Top 10 Tricks to #Save Money with #Azure Virtual Machines https://t.co/F2wshBXdoz #Cloud

...
...

Event Hubs からツイートを読み取るRead tweets from Event Hubs

ReadTweetsFromEventHub ノートブックで次のコードを貼り付けて、プレースホルダーを、先ほど作成した Azure Event Hubs の値に置き換えます。In the ReadTweetsFromEventHub notebook, paste the following code, and replace the placeholder with values for your Azure Event Hubs that you created earlier. このノートブックによって、先ほど SendTweetsToEventHub ノートブックを使用して Event Hubs にストリーム配信したツイートが読み取られます。This notebook reads the tweets that you earlier streamed into Event Hubs using the SendTweetsToEventHub notebook.


    import org.apache.spark.eventhubs._
    import com.microsoft.azure.eventhubs._

    // Build connection string with the above information
    val namespaceName = "<EVENT HUBS NAMESPACE>"
    val eventHubName = "<EVENT HUB NAME>"
    val sasKeyName = "<POLICY NAME>"
    val sasKey = "<POLICY KEY>"
    val connStr = new com.microsoft.azure.eventhubs.ConnectionStringBuilder()
                .setNamespaceName(namespaceName)
                .setEventHubName(eventHubName)
                .setSasKeyName(sasKeyName)
                .setSasKey(sasKey)

    val customEventhubParameters =
      EventHubsConf(connStr.toString())
      .setMaxEventsPerTrigger(5)

    val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()

    incomingStream.printSchema

    // Sending the incoming stream into the console.
    // Data comes in batches!
    incomingStream.writeStream.outputMode("append").format("console").option("truncate", false).start().awaitTermination()

次の出力が得られます。You get the following output:

root
 |-- body: binary (nullable = true)
 |-- offset: long (nullable = true)
 |-- seqNumber: long (nullable = true)
 |-- enqueuedTime: long (nullable = true)
 |-- publisher: string (nullable = true)
 |-- partitionKey: string (nullable = true)

-------------------------------------------
Batch: 0
-------------------------------------------
+------+------+--------------+---------------+---------+------------+
|body  |offset|sequenceNumber|enqueuedTime   |publisher|partitionKey|
+------+------+--------------+---------------+---------+------------+
|[50 75 62 6C 69 63 20 70 72 65 76 69 65 77 20 6F 66 20 4A 61 76 61 20 6F 6E 20 41 70 70 20 53 65 72 76 69 63 65 2C 20 62 75 69 6C 74 2D 69 6E 20 73 75 70 70 6F 72 74 20 66 6F 72 20 54 6F 6D 63 61 74 20 61 6E 64 20 4F 70 65 6E 4A 44 4B 0A 68 74 74 70 73 3A 2F 2F 74 2E 63 6F 2F 37 76 73 37 63 4B 74 76 61 68 20 0A 23 63 6C 6F 75 64 63 6F 6D 70 75 74 69 6E 67 20 23 41 7A 75 72 65]                              |0     |0             |2018-03-09 05:49:08.86 |null     |null        |
|[4D 69 67 72 61 74 65 20 79 6F 75 72 20 64 61 74 61 62 61 73 65 73 20 74 6F 20 61 20 66 75 6C 6C 79 20 6D 61 6E 61 67 65 64 20 73 65 72 76 69 63 65 20 77 69 74 68 20 41 7A 75 72 65 20 53 51 4C 20 44 61 74 61 62 61 73 65 20 4D 61 6E 61 67 65 64 20 49 6E 73 74 61 6E 63 65 20 7C 20 23 41 7A 75 72 65 20 7C 20 23 43 6C 6F 75 64 20 68 74 74 70 73 3A 2F 2F 74 2E 63 6F 2F 73 4A 48 58 4E 34 74 72 44 6B]            |168   |1             |2018-03-09 05:49:24.752|null     |null        |
+------+------+--------------+---------------+---------+------------+

-------------------------------------------
Batch: 1
-------------------------------------------
...
...

出力はバイナリ モードのため、次のスニペットを使用して文字列に変換します。Because the output is in a binary mode, use the following snippet to convert it into string.

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._

    // Event Hub message format is JSON and contains "body" field
    // Body is binary, so we cast it to string to see the actual content of the message
    val messages =
      incomingStream
      .withColumn("Offset", $"offset".cast(LongType))
      .withColumn("Time (readable)", $"enqueuedTime".cast(TimestampType))
      .withColumn("Timestamp", $"enqueuedTime".cast(LongType))
      .withColumn("Body", $"body".cast(StringType))
      .select("Offset", "Time (readable)", "Timestamp", "Body")

    messages.printSchema

    messages.writeStream.outputMode("append").format("console").option("truncate", false).start().awaitTermination()

これで、次のようなスニペットが出力されます。The output now resembles the following snippet:

root
 |-- Offset: long (nullable = true)
 |-- Time (readable): timestamp (nullable = true)
 |-- Timestamp: long (nullable = true)
 |-- Body: string (nullable = true)

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----------------+----------+-------+
|Offset|Time (readable)  |Timestamp |Body
+------+-----------------+----------+-------+
|0     |2018-03-09 05:49:08.86 |1520574548|Public preview of Java on App Service, built-in support for Tomcat and OpenJDK
https://t.co/7vs7cKtvah
#cloudcomputing #Azure          |
|168   |2018-03-09 05:49:24.752|1520574564|Migrate your databases to a fully managed service with Azure SQL Database Managed Instance | #Azure | #Cloud https://t.co/sJHXN4trDk    |
|0     |2018-03-09 05:49:02.936|1520574542|@Microsoft and @Esri launch Geospatial AI on Azure https://t.co/VmLUCiPm6q via @geoworldmedia #geoai #azure #gis #ArtificialIntelligence|
|176   |2018-03-09 05:49:20.801|1520574560|4 Killer #Azure Features for #Data #Performance https://t.co/kpIb7hFO2j by @RedPixie                                                    |
+------+-----------------+----------+-------+
-------------------------------------------
Batch: 1
-------------------------------------------
...
...

これで完了です。That's it! Azure Databricks を使用して、データを Azure Event Hubs にほぼリアルタイムで正常にストリーム配信できました。Using Azure Databricks, you have successfully streamed data into Azure Event Hubs in near real-time. 次に、Apache Spark 用の Event Hubs コネクタを使用してストリーム データを読み取りました。You then consumed the stream data using the Event Hubs connector for Apache Spark. Spark 用の Event Hubs コネクタを使用する方法の詳細については、コネクタに関するドキュメントを参照してください。For more information on how to use the Event Hubs connector for Spark, see the connector documentation.

リソースのクリーンアップClean up resources

チュートリアルの実行が完了したら、クラスターを終了できます。After you have finished running the tutorial, you can terminate the cluster. そのためには、Azure Databricks ワークスペースの左側のウィンドウで、 [クラスター] を選択します。To do so, from the Azure Databricks workspace, from the left pane, select Clusters. 終了するクラスターで、 [アクション] 列の下にある省略記号をポイントし、 [終了] アイコンを選択します。For the cluster you want to terminate, move the cursor over the ellipsis under Actions column, and select the Terminate icon.

Databricks クラスターを停止するStop a Databricks cluster

クラスター作成時に [Terminate after __ minutes of inactivity] (アクティビティが __ 分ない場合は終了する) チェック ボックスをオンにしていた場合、手動で終了しなくともクラスターは自動で停止します。If you do not manually terminate the cluster it will automatically stop, provided you selected the Terminate after __ minutes of inactivity checkbox while creating the cluster. このような場合、クラスターは、一定の時間だけ非アクティブな状態が続くと自動的に停止します。In such a case, the cluster will automatically stop if it has been inactive for the specified time.

次の手順Next steps

このチュートリアルで学習した内容は次のとおりです。In this tutorial, you learned how to:

  • Azure Databricks ワークスペースを作成するCreate an Azure Databricks workspace
  • Azure Databricks で Spark クラスターを作成するCreate a Spark cluster in Azure Databricks
  • ストリーミング データを生成する Twitter アプリを作成するCreate a Twitter app to generate streaming data
  • Azure Databricks でノートブックを作成するCreate notebooks in Azure Databricks
  • Event Hubs と Twitter API のライブラリを追加するAdd libraries for Event Hubs and Twitter API
  • ツイートを Event Hubs に送信するSend tweets to Event Hubs
  • Event Hubs からツイートを読み取るRead tweets from Event Hubs

次のチュートリアルに進み、Azure Databricks と Cognitive Services API を使用して、ストリーム配信されたデータに対して感情分析を実行する方法について学習してください。Advance to the next tutorial to learn about performing sentiment analysis on the streamed data using Azure Databricks and Cognitive Services API.