教學課程:使用串流分析處理適用於事件中樞事件的 Apache Kafka

本文說明如何將資料串流至事件中樞,以及使用 Azure 串流分析處理該資料。 本文將逐步引導您完成下列步驟:

  1. 建立事件中樞命名空間。
  2. 建立 Kafka 用戶端,以將訊息傳送到事件中樞。
  3. 建立 Stream Analytics 作業,以將事件中樞的資料複製到 Azure blob 儲存體。

當您使用事件中樞所公開的 Kafka 端點時,您不需要變更您的通訊協定用戶端或執行自己的叢集。 Azure 事件中樞支援 Apache Kafka 1.0 版和更新版本。

必要條件

若要完成本快速入門,請確定您具備下列必要條件︰

  • Azure 訂用帳戶。 如果您沒有 Azure 訂用帳戶,請在開始前建立免費帳戶
  • Java Development Kit (JDK) 1.7+
  • 下載安裝 Maven 二進位封存檔。
  • Git
  • Azure 儲存體帳戶。 如果您沒有 Azure 儲存體帳戶,請先建立一個,再繼續執行。 本逐步解說中的作業作業會在 Azure blob 儲存體中儲存輸出資料。

建立事件中樞命名空間

當建立事件中樞命名空間時,會自動啟用命名空間的 Kafka 端點。 您可以將事件從使用 Kafka 通訊協定的應用程式串流至事件中樞。 遵循使用 Azure 入口網站建立事件中樞內的逐步指示,來建立事件中樞命名空間。 若您正在使用專用叢集,請參閱在專用叢集中建立命名空間與事件中樞

注意

基本層不支援適用於 Kafka 的事件中樞。

使用事件中樞內的 Kafka 傳送訊息

  1. 適用於 Kafka 的 Azure 事件中樞存放庫複製到電腦。

  2. 瀏覽到 azure-event-hubs-for-kafka/quickstart/java/producer 資料夾。

  3. src/main/resources/producer.config 中更新產生器的組態詳細資料。 指定事件中樞命名空間名稱連接字串

    bootstrap.servers={EVENT HUB NAMESPACE}.servicebus.windows.net:9093
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{CONNECTION STRING for EVENT HUB NAMESPACE}";
    
  4. 瀏覽至 azure-event-hubs-for-kafka/quickstart/java/producer/src/main/java/,然後在您選擇的編輯器中開啟 TestDataReporter.java 檔案。

  5. 註解化下列程式碼行:

                //final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "Test Data " + i);
    
  6. 新增下列程式碼行,代替加上註解的程式碼:

                final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "{ \"eventData\": \"Test Data " + i + "\" }");            
    

    此程式碼會以 JSON 格式傳送事件資料。 當您設定串流分析作業的輸入時,您可指定 JSON 作為輸入資料的格式。

  7. 執行產生器並串流至事件中樞。 在 Windows 電腦上,使用 Node.js 命令提示字元時,先切換至 azure-event-hubs-for-kafka/quickstart/java/producer 資料夾,再執行這些命令。

    mvn clean package
    mvn exec:java -Dexec.mainClass="TestProducer"                                    
    

確認事件中樞接收資料

  1. 選取 [實體] 之下的 [事件中樞]。 確認您看到名為 test 的事件中樞。

    Event hub - test

  2. 確認您看到訊息傳入事件中樞。

    Event hub - messages

使用串流分析作業處理事件資料

在這一節中,您會建立 Azure 串流分析作業。 Kafka 用戶端會將事件傳送到事件中樞。 您會建立串流分析作業,以將事件資料當作輸入並將它輸出至 Azure blob 儲存體。 如果您沒有 Azure 儲存體帳戶,請加以建立

串流分析作業中的查詢會通過資料,而不執行任何分析。 您可以建立可轉換輸入資料的查詢,以產生不同格式或有所見解的輸出資料。

建立串流分析作業

  1. Azure 入口網站中選取 [+ 建立資源]
  2. Azure Marketplace 功能表中選取 [分析],然後選取 [串流分析作業]
  3. 在 [新增串流分析] 頁面上,執行下列動作:
    1. 輸入作業的名稱

    2. 選取 [訂用帳戶]

    3. 針對 [資源群組] 選取 [建立新的],然後輸入名稱。 您也可以 [使用現有的] 資源群組。

    4. 選取作業的 [位置]

    5. 選取 [建立] 來建立作業。

      New Stream Analytics job

設定作業輸入

  1. 在通知訊息中,選取 [前往資源] 以查看 [串流分析作業] 頁面。

  2. 在功能表的 [作業拓撲] 區段中選取 [輸入]

  3. 選取 [新增資料流輸入] 並選取 [事件中樞]

    Add event hub as an input

  4. 在 [事件中樞輸入] 設定頁面上,執行下列動作:

    1. 指定輸入的別名

    2. 選取您的 Azure 訂用帳戶

    3. 選取您稍早建立的事件中樞命名空間

    4. 針對事件中樞選取 test

    5. 選取 [儲存]。

      Event hub input configuration

設定作業輸出

  1. 在功能表的 [作業拓撲] 區段中選取 [輸出]
  2. 選取工具列上的 [+ 新增],然後選取 [Blob 儲存體]
  3. 在 Blob 儲存體輸出設定頁面上,執行下列動作:
    1. 指定輸出的別名

    2. 選取您的 Azure 訂用帳戶

    3. 選取您的 Azure 儲存體帳戶

    4. 輸入容器名稱,該容器會儲存串流分析查詢的輸出資料。

    5. 選取 [儲存]。

      Blob Storage output configuration

定義查詢

在串流分析作業安裝程式讀取傳入資料流之後,下一個步驟是建立轉換以即時分析資料。 您可使用串流分析查詢語言來定義轉換查詢。 在本逐步解說中,您會定義可通過資料的查詢,而不需執行任何轉換。

  1. 選取查詢

  2. 在查詢視窗中,以您稍早建立的輸出別名取代 [YourOutputAlias]

  3. 以您稍早建立的輸入別名取代 [YourInputAlias]

  4. 在工具列上選取 [儲存]

    Screen capture shows the query window with values for input and output variables.

執行串流分析作業

  1. 選取左側功能表上的 [概觀]

  2. 選取 [開始]。

    Start menu

  3. 在 [啟動作業] 頁面中,選取 [啟動]

    Start job page

  4. 等到作業的狀態從 [啟動中] 變成 [執行中] 為止。

    Job status - running

測試案例

  1. 再次執行 Kafka 產生器,將事件傳送至事件中樞。

    mvn exec:java -Dexec.mainClass="TestProducer"                                    
    
  2. 確認您看到 Azure blob 儲存體中產生了輸出資料。 您會在容器中看到包含 100 個資料列的 JSON 檔案,如下列範例資料列所示:

    {"eventData":"Test Data 0","EventProcessedUtcTime":"2018-08-30T03:27:23.1592910Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    {"eventData":"Test Data 1","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    {"eventData":"Test Data 2","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    

    Azure 串流分析作業收到來自事件中樞的輸入資料,並將它儲存在此案例中的 Azure blob 儲存體。

下一步

在本文中,您已了解如何串流至事件中樞,而不需要變更通訊協定用戶端或執行您自己的叢集。 若要深入了解適用於 Apache Kafka 的事件中樞,請參閱 適用於 Azure 事件中樞的 Apache Kafka 開發人員指南