Azure 內幕消息

使用 Microsoft Azure 服務進行遙測擷取及分析

**Bruno Terkaly
Ricardo Villalobos
Thomas Conte

Bruno Terkaly and Ricardo Villalobos[定期專欄作家Bruno Terkaly和Ricardo Villalobos目前為本月的專欄特約專欄作家。 他們將返回與下一次分期付款。 — —Ed.]

基於感應器的每個設備生成的遙測資料。解釋該資料是其價值主張的核心。在消費者的世界,一個驅動程式看他連接的汽車儀表板,看到他的駕駛風格對燃料消耗和交通的影響。在工業世界中,進行比較的一台機器在工廠車間的其他人的平均溫度可以説明操作員識別與失敗的風險並執行預防性維護。

這些情況下,需要從數十或數百數千個連接的設備的遙測資料。更重要的是,您需要分析此資料,以提供有意義的視覺化效果和見解。當處理如此大量的資料,大資料框架,例如 Hadoop 牢固的基礎資料處理,可以使用設備的安裝基礎擴大規模。

在本文中,您將學習如何創建一個簡單遙測攝入的體系結構使用微軟 Azure 服務匯流排。然後你消耗分析此資料在可伸縮的方式使用名為 HDInsight 的微軟 Azure Hadoop 服務。

解決方案架構

在以前的專欄,Bruno Terkaly和Ricardo Villalobos顯示如何使用服務匯流排來建立一種命令通道與連線物件進行通信。在本文中,我會用服務匯流排作為一個中介軟體層緩衝遙測設備所發送的消息。

這些設備將直接溝通服務匯流排將遙測消息發送到一個專門的主題 (見圖 1)。然後一個或幾個訂閱將重復資料佇列中工作者角色這些消息並將它們存儲為 Blob 存儲中的一般檔案。Hadoop 群集然後可以使用這些輸入的檔來執行分析和計算。

Basic Flow of Big Data Telemetry Solution圖 1 基本流的大資料遙測技術解決方案

此體系結構具有解耦各種不同的塊從彼此的利益。服務匯流排作為中介軟體,可以緩衝資料如果工人不能讀取它們還不夠快。您可以監視佇列長度和使用,作為基礎進行自動縮放的輔助層。

此外可用於執行簡單篩選傳入的資料和路由到不同的後端處理層訂閱。例如,您可以將消息發送到即時警報系統,進行緊急訂閱和使用一切訂閱來捕獲所有資料供以後分析。

因為工人們只是將資料移動到存儲 — — 是否 Hadoop 分散式檔案系統 (HDFS) 或 Blob 存儲 — — 它脫鉤 Hadoop 加工件。這可以運行獨立的傳入資料節奏。你可以選擇有一個永久運行的 Hadoop 群集。這允許您處理小批量所有的時間和減少計算延遲。您也可以選擇存錢,只是每天一次,在一個批次處理中執行的所有計算 HDInsight 群集啟動。您還可以二者的混合。

接收遙測資料使用服務匯流排

Azure 服務匯流排提供兩個協定選擇將郵件發送到一個主題:HTTP 或 AMQP。在連接的設備,經常與有限的頻寬,AMQP 具有一些優勢。它是一個高效率、 二進位、 可靠和可擕式的協定。它也有許多種語言、 運行時環境和作業系統庫。當您的設備直接連接到服務匯流排發送遙測消息,這使你靈活性。

若要測試這種方法,我用覆盆子 Pi 板飼料溫度和其他感應器的資料,使用 Apache Qpid 質子 AMQP 庫。質子是光禿的骨頭,可擕式庫您可以在各種環境中發送 AMQP 消息編譯。它將與 Azure 服務匯流排完全交互操作。瞭解更多有關在質子 AMQP 圖書館 bit.ly/1icc6Ag

對於此示例,我已經編譯直接上覆盆子 Pi 板的質子庫。我用的 Python 綁定來編寫一個簡單的腳本來捕獲感應器讀數從 USB 序列埠並將它們發送到 Azure 服務匯流排,你可以看到在圖 2

圖 2 Python 代碼中的覆盆子 Pi 讀來捕獲感應器的讀數

#!/usr/bin/python
import sys
import commands
import re
import uuid
import serial
from proton import *
# Device ID
id = uuid.getnode()
# Topic address
address = "amqps://owner:key@address.servicebus.windows.
net/telemetry"
# Open serial port
ser = serial.Serial('/dev/ttyACM0', 9600)
# Create Proton objects
messenger = Messenger()
while True:
  # Read values from Arduino in the form K1:V1_K2:V2_...
temp = ser.readline().rstrip('\r\n')
  print temp
  # Create AMQP message
  message = Message()
  # Initialize properties
  message.properties = dict()
  message.properties[symbol("did")] = symbol(id)
  # Map string to list, symbolize, create dict and merge
  pairs=map(lambda x:x.split(':'), temp.split('_'))
  symbols = map(lambda x:(symbol(x[0]),int(x[1])), pairs)
  message.properties.update(dict(symbols))
  message.address = address
  messenger.put(message)
  messenger.send()

Python 腳本直接位址 Azure 服務巴士主題命名為"遙測技術"。它使用一個連接字串,包括標準的服務匯流排的身份驗證權杖,並指定使用 AMQP 協定。 在現實世界環境中,您將需要使用一個更複雜的身份驗證機制,以確保您的連接參數不會遭到損壞。

承擔大量的這些覆盆子設備開始收集資料。 每一個會發送第 ID (DID 設備) 你以後要用再來計算的平均溫度。 在此示例中,將 DID 生成帶有 UUID 模組檢索系統的 MAC 位址。

連接到通過 USB 樹莓 Pi Arduino Esplora 板收集讀數。 Esplora 是與內置感應器-一板。 這使得它容易讀取溫度或其他環境參數並將它們發送到串列匯流排。 然後將 USB 電纜的另一端的 Python 腳本讀取輸出值。 Arduino 架構的列印感應器值到序列埠的示例所示圖 3

圖 3 Arduino 代碼收集覆盆子 Pi 讀數

void loop()
{
  int celsius = Esplora.readTemperature(DEGREES_C);
  int loudness = Esplora.readMicrophone();
  int light = Esplora.readLightSensor();
  Serial.print("T:");
  Serial.print(celsius);
  Serial.print("_");
  Serial.print("M:");
  Serial.print(loudness);
  Serial.print("_");
  Serial.print("L:");
  Serial.print(light);
  Serial.println();
  // Wait a second
  delay(1000);
}

請選擇您的大資料部署

你有幾個選擇為哪種類型的 Hadoop 的解決方案,您將使用的資料分析。 部署類型的選擇將決定如何以及在何處你需要聚合資料進行分析。

Azure 提供令人信服的解決方案與 HDInsight。 這暴露了 Hadoop 框架作為一種服務。 這種分佈的 Hadoop,基於 Hortonworks 的資料平臺 (HDP) 上的 Windows,都帶有一種連接器,可以直接從 Azure Blob 存儲訪問輸入的資料的工作。

這意味著你不需要有了 Hadoop 集群和運行,以接收的輸入的檔。 您可以將檔上載到 HDInsight 將使用後的 Blob 存儲容器。 當你分析一個批次檔時,可以在幾分鐘內開始 HDInsight 群集、 對於幾個小時,執行一系列的工作崗位,然後關閉它。 這轉化為低法案在計算小時數。

另一方面,如果您選擇部署 HDP,如標準 Hadoop 分佈或分配對天青虛擬機器 (Vm) 的 Cloudera,你會負責保持最新的群集。 也可以有它正確地配置為最佳操作。 此方法很有意義,如果您打算使用不包含在 HDInsight,如 HBase 作為存儲機制中的自訂 Hadoop 元件。

遙測資料保存到 Blob 存儲

從 Azure 服務匯流排中提取資料的過程很簡單。 為訂閱使用作為"讀者"或"攔截器"工作者角色。 然後,積聚成 HDInsight 可以使用的輸入檔的消息。

首先,設置您的 Azure 服務巴士主題上的一個或幾個訂閱。 這為您提供一些緯度時拆分或路由根據要求的流資料。 至少,在它是個好主意,以創建一個"包羅萬象"訂閱來存儲所有傳入的郵件。 您還可以使用篩選器 Azure 服務匯流排的訂閱。 這將創建附加特定的消息流。 創建的主題和訂閱使用 C# 和 Azure 服務匯流排 SDK 庫的示例所示圖 4

圖 4 Azure 服務匯流排訂閱

var namespaceManager = 
  NamespaceManager.CreateFromConnectionString(connectionString);
// Create the Topic
if (!
namespaceManager.TopicExists("telemetry"))
{
  namespaceManager.CreateTopic("telemetry");
}
// Create a "catch-all" Subscription
if (!
namespaceManager.SubscriptionExists("telemetry", "all"))
{
  namespaceManager.CreateSubscription("telemetry", "all");
}
// Create an "alerts" subscription
if (!
namespaceManager.SubscriptionExists("telemetry", "alert"))
{
  SqlFilter alertFilter = new SqlFilter("type = 99");
  namespaceManager.CreateSubscription("telemetry", 
  "alert", alertFilter);
}

一旦創建了 Azure 服務匯流排訂閱,您可以接收並保存的消息。 此示例使用 CSV 格式,這是很容易閱讀和理解都由人類和機器。 閱讀作為傳入的郵件,盡可能快地工人創建一定數量的任務 (在此示例中有 10)。 它還使用非同步方法來讀的郵件,而不是讀他們一次一個批次。 "所有人"的訂閱和"遙測"主題將接收消息 (見圖 5)。

圖 5 從訂閱接收消息並將其存儲在 Blob 存儲

SubscriptionClient client = 
  SubscriptionClient.CreateFromConnectionString(connectionString, 
  "telemetry", "all", ReceiveMode.ReceiveAndDelete);
List<Task> tasks = new List<Task>();
for (int i = 0; i < NBTASKS; i++)
{
  var id = i; // Closure alert
  Task t = Task.Run(async () =>
  {
    BlobStorageWriter writer = new BlobStorageWriter(id);
    while (true)
    {
      var messages = await client.ReceiveBatchAsync(BATCH_SIZE);
      foreach (var message in messages)
      {
        try
        {
          await writer.WriteOneLine(TelemetryMessage.Stringify(message));
        }
        catch (Exception ex)
        {
          Trace.TraceError(ex.Message);
        }
      }
    }
  });
  tasks.Add(t);
}
Task.WaitAll(tasks.ToArray());

TelemetryMessage.Stringify 方法只是返回一行的文本中包含的遙測資料的 CSV 格式。 它還可以從的 Azure 服務匯流排標頭,例如消息的消息 ID 或入隊時間提取一些有用的欄位。

BlobStorageWriter.WriteOneLine 的工作是把線直接寫入 Blob。 因為 10 個任務可用的同時,將一次影響同一數量的 Blob。 WriteOneLine 還旋轉時,不時為 HDInsight 去接他們的檔。 我使用兩個參數來決定何時切換到一個新的檔:由於將 Blob 創建的 (例如,創建一個新的檔每小時或當它到達 1,000,000 線) 寫入該檔和時間的行數。 此方法使用非同步調用來避免阻塞消息寫入 Blob 流時 (請參見圖 6)。

圖 6 將消息的資料寫入到 Azure Blob

public async Task WriteOneLine(string line)
{
  var bytes = Encoding.UTF8.GetBytes(string.Format("{0}\n", line));
  await destinationStream.WriteAsync(bytes, 0, bytes.Length);
  TimeSpan ts = DateTime.Now - startBlobTime;
  if (++linesWritten > MAX_LINES || ts.TotalSeconds > MAX_SECONDS)
  {
    Trace.TraceInformation(
      "Wrote " + linesWritten + " lines to " + currentBlob.Name);
    GetNextBlob();
    linesWritten = 0;
  }
}

生成的檔包含資料提取遙測郵件,如圖所示:

145268284e8e498282e20b01170634df,test,24,980,21,2014-03-14 13:43:32
dbb52a3cf690467d8401518fc5e266fd,test,24,980,21,2014-03-14 13:43:32
e9b5f508ef8c4d1e8d246162c02e7732,test,24,980,21,2014-03-14 13:43:32

他們包括消息 ID、 裝置識別碼,三個讀數和訊息佇列的日期。 這種格式很容易下, 一步分析。

流量分析資料 HDInsight

最令人印象深刻的 HDInsight 好處是你可以啟動完整的 Hadoop 集群、 運行作業和取消設置直接從命令列群集。 你不用再登錄到虛擬機器上或執行任何自訂的配置。 你可以調配和管理與 Windows PowerShell 基於 Windows,或 Mac 或 Linux 上使用跨平臺的命令列工具 HDInsight。

您可以下載從綜合的 Azure PowerShell commandlets bit.ly/1tGirZk。 這些 commandlets 包括一切您需要管理你蔚藍的基礎設施,包括 HDInsight 集群。 一旦你已經導入您的發佈設置和所選的預設訂閱,您只需要一個命令列,以創建一個新的 HDInsight 群集:

New-AzureHDInsightCluster -Name "hditelemetry" -Location "North Europe" -DefaultStorageAccountName "telemetry.blob.core.windows.
net" -DefaultStorageAccountKey "storage-account-key" -DefaultStorageContainerName "data" -ClusterSizeInNodes 4

此命令指示 HDInsight 群集使用的現有存儲帳戶和容器作為其檔案系統的根目錄。 這是它將如何訪問所有遙測接收過程生成的資料。 您還可以選擇多少個工人節點群集應根據資料量的使用,你需要多少並行度。

一旦群集啟動並運行,您可以啟用遠端桌面訪問。 這樣做可以讓其他使用者登錄到要啟動互動式會話與標準 Hadoop 命令和工具的頭節點上。 然而,它是要使用遠端命令,利用 Windows PowerShell 啟動地圖減少、 蜂巢或豬作業快得多。

我用豬作業來計算平均溫度值。 豬最初是在雅虎開發的。 它允許使用 Hadoop 重點分析大型資料集上更多的人,花更少的時間編寫映射器和減速機的程式。 豬的腳本通常有三個階段:

  1. 載入您想要操作的資料。
  2. 運行一系列的資料轉換 (這翻譯成一組映射器和減速機的任務)。
  3. 轉儲的結果的螢幕,或將結果存儲在一個檔中。

下面的示例演示如何你通常實現這通過在探索性資料分析 (EDA) 階段,與豬譯員以對話模式運行腳本:

data = load '/telemetry*.csv' using PigStorage(',') as (id:chararray, did:chararray, temp:int, light:int, mic:int, timestamp:datetime);
data1 = group data by did;
data2 = foreach data1 generate group as did, COUNT(data), AVG(data.temp);
dump data2;

如果此腳本直接鍵入到豬解譯器,它將顯示一個表包含用於每個 DID 溫度資料點的數量和平均測量的值。 正如您所看到的豬的語法是相當明確的。 清楚地分隔不同的資料操作步驟:

  • 負載的第一個語句用來從 CSV 檔,描述的名稱和類型的輸入域中載入資料。
  • 按 DID,或每個設備,然後被分組資料。
  • 結果資料集生成與像計數和平均彙總函式

一旦完成該腳本,可以自動執行此任務與 Windows PowerShell。 使用新 AzureHDInsightPigJob­定義 commandlet 來初始化一個豬工作與創建的腳本。 然後您可以使用啟動 AzureHDInsightJob 和等待-AzureHD­InsightJob 開始作業,等待它的結論 (見圖 7)。 然後可以使用 Get AzureHDInsightJobOutput 檢索結果。

圖 7 插入、 分析,並在 HDInsight 啟動作業

$PigScript = "data = load '/telemetry*.csv' using PigStorage(',') as (id:chararray, did:chararray, temp:int, light:int, mic:int, timestamp:datetime);" +
"data1 = group data by did;" +
"data2 = foreach data1 generate group as did, COUNT(data), AVG(data.temp);" +
"dump data2;"
# Define a Pig job
$pigJobDefinition = New-AzureHDInsightPigJobDefinition -Query $PigScript
# Start the job
$pigJob = Start-AzureHDInsightJob -Cluster "hditelemetry" -JobDefinition $pigJobDefinition
# Wait for the job to finish
Wait-AzureHDInsightJob -Job $pigJob -WaitTimeoutInSeconds 3600
# Get the job results
Get-AzureHDInsightJobOutput -Cluster "hditelemetry" -JobId $pigJob.JobId –StandardOutput

在命令列主控台中顯示的結果看起來像這樣:

C:\> Get-AzureHDInsightJobOutput -Cluster "hditelemetry" -JobId $pigJob.JobId
(test,29091,24.0)
(49417795060,3942,30.08371385083714)

在這種情況下,有很多測試測量和覆盆子 Pi 的約 4,000 讀數。 讀數平均 30 度。

總結

Azure 服務匯流排是可靠和快速的方式,從的各種設備中收集資料。 為了存儲和分析這些資料,您需要一個可靠的存儲和分析引擎。 天青 HDInsight 摘要創建和維護 Hadoop 集群的這種程度的存儲的過程。 它是一個高度可擴展的解決方案您可以配置和自動化使用工具 (如 Windows PowerShell 或 Azure Mac/Linux 命令列介面。

Thomas Conte 是為微軟 Azure 平臺的開發人員技術福音傳教士 & 平臺傳福音 (DPE) 司。他的作用是促進獲得技術為開發人員、 架構師和軟體合作夥伴通過代碼示例、 出版物和公共演講。他致力於盡可能開放原始碼世界上盡可能多的非 Microsoft 技術運行微軟 Azure。跟著他在 twitter.com/tomconte

Bruno Terkaly 是微軟開發者福音傳教士。他淵博的知識是來自在該領域使用眾多平台、語言、架構、SDK、程式庫和 API 撰寫程式碼的多年經驗。他花時間編寫代碼,寫博客,給現場演示上構建基於雲計算的應用程式,專門使用微軟 Azure 平臺。您可以閱讀他的博客在 blogs.msdn.com/b/brunoterkaly

Ricardo Villalobos 是具有超過 15 年的經驗設計和創建應用程式的公司在多個行業的經驗豐富的軟體設計師。他從達拉斯大學工商管理持有不同的技術認證,以及碩士學位,為微軟,説明世界各地的公司要在微軟 Azure 實施解決方案作為一個雲建築師在 DPE 全球範圍內參與合作夥伴團隊工作。您可以閱讀他的博客在 blog.ricardovillalobos.com

感謝以下 Microsoft 技術專家對本文的審閱:拉斐爾 Godinho 和耶利米達爾加爾