你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

Azure 流分析的输出

Azure 流分析作业由输入、查询和输出构成。 可以将转换后的数据发送到多个输出类型。 本文列出了支持的流分析输出。 设计流分析查询时,使用 INTO 子句引用输出的名称。 可针对每个作业使用单个输出,也可通过向查询添加多个 INTO 子句,针对每个流式处理作业使用多个输出(如果需要)。

要创建、编辑和测试流分析作业输出,可使用 Azure 门户Azure PowerShell.NET APIREST APIVisual StudioVisual Studio Code

注意

为了获得最佳的本地开发体验,我们强烈建议使用适用于 Visual Studio Code 的流分析工具。 适用于 Visual Studio 2019 的流分析工具(版本 2.6.3000.0)存在已知的功能差距,今后将不会改进。

部分输出类型支持分区,并且输出批大小可变化以优化吞吐量。 下表显示了每种输出类型支持的功能:

输出类型 分区 安全
Azure Data Lake Storage Gen 1 Azure Active Directory 用户
、托管标识
Azure 数据资源管理器 托管标识
Azure Database for PostgreSQL 用户名和密码身份验证
Azure SQL 数据库 是,可选。 SQL 用户身份验证、
托管标识
Azure Synapse Analytics SQL 用户身份验证、
托管标识
Blob 存储和 Azure Data Lake Gen 2 访问密钥、
托管标识
Azure 事件中心 是,需要在输出配置中设置分区键列。 访问密钥、
托管标识
Power BI Azure Active Directory 用户
托管标识
Azure 表存储 帐户密钥
Azure 服务总线队列 访问密钥
Azure 服务总线主题 访问密钥
Azure Cosmos DB 访问密钥
Azure Functions 访问密钥

分区

流分析支持对除 Power BI 之外的所有输出进行分区。 有关分区键和输出编写器数目的详细信息,请参阅你感兴趣的特定输出类型的文章。 在上一节中链接了所有输出文章。

另外,若要对分区进行更高级的优化,可以在查询中使用 INTO <partition count>(请参阅 INTO)子句来控制输出写入器的数量,这可能有助于实现所需的作业拓扑。 如果输出适配器未分区,则一个输入分区中缺少数据将导致延迟最多可达延迟到达的时间量。 在这种情况下,输出将合并到单个写入器,这可能会导致管道中出现瓶颈。 若要了解有关延迟到达策略的详细信息,请参阅 Azure 流分析事件顺序注意事项

输出批大小

所有输出都支持批处理,但仅部分输出显式支持批处理大小。 Azure 流分析使用大小可变的批来处理事件和写入到输出。 通常流分析引擎不会一次写入一条消息,而是使用批来提高效率。 当传入和传出事件的速率较高时,流分析将使用更大的批。 输出速率低时,使用较小的批来保证低延迟。

Parquet 输出批处理窗口属性

使用 Azure 资源管理器模板部署或 REST API 时,两个批处理窗口属性为:

  1. timeWindow

    每批的最长等待时间。 该值应为时间跨度的字符串。 例如,“00:02:00”表示两分钟。 在此时间后,即使不满足最小行数要求,也会将该批写入输出。 默认值为 1 分钟,允许的最大值为 2 小时。 如果 blob 输出具有路径模式频率,则等待时间不能超出分区时间范围。

  2. sizeWindow

    每批的最小行数。 对于 Parquet,每个批处理都将创建一个新文件。 当前默认值为 2000 行,允许的最大值为 10000 行。

这些批处理窗口属性仅受 API 版本“2017-04-01-preview”支持。 下面是 REST API 调用的 JSON 有效负载的示例:

"type": "stream",
      "serialization": {
        "type": "Parquet",
        "properties": {}
      },
      "timeWindow": "00:02:00",
      "sizeWindow": "2000",
      "datasource": {
        "type": "Microsoft.Storage/Blob",
        "properties": {
          "storageAccounts" : [
          {
            "accountName": "{accountName}",
            "accountKey": "{accountKey}",
          }
          ],

后续步骤