Este artigo foi traduzido por máquina.

StreamInsight

Restringindo o Fluxo do Evento: Contagem rápida aproximada

Michael Meijer

Baixar o código de exemplo

 

Então você tem um fluxo volumoso e potencialmente infinito de eventos como cliques, dados do sensor, dados de transações de cartão de crédito ou tráfego de Internet. É inviável para armazenar todos os eventos ou analisá-los em várias passagens. Porque não recorrer a uma janela de acontecimentos recentes para simplificar a análise?

Suponha que você deseja contar o número de eventos interessantes em uma janela grande, cobrindo os últimos acontecimentos de N do fluxo. Uma abordagem ingênua para contagem exige que todos os eventos de N estar na memória e uma iteração completa sobre eles. Como a janela desliza sobre a chegada de um novo evento, expira seu evento mais antigo e o novo evento é inserido. Contando sobre a janela nova do zero desperdiça o tempo de processamento gastado na N-2 eventos compartilhados. ECA! Este artigo explica uma estrutura de dados para reduzir o uso de espaço de memória e tempo de processamento para uma pequena fração do que seria necessário com esse método, apoiando-se uma taxa de evento superior a muitos milhares de eventos por segundo em hardware de mercadoria. Este artigo também mostra como incorporar a estrutura de dados em um operador de fluxo definido pelo usuário em c# para o processador streaming de dados da Microsoft StreamInsight 2.1. Habilidades de programação intermediárias são obrigadas a acompanhar, e alguma experiência com StreamInsight pode vir a calhar.

Um conto de contar

Antes de mergulhar na StreamInsight, eu vou investigar o problema aparentemente trivial de contagem. Para simplificar, supor que o fluxo tem eventos com cargas de 0 ou 1 — eventos interessantes e desinteressantes, respectivamente (independentemente do que constitui "interessante" em seu cenário específico). O número de 1s é contado sobre uma janela baseada em contagem (tamanho fixo), contendo os mais recentes eventos de N. Ingênuo contagem leva o (n) tempo e espaço.

Como um leitor perspicaz, você provavelmente surgiu com a idéia de manter a contagem entre janelas consecutivas e incrementando-o para novo 1s e diminuindo ele para 1s expirado, compartilhando os eventos de N-2 já processados. Bem pensado! Manter a contagem agora leva tempo (1). No entanto, você deve diminuir para um evento expirado ou não? Se você não sabe o evento real, a contagem não pode ser mantida. Infelizmente, a saber os eventos até que eles tenham expirado requer toda a janela na memória — ou seja, leva o (n) espaço. Outra estratégia pode ser para filtrar os eventos desinteressantes e contar apenas os restantes interessantes eventos. Mas isso não reduz a complexidade computacional e deixa-lo com uma janela de tamanho variável.

A besta de memória pode ser domada? Sim, pode! No entanto, ela requer um compromisso entre o espaço de tempo e memória em detrimento da precisão de processamento. O artigo seminal de Mayur Datar, Aristides Gionis, Piotr Indyk e Rajeev Motwani intitulado "Mantendo fluxo estatísticas sobre deslizamento Windows" (stanford.io/SRjWT0) descreve uma estrutura de dados chamada histograma exponencial. Ele mantém uma contagem aproximada sobre os últimos acontecimentos de N com um erro relativo limitado ε. Isso significa que em todos os tempos:

 

|exact count – approximate count|  ≤ ε, where 0 < ε < 1         exact count

Conceitualmente, o histograma armazena eventos em baldes. Cada balde inicialmente cobre um evento, por isso tem uma contagem de 1 e um timestamp do evento abrange. Quando chega a um evento, expiraram baldes (cobrindo eventos expirados) são removidos. Um balde é criado apenas para um evento interessante. Como baldes são criados ao longo do tempo, elas são mescladas para economizar memória. Baldes são mescladas para que eles tenham exponencialmente crescente conta do mais recente ao último balde, isto é, 1, 1,..., 2, 2,..., 4, 4,..., 8, 8 e assim por diante. Desta forma, o número de recipientes é logarítmico do tamanho de janela s. Mais precisamente, ele requer O (1⁄ε n log N) tempo e espaço para manutenção. Todos, mas o último balde cobrem eventos só não decorrido. O último balde cobre pelo menos um evento não decorrido. Sua contagem deve ser estimada, o que faz com que o erro na aproximação a contagem geral. Assim, o último balde deve ser mantido pequeno o suficiente para respeitar o limite superior de erro relativo.

Na próxima seção, a implementação do histograma exponencial em c# é discutida com um mínimo de matemática. Leia o livro acima mencionado para os intrincados detalhes. Vou explicar o código e o acompanhamento com um exemplo de papel e caneta. O histograma é um bloco de construção para o StreamInsight operador de fluxo definido pelo usuário desenvolvido neste artigo.

Balde ou não da cubeta

Aqui é a classe de balde:

[DataContract] public class Bucket {   [DataMember]   private long timestamp;   [DataMember]   private long count;   public long Timestamp {     get { return timestamp; }     set { timestamp = value; } }   public long Count { get { return count; } set { count = value; } } }

Tem uma contagem dos eventos (interessantes) cobre e um carimbo de hora do evento mais recente que abrange. Apenas o último balde pode cobrir eventos expirados, como mencionado, mas ele deve cobrir pelo menos um evento não decorrido. Portanto, todos, mas o último conta de balde são exato. A última contagem de balde deve ser estimada pelo histograma. Baldes contendo apenas expirados eventos são expirou-se e podem ser removidos do histograma.

Usando apenas duas operações, o histograma exponencial garante um limite superior de erro relativo ε sobre a contagem de eventos interessantes sobre os eventos mais recentes de N. Uma operação é para atualizar o histograma com eventos novos e expirados, mantendo os baldes. O outro é para consultar a contagem aproximada de baldes. O contorno de classe do histograma é mostrado em Figura 1. Ao lado a lista vinculada de baldes, suas principais variáveis são o tamanho da janela (n), o limite superior de erro relativo (epsilon) e a soma em cache de todas as contagens de balde (total). No construtor, o tamanho da janela determinado, o limite superior determinado erro relativo e um vazio lista inicial de baldes são definidas.

Figura 1 o contorno de classe do histograma exponencial

[DataContract] public class ExponentialHistogram {   [DataMember]   private long n;   [DataMember]   private double epsilon;   [DataMember]   private long total;   [DataMember]   private LinkedList<Bucket> buckets;   public ExponentialHistogram(long n, double epsilon)   {     this.
n = n;     this.epsilon = epsilon;     this.buckets = new LinkedList<Bucket>();   }   public void Update(long timestamp, bool e) { ...
}   protected void ExpireBuckets(long timestamp) { ...
}   protected void PrependNewBucket(long timestamp) { ...
}   protected void MergeBuckets() { ...
}   public long Query() { ...
} }

A manutenção do histograma é realizada por esse método de atualização:

public void Update(long timestamp, bool eventPayload) {   RemoveExpiredBuckets(timestamp);   // No new bucket required; done processing   if (!eventPayload)     return;   PrependNewBucket(timestamp);   MergeBuckets(); }

Ele aceita um timestamp discreto, ao contrário do tempo relógio de parede, para determinar quais são os últimos acontecimentos de N. Isso é usado para localizar e remover baldes expirados. Se o novo evento tem uma carga de 0 (falso), o processamento pára. Quando o novo evento tem uma carga de 1 (verdadeiro), um balde novo é criado e anexado à lista de baldes. Os fogos de artifício reais estão mesclando os baldes. Os métodos chamados pelo método de atualização são discutidos na seqüência.

Aqui está o código para a remoção de caçambas:

protected void RemoveExpiredBuckets(long timestamp) {   LinkedListNode<Bucket> node = buckets.Last;   // A bucket expires if its timestamp   // is before or at the current timestamp - n   while (node != null && node.Value.Timestamp <= timestamp - n)   {     total -= node.Value.Count;     buckets.RemoveLast();     node = buckets.Last;   } }

O percurso inicia-se do mais antigo balde (último) e termina no primeiro balde não decorrido. Cada balde timestamp do evento cujo mais recente é expirado — isto é, cujo carimbo de hora não é superior a hora corrente menos o tamanho da janela — é removido da lista. É onde entra o timestamp discreto. A soma de todas as contagens de balde (total) é diminuída pela contagem de cada balde expirado.

Após expirado eventos e baldes são contabilizados, pois, o novo evento é processado:

protected void PrependNewBucket(long timestamp) {   Bucket newBucket = new Bucket()   {     Timestamp = timestamp,     Count = 1   };   buckets.AddFirst(newBucket);   total++; }

Um novo balde para o evento com uma carga de 1 (verdadeiro) é criado com uma contagem de 1 e um carimbo de hora igual ao timestamp atual. O novo balde é anexado à lista de baldes e a soma de todas as contagens de balde (total) é incrementada.

A magia de economia de espaço e limite de erro de memória é a fusão dos baldes. O código está listado em Figura 2. Baldes são mescladas para que baldes consecutivos têm crescimento exponencial contagens, ou seja, 1, 1,..., 2, 2,..., 4, 4,..., 8, 8 e assim por diante. O número de recipientes com a mesma contagem é determinado pela escolha do limite superior de erro relativo ε. O número total de recipientes cresce logaritmicamente com o tamanho da janela n, que explica a economia de espaço de memória. Como muitos baldes possível são mescladas, mas a contagem do último balde é mantida pequena suficiente (em comparação com a soma dos outros Condes balde) para garantir que o erro relativo é limitado.

Figura 2 baldes no histograma de fusão

protected void MergeBuckets() {   LinkedListNode<Bucket> current = buckets.First;   LinkedListNode<Bucket> previous = null;   int k = (int)Math.Ceiling(1 / epsilon);   int kDiv2Add2 = (int)(Math.Ceiling(0.5 * k) + 2);   int numberOfSameCount = 0;   // Traverse buckets from first to last, hence in order of   // descending timestamp and ascending count   while (current != null)   {     if (previous != null && previous.Value.Count == current.Value.Count)       numberOfSameCount++;     else       numberOfSameCount = 1;     // Found k/2+2 buckets of the same count?
if (numberOfSameCount == kDiv2Add2)     {       // Merge oldest (current and previous) into current       current.Value.Timestamp = previous.Value.Timestamp;       current.Value.Count = previous.Value.Count + current.Value.Count;       buckets.Remove(previous);       // A merged bucket can cause a cascade of merges due to       // its new count, continue iteration from merged bucket       // otherwise the cascade might go unnoticed       previous = current.Previous;     }     else     {       // No merge, continue iteration with next bucket        previous = current;       current = current.Next;     }   } }

Mais formalmente, baldes têm contagens não diminuindo, desde o primeiro (mais recente) para o último balde (mais antigo) na lista. As contagens de balde são restritos para potências de dois. Deixe k = 1⁄εe k⁄2 ser um número inteiro, ou então substituir por este último. Exceto para a contagem do último balde, haja pelo menos k⁄2 e no máximo k⁄2 + 1 baldes da mesma contagem. Sempre que houver k⁄2 + 2 baldes da mesma contagem, as duas mais antigas são mescladas em um balde com duas vezes a contagem do balde mais antiga e mais recente de seus carimbos de hora. Sempre que dois baldes são mescladas, travessia continua do balde mesclado. A fusão pode causar uma cascata de mescla. Caso contrário travessia continua da seguinte.

Para obter uma sensação para a aproximação de contagem, veja método de consulta do histograma:

 

public long Query() {   long last = buckets.Last != null ?
buckets.Last.Value.Count : 0;   return (long)Math.Ceiling(total - last / 2.0); }

A soma das contagens até o último balde balde é exata. O último balde deve cobrir pelo menos um evento não-expirado, caso contrário o balde é expirado e removido. Sua contagem deve ser estimada porque pode cobrir eventos expirados. Estimando a contagem real do balde como contagem de na metade o último balde última, o erro absoluto de estimativa de que não é maior que count na metade esse balde. A contagem total é estimada pela soma de todas as contagens de balde (total) menos contagem de na metade o último balde. Para garantir que o erro absoluto é dentro dos limites do erro relativo, influência do último balde deve ser suficientemente pequeno comparado à soma das outras contagens balde. Felizmente, isso é assegurado pelo processo de mesclagem.

As listagens de código e explicações até este ponto deixá-lo intrigado sobre o funcionamento do histograma? Leia o exemplo a seguir.

Suponha que você tenha um histograma recém-inicializado com janela de tamanho n = erro relativo e 7 limite superior ε = 0,5, então k = 2. O histograma desenvolve conforme Figura 3, e uma visão esquemática deste histograma é representada em Figura 4. Em Figura 3, mescla correm timestamps, 5, 7 e 9. Uma mesclagem em cascata é em timestamp 9. Um balde expirado é em timestamp 13. Darei mais detalhes sobre isso.

Figura 3 exemplo de histograma exponencial

Timestamp Evento

Baldes (Timestamp, contagem)

Mais novo … Mais antigo

Total Consulta Exata

Relativo

Error

1 0   0 0 0 0
2 1 (2,1) 1 1 1 0
3 1 (3,1) “ (2,1) 2 2 2 0
4 0 (3,1) “ (2,1) 2 2 2 0

5

(mesclagem)

1 (5,1) “ (3,1) “ (2,1) 3 2 3 0.333...
(5,1) “ (3,2)
6 1 (6,1) “ (5,1) “ (3,2) 4 3 4 0.25

7

(mesclagem)

1 (7,1) “ (6,1) “ (5,1) “ (3,2) 5 4 5 0.2
(7,1) “ (6,2) “ (3,2)
8 1 (8,1) “ (7,1) “ (6,2) “ (3,2) 6 5 6 0.166...

9

(mesclagem)

(Mesclar em cascata)

1 (9,1) “ (8,1) “ (7,1) “ (6,2) “ (3,2) 7 5 6 0.166...
(9,1) “ (8,2) “ (6,2) “ (3,2)
(9,1) “ (8,2) “ (6,4)
10 0 (9,1) “ (8,2) “ (6,4) 7 5 5 0
11 0 (9,1) “ (8,2) “ (6,4) 7 5 5 0
12 0 (9,1) “ (8,2) “ (6,4) 7 5 4 0.25
13 0 (9,1) “ (8,2) 3 2 3 0.333...


Figura 4 visão esquemática do histograma representado na Figura 3

O primeiro evento não tem efeito. No evento de quinto, uma mesclagem dos baldes mais antigas ocorre porque há uma caixa de texto: k⁄2 + 2 baldes com a mesma contagem de 1. Novamente, uma mesclagem acontece no sétimo evento. No nono evento, uma mesclagem de cascatas em outra mesclagem. Note-se que, após o sétimo evento, o primeiro evento expira. Nenhum balde carrega um timestamp expirado até o evento 13. No evento XIII, o balde com timestamp 6 já não cobre pelo menos um evento não-expirado e assim expira. Observe que o erro relativo observado é claramente menor que o limite superior de erro relativo.

Em Figura 4, uma caixa pontilhada é o tamanho da janela nesse ponto; Ele contém os baldes e implica a extensão dos eventos cobertos. Uma caixa sólida é um balde com timestamp no topo e na parte inferior. Situação A mostra o histograma em timestamp 7 com setas para os eventos contados. Situação B mostra o histograma em timestamp 9. O último balde cobre eventos expirados. Situação C mostra o histograma em timestamp 13. O balde com timestamp 6 expirou.

Depois de colocá-lo todos juntos, eu escrevi um programa de demonstração pequena para o histograma exponencial (Confira o download do código fonte para este artigo). Os resultados são mostrados na Figura 5. Ele simula um fluxo de eventos 100 milhões, com um tamanho de janela baseada em contagem N de 1 milhão de eventos. Cada evento tem uma carga de 0 ou 1 com 50 por cento de chance. Ele estima a contagem aproximada de 1s, com um limite superior de erro relativo arbitrariamente escolhida ε de 1 por cento, ou precisão de 99%. A economia de memória do histograma é enorme em relação ao windows; o número de recipientes é distante menos do que o número de eventos na janela. Uma taxa de evento de alguns de centenas de milhares de eventos por segundo é conseguida em uma máquina com um processador de núcleo duplo Intel de 2,4 GHz e 3 GB de RAM rodando Windows 7.


Figura 5 resultados empíricos para o histograma exponencial

Uma beleza chamada StreamInsight

Talvez você esteja se perguntando o que é Microsoft StreamInsight e onde ele se encaixa. Esta seção fornece algumas noções básicas. StreamInsight é um motor robusto, alto desempenho, baixa sobrecarga, perto-latência zero e extremamente flexível para o processamento de fluxos. Ele está atualmente na versão 2.1. A versão completa requer uma licença de SQL Server , embora uma versão de teste está disponível. Ele tem executado como um serviço independente ou incorporado no processo.

No centro de processamento de dados de streaming é um modelo com fluxos temporais dos eventos. Conceitualmente, é uma potencialmente infinita e volumosa coleção de dados que chegam ao longo do tempo. Pense preços de bolsa, telemetria de clima, alimentação, monitoramento, Web clica, tráfego de Internet, pedágios e assim por diante. Cada evento no fluxo tem um cabeçalho com uma carga de dados e metadados. No cabeçalho do evento, um timestamp é mantido, no mínimo. Eventos podem chegar constante, intermitente ou talvez em rajadas de até vários milhares por segundo. Eventos vêm em três sabores: Um evento pode ser confinado a um ponto no tempo; ser válidos para um determinado intervalo; ou seja válido um intervalo aberto (borda). Além de eventos do fluxo, um evento de Pontuação especial é emitido pelo mecanismo chamado o incremento de tempo comum (CTI). Eventos não podem ser inseridos no fluxo com um carimbo de hora menos do que o carimbo de hora do CTI. Efetivamente, eventos CTI determinam a extensão a que os eventos podem chegar fora de ordem. Felizmente, StreamInsight cuida disso.

Fontes heterogêneas de entrada e dissipadores de fluxos de saída de alguma forma devem ser adaptados para se encaixam nesse modelo. Os eventos nos córregos temporais (queryable) são capturados em um IQStreamable <TPayload>. Cargas de evento são conceitualmente puxadas por enumeração ou empurradas pela observação no fluxo. Daí, a base de dados pode ser exposto através um IEnumerable <T> / IQueryable <T> (Extensão reativa) ou IObservable <T> / IQbservable <T> (Extensão reativa), respectivamente, parametrizado com o tipo de dados expostos. Eles deixam a manutenção dos aspectos temporais para o mecanismo de processamento. Conversão de e para várias interfaces é possível.

As fontes e os sumidouros que acabamos de discutir ao vivo nos limites, Considerando que o processamento real acontece dentro de consultas. Uma consulta é uma unidade básica de composição escrita em LINQ. Ele processa os eventos de um ou mais fluxos e continuamente produz outro fluxo. As consultas são usados para o projeto, filtrar, aplique de grupo, multicast, operar/agregado, junção, União e janela de eventos. Os operadores podem ser definidos pelo usuário. Eles trabalham em eventos (incrementais) ou windows (não incrementais) assim que chegarem.

Uma nota sobre janelas está em ordem. Janelas particiona um fluxo em subconjuntos finitos de eventos que podem se sobrepor entre janelas consecutivas. Janelas efetivamente produz um fluxo de janelas, refletido por um IQWindowedStreamable <TPayload> em StreamInsight. Atualmente, três diferentes tipos de construções de janelas são suportados: windows baseados em contagem, baseados em tempo e de instantâneos. Windows baseados em contagem cobrem os eventos mais recentes de N e slide após a chegada de um novo evento, que termina a mais antiga e inserir o novo. Capa windows baseados em tempo, os eventos mais recentes no mais recente inter­val de tempo e slide por algum intervalo (também chamado de salto ou caindo). Janelas de instantâneo são conduzidas pelo início do evento e término; que é, para cada par de evento mais próximo início e fim dos tempos, uma janela é criada. Em contraste com o windows baseado em tempo, impulsionados por intervalos na linha de tempo, independentemente de eventos, janelas de instantâneo não estão fixos na linha de tempo.

Que apenas arranhões na superfície. Mais informações estão disponíveis de várias fontes, incluindo o on-line guia do desenvolvedor (bit.ly/T7Trrx), "Guia do Mochileiro um StreamInsight 2.1 consultas" (bit.ly/NbybvY), exemplos do CodePlex, o blog da equipe do StreamInsight (blogs.msdn.com/b/streaminsight) e outros.

Unindo todos os elementos

São os alicerces. Neste ponto, você está provavelmente querendo saber como aproximado contando é trazido à vida em StreamInsight. Em suma, algum fluxo de origem (temporal) de eventos point-in-time, com uma carga de 0 ou 1, é necessário. Ele é alimentado em uma consulta que calcula a contagem aproximada de 1s sobre os eventos mais recentes de N usando o histograma exponencial. A consulta produz alguns (temporal) fluxo de eventos point-in-time — levando a contagem aproximada — que é alimentado em uma pia.

Vamos começar com um operador definido pelo usuário para a contagem aproximada. Você pode ser tentado para capturar os eventos mais recentes de N usando a construção de janelas baseado em contagem. Pense novamente! Que iria desafiar os benefícios de economia de memória do histograma exponencial. Why? A construção de força todo windows de eventos a ser mantido na memória. Não é necessário pelo histograma exponencial, porque tem uma noção implícita equivalente de janelas através da manutenção de caçambas. Além disso, ter um operador sobre as janelas é não incremental, ou seja, com nenhum processamento de eventos assim que chegarem, mas apenas quando uma janela (próxima) está disponível. A solução é um operador de fluxo definido pelo usuário, sem janelas explícita constrói na consulta. O código está listado em Figura 6.

Figura 6 implementação do operador de fluxo definido pelo usuário

 

[DataContract] public class ApproximateCountUDSO : CepPointStreamOperator<bool, long> {   [DataMember]   private ExponentialHistogram histogram;   [DataMember]   private long currentTimestamp;  // Current (discrete) timestamp   public ApproximateCountUDSO(long n, double epsilon)   {     histogram = new ExponentialHistogram(n, epsilon);   }   public override IEnumerable<long> ProcessEvent(     PointEvent<bool> inputEvent)   {     currentTimestamp++;     histogram.Update(currentTimestamp, inputEvent.Payload);     yield return histogram.Query();   }   public override bool IsEmpty   {     get { return false; }   } }

O operador deriva a abstrata CepPointStreamOperator < TInputPayload, TOutputPayload >. Ele tem uma variável de instância de histograma exponencial. Observe a decoração com atributos DataContract e DataMember. Isso informa StreamInsight como serializar o operador — por exemplo, para fins de resiliência. O operador substitui o operador IsEmpty para indicar não está vazio, ou seja, o operador é stateful. Isso impede que StreamInsight mexer com o operador quando minimizando a utilização de memória. O método ProcessEvent é o núcleo do operador. Ele incrementa o timestamp atual (discreta) e passa junto com a carga do evento para o método de atualização do histograma. O histograma manipula a segmentação e é consultado para a contagem aproximada. Certifique-se de usar a sintaxe de rendimento-Retorno, o que faz com que o operador enumerable. Os operadores são geralmente envolto em algum método de extensão escondido em uma classe de utilitário. Este código mostra como é feito:

public static partial class Utility {   public static IQStreamable<long> ApproximateCount(     this IQStreamable<bool> source, long n, double epsilon)   {     return source.Scan(() => new ApproximateCountUDSO(n, epsilon));   } }

E pronto! Conecte o operador em uma consulta através do método de extensão. Um pouco de código extra é necessário para realmente demonstrar seu uso. Aqui é um fluxo de origem trivial:

public static partial class Utility {   private static Random random = new Random((int)DateTime.Now.Ticks);   public static IEnumerable<bool> EnumeratePayloads()   {     while (true)  // ad infinitum     {       bool payload = random.NextDouble() >= 0.5;       yield return payload;     }   } }

Isso gera cargas aleatórias de 0s e 1s. A sintaxe de yield return transforma-lo em uma fonte de enumerável. Colocá-lo em uma classe de utilitário, se quiserem.

O infame programa classe é mostrada na Figura 7. Ele cria a instância do servidor StreamInsight incorporada no processo. Uma instância de aplicativo chamado chamada ApproximateCountDemo é criada como um streaming de processamento (metadados) contêiner, por exemplo, para fluxos de nomeada, consultas e assim por diante. Uma fonte enumerável de eventos point-in-time é definida, usando o método utilitário de geração de carga descrito anteriormente. Ele é transformado em um fluxo temporal de eventos point-in-time. A consulta é definida com o LINQ e seleciona as contagens aproximada de operador, calculadas sobre o fluxo de origem. Isto é onde o método de extensão para o operador definido pelo usuário vem a calhar. É bootstrapped com um tamanho de janela e o limite superior de erro relativo. Em seguida, a saída da consulta é transformada em um coletor de enumerável, descascar as propriedades temporais. Finalmente, o coletor iterado, puxando assim ativamente os eventos através do encanamento. Execute o programa e desfrutar de sua saída do processamento de números na tela.

Figura 7 incorporação e execução de StreamInsight

class Program {   public const long N = 10000;   public const double Epsilon = 0.05;   static void Main(string[] args)   {     using (Server server = Server.Create("StreamInsight21"))     {       var app = server.CreateApplication("ApproximateCountDemo");       // Define an enumerable source       var source = app.DefineEnumerable(() =>         Utility.EnumeratePayloads());       // Wrap the source in a (temporal) point-in-time event stream       // The time settings determine when CTI events       // are generated by StreamInsight       var sourceStream = source.ToPointStreamable(e =>         PointEvent.CreateInsert(DateTime.Now, e),         AdvanceTimeSettings.IncreasingStartTime);       // Produces a stream of approximate counts       // over the latest N events with relative error bound Epsilon       var query =         from e in sourceStream.ApproximateCount(N, Epsilon) select e;       // Unwrap the query's (temporal) point-in-time       // stream to an enumerable sink       var sink = query.ToEnumerable<long>();       foreach (long estimatedCount in sink)       {         Console.WriteLine(string.Format(           "Enumerated Approximate count: {0}", estimatedCount));       }     }   } }

Recapitulando brevemente, este artigo explica como aproximar a contagem em uma janela de eventos no tempo logarítmico e espaço com erro limitado superior usando uma estrutura de dados do histograma exponencial. O histograma é incorporado em um operador definido pelo usuário do StreamInsight.

O histograma e o operador podem ser estendidos para oferecer suporte a windows de tamanho variável, como o windows baseados em tempo. Isso requer o histograma para saber o intervalo/timespan da janela, em vez do tamanho da janela. Baldes são expirados quando seu timestamp é antes o timestamp do evento novo menos o timespan de janela. Uma extensão para calcular a variância é proposta no livro, "Manutenção de variância e k–Medians sobre o Windows de fluxo de dados," por Brian Babcock, Mayur Datar, Rajeev Motwani e Liadan O'Callaghan (stanford.io/UEUG0i). Além de histogramas, outras estruturas chamados Sinopse são descritas na literatura. Você pode considerar amostras aleatórias, pesos pesados, quantiles e assim por diante.

O código-fonte que acompanha este artigo é escrito em C# 4.0 com Visual Studio 2010 e requer StreamInsight 2.1. O código é livre para uso sob a licença pública do Microsoft (Ms-PL). Note que ele foi desenvolvido para fins educacionais e não foi otimizado nem testado para ambientes de produção.

Michael Meijer é como um engenheiro de software na CIMSOLUTIONS BV, onde ele fornece serviços de consultoria e soluções de desenvolvimento de software para empresas em todo o país. Seus interesses em StreamInsight e o fluxo de processamento de dados começaram durante a sua pesquisa na Universidade de Twente, em Enschede, Países Baixos, onde obteve um mestrado em Engenharia de sistemas de Science–Information do computador.

Agradecemos aos seguintes especialistas técnicos pela revisão deste artigo: Erik Hegeman, Roman Schindlauer e Bas Stemerdink