チュートリアル:Azure Databricks を使用した、ストリーミング データに対する感情分析Tutorial: Sentiment analysis on streaming data using Azure Databricks

このチュートリアルでは、Azure Databricks を使用して、データ ストリームに対して、ほぼリアルタイムで感情分析を実行する方法について説明します。In this tutorial, you learn how to run sentiment analysis on a stream of data using Azure Databricks in near real time. Azure Event Hubs を使用して、データ インジェスト システムを設定します。You set up data ingestion system using Azure Event Hubs. Spark Event Hubs コネクタを使用して Event Hubs から Azure Databricks にメッセージを読み取ります。You consume the messages from Event Hubs into Azure Databricks using the Spark Event Hubs connector. そして、Cognitive Services API シリーズを使用して、ストリーム配信されたデータに対して感情分析を実行します。Finally, you use Cognitive Service APIs to run sentiment analysis on the streamed data.

このチュートリアルの完了時には、内容に用語 "Azure" が含まれたツイートを Twitter からストリーム配信し、それらのツイートに対して感情分析を実行することになります。By the end of this tutorial, you would have streamed tweets from Twitter that have the term "Azure" in them and ran sentiment analysis on the tweets.

次の図に、アプリケーション フローを示します。The following illustration shows the application flow:

Event Hubs と Cognitive Services を使用した Azure DatabricksAzure Databricks with Event Hubs and Cognitive Services

このチュートリアルに含まれるタスクは次のとおりです。This tutorial covers the following tasks:

  • Azure Databricks ワークスペースを作成するCreate an Azure Databricks workspace
  • Azure Databricks で Spark クラスターを作成するCreate a Spark cluster in Azure Databricks
  • ストリーミング データにアクセスする Twitter アプリを作成するCreate a Twitter app to access streaming data
  • Azure Databricks でノートブックを作成するCreate notebooks in Azure Databricks
  • Event Hubs と Twitter API のライブラリをアタッチするAttach libraries for Event Hubs and Twitter API
  • Cognitive Services アカウントを作成し、アクセス キーを取得するCreate a Cognitive Services account and retrieve the access key
  • ツイートを Event Hubs に送信するSend tweets to Event Hubs
  • Event Hubs からツイートを読み取るRead tweets from Event Hubs
  • ツイートに対して感情分析を実行するRun sentiment analysis on tweets

Azure サブスクリプションをお持ちでない場合は、開始する前に無料アカウントを作成してください。If you don't have an Azure subscription, create a free account before you begin.

注意

Azure 無料試用版サブスクリプションを使用してこのチュートリアルを実行することはできません。This tutorial cannot be carried out using Azure Free Trial Subscription. 無料アカウントを使用して Azure Databricks クラスターを作成するには、クラスターを作成する前に、プロファイルにアクセスし、サブスクリプションを従量課金制に変更します。To use a free account to create the Azure Databricks cluster, before creating the cluster, go to your profile and change your subscription to pay-as-you-go. 詳細については、Azure 無料アカウントに関するページをご覧ください。For more information, see Azure free account.

前提条件Prerequisites

このチュートリアルを開始する前に、次の要件を満たしてください。Before you start with this tutorial, make sure to meet the following requirements:

  • Azure Event Hubs 名前空間。An Azure Event Hubs namespace.
  • 名前空間内のイベント ハブ。An Event Hub within the namespace.
  • Event Hubs 名前空間にアクセスするための接続文字列。Connection string to access the Event Hubs namespace. 接続文字列は次のような形式になります。Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<key name>;SharedAccessKey=<key value>The connection string should have a format similar to Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<key name>;SharedAccessKey=<key value>.
  • Event Hubs の共有アクセス ポリシー名とポリシー キー。Shared access policy name and policy key for Event Hubs.

これらの要件は、Azure Event Hubs 名前空間とイベント ハブの作成に関する記事の手順を完了することで満たせます。You can meet these requirements by completing the steps in the article, Create an Azure Event Hubs namespace and event hub.

Azure portal にサインインしますSign in to the Azure portal

Azure Portal にサインインします。Sign in to the Azure portal.

Azure Databricks ワークスペースを作成するCreate an Azure Databricks workspace

このセクションでは、Azure Portal を使って Azure Databricks ワークスペースを作成します。In this section, you create an Azure Databricks workspace using the Azure portal.

  1. Azure Portal で、 [リソースの作成] > [データ + 分析] > [Azure Databricks] の順に選択します。In the Azure portal, select Create a resource > Data + Analytics > Azure Databricks.

    Azure Portal の DatabricksDatabricks on Azure portal

  2. [Azure Databricks サービス] で値を指定して、Databricks ワークスペースを作成します。Under Azure Databricks Service, provide the values to create a Databricks workspace.

    Azure Databricks ワークスペースを作成するCreate an Azure Databricks workspace

    次の値を指定します。Provide the following values:

    プロパティProperty 説明Description
    ワークスペース名Workspace name Databricks ワークスペースの名前を指定しますProvide a name for your Databricks workspace
    サブスクリプションSubscription ドロップダウンから Azure サブスクリプションを選択します。From the drop-down, select your Azure subscription.
    リソース グループResource group 新しいリソース グループを作成するか、既存のリソース グループを使用するかを指定します。Specify whether you want to create a new resource group or use an existing one. リソース グループは、Azure ソリューションの関連するリソースを保持するコンテナーです。A resource group is a container that holds related resources for an Azure solution. 詳しくは、Azure リソース グループの概要に関するページをご覧ください。For more information, see Azure Resource Group overview.
    場所Location [米国東部 2] を選択します。Select East US 2. 使用可能な他のリージョンについては、「リージョン別の利用可能な製品」をご覧ください。For other available regions, see Azure services available by region.
    価格レベルPricing Tier StandardPremium のいずれかを選択します。Choose between Standard or Premium. これらのレベルの詳細については、Databricks の価格に関するページを参照してください。For more information on these tiers, see Databricks pricing page.

    [ダッシュボードにピン留めする] チェック ボックスをオンにして、 [作成] を選択します。Select Pin to dashboard and then select Create.

  3. アカウントの作成には数分かかります。The account creation takes a few minutes. アカウント作成時に、ポータルの右側に [Submitting deployment for Azure Databricks](Azure Databricks のデプロイを送信しています) タイルが表示されます。During account creation, the portal displays the Submitting deployment for Azure Databricks tile on the right side. このタイルを表示するために、ダッシュボードを右へスクロールしなければならない場合があります。You may need to scroll right on your dashboard to see the tile. スクリーンの上部に進行状況バーも表示されます。There is also a progress bar displayed near the top of the screen. いずれかの領域で進行状況を確認できます。You can watch either area for progress.

    Databricks のデプロイのタイルDatabricks deployment tile

Databricks に Spark クラスターを作成するCreate a Spark cluster in Databricks

  1. Azure Portal で、作成した Databricks ワークスペースに移動して、 [Launch Workspace](ワークスペースの起動) を選択します。In the Azure portal, go to the Databricks workspace that you created, and then select Launch Workspace.

  2. Azure Databricks ポータルにリダイレクトされます。You are redirected to the Azure Databricks portal. ポータルで [クラスター] を選択します。From the portal, select Cluster.

    Azure の DatabricksDatabricks on Azure

  3. [New cluster](新しいクラスター) ページで、クラスターを作成するための値を指定します。In the New cluster page, provide the values to create a cluster.

    Azure で Databricks Spark クラスターを作成するCreate Databricks Spark cluster on Azure

    以下を除くすべての値は、既定値のままにします。Accept all other default values other than the following:

    • クラスターの名前を入力します。Enter a name for the cluster.
    • この記事では、5.2 ランタイムを使用してクラスターを作成します。For this article, create a cluster with 5.2 runtime.
    • [Terminate after __ minutes of inactivity] (アクティビティが __ 分ない場合は終了する) チェック ボックスをオンにします。Make sure you select the Terminate after __ minutes of inactivity checkbox. クラスターが使われていない場合にクラスターを終了するまでの時間 (分単位) を指定します。Provide a duration (in minutes) to terminate the cluster, if the cluster is not being used.

    技術的条件と予算に適したクラスター ワーカーとドライバー ノードのサイズを選択します。Select cluster worker and driver node size suitable for your technical criteria and budget.

    [クラスターの作成] を選択します。Select Create cluster. クラスターが実行されたら、ノートブックをクラスターにアタッチして、Spark ジョブを実行できます。Once the cluster is running, you can attach notebooks to the cluster and run Spark jobs.

Twitter アプリケーションを作成するCreate a Twitter application

ツイートのストリームを受け取るには、Twitter でアプリケーションを作成します。To receive a stream of tweets, you create an application in Twitter. 手順に従って Twitter アプリケーションを作成し、このチュートリアルの完了に必要な値を記録します。Follow the instructions create a Twitter application and record the values that you need to complete this tutorial.

  1. Web ブラウザーで Twitter の開発者用ページに移動して、 [Create an app](アプリの作成) を選択します。From a web browser, go to Twitter For Developers, and select Create an app. Twitter 開発者アカウントを申請する必要があるというメッセージが表示される場合があります。You might see a message saying that you need to apply for a Twitter developer account. お気軽にお申し込みください。申請が承認されると、確認メールを受け取ります。Feel free to do so, and after your application has been approved you should see a confirmation email. 開発者アカウントの承認には、数日かかることがあります。It could take several days to be approved for a developer account.

    Twitter 開発者アカウントの確認Twitter developer account confirmation

  2. [Create an application](アプリケーションの作成) ページで新しいアプリの詳細を入力し、 [Create your Twitter application](Twitter アプリケーションの作成) を選択します。In the Create an application page, provide the details for the new app, and then select Create your Twitter application.

    Twitter アプリケーションの詳細Twitter application details

    Twitter アプリケーションの詳細Twitter application details

  3. アプリケーションのページで [Keys and Tokens](キーとトークン) タブを選択し、 [Consumer API Key](コンシューマー API キー)[Consumer API Secret Key](コンシューマー API シークレット キー) の値をコピーします。In the application page, select the Keys and Tokens tab and copy the values for Consumer API Key and Consumer API Secret Key. さらに、 [Access Token and Access Token Secret](アクセス トークンとアクセス トークン シークレット) の下の [Create](作成) を選択して、アクセス トークンを生成します。Also, select Create under Access Token and Access Token Secret to generate the access tokens. [Access Token](アクセス トークン)[Access Token Secret](アクセス トークン シークレット) の値をコピーします。Copy the values for Access Token and Access Token Secret.

    Twitter アプリケーションの詳細Twitter application details

Twitter アプリケーションについて取得した値を保存します。Save the values that you retrieved for the Twitter application. 値は、このチュートリアルの後の方で必要になります。You need the values later in the tutorial.

ライブラリを Spark クラスターにアタッチするAttach libraries to Spark cluster

このチュートリアルでは、Twitter API を使用してツイートを Event Hubs に送信します。In this tutorial, you use the Twitter APIs to send tweets to Event Hubs. さらに、Apache Spark Event Hubs コネクタを使用して、Azure Event Hubs に対するデータの読み取りと書き込みを行います。You also use the Apache Spark Event Hubs connector to read and write data into Azure Event Hubs. これらの API をクラスターの一部として使用するには、それらをライブラリとして Azure Databricks に追加し、Spark クラスターに関連付けます。To use these APIs as part of your cluster, add them as libraries to Azure Databricks and associate them with your Spark cluster. 次の手順は、ライブラリを追加する方法を示しています。The following instructions show how to add a library.

  1. Azure Databricks ワークスペースで [クラスター] を選択し、既存の Spark クラスターを選択します。In the Azure Databricks workspace, select Clusters, and choose your existing Spark cluster. クラスター メニューで、 [ライブラリ] を選択し、 [新規インストール] をクリックします。Within the cluster menu, choose Libraries and click Install New.

    [ライブラリの追加] ダイアログ ボックスAdd library dialog box

    [ライブラリの追加] ダイアログ ボックスAdd library dialog box

  2. [新しいライブラリ] ページの [ソース] で、 [Maven] を選択します。In the New Library page, for Source select Maven. [座標] では、追加したいパッケージの [パッケージの検索] をクリックします。For Coordinate, click Search Packages for the package you want to add. ここでは、このチュートリアルで使用するライブラリの Maven 座標です。Here is the Maven coordinates for the libraries used in this tutorial:

    • Spark Event Hubs コネクタ - com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.10Spark Event Hubs connector - com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.10

    • Twitter API - org.twitter4j:twitter4j-core:4.0.7Twitter API - org.twitter4j:twitter4j-core:4.0.7

      Maven 座標を入力するProvide Maven coordinates

      Maven 座標を入力するProvide Maven coordinates

  3. [インストール] を選択します。Select Install.

  4. クラスター メニューで、両方のライブラリが正しくインストールされ、アタッチされていることを確認します。In the cluster menu, make sure both libraries are installed and attached properly.

    ライブラリを確認するCheck libraries

  5. Twitter パッケージ twitter4j-core:4.0.7 について、これらの手順を繰り返します。Repeat these steps for the Twitter package, twitter4j-core:4.0.7.

Cognitive Services のアクセス キーを取得するGet a Cognitive Services access key

このチュートリアルでは、Azure Cognitive Services Text Analytics API を使用して、ツイートのストリームに対して、ほぼリアルタイムで感情分析を実行します。In this tutorial, you use the Azure Cognitive Services Text Analytics APIs to run sentiment analysis on a stream of tweets in near real time. API を使用する前に、Azure 上で Azure Cognitive Services アカウントを作成し、Text Analytics API を使用するためのアクセス キーを取得する必要があります。Before you use the APIs, you must create a Azure Cognitive Services account on Azure and retrieve an access key to use the Text Analytics APIs.

  1. Azure Portal にサインインします。Sign in to the Azure portal.

  2. [+ リソースの作成] を選択します。Select + Create a resource.

  3. Azure Marketplace で、 [AI + Cognitive Services] > [Text Analytics API] の順に選択します。Under Azure Marketplace, Select AI + Cognitive Services > Text Analytics API.

    Cognitive Services アカウントを作成するCreate cognitive services account

  4. [作成] ダイアログ ボックスで、次の値を入力します。In the Create dialog box, provide the following values:

    Cognitive Services アカウントを作成するCreate cognitive services account

    • Cognitive Services アカウントの名前を入力します。Enter a name for the Cognitive Services account.

    • アカウントが作成される Azure サブスクリプションを選択します。Select the Azure subscription under which the account is created.

    • Azure の場所を選択します。Select an Azure location.

    • サービスの価格レベルを選択します。Select a pricing tier for the service. Cognitive Services の価格の詳細については、料金ページを参照してください。For more information about Cognitive Services pricing, see pricing page.

    • 新しいリソース グループを作成するか、既存のリソース グループを選択するかを指定します。Specify whether you want to create a new resource group or select an existing one.

      作成 を選択します。Select Create.

  5. アカウントの作成後、 [概要] タブで [アクセス キーを表示] を選択します。After the account is created, from the Overview tab, select Show access keys.

    アクセス キーを表示するShow access keys

    さらに、スクリーンショットに示されているようにエンドポイント URL の一部をコピーします。Also, copy a part of the endpoint URL, as shown in the screenshot. この URL はチュートリアルで必要になります。You need this URL in the tutorial.

  6. [キーの管理] で、使用したいキーのコピー アイコンを選択します。Under Manage keys, select the copy icon against the key you want to use.

    アクセス キーをコピーするCopy access keys

  7. この手順で取得したエンドポイント URL とアクセス キーの値を保存します。Save the values for the endpoint URL and the access key, you retrieved in this step. これは、このチュートリアルで後ほど必要になります。You need it later in this tutorial.

Databricks でノートブックを作成するCreate notebooks in Databricks

このセクションでは、次の名前を使用して、Databricks ワークスペース内に 2 つのノートブックを作成しますIn this section, you create two notebooks in Databricks workspace with the following names

  • SendTweetsToEventHub - Twitter からツイートを取得し、Event Hubs にそれらをストリーム配信するために使用されるプロデューサー ノートブック。SendTweetsToEventHub - A producer notebook you use to get tweets from Twitter and stream them to Event Hubs.
  • AnalyzeTweetsFromEventHub - Event Hubs からのツイートの読み取りと感情分析の実行に使用されるコンシューマー ノートブック。AnalyzeTweetsFromEventHub - A consumer notebook you use to read the tweets from Event Hubs and run sentiment analysis.
  1. 左側のウィンドウで、 [ワークスペース] を選択します。In the left pane, select Workspace. [ワークスペース] ドロップダウンの [作成] を選択して、 [ノートブック] を選択します。From the Workspace drop-down, select Create, and then select Notebook.

    Databricks でノートブックを作成するCreate notebook in Databricks

  2. [Create Notebook](ノートブックの作成) ダイアログ ボックスに「SendTweetsToEventHub」と入力し、言語として [Scala] を選んで、前に作成した Spark クラスターを選びます。In the Create Notebook dialog box, enter SendTweetsToEventHub, select Scala as the language, and select the Spark cluster that you created earlier.

    Databricks でノートブックを作成するCreate notebook in Databricks

    作成 を選択します。Select Create.

  3. 手順を繰り返して AnalyzeTweetsFromEventHub ノートブックを作成します。Repeat the steps to create the AnalyzeTweetsFromEventHub notebook.

ツイートを Event Hubs に送信するSend tweets to Event Hubs

SendTweetsToEventHub ノートブックで次のコードを貼り付けて、プレースホルダーを、先ほど作成した Event Hubs 名前空間と Twitter アプリケーションの値に置き換えます。In the SendTweetsToEventHub notebook, paste the following code, and replace the placeholders with values for your Event Hubs namespace and Twitter application that you created earlier. このノートブックによって、キーワード "Azure" が含まれたツイートがリアルタイムで Event Hubs にストリーム配信されます。This notebook streams tweets with the keyword "Azure" into Event Hubs in real time.

注意

Twitter API には、特定の要求の制限とクォータがあります。Twitter API has certain request restrictions and quotas. Twitter API の標準のレート制限では不足の場合、この例では Twitter API を使用せずにテキスト コンテンツを生成できます。If you are not satisfied with standard rate limiting in Twitter API, you can generate text content without using Twitter API in this example. そうするには、変数 dataSourcetwitter ではなく test に設定し、リスト testSource に適切なテスト入力を設定します。To do that, set variable dataSource to test instead of twitter and populate the list testSource with preferred test input.

    import scala.collection.JavaConverters._
    import com.microsoft.azure.eventhubs._
    import java.util.concurrent._
    import scala.collection.immutable._
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    val namespaceName = "<EVENT HUBS NAMESPACE>"
    val eventHubName = "<EVENT HUB NAME>"
    val sasKeyName = "<POLICY NAME>"
    val sasKey = "<POLICY KEY>"
    val connStr = new ConnectionStringBuilder()
                .setNamespaceName(namespaceName)
                .setEventHubName(eventHubName)
                .setSasKeyName(sasKeyName)
                .setSasKey(sasKey)

    val pool = Executors.newScheduledThreadPool(1)
    val eventHubClient = EventHubClient.create(connStr.toString(), pool)

    def sleep(time: Long): Unit = Thread.sleep(time)

    def sendEvent(message: String, delay: Long) = {
      sleep(delay)
      val messageData = EventData.create(message.getBytes("UTF-8"))
      eventHubClient.get().send(messageData)
      System.out.println("Sent event: " + message + "\n")
    }

    // Add your own values to the list
    val testSource = List("Azure is the greatest!", "Azure isn't working :(", "Azure is okay.")

    // Specify 'test' if you prefer to not use Twitter API and loop through a list of values you define in `testSource`
    // Otherwise specify 'twitter'
    val dataSource = "test"

    if (dataSource == "twitter") {

      import twitter4j._
      import twitter4j.TwitterFactory
      import twitter4j.Twitter
      import twitter4j.conf.ConfigurationBuilder

      // Twitter configuration!
      // Replace values below with you

      val twitterConsumerKey = "<CONSUMER API KEY>"
      val twitterConsumerSecret = "<CONSUMER API SECRET>"
      val twitterOauthAccessToken = "<ACCESS TOKEN>"
      val twitterOauthTokenSecret = "<TOKEN SECRET>"

      val cb = new ConfigurationBuilder()
        cb.setDebugEnabled(true)
        .setOAuthConsumerKey(twitterConsumerKey)
        .setOAuthConsumerSecret(twitterConsumerSecret)
        .setOAuthAccessToken(twitterOauthAccessToken)
        .setOAuthAccessTokenSecret(twitterOauthTokenSecret)

      val twitterFactory = new TwitterFactory(cb.build())
      val twitter = twitterFactory.getInstance()

      // Getting tweets with keyword "Azure" and sending them to the Event Hub in realtime!
      val query = new Query(" #Azure ")
      query.setCount(100)
      query.lang("en")
      var finished = false
      while (!finished) {
        val result = twitter.search(query)
        val statuses = result.getTweets()
        var lowestStatusId = Long.MaxValue
        for (status <- statuses.asScala) {
          if(!status.isRetweet()){
            sendEvent(status.getText(), 5000)
          }
          lowestStatusId = Math.min(status.getId(), lowestStatusId)
        }
        query.setMaxId(lowestStatusId - 1)
      }

    } else if (dataSource == "test") {
      // Loop through the list of test input data
      while (true) {
        testSource.foreach {
          sendEvent(_,5000)
        }
      }

    } else {
      System.out.println("Unsupported Data Source. Set 'dataSource' to \"twitter\" or \"test\"")
    }

    // Closing connection to the Event Hub
    eventHubClient.get().close()

ノートブックを実行するには、Shift + Enter キーを押します。To run the notebook, press SHIFT + ENTER. 以下のようなスニペットが出力されます。You see an output like the snippet below. 出力の各イベントは、用語 "Azure" が含まれているため Event Hubs に取り込まれたツイートです。Each event in the output is a tweet that is ingested into the Event Hubs containing the term "Azure".

Sent event: @Microsoft and @Esri launch Geospatial AI on Azure https://t.co/VmLUCiPm6q via @geoworldmedia #geoai #azure #gis #ArtificialIntelligence

Sent event: Public preview of Java on App Service, built-in support for Tomcat and OpenJDK
https://t.co/7vs7cKtvah
#cloudcomputing #Azure

Sent event: 4 Killer #Azure Features for #Data #Performance https://t.co/kpIb7hFO2j by @RedPixie

Sent event: Migrate your databases to a fully managed service with Azure SQL Database Managed Instance | #Azure | #Cloud https://t.co/sJHXN4trDk

Sent event: Top 10 Tricks to #Save Money with #Azure Virtual Machines https://t.co/F2wshBXdoz #Cloud

...
...

Event Hubs からツイートを読み取るRead tweets from Event Hubs

AnalyzeTweetsFromEventHub ノートブックで次のコードを貼り付けて、プレースホルダーを、先ほど作成した Azure Event Hubs の値に置き換えます。In the AnalyzeTweetsFromEventHub notebook, paste the following code, and replace the placeholder with values for your Azure Event Hubs that you created earlier. このノートブックによって、先ほど SendTweetsToEventHub ノートブックを使用して Event Hubs にストリーム配信したツイートが読み取られます。This notebook reads the tweets that you earlier streamed into Event Hubs using the SendTweetsToEventHub notebook.


    import org.apache.spark.eventhubs._
    import com.microsoft.azure.eventhubs._

    // Build connection string with the above information
    val namespaceName = "<EVENT HUBS NAMESPACE>"
    val eventHubName = "<EVENT HUB NAME>"
    val sasKeyName = "<POLICY NAME>"
    val sasKey = "<POLICY KEY>"
    val connStr = new com.microsoft.azure.eventhubs.ConnectionStringBuilder()
                .setNamespaceName(namespaceName)
                .setEventHubName(eventHubName)
                .setSasKeyName(sasKeyName)
                .setSasKey(sasKey)

    val customEventhubParameters =
      EventHubsConf(connStr.toString())
      .setMaxEventsPerTrigger(5)

    val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()

    incomingStream.printSchema

    // Sending the incoming stream into the console.
    // Data comes in batches!
    incomingStream.writeStream.outputMode("append").format("console").option("truncate", false).start().awaitTermination()

次の出力が得られます。You get the following output:

root
 |-- body: binary (nullable = true)
 |-- offset: long (nullable = true)
 |-- seqNumber: long (nullable = true)
 |-- enqueuedTime: long (nullable = true)
 |-- publisher: string (nullable = true)
 |-- partitionKey: string (nullable = true)

-------------------------------------------
Batch: 0
-------------------------------------------
+------+------+--------------+---------------+---------+------------+
|body  |offset|sequenceNumber|enqueuedTime   |publisher|partitionKey|
+------+------+--------------+---------------+---------+------------+
|[50 75 62 6C 69 63 20 70 72 65 76 69 65 77 20 6F 66 20 4A 61 76 61 20 6F 6E 20 41 70 70 20 53 65 72 76 69 63 65 2C 20 62 75 69 6C 74 2D 69 6E 20 73 75 70 70 6F 72 74 20 66 6F 72 20 54 6F 6D 63 61 74 20 61 6E 64 20 4F 70 65 6E 4A 44 4B 0A 68 74 74 70 73 3A 2F 2F 74 2E 63 6F 2F 37 76 73 37 63 4B 74 76 61 68 20 0A 23 63 6C 6F 75 64 63 6F 6D 70 75 74 69 6E 67 20 23 41 7A 75 72 65]                              |0     |0             |2018-03-09 05:49:08.86 |null     |null        |
|[4D 69 67 72 61 74 65 20 79 6F 75 72 20 64 61 74 61 62 61 73 65 73 20 74 6F 20 61 20 66 75 6C 6C 79 20 6D 61 6E 61 67 65 64 20 73 65 72 76 69 63 65 20 77 69 74 68 20 41 7A 75 72 65 20 53 51 4C 20 44 61 74 61 62 61 73 65 20 4D 61 6E 61 67 65 64 20 49 6E 73 74 61 6E 63 65 20 7C 20 23 41 7A 75 72 65 20 7C 20 23 43 6C 6F 75 64 20 68 74 74 70 73 3A 2F 2F 74 2E 63 6F 2F 73 4A 48 58 4E 34 74 72 44 6B]            |168   |1             |2018-03-09 05:49:24.752|null     |null        |
+------+------+--------------+---------------+---------+------------+

-------------------------------------------
Batch: 1
-------------------------------------------
...
...

出力はバイナリ モードのため、次のスニペットを使用して文字列に変換します。Because the output is in a binary mode, use the following snippet to convert it into string.

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._

    // Event Hub message format is JSON and contains "body" field
    // Body is binary, so we cast it to string to see the actual content of the message
    val messages =
      incomingStream
      .withColumn("Offset", $"offset".cast(LongType))
      .withColumn("Time (readable)", $"enqueuedTime".cast(TimestampType))
      .withColumn("Timestamp", $"enqueuedTime".cast(LongType))
      .withColumn("Body", $"body".cast(StringType))
      .select("Offset", "Time (readable)", "Timestamp", "Body")

    messages.printSchema

    messages.writeStream.outputMode("append").format("console").option("truncate", false).start().awaitTermination()

これで、次のようなスニペットが出力されます。The output now resembles the following snippet:

root
 |-- Offset: long (nullable = true)
 |-- Time (readable): timestamp (nullable = true)
 |-- Timestamp: long (nullable = true)
 |-- Body: string (nullable = true)

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----------------+----------+-------+
|Offset|Time (readable)  |Timestamp |Body
+------+-----------------+----------+-------+
|0     |2018-03-09 05:49:08.86 |1520574548|Public preview of Java on App Service, built-in support for Tomcat and OpenJDK
https://t.co/7vs7cKtvah
#cloudcomputing #Azure          |
|168   |2018-03-09 05:49:24.752|1520574564|Migrate your databases to a fully managed service with Azure SQL Database Managed Instance | #Azure | #Cloud https://t.co/sJHXN4trDk    |
|0     |2018-03-09 05:49:02.936|1520574542|@Microsoft and @Esri launch Geospatial AI on Azure https://t.co/VmLUCiPm6q via @geoworldmedia #geoai #azure #gis #ArtificialIntelligence|
|176   |2018-03-09 05:49:20.801|1520574560|4 Killer #Azure Features for #Data #Performance https://t.co/kpIb7hFO2j by @RedPixie                                                    |
+------+-----------------+----------+-------+
-------------------------------------------
Batch: 1
-------------------------------------------
...
...

これで、Apache Spark 用の Event Hubs コネクタを使用して、Azure Event Hubs から Azure Databricks にデータをほぼリアルタイムでストリーム配信できました。You have now streamed data from Azure Event Hubs into Azure Databricks at near real time using the Event Hubs connector for Apache Spark. Spark 用の Event Hubs コネクタを使用する方法の詳細については、コネクタに関するドキュメントを参照してください。For more information on how to use the Event Hubs connector for Spark, see the connector documentation.

ツイートに対して感情分析を実行するRun sentiment analysis on tweets

このセクションでは、Twitter API を使用して受信されたツイートに対して感情分析を実行します。In this section, you run sentiment analysis on the tweets received using the Twitter API. このセクションでは同じ AnalyzeTweetsFromEventHub ノートブックにコード スニペットを追加します。For this section, you add the code snippets to the same AnalyzeTweetsFromEventHub notebook.

まず、ノートブックに新しいコード セルを追加し、下記のコード スニペットを貼り付けます。Start by adding a new code cell in the notebook and paste the code snippet provided below. このコード スニペットでは、言語およびセンチメント API で扱うデータ型を定義します。This code snippet defines data types for working with the Language and Sentiment API.

import java.io._
import java.net._
import java.util._

case class Language(documents: Array[LanguageDocuments], errors: Array[Any]) extends Serializable
case class LanguageDocuments(id: String, detectedLanguages: Array[DetectedLanguages]) extends Serializable
case class DetectedLanguages(name: String, iso6391Name: String, score: Double) extends Serializable

case class Sentiment(documents: Array[SentimentDocuments], errors: Array[Any]) extends Serializable
case class SentimentDocuments(id: String, score: Double) extends Serializable

case class RequestToTextApi(documents: Array[RequestToTextApiDocument]) extends Serializable
case class RequestToTextApiDocument(id: String, text: String, var language: String = "") extends Serializable

新しいコード セルを追加して、下記のスニペットを貼り付けます。Add a new code cell and paste the snippet provided below. このスニペットでは、Text Analysis API を呼び出す関数が含まれたオブジェクトを定義します。この Text Analysis API が言語検出と感情分析を実行します。This snippet defines an object that contains functions to call the Text Analysis API to run language detection and sentiment analysis. プレースホルダー <PROVIDE ACCESS KEY HERE> は、お使いの Cognitive Services アカウントに関して取得した値に置き換えてください。Make sure you replace the placeholder <PROVIDE ACCESS KEY HERE> with the value you retrieved for your Cognitive Services account.

import javax.net.ssl.HttpsURLConnection
import com.google.gson.Gson
import com.google.gson.GsonBuilder
import com.google.gson.JsonObject
import com.google.gson.JsonParser
import scala.util.parsing.json._

object SentimentDetector extends Serializable {

    // Cognitive Services API connection settings
    val accessKey = "<PROVIDE ACCESS KEY HERE>"
    val host = "https://cognitive-docs.cognitiveservices.azure.com/"
    val languagesPath = "/text/analytics/v2.1/languages"
    val sentimentPath = "/text/analytics/v2.1/sentiment"
    val languagesUrl = new URL(host+languagesPath)
    val sentimenUrl = new URL(host+sentimentPath)
    val g = new Gson

    def getConnection(path: URL): HttpsURLConnection = {
        val connection = path.openConnection().asInstanceOf[HttpsURLConnection]
        connection.setRequestMethod("POST")
        connection.setRequestProperty("Content-Type", "text/json")
        connection.setRequestProperty("Ocp-Apim-Subscription-Key", accessKey)
        connection.setDoOutput(true)
        return connection
    }

    def prettify (json_text: String): String = {
        val parser = new JsonParser()
        val json = parser.parse(json_text).getAsJsonObject()
        val gson = new GsonBuilder().setPrettyPrinting().create()
        return gson.toJson(json)
    }

    // Handles the call to Cognitive Services API.
    def processUsingApi(request: RequestToTextApi, path: URL): String = {
        val requestToJson = g.toJson(request)
        val encoded_text = requestToJson.getBytes("UTF-8")
        val connection = getConnection(path)
        val wr = new DataOutputStream(connection.getOutputStream())
        wr.write(encoded_text, 0, encoded_text.length)
        wr.flush()
        wr.close()

        val response = new StringBuilder()
        val in = new BufferedReader(new InputStreamReader(connection.getInputStream()))
        var line = in.readLine()
        while (line != null) {
            response.append(line)
            line = in.readLine()
        }
        in.close()
        return response.toString()
    }

    // Calls the language API for specified documents.
    def getLanguage (inputDocs: RequestToTextApi): Option[Language] = {
        try {
            val response = processUsingApi(inputDocs, languagesUrl)
            // In case we need to log the json response somewhere
            val niceResponse = prettify(response)
            // Deserializing the JSON response from the API into Scala types
            val language = g.fromJson(niceResponse, classOf[Language])
            if (language.documents(0).detectedLanguages(0).iso6391Name == "(Unknown)")
                return None
            return Some(language)
        } catch {
            case e: Exception => return None
        }
    }

    // Calls the sentiment API for specified documents. Needs a language field to be set for each of them.
    def getSentiment (inputDocs: RequestToTextApi): Option[Sentiment] = {
        try {
            val response = processUsingApi(inputDocs, sentimenUrl)
            val niceResponse = prettify(response)
            // Deserializing the JSON response from the API into Scala types
            val sentiment = g.fromJson(niceResponse, classOf[Sentiment])
            return Some(sentiment)
        } catch {
            case e: Exception => return None
        }
    }
}

センチメントを決定する Spark UDF (ユーザー定義関数) を定義するために、別のセルを追加します。Add another cell to define a Spark UDF (User-defined function) that determines sentiment.

// User Defined Function for processing content of messages to return their sentiment.
val toSentiment =
    udf((textContent: String) =>
        {
            val inputObject = new RequestToTextApi(Array(new RequestToTextApiDocument(textContent, textContent)))
            val detectedLanguage = SentimentDetector.getLanguage(inputObject)
            detectedLanguage match {
                case Some(language) =>
                    if(language.documents.size > 0) {
                        inputObject.documents(0).language = language.documents(0).detectedLanguages(0).iso6391Name
                        val sentimentDetected = SentimentDetector.getSentiment(inputObject)
                        sentimentDetected match {
                            case Some(sentiment) => {
                                if(sentiment.documents.size > 0) {
                                    sentiment.documents(0).score.toString()
                                }
                                else {
                                    "Error happened when getting sentiment: " + sentiment.errors(0).toString
                                }
                            }
                            case None => "Couldn't detect sentiment"
                        }
                    }
                    else {
                        "Error happened when getting language" + language.errors(0).toString
                    }
                case None => "Couldn't detect language"
            }
        }
    )

最後のコード セルを追加して、ツイートの内容とツイートに関連付けられた感情が含まれたデータフレームを準備します。Add a final code cell to prepare a dataframe with the content of the tweet and the sentiment associated with the tweet.

// Prepare a dataframe with Content and Sentiment columns
val streamingDataFrame = incomingStream.selectExpr("cast (body as string) AS Content").withColumn("Sentiment", toSentiment($"Content"))

// Display the streaming data with the sentiment
streamingDataFrame.writeStream.outputMode("append").format("console").option("truncate", false).start().awaitTermination()

次のスニペットのような出力が表示されます。You should see an output like the following snippet:

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------------------+------------------+
|Content                         |Sentiment         |
+--------------------------------+------------------+
|Public preview of Java on App Service, built-in support for Tomcat and OpenJDK
https://t.co/7vs7cKtvah   #cloudcomputing #Azure          |0.7761918306350708|
|Migrate your databases to a fully managed service with Azure SQL Database Managed Instance | #Azure | #Cloud https://t.co/sJHXN4trDk    |0.8558163642883301|
|@Microsoft and @Esri launch Geospatial AI on Azure https://t.co/VmLUCiPm6q via @geoworldmedia #geoai #azure #gis #ArtificialIntelligence|0.5               |
|4 Killer #Azure Features for #Data #Performance https://t.co/kpIb7hFO2j by @RedPixie                                                    |0.5               |
+--------------------------------+------------------+

1 に近い値が Sentiment 列に含まれていることは、Azure でのエクスペリエンスが優れていることを示唆します。A value closer to 1 in the Sentiment column suggests a great experience with Azure. 0 に近い値は、Microsoft Azure での作業中にユーザーが問題に直面したことを示唆します。A value closer to 0 suggests issues that users faced while working with Microsoft Azure.

これで完了です。That's it! Azure Databricks を使用して、Azure Event Hubs にデータをストリーム配信し、Event Hubs コネクタによってデータ ストリームを読み取り、ストリーミング データに対し、ほぼリアルタイムで感情分析を実行することができました。Using Azure Databricks, you have successfully streamed data into Azure Event Hubs, consumed the stream data using the Event Hubs connector, and then ran sentiment analysis on streaming data in near real time.

リソースのクリーンアップClean up resources

チュートリアルの実行が完了したら、クラスターを終了できます。After you have finished running the tutorial, you can terminate the cluster. そのためには、Azure Databricks ワークスペースの左側のウィンドウで、 [クラスター] を選択します。To do so, from the Azure Databricks workspace, from the left pane, select Clusters. 終了するクラスターで、 [アクション] 列の下にある省略記号をポイントし、 [終了] アイコンを選択します。For the cluster you want to terminate, move the cursor over the ellipsis under Actions column, and select the Terminate icon.

Databricks クラスターを停止するStop a Databricks cluster

クラスター作成時に [Terminate after __ minutes of inactivity] (アクティビティが __ 分ない場合は終了する) チェック ボックスをオンにしていた場合、手動で終了しなくともクラスターは自動で停止します。If you do not manually terminate the cluster it will automatically stop, provided you selected the Terminate after __ minutes of inactivity checkbox while creating the cluster. このような場合、クラスターは、一定の時間だけ非アクティブな状態が続くと自動的に停止します。In such a case, the cluster will automatically stop if it has been inactive for the specified time.

次の手順Next steps

このチュートリアルでは、Azure Databricks を使用して、データを Azure Event Hubs にストリーム配信し、Event Hubs からストリーミング データをリアルタイムで読み取る方法について説明しました。In this tutorial, you learned how to use Azure Databricks to stream data into Azure Event Hubs and then read the streaming data from Event Hubs in real time. 以下の方法について学習しました。You learned how to:

  • Azure Databricks ワークスペースを作成するCreate an Azure Databricks workspace
  • Azure Databricks で Spark クラスターを作成するCreate a Spark cluster in Azure Databricks
  • ストリーミング データにアクセスする Twitter アプリを作成するCreate a Twitter app to access streaming data
  • Azure Databricks でノートブックを作成するCreate notebooks in Azure Databricks
  • Event Hubs と Twitter API のライブラリを追加およびアタッチするAdd and attach libraries for Event Hubs and Twitter API
  • Microsoft Cognitive Services アカウントを作成し、アクセス キーを取得するCreate a Microsoft Cognitive Services account and retrieve the access key
  • ツイートを Event Hubs に送信するSend tweets to Event Hubs
  • Event Hubs からツイートを読み取るRead tweets from Event Hubs
  • ツイートに対して感情分析を実行するRun sentiment analysis on tweets

次のチュートリアルに進み、Azure Databricks を使用して機械学習タスクを実行する方法について学習してください。Advance to the next tutorial to learn about performing machine learning tasks using Azure Databricks.