Connect(); 2016

Volume 31 Number 12

Connect(); インテリジェント アプリ - U-SQL ビッグ データ アプリケーションでの拡張性

Michael Rys; 2016

ビック データを処理する場合、データの量 (volume)、処理速度 (velocity)、多様性 (variety) という 3 つの重要な V への対処が以前から重視されてきました。そのため、大量のデータを処理するスケーラブルなプラットフォームの提供、ほぼリアルタイムの処理機能の追加、CSV、JSON、カスタム バイナリ形式など多様な入力データ形式を処理する機能などに力が注がれてきました。多くの場合、ある程度後回しにされる多様性の 1 つが、カスタム データの処理に関連する多様性です。これは形式面だけでなく、クエリ言語を使用する際の宣言型の性質を保ちながら、カスタム アルゴリズムによって分析の拡張を容易にする機能です。

最近のビッグ データ処理とクエリ言語の中には、この面の多様性に対処するために生まれたものもあります。特に、U-SQL は、SQL ベースの言語が持つ宣言型の威力と、既存のコード ライブラリを使用して新しいカスタム アルゴリズムを開発する柔軟性を組み合わせるために根本から設計されています。

前回 (bit.ly/1OtXM2K) はこの U-SQL を紹介し、U-SQL で Microsoft .NET Framework の型システムと C# ベースの記述言語を使用して、カスタム コード式で分析をシームレスに拡張する方法を説明しました。今回は、C# のアセンブリを使用してユーザー定義関数 (UDF) を定義する方法と、その関数を U-SQL クエリ スクリプトで使用する方法を取り上げます。

U-SQL は独自のカスタム C# 関数を追加できるだけではなく、抽出ツール、出力ツール、行セット演算子 (プロセッサ、適用ツール、レジューサ、カスタム結合ツール) などの独自のユーザー定義演算子 (UDO) を追加できるフレームワークも提供しています。このフレームワークは、以下の2 つの部分から構成されます。

  1. .NET インターフェイス。このインターフェイスは、U-SQL へのスケールアウト実行を可能にしたまま、コードに専念できる方法で上記の演算子をビルドするコントラクトを提供します。実際のビジネス ロジックのコードを .NET で実装する必要はありません。これについては後ほど説明します。
  2. U-SQL 式。これは、カスタム演算子を呼び出して、この演算子をデータのスケールで実行する EXTRACT や REDUCE などの式のことです。

今回は、前回に基づいて、JSON から画像データまで、多種多様なデータ範囲を処理するために U-SQL の拡張メカニズムを使用する方法を示します。また、独自の演算子を追加する方法も取り上げます。

U-SQL でのカスタム コードの管理

いくつかの例を使って説明する前に、U-SQL がカスタム コードを使用するしくみを理解します。

上記のとおり、U-SQL は、U-SQL 述語や select 句内の式で使用されるスカラー式と、C# に従います。カスタム コードを U-SQL コンパイラに認識させるため、カスタム コードを .NET アセンブリにパッケージ化しなければなりません。U-SQL スクリプトでは、このアセンブリを参照する必要があります。アセンブリを参照できるようにするには、CREATE ASSEMBLY ステートメントを使用して U-SQL メタデータ サービスにあらかじめそのアセンブリを登録しておく必要があります。

U-SQL アセンブリの登録と参照: ここでは、Azure Data Lake Tools for Visual Studio (aka.ms/adltoolsvs、英語) を使用します。このツールにより、U-SQL を操作するアセンブリのビルドと登録が容易になります。“Class Library (For U-SQL Application)“ プロジェクトでカスタム コードを作成する場合 (図 1 参照) は、コードを記述し、プロジェクトをビルド後、生成されたアセンブリ DLL ファイルを右クリックして直接登録できます (図 2 参照)。

Class Library (For U-SQL Application) プロジェクト
図 1 Class Library (For U-SQL Application) プロジェクト

U-SQL アセンブリの登録
図 2 U-SQL アセンブリの登録

その後、U-SQL スクリプトで REFERENCE ASSEMBLY ステートメントを使用して、パブリック クラスとメソッドを U-SQL スクリプトで使用できるようにするだけです (図 3 参照)。

図 3 カスタム アセンブリからのユーザー定義関数の参照

REFERENCE ASSEMBLY master.TweetAnalysis;
USING tweet_fns = TweetAnalysis.Udfs;
@t =
  EXTRACT date string,
          time string,
          author string,
          tweet string
  FROM "/Samples/Data/Tweets/Tweets.csv"
  USING Extractors.Csv();
// Get the mentions from the tweet string
@m =
  SELECT origin
       , tweet_fns.get_mentions(tweet) AS mentions
       , author AS mentioned_by
FROM @t;
...

既存のコードと U-SQL アセンブリの併用: 既存のコードや、.NET 以外のコードを使用することはよくあります。.NET 以外のコード (ネイティブ ライブラリや、Python、JavaScript のようなまったく異なる言語ランタイムなど) を使用する場合、そのコードを C# 相互運用層でラップして U-SQL から呼び出せるようにして、コンポーネントと UDO インターフェイス コントラクトの実装の間でデータをマーシャリングしなければなりません。この場合、ネイティブ .dll や異なるランタイムのファイルなどの .NET 以外のコードのアーティファクトを、追加ファイルとして追加する必要があります。これはアセンブリ登録の [Additional File] (追加ファイル) オプションで行います。これらのファイルはスクリプトでの .NET アセンブリ参照時にすべてのノードに自動的に配置され、.NET アセンブリの作業ディレクトリをそのノードのローカルで使用できるようにします。

既存の .NET ライブラリを使用するには、既存のコード ライブラリを [Managed Dependencies] (管理対象の依存関係) として独自のアセンブリに登録します。あるいは、U-SQL で直接使用可能なライブラリを再利用する場合は、それを U-SQL データベースに直接登録します。いずれの場合でも、スクリプトでは必要なすべての .NET アセンブリを参照しなければなりません。

本稿の後半では、これらの登録オプションの例を紹介し、拡張モデルの使用が妥当なカスタム コード シナリオについて考えます。このようなシナリオには、カスタム レジューサによる重複範囲のマージ、JSON ドキュメントの処理、画像データの処理、空間データの処理などがあります。ここからは、これらを順番に見ていきます。

カスタム レジューサによる重複範囲のマージ

開発したサービスをユーザーがいつ利用したかを追跡するログ ファイルがあるとします。さらに、ユーザーは複数の方法 (複数のデバイスやブラウザー ウィンドウからの Bing 検索実行など) でそのサービスを操作できるものとします。ここで、後から分析するためにログ ファイルを準備する U-SQL ジョブの一環として、重複する範囲をマージしたいと考えます。

たとえば、図 4 のような入力ログ ファイルを、ユーザーごとに重複範囲をマージして図 5 のようにします。

図 4 時間範囲が重複するログ ファイル

開始時刻  終了時刻  ユーザー名
午前 5:00  午前 6:00  ABC
午前 5:00  午前 6:00  XYZ
午前 8:00  午前 9:00  ABC
午前 8:00  午前 10:00  ABC
午前 10:00  午後 2:00  ABC
午前 7:00  午前 11:00  ABC
午前 9:00  午前 11:00  ABC
午前 11:00  午前 11:30  ABC
午後 11:40  午後 11:59  FOO
午後 11:50  午前 0:40  FOO

図 5 重複時間範囲をマージ後のログ ファイル

開始時刻  終了時刻  ユーザー名
午前 5:00  午前 6:00  ABC
午前 5:00  午前 6:00  XYZ
午前 7:00  午後 2:00  ABC
午後 11:40  午前 0:40  FOO

状況を見ると、いわゆるユーザー定義集計を定義して、重複する時間間隔を組み合わせる必要があることがわかります。ただし、入力データを見ると、データが順番に並んでいないため、すべての間隔の状態を保持した後に不連続の間隔を連続間隔になるようにマージするか、ユーザー名ごとに間隔をあらかじめ並べ替えてから間隔のマージを容易にする必要があることがわかります。

並べ替え済みの集計はスケールアウトが簡単になりますが、U-SQL には並べ替えを行うユーザー定義集計ツール (UDAGG) がありません。また、通常、UDAGG はグループごとに 1 行生成しますが、今回のケースでは、範囲に連続性がないと、グループごとに複数行が存在する可能性があります。

さいわい、U-SQL には、カスタム コードを使用してグループ化キーの設定に基づいて一連の行を集計できる、レジューサと呼ばれるスケーラブルな UDO (bit.ly/2evGsDA、英語) があります。

まず、U-SQL ロジック ReduceSample.RangeReducer を作成します。これは今回 RangeReducer アセンブリに作成するユーザー定義レジューサ (レジューサ UDO) です。ログ データはファイル /Samples/Blogs/MRys/Ranges/ranges.txt (bit.ly/2eseZyw、英語) にあり、列区切りに “-” を使用します。コードは次のようになります。

REFERENCE ASSEMBLY RangeReducer;
@in = EXTRACT start DateTime, end DateTime, user string
FROM "/Samples/Blogs/MRys/Ranges/ranges.txt"
USING Extractors.Text(delimiter:'-');
@r =  REDUCE @in PRESORT start ON user
      PRODUCE start DateTime, end DateTime, user string
      READONLY user
      USING new ReduceSample.RangeReducer();
OUTPUT @r
TO "/temp/result.csv"
USING Outputters.Csv();

REDUCE 式は行セット @in を入力として受け取り、user 列に基づいてその入力をパーティションに分割し、start 列の値に基づいてそのパーティションの順序を並べ替え、RangeReducer を適用して、同じ行セット スキーマを出力に生成します。レジューサは最初から最後までの範囲を調整することしか行わず、実際には user 列を操作しないため、user 列を READONLY としてマークします。これによって、その列のデータを自動的にパススルーするアクセス許可がレジューサ フレームワークに与えられ、その見返りとして、U-SQL クエリ プロセッサは読み取り専用の列に最適化を積極的に適用できるようになります。たとえば、レジューサに先行して、読み取り専用の列に述語をプッシュするような最適化です。

レジューサを作成するには、Microsoft.Analytics.Interfaces.IReducer のインスタンスを実装します。今回は、パラメーターを指定する必要がないため、抽象 Reduce メソッドを上書きするだけです。前述のように、U-SQL 用の C# ライブラリにコードをコピーして、そのライブラリをアセンブリ RangeReducer として登録できます。図 6 に RangeReducer の実装を示します (スペースの都合上、一部のコード サンプルでは標準のコード インデントを変更しています)。

図 6 RangeReducer の C# 実装

using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ReduceSample
{
  public class RangeReducer : IReducer
  {
    public override IEnumerable<IRow> Reduce(
      IRowset input, IUpdatableRow output)
    {
      // Init aggregation values
      bool first_row_processed = false;
      var begin = DateTime.MaxValue;
      var end = DateTime.MinValue;
      // Requires that the reducer is PRESORTED on begin and
      // READONLY on the reduce key.
      foreach (var row in input.Rows)
      {
        // Initialize the first interval with the first row if i is 0
       if (!first_row_processed)
        {
         first_row_processed = true; // Mark that the first row was handled
          begin = row.Get<DateTime>("start");
          end = row.Get<DateTime>("end");
          // If the end is just a time and not a date, it can be earlier
          // than the begin, indicating it is on the next day;
          // this let's you fix up the end to the next day in that case
          if (end < begin) { end = end.AddDays(1); }
        }
        else // Handle the remaining rows
        {
          var b = row.Get<DateTime>("start");
          var e = row.Get<DateTime>("end");
          // Fix up the date if end is earlier than begin
          if (e < b) { e = e.AddDays(1); }
          // If begin time is still inside the interval,
          // increase the interval if it is longer
          if (b <= end)
          {
            // If the new end time is later than the current,
            // extend the interval
            if (e > end) { end = e; }
          }
          else // Output the previous interval and start a new one
          {
            output.Set<DateTime>("start", begin);
            output.Set<DateTime>("end", end);
            yield return output.AsReadOnly();
            begin = b; end = e;
          } // if
        } // if
      } // foreach
      // Now output the last interval
      output.Set<DateTime>("start", begin);
      output.Set<DateTime>("end", end);
      yield return output.AsReadOnly();
    } // Reduce
  } // RangeReducer
} // ReduceSample

U-SQL REDUCE 式は、個別のパーティション キーごとに一度並列に Reduce メソッドを適用します。そのため、入力パラメーターには特定のグループの行のみが含まれ、この実装では多くの行に出力としてゼロを返します。

PRESORT 句は行が順番に並んでいることを保証するため、内部ロジックではデータが順番に並んでいると想定できます。また、user 列が READONLY とマークされ、自動的にパススルーされるため、変更する列だけに注目することで、UDO コードをより一般的に記述することができます。

大きなデータ セットにレジューサを適用することになる場合、かつユーザーの一部が他のユーザーよりも非常に多い頻度でシステムを使用している場合、一部のユーザーのパーティションが大きく、それ以外のユーザーのパーティションがご小さくなる「データ傾斜」という状況が発生します。レジューサのコントラクトではパーティションのすべてのデータを確認することを保証するため、すべてのデータをそのノードにシャッフルして、1 回の呼び出しで読み取る必要があります。この要件により、このようなデータ傾斜の最善のケースでも、他と比べて一部のパーティションの処理に非常に長い時間がかかることになります。最悪のケースでは、一部のレジューサが、使用可能なメモリと時間のリソースを使い果たすことになります (U-SQL 頂点は約 5 時間動作した後タイムアウトします)。

レジューサのセマンティクスに関係性があり、交換可能で、出力スキーマが入力スキーマと同じであれば、レジューサを再帰としてマークすることができます。その結果、クエリ エンジンは大きなグループを小さなサブグループに分割して、これらのサブグループにレジューサを再帰的に適用して最終結果を計算できるようになります。このような再帰的適用により、データ傾斜の出現時でもレジューサが適切にバランスを取り、並列処理が可能になります。レジューサを再帰としてマークするには、プロパティの注釈 SqlUserDefinedReducer(IsRecursive = true) を以下のように使用します。

namespace ReduceSample
{
  [SqlUserDefinedReducer(IsRecursive = true)]
  public class RangeReducer : IReducer
  {
    public override IEnumerable<IRow> Reduce(
      IRowset input, IUpdatableRow output)
    {
      // Insert the code from Figure 6 here
    } // Reduce
  } // RangeReducer
} // ReduceSample

今回の場合、処理によって再帰的呼び出し時に行間の並べ替えが保たれると想定すれば、レジューサを再帰としてマークすることで、スケーラビリティとパフォーマンスを向上できます。

今回の例の Visual Studio プロジェクトは、GitHub リポジトリ (bit.ly/2ecLe5B、英語) にあります。

JSON ドキュメントの処理

よく使われるデータ形式のうち、コンマ区切りテキスト ファイルの次に多いのが JSON です。CSV ファイル形式とは異なり、U-SQL には組み込みの JSON 抽出ツールはありません。ただし、U-SQL コミュニティでは、JSON ドキュメントと XML ドキュメント両方の抽出と処理のサポートを提供するサンプル アセンブリが提供されています (bit.ly/2d9O4va、英語)。

このソリューションは、複雑な JSON の処理には Newtonsoft の Json.NET ライブラリ (bit.ly/2evWJbz、英語) を、XML の処理には System.XML を使用しています。このアセンブリは、JsonExtractor (bit.ly/2dPARsM、英語) を使用して JSON ドキュメントからデータを抽出し、JSON ドキュメントを SqlMap に分割することで、関数 JsonTuple (bit.ly/2e8tSuX、英語) によって JSON ドキュメントを移動および分解できるようにし、最後に、JSONOutputter (bit.ly/2e4uv3W、英語) を使用して行セットを JSON 形式のファイルに変換します。

このアセンブリは、汎用の JSON プロセッサとして設計されているのがわかります。つまり、このアセンブリでは JSON ドキュメントの構造について推測せず、異なる型が指定された要素 (スカラーと構造化、同じ要素に対するさまざまなデータ型、不足している要素など) を含め、JSON の準構造化性に対して弾力性を持つ必要があります。JSON ドキュメントが特定のスキーマに従っていることが分かっている場合は、より効率的な JSON 抽出ツールを作成できる可能性があります。

後で配置する独自のアセンブリを作成した前述のレジューサの例とは異なり、JSON 抽出ツールの場合はそのソリューションをすぐに使用できます。GitHub レポジトリから Visual Studio にソリューションを読み込んで、自身でビルドして配置するか、そのソリューションの bin\Debug ディレクトリに DLL があることを確認できます。

前述のように、非システム依存関係は、Samples.Format アセンブリと Json.NET アセンブリの両方が U-SQL メタデータ ストア (Format アセンブリの登録時に Visual Studio ツールを使用して、Newtonsoft アセンブリを [Managed Dependencies] (管理対象の依存関係) として選択できます) に登録されていなければならず、JSON ドキュメントを処理する場合は、両方のアセンブリを参照する必要があります。JSON アセンブリが、U-SQL データベース JSONBlog の [Microsoft.Analytics.Samples.Formats] および [NewtonSoft.Json] 下にある SQL カタログにインストールされているとすると (図 7 参照)、そのアセンブリは以下のようにスクリプトの先頭で参照することで使用できます。

REFERENCE ASSEMBLY JSONBlog.[NewtonSoft.Json];
REFERENCE ASSEMBLY JSONBlog.[Microsoft.Analytics.Samples.Formats];

Visual Studio での Formats アセンブリの登録
図 7 Visual Studio での Formats アセンブリの登録

JSON 抽出ツールは U-SQL IExtractor インターフェイスを実装します。JSON ドキュメントを完全に解析して確実に適切な形式にする必要があるため、1 つの JSON ドキュメントを含むファイルを、1 つの Extractor 頂点で処理する必要があります。そのため、AtomicFileProcessing プロパティを true に設定することで、抽出ツールがファイルのコンテンツをすべて確認する必要があることを示します (図 8 参照)。抽出ツールを rowpath というオプション値を指定して呼び出し、JSONPath 式 (bit.ly/1EmvgKO、英語) を使用して、それぞれ行にマップ付けされる JSON オブジェクトを特定できるようにします。

図 8 JSON 抽出ツール

[SqlUserDefinedExtractor(AtomicFileProcessing = true)]
public class JsonExtractor : IExtractor
{
  private string rowpath;            
  public JsonExtractor(string rowpath = null)
  {
    this.rowpath = rowpath;
  }
  public override IEnumerable<IRow> Extract(
    IUnstructuredReader input, IUpdatableRow output)
  {
    // Json.NET
    using (var reader = new JsonTextReader(
      new StreamReader(input.BaseStream)))
    {
      // Parse Json
      var root = JToken.ReadFrom(reader);
      // Rows
      // All objects are represented as rows
      foreach (JObject o in SelectChildren(root, this.rowpath))
      {
        // All fields are represented as columns
        this.JObjectToRow(o, output);
        yield return output.AsReadOnly();
      }
    }
  }
}

この抽出ツールの実装は入力ストリームを Json.NET JsonTextReader に渡します。この入力ストリームは、U-SQL Extractor フレームワークが抽出ツールにフィードします。その後、抽出ツールは rowpath を使用して、SelectChildren によって行にマップされるサブツリーを取得します。JSON オブジェクトは種類が異なる可能性があるため、このコードでは位置が決まる JArray やスカラー値ではなく、汎用の JObject を返します。

この抽出ツールでは JSON ドキュメントをメモリに読み込んでいます。ドキュメントのサイズが大きすぎると、メモリ不足の状態になる可能性があります。その場合は、メモリにドキュメントをすべて読み込むのではなく、ドキュメントをストリーミング処理する独自の抽出ツールを作成する必要があります。

これで JSON 抽出ツールと JSON タプル関数を使用して、 /Samples/Blogs/MRys/JSON/complex.json (bit.ly/2ekwOEQ、英語) から複雑な JSON ドキュメントを解析できるようになります (図 9 参照)。

図 9 JSON ドキュメントの例

[{
  "person": {
    "personid": 123456,
    "name": "Person 1",
    "addresses": {
      "address": [{
        "addressid": "2",
        "street": "Street 2",
        "postcode": "1234 AB",
        "city": "City 1"
      }, {
        "addressid": "2",
        "street": "Street 2",
        "postcode": "5678 CD",
        "city": "City 2"
      }]
    }
  }
}, {
     "person": {
     "personid": 798,
     "name": "Person 2",
     "addresses": {
       "address": [{
         "addressid": "1",
         "street": "Street 1",
         "postcode": "1234 AB",
         "city": "City 1"
     }, {
         "addressid": "4",
         "street": "Street 7",
         "postcode": "98799",
         "city": "City 3"
     }]
   }
  }
}]

形式は person "オブジェクト" の配列 (技術的には、それぞれ 1 つの個人キーを持つオブジェクト) です。このオブジェクトには、個人に関するいくつかのプロパティと住所のオブジェクトが含まれています。図 10 の U-SQL スクリプトは、個人と住所の組み合わせごとに行を抽出します。

図 10 図 9 の JSON ドキュメントの例を処理する U-SQL スクリプト

DECLARE @input string = "/Samples/Blogs/MRys/JSON/complex.json";
REFERENCE ASSEMBLY JSONBlog.[Newtonsoft.Json];
REFERENCE ASSEMBLY JSONBlog.[Microsoft.Analytics.Samples.Formats];
USING Microsoft.Analytics.Samples.Formats.Json;
@json =
  EXTRACT personid int,
          name string,
          addresses string
  FROM @input
  USING new JsonExtractor("[*].person");
@person =
  SELECT personid,
         name,
         JsonFunctions.JsonTuple(
           addresses, "address")["address"] AS address_array
  FROM @json;
@addresses =
  SELECT personid,
         name,
         JsonFunctions.JsonTuple(address) AS address
  FROM @person
       CROSS APPLY
         EXPLODE (JsonFunctions.JsonTuple(address_array).Values)
           AS A(address);
@result =
  SELECT personid,
         name,
         address["addressid"]AS addressid,
         address["street"]AS street,
         address["postcode"]AS postcode,
         address["city"]AS city
  FROM @addresses;
OUTPUT @result
TO "/output/json/persons.csv"
USING Outputters.Csv();

スクリプトは JSONPath 式 [*].person を抽出ツールに渡します。そのため、最上位配列の個人フィールドごとに 1 行が生成されます。抽出ツールは EXTRACT スキーマを使用して、結果のオブジェクトのプロパティを列に取得します。住所のフィールド自体は入れ子になった JSON ドキュメントのため、関数 JsonTuple を初めて呼び出すと、住所のオブジェクトを含むマップが作成され、CROSS APPLY EXPLODE 式によって住所ごとに 1 行にマップされます。最後に、住所のすべてプロパティが、map データ型から推測され、行セットが得られます (図 11 参照)。

図 11 図 9 の JSON ドキュメントを処理して生成された行セット

123456 Person 1 2 Street 2 1234 AB City 1
123456 Person 1 2 Street 2 5678 CD City 2
798 Person 2 1 Street 1 1234 AB City 1
798 Person 2 4 Street 7 98799 City 3

この例の Visual Studio プロジェクト、およびファイル内部に複数の JSON ドキュメントを含むその他の JSON 処理シナリオは、GitHub リポジトリ (bit.ly/2dzceLv、英語) にあります。

画像データの処理

この例では、大きな非構造化データである画像を処理します。具体的には、JPEG 画像を処理して、JPEG EXIF プロパティをいくつか抽出し、画像のサムネイルを作成します。さいわい、.NET は System.Drawing でさまざまな画像処理機能を提供します。そのため、ここでは U-SQL 拡張機能の関数と演算子をビルドして、JPEG 処理をこれらのクラスにデリゲートするだけです。

これを実行する方法はいくつかあります。最初に試みるのは、すべての画像をバイト配列として行セットに読み込んだ後、個別のユーザー定義関数を適用してプロパティをそれぞれ抽出し、サムネイルを作成する方法です (図 12 参照)。

図 12 行への画像読み込みによる U-SQL での画像処理

REFERENCE ASSEMBLY Images;
USING Images;
@image_data =
  EXTRACT image_data byte[]  // Max size of row is 4MB!
        , name string
        , format string
  FROM @"/Samples/Data/Images/{name}.{format}"
  USING new ImageExtractor();
// Use UDFs
@image_properties =
  SELECT ImageOps.getImageProperty(image_data, ImageProperties.copyright)
         AS image_copyright,
         ImageOps.getImageProperty(image_data, ImageProperties.equipment_make)
         AS image_equipment_make,
         ImageOps.getImageProperty(image_data, ImageProperties.equipment_model)
         AS image_equipment_model,
         ImageOps.getImageProperty(image_data, ImageProperties.description)
         AS image_description
  FROM @image_data
  WHERE format IN ("JPEG", "jpeg", "jpg", "JPG");

ただし、この方法にはいくつか問題があります。

  • U-SQL 行のサイズは最大 4 MB になるため、このソリューションの画像サイズは 4 MB から他の列のサイズを差し引いた値に制限されます。
  • 関数の各呼び出しによりさらにメモリが使用される可能性があるため、バイト配列を U-SQL 処理で段階的に処理する必要があります。

そのため、カスタム抽出ツール内部で直接プロパティを抽出してサムネイルを作成する方が適切です。図 13 に、改訂した U-SQL スクリプトを示します。

図 13 抽出ツールによって特徴を抽出する U-SQL での画像処理

REFERENCE ASSEMBLY Images;
@image_features =
  EXTRACT copyright string,
          equipment_make string,
          equipment_model string,
          description string,
          thumbnail byte[],
          name string,
          format string
  FROM @"/Samples/Data/Images/{name}.{format}"
  USING new Images.ImageFeatureExtractor(scaleWidth:500, scaleHeight:300);
@image_features =
  SELECT *
  FROM @image_features
  WHERE format IN ("JPEG", "jpeg", "jpg", "JPG");
OUTPUT @image_features
TO @"/output/images/image_features.csv"
USING Outputters.Csv();
@scaled_image =
  SELECT thumbnail
  FROM @image_features
  WHERE name == "GT4";
OUTPUT @scaled_image
TO "/output/images/GT4_thumbnail_2.jpg"
USING new Images.ImageOutputter();

このスクリプトは、ファイルセット パターン (bit.ly/2ektTY6、英語) /Samples/Data/Images/{name}.{format} で指定された画像から、プロパティとサムネイルを抽出します。その後、SELECT ステートメントが、抽出時に JPEG 以外のファイルをすべて取り除く述語を format 列にのみ使用することで、抽出を JPEG ファイルに限定します (オプティマイザーは format 列で述語を満たすファイルにのみ抽出ツールを適用します)。抽出ツールはサムネイルのサイズを指定するオプションを提供します。その後、スクリプトは特徴を CSV ファイルに出力し、簡単なバイトストリームレベルの出力ツールを使用して、縮小した画像のいずれかのサムネイル ファイルを作成します。

図 14 に抽出ツールの実装を示します。

図 14 画像の特徴抽出ツール

using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
using System;
using System.Collections.Generic;
using System.Linq;
using System.IO;
using System.Drawing;
using System.Drawing.Imaging;
using System.Drawing.Drawing2D;
namespace Images
{
  public static class UpdatableRowExtensions
  {
    public static void SetColumnIfExists<T>(this IUpdatableRow source
                                           , string colName, T value)
    {
      var colIdx = source.Schema.IndexOf(colName);
      if (colIdx != -1)
      { source.Set<T>(colIdx, value); }
    }
  }
  [SqlUserDefinedExtractor(AtomicFileProcessing = true)]
  public class ImageFeatureExtractor : IExtractor
  {
    private int _scaleWidth, _scaleHeight;
    public ImageFeatureExtractor(int scaleWidth = 150, int scaleHeight = 150)
    { _scaleWidth = scaleWidth; _scaleHeight = scaleHeight; }
    public override IEnumerable<IRow> Extract(IUnstructuredReader input
                                             , IUpdatableRow output)
    {
      byte[] img = ImageOps.GetByteArrayforImage(input.BaseStream);
      using (StreamImage inImage = new StreamImage(img))
      {
        output.SetColumnIfExists("image", img);
        output.SetColumnIfExists("equipment_make",
          inImage.getStreamImageProperty(ImageProperties.equipment_make));
        output.SetColumnIfExists("equipment_model",
          inImage.getStreamImageProperty(ImageProperties.equipment_model));
        output.SetColumnIfExists("description",
          inImage.getStreamImageProperty(ImageProperties.description));
        output.SetColumnIfExists("copyright",
          inImage.getStreamImageProperty(ImageProperties.copyright));
        output.SetColumnIfExists("thumbnail",
          inImage.scaleStreamImageTo(this._scaleWidth, this._scaleHeight));
      }
      yield return output.AsReadOnly();
    }
  }
}

抽出ツールでは、ファイル全体を再度確認して input.BaseStream を操作する必要がありますが、図 12 のスクリプトとは異なり、ここではメモリ内に 1 つの画像のみを作成します。また、抽出ツールは要求された列もそれぞれチェックし、拡張メソッド SetColumnIfExists を使用して要求された列名のデータのみを処理します。

詳細については、GitHub Web サイト (bit.ly/2dngXCE、英語) の Visual Studio プロジェクトを参照してください。

空間データの処理

この例では、SQL Server Spatical 型のアセンブリ Microsoft.SqlServer.Types.dll を U-SQL で使用する方法を示します。具体的には、U-SQL スクリプトで空間ライブラリ関数をユーザー定義関数として使用します。前述の JSON 抽出ツールの場合と同様、既存のアセンブリを U-SQL に登録します。独自のアセンブリを作成する必要はありません。

最初に、SQL Server 2016 Feature Pack (bit.ly/2dZTw1k) をダウンロードしてインストールします。64 ビット版のインストーラー (JPN\x64\SQLSysClrTypes.msi) を選択して、ライブラリが 64 ビット版になるようにします。

このインストーラーにより、マネージ アセンブリ Microsoft.Sql­­Server.Types.dll が C:\Program Files (x86)\Microsoft SQL Server\130\SDK\Assemblies に、ネイティブ アセンブリ SqlServerSpatial130.dll が \Windows\System32\ にインストールされます。次に、これらのアセンブリを Azure Data Lake Store (フォルダー /upload/asm/spatial など) にアップロードします。インストーラーによってネイティブ ライブラリがシステム フォルダー C:\Windows\System32 にインストールされるため、アップロードする前にそのフォルダーから SqlServerSpatial130.dll のコピーを作成する必要があります。あるいは、使用するツールがシステム フォルダーでファイル システムのリダイレクト (bit.ly/1TYm9YZ) を実行しないようにします。たとえば、最新の Visual Studio ADL ファイル エクスプローラーを使用してアセンブリをアップロードする場合、まずそのファイルを別のディレクトリにコピーする必要があります。そうしないと、本稿執筆時点では 32 ビット版のファイルがアップロードされます (これは、Visual Studio が 32 ビット版のアプリケーションで、ADL のアップロード ファイルの選択ウィンドウでファイル システムのリダイレクトを実行するためです)。さらに、ネイティブ アセンブリを呼び出す U-SQL スクリプトを実行すると、(内部) 実行時エラーとして、 「Inner exception from user expression: 間違ったフォーマットのプログラムを読み込もうとしました。(HRESULT からの例外: 0x8007000B)」が表示されます。

2 つのアセンブリ ファイルをアップロードしたら、以下のスクリプトを使用してそれらを SQLSpatial というデータベースに登録します。

DECLARE @ASSEMBLY_PATH string = "/upload/asm/spatial/";
DECLARE @SPATIAL_ASM string = @ASSEMBLY_PATH+"Microsoft.SqlServer.Types.dll";
DECLARE @SPATIAL_NATIVEDLL string = @ASSEMBLY_PATH+"SqlServerSpatial130.dll";
CREATE DATABASE IF NOT EXISTS SQLSpatial;
USE DATABASE SQLSpatial;
DROP ASSEMBLY IF EXISTS SqlSpatial;
CREATE ASSEMBLY SqlSpatial
FROM @SPATIAL_ASM
WITH ADDITIONAL_FILES =
  (
    @SPATIAL_NATIVEDLL
  );

この場合は、1 つの U-SQL アセンブリを登録して、ネイティブ アセンブリをその U-SQL アセンブリへの厳密な依存関係として含めるだけです。空間アセンブリを使用するには、U-SQL アセンブリを参照する必要があるだけです。追加ファイルがそのアセンブリで自動的に使用できるようになります。図 15 に、空間アセンブリを使用するシンプルなサンプル スクリプトを示しています。

図 15 U-SQL での空間機能の使用

REFERENCE SYSTEM ASSEMBLY [System.Xml];
REFERENCE ASSEMBLY SQLSpatial.SqlSpatial;
USING Geometry = Microsoft.SqlServer.Types.SqlGeometry;
USING Geography = Microsoft.SqlServer.Types.SqlGeography;
USING SqlChars = System.Data.SqlTypes.SqlChars;
@spatial =
    SELECT * FROM (VALUES
                   // The following expression is not using the native DDL
                   ( Geometry.Point(1.0,1.0,0).ToString()),   
                   // The following expression is using the native DDL
                   ( Geometry.STGeomFromText(
                     new SqlChars("LINESTRING (100 100, 20 180, 180 180)"),
                     0).ToString())
                  ) AS T(geom);
OUTPUT @spatial
TO "/output/spatial.csv"
USING Outputters.Csv();

SQL Types ライブラリには System.Xml アセンブリへの依存関係があるため、そのアセンブリを参照する必要があります。また、一部のメソッドは、C# の組み込み型ではなく System.Data.SqlTypes 型を使用しています。System.Data は既定でインクルードされるため、必要な SQL 型を参照するだけです。図 15 のコードは GitHub の Web サイト (bit.ly/2dMSBm9、英語) から入手できます。

まとめ: UDO に関するヒントとベスト プラクティス

今回は、U-SQL の強力な拡張機能の一端に触れながら、U-SQL の拡張メカニズムによって既存のドメイン固有のコードを再利用し、U-SQL 拡張フレームワークを使用してビッグ データの代表的な量まで処理をスケーリングする方法を取り上げました。

ただし、この強力なツールは誤用しやすいため、いくつかのヒントとベスト プラクティスについてアドバイスします。

ほとんどの場合、カスタム データ形式にはカスタム抽出ツールや、場合によっては出力演算子が必要になりますが、データ形式が並列に抽出される (CSV 形式など) かどうかや、1 つの演算子インスタンスですべてのデータを確認する必要があるかどうかを慎重に検討してください。また、特定の列が要求されたときのみ処理が行われるように演算子をジェネリックにすることで、パフォーマンスを改善できる可能性があります。

プロセッサ、レジューサ、結合ツール、適用ツールなどの UDO を検討する前に、組み込みの演算子を利用する純粋な U-SQL ソリューションを最初に考えることを強くお勧めします。たとえば、前述の範囲レジューサのスクリプトは、実際、いくつかのウィンドウ関数や順位付け関数をうまく使用すれば、作成できます。それでも以下のような場合は、UDO を検討します。

  • 処理される行セットの入力スキーマまたは出力スキーマに動的にアクセスする必要があるロジック。たとえば、列数を事前に把握できない行のデータの JSON ドキュメントを作成する場合などです。
  • SELECT 式で複数のユーザー定義関数を使用し、メモリの使用量が多くなるため、プロセッサ UDO でメモリ効率を上げるコードを作成するソリューション。
  • 順番に並んだ集計ツールやグループごとに複数の行を生成する集計ツールが必要な場合。このような集計ツールは、ウィンドウ関数では作成できません。

UDO を使用しているときは、以下のヒントを念頭においてください。

  • READONLY 句を使用すれば、UDO を使って述語をプッシュできる。
  • REQUIRED 句を使用すれば、UDO を使ってプッシュされた列を整理できる。
  • クエリ オプティマイザーが間違ったプランを選択する場合は、UDO を使用するクエリ式でカーディナリティのヒントを示す。

Michael Rys は、マイクロソフトの主任プログラム マネージャーです。彼は、1980 年代から、データ処理とクエリ言語に携わっています。XQuery および SQL 設計委員会におけるマイクロソフトの代表を務めており、XML、地理空間、およびセマンティック検索によって、SQL Server をリレーショナルを超えた製品に進化させました。現在は、家族とダイビングや自動車レースをするとき以外は、SCOPE や U-SQL などのビッグ データ クエリ言語に取り組んでいます。彼には Twitter (@MikeDoesBigData、英語) から連絡できます。

この記事のレビューに協力してくれたマイクロソフト技術スタッフの Clemens Szyperski、Ed Triou、Saveen Reddy、および Michael Kadaner に心より感謝いたします。
Clemens Szyperski はマイクロソフトの主任グループ エンジニアリング マネージャーです。彼は数十年、複雑なソフトウェア システムの構築を容易にする特殊な言語、ツール、および方法に情熱を傾けています。現在は、家族と海に出ているとき以外は、Azure Data Lake U-SQL と Scope のチームを率いています。彼には Twitter (、英語) から連絡できます。

Ed Triou はマイクロソフトの主任開発責任者です。彼は現在までの 20 年間、特にコンパイラとクエリ言語 (IDL、TSQl、LINQ to SQL/Entities、eSQL、SCOPE、U-SQL) によるデータのプログラマビリティ (ODBC、OLEDB、ADO.NET、JDBC、PHP、EDM) に力を注いできました。  現在は、U-SQL コンパイラおよび言語チームを率いており、エクサバイト規模で ADL と Cosmos に日々頼っている社内外のビジネスの一歩先を行こうとしています。

Saveen Reddy はマイクロソフトの主任プログラム マネージャーで、Azure Data Lake Platform の設計とビルドに力を注いでいます。そのコンポーネントと経験は、マイクロソフトのビッグ データ クラウド サービスのすべてを支えています。彼は Metal Gear Solid V: The Phantom Pain の達成率を 100% にしました。彼には Twitter (@saveenr、英語) から連絡できます。

Michael Kadaner はマイクロソフトの主任ソフトウェア エンジニアです。コンピューター科学とソフトウェア開発に関するさまざまな分野で数十年経験を積んできた彼は、プログラムの記述は精密技術であり、ソフトウェアからバグはなくせると主張しています。彼の真の情熱は、複雑なアルゴリズムやエンジニアリングに関する問題を解決することと、設計上正しく、簡潔で美しいコードでソリューションを実装することに向けられています。余暇は読書と DIY プロジェクトに充てています。