2018 年 2 月

第 33 卷,第 2 期

Azure - 采用 Azure 事件网格的云的事件驱动体系结构

作者 David Barkol

云架构师是一项充满激情的工作。创新的快速发展让一系列新的挑战和技术更为突出,它们正在重塑解决方案的设计方式。受益于这样的快速发展,有丰富的服务和选项可供开发人员和架构师选择。

随着开发人员为了利用 Azure Functions、逻辑应用等其他新服务而开始分解体系结构,常见的障碍也随之浮现出来。在许多情况下,我们发现自己又在拼凑“纽带”,以便于这些服务能够协同运作。最近推出的 Azure 事件网格旨在应对这种挑战,具体是通过在云中提供一流的事件路由服务,这不仅是一项可缩放的完全托管服务,而且还极具灵活性。

在本文中,我将探索极具灵活性的 Azure 事件网格,并介绍如何用它来应对企业应用中的常见挑战。我还建议大家查看 aka.ms/egblog 上的事件网格正式版本推出公告。

依赖关系的逆转

对解决方案或应用使用事件这种概念并不新鲜。事件驱动编程其实已成功利用这种概念很长一段时间了。利用这种概念回应组织、应用或系统中发生的事件的示例有很多,发布/订阅队列和 GUI 只是其中几个。

事件驱动体系结构的核心租户之一是要逆转现有服务的可能相互依赖关系。图 1 中的示例展示了一组相互依赖的进程,共同负责人力资源 (HR) 部门的通信和技术支持。

包含有关其他服务的逻辑的服务
图 1:包含有关其他服务的逻辑的服务

为了让这个设计有效,每个服务都必须包含某种有关要与之通信的其他服务的基本逻辑。这些依赖关系构成了挑战,不仅仅在于规模,还在于此逻辑分散在整个体系结构中。久而久之,随着这些类型的解决方案不断扩展,它们就会变得难以维护且越来越脆弱,因为更多的更改和依赖关系被引入进来。

作为备选方法,事件驱动设计背后的概念是将事件提升为体系结构中的“一等公民”,这样就不用考虑这些依赖关系了。这一项重要考虑让其他许多系统都能够利用集中式服务,避免了依赖关系和分散在整个应用中的逻辑带来的麻烦。图 2 突出显示了 HR 部门解决方案的依赖关系逆转,这具体是通过引入此主体概念实现的。

逆转与其他服务的依赖关系的集中式服务
图 2:逆转与其他服务的依赖关系的集中式服务

本文的剩余部分全部都涉及这项关键服务。接下来,我将探索 Azure 事件网格,并介绍如何用它来支持下一代解决方案。

Azure 事件网格简介

Azure 事件网格是新推出的完全托管服务,支持利用发布程序-订阅程序模型来路由事件。事件网格的核心是事件路由服务,负责管理路由和发送来自众多来源和订阅程序的事件。图 3 取自事件网格概述文档 (bit.ly/2qhaj9q),展示了目前可以与事件网格结合使用的几个发布程序和处理程序。

Azure 事件网格概述
图 3:Azure 事件网格概述

事件由发布程序(如 Blob 存储帐户、事件中心或甚至是 Azure 订阅)创建。随着事件的发生,它们会被发布到“主题”终结点,事件网格服务在此设法消化所有传入的消息。与事件网格集成的 Azure 服务列表正在不断丰富,即将有更多服务与它集成。

事件发布程序不仅仅局限于 Azure 服务。实际上,在十分常见的用例中,事件就源自可以从任何地方运行的自定义应用或系统。这包括在本地、数据中心或甚至其他云中托管的应用。此类发布程序称为“自定义主题”。如果可以向事件网格服务发布 HTTP 请求,就有望发送事件。

事件处理程序还包括多个 Azure 服务。其中包括一些新兴的 Azure 无服务器技术(如 Azure Functions 和逻辑应用)。除了 Azure 自动化之外,另一种事件处理程序也可以是任何 HTTP 回调(亦称为“Webhook”)。向事件网格注册处理程序的方法为,创建事件订阅。如果事件处理程序终结点可公开访问,且由传输层安全性进行加密,便可以从事件网格向它推送消息。

与其他许多 Azure 服务不同,无需预配或管理任何事件网格命名空间。本机 Azure 资源的主题是内置的,对用户完全透明;而自定义主题则是特别预配的,存在于资源组中。事件订阅只与主题相关联。此模型不仅简化了管理订阅形式的主题,还为事件网格实现了高度多租户性,以支持大规模扩展。

Azure 事件网格与任何语言或平台都无关。虽然是与 Azure 服务本机集成,但只要服务支持 HTTP 协议,就可以轻松利用它。因此,Azure 事件网格是一项非常巧妙的创新服务。

事件或命令

深入研究某代码并生成突显部分功能的解决方案前,先来区分一下事件和命令。这种区别有时可能不易察觉,但在设计依赖消息的系统时务必要理解这一点。

如果是为了完成特定操作或响应而发送消息,这最有可能是命令。例如,如果某员工在组织内晋升,发送的消息是为了指示他的新经理填写表单,此消息就带有特定的目的或意图。由于此消息的发送者有所期待(在某些情况下,甚至可能期待获得响应),因此可以将它归类为命令。

如果发布的消息没有承载任何有关如何处理它的认知或预期,则将它视为事件。比方说,组织内的同一名员工已被要求更改通讯地址。因为此操作可能涉及组织中的多个系统,但发布程序无需关注其中任何一个,所以这就是未定义任何意图的消息。在这种情况下,发布程序只是通知所有相关方发生了事件。这既是发生的事件,也显然是事件网格等服务的可行选项。

我可以花更多的时间来讨论这些区别,以及如何择适选当的 Azure 消息服务,但这并不在本文的介绍范围之内。建议阅读 Clemens Vasters 对此主题发表的有见地帖子 (bit.ly/2CH3sbQ)。

人力资源方案

深入了解事件网格的最佳方式是,编写利用其功能的代码。在本文中,我将着眼于一些源自虚构 HR 应用的事件。我将把事件发布到自定义主题,再使用几个不同的处理程序来订阅事件。

为简单起见,我将实现源自 HR 系统的两类事件,分别是组织有新员工入职和组织有员工离职。这两类事件在本质上非常相近,以便于展示如何以不同方式筛选和处理事件。图 4 展示了此解决方案。

示例解决方案
图 4:示例解决方案

纵观此解决方案,可以发现它是由多个关键组件组成,我将在本文中生成这些组件。接下来,将逐一介绍它们。

“员工事件”是 HR 应用可以向其发送消息的事件网格主题。这将包括组织中新入职和已离职员工的相关事件。每条消息均包含员工相关信息、所属部门和事件类型。

“欢迎新员工”是逻辑应用,用于订阅组织中新入职员工的消息。它最终会向新入职员工发送欢迎邮件。

“新员工设备订购”是 Azure 函数,用于订阅工程部门新入职员工的事件。然后,它会在队列中创建消息,以进行额外处理。

“员工记录”是在 ASP.NET Core 基础之上生成的自定义网站,用于公开 Web API 以接收员工从组织离职的消息。

创建自定义主题

首先,我需要在 Azure 中创建一些基本资源。可以在门户中启动 Azure Cloud Shell,也可以在本地使用命令行接口 (CLI)。若要详细了解如何使用 Cloud Shell,可以访问 bit.ly/2CsFtQB。如果之前没有使用过 Cloud Shell,强烈建议这样做。

首先,我要创建用于管理和封装 Azure 资源的资源组:

az group create --name <resource-group-name> --location <location>

在资源组创建后,事件网格主题就会得到预配。这就提供了从 HR 应用发布自定义事件的终结点。此主题的名称必须对相应区域是唯一的,因为它将成为可公开访问的 Azure 服务。它还必须位于提供事件网格服务的区域内。我通常使用的是 westus2 位置,也可以参阅每个 Azure 区域提供的服务列表(请访问 bit.ly/2DU15ln)。

az eventgrid topic create --name <topic-name> \
  --location <location> \
  --resource-group <resource group name>

执行主题创建命令后,返回的是资源详细信息。输出与下面的代码类似,但并不完全一样:

{
  "endpoint": "https://<topic name>.westus2-1.eventgrid.azure.net/api/events",
  "id": "/subscriptions/xxxx-xxx-xx-xxx-xx/resourceGroups/eventgridsolution-rg/providers/Microsoft.EventGrid/topics/<topic name>",
  "location": "westus2",
  "name": "<topic name>",
  "provisioningState": "Succeeded",
  "resourceGroup": "eventgridsolution-rg",
  "tags": null,
  "type": "Microsoft.EventGrid/topics"
}

请记下终结点值,因为稍后在发布事件时将会用到它。还需要生成的两个访问密钥之一,以供授权时使用。若要检索密钥,可以列出与此主题关联的密钥。作为一项安全措施,可以而且应当循环并重新生成这些密钥,就像使用其他 Azure 服务一样。

az eventgrid topic key list --name <topic-name> --resource-group <resource-group-name>

如果选择使用 Azure 门户,也可以在门户中创建并查看所有这些选项和设置。

发布事件

发送首个事件前,必须先了解此主题应采用的事件架构。无论发布程序是 Azure 资源还是自定义应用,每个事件都将遵循以下代码中概述的结构(有关事件架构的实用参考和一些示例,可以访问 bit.ly/2CG8oxI):

[
  {
    "topic": string,
    "subject": string,   
    "id": string,
    "eventType": string,
    "eventTime": string,
    "data":{
      object-unique-to-each-publisher
    }
  }
]

要指出的第一点是,事件是以数组形式发送。这是有意而为之,目的是为了能够在一个请求中发送多个事件。事件可以批量发送,这样既可以减少网络通信量,同时还支持网络连接可能会断开的情况。

我要发布的首个事件是组织有新员工入职。此事件的有效负载如图 5 所示。

图 5:新员工入职事件

[{
  "id": "30934",
  "eventType": "employeeAdded",
  "subject": "department/engineering",
  "eventTime": "2017-12-14T10:10:20+00:00",
  "data":{
    "employeeId": "14",
    "employeeName": "Nigel Tufnel",
    "employeeEmail": "nigel@contoso.com",
    "manager": "itmanager@contoso.com",
    "managerId": "4"
  }
}]

稍后,我将在本文中对组织有员工离职事件使用基本相同的结构,区别在于值不同。此事件的关键属性如下:

eventType 是用于唯一标识已发布事件类型的值。如果处理程序希望只订阅特定类型(而不是所有类型)的事件,可以使用此属性。

与 eventType 类似,subject 是用于提供其他事件上下文的值,同时还向订阅程序额外提供筛选器。我将在订阅创建后立即利用 eventType 和 subject。subject 和 eventType 提供事件上下文。

data 是发布程序定义的存储桶,只是可以包含一个或多个属性的对象。发布程序在此属性内分配事件本身的相关信息。例如,Azure Blob 存储事件包含已创建或已删除 blob 的详细信息(如 URL 和内容类型)。

为了发布事件,我使用 Postman(或类似工具)模拟从 HR 应用发布到上述终结点地址的消息。为了进行授权,我在头中添加了 aeg-sas-key 项,它的值是在主题创建时生成的访问密钥之一。请求主体将包含图 5**** 中的有效负载。

由于尚无任何订阅程序,因此还不需要进行任何观察。下一步是要观察实际运行效果,具体操作是为事件网格创建一些事件处理程序,以便将事件推送到其中。

使用 Azure 函数处理事件

现在到了很有意思的部分,即订阅事件。首个处理程序将是 Azure 函数。若要了解有关如何创建函数的基础知识,请访问 bit.ly/2A6pFgu。在此示例中,我要专门订阅最近有新员工入职事件。另外,同样重要的是,只能对工程部门的新员工调用此处理程序。

逐步介绍如何创建函数的示例大多使用的是 Azure 门户,此方法非常简单快捷。我要介绍的是,如何在本地使用 Visual Studio 执行此操作。这将为更多的生产就绪代码铺平道路。为了支持通过事件网格进行本地调试,我还将使用 ngrok 实用工具(请访问 ngrok.com)。

若要跟着我一起操作,必须使用 ngrok 和最新版 Visual Studio(我在本文中使用的是版本 15.5.2)。首先,新建一个项目,并从“云模板”中选择“Azure Functions”。在“新建项目”对话框中,选择“HTTP 触发器”选项,并保留默认值。

将此函数的代码更新为反映图 6 中的内容。可以任意重命名文件,只要体现出是函数名称即可。

图 6:新员工入职事件处理程序的实现

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Host;
using Newtonsoft.Json;
namespace NewEmployeeApp
{
  public static class NewEmployeeHandler
  {
    public class GridEvent<T> where T : class
    {
      public string Id { get; set; }
      public string EventType { get; set; }
      public string Subject { get; set; }
      public DateTime EventTime { get; set; }
      public T Data { get; set; }
      public string Topic { get; set; }
    }
      [FunctionName("newemployeehandler")]
      public static async Task<HttpResponseMessage> Run(
        [HttpTrigger(AuthorizationLevel.Function, "post", Route = null)]
        HttpRequestMessage req,
        TraceWriter log)
         {
          log.Info("New Employee Handler Triggered");
          // Retrieve the contents of the request and
          // deserialize it into a grid event object.
          var jsonContent = await req.Content.ReadAsStringAsync();
          var gridEvent =
            JsonConvert.DeserializeObject<List<GridEvent<Dictionary<string,
              string>>>>(jsonContent)
              ?.SingleOrDefault();
            // Check to see if the event is available and
            // return an error response if its missing.
            if (gridEvent == null)
            {
              return req.CreateErrorResponse(HttpStatusCode.BadRequest,
                $@"Missing event details");
            }
          // Check the header to identify the type of request
          // from Event Grid. A subscription validation request
          // must echo back the validation code.
          var gridEventType = req.Headers.GetValues("Aeg-Event-Type"). 
            FirstOrDefault();
          if (gridEventType == "SubscriptionValidation")
          {
            var code = gridEvent.Data["validationCode"];
            return req.CreateResponse(HttpStatusCode.OK,
              new { validationResponse = code });
          }
          else if (gridEventType == "Notification")
          {
            // Pseudo code: place message into a queue
            // for further processing.
            return req.CreateResponse(HttpStatusCode.OK);
          }
          else
          {
            return req.CreateErrorResponse(HttpStatusCode.BadRequest,
              $@"Unknown request type");
          }
        }
  }
}

此时,代码中有几个重要点需要回顾一下。开头是 GridEvent 类,旨在反映来自事件网格的有效负载和事件架构。理想情况下,我会将此类添加到通用库中,以供重用。在此示例中,它用于将请求内容反序列化为强类型对象。

事件网格向订阅程序发送两种类型的请求(SubscriptionValidation 和 Notification),可以检查头中的值来识别这两种请求。验证请求对确保显式添加所有订阅程序很重要。此时,我所要做的就是回显验证码,以确认我能够收到消息:

var code = gridEvent.Data["validationCode"];
return req.CreateResponse(HttpStatusCode.OK,
  new { validationResponse = code });

也可以通过事件类型 (Microsoft.EventGrid.SubscriptionValidationEvent) 识别验证请求。如果事件类型是 Notification,我将继续实现业务逻辑。将终结点公开给其他服务时,强烈建议使用这种防御性编程方法。

如果函数是在 Azure 中托管且使用 azurewebsites.net 域进行引用,则无需使用订阅验证逻辑。相反,事件网格将它们与其他一些服务(如逻辑应用和来自 Azure 自动化 Runbook 的回调)一起列入白名单。由于我打算进行本地测试,因此需要回显验证码,以便事件网格能够确认函数是有效终结点。

最终,事件网格运行时 SDK 会处理此设置的大部分工作,从反序列化事件和创建强类型事件网格对象到自动验证终结点。截至本文撰写之时,更新后的运行时 SDK 尚不可用。

本地函数测试

接下来,将通过 Visual Studio 启动此函数,让它在端口 7071 上本地运行。函数运行后,立即打开命令提示符,并使用 ngrok 创建安全隧道:

ngrok http -host-header=localhost 7071

ngrok 将返回可用作订阅程序终结点的 HTTPS 地址。此地址应类似于 https://d69f6bed.ngrok.io,不同之处在于每次执行 ngrok 命令时都会有不同的子域。将函数路由追加到 URL,如 https://<生成的值>.ngrok.io/api/newemployeehandler 所示。这就是事件订阅的终结点地址。

在函数运行且安全隧道就绪后,我现在可以通过 CLI 或 Azure Cloud Shell 创建事件订阅了:

az eventgrid event-subscription create --name <event-subscription-name> \
  --resource-group <resource group name> \
  --topic-name <topic name> \
  --subject-ends-with engineering \
  --included-event-type employeeAdded \
  --endpoint <function endpoint>

我也可以填写对话框(如图 7 所示),在门户中添加事件订阅。

在门户中创建事件订阅
图 7:在门户中创建事件订阅

我要与大家分享一下在创建事件订阅时的几个重要参数。

subject-begins-with(前缀筛选器)是可选参数,用于根据事件中主题字段的前缀进行筛选。此为文本字符串匹配。不支持通配符和正则表达式。

subject-ends-with(后缀筛选器)是可选参数,用于根据后缀筛选事件。不支持通配符和正则表达式。

included-event-type(事件类型)是要订阅的事件类型的可选列表。各个类型用空格隔开。

我现在可以回到本文前面的发布事件示例,以确保事件从 Postman 流向事件网格,并最终流向本地函数。随时可以更改请求中的值,以验证筛选器是否按预期正常运行。

处理事件:逻辑应用和 Webhook

下一个事件订阅是逻辑应用。与 Azure 函数示例一样,它只关注新员工入职事件类型。它不会利用前缀或后缀筛选器,因为我要向所有部门的员工发送消息。逻辑应用的完整版本如图 8 所示。

欢迎新员工的逻辑应用
图 8:欢迎新员工的逻辑应用

逻辑应用的开头是事件网格触发器。选择“Microsoft.EventGrid.topics”作为资源类型,这样我就可以从订阅的自定义主题中进行选择。

Parse JSON 操作有助于访问 Data 对象中的属性。我将使用此示例有效负载生成架构:

{
  "id": "40000",
  "eventType": "employeeAdded",
  "subject": "department/finance",
  "eventTime": "2017-12-20T10:10:20+00:00",
  "data":{
    "employeeId": "24",
    "employeeName": "David St. Hubbins",
    "employeeEmail": "david@contoso.com",
    "manager": "finance@contoso.com",
    "managerId": "10"
  }
}

接下来,必须使用条件操作完成事件类型筛选。这与使用函数创建事件订阅的方式略有不同,因为无法在事件网格触发器中选择此选项。

最后一步是向员工发送电子邮件。它使用从第二步检索到的属性,以填充电子邮件的收件人地址和主题字段。若要测试逻辑应用,请单击设计器中的“运行”,再像以前一样向终结点发送消息。

最后一个事件订阅是基本 HTTP 回调或 Webhook。我将用传入事件的 Web API 更新现有 ASP.NET Core 应用。Webhook 代码与我之前编写的 Azure 函数非常相似。不过,也有一些细微差别,包括为检查请求类型而检索头值的方式,如图 9 所示。

图 9:接收事件的 Web API 控制器

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Newtonsoft.Json;
namespace EmployeeRecords.Controllers
{
  public class GridEvent<T> where T : class
  {
    public string Id { get; set; }
    public string Subject { get; set; }
    public string EventType { get; set; }
    public T Data { get; set; }
    public DateTime EventTime { get; set; }
  }
  [Produces("application/json")]
  [Route("api/EmployeeUpdates")]
  public class EmployeeUpdatesController : Controller
  {
    private bool EventTypeSubcriptionValidation
      => HttpContext.Request.Headers["aeg-event-type"].FirstOrDefault() ==
        "SubscriptionValidation";
    private bool EventTypeNotification
      => HttpContext.Request.Headers["aeg-event-type"].FirstOrDefault() ==
        "Notification";
    [HttpPost]
    public async Task<HttpResponseMessage> Post()
    {
      using (var reader = new StreamReader(Request.Body, Encoding.UTF8))
      {
        var jsonContent = await reader.ReadToEndAsync();
        var gridEvent =
          JsonConvert.DeserializeObject<List<GridEvent<Dictionary<string,
          string>>>>(jsonContent)
            .SingleOrDefault();
        if (gridEvent == null)
        {
          return new HttpResponseMessage { StatusCode = HttpStatusCode.BadRequest};
        }
        // Check the event type from Event Grid.
        if (EventTypeSubcriptionValidation)
        {
          // Retrieve the validation code and echo back.
          var validationCode = gridEvent.Data["validationCode"];
          var validationResponse =
            JsonConvert.SerializeObject(new { validationResponse =
            validationCode });
          return new HttpResponseMessage
          {
            StatusCode = HttpStatusCode.OK,
            Content = new StringContent(validationResponse)
          };
        }
        else if (EventTypeNotification)
        {
          // Pseudo code: Update records
          return new HttpResponseMessage { StatusCode = HttpStatusCode.OK };
        }
        else
        {
          return new HttpResponseMessage { StatusCode = HttpStatusCode.BadRequest };
        }
      }
    }
  }
}

创建事件订阅时,注册的事件类型应为 employeeRemoved。此更改满足以下要求:处理程序只想接收从组织离职的员工的消息。另请注意,前缀和后缀筛选器都不会使用,因为无论员工属于哪个部门,只要离职就会通知订阅程序:

az eventgrid event-subscription create --name <event-subscription-name> \
  --resource-group <resource group name> \
  --topic-name <topic name> \
  --included-event-type employeeRemoved \
  --endpoint <function endpoint>

最后,请注意,事件订阅终结点必须是安全的。如果在 Azure 上引用应用服务,必须在地址中指定 HTTPS,否则将无法添加订阅。

总结

Azure 事件网格是一项真正改变业界格局的服务。在本文中,我介绍了常见的应用集成方案。事件网格可用作启用技术,将应用连接到其他服务(如 Azure Functions、逻辑应用,甚至是可以在任何位置驻留的自定义 Webhook)。与无服务器应用相互补充,事件网格确实出类拔萃,因为两者都可以充分利用 Azure 支持的大规模缩放和集成功能。若要获取本文中的代码,可以访问 github.com/dbarkol/AzureEventGrid


David Barkol**** 是 Microsoft 全球黑带团队的 Azure 专家。可通过 Twitter (@dbarkol) 或电子邮件方式 (dabarkol@microsoft.com) 与他取得联系。**

衷心感谢以下 Microsoft 技术专家对本文的审阅:Bahram Banisadr 和 Dan Rosanovsanova
Dan Rosanova 是负责 Azure Messaging 产品套件(包括服务总线、事件中心、Azure 中继和事件网格)的首席项目经理主管。
 
Bahram Banisadr 是负责 Azure 事件网格的 PM,致力于生成 Azure 服务的连接体系结构。


在 MSDN 杂志论坛讨论这篇文章