Januar 2016

Band 31, Nummer 1

Big Data – Datenanalyse in Echtzeit für .NET-Entwickler mit HDInsight

Von Omid Afnan | Januar 2016

Unternehmen sämtlicher Größen beginnen, den Wert ihrer riesigen Datensammlungen und die Notwendigkeit zu erkennen, daraus einen Nutzen zu ziehen. Wenn Unternehmen anfangen, das Thema Big Data anzugehen, ist der erste Schritt meist die Batchverarbeitung ihrer Big Data-Bestände. Dazu zählt beispielsweise das Sammeln und Aggregieren von Weblogdaten, der Benutzerklicks in einer Anwendung, von Telemetriedaten aus IoT-Geräten (Internet of Things, Internet der Dinge) bzw. einer Vielzahl anderer von Benutzern oder Computern generierten Daten. Vor einem Jahr habe ich in einem Artikel (msdn.com/magazine/dn890370) eine grundlegende Webloganalyse mithilfe von Hive in HDInsight vorgestellt. Doch sobald die Vorteile der Batchverarbeitung zum Gewinnen von Erkenntnissen aus Vergangenheitsdaten offenkundig werden, stehen viele Unternehmen vor dem Problem des Umgangs mit Echtzeitdaten und der Frage, wie fortlaufende Datenströme in Echtzeit gesammelt und analysiert werden sollen bzw. darauf reagiert werden soll.

Wie Sie vielleicht schon ahnen, gibt es im Big Data-Segment Technologien zum Erfüllen dieser Anforderungen. Die Microsoft Azure-Plattform bietet leistungsstarke Big Data-Lösungen, zu denen Azure Data Lake und HDInsight zählen. Mit Apache Storm gibt es eine Open-Source-Technologie, die überaus verteilte Analysen in Echtzeit ermöglicht. Storm wird von HDInsight systemeigen unterstützt, der von Azure verwalteten Lösung mit Apache Big Data-Diensten. In diesem Artikel stelle ich ein einfaches, aber leistungsfähiges Beispielszenario vor, in dem mit Storm als wichtigstem Tool ein Datenstrom aus Tweets zum Ermöglichen einer fortlaufenden Analyse in Echtzeit verarbeitet wird.

Wie Sie sehen werden, erleichtert Microsoft das Bewältigen dieser Entwicklungsaufgabe im Vergleich mit anderen Angeboten auf dem Markt dank leistungsstarker Erstellungs- und Debuggingtools in Visual Studio. Die HDInsight-Tools für Visual Studio (die im Azure SDK verfügbar sind) bieten eine Programmier- und Debuggingumgebung, mit der .NET-Entwickler vertraut sind. Diese Tools ermöglichen eine wesentlich einfachere Möglichkeit zum Arbeiten mit Big Data-Technologien als die einfachen Editoren und Befehlszeilentools, die derzeit in der Open-Source-Welt angeboten werden. Während Storm für HDInsight die Programmierung mit Java vollständig unterstützt, ermöglicht Microsoft .NET-Programmierern auch den Einsatz von C# zum Schreiben (und Wiederverwenden) von Geschäftslogik. Anhand der Beispiele in diesem Artikel werden diese .NET-Funktionen veranschaulicht.

Szenario zum Nachverfolgen von Stimmungen

Die Nachverfolgung und Analyse neuer Trends ist kein neues Phänomen. Nachrichtenberichterstattung, Wettertrends und Katastrophenerkennung sind Beispiele aus der Zeit von dem Cloud Computing. Allerdings sind sowohl die Anzahl der Gebiete, in denen eine Trenderkennung gewünscht wird, als auch das Maß, in dem Daten für Analysen zur Verfügung stehen, mit Fortschreiten des Cloudzeitalters unvorstellbar gestiegen. Soziale Netzwerke eignen sich besonders für die Analyse von Stimmungen. Dienste wie Twitter stellen ihre Daten über APIs zur Verfügung. Im Zusammenspiel mit Big Data-Plattformen mit nutzungsabhängiger Zahlung wie HDInsight können Unternehmen beliebiger Größe Stimmungen nun einfacher untersuchen.

Die einfachste Art der Stimmungsanalyse mittels Twitter ist das Zählen, wie oft ein bestimmtes Thema bzw. ein Hashtag in einem bestimmten Zeitraum in Tweets vorkommt. Die Analyse eines kurzen Zeitraums wie beispielsweise einer Minute ist dabei bei Weitem nicht so interessant wie die Analyse eines gesamten Tages mit Untersuchung der Zu- und Abnahme einer Rate. Das Ausmachen von Spitzen bei einem bestimmten Begriff kann für das Erkennen eines Trends von Nutzen sein. Beispielsweise kann das Erkennen von Begriffen im Zusammenhang mit einem Sturm oder Erdbeben sehr schnell einen Hinweis auf Gebiete, die von einer Naturkatastrophe betroffen sind, und den Schweregrad geben.

Zum Veranschaulichen der Grundlagen einer solchen Analyse erläutere ich das Einrichten einer Streamingtopologie zum Sammeln von Twitter-Daten, Auswählen verschiedener Tweets, Berechnen von Metriken, Speichern sämtlicher Daten in einer Datenbank und Veröffentlichen von Ergebnissen. In Abbildung 1 wird diese Topologie gezeigt. Für diesen Artikel habe ich Tweets über eine einfache Stichwortsuche ausgewählt. Die berechneten Metriken sind die Anzahl von Tweets, die mit den Suchkriterien übereinstimmen. Die ausgewählten Tweets werden in einer SQL-Datenbank abgelegt und auch auf einer Website veröffentlicht. Alle Schritte erfolgen in der Azure-Cloud unter Verwendung von Storm, SQL Server und derzeit verfügbaren Webdiensten. Nach Durchlaufen des Beispiels gehe ich auf einige andere Technologien ein, die für das Lösen dieser Art von Problem bei der Analyse von Streamingdaten zur Verfügung stehen.

Topologie für die Stimmungsanalyse
Abbildung 1: Topologie für die Stimmungsanalyse

Grundlegendes zu Storm

Storm ist ein Open-Source-Projekt von Apache (storm.apache.org), mit dem verteilte Berechnungen in Echtzeit auf Datenströme angewendet werden können. Storm gehört zum Hadoop-Ökosystem von Big Data-Verarbeitungstools und wird in HDInsight direkt unterstützt. Storm-Aufträge werden als ein Graph von Verarbeitungsknoten definiert, die mittels Datenströmen in Form von Tupeln verbunden sind. Ein solcher Graph wird in Storm als „Topologie“ bezeichnet. Topologien werden nicht wie andere Abfragen beendet, sondern so lange ausgeführt, bis sie angehalten oder abgebrochen werden.

Im Azure-Verwaltungsportal können Sie einen neuen HDInsight-Cluster erstellen und Storm als Typ auswählen. Azure richtet dann binnen Minuten einen Cluster virtueller Computer ein, auf denen alle benötigten Betriebssystem-, Hadoop- und Storm-Komponenten vorab geladen sind. Ich kann die gewünschte Anzahl von Knoten bestimmen, andere Kern- und Arbeitsspeichergrößen wählen und die Anzahl der Knoten jederzeit ändern. Was die Vereinfachung der Hadoop-Umgebung angeht, konnte ich so bereits viel Zeit und Aufwand beim Beziehen und Konfigurieren mehrerer VMs sparen.

Die Komponenten einer Topologie werden als „Spouts“ und „Bolts“ bezeichnet. Spouts dienen zum Erzeugen von aus Tupeln bestehenden Datenströmen, bei denen es sich im Wesentlichen um Typ-Wert-Paare handelt. Ein Spout ist also ein Codeelement, das weiß, wie Daten gesammelt oder generiert werden, und diese dann in Blöcken ausgibt. Bolts sind Codeeinheiten, die einen Datenstrom nutzen können. Ihre Aufgabe ist z. B. das Bereinigen von Daten oder Berechnen von Statistiken. In solchen Fällen wird meist ein weiterer Datenstrom mit Tupeln an nachgelagerte Bolts ausgegeben. Andere Bolts schreiben Daten in den Speicher oder auf ein anderes System.

Jede dieser Komponenten kann viele Aufgaben parallel ausführen. Aus diesem Grund ist Storm hinsichtlich Skalierbarkeit und Zuverlässigkeit besonders. Ich kann für jede Komponente den Grad an Parallelität angeben. Storm ordnet anschließend die entsprechende Anzahl von Aufgaben zum Ausführen der Logik in meinem Spout oder Bolt zu. Storm bietet Fehlertoleranz, indem Aufgaben verwaltet und nicht erfolgreich ausgeführte Aufgaben automatisch neu gestartet werden. Schließlich wird eine bestimmte Topologie für eine Gruppe von Arbeitsprozessen ausgeführt, bei denen es sich im Wesentlichen um Ausführungscontainer handelt. Arbeitsprozesse können hinzugefügt werden, um die Verarbeitungskapazität einer Topologie zu steigern. Diese Features bilden die wesentlichen Merkmale zum Ermöglichen der Skalierbarkeit und Fehlertoleranz von Storm.

Eine Topologie kann so komplex wie nötig sein, um die Verarbeitung durchzuführen, die für das gesamte Szenario einer Analyse in Echtzeit erforderlich ist. Die Architektur eignet sich für die Wiederverwendung von Komponenten, führt aber auch zu einem anspruchsvollen Verwaltungs- und Bereitstellungsproblem, sobald die Anzahl von Spouts und Bolts zunimmt. Das Visual Studio-Projektkonzept ist eine nützliche Möglichkeit zum Verwalten der Code- und Konfigurationskomponenten, die zum Instanziieren einer Topologie benötigt werden. Da eine Topologie in erster Linie ein grafisches Konzept ist, erweist sich die Möglichkeit der Visualisierung der Topologie während der Entwicklung und des Betriebs des Systems als sehr nützlich. Dies veranschaulicht die Ausführungsansicht der HDInsight-Tools für Visual Studio in Abbildung 2.

Überwachungsansicht einer aktiven Storm-Topologie
Abbildung 2: Überwachungsansicht einer aktiven Storm-Topologie

Die Architektur von Storm basiert auf Apache Thrift, einem Framework, das die Entwicklung von Diensten ermöglicht, die in mehreren Sprachen implementiert werden. Wenngleich viele Entwickler Spouts und Bolts mit Java schreiben, ist dies keine Voraussetzung. Dank Einführung des SCP.Net-Bibliothekspakets kann ich Spouts und Bolts in C# entwickeln. Dieses Paket ist im Download der HDInsight-Tools für Visual Studio enthalten, kann aber auch über NuGet heruntergeladen werden.

Filtern von Tweets nahezu in Echtzeit

Lassen Sie uns nun einen Blick auf die Erstellung der Topologie zum Filtern von Tweet-Datenströmen werfen, um zu sehen, wie diese Komponenten in der Praxis funktionieren. Meine Beispieltopologie besteht aus einem Spout und drei Bolts. Abbildung 2 zeigt die grafische Ansicht dieser Topologie in den HDInsight-Tools für Visual Studio. Wenn ich ein Storm-Projekt zur Ausführung in Azure übermittle, zeigt mir Visual Studio diese grafische Ansicht an. Diese Ansicht wird im Zeitverlauf mit der Anzahl der im System erfolgenden Ereignisse sowie mit etwaigen Fehlerbedingungen aktualisiert, die auf den Knoten auftreten.

„TwitterSpout“ ist hier zuständig für das Abrufen des Datenstroms von Tweets, die ich verarbeiten möchte. Hierzu erfolgt eine Interaktion mit den Twitter-APIs zum Sammeln von Tweets, die in Datentupel umgewandelt und durch den Rest der Topologie strömen können. „TwitterBolt“ wählt den Datenstrom aus und kann Aggregationen durchführen, z. B. Tweets zählen oder diese mit anderen Daten kombinieren, die aus anderen Datenquellen abgerufen wurden. Dieser Bolt gibt basierend auf der ausgeführten Geschäftslogik einen neuen Datenstrom aus (möglicherweise in einem neuen Format). Die Komponenten „AzureSQLBolt“ und „SignalRBroadcastBolt“ nutzen diesen Datenstrom und schreiben Teile der Daten in eine in Azure gehostete SQLServer-Datenbank bzw. eine SignalR-Website.

Da ich meine Storm-Lösung in C# schreibe, kann ich viele vorhandene Bibliotheken nutzen, mit denen meine Entwicklung vereinfacht und beschleunigt wird. Die beiden wichtigsten Pakete für dieses Beispiel sind die Tweetinvi-Bibliotheken auf CodePlex (bit.ly/1kI9sqV) und die SCP.Net-Bibliotheken auf NuGet (bit.ly/1QwICPj).

Das SCP.Net-Framework sorgt für wesentlich weniger Komplexität beim Umgang mit dem Storm-Programmiermodell und bietet Basisklassen zum Kapseln eines Großteils der Aufgaben, die andernfalls manuell erfolgen müssten. Ich beginnen mit dem Erben von der Basisklasse „Microsoft.SCP.ISCPSpout“. Dadurch erhalte ich drei Hauptmethoden, die für einen Spout benötigt werden: „NextTuple“, „Ack“ und „Fail“. „NextTuple“ gibt das nächste Element verfügbarer Daten für den Datenstrom oder nichts aus. Diese Methode wird von Storm in einer engen Schleife aufgerufen und ist die richtige Stelle zum Hinzufügen eines Ruhezustands, wenn es keine auszugebenden Tupel gibt. Dies ist eine Möglichkeit zum Vermeiden einer 100 %-igen Nutzung meiner CPU-Zyklen, da die Topologie fortlaufend ausgeführt wird.

Wenn ich eine garantierte Nachrichtenverarbeitung implementieren möchten, wie z. B. eine Semantik vom Typ „mindestens einmal“, verwende ich die Methoden „Ack“ und „Fail“ zum Implementieren der erforderlichen Handshakes zwischen Bolts. In diesem Beispiel verwende ich keinen Wiederholungsmechanismus, weshalb nur die „NextTuple“-Methode mithilfe von Code implementiert wird, der Tweets aus einem privaten Warteschlangenelement in der „TwitterSpout“-Klasse abruft und an die Topologie weitergibt.

Datenströme innerhalb der Topologie werden als Schemas erfasst, die von einem Spout oder Bolt veröffentlicht werden. Diese werden als Vertrag zwischen Komponenten in der Topologie und auch als Serialisierungs- und Deserialisierungsregeln verwendet, die SCP.Net beim Übertragen der Daten verwendet. Die „Context“-Klasse dient zum Speichern von Konfigurationsinformationen für jede Instanz eines Spouts oder Bolts. Das Schema der vom Spout ausgegebenen Tupeln wird in der „Context“-Klasse gespeichert und von SCP.Net zum Erstellen von Komponentenverbindungen verwendet.

Werfen wir nun einen Blick auf die „TwitterSpout“-Klasse, die in Abbildung 3 teilweise gezeigt wird.

Abbildung 3: Initialisierung der „TwitterSpout“-Klasse

public TwitterSpout(Context context)
{
  this.context = context;
  Dictionary<string, List<Type>> outputSchema =
    new Dictionary<string, List<Type>>();
  outputSchema.Add(Constants.DEFAULT_STREAM_ID,
    new List<Type>() { typeof(SerializableTweet) });
  this.context.DeclareComponentSchema(new ComponentStreamSchema(
    null, outputSchema));
  // Specify your Twitter credentials
  TwitterCredentials.SetCredentials(
    ConfigurationManager.AppSettings["TwitterAccessToken"],
    ConfigurationManager.AppSettings["TwitterAccessTokenSecret"],
    ConfigurationManager.AppSettings["TwitterConsumerKey"],
    ConfigurationManager.AppSettings["TwitterConsumerSecret"]);
  // Setup a Twitter Stream
  var stream = Tweetinvi.Stream.CreateFilteredStream();
  stream.MatchingTweetReceived += (sender, args) => { NextTweet(args.Tweet); };
  // Setup your filter criteria
  stream.AddTrack("China");
  stream.StartStreamMatchingAnyConditionAsync();
}

Abbildung 3 zeigt die Initialisierung des Kontexts für diesen Spout unter Verwendung eines während des Starts der Topologie übergebenen Kontexts. Dieser Kontext wird anschließend durch Hinzufügen einer Schemadefinition aktualisiert. Ich erstelle ein „Dictionary“-Objekt, dem ich einen Bezeichner für den Datenstromtyp (DEFAULT_STREAM) und eine Liste der Typen für alle Felder in meinem Tupel hinzufüge, in diesem Fall „SerializableTweet“. Der Kontext enthält nun die Schemadefinition, die ich befolgen muss, wenn ich Tupel in dieser Klasse ausgebe, und wenn ich sie in „TwitterBolt“ nutze.

Der Rest dieses Codeausschnitts zeigt die Einrichtung des Twitter-Datenstroms. Das Tweetinvi-Paket bietet Abstraktionen für die REST- und Streaming-APIs von Twitter. Nach Codierung der entsprechenden Anmeldeinformationen instanziiere ich einfach die Art von Quelle, die ich verwenden möchte. Bei Streamingquellen kann ich einen von mehreren Typen wählen, so z. B. gefilterte, Stichproben- oder Benutzerdatenströme. Diese bieten vereinfachte Oberflächen für das Filtern nach Stichwörtern in sämtlichen Tweets, das Erstellen von Stichproben in nach dem Zufallsprinzip gewählten öffentlichen Tweets und das Nachverfolgen von Ereignissen, die mit einem bestimmten Benutzer verknüpft sind. Hier verwende ich den gefilterten Datenstrom, der die Auswahl von Tweets aus allen öffentlichen Tweets ermöglicht, indem eine Überprüfung auf das Vorhandensein eines der Stichwörter erfolgt.

Hier führe ich die gewünschte Filterung von Tweets im Spout durch, da die Tweetinvi-APIs diese Aufgabe erleichtern. Ich kann auch eine Filterung in der Komponente „TwitterBolt“ sowie andere Berechnungen oder Aggregationen durchführen, die für die Tweets stattfinden sollen. Die Filterung im Spout ermöglicht mir zu einem frühen Zeitpunkt das Verringern des Umfangs des Streamings von Daten durch die Topologie. Die Leistungsstärke von Storm besteht jedoch darin, dass die Verarbeitung großer Datenvolumen in jeder Komponente der Topologie möglich ist, indem eine horizontale Skalierung erfolgt. Storm bietet eine nahezu lineare Skalierung mit hinzugefügten Ressourcen, sodass ich mehr Arbeitsprozesse hinzufügen kann, um auftretende Engpässe mittels Skalierung zu vermeiden. HDInsight unterstützt diesen Ansatz, indem es mir die Auswahl der Größe meines Clusters und der Typen von Knoten bei der Einrichtung überlässt und mich später Knoten zum Cluster hinzufügen lässt. Dank dieses horizontalen Skalierungsansatzes kann ich Storm-Cluster bilden, die Millionen von Ereignissen pro Sekunde verarbeiten. In Rechnung gestellt wird mir die Anzahl der in meinem Cluster ausgeführten Knoten, weshalb ich einen Kompromiss zwischen Kosten und Skalierung wählen muss.

Das einzige andere Element, das in Abbildung 3 hervorzuheben ist, ist die Registrierung eines Rückrufs für das „Tweetinvi“-Streamobjekt, der erfolgt, wenn ein Tweet gefunden wird, der meinen Kriterien entspricht. Die „NextTweet“-Methode ist dieser Rückruf, über den der bereitgestellte Tweet der zuvor erwähnten privaten Warteschlange in der „TwitterSpout“-Klasse hinzugefügt wird:

public void NextTweet(ITweet tweet)
{
  queue.Enqueue(new SerializableTweet(tweet));
}

Die Bolts für meine Topologie sind auf ähnliche Weise programmiert. Sie sind von der „Microsoft.SCP.ISCPBolt“-Klasse abgeleitet und müssen die „Execute“-Methode implementieren. Hier wird das Tupel als allgemeiner „SCPTuple“-Typ übergeben und muss zunächst in den ordnungsgemäßen Typ umgewandelt werden. Anschließend kann ich C#-Code schreiben, mit dessen Hilfe die erforderliche detaillierte Verarbeitung erfolgt. In diesem Fall verwende ich einfach eine globale Variable zum Kumulieren der Anzahl von Tupeln, die vom Bolt erkannt werden, und protokolliere die Anzahl und den Text des Tweets. Schließlich gebe ich einen neuen Typ von Tupel aus, den nachgelagerte Bolts nutzen können. Hier ist der Code:

public void Execute(SCPTuple tuple)
{
  var tweet = tuple.GetValue(0) as SerializableTweet;
  count++;
  Context.Logger.Info("ExecuteTweet: Count = {0}, Tweet = {1}", count, tweet.Text);
  this.context.Emit(new Values(count, tweet.Text));
}

Bei der Einrichtung eines Bolts muss ich ein Eingabe- und Ausgabeschema angeben. Das Format entspricht exakt der vorangegangenen Schemadefinition für einen Spout. Ich definiere einfach eine weitere „Dictionary“-Variable mit dem Namen „outputSchema“ und listet die Typen „integer“ und „string“ der Ausgabefelder auf (siehe Abbildung 4).

Abbildung 4: Angeben von Eingabe- und Ausgabeschema für „TwitterBolt“

public TwitterBolt(Context context, Dictionary<string, Object> parms)
{
  this.context = context;
  Dictionary<string, List<Type>> inputSchema =
    new Dictionary<string, List<Type>>();
  inputSchema.Add(Constants.DEFAULT_STREAM_ID,
    new List<Type>() { typeof(SerializableTweet) });
  Dictionary<string, List<Type>> outputSchema =
    new Dictionary<string, List<Type>>();
  outputSchema.Add(Constants.DEFAULT_STREAM_ID,
    new List<Type>() { typeof(long), typeof(string) });
  this.context.DeclareComponentSchema(
    new ComponentStreamSchema(inputSchema,
    outputSchema));
}

Die anderen Bolts befolgen dasselbe Muster, rufen aber spezifische APIs für SQL Azure und SignalR auf. Der letzte wichtige Schritt ist das Definieren der Topologie durch Aufzählen der Komponenten und ihrer Verbindungen. Hierfür muss eine weitere Methode in alle Spouts und Bolts implementiert werden, und zwar die „Get“-Methode, die ein Objekt dieser Klasse mit einer „Context“-Variablen instanziiert, die von „SCPContext“ während des Starts der Storm-Aufgabe aufgerufen wird. SCP.Net instanziiert einen untergeordneten C#-Prozess, der Ihre C#-Spout- oder Bolt-Aufgabe mithilfe der folgenden Delegatenmethode startet:

return new TwitterSpout(context);

Nach der Einrichtung der Spouts und Bolts kann ich nun die Topologie erstellen. Hierfür bietet SCP.Net wiederum eine Klasse und Hilfsfunktionen. Ich erstelle eine von „Microsoft.SCP.Topology.Topology­Descriptor“ abgeleitete Klasse und überschreibe die „GetTopologyBuilder“-Methode. In dieser Methode verwende ich ein Objekt des Typs „TopologyBuilder“, das die Methoden „SetSpout“ und „SetBolt“ bereitstellt. Diese Methoden ermöglichen mir das Angeben des Namens und Eingabe- und Ausgabeschemas der Komponente. Sie erlauben mir auch das Angeben des „Get“-Delegaten, der zum Initialisieren der Komponente verwendet wird, und, was am wichtigsten ist, der vorgelagerten Komponente, die mit der aktuellen Komponente verbunden werden soll. Abbildung 5 zeigt den Code zum Definieren meiner Topologie.

Abbildung 5: Erstellen der Topologie für die Twitter-Analyse

namespace TwitterStream
{
  [Active(true)]
  class TwitterTopology : TopologyDescriptor
  {
    public ITopologyBuilder GetTopologyBuilder()
    {
      TopologyBuilder topologyBuilder = new TopologyBuilder(
        typeof(TwitterTopology).Name + DateTime.Now.ToString("-yyyyMMddHHmmss"));
      topologyBuilder.SetSpout(
        typeof(TwitterSpout).Name,
        TwitterSpout.Get,
        new Dictionary<string, List<string>>()
        {
          {Constants.DEFAULT_STREAM_ID, new List<string>(){"tweet"}}
        },
        1);
      topologyBuilder.SetBolt(
        typeof(TwitterBolt).Name,
        TwitterBolt.Get,
        new Dictionary<string, List<string>>()
        {
          {Constants.DEFAULT_STREAM_ID, new List<string>(){"count", "tweet"}}
        },
        1).shuffleGrouping(typeof(TwitterSpout).Name);
      topologyBuilder.SetBolt(
        typeof(SqlAzureBolt).Name,
        SqlAzureBolt.Get,
        new Dictionary<string, List<string>>(),
        1).shuffleGrouping(typeof(TwitterBolt).Name);
      topologyBuilder.SetBolt(
        typeof(SignalRBroadcastBolt).Name,
        SignalRBroadcastBolt.Get,
        new Dictionary<string, List<string>>(),
        1).shuffleGrouping(typeof(TwitterBolt).Name);
      return topologyBuilder;
    }
  }
}

Das vollständige Twitter-Analyseprojekt kann in Visual Studio unter Verwendung des Projekttyps „Storm“ erstellt werden. Dieses Projekt ermöglicht das komfortable Anordnen der verschiedenen benötigten Komponenten auf einfache und vertraute Weise und deren Anzeige im Projektmappen-Explorer (siehe Abbildung 6). Im Kontextmenü eines Projekts können Sie über die Option „Hinzufügen | Neues Element“ Komponenten wie Bolts und Spouts hinzufügen. Bei Treffen einer Wahl unter den Storm-Elementtypen werden eine neue Datei und die Gliederung aller erforderlichen Methoden hinzugefügt. Mithilfe des Storm-Projekts in Visual Studio kann ich Verweise auf Bibliotheken wie Tweetinvi hinzufügen, entweder direkt oder über NuGet. Das Übermitteln der in Azure auszuführenden Topologie erfolgt über einen Einfachklick im Kontextmenü von Projektmappen-Explorer. Alle erforderlichen Komponenten werden in den HDInsight Storm-Cluster meiner Wahl hochgeladen, und die Topologie wird übermittelt.

Übermitteln einer Topologie aus Projektmappen-Explorer
Abbildung 6: Übermitteln einer Topologie aus Projektmappen-Explorer

Nach der Übermittlung wird die Topologieansicht in Abbildung 2 angezeigt, über die ich den Zustand meiner Topologie überwachen kann. Storm lässt für meine Topologie mehrere Zustände zu, z. B. aktiviert, deaktiviert oder beendet, und ermöglicht eine Umverteilung von Aufgaben an Arbeitsprozesse basierend auf Skalierbarkeitsparametern. Ich kann alle diese Zustandsänderungen in Visual Studio verwalten und den aktuellen Fluss von Tupeln beobachten. Um Komponenten detailliert zu untersuchen und Probleme zu beheben, kann ich eine Detailsuche in einer einzelnen Komponente wie „SqlAzureBolt“ durchführen, für die eine Fehlerbedingung angezeigt wird (die rote Kontur und Markierung in der Topologieansicht). Durch Doppelklicken auf diesen Bolt werden detailliertere Statistiken zum Tupelfluss sowie eine Übersicht der Fehler im Bolt angezeigt. Sie können sogar auf den Link „Fehlerport“ klicken, um zu den vollständigen Protokollen der einzelnen Aufgaben zu gelangen, ohne Visual Studio verlassen zu müssen.

Den Code und das Projekt für die einfache in diesem Artikel behandelte Topologie finden Sie auf GitHub im Repository „MicrosoftBigData“. Wechseln Sie zum Ordner „HDInsight“ und Beispielprojekt „TwitterStream“. Weitere Artikel und Beispiele finden Sie unter bit.ly/1MCfsqM.

Komplexere Analysen

Das vorgestellte Beispiel einer Storm-Topologie ist einfach. Ich habe verschiedene Möglichkeiten, die Leistungsfähigkeit und Komplexität meiner Echtzeitverarbeitung in Storm zu erhöhen.

Wie bereits erwähnt, kann die Anzahl der einem Storm-Cluster zugewiesenen Ressourcen in HDInsight den Anforderungen entsprechend skaliert werden. Die Leistung meines Systems kann ich anhand der Daten in der Laufzeitansicht der Topologie in Visual Studio beobachten (siehe Abbildung 2). Hier sehe ich die Anzahl der ausgegebenen Tupel und Executors sowie die Aufgaben und Latenzen. Abbildung 7 zeigt die Ansicht im Azure-Verwaltungsportal, die weitere Details zur Anzahl der Knoten, ihres Typs und der Menge liefert, die ich gerade verwende. Basierend auf diesen Informationen kann ich entscheiden, ob ich meinen Cluster skalieren, d. h. ihm weitere Supervisorknoten (Workerknoten) hinzufügen soll. Diese Hochskalierung erfordert keinen Neustart und erfolgt binnen Minuten, sobald ich in meiner Topologieansicht in Visual Studio oder im Verwaltungsportal eine Umverteilung auslöse.

Azure-Verwaltungsportalansicht eines Storm-Clusters
Abbildung 7: Azure-Verwaltungsportalansicht eines Storm-Clusters

Die meisten Analyseanwendungen werden auf mehrere unstrukturierte Big Data-Datenströme angewendet. In diesem Fall enthält die Topologie mehrere Spouts und Bolts, die von mehr als einem Spout gelesen werden können. Dies kann in der Konfiguration der Topologie mühelos ausgedrückt werden, indem im Aufruf der „SetBolt“-Methode mehrere Eingaben angegeben werden. Die Geschäftslogik für den Umgang mit mehreren Quellen im selben Bolt ist allerdings komplexer, da einzelne Tupel mit unterschiedlichen Datenstrom-IDs eingehen. Sollte die Komplexität des Geschäftsproblems zunehmen, werden vermutlich auch relationale oder strukturierte Datenquellen während der Verarbeitung benötigt. Während Spouts für warteschlangenähnliche Datenquellen ideal sind, werden von einem Bolt eher relationale Daten abgerufen. Wiederum ermöglichen die flexible Implementierung von Bolts und die Verwendung von C# oder Java das mühelose Programmieren des Zugriffs auf eine Datenbank mithilfe etablierter APIs oder Abfragesprachen. Die hier entstehende Komplexität beruht auf der Tatsache, dass diese Aufrufe remote aus Storm-Containern in einem Cluster auf dem Datenbankserver erfolgen. SQL Azure und HDInsight arbeiten in derselben Azure-Fabric und interagieren problemlos, doch es gibt auch andere Optionen für cloudbasierte Dienste, die auch verwendet werden können.

Die Storm-Laufzeit ermöglicht mir das Festlegen bzw. Optimieren zahlreicher detaillierter Verhalten des Systems. Viele dieser Einstellungen werden als Konfigurationsparameter angezeigt, die auf Topologie- oder Aufgabenebene angewendet werden können. Der Zugriff auf diese Einstellungen, die zum Optimieren des gesamten Storm-Workloads dienen, erfolgt in der „Microsoft.SCP.Topology.StormConfig“-Klasse. Beispiele dieser Einstellungen sind die maximale Anzahl ausstehender Tupel pro Spout, Tupel des Typs „tick“ und eine Ruhemodusstrategie für Spouts. Andere Änderungen an der Topologie können im Topologie-Generator erfolgen. In meiner Beispieltopologie ist das Streaming zwischen allen Komponenten auf „Shuffle grouping“ festgelegt. Für jede bestimmte Komponente kann und wird das Storm-Ausführungssystem viele einzelne Aufgaben erstellen. Diese Aufgaben sind unabhängige Arbeitsthreads, die auf mehreren Kernen bzw. in mehreren Containern ausgeführt werden können, um den Workload für den Bolt auf mehrere Ressourcen zu verteilen. Ich kann steuern, wie Aufgaben von einem Bolt zum nächsten übergeben werden. Durch Wählen von „Shuffle grouping“ bestimme ich, dass ein beliebiges Tupel an einen beliebigen Arbeitsprozess im nächsten Bolt übergeben werden kann. Ich kann auch eine andere Option wie „Field grouping“ wählen, die bewirkt, dass Tupel basierend auf dem Wert eines bestimmten Felds im Tupel an denselben Arbeitsprozess übergeben werden. Diese Option dient zum Steuern des Datenflusses für Vorgänge mit einem Zustand, z. B. eine laufende Zählung eines bestimmten Worts im Tweet-Datenstrom.

Schließlich kann ein System für die Analyse in Echtzeit Teil einer größeren Analysepipeline innerhalb einer Organisation sein. Ein System für die Analyse von Weblogs weist meist eine große batchorientierte Komponente auf, die die Weblogs eines Webdiensts täglich verarbeitet. Dabei werden Übersichten über den Datenverkehr auf Websites und geringfügig aggregierte Daten bereitgestellt, mit deren Hilfe Big Data-Experten Muster erkennen können. Basierend auf dieser Analyse kann sich das Team für das Erstellen von Echtzeittriggern bei bestimmten Verhalten entscheiden, z. B. bei Erkennung von Systemausfällen oder böswilliger Nutzung. Im zweiten Fall ist eine Echtzeitanalyse von Protokoll- oder Telemetriedatenströmen erforderlich. Doch womöglich besteht eine Abhängigkeit von Referenzdaten, die vom Batchsystem täglich aktualisiert werden. Solch größere Pipelines erfordern ein Tool zur Workflowverwaltung, das die Synchronisierung von Aufgaben in einer Vielzahl von Berechnungsmodellen und Technologien zulässt. Azure Data Factory (ADF) bietet ein System zur Workflowverwaltung, das die Azure-Analyse- und Speicherdienste systemeigen unterstützt und eine aufgabenübergreifende Koordinierung basierend auf der Verfügbarkeit von Eingabedaten ermöglicht. ADF unterstützt HDInsight und Azure Data Lake Analytics sowie das Verschieben von Daten zwischen Azure Storage, Azure Data Lake Storage, Azure SQL Database und lokalen Datenquellen.

Andere Streamingtechnologien

In diesem Artikel habe ich die Grundlagen einer Analyse von Streamingdaten in Echtzeit mithilfe von Storm in HDInsight vorgestellt. Storm kann aber auch in Ihrem Rechenzentrum bzw. Ihrer Testumgebung auf Ihrem eigenen VM-Cluster eingerichtet werden. Die Storm-Distribution kann über Hortonworks, Cloudera oder direkt von Apache bezogen werden. Die Installation und Konfiguration ist in diesen Fällen zeitaufwendiger, wobei jedoch die Konzepte und Codeartefakte identisch sind.

Spark (spark.apache.org) ist ein weiteres Apache-Projekt, das für Echtzeitanalysen genutzt werden kann und bereits sehr beliebt ist. Spark unterstützt die allgemeine Big Data-Verarbeitung, doch dank seiner Unterstützung der speicherinternen Verarbeitung und seiner Bibliothek von Streamingfunktionen ist es auch eine interessante Option für die Hochleistungsverarbeitung von Daten in Echtzeit. HDInsight bietet Spark-Clustertypen, die Sie zum Experimentieren mit dieser Technologie nutzen können. Zum Dienst gehören Zeppelin und Jupyter Notebooks genannte Schnittstellen, die das Erstellen von Abfragen in diesen Sprachen und Anzeigen interaktiver Ergebnisse ermöglichen. Sie sind ideal für die Untersuchung von Daten und die Entwicklung von Abfragen riesiger Datasets.

Das Interesse an einer Streaminganalyse in Echtzeit nimmt zu, da Unternehmen sich immer komplexer werdenden Szenarien bei der Big Data-Analyse gegenüber sehen. Zugleich reifen und entwickeln sich Technologien in diesem Bereich immer weiter, um neue Möglichkeiten des Gewinnens von Einblicken aus Big Data bieten zu können. Auf diesen Seiten werden auch künftig Artikel erscheinen, die sich mit der Nutzung von Technologien wie Spark und Azure Data Lake Analytics beschäftigen werden.


Omid Afnanist ein leitender Programm-Manager im Azure Big Data-Team und arbeitet an Implementierungen verteilter Rechensysteme und dazugehöriger Entwicklertools. Er lebt und arbeitet in China. Sie erreichen ihn unter omafnan@microsoft.com.

Unser Dank gilt den folgenden technischen Experten für die Durchsicht dieses Artikels: Asad Khan und Ravi Tandon
Ravi Tandon (Microsoft), Asad Khan (Microsoft)

Asad Khan ein leitender Programm-Manager in der Microsoft Big Data-Gruppe mit Schwerpunkt auf Hadoop-basierten Umgebungen in der Cloud unter Verwendung von Azure HDInsight Service. Derzeit liegt sein Fokus auf Spark und der Echtzeitanalyse über Apache Storm. In den vergangenen Jahren hat er an der Entwicklung von Datenzugriffstechnologien der nächsten Generation von Microsoft einschließlich Hadoop, OData und BI für Big Data gearbeitet. Asad hat einen Master-Abschluss der Stanford University.

Ravi Tandon ist ein leitender Softwareentwicklung im Microsoft Azure HDInsight-Team. Er arbeitet an Apache Storm- und Apache Kafka-Lösungen für Microsoft Azure HDInsight.