Connect(); 2016

第 31 卷,第 12 期

Connect(); 智能应用 - U-SQL 大数据应用程序的扩展性

作者:Michael Rys; 2016

传统上,在大数据处理期间,对大数据的数据量、速度和多样性的处理主要集中在提供能处理海量数据的可缩放平台、添加近乎实时的处理功能,以及提供处理各种格式(从 CSV 到 JSON,再到自定义二进制格式)输入数据的相应功能上。经常事后才想到的一种多样性是与自定义数据处理相关的多样性,不仅体现在格式方面,还体现在能否轻松地使用自定义算法扩展分析,同时保留查询语言的声明性。

一些新式大数据处理和查询语言正着手解决此问题。特别是 U-SQL,一种从头到脚专为以下用途而设计的语言:将 SQL 语言的声明性与灵活使用现有代码库和开发新的自定义算法相结合。

在上一篇文章 (bit.ly/1OtXM2K) 中,我介绍了 U-SQL,并展示了如何通过在 U-SQL 中结合使用 Microsoft .NET Framework 类型系统与基于 C# 的表达式语言,使用自定义代码表达式顺畅扩展分析。我介绍了如何使用 C# 程序集来定义用户定义函数 (UDF),并将其用于 U-SQL 查询脚本。

使用 U-SQL,你不仅可以添加自己的自定义 C# 函数,还可以通过一个框架添加自己的用户定义运算符 (UDO),如你自己的提取程序、输出程序和行集运算符(如处理程序、应用方、化简程序和自定义合并程序)。此框架由以下两部分组成:

  1. .NET 接口,为你提供生成这些运算符的协定,以便你可以专注于代码编写,将横向扩展留给 U-SQL 来执行。请注意,不必在 .NET 中实现实际的业务逻辑代码,我将在后面予以介绍。
  2. U-SQL 表达式(如 EXTRACT 和 REDUCE),调用自定义运算符,并对数据大规模执行这些运算符。

在本文中,我将接着上一篇文章介绍如何利用 U-SQL 扩展性机制处理各种不同的数据(从 JSON 到图像数据)。此外,我还将介绍如何添加你自己的运算符。

在 U-SQL 中管理自定义代码

在开始引入一些示例之前,我们最好先了解一下 U-SQL 是如何使用自定义代码的。

如上所述,U-SQL 沿用 C# 及其标量表达式语言,适用于 U-SQL 谓词和 select 子句表达式等场景。为了让你的自定义代码对 U-SQL 编译器可见,必须将代码打包到必须由 U-SQL 脚本引用的 .NET 程序集中。必须先使用 CREATE ASSEMBLY 语句事先在 U-SQL 元数据服务中注册此程序集,然后才能引用它。

注册和引用 U-SQL 程序集:我建议使用用于 Visual Studio 的 Azure Data Lake 工具 (aka.ms/adltoolsvs),以便轻松生成和注册与 U-SQL 兼容的程序集。如果你在“类库(对于 U-SQL 应用程序)”项目(见图 1)中编写自定义代码,可以编写代码并生成项目,然后右键单击一下即可直接注册已生成的程序集 DLL 文件(见图 2)。

类库(对于 U-SQL 应用程序)项目
图 1:类库(对于 U-SQL 应用程序)项目

注册 U-SQL 程序集
图 2:注册 U-SQL 程序集

然后,只需在 U-SQL 脚本中使用 REFERENCE ASSEMBLY 语句,以便可在 U-SQL 脚本中使用公共类和方法即可,如图 3 所示。

图 3:从自定义程序集引用用户定义函数

REFERENCE ASSEMBLY master.TweetAnalysis;
USING tweet_fns = TweetAnalysis.Udfs;
@t =
  EXTRACT date string,
          time string,
          author string,
          tweet string
  FROM "/Samples/Data/Tweets/Tweets.csv"
  USING Extractors.Csv();
// Get the mentions from the tweet string
@m =
  SELECT origin
       , tweet_fns.get_mentions(tweet) AS mentions
       , author AS mentioned_by
FROM @t;
...

将现有代码与 U-SQL 程序集结合使用:你经常想要使用现有代码库或非 .NET 代码。若要使用非 .NET 代码(如本机库或完全不同的语言运行时,如 Python 或 JavaScript),必须为非 .NET 代码包装 C# 互操作性层(从 U-SQL 调用此层,然后调用非 .NET 代码),同时封送处理组件之间的数据,并实现 UDO 接口协定。在这种情况下,需要将非 .NET 代码项目(如本机 .dll 或其他运行时的文件)添加为其他文件。可在用于注册程序集的“其他文件”选项中执行此操作。当 .NET 程序集在脚本中获得引用时,这些文件会自动部署到每个节点,并可用于 .NET 程序集在相应节点的本地工作目录。

若要使用现有 .NET 库,你需要在自己的程序集上将现有代码库注册为托管依赖项。或者,如果你重复使用可直接用于 U-SQL 的库,请直接在 U-SQL 数据库中进行注册。无论属于上述哪种情况,脚本均必须引用所需的全部 .NET 程序集。

在本文的剩余部分中,我将在介绍一些最好使用扩展性模型的自定义代码应用场景中,介绍几个与这些注册选项相关的示例。这些应用场景包括:使用自定义化简程序合并重叠的范围、处理 JSON 文档、图像数据和空间数据。我将挨个介绍。

使用自定义化简程序合并重叠的范围

假设你有一个日志文件,用于跟踪用户何时与你的服务交互。此外,还假设用户能够以多种方式(例如,在多台设备或多个浏览器窗口中执行必应搜索)与你的服务交互。在准备日志文件以供日后分析的 U-SQL 作业中,你想要合并重叠的范围。

例如,如果输入日志文件如图 4 所示,则你需要针对每个用户合并重叠的范围(如图 5 所示)。

图 4:包含重叠的时间范围的日志文件

开始时间  结束时间  用户名
5:00 AM  6:00 AM  ABC
5:00 AM  6:00 AM  XYZ
8:00 AM  9:00 AM  ABC
8:00 AM  10:00 AM  ABC
10:00 AM  2:00 PM  ABC
7:00 AM  11:00 AM  ABC
9:00 AM  11:00 AM  ABC
11:00 AM  11:30 AM  ABC
11:40 PM  11:59 PM  FOO
11:50 PM  0:40 AM  FOO

图 5:合并重叠的时间范围后的日志文件

开始时间  结束时间  用户名
5:00 AM  6:00 AM  ABC
5:00 AM  6:00 AM  XYZ
7:00 AM  2:00 PM  ABC
11:40 PM  0:40 AM  FOO

如果你研究一下此问题,首先会注意到你想要通过定义用户定义聚合等内容来合并重叠的时间间隔。不过,如果你研究一下输入数据,则会注意到由于数据未经排序,你要么需要维持所有可能的时间间隔的状态,然后将非连续时间间隔合并为桥接时间间隔,要么需要针对每个用户名对时间间隔进行重新排序,以简化时间间隔的合并。

虽然有序聚合更易于横向扩展,但 U-SQL 不提供有序的用户定义聚合程序 (UDAGG)。此外,UDAGG 通常会每组生成一行,而在此示例中,如果为非连续范围,我可以每组生成多行。

幸运的是,U-SQL 提供了一种称为化简程序 (bit.ly/2evGsDA) 的可缩放 UDO,可根据使用自定义代码设置的分组键聚合行集。

我们要先编写 U-SQL 逻辑,其中 ReduceSample.Range­Reducer 是 RangeReducer 程序集中的用户定义化简程序(化简程序 UDO),日志数据位于文件 /Samples/Blogs/MRys/Ranges/ranges.txt (bit.ly/2eseZyw) 中,并使用“-”作为列分隔符。代码如下:

REFERENCE ASSEMBLY RangeReducer;
@in = EXTRACT start DateTime, end DateTime, user string
FROM "/Samples/Blogs/MRys/Ranges/ranges.txt"
USING Extractors.Text(delimiter:'-');
@r =  REDUCE @in PRESORT start ON user
      PRODUCE start DateTime, end DateTime, user string
      READONLY user
      USING new ReduceSample.RangeReducer();
OUTPUT @r
TO "/temp/result.csv"
USING Outputters.Csv();

REDUCE 表达式将行集 @in 视作输入,根据用户列对其进行分区,根据起始列中的值对分区进行预排序,然后应用 RangeReducer,从而在输出中生成相同的行集架构。由于化简程序仅调整起止范围,因此它其实并未调整用户列,因此将用户列标记为 READONLY。这样一来,化简程序框架便有权自动为该列传递数据,进而允许 U-SQL 查询处理程序出于一些目的主动优化只读列,如为了先于化简程序在只读列上叠加谓词。

编写化简程序的方法是实现 Microsoft.Analytics.Interfaces.IReducer 实例。在此示例中,由于不必提供任何参数,因此只需覆盖抽象的 Reduce 方法。可以将代码复制到适用于 U-SQL 的 C# 库中,然后将它注册为程序集 RangeReducer,如上所述。图 6 展示了 RangeReducer 的实现。(请注意,由于空间有限,已改变一些代码示例中的常规代码缩进做法。)

图 6:RangeReducer 的 C# 实现

using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ReduceSample
{
  public class RangeReducer : IReducer
  {
    public override IEnumerable<IRow> Reduce(
      IRowset input, IUpdatableRow output)
    {
      // Init aggregation values
      bool first_row_processed = false;
      var begin = DateTime.MaxValue;
      var end = DateTime.MinValue;
      // Requires that the reducer is PRESORTED on begin and
      // READONLY on the reduce key.
      foreach (var row in input.Rows)
      {
        // Initialize the first interval with the first row if i is 0
       if (!first_row_processed)
        {
         first_row_processed = true; // Mark that the first row was handled
          begin = row.Get<DateTime>("start");
          end = row.Get<DateTime>("end");
          // If the end is just a time and not a date, it can be earlier
          // than the begin, indicating it is on the next day;
          // this let's you fix up the end to the next day in that case
          if (end < begin) { end = end.AddDays(1); }
        }
        else // Handle the remaining rows
        {
          var b = row.Get<DateTime>("start");
          var e = row.Get<DateTime>("end");
          // Fix up the date if end is earlier than begin
          if (e < b) { e = e.AddDays(1); }
          // If begin time is still inside the interval,
          // increase the interval if it is longer
          if (b <= end)
          {
            // If the new end time is later than the current,
            // extend the interval
            if (e > end) { end = e; }
          }
          else // Output the previous interval and start a new one
          {
            output.Set<DateTime>("start", begin);
            output.Set<DateTime>("end", end);
            yield return output.AsReadOnly();
            begin = b; end = e;
          } // if
        } // if
      } // foreach
      // Now output the last interval
      output.Set<DateTime>("start", begin);
      output.Set<DateTime>("end", end);
      yield return output.AsReadOnly();
    } // Reduce
  } // RangeReducer
} // ReduceSample

U-SQL REDUCE 表达式会对各个不同的分区键并行应用一次 Reduce 方法。因此,输入参数仅包含给定组的行,且实现可返回零到多行作为输出。

由于 PRESORT 子句保证行已经过排序,因此内部逻辑可假设数据已经过排序。又因为用户列被标记为 READONLY,所以将自动传递列数据,而你只需关注要转换的列,便可以更惯常的做法编写 UDO 代码。

如果你现在对大型数据集应用化简程序,且一些用户使用你系统的频次可能高于另一些用户,那么你会遇到所谓的数据倾斜问题,即一些用户具有大型分区,而另一些用户仅具有小型分区。由于可保证化简程序协定查看相应分区的所有数据,因此必须将所有数据都无序播放到相应节点,然后在一次调用中予以读取。鉴于这一要求,最好的情况是,此类数据倾斜可能会导致一些分区的处理时间长于另一些分区;最坏的情况是,此类数据倾斜可能会导致某些化简程序用尽可用的内存和时间资源(运行约五小时后,U-SQL 顶点将超时)。

如果化简程序语义具有关联性和可交换性,且其输出架构与输入架构相同,则可将化简程序标记为递归,这可允许查询引擎将大型组拆分为较小的子组,然后在这些子组上以递归方式应用化简程序,从而计算最终结果。此递归应用程序允许化简程序在出现数据偏斜时更好地平衡和并行化。化简程序是使用属性注释 SqlUserDefinedReducer(IsRecursive = true) 标记为递归:

namespace ReduceSample
{
  [SqlUserDefinedReducer(IsRecursive = true)]
  public class RangeReducer : IReducer
  {
    public override IEnumerable<IRow> Reduce(
      IRowset input, IUpdatableRow output)
    {
      // Insert the code from Figure 6 here
    } // Reduce
  } // RangeReducer
} // ReduceSample

在我们的示例中,可将化简程序标记为递归,从而提升可缩放性和性能(假设每次递归调用时处理功能将保留行之间的排序)。

有关 Visual Studio 项目示例,请访问我们的 GitHub 存储库 (bit.ly/2ecLe5B)。

处理 JSON 文档

除了逗号分隔文本文件之外,最常见的一种数据格式是 JSON。不同于 CSV 文件格式,U-SQL 不提供内置的 JSON 提取程序。不过,U-SQL 社区提供了一个示例程序集 (bit.ly/2d9O4va),可支持提取和处理 JSON 和 XML 文档。

这一解决方案使用 Newtonsoft 的 Json.NET 库 (bit.ly/2evWJbz) 执行繁重的 JSON 处理工作,并使用 System.XML 处理 XML。此程序集可使用 JsonExtractor (bit.ly/2dPARsM) 从 JSON 文档提取数据,并能通过 JsonTuple 函数 (bit.ly/2e8tSuX) 获取 JSON 文档并将其拆分为 SqlMap(以便导航和分解 JSON 文档),最后可通过 JSONOutputter (bit.ly/2e4uv3W) 将行集转换成 JSON 格式文件。

请注意,根据设计,此程序集为通用 JSON 处理程序。也就是说,它不会假设 JSON 文档结构,需要适应 JSON 的半结构化特性,包括异类键入的元素(标量与结构化、同一元素的不同数据类型、缺少的元素等)。如果你了解到 JSON 文档遵循某种特定的架构,可以创建更加有效的 JSON 提取程序。

不同于前面介绍的化简程序示例(你需要编写自己的程序集以供稍后部署),在此示例中,这一解决方案随时可用。你可以将这一解决方案从我们的 GitHub 存储库加载到 Visual Studio 中,然后自行生成和部署,也可以在这一解决方案的 bin\Debug 目录中找到 DLL。

如前所述,非系统依赖项要求必须在 U-SQL 元数据存储中同时注册 Samples.Format 和 Json.NET 程序集(你可以在使用 Visual Studio 工具注册 Format 程序集时,选择 Newtonsoft 程序集作为托管依赖项),且必须同时引用这两个程序集(如果你要处理 JSON 文档的话)。假设你已在 U-SQL 目录的 U-SQL 数据库 JSONBlog 中以 [Microsoft.Analytics.Samples.Formats] 和 [NewtonSoft.Json] 名称安装 JSON 程序集(见图 7),可以使用它们,方法为在脚本开头通过以下代码引用它们:

REFERENCE ASSEMBLY JSONBlog.[NewtonSoft.Json];
REFERENCE ASSEMBLY JSONBlog.[Microsoft.Analytics.Samples.Formats];

在 Visual Studio 中注册 Formats 程序集
图 7:在 Visual Studio 中注册 Formats 程序集

JSON 提取程序实现 U-SQL IExtractor 接口。由于需要完全分析 JSON 文档以确保其格式正确,因此需要在单个提取程序顶点处理包含单个 JSON 文档的文件。所以,可通过将 AtomicFileProcessing 属性设置为 true,指明提取程序需要查看完整文件内容(见图 8)。可使用可选参数 rowpath 调用提取程序。借助此参数,我们可以标识将使用 JSONPath 表达式 (bit.ly/1EmvgKO) 映射到行的各个 JSON 对象。

图 8:JSON 提取程序

[SqlUserDefinedExtractor(AtomicFileProcessing = true)]
public class JsonExtractor : IExtractor
{
  private string rowpath;            
  public JsonExtractor(string rowpath = null)
  {
    this.rowpath = rowpath;
  }
  public override IEnumerable<IRow> Extract(
    IUnstructuredReader input, IUpdatableRow output)
  {
    // Json.NET
    using (var reader = new JsonTextReader(
      new StreamReader(input.BaseStream)))
    {
      // Parse Json
      var root = JToken.ReadFrom(reader);
      // Rows
      // All objects are represented as rows
      foreach (JObject o in SelectChildren(root, this.rowpath))
      {
        // All fields are represented as columns
        this.JObjectToRow(o, output);
        yield return output.AsReadOnly();
      }
    }
  }
}

通过提取程序实现,可将 U-SQL 提取程序框架馈送给提取程序的输入流传递到 Json.NET JsonTextReader。然后,它使用 rowpath 获取通过 SelectChildren 映射到行的子树。因为 JSON 对象可以是异类的,所以代码返回的是常规 JObject,而不是位置 JArray 或标量值。

请注意,此提取程序要将 JSON 文档加载到内存中。如果文档太大,可能会导致内存不足。在这种情况下,你不得不编写自己的提取程序,以流式传输文档,而无需将整个文档加载到内存中。

现在,让我们来使用 JSON 提取程序和 JSON 元组函数分析 /Samples/Blogs/MRys/JSON/complex.json 中的复杂 JSON 文档 (bit.ly/2ekwOEQ),如图 9 所示。

图 9:JSON 示例文档

[{
  "person": {
    "personid": 123456,
    "name": "Person 1",
    "addresses": {
      "address": [{
        "addressid": "2",
        "street": "Street 2",
        "postcode": "1234 AB",
        "city": "City 1"
      }, {
        "addressid": "2",
        "street": "Street 2",
        "postcode": "5678 CD",
        "city": "City 2"
      }]
    }
  }
}, {
     "person": {
     "personid": 798,
     "name": "Person 2",
     "addresses": {
       "address": [{
         "addressid": "1",
         "street": "Street 1",
         "postcode": "1234 AB",
         "city": "City 1"
     }, {
         "addressid": "4",
         "street": "Street 7",
         "postcode": "98799",
         "city": "City 3"
     }]
   }
  }
}]

格式为一组人员“对象”(从技术角度来讲,每个对象包含一个人员键),这样就可以包含一些人员属性和地址对象。图 10 中的 U-SQL 脚本每个人员/地址组合提取一行。

图 10:处理图 9 中的示例 JSON 文档的 U-SQL 脚本

DECLARE @input string = "/Samples/Blogs/MRys/JSON/complex.json";
REFERENCE ASSEMBLY JSONBlog.[Newtonsoft.Json];
REFERENCE ASSEMBLY JSONBlog.[Microsoft.Analytics.Samples.Formats];
USING Microsoft.Analytics.Samples.Formats.Json;
@json =
  EXTRACT personid int,
          name string,
          addresses string
  FROM @input
  USING new JsonExtractor("[*].person");
@person =
  SELECT personid,
         name,
         JsonFunctions.JsonTuple(
           addresses, "address")["address"] AS address_array
  FROM @json;
@addresses =
  SELECT personid,
         name,
         JsonFunctions.JsonTuple(address) AS address
  FROM @person
       CROSS APPLY
         EXPLODE (JsonFunctions.JsonTuple(address_array).Values)
           AS A(address);
@result =
  SELECT personid,
         name,
         address["addressid"]AS addressid,
         address["street"]AS street,
         address["postcode"]AS postcode,
         address["city"]AS city
  FROM @addresses;
OUTPUT @result
TO "/output/json/persons.csv"
USING Outputters.Csv();

请注意,此脚本将 JSONPath 表达式 [*].person 传递给提取程序,从而为顶级数组中的每个人员字段生成一行。提取程序使用 EXTRACT 架构将生成的对象属性提取到列中。由于地址字段本身就是嵌套的 JSON 文档,因此首次调用 JsonTuple 函数会创建包含地址对象的映射,随后每个地址对象会通过 CROSS APPLY EXPLODE 表达式映射到一行。最后,从映射数据类型中提取出所有地址属性,从而生成行集,如图 11 所示。

图 11:处理图 9 中的 JSON 文档后生成的行集

123456 人员 1 2 街道 2 1234 AB 城市 1
123456 人员 1 2 街道 2 5678 CD 城市 2
798 人员 2 1 街道 1 1234 AB 城市 1
798 人员 2 4 街道 7 98799 城市 3

有关 Visual Studio 项目示例和其他 JSON 处理应用场景(包括一个文件内含多个 JSON 文档),请访问我们的 GitHub 存储库 (bit.ly/2dzceLv)。

处理图像数据

在此示例中,我将处理一些较大的非结构化数据,即图像数据。特别是,我想处理 JPEG 图片,并提取一些 JPEG EXIF 属性。此外,我还想创建图像的缩略图。幸运的是,.NET 在 System.Drawing 类中提供了各种图像处理功能。因此,只需生成 U-SQL 扩展函数和运算符,即可将 JPEG 处理委派给这些类。

有多种方法可以做到这一点。初步尝试可能是将所有图像作为字节数组加载到行集中,然后应用各个用户定义函数来提取每个属性并创建缩略图,如图 12 所示。

图 12:将图像加载到行中,从而在 U-SQL 中处理图像

REFERENCE ASSEMBLY Images;
USING Images;
@image_data =
  EXTRACT image_data byte[]  // Max size of row is 4MB!
        , name string
        , format string
  FROM @"/Samples/Data/Images/{name}.{format}"
  USING new ImageExtractor();
// Use UDFs
@image_properties =
  SELECT ImageOps.getImageProperty(image_data, ImageProperties.copyright)
         AS image_copyright,
         ImageOps.getImageProperty(image_data, ImageProperties.equipment_make)
         AS image_equipment_make,
         ImageOps.getImageProperty(image_data, ImageProperties.equipment_model)
         AS image_equipment_model,
         ImageOps.getImageProperty(image_data, ImageProperties.description)
         AS image_description
  FROM @image_data
  WHERE format IN ("JPEG", "jpeg", "jpg", "JPG");

不过,这种方法存在一些缺点:

  • 由于 U-SQL 行大小上限为 4MB,因此解决方案的图像大小(减去其他列的大小)不得超过 4MB。
  • 每个函数调用都有可能会增加内存压力,需要在整个 U-SQL 处理过程中传递字节数组。

因此,最好直接在自定义提取程序中提取属性和创建缩略图。图 13 展示了修订后的 U-SQL 脚本。

图 13:使用提取程序提取特性,从而在 U-SQL 中处理图像

REFERENCE ASSEMBLY Images;
@image_features =
  EXTRACT copyright string,
          equipment_make string,
          equipment_model string,
          description string,
          thumbnail byte[],
          name string,
          format string
  FROM @"/Samples/Data/Images/{name}.{format}"
  USING new Images.ImageFeatureExtractor(scaleWidth:500, scaleHeight:300);
@image_features =
  SELECT *
  FROM @image_features
  WHERE format IN ("JPEG", "jpeg", "jpg", "JPG");
OUTPUT @image_features
TO @"/output/images/image_features.csv"
USING Outputters.Csv();
@scaled_image =
  SELECT thumbnail
  FROM @image_features
  WHERE name == "GT4";
OUTPUT @scaled_image
TO "/output/images/GT4_thumbnail_2.jpg"
USING new Images.ImageOutputter();

此脚本从文件集模式 /Samples/Data/Images/{name}.{format} (bit.ly/2ektTY6) 指定的图像中提取属性和缩略图。然后,SELECT 语句限制为仅提取 JPEG 文件,具体方法为仅在格式列上使用谓词,将所有非 JPEG 文件排除在提取范围外(优化程序仅将提取程序应用于满足格式列上谓词的文件)。提取程序允许指定缩略图的尺寸。然后,脚本将特性输出到 CSV 文件中,并使用简单的“字节-流-级别”输出程序为其中一个缩小图像创建缩略图文件。

图 14 展示了提取程序的实现。

图 14:图像特性提取程序

using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
using System;
using System.Collections.Generic;
using System.Linq;
using System.IO;
using System.Drawing;
using System.Drawing.Imaging;
using System.Drawing.Drawing2D;
namespace Images
{
  public static class UpdatableRowExtensions
  {
    public static void SetColumnIfExists<T>(this IUpdatableRow source
                                           , string colName, T value)
    {
      var colIdx = source.Schema.IndexOf(colName);
      if (colIdx != -1)
      { source.Set<T>(colIdx, value); }
    }
  }
  [SqlUserDefinedExtractor(AtomicFileProcessing = true)]
  public class ImageFeatureExtractor : IExtractor
  {
    private int _scaleWidth, _scaleHeight;
    public ImageFeatureExtractor(int scaleWidth = 150, int scaleHeight = 150)
    { _scaleWidth = scaleWidth; _scaleHeight = scaleHeight; }
    public override IEnumerable<IRow> Extract(IUnstructuredReader input
                                             , IUpdatableRow output)
    {
      byte[] img = ImageOps.GetByteArrayforImage(input.BaseStream);
      using (StreamImage inImage = new StreamImage(img))
      {
        output.SetColumnIfExists("image", img);
        output.SetColumnIfExists("equipment_make",
          inImage.getStreamImageProperty(ImageProperties.equipment_make));
        output.SetColumnIfExists("equipment_model",
          inImage.getStreamImageProperty(ImageProperties.equipment_model));
        output.SetColumnIfExists("description",
          inImage.getStreamImageProperty(ImageProperties.description));
        output.SetColumnIfExists("copyright",
          inImage.getStreamImageProperty(ImageProperties.copyright));
        output.SetColumnIfExists("thumbnail",
          inImage.scaleStreamImageTo(this._scaleWidth, this._scaleHeight));
      }
      yield return output.AsReadOnly();
    }
  }
}

此提取程序也需要查看整个文件,并在 input.BaseStream 上运行,但现在仅在内存中创建一个图像,不同于图 12 中的脚本。此提取程序还检查是否有各个请求的列,并使用扩展方法 SetColumnIfExists 仅处理请求的列名称的数据。

如需了解更多详情,请参阅 GitHub 站点上的 Visual Studio 项目 (bit.ly/2dngXCE)。

处理空间数据

在此示例中,我将展示如何在 U-SQL 中使用 SQL Server 空间类型程序集 Microsoft.SqlServer.Types.dll。特别是,我想在 U-SQL 脚本中将空间库函数用作用户定义函数。如前面介绍的 JSON 提取程序,这意味着你需要在 JSON 中注册现有程序集,而无需编写自己的程序集。

首先,你需要从 SQL Server 2016 功能包 (bit.ly/2dZTw1k) 下载并安装程序集。选择 64 位版本安装程序 (ENU\x64\SQLSysClrTypes.msi),以确保拥有 64 位版本的库。

此安装程序将托管程序集 Microsoft.Sql­­Server.Types.dll 安装到 C:\Program Files (x86)\Microsoft SQL Server\130\SDK\Assemblies 中,并将本机程序集 SqlServerSpatial130.dll 安装到 \Windows\System32\ 中。接下来,将程序集上载到 Azure Data Lake Store(例如,文件夹 /upload/asm/spatial)中。由于安装程序已将本机库安装到系统文件夹 c:\Windows\System32 中,因此,请务必在上载之前从此文件夹复制 SqlServerSpatial130.dll,或确保你所用的工具不会在系统文件夹上执行文件系统重定向 (bit.ly/1TYm9YZ)。例如,如果你想要使用最新的 Visual Studio ADL 文件资源管理器上载它,必须先将此文件复制到其他目录中;否则,(自本文编写之时起)你将上载的是 32 位版本(因为 Visual Studio 是 32 位应用程序,在 ADL 上载文件选择窗口中执行文件系统重定向),而且你会在运行调用本机程序集的 U-SQL 脚本时看到以下运行时(内部)错误: “来自用户表达式的内部异常: 尝试加载格式错误的程序。(HRESULT 发生异常: 0x8007000B)”。

上载两个程序集文件后,在 SQLSpatial 数据库中向此脚本注册它们:

DECLARE @ASSEMBLY_PATH string = "/upload/asm/spatial/";
DECLARE @SPATIAL_ASM string = @ASSEMBLY_PATH+"Microsoft.SqlServer.Types.dll";
DECLARE @SPATIAL_NATIVEDLL string = @ASSEMBLY_PATH+"SqlServerSpatial130.dll";
CREATE DATABASE IF NOT EXISTS SQLSpatial;
USE DATABASE SQLSpatial;
DROP ASSEMBLY IF EXISTS SqlSpatial;
CREATE ASSEMBLY SqlSpatial
FROM @SPATIAL_ASM
WITH ADDITIONAL_FILES =
  (
    @SPATIAL_NATIVEDLL
  );

请注意,在此示例中,仅注册一个 U-SQL 程序集,然后将本机程序集作为强依赖项添加到 U-SQL 程序集中。若要使用空间程序集,你只需引用 U-SQL 程序集,其他文件会自动对此程序集可用。图 15 展示了使用空间程序集的简单脚本示例。

图 15:在 U-SQL 中使用空间功能

REFERENCE SYSTEM ASSEMBLY [System.Xml];
REFERENCE ASSEMBLY SQLSpatial.SqlSpatial;
USING Geometry = Microsoft.SqlServer.Types.SqlGeometry;
USING Geography = Microsoft.SqlServer.Types.SqlGeography;
USING SqlChars = System.Data.SqlTypes.SqlChars;
@spatial =
    SELECT * FROM (VALUES
                   // The following expression is not using the native DDL
                   ( Geometry.Point(1.0,1.0,0).ToString()),   
                   // The following expression is using the native DDL
                   ( Geometry.STGeomFromText(
                     new SqlChars("LINESTRING (100 100, 20 180, 180 180)"),
                     0).ToString())
                  ) AS T(geom);
OUTPUT @spatial
TO "/output/spatial.csv"
USING Outputters.Csv();

由于 SQL 类型库依赖 System.Xml 程序集,因此需要引用它。此外,一些方法使用的是 System.Data.SqlTypes 类型,而不是内置的 C# 类型。由于 System.Data 已默认包括在内,因此你只需引用所需的 SQL 类型即可。有关图 15 中的代码,请访问 GitHub 站点 (bit.ly/2dMSBm9)。

总结: 一些与 UDO 有关的提示和最佳做法

本文介绍了如何使用 U-SQL 扩展性机制重复使用现有的域特定代码,同时使用 U-SQL 扩展框架大规模处理典型的大数据,而这只是对强大的 U-SQL 扩展性功能的简单介绍。

不过,此类功能强大的工具也很容易被滥用,因此下面提供了一些提示和最佳做法建议。

尽管自定义数据格式通常需要自定义提取程序,并可能会需要输出程序,仍应慎重考虑数据格式能否进行并行提取(如 CSV 类型格式),或考虑处理功能是否需要查看单个运算符实例中的所有数据。此外,如果运算符的通用性足以仅在请求特定列时执行处理,还可提升性能。

考虑处理程序、化简程序、合并程序和应用方等 UDO 时,强烈建议首先考虑利用内置运算符的纯 U-SQL 解决方案。例如,实际上可巧妙地使用一些窗口化和排名函数,编写前面介绍的范围化简程序脚本。下面介绍了仍要考虑 UDO 的部分原因:

  • 逻辑需要动态访问正在处理的行集的输入或输出架构。例如,为行中的数据创建 JSON 文档,但无法预知其中的列。
  • 在 SELECT 表达式中使用多个用户定义函数的解决方案会带来太大的内存压力,你可以使用处理程序 UDO 编写可提高内存效率的代码。
  • 你需要使用有序聚合程序或每组生成多行的聚合程序,无法使用窗口化函数编写任一聚合程序。

使用 UDO 时,应始终遵循以下提示:

  • 使用 READONLY 子句允许通过 UDO 叠加谓词。
  • 使用 REQUIRED 子句允许通过 UDO 叠加列修剪。
  • 在使用 UDO 的查询表达式上提示基数,以防查询优化程序选择错误的计划。

Michael Rys 是 Microsoft 的首席项目经理。他从二十世纪八十年代便开始从事数据处理和查询语言相关工作。他代表 Microsoft 加入了 XQuery 和 SQL 设计委员会,升华了 SQL Server 与 XML、地理空间和语义搜索的关系。目前,他正致力于大数据查询语言(如 SCOPE 和 U-SQL)的研究。工作以外的时间,他还会与家人一起潜水或观看汽车越野赛。可以在 Twitter 上关注他 (@MikeDoesBigData)。

衷心感谢以下 Microsoft 技术专家对本文的审阅: Clemens Szyperski、Ed Triou、Saveen Reddy 和 Michael Kadaner
Clemens Szyperski 是 Microsoft 的首席集团工程经理。数十年来,他热衷于从事语言、工具和方法的专业研究,以简化复杂软件系统的构造。目前,他负责领导 Azure Data Lake U-SQL 和 Scope 团队。工作以外的时间,他还会与家人一起去航海。可以在 Twitter 上关注他 (@ClemensSzy)。

Ed Triou 是 Microsoft 的首席开发负责人。过去 20 年里,他一直致力于研究数据可编程性(ODBC、OLEDB、ADO.NET、JDBC、PHP 和 EDM),专门从事编译器和查询语言(IDL、TSQl、LINQ to SQL/实体、eSQL、SCOPE 和 U-SQL)的研究。  目前,他负责领导 U-SQL 编译器和语言团队,力求在内外部业务(每天依赖于 ADL 和 Cosmos,规模达百亿亿字节)方面保持领先地位。

Saveen Reddy 是 Microsoft 的首席项目经理,致力于设计和生成 Azure Data Lake 平台(支持所有 Microsoft 大数据云服务的组件和体验)。Saveen 的《合金装备 V 幻痛》 已全部通关。可以在 Twitter 上关注他 (@saveenr)。

Michael Kadaner 是 Microsoft 的首席软件工程师。尽管在计算机科学和软件开发等各种领域拥有数十年的经验,他仍认为编写程序是一门精准的学问,而且软件是可以没有任何错误的。他真正热衷的是,解决复杂的算法和工程问题,并通过简洁且正确设计的代码实现解决方案。在业余时间,他喜欢阅读和 DIY。