Dieser Artikel wurde maschinell übersetzt.

StreamInsight

Zügeln des Ereignisdatenstroms: Schnelles annäherndes Zählen

Michael Meijer

 

So haben Sie einen voluminösen und potentiell unendlichen Strom von Veranstaltungen wie Clickstream, Sensordaten, Kreditkarten-Transaktionsdaten oder Internet-Verkehr.Es ist unmöglich ist, alle Ereignisse zu speichern oder sie in mehreren Durchläufen zu analysieren.Warum nicht auf ein Fenster der jüngsten Ereignisse, Analyse zu vereinfachen zurückgreifen?

Genommen Sie an, Sie interessante Ereignisse in einem großen Fenster, die über die neuesten N-Ereignisse des Streams zählen möchten.Ein naive Ansatz zu zählen erfordert alle N Ereignisse im Speicher und eine vollständige Iteration über sie sein.Wie das Fenster bei der Ankunft eines neuen Ereignisses Dias, dessen älteste Ereignis abläuft und das neue Ereignis wird eingefügt.Über das neue Fenster von vorne zählen verschwendet die Bearbeitungszeit auf N-2 Veranstaltungen gemeinsam verbracht.Igitt!Dieser Artikel beschreibt eine Datenstruktur um Raum Speicherauslastung und Bearbeitungszeit auf einen kleinen Bruchteil der was mit dieser Methode erforderlich sein würde, und unterstützt eine Ereignisrate überschreiten viele Tausende von Ereignissen pro Sekunde auf Standardhardware reduzieren.Dieser Artikel veranschaulicht auch die Datenstruktur in einen benutzerdefinierten Stream-Operator in c# für die Microsoft streaming-Verantwortlicher, StreamInsight 2.1 einbetten.Fortgeschrittene Programmierkenntnisse sind erforderlich, um die Folgen, und einige Erfahrung mit StreamInsight kann nützlich sein.

Eine Geschichte des Zählens

Vor dem Tauchen in StreamInsight, werde ich das scheinbar triviale Problem des Zählens untersuchen.Der Einfachheit halber davon ausgehen der Stream verfügt über Ereignisse mit Nutzlasten von 0 oder 1 — uninteressant und interessante Veranstaltungen, bzw. (egal was "interessant" in Ihrem spezifischen Szenario darstellt).Die Anzahl der Einsen wird über ein (fester Größe) auf der Grundlage der Anzahl Fenster, enthält die neuesten N-Ereignisse gezählt.Naiv zählen nimmt o (n) Zeit und Raum.

Als eine aufmerksame Leser kam Sie wahrscheinlich mit der Idee der Erhaltung der Graf zwischen aufeinander folgenden Fenstern und diese für neue Einsen und Dekrementieren erhöhen es für abgelaufene 1s, teilen die N-2-Ereignisse, die bereits verarbeitet.Gutes denken!Aufrechterhaltung der Graf jetzt Zeit O(1).Jedoch sollten Sie für eine abgelaufene Veranstaltung oder nicht verringern?Sofern Sie die eigentliche Veranstaltung wissen, kann nicht die Anzahl die beibehalten werden.Leider wissen die Ereignisse, bis sie abgelaufen sind, erfordert das gesamte Fenster im Speicher — d. h. nimmt O(N) Platz.Eine andere Strategie wäre die uninteressante Ereignisse herauszufiltern und zählen nur die verbleibenden interessante Veranstaltungen.Aber das verringert nicht Komplexitätstheorie und lässt Sie mit einem Fenster mit variabler Größe.

Kann das Gedächtnis-Tier werden gezähmt?Es kann!Allerdings erfordert es einen Kompromiss zwischen der Verarbeitung Zeit und Speicherplatz auf Kosten der Genauigkeit.Die wegweisende Studie von Mayur Datar, Aristides Gionis, Piotr Indyk und Rajeev Motwani mit dem Titel "Pflege Stream Statistiken über Automatik-Fenster" (stanford.io/SRjWT0) beschreibt eine Datenstruktur, die exponentielle Histogramm genannt.Es unterhält geschätzte Anzahl über die letzten N-Ereignisse mit einem berandeten relative Fehler ε.Dies bedeutet, dass überhaupt mal:

|exact count – approximate count|  ≤ ε, where 0 < ε < 1 
       exact count

Konzeptionell speichert das Histogramm Ereignisse in Eimern. Jeder Eimer deckt zunächst ein Ereignis, so es hat eine Anzahl von 1 und einem Zeitstempel des Ereignisses, die es abdeckt. Wenn ein Ereignis eintrifft, abgelaufen Eimer (die abgelaufene Ereignisse) werden entfernt. Ein Eimer wird nur für eine interessante Veranstaltung erstellt. Eimer im Laufe der Zeit erstellt werden, sind sie zusammengeführt, um Speicher zu sparen. Eimer werden zusammengeführt, so dass sie exponentiell wachsenden von zählt die jüngsten zum letzten Eimer, d. h., 1, 1,..., 2, 2,..., 4, 4,..., 8, 8 usw.. Auf diese Weise wird die Anzahl der Buckets in der Fenstergröße N. logarithmische Genauer gesagt, es erfordert O (1⁄ε n Log N) Zeit und Raum für die Wartung. Alle bis auf den letzten Eimer decken nur nicht abgelaufenen Ereignisse. Der letzte Eimer deckt mindestens einer nicht abgelaufenen Veranstaltung. Die Anzahl muss geschätzt werden, wodurch den Fehler bei der Angleichung der insgesamt Anzahl. Daher ist der letzte Eimer klein genug, um die Obergrenze der relative Fehler respektieren aufzubewahren.

Im nächsten Abschnitt wird die Umsetzung des exponentiellen Histogramms in c# mit einem absoluten Minimum an Mathematik behandelt. Lesen Sie die oben genannten Zeitung für die komplizierten Details. Ich werde den Code zu erklären und follow-up mit einem Pen-and-Paper-Beispiel. Das Histogramm ist ein Baustein für die StreamInsight benutzerdefinierte Stream Operator in diesem Artikel entwickelt.

Eimer oder nicht Eimer

Hier ist die Eimer-Klasse:

[DataContract]
public class Bucket
{
  [DataMember]
  private long timestamp;
  [DataMember]
  private long count;
  public long Timestamp {
    get { return timestamp; }
    set { timestamp = value; } }
  public long Count { get { return count; } set { count = value; } }
}

Es hat eine Zählung der (interessanten) Ereignisse, die es abdeckt und einen Zeitstempel des letzten Ereignisses, die es abdeckt. Nur der letzte Eimer kann abgelaufene Veranstaltungen, decken, wie erwähnt, aber sie müssen mindestens eine nicht abgelaufenen Veranstaltung abdecken. Also, alle, aber die letzten Eimer Grafen sind sehr genau. Der letzte Graf der Eimer muss durch das Histogramm geschätzt werden. Eimer, enthält nur abgelaufene Ereignisse selbst abgelaufen sind und das Histogramm entfernt werden können.

Mit nur zwei Operationen das exponentielle Histogramm stellt sicher eine Obergrenze der relative Fehler ε auf die Anzahl der interessanten Veranstaltungen über die jüngsten Ereignisse in N. Eine Operation ist für die Aktualisierung des Histogramms mit neuen und abgelaufene Veranstaltungen, pflegen die Eimer. Das andere ist für die Abfrage der ungefähren Anzahl von den Eimern. Die Gliederung des Histogramm-Klasse zeigt sich an Abbildung 1. Neben der verknüpften Liste der Eimer sind seine wichtigsten Variablen die Fenstergröße (n), die relativen Fehler-Obergrenze (Epsilon) und die zwischengespeicherten Summe aller Eimer zählt (Gesamt). Im Konstruktor werden die angegebene Fenstergröße, die angegebenen relativen Fehler-Obergrenze und eine anfängliche leere Liste der Buckets festgelegt.

Abbildung 1 die exponentielle Histogramm-Klasse-Gliederung

[DataContract]
public class ExponentialHistogram
{
  [DataMember]
  private long n;
  [DataMember]
  private double epsilon;
  [DataMember]
  private long total;
  [DataMember]
  private LinkedList<Bucket> buckets;
  public ExponentialHistogram(long n, double epsilon)
  {
    this.
n = n;
    this.epsilon = epsilon;
    this.buckets = new LinkedList<Bucket>();
  }
  public void Update(long timestamp, bool e) { ...
}
  protected void ExpireBuckets(long timestamp) { ...
}
  protected void PrependNewBucket(long timestamp) { ...
}
  protected void MergeBuckets() { ...
}
  public long Query() { ...
}
}

Die Aufrechterhaltung des Histogramms erfolgt durch diese Updatemethode:

public void Update(long timestamp, bool eventPayload)
{
  RemoveExpiredBuckets(timestamp);
  // No new bucket required; done processing
  if (!eventPayload)
    return;
  PrependNewBucket(timestamp);
  MergeBuckets();
}

Es übernimmt einen diskreten Timestamp, im Gegensatz zu Wanduhr Zeit, um zu bestimmen, was sind die neuesten N-Veranstaltungen. Dies dient zum Suchen und Entfernen von abgelaufenen Eimer. Wenn das neue Ereignis eine Nutzlast von 0 (falsch) hat, wird Sie angehalten. Wenn das neue Ereignis eine Nutzlast von 1 (wahr) hat, ein neuer Eimer erstellt und in die Liste der Eimer vorangestellt. Das echte Feuerwerk sind die Eimer zusammengeführt. Die Methoden, die von der Updatemethode aufgerufen werden nacheinander behandelt.

Hier ist der Code für die Beseitigung der Eimer:

protected void RemoveExpiredBuckets(long timestamp)
{
  LinkedListNode<Bucket> node = buckets.Last;
  // A bucket expires if its timestamp
  // is before or at the current timestamp - n
  while (node != null && node.Value.Timestamp <= timestamp - n)
  {
    total -= node.Value.Count;
    buckets.RemoveLast();
    node = buckets.Last;
  }
}

Die Durchquerung von den ältesten (letzten) Eimer beginnt und endet mit der ersten nicht abgelaufenen Eimer. Jeder Eimer, dessen jüngste Ereignis Zeitstempel abgelaufen ist — deren Timestamp nicht größer als der aktuelle Zeitstempel abzüglich der Fenstergröße ist — ist aus der Liste entfernt. Hier kommt der diskrete Timestamp ins Spiel. Die Summe aller Eimer zählt (Gesamt) wird um die Anzahl der jeweils abgelaufenen Eimer verringert.

Nachdem abgelaufene Veranstaltungen und Eimer entfallen für das neue Ereignis verarbeitet wird:

protected void PrependNewBucket(long timestamp)
{
  Bucket newBucket = new Bucket()
  {
    Timestamp = timestamp,
    Count = 1
  };
  buckets.AddFirst(newBucket);
  total++;
}

Ein neuer Eimer für die Veranstaltung mit einer Nutzlast von 1 (wahr) wird mit einem Zähler von 1 und einen Zeitstempel gleich den aktuellen Zeitstempel erstellt. Der neue Eimer wird zur Liste der Eimer vorangestellt und die Summe aller Eimer zählt (Gesamt) wird erhöht.

Die Speicher-Platz sparend und Fehler-umschließenden Magie ist bei der Verschmelzung der Eimer. Der Code ist aufgeführt, Abbildung 2. Eimer werden zusammengeführt, so dass in Folge Eimer exponentiell zählt, d. h., 1, 1,..., 2, 2,..., 4, 4,..., 8, 8 usw.. Die Anzahl der Buckets mit der gleichen Anzahl richtet sich nach der Wahl der Obergrenze der relative Fehler ε. Die Gesamtzahl der Eimer wächst logarithmisch mit der Größe des Fensters n, die die Speicher-Platzersparnis erklärt. Wie viele Eimer wie möglich werden zusammengeführt, aber die letzten Bucketanzahl wird klein gehalten genug (im Vergleich zu der Summe der anderen Eimer Grafen) zu gewährleisten, wird der relative Fehler begrenzt.

Abbildung 2 Zusammenführen Eimer im Histogramm

protected void MergeBuckets()
{
  LinkedListNode<Bucket> current = buckets.First;
  LinkedListNode<Bucket> previous = null;
  int k = (int)Math.Ceiling(1 / epsilon);
  int kDiv2Add2 = (int)(Math.Ceiling(0.5 * k) + 2);
  int numberOfSameCount = 0;
  // Traverse buckets from first to last, hence in order of
  // descending timestamp and ascending count
  while (current != null)
  {
    if (previous != null && previous.Value.Count == current.Value.Count)
      numberOfSameCount++;
    else
      numberOfSameCount = 1;
    // Found k/2+2 buckets of the same count?
if (numberOfSameCount == kDiv2Add2)
    {
      // Merge oldest (current and previous) into current
      current.Value.Timestamp = previous.Value.Timestamp;
      current.Value.Count = previous.Value.Count + current.Value.Count;
      buckets.Remove(previous);
      // A merged bucket can cause a cascade of merges due to
      // its new count, continue iteration from merged bucket
      // otherwise the cascade might go unnoticed
      previous = current.Previous;
    }
    else
    {
      // No merge, continue iteration with next bucket 
      previous = current;
      current = current.Next;
    }
  }
}

Formeller Eimer haben nicht abnehmen zählt vom ersten (letzten) zu den letzten (älteste) Eimer in der Liste. Die Eimer-Grafen sind auf Zweierpotenzen beschränkt. Lassen Sie k = 1⁄εund k⁄2 eine ganze Zahl sein, sonst ersetzen Sie diese durch. Mit Ausnahme der letzten Bucketanzahl, es werde mindestens k⁄2 und bei den meisten k⁄2 + 1 Eimer der gleichen Anzahl. Wann immer es k⁄2 + 2 Eimer des gleichen Grafen, die ältesten beiden werden in einen Eimer mit zweimal der Graf von den ältesten Eimer und die jüngste ihrer Zeitstempel zusammengeführt. Wann immer zwei Eimer zusammengeführt werden, weiterhin Traversierung von der zusammengeführten Eimer. Die Zusammenführung kann eine Kaskade von Zusammenführungen verursachen. Andernfalls wird Traversierung von den nächsten Eimer.

Um ein Gefühl für die Graf-Näherung zu bekommen, schauen Sie sich das Histogramm-Query-Methode:

public long Query()
{
  long last = buckets.Last != null ?
buckets.Last.Value.Count : 0;
  return (long)Math.Ceiling(total - last / 2.0);
}

Die Summe der Eimer Grafen bis zu den letzten Eimer ist exakt. Der letzte Eimer muss mindestens eine nicht abgelaufenen Veranstaltung abdecken, sonst der Eimer abgelaufen ist und entfernt. Die Anzahl muss geschätzt werden, da sie abgelaufene Ereignisse abdecken kann. Durch Schätzung der tatsächlichen Graf von den letzten Eimer als die Hälfte der letzten Eimer Graf, ist der absolute Fehler dieser Schätzung nicht größer als die Hälfte dieser Bucketanzahl. Die insgesamt Anzahl wird durch die Summe aller Eimer zählt (Gesamt) minus die Hälfte der letzten Eimer Count geschätzt. Um sicherzustellen, dass der absolute Fehler ist innerhalb der Grenzen der der relative Fehler der letzten Eimer Einfluss muss klein genug um die Summe der anderen Eimer Grafen verglichen werden. Zum Glück ist das Merge-Verfahren dafür.

Verlasse die Codebeispiele und Erklärungen bis zu diesem Zeitpunkt Sie erstaunt über die Funktionsweise des Histogramms? Im folgenden Beispiel durchlesen.

Angenommen, Sie haben ein neu initialisierten Histogramm mit Fenster Größe n = 7 und relative Fehler Obergrenze ε = 0.5, also k = 2. Das Histogramm entwickelt, wie in Abbildung 3, und eine schematische Übersicht über diesem Histogramm wird dargestellt, Abbildung 4. In Abbildung 3, Zusammenführungen sind auf Timestamps, 5, 7 und 9. Ein kaskadiertes Merge ist am Timestamp 9. Ein abgelaufener Eimer ist mit der Zeit 13. Ich gehe ins Detail dazu.

Abbildung 3 Beispiel für die exponentielle Histogramm

 

A Schematic Overview of the Histogram Depicted in Figure 3
Abbildung 4 schematische Übersicht des Histogramms in Abbildung 3 dargestellt

Die erste Veranstaltung ist wirkungslos. Bei der fünften Veranstaltung rührt eine Zusammenführung der ältesten Eimer es gibt Textfeld: k⁄2 + 2 Eimer mit der gleichen Anzahl von 1. Auch hier erfolgt eine Zusammenführung auf der siebten Veranstaltung. Bei der neunten Veranstaltung ergießt sich eine Zusammenführung in einem anderen zusammenführen. Beachten Sie, dass nach der siebten Veranstaltung, die erste Veranstaltung läuft. Keine Eimer trägt eine abgelaufene Timestamp bis zum 13. Event. Bei der 13. Veranstaltung der Eimer mit dem Zeitstempel 6 nicht mehr deckt mindestens einer nicht abgelaufenen Veranstaltung und so abläuft. Beachten Sie, dass der beobachtete relative Fehler deutlich weniger als die Obergrenze der relative Fehler ist.

In Abbildung 4, ein gepunktetes Feld ist die Größe des Fensters zu diesem Zeitpunkt; Es enthält die Eimer und impliziert die Spanne der Ereignisse abgedeckt. Ein ausgefülltes Feld ist ein Eimer mit Timestamp an der Spitze und Zähler auf Unterseite. Lage A zeigt das Histogramm am Timestamp 7 mit Pfeilen zu den gezählten Veranstaltungen. Lage B zeigt das Histogramm am Timestamp 9. Der letzte Eimer deckt abgelaufene Ereignisse. C zeigt das Histogramm am Timestamp 13. Der Eimer mit dem Zeitstempel 6 abgelaufen.

Nachdem man es alle zusammen, schrieb ich eine kleine Demonstration-Programm für die exponentielle Histogramm (Check-out der Source-Code-Download für diesen Artikel). Die Ergebnisse sind in Abbildung 5 dargestellt. Es simuliert einen Strom von 100 Millionen Ereignisse Graf basierende Fenstergröße N 1 Million Ereignisse. Jedes Ereignis hat eine Nutzlast von 0 oder 1 mit 50-prozentige Chance. Es schätzt die ungefähre Anzahl der 1s mit einem willkürlich gewählten relativer Fehler-Obergrenze ε 1 % oder 99 % Genauigkeit. Die Speicher-Einsparungen des Histogramms sind riesig im Vergleich zu Windows; die Anzahl der Buckets ist weit weniger als die Anzahl der Ereignisse im Fenster. Auf einem Computer mit einem Intel 2,4 GHz Dual-Core-Prozessor und 3 GB RAM unter Windows 7 wird eine Ereignisrate von ein paar hunderttausend Ereignissen pro Sekunde erreicht.

Empirical Results for the Exponential Histogram
Abbildung 5 empirische Ergebnisse für die exponentielle Histogramm

Eine Schönheit namens StreamInsight

Vielleicht sind Sie Fragen, was Microsoft StreamInsight ist und wo es passt. Dieser Abschnitt enthält einige Grundlagen. StreamInsight ist eine robuste, leistungsstarke, geringem Aufwand, nahe Null-Latenz und extrem flexibel-Engine für die Verarbeitung auf Streams. Es ist derzeit in der Version 2.1. Die Vollversion erfordert eine SQL Server -Lizenz, obwohl eine trial-Version verfügbar ist. Es hat ausgeführt, entweder als eigenständiger Dienst oder eingebettet in-Prozess.

Das Herzstück des streaming Datenverarbeitung ist ein Modell mit zeitlichen Streams von Ereignissen. Im Prinzip ist es eine potenziell unendliche und umfangreiche Datensammlung kommen im Laufe der Zeit. Denken Sie an die Börsenkurse, Wetter-Telemetrie, Überwachung, macht Web klickt, Internet-Verkehr, Mautstationen und so weiter. Jedes Ereignis im Stream hat einen Header mit Metadaten und eine Nutzlast von Daten. In der Kopfzeile des Ereignisses wird ein Zeitstempel auf ein Minimum gehalten. Ereignisse können stetig, zeitweise oder vielleicht in Bursts von bis zu vielen tausend pro Sekunde ankommen. Veranstaltungen gibt es in drei Varianten: Ein Ereignis kann auf einen Punkt in der Zeit beschränkt werden. für einen bestimmten Zeitraum gültig sein; oder für ein offenes Intervall (Rand) gültig sein. Neben Events aus dem Stream wird eine spezielle Satzzeichen-Ereignis von der Engine namens Common Time Increment (CTI) ausgestellt. Ereignisse können nicht in den Stream mit einem Zeitstempel kleiner als die CTI-Zeitstempel eingefügt werden. Tatsächlich, bestimmen CTI-Veranstaltungen das Ausmaß auf dem Ereignisse außerhalb der Reihenfolge eintreffen können. Zum Glück übernimmt StreamInsight dies.

Heterogenen Quellen der Eingabe und senken der Ausgabedatenströme müssen irgendwie passen in dieses Modell angepasst. Die Ereignisse in der (abfragbare) zeitliche Streams werden in ein IQStreamable <TPayload> erfasst. Ereignis Nutzlasten sind konzeptionell durch Enumeration gezogen oder geschoben durch Beobachtung in den Stream. Daher zugrunde liegenden Daten kann ausgesetzt werden durch ein IEnumerable <T> / IQueryable <T> (Reaktive Erweiterung) oder IObservable <T> / IQbservable <T> (Reactive Extension), parametrisierte bzw. mit dem Typ der Daten zugänglich gemacht werden. Die Verarbeitungs-Engine überlassen sie die Wartung von zeitlichen Aspekten. Konvertierung von und zu den verschiedenen Schnittstellen ist möglich.

Die Quellen und senken gerade diskutiert live auf den Grenzen, während die eigentliche Verarbeitung innerhalb von Abfragen geschieht. Eine Abfrage ist eine grundlegende Einheit der Komposition in LINQgeschrieben. Er kontinuierlich verarbeitet Ereignisse aus einem oder mehreren Streams und gibt einen anderen Stream. Abfragen sind zu projizieren, filter, Gruppe anzuwenden, Multicast-, Betrieb/Aggregate, Join, Fenster und Union-Veranstaltungen. Operatoren können benutzerdefinierte sein. Sie arbeiten über Ereignisse (inkrementell) oder Windows (inkrementell), wie sie ankommen.

Ein Hinweis auf Windowing ist in Ordnung. Windowing Partitionen einen Stream in endlichen Teilmengen der Ereignisse, die zwischen aufeinander folgenden Fenster überlappen können. Windowing erzeugt effektiv einen Strom von Windows, von einer IQWindowedStreamable <TPayload> reflektiert in StreamInsight. Derzeit werden drei verschiedene Arten von Windowing-Konstrukte unterstützt: Graf-basierte, zeitbasierte und Snapshot-Fenster. Count-basierte Windows zu decken, den neuesten N-Ereignisse und schieben Sie die Ankunft eines neuen Ereignisses ablaufenden die älteste und die neueste einfügen. Die neuesten Ereignisse in der letzten u. Zeit-basierte Windows-Abdeckung­Val Zeit und Folie durch einige Intervall (auch hüpfen oder tumbling genannt). Snapshot-Windows werden angetrieben durch Ereignis-Start und Endzeiten; die ist, für jedes Paar der nächsten Veranstaltung beginnen und Endzeit, ein Fenster erstellt. Im Gegensatz zum Zeit-basierte Windows angetrieben von Abständen entlang der Zeitachse, unabhängig von Veranstaltungen sind nicht entlang der Zeitachse Snapshot Windows behoben.

Das kratzt nur an der Oberfläche. Weitere Informationen sind aus mehreren Quellen, einschließlich der online-Entwicklerhandbuch (bit.ly/T7Trrx), "A Hitchhiker's Guide to StreamInsight 2.1 Abfragen" (bit.ly/NbybvY), CodePlex Beispiele, die StreamInsight-Teamblog (blogs.msdn.com/b/streaminsight) und andere.

Zusammenfassung

Die Fundamente gelegt. Zu diesem Zeitpunkt sind Sie wahrscheinlich Fragen wie ungefähre zählen in StreamInsight zum Leben gebracht wird. Kurz gesagt, ist einige (zeitliche) Quellbach Zeitpunkt Veranstaltungen, mit Nutzlast von 0 oder 1, erforderlich. Es wird eine Abfrage zugeführt, die die ungefähre Anzahl der Einsen über die neuesten N-Ereignisse mit das exponentielle Histogramm berechnet. Die Abfrage führt zu einigen (zeitliche) Stream-zu einem bestimmten Zeitpunkt Ereignisse — tragen die ungefähre Anzahl —, ist eine Senke zugeführt.

Beginnen wir mit einem benutzerdefinierten Operator für ungefähre zählen. Sie könnten versucht sein, die neuesten N-Ereignisse mithilfe des Graf-basierte Windowing-Konstrukts zu erfassen sein. Einmal darüber nachdenken! Das würde die Speicher sparenden Vorteile des exponentiellen Histogramms trotzen. Warum ist das so? Das Konstrukt zwingt ganze Windows Ereignisse im Speicher gehalten werden. Es ist durch das exponentielle Histogramm nicht erforderlich, da es einen gleichwertigen impliziten Begriff der Windowing durch den Erhalt der Eimer hat. Darüber hinaus ist mit einen Operator über Windows nicht inkrementell, d. h. ohne Verarbeitung der Ereignisse eintreffen, aber nur dann, wenn ein Fenster (nächstes) verfügbar ist. Die Lösung ist ein benutzerdefinierter Stream-Operator, ohne explizite Windowing auf der Abfrage erstellt. Der Code ist aufgeführt, Abbildung 6.

Abbildung 6 benutzerdefinierte Stream-Operator-Implementierung

[DataContract]
public class ApproximateCountUDSO : CepPointStreamOperator<bool, long>
{
  [DataMember]
  private ExponentialHistogram histogram;
  [DataMember]
  private long currentTimestamp;  // Current (discrete) timestamp
  public ApproximateCountUDSO(long n, double epsilon)
  {
    histogram = new ExponentialHistogram(n, epsilon);
  }
  public override IEnumerable<long> ProcessEvent(
    PointEvent<bool> inputEvent)
  {
    currentTimestamp++;
    histogram.Update(currentTimestamp, inputEvent.Payload);
    yield return histogram.Query();
  }
  public override bool IsEmpty
  {
    get { return false; }
  }
}

Der Betreiber leitet sich von der abstrakten CepPointStreamOperator < TInputPayload, TOutputPayload >. Es hat eine exponentielle Histogramm-Instanzvariable. Beachten Sie die Dekoration mit DataContract und DataMember-Attributen. Dies informiert StreamInsight wie den Operator serialisiert — z. B. aus Gründen der Ausfallsicherheit. Die Operator-Überschreibungen der IsEmpty-Operator an, es ist nicht leer, d. h. der Operator ist stateful. Dadurch wird verhindert, dass StreamInsight messing mit dem Operator, wenn Speichernutzung zu minimieren. Die ProcessEvent-Methode ist Kern des Betreibers. Sie erhöht den aktuellen Zeitstempel als (diskreten) und leitet diese zusammen mit der Ereignisnutzlast des Histogramms Update-Methode. Das Histogramm behandelt die Buckets und wird für die ungefähre Anzahl abgefragt. Stellen Sie sicher, die Rendite-Rendite-Syntax zu verwenden, die der Betreiber aufzählbare macht. Operatoren werden im Allgemeinen in einigen Erweiterungsmethode, die versteckt in einem Utility-Klasse eingeschlossen. Dieser Code zeigt, wie es gemacht wird:

public static partial class Utility
{
  public static IQStreamable<long> ApproximateCount(
    this IQStreamable<bool> source, long n, double epsilon)
  {
    return source.Scan(() => new ApproximateCountUDSO(n, epsilon));
  }
}

Das war's! Stecken Sie eine Anfrage über die Erweiterungsmethode den Operator. Ein bisschen zusätzlichen Code wird benötigt, um ihre Verwendung tatsächlich wird. Hier ist eine triviale Quellstream:

public static partial class Utility
{
  private static Random random = new Random((int)DateTime.Now.Ticks);
  public static IEnumerable<bool> EnumeratePayloads()
  {
    while (true)  // ad infinitum
    {
      bool payload = random.NextDouble() >= 0.5;
      yield return payload;
    }
  }
}

Dies generiert zufällige Nutzlasten von 0en und 1en. Die Rendite-Rendite-Syntax wird daraus eine aufzählbare Quelle. Legen Sie es in eine Utility-Klasse, wenn man so will.

Die berüchtigte Program-Klasse zeigt sich an Abbildung 7. Die prozessinterne eingebettete StreamInsight-Server-Instanz erstellt. Eine streaming Verarbeitung (Metadaten) Container, z. B. für benannte Streams, und so weiter fragt wird eine so genannte Anwendungsinstanz mit dem Namen ApproximateCountDemo erstellt. Eine aufzählbare Quelle-zu einem bestimmten Zeitpunkt Ereignisse wird definiert, mit der Nutzlast schaffende Dienstprogrammmethode wie oben beschrieben. Es ist in einen zeitlichen Stream-zu einem bestimmten Zeitpunkt Ereignisse umgewandelt. Die Abfrage wird mit LINQ definiert und wählt die ungefähre Operator-Grafen über dem Quellstream berechnete. Dies ist, wo die Erweiterungsmethode für den benutzerdefinierten Operator kommt in handliches. Es ist mit einer Fenstergröße und Obergrenze relative Fehler Neustart. Als nächstes ist die Ausgabe die Abfrage in eine aufzählbare Waschbecken, verwandelt Strippen die temporalen Eigenschaften. Schließlich ist die Senke, durchlaufen, damit aktiv ziehen die Ereignisse über die Pipeline. Führen Sie das Programm und genießen Sie seine Nummer-Knirschen-Ausgabe auf dem Bildschirm zu.

Abbildung 7 einbetten und Ausführung in StreamInsight

class Program
{
  public const long N = 10000;
  public const double Epsilon = 0.05;
  static void Main(string[] args)
  {
    using (Server server = Server.Create("StreamInsight21"))
    {
      var app = server.CreateApplication("ApproximateCountDemo");
      // Define an enumerable source
      var source = app.DefineEnumerable(() =>
        Utility.EnumeratePayloads());
      // Wrap the source in a (temporal) point-in-time event stream
      // The time settings determine when CTI events
      // are generated by StreamInsight
      var sourceStream = source.ToPointStreamable(e =>
        PointEvent.CreateInsert(DateTime.Now, e),
        AdvanceTimeSettings.IncreasingStartTime);
      // Produces a stream of approximate counts
      // over the latest N events with relative error bound Epsilon
      var query =
        from e in sourceStream.ApproximateCount(N, Epsilon) select e;
      // Unwrap the query's (temporal) point-in-time
      // stream to an enumerable sink
      var sink = query.ToEnumerable<long>();
      foreach (long estimatedCount in sink)
      {
        Console.WriteLine(string.Format(
          "Enumerated Approximate count: {0}", estimatedCount));
      }
    }
  }
}

Um kurz rekapitulieren, erläutert dieser Artikel die Anzahl die über ein Fenster von Ereignissen in logarithmischer Zeit und Raum mit oberen begrenzt Fehler mit eine exponentiellen Histogramm-Datenstruktur ungefähre. Das Histogramm ist eingebettet in einen benutzerdefinierten StreamInsight-Operator.

Das Histogramm und die Betreiber können für die Unterstützung variabler Größe Fenster, z. B. Windows zeitbasierte ausgeweitet werden. Dies erfordert das Histogramm die Fenster Intervall/Timespan, anstatt die Fenstergröße zu kennen. Eimer abgelaufen sind, wenn ihre Zeitstempel vor das neue Event-Zeitstempel abzüglich der Fenster-Timespan ist. Eine Erweiterung zur Berechnung der Varianz wird vorgeschlagen, in der Papier-, "Aufrechterhaltung Varianz und K–Medians über Data Stream Windows," von Brian Babcock, Mayur Datar, Rajeev Motwani und Liadan O'Callaghan (stanford.io/UEUG0i). Neben Histogramme sind andere sogenannte Synopse-Strukturen in der Literatur beschrieben. Sie können Stichproben, Schwergewichten, Quantile usw. vorstellen.

Der Quellcode dieser Artikel ist in c# 4.0 mit Visual Studio 2010 geschrieben und erfordert StreamInsight 2.1. Der Code ist für den Gebrauch unter der Microsoft Public License (Ms-PL) kostenlos. Beachten Sie, dass es für pädagogische Zwecke entwickelt wurde und war weder optimiert noch für Produktionsumgebungen getestet.

Michael Meijer ist als Software Engineer bei CIMSOLUTIONS BV, wo er es Beratung bietet und Entwicklung von Softwarelösungen für Unternehmen in den ganzen Niederlanden. Sein Interesse für StreamInsight und streaming Datenverarbeitung begann während seiner Forschung an der Universität Twente, Enschede, Niederlande, er einen Master Of Science-Abschluss in Informatik Science–Information Systeme erhielt.

Unser Dank gilt den folgenden technischen Experten für die Durchsicht dieses Artikels: Erik Hegeman, Roman Schindlauer und Bas Stemerdink