Connect(); 2016

Band 31, Nummer 12

Connect(); Intelligente Apps: Erweiterbarkeit in U-SQL Big Data-Anwendungen

Von Michael Rys; 2016

Der traditionelle Schwerpunkt bei der Behandlung der großen drei Vs von Big Data (Volumen, Geschwindigkeit (velocity) und Vielfalt (variety)) bei der Big Data-Verarbeitung lag hauptsächlich auf der Bereitstellung einer skalierbaren Plattform zum Verarbeiten des Datenvolumens, dem Hinzufügen von Verarbeitungsfunktionen beinahe in Echtzeit und der Möglichkeit, eine Vielzahl von Eingabedatenformaten von CSV über JSON bis hin zu benutzerdefinierten Binärformaten zu verarbeiten. Ein Aspekt von Vielfalt, der häufig nicht an erster Stelle berücksichtigt wird, ist die Vielfalt, die für mit benutzerdefinierter Datenverarbeitung einhergeht: nicht nur hinsichtlich der verschiedenen Formate, sondern auch unter dem Aspekt der einfachen Erweiterbarkeit Ihrer Analyse durch benutzerdefinierte Algorithmen unter Beibehaltung der deklarativen Natur der Abfragesprache.

Einige moderne Big Data-Verarbeitungs- und Abfragesprachen beginnen, sich der Herausforderung zu stellen. Insbesondere U-SQL wurde von Grund auf dafür konzipiert, die deklarative Leistungsfähigkeit einer auf SQL basierenden Sprache mit der Flexibilität zur Verwendung vorhandener Codebibliotheken und Entwicklung neuer benutzerdefinierter Algorithmen zu kombinieren.

In einem früheren Artikel (bit.ly/1OtXM2K) habe ich U-SQL vorgestellt und gezeigt, wie durch die Verwendung des Microsoft .NET Framework-Typsystems zusammen mit der auf C# basierenden Ausdruckssprache in U-SQL die Erweiterung Ihrer Analyse durch benutzerdefinierte Codeausdrücke nahtlos erfolgen kann. Ich habe erläutert, wie C#-Assemblys zum Definieren benutzerdefinierter Funktionen (User-Defined Functions, UDFs) verwendet werden und wie die Verwendung dieser Funktionen in Ihren U-SQL-Abfrageskripts erfolgt.

U-SQL ermöglicht nicht nur das Hinzufügen eigener benutzerdefinierter C#-Funktionen, sondern stellt auch ein Framework bereit, in dem Sie eigene benutzerdefinierte Operatoren (User-Defined Operators, UDOs) hinzufügen können, z. B. eigene Extraktions-, Ausgabe- und Rowsetoperatoren (Prozessoren, Anwender, Reduzierer und benutzerdefinierte Kombinierer). Das Framework besteht aus zwei Teilen:

  1. .NET-Schnittstellen, die den Vertrag für Sie zum Erstellen dieser Operatoren so bereitstellen, dass Sie sich auf den Code konzentrieren können. Die horizontal hochskalierte Ausführung wird dabei U-SQL überlassen. Beachten Sie, dass der tatsächliche Geschäftslogikcode nicht in .NET implementiert werden muss, wie ich an späterer Stelle zeigen werde.
  2. U-SQL-Ausdrücken wie EXTRACT und REDUCE, die die benutzerdefinierten Operatoren aufrufen und diese dann skaliert für Ihre Daten ausführen.

Dieser Artikel baut auf dem früheren Artikel auf. Ich zeige, wie die Erweiterbarkeitsmechanismen von U-SQL zum Verarbeiten einer Vielzahl verschiedener Daten von JSON bis hin zu Bilddaten verwendet werden können. Außerdem zeige ich, wie Sie eigene Operatoren hinzufügen können.

Verwalten Ihres benutzerdefinierten Codes in U-SQL

Bevor ich Ihnen einige Beispiele vorstelle, möchte ich erläutern, wie U-SQL Ihren benutzerdefinierten Code verwenden kann.

Wie bereits erwähnt, folgt U-SQL C# bezüglich der skalaren Ausdruckssprache, die z. B. in U-SQL-Prädikaten und in den Ausdrücken einer select-Klausel verwendet wird. Damit Ihr benutzerdefinierter Code für den U-SQL-Compiler sichtbar wird, muss der Code als .NET-Assemblypaket vorliegen, auf das durch das U-SQL-Skript verwiesen werden muss. Damit auf die Assembly verwiesen werden kann, muss diese zuvor im U-SQL-Metadatendienst mithilfe einer CREATE ASSEMBLY-Anweisung registriert worden sein.

Registrieren von und Verweisen auf U-SQL-Assemblys. Ich schlage vor, die Azure Data Lake-Tools für Visual Studio (aka.ms/adltoolsvs) zu verwenden, die das Erstellen und Registrieren von Assemblys vereinfachen, die mit U-SQL zusammenarbeiten. Wenn Sie Ihren benutzerdefinierten Code in einem Projekt „Klassenbibliothek (für U-SQL-Anwendung)” schreiben (siehe Abbildung 1), können Sie den Code schreiben, das Projekt erstellen und die generierte DLL-Assemblydatei dann direkt durch Klicken mit der rechten Maustaste registrieren (siehe Abbildung 2).

Projekt „Klassenbibliothek (für U-SQL-Anwendung)“
Abbildung 1: Projekt „Klassenbibliothek (für U-SQL-Anwendung)“

Registrieren einer U-SQL-Assembly
Abbildung 2: Registrieren einer U-SQL-Assembly

Nun benötigen Sie nur noch die REFERENCE ASSEMBLY-Anweisung in Ihrem U-SQL-Skript, damit die öffentlichen Klassen und Methoden in Ihrem U-SQL-Skript verwendet werden können. Abbildung 3 zeigt dies.

Abbildung 3: Verweisen auf eine benutzerdefinierte Funktion aus einer benutzerdefinierten Assembly

REFERENCE ASSEMBLY master.TweetAnalysis;
USING tweet_fns = TweetAnalysis.Udfs;
@t =
  EXTRACT date string,
          time string,
          author string,
          tweet string
  FROM "/Samples/Data/Tweets/Tweets.csv"
  USING Extractors.Csv();
// Get the mentions from the tweet string
@m =
  SELECT origin
       , tweet_fns.get_mentions(tweet) AS mentions
       , author AS mentioned_by
FROM @t;
...

Verwenden von vorhandenem Code mit U-SQL-Assemblys. Häufig möchten Sie vorhandene Codebibliotheken oder sogar .NET-fremden Code verwenden. Wenn Sie .NET-fremden Code (z. B. eine native Bibliothek oder sogar ein völlig anderes Sprachlaufzeitmodul wie Python oder JavaScript) verwenden möchten, müssen Sie für den .NET-fremden Code eine C#-Interoperabilitätsschicht als Wrapper verwenden, die aus U-SQL aufgerufen wird und dann den .NET-fremden Code aufruft. Dabei erfolgt ein Marshalling der Daten zwischen den Komponenten und die Implementierung eines UDO-Schnittstellenvertrags. In diesem Fall müssen die .NET-fremden Codeartefakte wie die nativen DLLs oder die Dateien des anderen Laufzeitmoduls als zusätzliche Dateien hinzugefügt werden. Dies kann mithilfe der Option „Weitere Datei“ der Assemblyregistrierung erfolgen. Diese Dateien werden automatisch für jeden Knoten bereitgestellt, wenn auf die .NET-Assembly in einem Skript verwiesen wird, und sie werden dem Arbeitsverzeichnis der .NET-Assembly, das für den betreffenden Knoten lokal ist, zur Verfügung gestellt.

Wenn Sie vorhandene .NET-Bibliotheken verwenden möchten, müssen Sie die vorhandenen Codebibliotheken als verwaltete Abhängigkeiten für Ihre eigene Assembly registrieren oder (wenn Sie eine Bibliothek erneut verwenden, die direkt in U-SQL verwendet werden kann) diese direkt in Ihrer U-SQL-Datenbank registrieren. In beiden Fällen muss das Skript auf alle .NET-Assemblys verweisen, die vom Skript benötigt werden.

Ich stelle einige Beispiele für diese Registrierungsoptionen im weiteren Verlauf dieses Artikels vor, wenn ich benutzerdefinierte Codeszenarien behandle, in denen es sinnvoll ist, das Erweiterbarkeitsmodell zu verwenden. Diese Szenarien umfassen z. B. Folgendes: Mergen sich überschneidender Bereiche mit einem benutzerdefinierten Reduzierer, Verarbeiten von JSON-Dokumenten, Verarbeiten von Bilddaten und Verarbeiten von räumlichen Daten. Ich gehe der Reihe nach auf diese Szenarien ein.

Mergen sich überschneidender Bereiche mit einem benutzerdefinierten Reduzierer

Angenommen, Sie verwenden eine Protokolldatei, die nachverfolgt, wenn ein Benutzer mit Ihrem Dienst interagiert. Nehmen Sie außerdem an, dass ein Benutzer mit Ihrem Dienst auf verschiedene Weise interagieren kann (z. B. durch Ausführen von Bing-Suchvorgängen von mehreren Geräten oder aus Browserfenstern). Im Rahmen Ihres U-SQL-Auftrags, der die Protokolldatei für die spätere Analyse vorbereitet, möchten Sie sich überschneidende Bereiche mergen.

Wenn die Eingabeprotokolldatei z. B. wie in Abbildung 4 gezeigt aussieht, möchten Sie die sich überschneidenden Bereiche für jeden Benutzer in Abbildung 5 mergen.

Abbildung 4: Protokolldatei mit sich überschneidenden Zeitbereichen

Startzeit  Endzeit  Benutzername
5:00 Uhr  6:00 Uhr  ABC
5:00 Uhr  6:00 Uhr  XYZ
8:00 Uhr  9:00 Uhr  ABC
8:00 Uhr  10:00 Uhr  ABC
10:00 Uhr  14:00 Uhr  ABC
7:00 Uhr  11:00 Uhr  ABC
9:00 Uhr  11:00 Uhr  ABC
11:00 Uhr  11:30 Uhr  ABC
23:40 Uhr  23:59 Uhr  FOO
23:50 Uhr  0:40 Uhr  FOO

Abbildung 5: Protokolldatei nach dem Mergen sich überschneidender Zeitbereiche

Startzeit  Endzeit  Benutzername
5:00 Uhr  6:00 Uhr  ABC
5:00 Uhr  6:00 Uhr  XYZ
7:00 Uhr  14:00 Uhr  ABC
23:40 Uhr  0:40 Uhr  FOO

Wenn Sie das Problem untersuchen, fällt Ihnen zuerst auf, dass ein Element wie eine benutzerdefinierte Aggregation zum Kombinieren der sich überschneidenden Zeitintervalle definiert werden sollte. Wenn Sie sich jedoch die Eingabedaten ansehen, fällt Ihnen Folgendes auf: Da die Daten nicht sortiert sind, müssen Sie den Zustand für alle möglichen Intervalle beibehalten und dann zusammenhanglose Intervalle mergen, wenn Überbrückungsintervalle auftreten, oder Sie müssen die Intervalle für jeden Benutzernamen vorab sortieren, um das Mergen der Intervalle zu vereinfachen.

Die geordnete Aggregation kann auf einfachere Weise horizontal hochskaliert werden. U-SQL stellt jedoch keine geordneten benutzerdefinierten Aggregatoren (User-Defined Aggregators, UDAGGs) bereit. Außerdem generieren UDAGGs normalerweise eine Zeile pro Gruppe. In diesem Fall kann ich jedoch mehrere Zeilen pro Gruppe verwenden, wenn die Bereiche zusammenhanglose Bereiche sind.

Glücklicherweise stellt U-SQL einen skalierbaren UDO bereit, der als „Reduzierer“ (bit.ly/2evGsDA) bezeichnet wird. Dieser Reduzierer kann eine Zeilenmenge basierend auf einem Gruppierungsschlüssel aggregieren, der mithilfe von benutzerdefiniertem Code festgelegt wird.

Schreiben wir also zuerst die U-SQL-Programmlogik. Dabei ist „ReduceSample.Range­Reducer“ der benutzerdefinierte Reduzierer (Reduzierer-UDO) aus der RangeReducer-Assembly, und die Protokolldaten befinden sich in der Datei „/Samples/Blogs/MRys/Ranges/ranges.txt“ (bit.ly/2eseZyw). „-” wird als Spaltentrennzeichen verwendet. Hier ist der Code:

REFERENCE ASSEMBLY RangeReducer;
@in = EXTRACT start DateTime, end DateTime, user string
FROM "/Samples/Blogs/MRys/Ranges/ranges.txt"
USING Extractors.Text(delimiter:'-');
@r =  REDUCE @in PRESORT start ON user
      PRODUCE start DateTime, end DateTime, user string
      READONLY user
      USING new ReduceSample.RangeReducer();
OUTPUT @r
TO "/temp/result.csv"
USING Outputters.Csv();

Der REDUCE-Ausdruck nimmt das Rowset „@in“ als Eingabe an, partitioniert es basierend auf der Benutzerspalte, sortiert die Partitionen basierend auf den Werten in der Startspalte vor und wendet dann den „RangeReducer“ an. Das gleiche Rowsetschema wird für die Ausgabe generiert. Da der Reduzierer nur den Bereich vom Start bis zum Ende anpasst, bleibt die Benutzerspalte unberücksichtigt. Sie markieren sie daher als READONLY. Auf diese Weise erhält das Reduziererframework die Berechtigung, die Daten für diese Spalte automatisch zu übergeben, und erlaubt im Gegenzug dem U-SQL-Abfrageprozessor das aggressive Anwenden von Optimierungen um schreibgeschützte Spalten herum, z. B. das Übertragen von Prädikaten mithilfe von Push für eine schreibgeschützte Spalte vor dem Reduzierer.

Ein Reduzierer wird durch Implementieren einer Instanz von „Microsoft.Analytics.Interfaces.IReducer“ erstellt. In diesem Fall muss nur die abstrakte Methode „Reduce“ überschrieben werden, weil keine Parameter angegeben werden müssen. Sie können den Code in eine C#-Bibliothek für U-SQL kopieren und wie oben beschrieben als „RangeReducer“ der Assembly registrieren. Abbildung 6 zeigt die Implementierung von „RangeReducer“. (Beachten Sie, dass in einigen Beispielen die normalen Codeeinzugspraktiken aufgrund von Platzproblemen nicht beachtet wurden.)

Abbildung 6: Implementierung von „RangeReducer“

using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ReduceSample
{
  public class RangeReducer : IReducer
  {
    public override IEnumerable<IRow> Reduce(
      IRowset input, IUpdatableRow output)
    {
      // Init aggregation values
      bool first_row_processed = false;
      var begin = DateTime.MaxValue;
      var end = DateTime.MinValue;
      // Requires that the reducer is PRESORTED on begin and
      // READONLY on the reduce key.
      foreach (var row in input.Rows)
      {
        // Initialize the first interval with the first row if i is 0
       if (!first_row_processed)
        {
         first_row_processed = true; // Mark that the first row was handled
          begin = row.Get<DateTime>("start");
          end = row.Get<DateTime>("end");
          // If the end is just a time and not a date, it can be earlier
          // than the begin, indicating it is on the next day;
          // this let's you fix up the end to the next day in that case
          if (end < begin) { end = end.AddDays(1); }
        }
        else // Handle the remaining rows
        {
          var b = row.Get<DateTime>("start");
          var e = row.Get<DateTime>("end");
          // Fix up the date if end is earlier than begin
          if (e < b) { e = e.AddDays(1); }
          // If begin time is still inside the interval,
          // increase the interval if it is longer
          if (b <= end)
          {
            // If the new end time is later than the current,
            // extend the interval
            if (e > end) { end = e; }
          }
          else // Output the previous interval and start a new one
          {
            output.Set<DateTime>("start", begin);
            output.Set<DateTime>("end", end);
            yield return output.AsReadOnly();
            begin = b; end = e;
          } // if
        } // if
      } // foreach
      // Now output the last interval
      output.Set<DateTime>("start", begin);
      output.Set<DateTime>("end", end);
      yield return output.AsReadOnly();
    } // Reduce
  } // RangeReducer
} // ReduceSample

Der REDUCE-Ausdruck von U-SQL wendet die Reduce-Methode ein Mal für jeden eindeutig identifizierbaren Partitionsschlüssel parallel an. Der Eingabeparameter enthält daher nur die Zeilen für eine angegebene Gruppe, und die Implementierung kann NULL als Ausgabe an viele Zeilen zurückgeben.

Da die PRESORT-Klausel garantiert, dass die Zeilen sortiert sind, kann die innere Programmlogik davon ausgehen, dass die Daten sortiert sind. Da die Benutzerspalte außerdem als READONLY markiert ist, wird die Spalte automatisch übergeben, und Sie können Ihren UDO-Code generischer schreiben, indem Sie sich nur auf die Spalten konzentrieren, die transformiert werden sollen.

Wenn Sie nun den Reduzierer auf eine größere Datenmenge anwenden und einige der Benutzer Ihr System ggf. häufiger als andere nutzen, tritt ein als „Datenschiefe“ (Data Skew) bezeichnetes Problem auf: Einige Benutzer haben große Partitionen und andere Benutzer nur kleine Partitionen. Da garantiert wird, dass der Vertrag des Reduzierers alle Daten für die betreffende Partition sieht, müssen alle Daten auf diesen Knoten übertragen und in einem Aufruf gelesen werden. Durch diese Anforderung kann eine solche Datenschiefe im besten Fall dazu führen, dass die Verarbeitung einiger Partitionen sehr viel länger als die Verarbeitung anderer Partitionen dauert. Im schlechtesten Fall gehen einigen Reduzierern die verfügbaren Arbeitsspeicher- und Zeitressourcen aus (für einen U-SQL-Vertex tritt nach einer Ausführungszeit von ungefähr fünf Stunden ein Timeout auf).

Wenn die Reduzierersemantik assoziativ und kommutativ ist und das Ausgabeschema mit dem Eingabeschema identisch ist, kann ein Reduzierer als rekursiv markiert werden. Das Abfragemodul kann dann große Gruppen in kleinere Untergruppen aufteilen und den Reduzierer rekursiv auf diese Untergruppen anwenden, um das Endergebnis zu berechnen. Diese rekursive Anwendung ermöglicht dem Reduzierer eine bessere Ausgeglichenheit sowie das Parallelisieren im Fall von Datenschiefe. Ein Reduzierer wird als rekursiv markiert, indem die Eigenschaftenanmerkung „SqlUserDefinedReducer(IsRecursive = true)“ verwendet wird:

namespace ReduceSample
{
  [SqlUserDefinedReducer(IsRecursive = true)]
  public class RangeReducer : IReducer
  {
    public override IEnumerable<IRow> Reduce(
      IRowset input, IUpdatableRow output)
    {
      // Insert the code from Figure 6 here
    } // Reduce
  } // RangeReducer
} // ReduceSample

In unserem Fall kann der Reduzierer als rekursiv markiert werden, um die Skalierbarkeit und Leistung zu verbessern, wenn die Verarbeitung die Sortierung der Zeilen in jedem rekursiven Aufruf beibehält.

Sie finden ein Visual Studio-Projekt für das Beispiel in unserem GitHub-Repository unter bit.ly/2ecLe5B.

Verarbeiten von JSON-Dokumenten

Eines der am häufigsten verwendeten Datenformate (nach durch Trennzeichen getrennten Textdateien) ist JSON. Im Gegensatz zu CSV-Dateiformaten stellt U-SQL keine integrierte JSON-Extraktionskomponente zur Verfügung. Die U-SQL-Community stellt jedoch unter bit.ly/2d9O4va eine Beispielassembly bereit, die Unterstützung zum Extrahieren und Verarbeiten von JSON- und XML-Dokumenten bietet.

Diese Lösung verwendet die Json.NET-Bibliothek von Newtonsoft (bit.ly/2evWJbz) für die JSON-Extraktion und „System.XML“ für die XML-Verarbeitung. Die Assembly kann Daten aus einem JSON-Dokument mithilfe von „JsonExtractor“ (bit.ly/2dPARsM) extrahieren, ein JSON-Dokument annehmen und in eine „SqlMap“ aufteilen, um die Navigation und Analyse von JSON-Dokumenten mit der JsonTuple-Funktion (bit.ly/2e8tSuX) zu ermöglichen, und schließlich ein Rowset in eine als JSON formatierte Datei mit „JSONOutputter“ (bit.ly/2e4uv3W) transformieren.

Beachten Sie, dass die Assembly als generischer JSON-Prozessor konzipiert ist. Dies bedeutet, dass sie keine Annahmen bezüglich der JSON-Dokumentstruktur trifft und hinsichtlich der halbstrukturierten Natur von JSON einschließlich heterogen typisierten Elementen (skalar in Vergleich zu strukturiert, verschiedene Datentypen für das gleiche Element, fehlende Elemente usw.) belastbar sein muss. Wenn Sie wissen, dass Ihre JSON-Dokumente ein bestimmtes Schema verwenden, können Sie möglicherweise eine effizientere JSON-Extraktionskomponente erstellen.

Im Gegensatz zum oben beschriebenen Reduziererbeispiel, in dem Sie Ihre eigene Assembly schreiben, die Sie anschließend bereitstellen, ist die Projektmappe in diesem Fall verwendungsbereit. Sie können die Projektmappe aus unseren GitHub-Repository in Visual Studio laden und dann selbst erstellen und bereitstellen, oder Sie verwenden die DLLs aus dem Verzeichnis „bin\Debug“ der Projektmappe.

Wie bereits an früherer Stelle erwähnt, erfordert die Nichtsystemabhängigkeit, dass die Assemblys „Samples.Format“ und „Json.NET“ im U-SQL-Metadatenspeicher registriert sein müssen (Sie können die Newtonsoft-Assembly als verwaltete Abhängigkeit auswählen, wenn Sie die Formatassembly mithilfe des Visual Studio-Tools registrieren), und auf beide Assemblys muss verwiesen werden, wenn JSON-Dokumente verarbeitet werden sollen. Wenn Sie Ihre JSON-Assemblys in Ihrem U-SQL-Katalog unter den Namen [Microsoft.Analytics.Samples.Formats] und [NewtonSoft.Json] in der U-SQL-Datenbank „JSONBlog“ installiert haben (siehe Abbildung 7), können Sie die Assemblys verwenden, indem Sie auf diese zu Beginn Ihrer Skripts wie folgt verweisen:

REFERENCE ASSEMBLY JSONBlog.[NewtonSoft.Json];
REFERENCE ASSEMBLY JSONBlog.[Microsoft.Analytics.Samples.Formats];

Registrieren der Formatassembly in Visual Studio
Abbildung 7: Registrieren der Formatassembly in Visual Studio

Die JSON-Extraktionskomponente implementiert die IExtractor-Schnittstelle von U-SQL. Da JSON-Dokumente vollständig analysiert werden müssen, damit sichergestellt ist, dass sie wohlgeformt sind, muss eine Datei, die ein einzelnes JSON-Dokument enthält, in einem einzelnen Extraktionsvertex verarbeitet werden. Sie geben daher an, dass die Extraktionskomponente den vollständigen Dateiinhalt analysieren muss, indem Sie die AtomicFileProcessing-Eigenschaft auf TRUE festlegen (siehe Abbildung 8). Die Extraktionskomponente kann mit einem optionalen Parameter namens „rowpath“ aufgerufen werden, der die Identifikation der JSON-Objekte ermöglicht, die jeweils mithilfe eines JSONPath-Ausdrucks einer Zeile zugeordnet werden (bit.ly/1EmvgKO).

Abbildung 8: Die JSON-Extraktionskomponente

[SqlUserDefinedExtractor(AtomicFileProcessing = true)]
public class JsonExtractor : IExtractor
{
  private string rowpath;            
  public JsonExtractor(string rowpath = null)
  {
    this.rowpath = rowpath;
  }
  public override IEnumerable<IRow> Extract(
    IUnstructuredReader input, IUpdatableRow output)
  {
    // Json.NET
    using (var reader = new JsonTextReader(
      new StreamReader(input.BaseStream)))
    {
      // Parse Json
      var root = JToken.ReadFrom(reader);
      // Rows
      // All objects are represented as rows
      foreach (JObject o in SelectChildren(root, this.rowpath))
      {
        // All fields are represented as columns
        this.JObjectToRow(o, output);
        yield return output.AsReadOnly();
      }
    }
  }
}

Die Implementierung der Extraktionskomponente übergibt den Eingabedatenstrom, den das U-SQL-Extraktionsframework in die Extraktionskomponente einspeist, an „JsonTextReader“ von „Json.NET“. Anschließend wird „rowpath“ zum Abrufen des Unterbaums verwendet, der einer Zeile mithilfe von „SelectChildren“ zugeordnet wird. Da JSON-Objekte heterogen sein können, gibt der Code das generische „JObject“ (und kein JArray-Positionsobjekt bzw. keine skalaren Werte) zurück.

Beachten Sie, dass diese Extraktionskomponente das JSON-Dokument in den Arbeitsspeicher lädt. Wenn Ihr Dokument zu groß ist, kann es eine Bedingung „Nicht genügend Arbeitsspeicher“ bewirken. In diesem Fall müssen Sie eine eigene Extraktionskomponente schreiben, die das Dokument streamt, ohne dass das vollständige Dokument in den Arbeitsspeicher geladen werden muss.

Verwenden wir nun die JSON-Extraktionskomponente und die JSON-Tupelfunktion zum Analysieren des komplexen JSON-Dokument aus „/Samples/Blogs/MRys/JSON/complex.json“ (bit.ly/2ekwOEQ), das in Abbildung 9 bereitgestellt wird.

Abbildung 9: Ein JSON-Beispieldokument

[{
  "person": {
    "personid": 123456,
    "name": "Person 1",
    "addresses": {
      "address": [{
        "addressid": "2",
        "street": "Street 2",
        "postcode": "1234 AB",
        "city": "City 1"
      }, {
        "addressid": "2",
        "street": "Street 2",
        "postcode": "5678 CD",
        "city": "City 2"
      }]
    }
  }
}, {
     "person": {
     "personid": 798,
     "name": "Person 2",
     "addresses": {
       "address": [{
         "addressid": "1",
         "street": "Street 1",
         "postcode": "1234 AB",
         "city": "City 1"
     }, {
         "addressid": "4",
         "street": "Street 7",
         "postcode": "98799",
         "city": "City 3"
     }]
   }
  }
}]

Das Format ist ein Array von Personen„objekten“ (technisch Objekte mit jeweils einem Personenschlüssel), die ihrerseits einige Personeneigenschaften und Adressobjekte enthalten. Das U-SQL-Skript in Abbildung 10 extrahiert eine Zeile pro Kombination aus Person/Adresse.

Abbildung 10: U-SQL-Skriptverarbeitung für das JSON-Beispieldokument aus Abbildung 9

DECLARE @input string = "/Samples/Blogs/MRys/JSON/complex.json";
REFERENCE ASSEMBLY JSONBlog.[Newtonsoft.Json];
REFERENCE ASSEMBLY JSONBlog.[Microsoft.Analytics.Samples.Formats];
USING Microsoft.Analytics.Samples.Formats.Json;
@json =
  EXTRACT personid int,
          name string,
          addresses string
  FROM @input
  USING new JsonExtractor("[*].person");
@person =
  SELECT personid,
         name,
         JsonFunctions.JsonTuple(
           addresses, "address")["address"] AS address_array
  FROM @json;
@addresses =
  SELECT personid,
         name,
         JsonFunctions.JsonTuple(address) AS address
  FROM @person
       CROSS APPLY
         EXPLODE (JsonFunctions.JsonTuple(address_array).Values)
           AS A(address);
@result =
  SELECT personid,
         name,
         address["addressid"]AS addressid,
         address["street"]AS street,
         address["postcode"]AS postcode,
         address["city"]AS city
  FROM @addresses;
OUTPUT @result
TO "/output/json/persons.csv"
USING Outputters.Csv();

Beachten Sie, dass das Skript den JSONPath-Ausdruck „[*].person“ an die Extraktionskomponente übergibt und auf diese Weise eine Zeile für jedes Personenfeld im Array auf oberster Ebene generiert. Das EXTRACT-Schema wird von der Extraktionskomponente verwendet, um die Eigenschaften des sich ergebenden Objekts in Spalten abzurufen. Da das Adressfeld selbst ein geschachteltes JSON-Dokument ist, erstellt der erste Aufruf der JsonTuple-Funktion eine Zuordnung, die die Adressobjekte enthält. Diese werden dann einer Zeile pro Adresse mit dem CROSS APPLY EXPLODE-Ausdruck zugeordnet. Schließlich werden alle Adresseigenschaften aus dem Zuordnungsdatentyp vorausberechnet, um das Rowset zu generieren. Abbildung 11 zeigt dies.

Abbildung 11: Das durch Verarbeiten des JSON-Dokuments aus Abbildung 9 generierte Rowset

123456 Person 1 2 Straße 2 1234 AB Ort 1
123456 Person 1 2 Straße 2 5678 CD Ort 2
798 Person 2 1 Straße 1 1234 AB Ort 1
798 Person 2 4 Straße 7 98799 Ort 3

Sie finden ein Visual Studio-Projekt für das Beispiel sowie weitere JSON-Verarbeitungsszenarien (einschließlich mehrerer JSON-Dokumente in einer Datei) im GitHub-Repository unter bit.ly/2dzceLv.

Verarbeiten von Bilddaten

In diesem Beispiel verarbeite ich eine größere Menge unstrukturierter Daten: Bilder. Insbesondere möchte ich JPEG-Bilder verarbeiten und einige der JPEG EXIF-Eigenschaften extrahieren sowie eine Miniaturansicht des Bilds erstellen. Glücklicherweise stellt .NET eine Vielzahl von Bildverarbeitungsfunktionen in der System.Drawing-Klasse zur Verfügung. Ich muss also nur die U-SQL-Erweiterungsfunktionen und -operatoren erstellen und die JPEG-Verarbeitung an diese Klassen delegieren.

Hierzu gibt es verschiedene Möglichkeiten. Sie können z. B. versuchen, alle Bilder als Bytearrays in ein Rowset zu laden und dann einzelne benutzerdefinierte Funktionen zum Extrahieren der einzelnen Eigenschaften und Erstellen der Miniaturansicht zu verwenden. Abbildung 12 zeigt dies.

Abbildung 12: Verarbeiten von Bildern in U-SQL durch Laden von Bildern in Zeilen

REFERENCE ASSEMBLY Images;
USING Images;
@image_data =
  EXTRACT image_data byte[]  // Max size of row is 4MB!
        , name string
        , format string
  FROM @"/Samples/Data/Images/{name}.{format}"
  USING new ImageExtractor();
// Use UDFs
@image_properties =
  SELECT ImageOps.getImageProperty(image_data, ImageProperties.copyright)
         AS image_copyright,
         ImageOps.getImageProperty(image_data, ImageProperties.equipment_make)
         AS image_equipment_make,
         ImageOps.getImageProperty(image_data, ImageProperties.equipment_model)
         AS image_equipment_model,
         ImageOps.getImageProperty(image_data, ImageProperties.description)
         AS image_description
  FROM @image_data
  WHERE format IN ("JPEG", "jpeg", "jpg", "JPG");

Dieser Ansatz weist jedoch mehrere Nachteile auf:

  • U-SQL-Zeilen dürfen maximal 4 MB groß sein. Die Lösung ist daher auf Bilder mit 4 MB eingeschränkt (abzüglich der Größe der anderen Spalten).
  • Jeder der Funktionsaufrufe kann die Speicherauslastung erhöhen und erfordert U-SQL-Verarbeitung des Bytearrays.

Eine bessere Vorgehensweise besteht daher darin, die Eigenschaftenextraktion und die Miniaturansichterstellung direkt in der benutzerdefinierten Extraktionskomponente vorzunehmen. Abbildung 13 zeigt ein überarbeitetes U-SQL-Skript.

Abbildung 13: Verarbeiten von Bildern in U-SQL durch Extrahieren der Features mit einer Extraktionskomponente

REFERENCE ASSEMBLY Images;
@image_features =
  EXTRACT copyright string,
          equipment_make string,
          equipment_model string,
          description string,
          thumbnail byte[],
          name string,
          format string
  FROM @"/Samples/Data/Images/{name}.{format}"
  USING new Images.ImageFeatureExtractor(scaleWidth:500, scaleHeight:300);
@image_features =
  SELECT *
  FROM @image_features
  WHERE format IN ("JPEG", "jpeg", "jpg", "JPG");
OUTPUT @image_features
TO @"/output/images/image_features.csv"
USING Outputters.Csv();
@scaled_image =
  SELECT thumbnail
  FROM @image_features
  WHERE name == "GT4";
OUTPUT @scaled_image
TO "/output/images/GT4_thumbnail_2.jpg"
USING new Images.ImageOutputter();

Dieses Skript extrahiert die Eigenschaften und die Miniaturansicht aus den durch das Dateigruppenmuster angegebenen Bildern (bit.ly/2ektTY6): /Samples/Data/Images/{name}.{format}. Die SELECT-Anweisung schränkt dann die Extraktion auf JPEG-Dateien ein, indem ein Prädikat nur für die Formatspalte verwendet wird, das alle Nicht-JPEG-Dateien aus der Extraktion eliminiert (der Optimierer wendet die Extraktionskomponente auf die Dateien an, die dem Prädikat für die Formatspalte genügen). Die Extraktionskomponente stellt eine Option zum Angeben der Abmessungen der Miniaturansicht bereit. Das Skript gibt die Features dann in eine CSV-Datei aus und verwendet eine einfache Ausgabe auf Bytedatenstrom-Ebene, um eine Miniaturansichtdatei für eines der zentral herunterskalierten Bilder zu erstellen.

Abbildung 14 zeigt die Implementierung der Extraktionskomponente.

Abbildung 14: Die Extraktionskomponente für die Features des Bilds

using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
using System;
using System.Collections.Generic;
using System.Linq;
using System.IO;
using System.Drawing;
using System.Drawing.Imaging;
using System.Drawing.Drawing2D;
namespace Images
{
  public static class UpdatableRowExtensions
  {
    public static void SetColumnIfExists<T>(this IUpdatableRow source
                                           , string colName, T value)
    {
      var colIdx = source.Schema.IndexOf(colName);
      if (colIdx != -1)
      { source.Set<T>(colIdx, value); }
    }
  }
  [SqlUserDefinedExtractor(AtomicFileProcessing = true)]
  public class ImageFeatureExtractor : IExtractor
  {
    private int _scaleWidth, _scaleHeight;
    public ImageFeatureExtractor(int scaleWidth = 150, int scaleHeight = 150)
    { _scaleWidth = scaleWidth; _scaleHeight = scaleHeight; }
    public override IEnumerable<IRow> Extract(IUnstructuredReader input
                                             , IUpdatableRow output)
    {
      byte[] img = ImageOps.GetByteArrayforImage(input.BaseStream);
      using (StreamImage inImage = new StreamImage(img))
      {
        output.SetColumnIfExists("image", img);
        output.SetColumnIfExists("equipment_make",
          inImage.getStreamImageProperty(ImageProperties.equipment_make));
        output.SetColumnIfExists("equipment_model",
          inImage.getStreamImageProperty(ImageProperties.equipment_model));
        output.SetColumnIfExists("description",
          inImage.getStreamImageProperty(ImageProperties.description));
        output.SetColumnIfExists("copyright",
          inImage.getStreamImageProperty(ImageProperties.copyright));
        output.SetColumnIfExists("thumbnail",
          inImage.scaleStreamImageTo(this._scaleWidth, this._scaleHeight));
      }
      yield return output.AsReadOnly();
    }
  }
}

Die Extraktionskomponente muss erneut die gesamte Datei analysieren. Sie verwendet „input.BaseStream“, erstellt jetzt im Gegensatz zum Skript in Abbildung 12 jedoch nur ein Bild im Arbeitsspeicher. Die Extraktionskomponente überprüft auch das Vorhandensein der angeforderten Spalten und verarbeitet nur die Daten für die angeforderten Spaltennamen mithilfe der Erweiterungsmethode „SetColumnIfExists“.

Weitere Details finden Sie im Visual Studio-Projekt auf unserer GitHub-Website unter bit.ly/2dngXCE.

Verarbeiten räumlicher Daten

In diesem Beispiel zeige ich, wie die räumliche Typassembly „Microsoft.SqlServer.Types.dll“ von SQL Server in U-SQL verwendet wird. Insbesondere möchte ich die Funktionen der räumlichen Bibliothek in den U-SQL-Skripts als benutzerdefinierte Funktionen verwenden. Für die weiter oben beschriebene JSON-Extraktionskomponente bedeutet dies, dass Sie eine bereits vorhandene Assembly in U-SQL registrieren möchten, ohne eine eigene Assembly zu schreiben.

Zuerst müssen Sie die Assembly aus dem SQL Server 2016 Feature Pack (bit.ly/2dZTw1k) herunterladen und installieren. Wählen Sie die 64-Bit-Version des Installationsprogramms („ENU\x64\SQLSysClrTypes.msi“) aus, damit sichergestellt ist, dass Sie die 64-Bit-Version der Bibliotheken verwenden.

Das Installationsprogramm installiert die verwaltete Assembly „Microsoft.Sql­­Server.Types.dll“ in „C:\Programme (x86)\Microsoft SQL Server\130\SDK\Assemblies“ und die native Assembly „SqlServerSpatial130.dll“ in „\Windows\System32\“. Im nächsten Schritt laden Sie die Assemblys in Ihren Azure Data Lake-Speicher hoch (z. B. in einem Ordner namens „/upload/asm/spatial“). Da das Installationsprogramm die native Bibliothek im Systemordner „C:\Windows\System32“ installiert hat, müssen Sie sicherstellen, dass Sie „SqlServerSpatial130.dll“ vor dem Hochladen aus diesem Ordner kopieren oder dass das verwendete Tool keine Dateisystemumleitung (bit.ly/1TYm9YZ) für Systemordner ausführt. Wenn das Hochladen z. B. mit dem aktuellen ADL-Datei-Explorer von Visual Studio erfolgen soll, müssen Sie die Datei zuerst in ein anderes Verzeichnis kopieren. Andernfalls wird (zum Zeitpunkt, an dem ich diesen Artikel geschrieben habe) die 32-Bit-Version hochgeladen (weil Visual Studio eine 32-Bit-Anwendung ist, die Dateisystemumleitung im ADL-Fenster für die Auswahl der hochzuladenden Datei ausführt). Wenn Sie ein U-SQL-Skript ausführen, das die native Assembly aufruft, wird der folgende (innere) Fehler zur Laufzeit ausgegeben: „Innere Ausnahme aus Benutzerausdruck: Es wurde versucht, ein Programm mit einem falschen Format zu laden. (Ausnahme von HRESULT: 0x8007000B)“.

Nach dem Hochladen der beiden Assemblydateien registrieren Sie diese in einer Datenbank namens „SQLSpatial“ mit dem folgenden Skript:

DECLARE @ASSEMBLY_PATH string = "/upload/asm/spatial/";
DECLARE @SPATIAL_ASM string = @ASSEMBLY_PATH+"Microsoft.SqlServer.Types.dll";
DECLARE @SPATIAL_NATIVEDLL string = @ASSEMBLY_PATH+"SqlServerSpatial130.dll";
CREATE DATABASE IF NOT EXISTS SQLSpatial;
USE DATABASE SQLSpatial;
DROP ASSEMBLY IF EXISTS SqlSpatial;
CREATE ASSEMBLY SqlSpatial
FROM @SPATIAL_ASM
WITH ADDITIONAL_FILES =
  (
    @SPATIAL_NATIVEDLL
  );

Beachten Sie, dass Sie in diesem Fall nur eine U-SQL-Assembly registrieren und die native Assembly als eine starke Abhängigkeit für die U-SQL-Assembly einschließen. Damit die räumlichen Assemblys verwendet werden können, müssen Sie nur auf die U-SQL-Assembly verweisen. Die zusätzliche Datei wird dann automatisch für die Assembly zur Verfügung gestellt. Abbildung 15 zeigt ein einfaches Beispielskript, das die räumliche Assembly verwendet.

Abbildung 15: Verwenden der räumlichen Funktionen in U-SQL

REFERENCE SYSTEM ASSEMBLY [System.Xml];
REFERENCE ASSEMBLY SQLSpatial.SqlSpatial;
USING Geometry = Microsoft.SqlServer.Types.SqlGeometry;
USING Geography = Microsoft.SqlServer.Types.SqlGeography;
USING SqlChars = System.Data.SqlTypes.SqlChars;
@spatial =
    SELECT * FROM (VALUES
                   // The following expression is not using the native DDL
                   ( Geometry.Point(1.0,1.0,0).ToString()),   
                   // The following expression is using the native DDL
                   ( Geometry.STGeomFromText(
                     new SqlChars("LINESTRING (100 100, 20 180, 180 180)"),
                     0).ToString())
                  ) AS T(geom);
OUTPUT @spatial
TO "/output/spatial.csv"
USING Outputters.Csv();

Die SQL-Typbibliothek weist eine Abhängigkeit von der System.Xml-Assembly auf. Sie müssen daher darauf verweisen. Außerdem verwenden einige Methoden die System.Data.SqlTypes-Typen anstelle der integrierten C#-Typen. Da „System.Data“ bereits standardmäßig eingeschlossen ist, können Sie einfach auf den benötigten SQL-Typ verweisen. Der Code in Abbildung 15 ist auf unserer GitHub-Website unter bit.ly/2dMSBm9 verfügbar.

Zusammenfassung: Einige Tipps und bewährte Methoden für UDOs

Dieser Artikel kratzt nur an der Oberfläche der leistungsfähigen Erweiterbarkeitsfunktionen von U-SQL. Er zeigt aber, wie es der U-SQL-Erweiterbarkeitsmechanismus ermöglicht, domänenspezifischen Code erneut zu verwenden. Dabei wird das U-SQL-Erweiterungsframework zum horizontalen Hochskalieren der Verarbeitung für das typische Big Data-Volumen verwendet.

Ein so leistungsfähiges Tool kann jedoch auch schnell falsch verwendet werden. Im Folgenden finden Sie daher einige Tipps sowie Hinweise auf bewährte Methoden.

Benutzerdefinierte Datenformate erfordern häufig eine benutzerdefinierte Extraktions- und möglicherweise auch eine Ausgabekomponente. Es sollte jedoch sehr sorgfältig untersucht werden, ob das Datenformat parallel extrahiert werden kann (z. B. bei Formaten im CSV-Typ), oder ob für die Verarbeitung alle Daten in einer Operatorinstanz verfügbar sein müssen. Außerdem ist es empfehlenswert, die Operatoren so generisch wie möglich zu halten, damit eine Verarbeitung nur stattfindet, wenn eine bestimmte Spalte angefordert wird. Dies kann die Leistung potenziell verbessern.

Bei der Verwendung von UDOs wie Prozessoren, Reduzierern, Kombinierern und Anwendern wird dringend angeraten, zuerst eine reine U-SQL-Lösung in Betracht zu ziehen, die integrierte Operatoren nutzt. Das weiter oben erläuterte Bereichsreduziererskript könnte tatsächlich mit einer klugen Verwendung von Windowing- und Bewertungsfunktionen geschrieben werden. Die folgenden Gründe können dennoch für die Verwendung von UDOs sprechen:

  • Die Programmlogik muss dynamisch auf das Eingabe- oder Ausgabeschema des Rowsets zugreifen können, das verarbeitet wird. Erstellen Sie z. B. ein JSON-Dokument für die Daten in der Zeile, in der die Spalten nicht im voraus bekannt sind.
  • Eine Lösung, die verschiedene benutzerdefinierte Funktionen im SELECT-Ausdruck verwendet, führt zu einer zu großen Speicherauslastung. Sie können Ihren Code in einem Prozessor-UDO so schreiben, dass er bezüglich des Speichers effizienter ist.
  • Sie benötigen einen sortierten Aggregator oder einen Aggregator, der mehr als eine Zeile pro Gruppe generiert, und Sie können beide nicht mit Windowingfunktionen schreiben.

Wenn Sie UDOs verwenden, sollten Sie immer die folgenden Tipps berücksichtigen:

  • Verwenden Sie die READONLY-Klausel, um das Übertragen von Prädikaten durch UDOs mithilfe von Push zu ermöglichen.
  • Verwenden Sie die REQUIRED-Klausel, um das Übertragen der Spaltenbereinigung durch UDOs mithilfe von Push zu ermöglichen.
  • Verwenden Sie für den Fall, dass der Abfrageoptimierer den falschen Plan auswählt, einen Kardinalitätshinweis für Ihren Abfrageausdruck, der einen UDO verwendet.

Michael Rys  ist Principal Program Manager bei Microsoft. Er beschäftigt sich seit den 1980er Jahren mit Datenverarbeitung und Abfragesprachen. Er war Repräsentant von Microsoft in den XQuery- und SQL-Entwurfskomitees und hat SQL Server durch XML, Geospatial und Semantic Search zu weit mehr als einem Managementsystem für relationale Datenbanken gemacht. Zurzeit arbeitet er an Big Data-Abfragesprachen wie SCOPE und U-SQL, wenn er nicht gerade Zeit mit seiner Familie beim Tauchen oder Autocross verbringt. Folgen Sie ihm auf Twitter: @MikeDoesBigData.

Unser Dank gilt den folgenden technischen Experten von Microsoft für die Durchsicht dieses Artikels: Clemens Szyperski, Ed Triou, Saveen Reddy und Michael Kadaner
Clemens Szyperski ist Principal Group Engineering Manager bei Microsoft. Schon seit Jahrzehnten begeistern ihn spezialisierte Sprachen, Tools und Ansätze, die das Erstellen komplexer Softwaresysteme vereinfachen. Er ist zurzeit Leiter der Azure Data Lake U-SQL- und Scope-Teams, wenn er nicht gerade mit seiner Familie eine Segeltour unternimmt. Folgen Sie ihm auf Twitter: @ClemensSzy.

Ed Triou ist Principal Development Lead bei Microsoft. In den letzten 20 Jahren hat er sich schwerpunktmäßig mit Datenprogrammierbarkeit (ODBC, OLEDB, ADO.NET, JDBC, PHP und EDM), beschäftigt. Seine Spezialgebiete sind Compiler und Abfragesprachen (IDL, TSQl, LINQ to SQL/Entitäten, eSQL, SCOPE und U-SQL).  Er ist zurzeit Leiter der U-SQL-Compiler- und -Sprachteams und versucht, täglichen externen und internen Geschäftsvorgängen, die auf ADL und Cosmos mit Exabyteskalierung angewiesen sind, immer einen Schritt voraus zu sein.

Saveen Reddy ist Principal Program Manager bei Microsoft. Seine Schwerpunkte sind der Entwurf und das Erstellen der Azure Data Lake-Plattform, die die Komponenten und Benutzeroberflächen bereitstellt, die alle Big Data-Clouddienste von Microsoft unterstützen. Saveen hat eine Abschlussbewertung von 100 Prozent erhalten für Metal Gear Solid V: The Phantom Pain. Folgen Sie ihm auf Twitter: @saveenr.

Michael Kadaner ist Principal Software Engineer bei Microsoft. Auch nach jahrzehntelanger Erfahrung in verschiedenen Bereichen der Informatik und Softwareentwicklung behauptet er, dass das Schreiben von Programmen eine zielgerichtete Kunst ist und dass Software fehlerfrei sein kann. Er hat eine wahre Leidenschaft für das Lösen komplexer algorithmischer und technischer Probleme und das Implementieren der Lösungen mit Code entwickelt, der genau und elegant und schon durch den Entwurf richtig ist. Seine Freizeit verbringt er mit Lesen und Heimwerkerprojekten.