此文章由机器翻译。

Azure Insider

事件分析和可视化,枢纽第 2 部分

Bruno Terkaly

使用微软 Azure 事件集线器来处理大量的数据,时,必须将重点放在交付和消费方面。这是关于数据可视化为物联网 (物联网) 方案的文章三部分系列的第二部分。在 4 月一期 (msdn.microsoft.com/magazine/dn948106),我讨论了大规模使用 Azure 事件集线器,一种技术,是服务总线提供的一部分,并在规模支持信息发布者 (覆盆子 Pi) 和信息消费者 (C# 程序) 接收数据。在这篇文章中,我将重点在消费方面,从事件集线器中读取并坚持永久存储中的数据的 C# 代码。

这篇文章被分为四个支柱。我开始与体系结构,说明消息消费和存储。第二,我将讨论在 Azure 门户完成的任务。第三,我将描述我将使用和解释为什么他们的两个不同的数据存储。我会最后,解释事件的中心,从读取消息的客户端 C# 代码,以及将消息存储在 SQL Azure 数据库和 DocumentDB 的代码。

事件集线器架构

后从 Azure 事件集线器读取消息,你会想要坚持这些 SQL Azure 数据库和 DocumentDB 的消息。SQL azure 数据库作为一种服务 (DaaS) 可扩展到数以千计的数据库是一个关系数据库。DocumentDB 是一个完全托管的、 可伸缩的 NoSQL 文档数据库服务 (阅读更多关于在 DocumentDB bit.ly/1oWZP7i)。这种体系结构显示在图 1

概括蔚蓝事件集线器架构
图 1 概括蔚蓝事件集线器架构

对完成任务

若要使该解决方案会苏醒过来,我需要完成几个任务在门户。有两种类别要在 Azure 门户网站执行的任务。第一类事件集线器,SQL Azure 数据库和 DocumentDB 资源调配。我置备四月份的文章中事件交通枢纽。

为了从事件集线器读取消息,然后保存到 SQL Azure 数据库和 DocumentDB 的那些消息,需要从 Azure 门户网站获取连接信息。所有的设置都将放置在 App.config Visual Studio 解决方案 C# 应用程序中。我将创建普通的控制台应用程序使用事件集线器 SDK。

最后,为了帮助数据库操作,我会为 SQL Azure 数据库写几个存储过程。你会发现代码用于在数据库中创建表,以及两个存储的过程,­入深圳作为 Visual Studio 解决方案的一部分。您可以使用 SQL 服务器管理工作室打造出了数据库表和存储的过程。使用从 Azure 门户连接信息附加 Azure SQL 数据库坐在微软数据中心到您的 Visual Studio 的本地副本。关于如何做到这一点,详细信息请阅读关于它在 Azure 文件 bit.ly/1K1BIeM

今年 4 月,我说明了资源调配过程为服务中心公交车事件。要穿过一些简单的教程,查阅"得到开始与事件中心"在 bit.ly/1F2gp9H

一旦你已经设置事件集线器,您会收到一个端点。您将需要将这和将放置点的连接信息复制到 App.config 编写 C# 代码时。您还需要提供在 Azure 门户 Azure SQL 数据库。阅读更多关于这一进程在文件 bit.ly/1Enln5c

一旦你已经创建了数据库,有三个更简单的任务。第一个任务是创建气温表。其余的两项任务必须处理两个存储过程,您将使用来清除旧邮件并插入新的消息。在 Visual Studio 项目中,有是用表定义为温度,如 CleanTable 和 InsertTempData 的两个存储的过程调用 DatabaseCode.txt 的文本文件。

既然你已经照顾事件集线器和 SQL Azure 数据库组件,将注意力转到 DocumentDB。你可以了解更多有关在此使用 NoSQL 数据存储资源调配 bit.ly/1IBnGQ5。你也可以观看视频从 Ryan 克劳科尔,高级项目经理团队成员以来带领的产品之一。

一旦你已经在门户设置 DocumentDB,您需要定义一个集合名称和数据库名称。DocumentDB 的思考方式是作为一系列的集合和集合作为一系列文件。数据库世界中的类比推理是集合是表和一份文件是记录。这是一个松散定义的类比,因为使用 NoSQL 数据存储通常非模式化。我们的应用程序中使用的集合名称被称为 CityTempCollection,数据库就称为 TemperatureDB。

理解数据存储选项

这次讨论的起始点是建立超越事件集线器可以以本机方式提供的中学数据存储的需要。例如,一个显而易见的原因,想持久性是存储在事件集线器的消息并不能持久。第二,你无法查询邮件与任何种类的查询语言。你只能读他们以串行方式从事件枢纽分区,在那里每个客户端读取器维护自己消息存储库中的光标。第三,事件中心为您提供 84 GB 的事件存储默认 24 小时保留期。

虽然有很多选项,我选择了 SQL Azure 数据库和 DocumentDB 作为主数据存储为与温度相关的消息数据。到 SQL Azure 数据库的优点是双重的。第一个方面涉及的事实它是云托管,提供其他宝贵的几个属性。提供几乎是瞬时的它是经济的也正是三复制,使它相当强劲。

除了这些显而易见的好处,也是大量的业务智能工具围绕 Azure SQL 数据库如功率双。这允许您构建复杂的可视化交互式报表。您可以构建功能强大的仪表板,然后可以使用浏览器和移动设备。

使用 DocumentDB 作为第二两个数据存储。这有一些明显的优势。它是一个完全托管的服务,所以你不必费心的基础设施和单个虚拟机 (Vm)。此外,它配备了企业的服务水平协议,这是大多数基于 Azure 服务为例。最后,DocumentDB 也是架构自由。

无架构文档存储通常被认为是可取的在面向对象和继承方面的。这是因为继承是指你有具有某些属性的对象共同之处,但也有些特定于对象子类型的属性。

与大多数关系数据库中,您将需要一个表为所有可能的属性,使他们很多人不适用的字段为 null。在架构灵活的数据库中,但是,您可以存储不同的可选属性集。这工作呈现 HTML,因为 JavaScript 可以检查一个未知的可选属性,并调用适当的函数,以输出到可显示的表时,很好。

其他优势 (这可以也被看作是一个缺点对于一些) 架构灵活的数据库之一就是它在发展过程中提供额外的灵活性。您可以添加新的功能没有重组数据库。这使得它容易保持向下兼容由早期版本创建的数据。这种方法的缺点写对可选字段的查询可以变得复杂和令人费解。

基于 JSON 数据存储,如 DocumentDB 在普照的领域之一是使用 Node.js 应用程序暴露的数据移动或 Web 应用程序时。本机的 JSON 格式,是紧凑而富有表现力,使它容易和自然的工作。使用基于 JSON 数据存储使用 HTTP 和休息也是简单明了,是否你正在使用 Microsoft.NET 框架、 JavaScript、 Ruby、 Java、 PHP 或 Python。我将重点介绍 Node.js 应用程序读取 DocumentDB 数据和将在下一篇文章中的移动应用程序的数据暴露。

消费和坚持的消息

从事件集线器使用消息有三种常见的方法。一如往常,还有什么是容易与什么是柔性之间的权衡。最简单的方法是与流分析。另一种方式是直接的接收器。直接的接收器是负责访问自己协调分区在消费者群体中,藉以您的代码直接解决分区 ID 时创建 EventHubConsumerGroup 对象的接收器。另一种方式是与更高层次的抽象,例如事件­ProcessorHost。我会使用此方法,因为它是很简单,但却提供了足够的灵活性。

流分析是一个完全托管的服务,使得它容易摄取来自事件中心的消息。很容易消耗、 变换并将这些消息放入 SQL Azure 数据库。阅读更多关于这个方法在 bit.ly/1IRvPDc

它是一个简单的创建一个数据库、 表和查询的一系列问题。例如,您使用简单的 select 语句从事件集线器读取消息"选择 DeviceId,温度从输入,"并将这些消息放入 SQL Server。你可以连链查询递归并创建一系列变换使用管道方法对数据的查询。这种能力来筛选邮件通过一系列的 SQL 查询是一个强大的抽象。

更有趣的扩展可以通过流分析之一有跟窗口。通常是在一段时间内的事件的子集上执行一些基于集的计算 (聚合) 或其他操作的要求。因为时间是关键在复杂事件处理系统中,它是重要的是有简单的方法来查询逻辑的时间组件与系统内的工作。在 Azure 流分析中,透过窗户,届时代表分组定义了这些事件的子集。支持的时间窗口的各种类型的详细说明,请查阅 bit.ly/1DIizfM

直接接收器让您针对特定事件枢纽分区 Id。分区提供两方面的价值。首先,他们让你规模发布和使用信息。第二,他们也让你将数据隔离到单独的筒仓。

EventProcessorHost 方法,是一个智能代理管理分区访问的.NET 消费者以及每分区偏移量为消费者会吧有关更详细的说明,请阅读在 ServiceBus 博客 bit.ly/1aO5I19。使用 EventProcessorHost 可以轻松阅读的邮件,将它们转换,把它们写到永久存储区。

我会写一个自定义 C# 程序,利用事件集线器 SDK 和写入 SQL Azure 数据库和 DocumentDB (查阅事件集线器编程指南在 bit.ly/1IBrpNz)。你可以为此源的所有代码 bit.ly/1aSFF99。我已经剥离的密码和密钥,但是你可以从门户网站的连接信息。若要获取的所有代码,安装 Git 并运行颁发的命令:git 的克隆。一些你会发现在 Visual Studio 的控制台应用程序的文件所示图 2。主要驱动程序代码可以在 Program.cs 中发现。

图 2 的代码,以消耗,坚持消息

+==============+
|  Section 1   |
+==============+
internal class Program
{
  private static string eventHubConnectionString =             
    ConfigurationManager.AppSettings["eventHubConnectionString"];
  static private string eventHubName = 
    ConfigurationManager.AppSettings["eventHubName"];
  static private string storageAccountName =
    ConfigurationManager.AppSettings["storageAccountName"];
  static private string storageAccountKey =
    ConfigurationManager.AppSettings["storageAccountKey"];
  static private string storageConnectionString =
    string.Format("DefaultEndpointsProtocol=https;" +
    "AccountName={0};AccountKey={1}",
    storageAccountName, storageAccountKey);
  static private string eventProcessorHostName = Guid.NewGuid().ToString();
  private static void Main(string[] args)
  {
    // Start the main message consumption engine
    +==============+
    |  Section 2   |
    +==============+
    var eventProcessorHost =
      new EventProcessorHost(eventProcessorHostName,
      eventHubName, EventHubConsumerGroup.DefaultGroupName,
      eventHubConnectionString, storageConnectionString);
    // Asynchronously consume message from Event Hub
    eventProcessorHost.
      RegisterEventProcessorAsync<SimpleEventProcessor>().Wait();
    Console.WriteLine("Receiving. Press enter key to stop worker.");
    Console.ReadLine();
  }
}
+==============+
|  Section 3   |
+==============+
// SimpleEventProcessor.cs
class SimpleEventProcessor : IEventProcessor
{
  private DataManager dataManager = new DataManager(new SQLDatabaseManager());
  // ... Means omitted for brevity
  public SimpleEventProcessor()  ...
  async Task IEventProcessor.CloseAsync(PartitionContext context,
    CloseReason reason) ...
  Task IEventProcessor.OpenAsync(PartitionContext context) ...
  async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, 
    IEnumerable<EventData> messages)
  {
    // Loop through messages to insert
    foreach (EventData eventData in messages)
    {
      string data = Encoding.UTF8.GetString(eventData.GetBytes());
      // Comma separated so divide up fields
      string[] msg = data.Split(',');
      if (msg.Length > 2)
      {
        +==============+
        |  Section 4   |
        +==============+
        // Insert into SQL
        dataManager.InsertSqlMessage(msg[0], Convert.ToInt32(msg[1]),
          Convert.ToDouble(msg[2]));
        // Insert into global DocumentDB object
        // (global because of thread timing issues)
        Globals.DocDb.InsertEntry(msg[0], Convert.ToInt32(msg[1]),
          Convert.ToDouble(msg[2]));
      }
      Console.WriteLine(string.Format("Message received.  Partition: '{0}', " +
        "Data: '{1}'", context.Lease.PartitionId, data));
    }
    // Call checkpoint every 5 minutes, so that worker can resume
    // processing from the 5 minutes back if it restarts
    if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
    {
      await context.CheckpointAsync();
      lock (this)
      {
        this.checkpointStopWatch.Reset();
      }
    }
  }
}
+==============+
|  Section 5   |
+==============+
// DocumentDBManager.cs
public class DocumentDbManager
{
  // Omitted variables for brevity
  public string collectionName = "CityTempCollection";
  public string databaseName = "TemperatureDB";
  public async Task<bool> InsertEntry(
    string city, int month, double temperature)
  {
    dynamic document = null;
    try
    {
      +==============+
      |  Section 6   |
      +==============+
      // Check if City exists
      document = client.CreateDocumentQuery(documentCollection.DocumentsLink)
        .Where(d => d.Id == city).AsEnumerable().FirstOrDefault();
    }
    catch (Exception ex)
    {
      throw;
    }
    bool docExists = (document != null);
    // Document DOESN'T exist, yet
    if (!docExists)
    {
      +==============+
      |  Section 7   |
      +==============+
      var cityMonthTemperature = new CityMonthTemperature
      {
        Id = city,
        City = city
      };
      cityMonthTemperature.Temperatures[month - 1] = temperature;
      try
      {
        +==============+
        |  Section 8   |
        +==============+
        // Create, and set document to the return, yes --
        // here is where you reset the document object
        document = await client.CreateDocumentAsync(
          documentCollection.DocumentsLink, cityMonthTemperature);
      }
      catch (DocumentClientException ex)
      // Omitted for brevity   
    }
  }
}

节 1 应该不会感到惊讶。它从 App.config 中检索一些连接信息。 你可以得到所有的连接信息可以从门户网站获得。第二节则在哪里你是实例化 EventProcessorHost,核心对象会实际上的在事件集线器 SDK,您可以检索邮件。你会发现绑第 3 节中在 SimpleEventProcessor 类中的此对象的基本事件。

异步回调,ProcessEventsAsync,由 SDK 调用。这将传递参数 IEnumerable < EventData > 消息,然后解析来检索存储在事件中心的消息。然后解析消息参数,并将解析后的消息插入到 SQL 数据库和 DocumentDB 在第 4 节。 你会发现所有的低级代码细节为 InsertSqlMessage 和 InsertEntry 在 Visual Studio 解决方案中。

节 5 表示确实插入操作的类­DocumentDB 为实现。数据库名称是 TemperatureDB,集合名称是 CityTempCollection。节 6 表示其中你搜索一个城市使用 LINQ 查询的查询。这里的逻辑是,可能以前插入一个城市。你真的想要做的是更新温度数据,如果城市存在。

节 7 代表还被这座城市的场景。您创建一个简单的.NET 对象,一旦插入发生转化为 JSON 数据。底层 SDK 照顾这种转变。请插入相应的月偏移量温度阵列温度。最后,在第八节,你实际上更新文档对象。

为了简洁起见,此代码段省略了代码以执行只是温度的更新在这座城市已经被插入的情况下,但你可以找到它在 GitHub 存储库,整个的 Visual Studio 解决方案中,在 bit.ly/1aSFF99

因为 DocumentDB 是提供预览的一部分,您需要使用新的 Azure 预览门户,在 portal.azure.com。DocumentDB 好的功能之一就是允许您创建的文档和数据,以及数据查询和查看现有的数据,如中所示的文档资源管理器图 3

使用 DocumentDB 来创建、 查询和查看数据
图 3 使用 DocumentDB 来创建、 查询和查看数据

在 4 月的文章中,我创建了运行 Linux 利用 AMQP 传输协议将消息插入事件集线器中的 c 语言程序。该代码 Azure 主办的 Linux VM 中跑了,所以我可以轻松地导入基于 Debian 的树莓派执行。总之,第一篇文章是关于生产消息发布。本期一直都对消息消耗和持久性存储的信息。并在最后一部分中,我会处理持久性将数据暴露给移动客户端和提供可视化的基础数据的能力。


Bruno Terkaly 是主要软件工程师在微软,目的是发展的业界领先的应用程序和服务启用跨设备。他是负责开车穿越美国的顶级云计算和移动的机会和从技术支持的角度之外。他帮助合作伙伴带来市场及其应用提供建筑指导和深厚的技术接合 ISV 的评价、 开发和部署过程。Terkaly 还密切与云计算和移动工程群体、 提供反馈和影响路线图 》。

衷心感谢以下 Microsoft 技术专家对本文的审阅:丹 Rosanova Ryan 克劳科尔
Ryan 克劳科尔是 20 年数据库老兵开始出许多年前为 SQL 服务器 4.2 写他的第一个存储的过程。多个游标、 联接和存储的过程后,他开始探索令人兴奋的自由世界的 NoSQL 解决方案。Ryan 现在正在与雷德蒙德帮助形状作为项目经理的 DocumentDB 产品团队未来的这所有新 NoSQL 数据库作为-服务。

丹 Rosanova 是 Azure 服务总线团队负责消息的事件中心的高级项目经理 (队列 & 主题),和中继。丹一直在邮件系统和分布式计算空间十六年超规模和进化计算的重点。