Przewodnik programowania SCP dla Apache Storm w usłudze Azure HDInsightSCP programming guide for Apache Storm in Azure HDInsight

SCP to platforma do kompilowania w czasie rzeczywistym, niezawodnej, spójnej i wysokiej wydajności aplikacji do przetwarzania danych.SCP is a platform to build real time, reliable, consistent, and high-performance data processing application. Jest on zbudowany na podstawie Apache Storm --a systemu przetwarzania strumienia zaprojektowanego przez społeczności OSS.It is built on top of Apache Storm -- a stream processing system designed by the OSS communities. Burza jest zaprojektowana przez Nathana marza i została otwarta przez serwis Twitter.Storm is designed by Nathan Marz and was open sourced by Twitter. Wykorzystuje Apache ZooKeeper, innym projekcie Apache, aby zapewnić wysoce niezawodne dystrybuowanie rozproszonej koordynacji i zarządzania stanami.It leverages Apache ZooKeeper, another Apache project to enable highly reliable distributed coordination and state management.

Nie tylko projekt SCP jest przeszukiwany w systemie Windows, ale również dodaliśmy do projektu rozszerzenia i dostosowania ekosystemu systemu Windows.Not only the SCP project ported Storm on Windows but also the project added extensions and customization for the Windows ecosystem. Rozszerzenia obejmują środowisko deweloperskie platformy .NET i biblioteki, dostosowanie obejmuje wdrożenie oparte na systemie Windows.The extensions include .NET developer experience, and libraries, the customization includes Windows-based deployment.

Rozszerzenie i dostosowanie są wykonywane w taki sposób, że nie musimy rozwidlenia projektów OSS i możemy wykorzystać ekosystemy pochodne utworzone w oparciu o burze.The extension and customization is done in such a way that we do not need to fork the OSS projects and we could leverage derived ecosystems built on top of Storm.

Model przetwarzaniaProcessing model

Dane w usłudze SCP są modelowane jako ciągłe strumienie krotek.The data in SCP is modeled as continuous streams of tuples. Zazwyczaj krotki przepływają do pewnej kolejki, a następnie są pobierane i przekształcane przez logikę biznesową hostowaną w topologii burzy, na koniec mogą być potoki jako krotki do innego systemu SCP lub być zatwierdzone do magazynów, takich jak rozproszonego systemu plików lub baz danych. takie jak SQL Server.Typically the tuples flow into some queue first, then picked up, and transformed by business logic hosted inside a Storm topology, finally the output could be piped as tuples to another SCP system, or be committed to stores like distributed file system or databases like SQL Server.

Diagram przedstawiający dane do przetworzenia w kolejce, w której są przechowywane źródła danych

W obszarze burza Topologia aplikacji definiuje wykres obliczeń.In Storm, an application topology defines a graph of computation. Każdy węzeł w topologii zawiera logikę przetwarzania, a linki między węzłami wskazują przepływ danych.Each node in a topology contains processing logic, and links between nodes indicate data flow. Węzły, które mają wstrzyknąć dane wejściowe do topologii, są nazywane elementy Spout, które mogą służyć do sekwencjonowania danych.The nodes to inject input data into the topology are called spouts, which can be used to sequence the data. Dane wejściowe mogą znajdować się w dziennikach plików, transakcyjnych baz danych, liczniku wydajności systemu itp. Węzły z przepływem danych wejściowych i wyjściowych są nazywane piorunami, które wykonują rzeczywiste filtrowanie danych i ich wybór i agregację.The input data could reside in file logs, transactional database, system performance counter etc. The nodes with both input and output data flows are called bolts, which do the actual data filtering and selections and aggregation.

Punkt połączenia usługi obsługuje najlepsze wysiłki, co najmniej raz i dokładnie raz na przetwarzanie danych.SCP supports best efforts, at-least-once and exactly-once data processing. W aplikacji przetwarzania strumieniowego rozproszonego mogą wystąpić różne błędy podczas przetwarzania danych, takie jak awaria sieci, awaria maszyny lub błąd kodu użytkownika itp. Co najmniej jednokrotne przetwarzanie gwarantuje, że wszystkie dane zostaną przetworzone co najmniej raz przez odtwarzanie automatycznie tych samych danych, gdy wystąpi błąd.In a distributed streaming processing application, various errors may happen during data processing, such as network outage, machine failure, or user code error etc. At-least-once processing ensures all data will be processed at least once by replaying automatically the same data when error happens. Przetwarzanie co najmniej jednokrotne jest proste i niezawodne oraz jest bardziej wydajna w wielu aplikacjach.At-least-once processing is simple and reliable and suits well many applications. Jeśli jednak aplikacja wymaga dokładnego zliczania, to co najmniej jednokrotne przetwarzanie jest niewystarczające, ponieważ te same dane mogą być potencjalnie odtwarzane w topologii aplikacji.However, when an application requires exact counting, at-least-once processing is insufficient since the same data could potentially be played in the application topology. W takim przypadku przetwarzanie dokładnie jednokrotne jest tak zaprojektowane, aby upewnić się, że wynik jest poprawny nawet wtedy, gdy dane mogą być odtwarzane i przetwarzane wiele razy.In that case, exactly-once processing is designed to make sure the result is correct even when the data may be replayed and processed multiple times.

Usługa SCP umożliwia deweloperom platformy .NET opracowywanie aplikacji do przetwarzania danych w czasie rzeczywistym, jednocześnie wykorzystując wirtualna maszyna Java (JVM) z burzą w ramach okładek.SCP enables .NET developers to develop real time data process applications while leveraging on Java Virtual Machine (JVM) with Storm under the covers. .NET i JVM komunikują się za pośrednictwem lokalnych gniazd TCP.The .NET and JVM communicate via TCP local sockets. Zasadniczo każda elementu Spout/piorun to para procesów .NET/Java, w której logika użytkownika jest uruchamiana w procesie .NET jako wtyczka.Basically each Spout/Bolt is a .NET/Java process pair, where the user logic runs in .NET process as a plugin.

Aby utworzyć aplikację do przetwarzania danych na punkcie połączenia usługi, wymagane są kilka czynności:To build a data processing application on top of SCP, several steps are needed:

  • Projektuj i Implementuj elementy Spout, aby ściągnąć dane z kolejki.Design and implement the Spouts to pull in data from queue.
  • Projektuj i Implementuj pioruny, aby przetwarzać dane wejściowe i zapisywać dane w sklepach zewnętrznych, takich jak baza danych.Design and implement Bolts to process the input data, and save data to external stores such as a Database.
  • Zaprojektowanie topologii, a następnie przesłanie i uruchomienie topologii.Design the topology, then submit and run the topology. Topologia definiuje wierzchołki i dane między wierzchołkami.The topology defines vertexes and the data flows between the vertexes. Punkt połączenia usługi będzie przyjmować specyfikację topologii i wdrożyć ją w klastrze burzowym, gdzie każdy wierzchołek jest uruchamiany w jednym węźle logicznym.SCP will take the topology specification and deploy it on a Storm cluster, where each vertex runs on one logical node. Przełączenie w tryb failover i skalowanie będzie obsługiwane przez harmonogram zadań burzy.The failover and scaling will be taken care of by the Storm task scheduler.

W tym dokumencie przedstawiono niektóre proste przykłady umożliwiające tworzenie aplikacji do przetwarzania danych za pomocą punktu połączenia usługi.This document uses some simple examples to walk through how to build data processing application with SCP.

Interfejs wtyczki SCPSCP Plugin Interface

Wtyczki usługi SCP (lub aplikacje) to autonomiczna exe, która może być uruchamiana w programie Visual Studio podczas fazy tworzenia, i być podłączona do potoku burzy po wdrożeniu w środowisku produkcyjnym.SCP plugins (or applications) are standalone EXEs that can both run inside Visual Studio during the development phase, and be plugged into the Storm pipeline after deployment in production. Pisanie wtyczki SCP jest tak samo samo jak w przypadku pisania innych standardowych aplikacji konsolowych systemu Windows.Writing the SCP plugin is just the same as writing any other standard Windows console applications. Platforma SCP.NET deklaruje jakiś interfejs dla elementu Spout/pioruna, a kod wtyczki użytkownika powinien implementować te interfejsy.SCP.NET platform declares some interface for spout/bolt, and the user plugin code should implement these interfaces. Głównym celem tego projektu jest to, że użytkownik może skupić się na własnych logikach firmy i pozostawia inne rzeczy, które mają być obsługiwane przez platformę SCP.NET.The main purpose of this design is that the user can focus on their own business logics, and leaving other things to be handled by SCP.NET platform.

Kod wtyczki użytkownika powinien implementować jeden z następujących interfejsów, zależy od tego, czy topologia jest transakcyjna, czy nietransakcyjna, oraz czy składnik jest elementu Spout czy piorunem.The user plugin code should implement one of the followings interfaces, depends on whether the topology is transactional or non-transactional, and whether the component is a spout or bolt.

  • ISCPSpoutISCPSpout
  • ISCPBoltISCPBolt
  • ISCPTxSpoutISCPTxSpout
  • ISCPBatchBoltISCPBatchBolt

ISCPPluginISCPPlugin

ISCPPlugin jest wspólnym interfejsem dla wszystkich rodzajów wtyczek.ISCPPlugin is the common interface for all kinds of plugins. Obecnie jest to fikcyjny interfejs.Currently, it is a dummy interface.

public interface ISCPPlugin 
{
}

ISCPSpoutISCPSpout

ISCPSpout jest interfejsem dla nietransakcyjnych elementu Spout.ISCPSpout is the interface for non-transactional spout.

 public interface ISCPSpout : ISCPPlugin                    
 {
     void NextTuple(Dictionary<string, Object> parms);         
     void Ack(long seqId, Dictionary<string, Object> parms);   
     void Fail(long seqId, Dictionary<string, Object> parms);  
 }

Gdy NextTuple() jest wywoływana, kod użytkownika# C może emitować jedną lub kilka krotek.When NextTuple() is called, the C# user code can emit one or more tuples. Jeśli nie ma nic do emisji, ta metoda powinna zwracać bez emitowania żadnych elementów.If there is nothing to emit, this method should return without emitting anything. Należy NextTuple()zauważyć, że wszystkie, Ack()i Fail() są wywoływane w ścisłej pętli w pojedynczym wątku w procesie języka C# .It should be noted that NextTuple(), Ack(), and Fail() are all called in a tight loop in a single thread in C# process. Jeśli nie ma spójnych krotek, courteous NextTuple uśpienie przez krótki czas (na przykład 10 milisekund), tak aby nie było to zbyt duże wykorzystanie procesora CPU.When there are no tuples to emit, it is courteous to have NextTuple sleep for a short amount of time (such as 10 milliseconds) so as not to waste too much CPU.

Ack()i Fail() są wywoływane tylko wtedy, gdy mechanizm ACK jest włączony w pliku specyfikacji.Ack() and Fail() are called only when ack mechanism is enabled in spec file. seqId Służy do identyfikowania spójnej kolekcji lub jej niepowodzenie.The seqId is used to identify the tuple that is acknowledged or failed. Dlatego jeśli ACK jest włączony w topologii nietransakcyjnej, w elementu Spout należy używać następującej funkcji emisji:So if ack is enabled in non-transactional topology, the following emit function should be used in Spout:

public abstract void Emit(string streamId, List<object> values, long seqId); 

Jeśli ACK nie jest obsługiwany w topologii nietransakcyjnej, Ack() i Fail() może pozostać jako pusta funkcja.If ack is not supported in non-transactional topology, the Ack() and Fail() can be left as empty function.

Parametr parms wejściowy w tych funkcjach jest pustym słownikiem, który jest zarezerwowany do użytku w przyszłości.The parms input parameter in these functions is an empty Dictionary, it is reserved for future use.

ISCPBoltISCPBolt

ISCPBolt jest interfejsem dla nietransakcyjnego pioruna.ISCPBolt is the interface for non-transactional bolt.

public interface ISCPBolt : ISCPPlugin 
{
void Execute(SCPTuple tuple);           
}

Gdy jest dostępna nowa krotka, Execute() funkcja jest wywoływana, aby ją przetworzyć.When new tuple is available, the Execute() function is called to process it.

ISCPTxSpoutISCPTxSpout

ISCPTxSpout jest interfejsem dla transakcyjnego elementu spoutu.ISCPTxSpout is the interface for transactional spout.

public interface ISCPTxSpout : ISCPPlugin
{
    void NextTx(out long seqId, Dictionary<string, Object> parms);  
    void Ack(long seqId, Dictionary<string, Object> parms);         
    void Fail(long seqId, Dictionary<string, Object> parms);        
}

Podobnie jak w NextTx()przypadku nietransakcyjnej części Ack()licznika,,, i Fail() są wywoływane w ścisłej pętli w pojedynczym wątku w procesie języka# C.Just like their non-transactional counter-part, NextTx(), Ack(), and Fail() are all called in a tight loop in a single thread in C# process. Gdy nie ma danych do emisji, courteous NextTx się w stan uśpienia przez krótki czas (10 milisekund), tak aby nie było to zbyt duże użycie procesora CPU.When there are no data to emit, it is courteous to have NextTx sleep for a short amount of time (10 milliseconds) so as not to waste too much CPU.

NextTx()wywołuje się, by rozpocząć nową transakcję, parametr seqId out służy do identyfikowania transakcji, która również jest używana w Ack() i Fail().NextTx() is called to start a new transaction, the out parameter seqId is used to identify the transaction, which is also used in Ack() and Fail(). W NextTx()programie użytkownik może emitować dane po stronie Java.In NextTx(), user can emit data to Java side. Dane są przechowywane w dozorcy, aby umożliwić odtwarzanie.The data is stored in ZooKeeper to support replay. Ponieważ pojemność dozorcy jest ograniczona, użytkownik powinien jedynie emitować metadane, a nie dane zbiorcze w transakcyjnych elementu Spout.Because the capacity of ZooKeeper is limited, user should only emit metadata, not bulk data in transactional spout.

Burza automatycznie powtarza transakcję, jeśli zakończy się Fail() niepowodzeniem, dlatego nie powinna być wywoływana w normalnym przypadku.Storm will replay a transaction automatically if it fails, so Fail() should not be called in normal case. Ale jeśli punkt połączenia usługi może sprawdzić metadane emitowane przez elementu Spout transakcyjny, może on Fail() wywołać, gdy metadane są nieprawidłowe.But if SCP can check the metadata emitted by transactional spout, it can call Fail() when the metadata is invalid.

Parametr parms wejściowy w tych funkcjach jest pustym słownikiem, który jest zarezerwowany do użytku w przyszłości.The parms input parameter in these functions is an empty Dictionary, it is reserved for future use.

ISCPBatchBoltISCPBatchBolt

ISCPBatchBolt jest interfejsem dla błyskawicy transakcyjnej.ISCPBatchBolt is the interface for transactional bolt.

public interface ISCPBatchBolt : ISCPPlugin           
{
    void Execute(SCPTuple tuple);
    void FinishBatch(Dictionary<string, Object> parms);  
}

Execute()jest wywoływana, gdy nowa krotka dociera do błyskawicy.Execute() is called when there is new tuple arriving at the bolt. FinishBatch()jest wywoływana, gdy transakcja zostanie zakończona.FinishBatch() is called when this transaction is ended. Parametr parms wejściowy jest zarezerwowany do użytku w przyszłości.The parms input parameter is reserved for future use.

W przypadku topologii transakcyjnej istnieje ważna koncepcja — StormTxAttempt.For transactional topology, there is an important concept – StormTxAttempt. Ma dwa pola TxId i AttemptId.It has two fields, TxId and AttemptId. TxIdsłuży do identyfikowania określonej transakcji i dla danej transakcji, jeśli transakcja nie powiedzie się i zostanie powtórzona, może wystąpić wiele prób.TxId is used to identify a specific transaction, and for a given transaction, there may be multiple attempts if the transaction fails and is replayed. SCP.NET tworzy nowy obiekt StormTxAttemptISCPBatchBolt, tak jak w przypadku środowiska Java.SCP.NET creates a new ISCPBatchBolt object to process each StormTxAttempt, just like what Storm does in Java. Celem tego projektu jest obsługa przetwarzania transakcji równoległych.The purpose of this design is to support parallel transactions processing. Użytkownik powinien pamiętać, że jeśli zostanie podjęta próba transakcji, odpowiedni obiekt ISCPBatchBolt zostaje zniszczony i wyrzucony.User should keep it in mind that if transaction attempt is finished, the corresponding ISCPBatchBolt object is destroyed and garbage collected.

Model obiektówObject Model

Program SCP.NET udostępnia również prosty zestaw obiektów kluczowych dla deweloperów, którzy mają być używani.SCP.NET also provides a simple set of key objects for developers to program with. Są one kontekstowe, stan klientówi SCPRuntime.They are Context, StateStore, and SCPRuntime. Są one omówione w części REST tej sekcji.They are discussed in the rest part of this section.

KontekstContext

Kontekst zapewnia uruchomione środowisko dla aplikacji.Context provides a running environment to the application. Każde wystąpienie ISCPPlugin (ISCPSpout/ISCPBolt/ISCPTxSpout/ISCPBatchBolt) ma odpowiednie wystąpienie kontekstu.Each ISCPPlugin instance (ISCPSpout/ISCPBolt/ISCPTxSpout/ISCPBatchBolt) has a corresponding Context instance. Funkcje udostępnione przez kontekst można podzielić na dwie części: (1) część statyczna, która jest dostępna w całym procesie C# , (2) części dynamicznej, która jest dostępna tylko dla określonego wystąpienia kontekstu.The functionality provided by Context can be divided into two parts: (1) the static part, which is available in the whole C# process, (2) the dynamic part, which is only available for the specific Context instance.

Część statycznaStatic Part

public static ILogger Logger = null;
public static SCPPluginType pluginType;                      
public static Config Config { get; set; }                    
public static TopologyContext TopologyContext { get; set; }  

Loggerjest dostępny do celów dzienników.Logger is provided for log purpose.

pluginTypesłuży do wskazania typu wtyczki procesu C# .pluginType is used to indicate the plugin type of the C# process. Jeśli proces C# jest uruchamiany w trybie testowania lokalnego (bez języka Java), typ wtyczki to. SCP_NET_LOCALIf the C# process is run in local test mode (without Java), the plugin type is SCP_NET_LOCAL.

public enum SCPPluginType 
{
    SCP_NET_LOCAL = 0,       
    SCP_NET_SPOUT = 1,       
    SCP_NET_BOLT = 2,        
    SCP_NET_TX_SPOUT = 3,   
    SCP_NET_BATCH_BOLT = 4  
}

Configzapewnia pobieranie parametrów konfiguracyjnych ze strony języka Java.Config is provided to get configuration parameters from Java side. Parametry są przesyłane po stronie Java po zainicjowaniu wtyczki języka C# .The parameters are passed from Java side when C# plugin is initialized. Parametry są podzielone na dwie części: stormConf i pluginConf. ConfigThe Config parameters are divided into two parts: stormConf and pluginConf.

public Dictionary<string, Object> stormConf { get; set; }  
public Dictionary<string, Object> pluginConf { get; set; }  

stormConfparametry są definiowane przez burzę pluginConf i są parametrami zdefiniowanymi przez punkt połączenia usługi.stormConf is parameters defined by Storm and pluginConf is the parameters defined by SCP. Przykład:For example:

public class Constants
{
    … …

    // constant string for pluginConf
    public static readonly String NONTRANSACTIONAL_ENABLE_ACK = "nontransactional.ack.enabled";  

    // constant string for stormConf
    public static readonly String STORM_ZOOKEEPER_SERVERS = "storm.zookeeper.servers";           
    public static readonly String STORM_ZOOKEEPER_PORT = "storm.zookeeper.port";                 
}

TopologyContextjest dostarczany w celu uzyskania kontekstu topologii, jest najbardziej przydatny w przypadku składników o wielu równoległości.TopologyContext is provided to get the topology context, it is most useful for components with multiple parallelism. Oto przykład:Here is an example:

//demo how to get TopologyContext info
if (Context.pluginType != SCPPluginType.SCP_NET_LOCAL)                      
{
    Context.Logger.Info("TopologyContext info:");
    TopologyContext topologyContext = Context.TopologyContext;                    
    Context.Logger.Info("taskId: {0}", topologyContext.GetThisTaskId());          
    taskIndex = topologyContext.GetThisTaskIndex();
    Context.Logger.Info("taskIndex: {0}", taskIndex);
    string componentId = topologyContext.GetThisComponentId();                    
    Context.Logger.Info("componentId: {0}", componentId);
    List<int> componentTasks = topologyContext.GetComponentTasks(componentId);  
    Context.Logger.Info("taskNum: {0}", componentTasks.Count);                    
}

Część dynamicznaDynamic Part

Poniższe interfejsy są powiązane z określonym wystąpieniem kontekstu.The following interfaces are pertinent to a certain Context instance. Wystąpienie kontekstu jest tworzone przez platformę SCP.NET i przesyłane do kodu użytkownika:The Context instance is created by SCP.NET platform and passed to the user code:

// Declare the Output and Input Stream Schemas

public void DeclareComponentSchema(ComponentStreamSchema schema);   

// Emit tuple to default stream.
public abstract void Emit(List<object> values);                   

// Emit tuple to the specific stream.
public abstract void Emit(string streamId, List<object> values);  

W przypadku nietransakcyjnego potwierdzenia elementu Spout jest dostępna następująca metoda:For non-transactional spout supporting ack, the following method is provided:

// for non-transactional Spout which supports ack
public abstract void Emit(string streamId, List<object> values, long seqId);  

Dla nietransakcyjnego pioruna, należy jawnie Ack() lub Fail() spójną kolekcję.For non-transactional bolt supporting ack, it should explicitly Ack() or Fail() the tuple it received. I podczas emitowania nowej spójnej kolekcji, należy również określić kotwice nowej krotki.And when emitting new tuple, it must also specify the anchors of the new tuple. Podano następujące metody.The following methods are provided.

public abstract void Emit(string streamId, IEnumerable<SCPTuple> anchors, List<object> values); 
public abstract void Ack(SCPTuple tuple);
public abstract void Fail(SCPTuple tuple);

Stan klientówStateStore

StateStorezapewnia usługi metadanych, generowanie sekwencji monotoniczny i niezależną koordynację.StateStore provides metadata services, monotonic sequence generation, and wait-free coordination. Rozproszone abstrakcje współbieżności na wyższym poziomie mogą być StateStorewbudowane, w tym blokady rozproszone, kolejki rozproszone, bariery i usługi transakcyjne.Higher-level distributed concurrency abstractions can be built on StateStore, including distributed locks, distributed queues, barriers, and transaction services.

Aplikacje SCP mogą używać State obiektu do utrwalania niektórych informacji w Apache ZooKeeper, szczególnie w przypadku topologii transakcyjnej.SCP applications may use the State object to persist some information in Apache ZooKeeper, especially for transactional topology. Wykonanie tej czynności, jeśli transakcyjny elementu Spout ulega awarii i ponowne uruchomienie, może pobrać niezbędne informacje z dozorcy i ponownie uruchomić potok.Doing so, if transactional spout crashes and restart, it can retrieve the necessary information from ZooKeeper and restart the pipeline.

StateStore Obiekt ma głównie następujące metody:The StateStore object mainly has these methods:

/// <summary>
/// Static method to retrieve a state store of the given path and connStr 
/// </summary>
/// <param name="storePath">StateStore Path</param>
/// <param name="connStr">StateStore Address</param>
/// <returns>Instance of StateStore</returns>
public static StateStore Get(string storePath, string connStr);

/// <summary>
/// Create a new state object in this state store instance
/// </summary>
/// <returns>State from StateStore</returns>
public State Create();

/// <summary>
/// Retrieve all states that were previously uncommitted, excluding all aborted states 
/// </summary>
/// <returns>Uncommitted States</returns>
public IEnumerable<State> GetUnCommitted();

/// <summary>
/// Get all the States in the StateStore
/// </summary>
/// <returns>All the States</returns>
public IEnumerable<State> States();

/// <summary>
/// Get state or registry object
/// </summary>
/// <param name="info">Registry Name(Registry only)</param>
/// <typeparam name="T">Type, Registry or State</typeparam>
/// <returns>Return Registry or State</returns>
public T Get<T>(string info = null);

/// <summary>
/// List all the committed states
/// </summary>
/// <returns>Registries contain the Committed State </returns> 
public IEnumerable<Registry> Committed();

/// <summary>
/// List all the Aborted State in the StateStore
/// </summary>
/// <returns>Registries contain the Aborted State</returns>
public IEnumerable<Registry> Aborted();

/// <summary>
/// Retrieve an existing state object from this state store instance 
/// </summary>
/// <returns>State from StateStore</returns>
/// <typeparam name="T">stateId, id of the State</typeparam>
public State GetState(long stateId)

State Obiekt ma głównie następujące metody:The State object mainly has these methods:

/// <summary>
/// Set the status of the state object to commit 
/// </summary>
public void Commit(bool simpleMode = true); 

/// <summary>
/// Set the status of the state object to abort 
/// </summary>
public void Abort();

/// <summary>
/// Put an attribute value under the give key 
/// </summary>
/// <param name="key">Key</param> 
/// <param name="attribute">State Attribute</param> 
public void PutAttribute<T>(string key, T attribute); 

/// <summary>
/// Get the attribute value associated with the given key 
/// </summary>
/// <param name="key">Key</param> 
/// <returns>State Attribute</returns>               
public T GetAttribute<T>(string key);                    

Commit() Dla metody, gdy simplemode ma wartość true, usuwa odpowiednie ZNode w dozorcy.For the Commit() method, when simpleMode is set to true, it deletes the corresponding ZNode in ZooKeeper. W przeciwnym razie usuwa bieżącą ZNode i dodaje nowy węzeł w ścieżce zatwierdzonej_.Otherwise, it deletes the current ZNode, and adding a new node in the COMMITTED_PATH.

SCPRuntimeSCPRuntime

SCPRuntime udostępnia dwie następujące metody:SCPRuntime provides the following two methods:

public static void Initialize();

public static void LaunchPlugin(newSCPPlugin createDelegate);  

Initialize()służy do inicjowania środowiska uruchomieniowego SCP.Initialize() is used to initialize the SCP runtime environment. W tej metodzie proces C# nawiązuje połączenie ze stroną Java i pobiera parametry konfiguracji i kontekst topologii.In this method, the C# process connects to the Java side, and gets configuration parameters and topology context.

LaunchPlugin()służy do uruchamiania pętli przetwarzania komunikatów.LaunchPlugin() is used to kick off the message processing loop. W tej pętli wtyczka C# odbiera komunikaty formularz Java (w tym krotki i sygnały kontrolne), a następnie przetwarza komunikaty, na przykład wywołanie metody interfejsu przez kod użytkownika.In this loop, the C# plugin receives messages form Java side (including tuples and control signals), and then process the messages, perhaps calling the interface method provide by the user code. Parametr wejściowy metody LaunchPlugin() jest delegatem, który może zwrócić obiekt, który implementuje interfejs ISCPSpout/IScpBolt/ISCPTxSpout/ISCPBatchBolt.The input parameter for method LaunchPlugin() is a delegate that can return an object that implement ISCPSpout/IScpBolt/ISCPTxSpout/ISCPBatchBolt interface.

public delegate ISCPPlugin newSCPPlugin(Context ctx, Dictionary\<string, Object\> parms); 

W przypadku usługi ISCPBatchBolt możemy pobrać StormTxAttempt z parmsprogramu, a następnie użyć go do oceny, czy jest to powtórzona próba.For ISCPBatchBolt, we can get StormTxAttempt from parms, and use it to judge whether it is a replayed attempt. Sprawdzenie, czy próba powtórzenia jest często wykonywana na miejscu zatwierdzeń i pokazano w HelloWorldTx przykładzie.The check for a replay attempt is often done at the commit bolt, and it is demonstrated in the HelloWorldTx example.

Ogólnie mówiąc, wtyczki punktu połączenia usługi mogą działać w dwóch trybów tutaj:Generally speaking, the SCP plugins may run in two modes here:

  1. Tryb testu lokalnego: W tym trybie wtyczki usługi SCP (kod użytkownika C# ) działają w programie Visual Studio podczas fazy tworzenia.Local Test Mode: In this mode, the SCP plugins (the C# user code) run inside Visual Studio during the development phase. LocalContextmoże być używany w tym trybie, który zapewnia metodę serializacji emitowanych krotek do plików lokalnych i odczytuje je z powrotem do pamięci.LocalContext can be used in this mode, which provides method to serialize the emitted tuples to local files, and read them back to memory.

     public interface ILocalContext
     {
         List\<SCPTuple\> RecvFromMsgQueue();
         void WriteMsgQueueToFile(string filepath, bool append = false);  
         void ReadFromFileToMsgQueue(string filepath);                    
     }
    
  2. Tryb regularny: W tym trybie wtyczki usługi SCP są uruchamiane przez proces burzy języka Java.Regular Mode: In this mode, the SCP plugins are launched by storm java process.

    Oto przykład uruchamiania wtyczki SCP:Here is an example of launching SCP plugin:

     namespace Scp.App.HelloWorld
     {
     public class Generator : ISCPSpout
     {
         … …
         public static Generator Get(Context ctx, Dictionary<string, Object> parms)
         {
         return new Generator(ctx);
         }
     }
    
     class HelloWorld
     {
         static void Main(string[] args)
         {
         /* Setting the environment variable here can change the log file name */
         System.Environment.SetEnvironmentVariable("microsoft.scp.logPrefix", "HelloWorld");
    
         SCPRuntime.Initialize();
         SCPRuntime.LaunchPlugin(new newSCPPlugin(Generator.Get));
         }
     }
     }
    

Język specyfikacji topologiiTopology Specification Language

Specyfikacja topologii punktu połączenia usługi to język specyficzny dla domeny do opisywania i konfigurowania topologii punktu połączenia.SCP Topology Specification is a domain-specific language for describing and configuring SCP topologies. Jest on oparty na Clojure DSL (https://storm.incubator.apache.org/documentation/Clojure-DSL.html) i jest rozszerzany przez punkt połączenia usługi.It is based on Storm’s Clojure DSL (https://storm.incubator.apache.org/documentation/Clojure-DSL.html) and is extended by SCP.

Specyfikacje topologii można przesłać bezpośrednio do klastra burzy do wykonania za pomocą polecenia runspec .Topology specifications can be submitted directly to storm cluster for execution via the runspec command.

SCP.NET dodał następujące funkcje, aby zdefiniować topologie transakcyjne:SCP.NET has added the following functions to define Transactional Topologies:

Nowe funkcjeNew Functions ParametryParameters OpisDescription
TX-topolopytx-topolopy Topologia — nazwatopology-name
elementu Spout — Mapaspout-map
Mapa błyskawicybolt-map
Zdefiniuj topologię transakcyjną z nazwą topologii  , mapą definicji elementy Spout i mapą definicji błyskawicyDefine a transactional topology with the topology name,  spouts definition map and the bolts definition map
SCP-TX-elementu Spoutscp-tx-spout exec-Nameexec-name
argsargs
polafields
Zdefiniuj elementu Spout transakcyjny.Define a transactional spout. Uruchamia aplikację z nazwą exec przy użyciu argumentów.It runs the application with exec-name using args.

Pola są polami danych wyjściowych dla elementu SpoutThe fields is the Output Fields for spout
SCP-TX-Batchscp-tx-batch-bolt exec-Nameexec-name
argsargs
polafields
Definiowanie transakcyjnego pioruna partii.Define a transactional Batch Bolt. Uruchamia aplikację z nazwą exec przy użyciu argumentów.It runs the application with exec-name using args.

Pola są polami wynikowymi dla elementu piorun.The Fields is the Output Fields for bolt.
scp-tx-commit-boltscp-tx-commit-bolt exec-Nameexec-name
argsargs
polafields
Zdefiniuj pioruna zatwierdzeń transakcyjnych.Define a transactional commit bolt. Uruchamia aplikację z nazwą exec przy użyciu argumentów.It runs the application with exec-name using args.

Pola są polami wynikowymi dla elementu PiorunThe fields is the Output Fields for bolt
nontx-topolopynontx-topolopy Topologia — nazwatopology-name
elementu Spout — Mapaspout-map
Mapa błyskawicybolt-map
Definiowanie topologii nietransakcyjnej przy użyciu nazwy topologii  , mapy definicji elementy Spout oraz mapy definicji piorunówDefine a nontransactional topology with the topology name,  spouts definition map and the bolts definition map
SCP — elementu Spoutscp-spout exec-Nameexec-name
argsargs
polafields
parametersparameters
Zdefiniuj nietransakcyjny elementu Spout.Define a nontransactional spout. Uruchamia aplikację z nazwą exec przy użyciu argumentów.It runs the application with exec-name using args.

Pola są polami danych wyjściowych dla elementu SpoutThe fields is the Output Fields for spout

Parametry są opcjonalne, przy użyciu których można określić niektóre parametry, takie jak "nietransakcyjny. ACK. Enabled".The parameters are optional, using it to specify some parameters such as "nontransactional.ack.enabled".
punkt połączenia usługiscp-bolt exec-Nameexec-name
argsargs
polafields
parametersparameters
Zdefiniuj piorun nietransakcyjny.Define a nontransactional Bolt. Uruchamia aplikację z nazwą exec przy użyciu argumentów.It runs the application with exec-name using args.

Pola są polami wynikowymi dla elementu PiorunThe fields is the Output Fields for bolt

Parametry są opcjonalne, przy użyciu których można określić niektóre parametry, takie jak "nietransakcyjny. ACK. Enabled".The parameters are optional, using it to specify some parameters such as "nontransactional.ack.enabled".

SCP.NET ma zdefiniowane następujące słowa kluczowe:SCP.NET has the following keywords defined:

słowa kluczoweKeywords OpisDescription
: Nazwa:name Zdefiniuj nazwę topologiiDefine the Topology Name
: topologia:topology Zdefiniuj topologię przy użyciu poprzednich funkcji i skompiluj w nich.Define the Topology using the previous functions and build in ones.
:p:p Zdefiniuj wskazówkę równoległą dla każdego elementu spoutu lub pioruna.Define the parallelism hint for each spout or bolt.
: config:config Zdefiniuj parametr Configure lub zaktualizuj istniejąceDefine configure parameter or update the existing ones
: schemat:schema Zdefiniuj schemat strumienia.Define the Schema of Stream.

I często używane parametry:And frequently used parameters:

ParametrParameter OpisDescription
"plugin.name""plugin.name" Nazwa pliku exe C# wtyczkiexe file name of the C# plugin
"wtyczka. args""plugin.args" argumenty wtyczkiplugin args
"Output. Schema""output.schema" Schemat danych wyjściowychOutput schema
"nietransakcyjny. ACK. Enabled""nontransactional.ack.enabled" Czy ACK jest włączony dla topologii nietransakcyjnejWhether ack is enabled for nontransactional topology

Polecenie runspec jest wdrażane wraz z usługą BITS, użycie jest podobne do:The runspec command is deployed together with the bits, the usage is like:

.\bin\runSpec.cmd
usage: runSpec [spec-file target-dir [resource-dir] [-cp classpath]]
ex: runSpec examples\HelloWorld\HelloWorld.spec specs examples\HelloWorld\Target

Parametr Resource-dir jest opcjonalny, należy go określić, gdy chcesz podłączyć aplikację C# , a ten katalog zawiera aplikację, zależności i konfiguracje.The resource-dir parameter is optional, you need to specify it when you want to plug a C# application, and this directory contains the application, the dependencies, and configurations.

Parametr ścieżki klas jest również opcjonalny.The classpath parameter is also optional. Służy do określania ścieżki klas Java, jeśli plik specyfikacji zawiera Java elementu Spout lub piorun.It is used to specify the Java classpath if the spec file contains Java Spout or Bolt.

Różne funkcjeMiscellaneous Features

Deklaracja schematu danych wejściowych i wyjściowychInput and Output Schema Declaration

Użytkownicy mogą emitować krotki w procesach# języka C, a platforma musi serializować krotkę do postaci Byte [], przenieść do strony Java, a burza przeniesie Tę spójną krotkę do obiektów docelowych.Users can emit tuples in C# processes, the platform needs to serialize the tuple into byte[], transfer to Java side, and Storm will transfer this tuple to the targets. W przypadku składników podrzędnych procesy języka# C będą odbierać krotki z powrotem po stronie Java i konwertowane na oryginalne typy według platformy, a wszystkie te operacje są ukrywane na platformie.Meanwhile in downstream components, C# processes will receive tuples back from java side, and convert it to the original types by platform, all these operations are hidden by the Platform.

Aby zapewnić obsługę serializacji i deserializacji, kod użytkownika musi zadeklarować schemat danych wejściowych i wyjściowych.To support the serialization and deserialization, user code needs to declare the schema of the inputs and outputs.

Schemat strumienia danych wejściowych/wyjściowych jest zdefiniowany jako słownik.The input/output stream schema is defined as a dictionary. Klucz jest obecny streamid.The key is the StreamId. Wartość jest typami kolumn.The value is the Types of the columns. Składnik może mieć zadeklarowane wiele strumieni.The component can have multi-streams declared.

public class ComponentStreamSchema
{
    public Dictionary<string, List<Type>> InputStreamSchema { get; set; }
    public Dictionary<string, List<Type>> OutputStreamSchema { get; set; }
    public ComponentStreamSchema(Dictionary<string, List<Type>> input, Dictionary<string, List<Type>> output)
    {
        InputStreamSchema = input;
        OutputStreamSchema = output;
    }
}

W obiekcie kontekstu mamy dodany następujący interfejs API:In Context object, we have the following API added:

public void DeclareComponentSchema(ComponentStreamSchema schema)

Deweloperzy muszą upewnić się, że krotki emitowane zgodnie ze schematem zdefiniowanym dla tego strumienia, w przeciwnym razie system zgłosi wyjątek czasu wykonywania.Developers must ensure that the tuples emitted obey the schema defined for that stream, otherwise the system will throw a runtime exception.

Obsługa WIELOSTRUMIENIOWAMulti-Stream Support

Punkt połączenia usługi obsługuje kod użytkownika, aby emitować lub odbierać z wielu odrębnych strumieni jednocześnie.SCP supports user code to emit or receive from multiple distinct streams at the same time. Obsługa jest odzwierciedlana w obiekcie kontekstu, ponieważ metoda emisji przyjmuje opcjonalny parametr identyfikatora strumienia.The support reflects in the Context object as the Emit method takes an optional stream ID parameter.

Dodano dwie metody z obiektu kontekstu SCP.NET.Two methods in the SCP.NET Context object have been added. Są one używane do emitowania krotek lub krotek w celu określenia obecny streamid.They are used to emit Tuple or Tuples to specify StreamId. Obecny streamid jest ciągiem i musi być spójna zarówno w specyfikacji C# , jak i definicji topologii.The StreamId is a string and it needs to be consistent in both C# and the Topology Definition Spec.

    /* Emit tuple to the specific stream. */
    public abstract void Emit(string streamId, List<object> values);

    /* for non-transactional Spout only */
    public abstract void Emit(string streamId, List<object> values, long seqId);

Emitowanie do nieistniejącego strumienia powoduje wyjątki środowiska uruchomieniowego.The emitting to a non-existing stream causes runtime exceptions.

Grupowanie pólFields Grouping

Wbudowane grupowanie pól w obszarze burza nie działa prawidłowo w SCP.NET.The built-in Fields Grouping in Storm is not working properly in SCP.NET. Po stronie serwera proxy Java wszystkie typy danych pól są w rzeczywistości bajtowe [], a grupowanie pól używa kodu skrótu obiektu Byte [] do przeprowadzenia grupowania.On the Java Proxy side, all the fields data types are actually byte[], and the fields grouping uses the byte[] object hash code to perform the grouping. Kod skrótu obiektu Byte [] jest adresem tego obiektu w pamięci.The byte[] object hash code is the address of this object in memory. W związku z tym grupowanie będzie niewłaściwe dla dwubajtowych obiektów, które współużytkują tę samą zawartość, ale nie tego samego adresu.So the grouping will be wrong for two byte[] objects that share the same content but not the same address.

SCP.NET dodaje dostosowaną metodę grupowania i używa zawartości Byte [] do wykonania grupowania.SCP.NET adds a customized grouping method, and it uses the content of the byte[] to do the grouping. W pliku specyfikacji , składnia jest następująca:In SPEC file, the syntax is like:

(bolt-spec
    {
        "spout_test" (scp-field-group :non-tx [0,1])
    }
    …
)

EtapieHere,

  1. "SCP-Field-Group" oznacza "dostosowane grupowanie pól zaimplementowane przez punkt połączenia usługi"."scp-field-group" means "Customized field grouping implemented by SCP".
  2. ": TX" lub ": non-TX" oznacza, że jest to topologia transakcyjna.":tx" or ":non-tx" means if it’s transactional topology. Potrzebujemy tych informacji, ponieważ indeks początkowy jest inny w topologii TX a inne niż TX.We need this information since the starting index is different in tx vs. non-tx topologies.
  3. [0, 1] oznacza zestaw skrótów identyfikatorów pól, rozpoczynając od 0.[0,1] means a hash set of field Ids, starting from 0.

Topologia hybrydowaHybrid topology

Natywna burza jest zapisywana w języku Java.The native Storm is written in Java. Program SCP.NET został ulepszony, aby umożliwić# deweloperom języka c# pisanie kodu w języku c w celu obsługi logiki biznesowej.And SCP.NET has enhanced it to enable C# developers to write C# code to handle their business logic. Obsługuje również topologie hybrydowe, które zawierają nie tylko elementy Spout# C/pioruny, ale również Java elementu Spout/piorunów.But it also supports hybrid topologies, which contains not only C# spouts/bolts, but also Java Spout/Bolts.

Określ kod języka Java elementu Spout/piorun w pliku specyfikacjiSpecify Java Spout/Bolt in spec file

W pliku specyfikacji "SCP-elementu Spout" i "SCP-piorun" można również użyć do określenia języka Java elementy Spout i piorunów, poniżej przedstawiono przykład:In spec file, "scp-spout" and "scp-bolt" can also be used to specify Java Spouts and Bolts, here is an example:

(spout-spec 
  (microsoft.scp.example.HybridTopology.Generator.)           
  :p 1)

Oto microsoft.scp.example.HybridTopology.Generator nazwa klasy elementu Spout języka Java.Here microsoft.scp.example.HybridTopology.Generator is the name of the Java Spout class.

Określ ścieżkę klas Java w runSpec polecenieSpecify Java Classpath in runSpec Command

Jeśli chcesz przesłać topologię zawierającą elementy Spout Java lub piorunów, musisz najpierw skompilować środowisko Java elementy Spout lub pioruny i pobrać pliki jar.If you want to submit topology containing Java Spouts or Bolts, you need to first compile the Java Spouts or Bolts and get the Jar files. Następnie należy określić ścieżkę klasy Java, która zawiera pliki jar podczas przesyłania topologii.Then you should specify the java classpath that contains the Jar files when submitting topology. Oto przykład:Here is an example:

bin\runSpec.cmd examples\HybridTopology\HybridTopology.spec specs examples\HybridTopology\net\Target -cp examples\HybridTopology\java\target\*

Tutaj przedstawiono\przykłady\HybridTopology\obiektówdocelowych\ Java to folder zawierający plik JAR elementu Spout/piorun języka Java.Here examples\HybridTopology\java\target\ is the folder containing the Java Spout/Bolt Jar file.

Serializacja i deserializacja między językami Java i C#Serialization and Deserialization between Java and C#

Składnik SCP zawiera stronę Java i stronę# C.SCP component includes Java side and C# side. Aby można było korzystać z natywnych elementy spoutów języka Java, serializacji/deserializacji należy przeprowadzić między stroną Java a stroną C# , jak pokazano na poniższym wykresie.In order to interact with native Java Spouts/Bolts, Serialization/Deserialization must be carried out between Java side and C# side, as illustrated in the following graph.

Diagram składnika Java, który wysyła do składnika punktu SCP wysyłanie do składnika Java

  1. Serializacja po stronie Java i deserializacji na stronie# CSerialization in Java side and Deserialization in C# side

    Najpierw udostępniamy domyślną implementację serializacji w środowisku Java po stronie i deserializacji# .First we provide default implementation for serialization in Java side and deserialization in C# side. Metodę serializacji w stronie Java można określić w pliku specyfikacji:The serialization method in Java side can be specified in SPEC file:

    (scp-bolt
        {
            "plugin.name" "HybridTopology.exe"
            "plugin.args" ["displayer"]
            "output.schema" {}
            "customized.java.serializer" ["microsoft.scp.storm.multilang.CustomizedInteropJSONSerializer"]
        })
    

    Metoda deserializacji w stronie c# powinna być określona w kodzie użytkownika# c:The deserialization method in C# side should be specified in C# user code:

    Dictionary<string, List<Type>> inputSchema = new Dictionary<string, List<Type>>();
    inputSchema.Add("default", new List<Type>() { typeof(Person) });
    this.ctx.DeclareComponentSchema(new ComponentStreamSchema(inputSchema, null));
    this.ctx.DeclareCustomizedDeserializer(new CustomizedInteropJSONDeserializer());            
    

    Ta domyślna implementacja powinna obsługiwać większość przypadków, gdy typ danych nie jest zbyt skomplikowany.This default implementation should handle most cases provided the data type is not too complex. W niektórych przypadkach, ponieważ typ danych użytkownika jest zbyt skomplikowany lub wydajność implementacji domyślnej nie spełnia wymagań użytkownika, użytkownicy mogą dołączać własne implementacje.For certain cases, either because the user data type is too complex, or because the performance of our default implementation does not meet the user's requirement, users can plug-in their own implementation.

    Serializowany interfejs na stronie Java jest zdefiniowany jako:The serialize interface in java side is defined as:

    public interface ICustomizedInteropJavaSerializer {
        public void prepare(String[] args);
        public List<ByteBuffer> serialize(List<Object> objectList);
    }
    

    Interfejs deserializacji w stronie# C jest zdefiniowany jako:The deserialize interface in C# side is defined as:

    ICustomizedInteropCSharpDeserializer interfejsu publicznegopublic interface ICustomizedInteropCSharpDeserializer

    public interface ICustomizedInteropCSharpDeserializer
    {
        List<Object> Deserialize(List<byte[]> dataList, List<Type> targetTypes);
    }
    
  2. Serializacja po stronie# i deserializacji w języku C po stronie JavaSerialization in C# side and Deserialization in Java side

    Metoda serializacji w stronie c# powinna być określona w kodzie użytkownika# c:The serialization method in C# side should be specified in C# user code:

    this.ctx.DeclareCustomizedSerializer(new CustomizedInteropJSONSerializer()); 
    

    Metoda deserializacji po stronie Java powinna być określona w pliku specyfikacji:The Deserialization method in Java side should be specified in SPEC file:

    (SCP-elementu Spout(scp-spout

    {
      "plugin.name" "HybridTopology.exe"
      "plugin.args" ["generator"]
      "output.schema" {"default" ["person"]}
      "customized.java.deserializer" ["microsoft.scp.storm.multilang.CustomizedInteropJSONDeserializer" "microsoft.scp.example.HybridTopology.Person"]
    })
    

    W tym miejscu "Microsoft. scp. burz. CustomizedInteropJSONDeserializer" jest nazwą deserializacji, a "Microsoft. scp. example. HybridTopology. Person" jest klasą docelową, do której dane są deserializowane.Here "microsoft.scp.storm.multilang.CustomizedInteropJSONDeserializer" is the name of Deserializer, and "microsoft.scp.example.HybridTopology.Person" is the target class the data is deserialized to.

    Użytkownik może również podłączyć własną implementację serializatora C# i deserializacji Java.User can also plug in their own implementation of C# serializer and Java Deserializer. Ten kod jest interfejsem dla serializatora języka C# :This code is the interface for C# serializer:

    public interface ICustomizedInteropCSharpSerializer
    {
        List<byte[]> Serialize(List<object> dataList);
    }
    

    Ten kod jest interfejsem dla deserializacji języka Java:This code is the interface for Java Deserializer:

    public interface ICustomizedInteropJavaDeserializer {
        public void prepare(String[] targetClassNames);
        public List<Object> Deserialize(List<ByteBuffer> dataList);
    }
    

Tryb hosta SCPSCP Host Mode

W tym trybie użytkownik może skompilować swoje kody do biblioteki DLL i użyć SCPHost. exe dostarczonej przez punkt połączenia usługi do przesyłania topologii.In this mode, user can compile their codes to DLL, and use SCPHost.exe provided by SCP to submit topology. Plik specyfikacji wygląda podobnie do tego kodu:The spec file looks like this code:

(scp-spout
  {
    "plugin.name" "SCPHost.exe"
    "plugin.args" ["HelloWorld.dll" "Scp.App.HelloWorld.Generator" "Get"]
    "output.schema" {"default" ["sentence"]}
  })

W tym miejscu SCPHost.exe jest określony zgodnie z zestawem SDK punktu połączenia usługi. plugin.nameHere, plugin.name is specified as SCPHost.exe provided by SCP SDK. SCPHost. exe akceptuje trzy parametry:SCPHost.exe accepts three parameters:

  1. Pierwszym z nich jest nazwa biblioteki DLL, która jest "HelloWorld.dll" w tym przykładzie.The first one is the DLL name, which is "HelloWorld.dll" in this example.
  2. Druga z nich jest nazwą klasy, która jest "Scp.App.HelloWorld.Generator" w tym przykładzie.The second one is the Class name, which is "Scp.App.HelloWorld.Generator" in this example.
  3. Trzecią jest nazwa publicznej metody statycznej, która może być wywoływana w celu uzyskania wystąpienia ISCPPlugin.The third one is the name of a public static method, which can be invoked to get an instance of ISCPPlugin.

W trybie hosta kod użytkownika jest kompilowany jako biblioteka DLL i wywoływany przez platformę SCP.In host mode, user code is compiled as DLL, and is invoked by SCP platform. Dlatego platforma SCP może uzyskać pełną kontrolę nad całą logiką przetwarzania.So SCP platform can get full control of the whole processing logic. Dlatego zalecamy naszym klientom przesyłanie topologii w trybie hosta usługi SCP, ponieważ może to uprościć środowisko programistyczne i zapewnić większą elastyczność i lepszą zgodność z poprzednimi wersjami.So we recommend our customers to submit topology in SCP host mode since it can simplify the development experience and bring us more flexibility and better backward compatibility for later release as well.

Przykłady programowania usługi SCPSCP Programming Examples

HelloWorldHelloWorld

HelloWorld to prosty przykład przedstawiający smak SCP.NET.HelloWorld is a simple example to show a taste of SCP.NET. Używa ona topologii nietransakcyjnej, z elementu Spout o nazwie Generatori dwóch piorunów o nazwie rozdzielacza i licznika.It uses a non-transactional topology, with a spout called generator, and two bolts called splitter and counter. Generator elementu Spout losowo generuje zdania i emituje te zdania do rozdzielacza.The spout generator randomly generates sentences, and emit these sentences to splitter. Rozdzielacza pioruna dzieli zdania na słowa i emituje te wyrazy do błyskawicy .The bolt splitter splits the sentences to words and emit these words to counter bolt. Piorun "licznik" używa słownika do rejestrowania numeru wystąpienia każdego wyrazu.The bolt "counter" uses a dictionary to record the occurrence number of each word.

W tym przykładzie istnieją dwa pliki specyfikacji HelloWorld. spec i _HelloWorld EnableAck. spec .There are two spec files, HelloWorld.spec and HelloWorld_EnableAck.spec for this example. W kodzie C# , można dowiedzieć się, czy ACK jest włączony, pobierając pluginConf ze strony Java.In the C# code, it can find out whether ack is enabled by getting the pluginConf from Java side.

/* demo how to get pluginConf info */
if (Context.Config.pluginConf.ContainsKey(Constants.NONTRANSACTIONAL_ENABLE_ACK))
{
    enableAck = (bool)(Context.Config.pluginConf[Constants.NONTRANSACTIONAL_ENABLE_ACK]);
}
Context.Logger.Info("enableAck: {0}", enableAck);

W elementu Spout, jeśli ACK jest włączona, słownik jest używany do buforowania krotek, które nie zostały potwierdzone.In the spout, if ack is enabled, a dictionary is used to cache the tuples that have not been acknowledged. Jeśli operacja zakończy się niepowodzeniem (), niepomyślna krotka zostanie powtórzona:If Fail() is called, the failed tuple is replayed:

public void Fail(long seqId, Dictionary<string, Object> parms)
{
    Context.Logger.Info("Fail, seqId: {0}", seqId);
    if (cachedTuples.ContainsKey(seqId))
    {
        /* get the cached tuple */
        string sentence = cachedTuples[seqId];

        /* replay the failed tuple */
        Context.Logger.Info("Re-Emit: {0}, seqId: {1}", sentence, seqId);
        this.ctx.Emit(Constants.DEFAULT_STREAM_ID, new Values(sentence), seqId);
    }
    else
    {
        Context.Logger.Warn("Fail(), can't find cached tuple for seqId {0}!", seqId);
    }
}

HelloWorldTxHelloWorldTx

W przykładzie HelloWorldTx pokazano, jak wdrożyć topologię transakcyjną.The HelloWorldTx example demonstrates how to implement transactional topology. Ma jeden elementu Spout o nazwie generatora, obiekt wsadowy o nazwie częściowa liczbai obiekt zatwierdzający o nazwie Count-sum.It has one spout called generator, a batch bolt called partial-count, and a commit bolt called count-sum. Istnieją również trzy wstępnie utworzone pliki txt: DataSource0. txt, DataSource1. txti DataSource2. txt.There are also three pre-created txt files: DataSource0.txt, DataSource1.txt, and DataSource2.txt.

W każdej transakcji Generator elementu Spout losowo wybiera dwa pliki z wstępnie utworzonych trzech plików i emitują dwie nazwy plików do pioruna Count .In each transaction, the spout generator randomly selects two files from the pre-created three files, and emit the two file names to the partial-count bolt. Wartość częściowa "piorun-Count" Pobiera nazwę pliku z otrzymanej spójnej kolekcji, a następnie otwiera plik i zlicza liczbę słów w tym pliku, a następnie emituje numer wyrazu do błyskawicy licznika .The bolt partial-count gets the file name from the received tuple, then open the file and count the number of words in this file, and finally emit the word number to the count-sum bolt. Wartość licznika-suma jest podsumowanie łącznej liczby.The count-sum bolt summarizes the total count.

Aby osiągnąć dokładnie jednokrotne semantykę, Liczba piorunów zatwierdzeń musi ocenić, czy jest to transakcja odtwarzana.To achieve exactly once semantics, the commit bolt count-sum need to judge whether it is a replayed transaction. W tym przykładzie ma statyczną zmienną członkowską:In this example, it has a static member variable:

public static long lastCommittedTxId = -1; 

Po utworzeniu wystąpienia ISCPBatchBolt pobiera txAttempt parametry wejściowe z:When an ISCPBatchBolt instance is created, it gets the txAttempt from input parameters:

public static CountSum Get(Context ctx, Dictionary<string, Object> parms)
{
    /* for transactional topology, we can get txAttempt from the input parms */
    if (parms.ContainsKey(Constants.STORM_TX_ATTEMPT))
    {
        StormTxAttempt txAttempt = (StormTxAttempt)parms[Constants.STORM_TX_ATTEMPT];
        return new CountSum(ctx, txAttempt);
    }
    else
    {
        throw new Exception("null txAttempt");
    }
}

Gdy FinishBatch() jest wywoływana lastCommittedTxId , zostanie ona zaktualizowana, jeśli nie jest transakcją powtarzaną.When FinishBatch() is called, the lastCommittedTxId will be updated if it is not a replayed transaction.

public void FinishBatch(Dictionary<string, Object> parms)
{
    /* judge whether it is a replayed transaction? */
    bool replay = (this.txAttempt.TxId <= lastCommittedTxId);

    if (!replay)
    {
        /* If it is not replayed, update the totalCount and lastCommittedTxId value */
        totalCount = totalCount + this.count;
        lastCommittedTxId = this.txAttempt.TxId;
    }
    … …
}

HybridTopologyHybridTopology

Ta topologia zawiera elementu Spout Java i pioruna# C.This topology contains a Java Spout and a C# Bolt. Używa domyślnej serializacji i deserializacji implementacji dostarczonej przez platformę SCP.It uses the default serialization and deserialization implementation provided by SCP platform. Zobacz HybridTopology. spec in \przykłady HybridTopology folder dla szczegółowych informacji o pliku i SubmitTopology. bat , aby dowiedzieć się, jak określić ścieżkę klas Java.See the HybridTopology.spec in examples\HybridTopology folder for the spec file details, and SubmitTopology.bat for how to specify Java classpath.

SCPHostDemoSCPHostDemo

Ten przykład jest taki sam jak HelloWorld.This example is the same as HelloWorld in essence. Jedyną różnicą jest to, że kod użytkownika jest kompilowany jako biblioteka DLL, a topologia jest przesyłana przy użyciu SCPHost. exe.The only difference is that the user code is compiled as DLL and the topology is submitted by using SCPHost.exe. Aby uzyskać bardziej szczegółowy opis, zobacz sekcję "tryb hosta usługi SCP".See the section "SCP Host Mode" for more detailed explanation.

Następne krokiNext Steps

Przykłady topologii Apache Storm utworzonych za pomocą usługi SCP można znaleźć w następujących dokumentach:For examples of Apache Storm topologies created using SCP, see the following documents: