本文章是由機器翻譯。

Azure 內幕消息

事件分析和視覺化,樞紐第 2 部分

Bruno Terkaly

使用微軟 Azure 事件集線器來處理大量的資料,時,必須將重點放在交付和消費方面。這是關於資料視覺化為物聯網 (物聯網) 方案的文章三部分系列的第二部分。在 4 月一期 (msdn.microsoft.com/magazine/dn948106),我討論了大規模使用 Azure 事件集線器,一種技術,是服務匯流排提供的一部分,並在規模支援資訊發行者 (覆盆子 Pi) 和資訊消費者 (C# 程式) 接收資料。在這篇文章中,我將重點在消費方面,從事件集線器中讀取並堅持永久存儲中的資料的 C# 代碼。

這篇文章被分為四個支柱。我開始與體系結構,說明消息消費和存儲。第二,我將討論在 Azure 門戶完成的任務。第三,我將描述我將使用和解釋為什麼他們的兩個不同的資料存儲。我會最後,解釋事件的中心,從讀取消息的用戶端 C# 代碼,以及將消息存儲在 SQL Azure 資料庫和 DocumentDB 的代碼。

事件集線器架構

後從 Azure 事件集線器讀取消息,你會想要堅持這些 SQL Azure 資料庫和 DocumentDB 的消息。SQL azure 資料庫作為一種服務 (DaaS) 可擴展到數以千計的資料庫是一個關係資料庫。DocumentDB 是一個完全託管的、 可伸縮的 NoSQL 文檔資料庫服務 (閱讀更多關於在 DocumentDB bit.ly/1oWZP7i)。這種體系結構顯示在圖 1

概括蔚藍事件集線器架構
圖 1 概括蔚藍事件集線器架構

對完成任務

若要使該解決方案會蘇醒過來,我需要完成幾個任務在門戶。有兩種類別要在 Azure 入口網站執行的任務。第一類事件集線器,SQL Azure 資料庫和 DocumentDB 資源調配。我置備四月份的文章中事件交通樞紐。

為了從事件集線器讀取消息,然後保存到 SQL Azure 資料庫和 DocumentDB 的那些消息,需要從 Azure 入口網站獲取連接資訊。所有的設置都將放置在 App.configVisual Studio解決方案 C# 應用程式中。我將創建普通的主控台應用程式使用事件集線器 SDK。

最後,為了説明資料庫操作,我會為 SQL Azure 資料庫寫幾個預存程序。你會發現代碼用於在資料庫中創建表,以及兩個存儲的過程,­入深圳作為Visual Studio解決方案的一部分。你可以使用SQL ServerManagement Studio打造出了資料庫表和存儲的過程。使用的連接資訊的 Azure 門戶將坐在微軟資料中心到您的本機複本的Visual StudioSQL Azure 資料庫附加。關於如何做到這一點,詳細資訊請閱讀關於它在 Azure 檔 bit.ly/1K1BIeM

今年 4 月,我說明了資源調配過程為服務中心公車事件。要穿過一些簡單的教程,查閱"得到開始與事件中心"在 bit.ly/1F2gp9H

一旦你已經設置事件集線器,您會收到一個端點。您將需要將這和將放置點的連接資訊複製到 App.config 編寫 C# 代碼時。您還需要提供在 Azure 門戶 Azure SQL 資料庫。閱讀更多關於這一進程在檔 bit.ly/1Enln5c

一旦你已經創建了資料庫,有三個更簡單的任務。第一個任務是創建氣溫表。其餘的兩項任務必須處理兩個預存程序,您將使用來清除舊郵件並插入新的消息。在Visual Studio專案中,還有用表定義為溫度,如 CleanTable 和 InsertTempData 的兩個存儲的程序呼叫 DatabaseCode.txt 的文字檔。

既然你已經照顧事件集線器和 SQL Azure 資料庫元件,將注意力轉到 DocumentDB。你可以瞭解更多有關在此使用 NoSQL 資料存儲資源調配 bit.ly/1IBnGQ5。你也可以觀看視頻從Ryan克勞科爾,高級專案經理團隊成員以來帶領的產品之一。

一旦你已經在門戶設置 DocumentDB,您需要定義一個集合名稱和資料庫名稱。DocumentDB 的思考方式是作為一系列的集合和集合作為一系列檔。資料庫世界中的類比推理是集合是表和一份檔是記錄。這是一個鬆散定義的類比,因為使用 NoSQL 資料存儲通常非模式化。我們的應用程式中使用的集合名稱被稱為 CityTempCollection,資料庫就稱為 TemperatureDB。

理解資料存儲選項

這次討論的起始點是建立超越事件集線器可以以本機方式提供的中學資料存儲的需要。例如,一個顯而易見的原因,想持久性是存儲在事件集線器的消息並不能持久。第二,你無法查詢郵件與任何種類的查詢語言。你只能讀他們以串列方式從事件樞紐分區,在那裡每個用戶端讀取器維護自己消息存儲庫中的游標。第三,事件中心為您提供 84 GB 的事件存儲預設 24 小時保留期。

雖然有很多選項,我選擇了 SQL Azure 資料庫和 DocumentDB 作為主資料存儲為與溫度相關的消息資料。到 SQL Azure 資料庫的優點是雙重的。第一個方面涉及的事實它是雲託管,提供其他寶貴的幾個屬性。提供幾乎是暫態的它是經濟的也正是三複製,使它相當強勁。

除了這些顯而易見的好處,也是大量的業務智慧工具圍繞 Azure SQL 資料庫如功率雙。這允許您構建複雜的視覺化互動式報表。您可以構建功能強大的儀表板,然後可以使用瀏覽器和行動裝置。

使用 DocumentDB 作為第二兩個數據存儲。這有一些明顯的優勢。它是一個完全託管的服務,所以你不必費心的基礎設施和單個虛擬機器 (Vm)。此外,它配備了企業的服務水準協定,這是大多數基於 Azure 服務為例。最後,DocumentDB 也是架構自由。

無架構文檔存儲通常被認為是可取的在物件導向和繼承方面的。這是因為繼承是指你有具有某些屬性的物件共同之處,但也有些特定于物件子類型的屬性。

與大多數關係資料庫中,您將需要一個表為所有可能的屬性,使他們很多人不適用的欄位為 null。在架構靈活的資料庫中,但是,您可以存儲不同的可選屬性集。這工作呈現 HTML,因為 JavaScript 可以檢查一個未知的可選屬性,並調用適當的函數,以輸出到可顯示的表時,很好。

其他優勢 (這可以也被看作是一個缺點對於一些) 架構靈活的資料庫之一就是它在發展過程中提供額外的靈活性。您可以添加新的功能沒有重組資料庫。這使得它容易保持向下相容由早期版本創建的資料。這種方法的缺點寫對可選欄位的查詢可以變得複雜和令人費解。

基於 JSON 資料存儲,如 DocumentDB 在普照的領域之一是使用 Node.js 應用程式暴露的資料移動或 Web 應用程式時。本機的 JSON 格式,是緊湊而富有表現力,使它容易和自然的工作。使用基於 JSON 資料存儲使用 HTTP 和休息也是簡單明瞭,是否你正在使用 Microsoft.NET 框架、 JavaScript、 Ruby、JAVA、 PHP 或 Python。我將重點介紹 Node.js 應用程式讀取 DocumentDB 資料和將在下一篇文章中的移動應用程式的資料暴露。

消費和堅持的消息

從事件集線器使用消息有三種常見的方法。一如往常,還有什麼是容易與什麼是柔性之間的權衡。最簡單的方法是與流分析。另一種方式是直接的接收器。直接的接收器是負責訪問自己協調分區在消費者群體中,藉以您的代碼直接解決分區 ID 時創建 EventHubConsumerGroup 物件的接收器。另一種方式是與更高層次的抽象,例如事件­ProcessorHost。我會使用此方法,因為它是很簡單,但卻提供了足夠的靈活性。

流分析是一個完全託管的服務,使得它容易攝取來自事件中心的消息。很容易消耗、 變換並將這些消息放入 SQL Azure 資料庫。閱讀更多關於這個方法在 bit.ly/1IRvPDc

它是一個簡單的創建一個資料庫、 表和查詢的一系列問題。例如,您使用簡單的 select 語句從事件集線器讀取消息"選擇 DeviceId,溫度從輸入,"並將這些消息放入SQL Server。你可以連鏈查詢遞迴並創建一系列變換使用管道方法對資料的查詢。這種能力來篩選郵件通過一系列的 SQL 查詢是一個強大的抽象。

更有趣的擴展可以通過流分析之一有跟視窗。通常是在一段時間內的事件的子集上執行一些基於集的計算 (聚合) 或其他操作的要求。因為時間是關鍵在複雜事件處理系統中,它是重要的是有簡單的方法來查詢邏輯的時間元件與系統內的工作。在 Azure 流分析中,透過窗戶,屆時代表分組定義了這些事件的子集。支援的時間視窗的各種類型的詳細說明,請查閱 bit.ly/1DIizfM

直接接收器讓您針對特定事件樞紐分區 Id。分區提供兩方面的價值。首先,他們讓你規模發佈和使用資訊。第二,他們也讓你將資料隔離到單獨的筒倉。

EventProcessorHost 方法,是一個智慧代理管理分區訪問的.NET 消費者以及每分區偏移量為消費者會吧有關更詳細的說明,請閱讀在 ServiceBus 博客 bit.ly/1aO5I19。使用 EventProcessorHost 可以輕鬆閱讀的郵件,將它們轉換,把它們寫到永久存儲區。

我會寫一個自訂 C# 程式,利用事件集線器 SDK 和寫入 SQL Azure 資料庫和 DocumentDB (查閱事件集線器程式設計指南在 bit.ly/1IBrpNz)。你可以為此源的所有代碼 bit.ly/1aSFF99。我已經剝離的密碼和金鑰,但是你可以從入口網站的連接資訊。若要獲取的所有代碼,安裝 Git 並運行頒發的命令:git 的克隆。一些你會發現Visual Studio主控台應用程式中的檔所示圖 2。主要驅動程式代碼可以在 Program.cs 中發現。

圖 2 的代碼,以消耗,堅持消息

+==============+
|  Section 1   |
+==============+
internal class Program
{
  private static string eventHubConnectionString =             
    ConfigurationManager.AppSettings["eventHubConnectionString"];
  static private string eventHubName = 
    ConfigurationManager.AppSettings["eventHubName"];
  static private string storageAccountName =
    ConfigurationManager.AppSettings["storageAccountName"];
  static private string storageAccountKey =
    ConfigurationManager.AppSettings["storageAccountKey"];
  static private string storageConnectionString =
    string.Format("DefaultEndpointsProtocol=https;" +
    "AccountName={0};AccountKey={1}",
    storageAccountName, storageAccountKey);
  static private string eventProcessorHostName = Guid.NewGuid().ToString();
  private static void Main(string[] args)
  {
    // Start the main message consumption engine
    +==============+
    |  Section 2   |
    +==============+
    var eventProcessorHost =
      new EventProcessorHost(eventProcessorHostName,
      eventHubName, EventHubConsumerGroup.DefaultGroupName,
      eventHubConnectionString, storageConnectionString);
    // Asynchronously consume message from Event Hub
    eventProcessorHost.
      RegisterEventProcessorAsync<SimpleEventProcessor>().Wait();
    Console.WriteLine("Receiving. Press enter key to stop worker.");
    Console.ReadLine();
  }
}
+==============+
|  Section 3   |
+==============+
// SimpleEventProcessor.cs
class SimpleEventProcessor : IEventProcessor
{
  private DataManager dataManager = new DataManager(new SQLDatabaseManager());
  // ... Means omitted for brevity
  public SimpleEventProcessor()  ...
  async Task IEventProcessor.CloseAsync(PartitionContext context,
    CloseReason reason) ...
  Task IEventProcessor.OpenAsync(PartitionContext context) ...
  async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, 
    IEnumerable<EventData> messages)
  {
    // Loop through messages to insert
    foreach (EventData eventData in messages)
    {
      string data = Encoding.UTF8.GetString(eventData.GetBytes());
      // Comma separated so divide up fields
      string[] msg = data.Split(',');
      if (msg.Length > 2)
      {
        +==============+
        |  Section 4   |
        +==============+
        // Insert into SQL
        dataManager.InsertSqlMessage(msg[0], Convert.ToInt32(msg[1]),
          Convert.ToDouble(msg[2]));
        // Insert into global DocumentDB object
        // (global because of thread timing issues)
        Globals.DocDb.InsertEntry(msg[0], Convert.ToInt32(msg[1]),
          Convert.ToDouble(msg[2]));
      }
      Console.WriteLine(string.Format("Message received.  Partition: '{0}', " +
        "Data: '{1}'", context.Lease.PartitionId, data));
    }
    // Call checkpoint every 5 minutes, so that worker can resume
    // processing from the 5 minutes back if it restarts
    if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
    {
      await context.CheckpointAsync();
      lock (this)
      {
        this.checkpointStopWatch.Reset();
      }
    }
  }
}
+==============+
|  Section 5   |
+==============+
// DocumentDBManager.cs
public class DocumentDbManager
{
  // Omitted variables for brevity
  public string collectionName = "CityTempCollection";
  public string databaseName = "TemperatureDB";
  public async Task<bool> InsertEntry(
    string city, int month, double temperature)
  {
    dynamic document = null;
    try
    {
      +==============+
      |  Section 6   |
      +==============+
      // Check if City exists
      document = client.CreateDocumentQuery(documentCollection.DocumentsLink)
        .Where(d => d.Id == city).AsEnumerable().FirstOrDefault();
    }
    catch (Exception ex)
    {
      throw;
    }
    bool docExists = (document != null);
    // Document DOESN'T exist, yet
    if (!docExists)
    {
      +==============+
      |  Section 7   |
      +==============+
      var cityMonthTemperature = new CityMonthTemperature
      {
        Id = city,
        City = city
      };
      cityMonthTemperature.Temperatures[month - 1] = temperature;
      try
      {
        +==============+
        |  Section 8   |
        +==============+
        // Create, and set document to the return, yes --
        // here is where you reset the document object
        document = await client.CreateDocumentAsync(
          documentCollection.DocumentsLink, cityMonthTemperature);
      }
      catch (DocumentClientException ex)
      // Omitted for brevity   
    }
  }
}

節 1 應該不會感到驚訝。它從 App.config 中檢索一些連接資訊。 你可以得到所有的連接資訊可以從入口網站獲得。第二節則在哪裡你是具現化 EventProcessorHost,核心物件會實際上的在事件集線器 SDK,您可以檢索郵件。你會發現綁第 3 節中在 SimpleEventProcessor 類中的此物件的基本事件。

非同步回檔,ProcessEventsAsync,由 SDK 調用。這將傳遞參數 IEnumerable < EventData > 消息,然後解析來檢索存儲在事件中心的消息。然後解析消息參數,並將解析後的消息插入到 SQL 資料庫和 DocumentDB 在第 4 節。 你會發現所有的低級代碼細節為 InsertSqlMessage 和 InsertEntry 在Visual Studio解決方案。

節 5 表示確實插入操作的類­DocumentDB 為實現。資料庫名稱是 TemperatureDB,集合名稱是 CityTempCollection。節 6 表示其中你搜索一個城市使用LINQ查詢的查詢。這裡的邏輯是,可能以前插入一個城市。你真的想要做的是更新溫度資料,如果城市存在。

節 7 代表還被這座城市的場景。您創建一個簡單的.NET 物件,一旦插入發生轉化為 JSON 資料。底層 SDK 照顧這種轉變。請插入相應的月偏移量溫度陣列溫度。最後,在第八節,你實際上更新文檔物件。

為了簡潔起見,此程式碼片段省略了代碼以執行只是溫度的更新在這座城市已經被插入的情況下,但你可以找到它在 GitHub 存儲庫,整個的Visual Studio解決方案中,在 bit.ly/1aSFF99

因為 DocumentDB 是提供預覽的一部分,您需要使用新的 Azure 預覽門戶,在 portal.azure.com。DocumentDB 好的功能之一就是允許您創建的文檔和資料,以及資料查詢和查看現有的資料,如中所示的文檔資源管理器圖 3

使用 DocumentDB 來創建、 查詢和查看資料
圖 3 使用 DocumentDB 來創建、 查詢和查看資料

在 4 月的文章中,我創建了運行 Linux 利用 AMQP 傳輸協議將消息插入事件集線器中的 c 語言程式。該代碼 Azure 主辦的 Linux VM 中跑了,所以我可以輕鬆地導入基於 Debian 的樹莓派執行。總之,第一篇文章是關於生產消息發佈。本期一直都對消息消耗和持久性存儲的資訊。並在最後一部分中,我會處理持久性將資料暴露給移動用戶端和提供視覺化的基礎資料的能力。


Bruno Terkaly 是主要軟體工程師在微軟,目的是發展的業界領先的應用程式和服務啟用跨設備。他是負責開車穿越美國的頂級雲計算和移動的機會和從技術支援的角度之外。他説明合作夥伴帶來市場及其應用提供建築指導和深厚的技術接合 ISV 的評價、 開發和部署過程。Terkaly 還密切與雲計算和移動工程群體、 提供回饋和影響路線圖 》。

感謝以下的微軟技術專家對本文的審閱:Ryan克勞科爾,DanRosanova
Ryan克勞科爾是 20 年資料庫老兵開始出許多年前為SQL Server4.2 寫他的第一個存儲的過程。多個游標、 聯接和存儲的過程後,他開始探索令人興奮的自由世界的 NoSQL 解決方案。Ryan現在正在與雷德蒙德説明形狀作為專案經理的 DocumentDB 產品團隊未來的這所有新 NoSQL 資料庫作為-服務。

DanRosanova 是 Azure 服務匯流排團隊負責消息的事件中心的高級專案經理 (佇列 & 主題),和中繼。Dan一直在郵件系統和分散式運算空間十六年超規模和進化計算的重點。