Průvodce programováním SCP pro Apache Storm ve službě Azure HDInsightSCP programming guide for Apache Storm in Azure HDInsight

SCP je platforma pro sestavování v reálném čase, spolehlivém, konzistentním a vysoce výkonném aplikaci pro zpracování dat.SCP is a platform to build real time, reliable, consistent, and high-performance data processing application. Je postavená na Apache Storm – systém zpracování datových proudů navržený komunitou OSS.It is built on top of Apache Storm -- a stream processing system designed by the OSS communities. Zaplavování je navržené pomocí Nathan Marz a otevřelo ho pomocí Twitteru.Storm is designed by Nathan Marz and was open sourced by Twitter. Využívá Apache Zookeeper, jiný projekt Apache, který umožňuje vysoce spolehlivou distribuovanou koordinaci a správu stavu.It leverages Apache ZooKeeper, another Apache project to enable highly reliable distributed coordination and state management.

V systému Windows není pouze port spojovacího bodu služby, ale také projekt přidal rozšíření a přizpůsobení ekosystému systému Windows.Not only the SCP project ported Storm on Windows but also the project added extensions and customization for the Windows ecosystem. Mezi tato rozšíření patří prostředí pro vývojáře .NET a knihovny, přizpůsobení zahrnuje nasazení založené na systému Windows.The extensions include .NET developer experience, and libraries, the customization includes Windows-based deployment.

Rozšíření a přizpůsobení se provádí takovým způsobem, že nepotřebujete rozvětvit projekty OSS a můžeme využít odvozené ekosystémy, které jsou postaveny nad sebou.The extension and customization is done in such a way that we do not need to fork the OSS projects and we could leverage derived ecosystems built on top of Storm.

Model zpracováníProcessing model

Data ve spojovacím bodu služby jsou modelována jako kontinuální streamování řazených kolekcí členů.The data in SCP is modeled as continuous streams of tuples. Obvykle se řazené kolekce členů napřed do některé fronty a následně vybírají a transformují se pomocí obchodní logiky hostované v rámci topologie s více podsítěmi, takže výstup může být rozdělený jako řazené kolekce členů do jiného systému SCP nebo je možné ho považovat za úložiště, jako je distribuovaný systém souborů nebo databáze. Podobně jako SQL Server.Typically the tuples flow into some queue first, then picked up, and transformed by business logic hosted inside a Storm topology, finally the output could be piped as tuples to another SCP system, or be committed to stores like distributed file system or databases like SQL Server.

Diagram dat s dořazením do fronty pro zpracování, který informační kanály ukládá do úložiště dat

V sestavení je topologie aplikace definována grafem výpočtu.In Storm, an application topology defines a graph of computation. Každý uzel v topologii obsahuje logiku zpracování a odkazy mezi uzly označují tok dat.Each node in a topology contains processing logic, and links between nodes indicate data flow. Uzly pro vložení vstupních dat do topologie se nazývají spoutů, které lze použít k sesekvencování dat.The nodes to inject input data into the topology are called spouts, which can be used to sequence the data. Vstupní data se můžou nacházet v protokolech souborů, transakční databázi, čítači výkonu systému atd. Uzly, které mají vstupní i výstupní datové toky, se nazývají šrouby, což znamená skutečné filtrování a výběr dat a agregace.The input data could reside in file logs, transactional database, system performance counter etc. The nodes with both input and output data flows are called bolts, which do the actual data filtering and selections and aggregation.

Spojovací bod služby podporuje nejlepší úsilí, nejméně jednou a zpracování dat právě jednou.SCP supports best efforts, at-least-once and exactly-once data processing. V aplikaci pro zpracování distribuovaného streamování může během zpracování dat dojít k různým chybám, jako je výpadek sítě, selhání počítače nebo chyba uživatelského kódu atd. Při nesprávném zpracování se všechna data zpracují aspoň jednou přehráním, když se automaticky přehrají stejná data, když dojde k chybě.In a distributed streaming processing application, various errors may happen during data processing, such as network outage, machine failure, or user code error etc. At-least-once processing ensures all data will be processed at least once by replaying automatically the same data when error happens. Nejméně jeden proces zpracování je jednoduchý a spolehlivý a dobře vyhovuje spoustě aplikací.At-least-once processing is simple and reliable and suits well many applications. Nicméně pokud aplikace vyžaduje přesné počítání, nestačí zpracování nejméně jednou, protože stejná data by mohla být v topologii aplikace potenciálně přehrána.However, when an application requires exact counting, at-least-once processing is insufficient since the same data could potentially be played in the application topology. V takovém případě je zpracování právě jednou navrženo tak, aby bylo zajištěno, že výsledek je správný i v případě, že data mohou být znovu přehrána a zpracována vícekrát.In that case, exactly-once processing is designed to make sure the result is correct even when the data may be replayed and processed multiple times.

Spojovací bod služby umožňuje vývojářům rozhraní .NET vyvíjet aplikace zpracovávající data v reálném čase při využití prostředí Java Virtual Machine (JVM) se zachováním na základě vazeb.SCP enables .NET developers to develop real time data process applications while leveraging on Java Virtual Machine (JVM) with Storm under the covers. Rozhraní .NET a JVM komunikují prostřednictvím místních soketů TCP.The .NET and JVM communicate via TCP local sockets. V podstatě každý Spout/šroub je dvojicí procesů .NET/Java, kde je logika uživatele spouštěna v procesu .NET jako modul plug-in.Basically each Spout/Bolt is a .NET/Java process pair, where the user logic runs in .NET process as a plugin.

Pokud chcete vytvořit aplikaci pro zpracování dat v rámci spojovacího bodu služby, je potřeba provést několik kroků:To build a data processing application on top of SCP, several steps are needed:

  • Navrhněte a implementujte rozhraní Spoutů, které se bude načítat do dat z fronty.Design and implement the Spouts to pull in data from queue.
  • Navrhněte a implementujte šrouby pro zpracování vstupních dat a uložte data do externích úložišť, jako je třeba databáze.Design and implement Bolts to process the input data, and save data to external stores such as a Database.
  • Navrhněte topologii a pak tuto topologii odešlete a spusťte.Design the topology, then submit and run the topology. Topologie definuje vrcholy a datové toky mezi vrcholy.The topology defines vertexes and the data flows between the vertexes. Spojovací bod služby převezme specifikaci topologie a nasadí ji do clusteru s více podmnožinami, kde každý vrchol běží na jednom logickém uzlu.SCP will take the topology specification and deploy it on a Storm cluster, where each vertex runs on one logical node. Převzetí služeb při selhání a škálování bude mít na starosti Plánovač úloh.The failover and scaling will be taken care of by the Storm task scheduler.

Tento dokument používá několik jednoduchých příkladů, které vám pomůžou při sestavování aplikace pro zpracování dat pomocí spojovacího bodu služby.This document uses some simple examples to walk through how to build data processing application with SCP.

Rozhraní Plug-in spojovacího bodu službySCP Plugin Interface

Moduly plug-in SCP (nebo aplikace) jsou samostatné exe, které je možné spustit uvnitř sady Visual Studio během vývojové fáze a připojit se k kanálu po nasazení v produkčním prostředí.SCP plugins (or applications) are standalone EXEs that can both run inside Visual Studio during the development phase, and be plugged into the Storm pipeline after deployment in production. Zápis modulu plug-in SCP je stejný jako při psaní jakýchkoli jiných standardních konzolových aplikací pro Windows.Writing the SCP plugin is just the same as writing any other standard Windows console applications. Platforma SCP.NET deklaruje některé rozhraní pro Spout/šroub a kód modulu plug-in uživatele by měl implementovat tato rozhraní.SCP.NET platform declares some interface for spout/bolt, and the user plugin code should implement these interfaces. Hlavním účelem tohoto návrhu je, že se uživatel může soustředit na své vlastní obchodní logiky a nechat jiné věci, které mají být zpracovány platformou SCP.NET.The main purpose of this design is that the user can focus on their own business logics, and leaving other things to be handled by SCP.NET platform.

Kód modulu plug-in uživatele by měl implementovat jedno z následujících rozhraní, záleží na tom, jestli je topologie transakční, nebo netransakční, a jestli je součástí Spout nebo šroub.The user plugin code should implement one of the followings interfaces, depends on whether the topology is transactional or non-transactional, and whether the component is a spout or bolt.

  • ISCPSpoutISCPSpout
  • ISCPBoltISCPBolt
  • ISCPTxSpoutISCPTxSpout
  • ISCPBatchBoltISCPBatchBolt

ISCPPluginISCPPlugin

ISCPPlugin je společné rozhraní pro všechny druhy modulů plug-in.ISCPPlugin is the common interface for all kinds of plugins. V současné době se jedná o fiktivní rozhraní.Currently, it is a dummy interface.

public interface ISCPPlugin 
{
}

ISCPSpoutISCPSpout

ISCPSpout je rozhraní pro netransakční Spout.ISCPSpout is the interface for non-transactional spout.

 public interface ISCPSpout : ISCPPlugin                    
 {
     void NextTuple(Dictionary<string, Object> parms);         
     void Ack(long seqId, Dictionary<string, Object> parms);   
     void Fail(long seqId, Dictionary<string, Object> parms);  
 }

Při NextTuple() volání metody může uživatelský kód# C vygenerovat jednu nebo více řazených kolekcí členů.When NextTuple() is called, the C# user code can emit one or more tuples. Pokud není nic k vygenerování, tato metoda by měla vracet bez nutnosti vysílat nic.If there is nothing to emit, this method should return without emitting anything. Je nutné poznamenat, NextTuple()že Ack(), a Fail() jsou volány v těsné smyčce v jednom vlákně v procesu# jazyka C.It should be noted that NextTuple(), Ack(), and Fail() are all called in a tight loop in a single thread in C# process. Pokud neexistují žádné řazené kolekce členů k vygenerování, je Courteous NextTuple režimu spánku po krátkou dobu (například 10 milisekund), aby nedocházelo k příliš velkému množství PROCESORů.When there are no tuples to emit, it is courteous to have NextTuple sleep for a short amount of time (such as 10 milliseconds) so as not to waste too much CPU.

Ack()a Fail() jsou volány pouze v případě, že je v souboru spec povolen mechanismus ACK.Ack() and Fail() are called only when ack mechanism is enabled in spec file. seqId Slouží k identifikaci řazené kolekce členů, která je potvrzena nebo se nezdařila.The seqId is used to identify the tuple that is acknowledged or failed. Takže pokud je možnost ACK povolena v jiné než transakční topologii, měla by být v Spout použita následující funkce Emit:So if ack is enabled in non-transactional topology, the following emit function should be used in Spout:

public abstract void Emit(string streamId, List<object> values, long seqId); 

Pokud se ACK nepodporuje v jiné než transakční topologii, Ack() může být a funkce a Fail() může být ponechána jako prázdná.If ack is not supported in non-transactional topology, the Ack() and Fail() can be left as empty function.

parms Vstupní parametr v těchto funkcích je prázdný slovník, který je vyhrazen pro budoucí použití.The parms input parameter in these functions is an empty Dictionary, it is reserved for future use.

ISCPBoltISCPBolt

ISCPBolt je rozhraní pro netransakčního šroubu.ISCPBolt is the interface for non-transactional bolt.

public interface ISCPBolt : ISCPPlugin 
{
void Execute(SCPTuple tuple);           
}

Je-li k dispozici nová Execute() řazená kolekce členů, je volána funkce pro její zpracování.When new tuple is available, the Execute() function is called to process it.

ISCPTxSpoutISCPTxSpout

ISCPTxSpout je rozhraní transakčního Spout.ISCPTxSpout is the interface for transactional spout.

public interface ISCPTxSpout : ISCPPlugin
{
    void NextTx(out long seqId, Dictionary<string, Object> parms);  
    void Ack(long seqId, Dictionary<string, Object> parms);         
    void Fail(long seqId, Dictionary<string, Object> parms);        
}

Stejně jako NextTx()jejich netransakční součást,, Ack() Fail() a jsou všechny volány v těsné smyčce v jednom vlákně v procesu C# .Just like their non-transactional counter-part, NextTx(), Ack(), and Fail() are all called in a tight loop in a single thread in C# process. Pokud neexistují žádná data, která by se dala vysílat NextTx , je Courteous, aby se v režimu spánku po krátkou dobu (10 milisekund), takže neodpade příliš mnoho CPU.When there are no data to emit, it is courteous to have NextTx sleep for a short amount of time (10 milliseconds) so as not to waste too much CPU.

NextTx()je volána pro spuštění nové transakce, parametr seqId out slouží k identifikaci transakce, která je také použita v Ack() a Fail().NextTx() is called to start a new transaction, the out parameter seqId is used to identify the transaction, which is also used in Ack() and Fail(). V NextTx()nástroji může uživatel vysílat data na stranu Java.In NextTx(), user can emit data to Java side. Data se ukládají do ZooKeeper pro podporu opětovného přehrání.The data is stored in ZooKeeper to support replay. Vzhledem k tomu, že kapacita ZooKeeper je omezená, by měl uživatel generovat jenom metadata, nikoli Hromadná data v transakčních Spout.Because the capacity of ZooKeeper is limited, user should only emit metadata, not bulk data in transactional spout.

Zaplavení znovu spustí transakci automaticky, pokud selže, takže Fail() by neměl být volán v normálním případě.Storm will replay a transaction automatically if it fails, so Fail() should not be called in normal case. Pokud spojovací bod služby může ale kontrolovat metadata generovaná transakčním Spout, může zavolat Fail() , když metadata nejsou platná.But if SCP can check the metadata emitted by transactional spout, it can call Fail() when the metadata is invalid.

parms Vstupní parametr v těchto funkcích je prázdný slovník, který je vyhrazen pro budoucí použití.The parms input parameter in these functions is an empty Dictionary, it is reserved for future use.

ISCPBatchBoltISCPBatchBolt

ISCPBatchBolt je rozhraní pro transakční šroub.ISCPBatchBolt is the interface for transactional bolt.

public interface ISCPBatchBolt : ISCPPlugin           
{
    void Execute(SCPTuple tuple);
    void FinishBatch(Dictionary<string, Object> parms);  
}

Execute()se volá, když se dorazí na šroub nové řazené kolekce členů.Execute() is called when there is new tuple arriving at the bolt. FinishBatch()se volá, když se tato transakce ukončí.FinishBatch() is called when this transaction is ended. parms Vstupní parametr je vyhrazen pro budoucí použití.The parms input parameter is reserved for future use.

V případě transakční topologie existuje důležitý koncept – StormTxAttempt.For transactional topology, there is an important concept – StormTxAttempt. Má dvě pole TxId a AttemptId.It has two fields, TxId and AttemptId. TxIdslouží k identifikaci konkrétní transakce a pro danou transakci, může dojít k několika pokusům o více pokusů, pokud transakce selhává a je znovu přehrána.TxId is used to identify a specific transaction, and for a given transaction, there may be multiple attempts if the transaction fails and is replayed. SCP.NET vytvoří nový objekt ISCPBatchBolt ke zpracování každého StormTxAttempt, stejně jako v jazyce Java.SCP.NET creates a new ISCPBatchBolt object to process each StormTxAttempt, just like what Storm does in Java. Účelem tohoto návrhu je podpora zpracování paralelních transakcí.The purpose of this design is to support parallel transactions processing. Uživatel by měl mít na paměti, že pokud je pokus o transakci dokončený, je odpovídající objekt ISCPBatchBolt zničený a je uvolněný z paměti.User should keep it in mind that if transaction attempt is finished, the corresponding ISCPBatchBolt object is destroyed and garbage collected.

Objektový modelObject Model

SCP.NET také poskytuje jednoduchou sadu klíčových objektů, pomocí kterých můžou vývojáři programovat.SCP.NET also provides a simple set of key objects for developers to program with. Jsou to kontextové, úložiště stavu SMPa SCPRuntime.They are Context, StateStore, and SCPRuntime. Jsou popsány v části zbytek této části.They are discussed in the rest part of this section.

KontextContext

Kontext nabízí běžící prostředí pro aplikaci.Context provides a running environment to the application. Každá instance ISCPPlugin (ISCPSpout/ISCPBolt/ISCPTxSpout/ISCPBatchBolt) má odpovídající kontextovou instanci.Each ISCPPlugin instance (ISCPSpout/ISCPBolt/ISCPTxSpout/ISCPBatchBolt) has a corresponding Context instance. Funkčnost poskytovaná kontextem může být rozdělena do dvou částí: (1) statická část, která je k dispozici v celém procesu# jazyka C, (2) dynamické součásti, která je k dispozici pouze pro konkrétní instanci kontextu.The functionality provided by Context can be divided into two parts: (1) the static part, which is available in the whole C# process, (2) the dynamic part, which is only available for the specific Context instance.

Statická částStatic Part

public static ILogger Logger = null;
public static SCPPluginType pluginType;                      
public static Config Config { get; set; }                    
public static TopologyContext TopologyContext { get; set; }  

Loggerje k dispozici pro účely protokolování.Logger is provided for log purpose.

pluginTypeslouží k označení typu modulu plug-in v procesu# jazyka C.pluginType is used to indicate the plugin type of the C# process. Pokud je proces# jazyka C spuštěn v místním testovacím režimu (bez Java), je SCP_NET_LOCALtyp modulu plug-in.If the C# process is run in local test mode (without Java), the plugin type is SCP_NET_LOCAL.

public enum SCPPluginType 
{
    SCP_NET_LOCAL = 0,       
    SCP_NET_SPOUT = 1,       
    SCP_NET_BOLT = 2,        
    SCP_NET_TX_SPOUT = 3,   
    SCP_NET_BATCH_BOLT = 4  
}

Configje k dispozici pro získání parametrů konfigurace z boku Java.Config is provided to get configuration parameters from Java side. Parametry jsou předány ze strany Java při inicializaci# modulu plug-in jazyka C.The parameters are passed from Java side when C# plugin is initialized. Parametry jsou rozděleny do dvou částí: stormConf a pluginConf. ConfigThe Config parameters are divided into two parts: stormConf and pluginConf.

public Dictionary<string, Object> stormConf { get; set; }  
public Dictionary<string, Object> pluginConf { get; set; }  

stormConfjsou parametry definované pomocí parametru, pluginConf který je definován pomocí spojovacího bodu služby.stormConf is parameters defined by Storm and pluginConf is the parameters defined by SCP. Příklad:For example:

public class Constants
{
    … …

    // constant string for pluginConf
    public static readonly String NONTRANSACTIONAL_ENABLE_ACK = "nontransactional.ack.enabled";  

    // constant string for stormConf
    public static readonly String STORM_ZOOKEEPER_SERVERS = "storm.zookeeper.servers";           
    public static readonly String STORM_ZOOKEEPER_PORT = "storm.zookeeper.port";                 
}

TopologyContextje k dispozici pro získání kontextu topologie, je nejužitečnější pro komponenty s více paralelismu.TopologyContext is provided to get the topology context, it is most useful for components with multiple parallelism. Zde naleznete příklad:Here is an example:

//demo how to get TopologyContext info
if (Context.pluginType != SCPPluginType.SCP_NET_LOCAL)                      
{
    Context.Logger.Info("TopologyContext info:");
    TopologyContext topologyContext = Context.TopologyContext;                    
    Context.Logger.Info("taskId: {0}", topologyContext.GetThisTaskId());          
    taskIndex = topologyContext.GetThisTaskIndex();
    Context.Logger.Info("taskIndex: {0}", taskIndex);
    string componentId = topologyContext.GetThisComponentId();                    
    Context.Logger.Info("componentId: {0}", componentId);
    List<int> componentTasks = topologyContext.GetComponentTasks(componentId);  
    Context.Logger.Info("taskNum: {0}", componentTasks.Count);                    
}

Dynamická částDynamic Part

Následující rozhraní jsou relevantní pro určitou instanci kontextu.The following interfaces are pertinent to a certain Context instance. Instance kontextu je vytvořena platformou SCP.NET a předána do uživatelského kódu:The Context instance is created by SCP.NET platform and passed to the user code:

// Declare the Output and Input Stream Schemas

public void DeclareComponentSchema(ComponentStreamSchema schema);   

// Emit tuple to default stream.
public abstract void Emit(List<object> values);                   

// Emit tuple to the specific stream.
public abstract void Emit(string streamId, List<object> values);  

V případě netransakčního spoutu, který podporuje potvrzení, je k dispozici Tato metoda:For non-transactional spout supporting ack, the following method is provided:

// for non-transactional Spout which supports ack
public abstract void Emit(string streamId, List<object> values, long seqId);  

U netransakčního šroubu podporujícího potvrzení by se Ack() mělo Fail() explicitně nebo řazená kolekce členů přijmout.For non-transactional bolt supporting ack, it should explicitly Ack() or Fail() the tuple it received. A při generování nové řazené kolekce členů musí také určovat kotvy nové řazené kolekce členů.And when emitting new tuple, it must also specify the anchors of the new tuple. K dispozici jsou následující metody.The following methods are provided.

public abstract void Emit(string streamId, IEnumerable<SCPTuple> anchors, List<object> values); 
public abstract void Ack(SCPTuple tuple);
public abstract void Fail(SCPTuple tuple);

Úložiště stavu SMPStateStore

StateStoreposkytuje služby metadat, generování sekvencí monotónní a koordinaci bez čekání.StateStore provides metadata services, monotonic sequence generation, and wait-free coordination. Distribuované abstrakce souběžnosti vyšší úrovně můžou být postavené na StateStore, včetně distribuovaných zámků, distribuovaných front, bariér a transakčních služeb.Higher-level distributed concurrency abstractions can be built on StateStore, including distributed locks, distributed queues, barriers, and transaction services.

Aplikace spojovacího bodu služby State mohou použít objekt k uchování některých informací v Apache Zookeeper, zejména pro transakční topologii.SCP applications may use the State object to persist some information in Apache ZooKeeper, especially for transactional topology. V případě, že dojde k selhání transakčního Spout a restartování, může načíst potřebné informace z ZooKeeper a restartovat kanál.Doing so, if transactional spout crashes and restart, it can retrieve the necessary information from ZooKeeper and restart the pipeline.

StateStore Objekt hlavně má tyto metody:The StateStore object mainly has these methods:

/// <summary>
/// Static method to retrieve a state store of the given path and connStr 
/// </summary>
/// <param name="storePath">StateStore Path</param>
/// <param name="connStr">StateStore Address</param>
/// <returns>Instance of StateStore</returns>
public static StateStore Get(string storePath, string connStr);

/// <summary>
/// Create a new state object in this state store instance
/// </summary>
/// <returns>State from StateStore</returns>
public State Create();

/// <summary>
/// Retrieve all states that were previously uncommitted, excluding all aborted states 
/// </summary>
/// <returns>Uncommitted States</returns>
public IEnumerable<State> GetUnCommitted();

/// <summary>
/// Get all the States in the StateStore
/// </summary>
/// <returns>All the States</returns>
public IEnumerable<State> States();

/// <summary>
/// Get state or registry object
/// </summary>
/// <param name="info">Registry Name(Registry only)</param>
/// <typeparam name="T">Type, Registry or State</typeparam>
/// <returns>Return Registry or State</returns>
public T Get<T>(string info = null);

/// <summary>
/// List all the committed states
/// </summary>
/// <returns>Registries contain the Committed State </returns> 
public IEnumerable<Registry> Committed();

/// <summary>
/// List all the Aborted State in the StateStore
/// </summary>
/// <returns>Registries contain the Aborted State</returns>
public IEnumerable<Registry> Aborted();

/// <summary>
/// Retrieve an existing state object from this state store instance 
/// </summary>
/// <returns>State from StateStore</returns>
/// <typeparam name="T">stateId, id of the State</typeparam>
public State GetState(long stateId)

State Objekt hlavně má tyto metody:The State object mainly has these methods:

/// <summary>
/// Set the status of the state object to commit 
/// </summary>
public void Commit(bool simpleMode = true); 

/// <summary>
/// Set the status of the state object to abort 
/// </summary>
public void Abort();

/// <summary>
/// Put an attribute value under the give key 
/// </summary>
/// <param name="key">Key</param> 
/// <param name="attribute">State Attribute</param> 
public void PutAttribute<T>(string key, T attribute); 

/// <summary>
/// Get the attribute value associated with the given key 
/// </summary>
/// <param name="key">Key</param> 
/// <returns>State Attribute</returns>               
public T GetAttribute<T>(string key);                    

Commit() Pro metodu, pokud je simpleMode nastaveno na hodnotu true, odstraní odpovídající ZNode v Zookeeper.For the Commit() method, when simpleMode is set to true, it deletes the corresponding ZNode in ZooKeeper. V opačném případě odstraní aktuální ZNode a do svěřené_cesty přidá nový uzel.Otherwise, it deletes the current ZNode, and adding a new node in the COMMITTED_PATH.

SCPRuntimeSCPRuntime

SCPRuntime poskytuje následující dvě metody:SCPRuntime provides the following two methods:

public static void Initialize();

public static void LaunchPlugin(newSCPPlugin createDelegate);  

Initialize()slouží k inicializaci běhového prostředí spojovacího bodu služby.Initialize() is used to initialize the SCP runtime environment. V této metodě se proces C# připojí k části Java a získá parametry konfigurace a kontext topologie.In this method, the C# process connects to the Java side, and gets configuration parameters and topology context.

LaunchPlugin()slouží k ukončení smyčky zpracování zpráv.LaunchPlugin() is used to kick off the message processing loop. V této smyčce modul plug-# in jazyka C přijímá zprávy ve formě na straně Java (včetně řazených kolekcí členů a řídicích signálů) a pak zpracovává zprávy, třeba volání metody rozhraní, kterou poskytuje uživatelský kód.In this loop, the C# plugin receives messages form Java side (including tuples and control signals), and then process the messages, perhaps calling the interface method provide by the user code. Vstupní parametr metody LaunchPlugin() je delegát, který může vracet objekt, který implementuje rozhraní ISCPSpout/IScpBolt/ISCPTxSpout/ISCPBatchBolt.The input parameter for method LaunchPlugin() is a delegate that can return an object that implement ISCPSpout/IScpBolt/ISCPTxSpout/ISCPBatchBolt interface.

public delegate ISCPPlugin newSCPPlugin(Context ctx, Dictionary\<string, Object\> parms); 

Pro ISCPBatchBolt můžeme získat StormTxAttempt z parmsa použít ho k posouzení, zda se jedná o povedený pokus o přehrání.For ISCPBatchBolt, we can get StormTxAttempt from parms, and use it to judge whether it is a replayed attempt. Pokus o opakované přehrání se často provádí na poli potvrzení a je znázorněno v HelloWorldTx příkladu.The check for a replay attempt is often done at the commit bolt, and it is demonstrated in the HelloWorldTx example.

Moduly plug-in SCP můžou být běžně spuštěné ve dvou režimech:Generally speaking, the SCP plugins may run in two modes here:

  1. Místní testovací režim: V tomto režimu se moduly plug-in SCP (# kód uživatele jazyka C) spouštějí v rámci sady Visual Studio během fáze vývoje.Local Test Mode: In this mode, the SCP plugins (the C# user code) run inside Visual Studio during the development phase. LocalContextdá se použít v tomto režimu, který poskytuje metodu pro serializaci vygenerovaných řazených kolekcí členů do místních souborů a jejich čtení zpátky do paměti.LocalContext can be used in this mode, which provides method to serialize the emitted tuples to local files, and read them back to memory.

     public interface ILocalContext
     {
         List\<SCPTuple\> RecvFromMsgQueue();
         void WriteMsgQueueToFile(string filepath, bool append = false);  
         void ReadFromFileToMsgQueue(string filepath);                    
     }
    
  2. Normální režim: V tomto režimu se moduly plug-in SCP spustí procesem zaplavení Java.Regular Mode: In this mode, the SCP plugins are launched by storm java process.

    Tady je příklad spuštění modulu plug-in SCP:Here is an example of launching SCP plugin:

     namespace Scp.App.HelloWorld
     {
     public class Generator : ISCPSpout
     {
         … …
         public static Generator Get(Context ctx, Dictionary<string, Object> parms)
         {
         return new Generator(ctx);
         }
     }
    
     class HelloWorld
     {
         static void Main(string[] args)
         {
         /* Setting the environment variable here can change the log file name */
         System.Environment.SetEnvironmentVariable("microsoft.scp.logPrefix", "HelloWorld");
    
         SCPRuntime.Initialize();
         SCPRuntime.LaunchPlugin(new newSCPPlugin(Generator.Get));
         }
     }
     }
    

Jazyk specifikace topologieTopology Specification Language

Specifikace topologie spojovacího bodu služby je jazyk specifický pro doménu, který popisuje a konfiguruje topologie SCP.SCP Topology Specification is a domain-specific language for describing and configuring SCP topologies. Vychází z Clojure DSL (https://storm.incubator.apache.org/documentation/Clojure-DSL.html) a je rozšířeno pomocí spojovacího bodu služby.It is based on Storm’s Clojure DSL (https://storm.incubator.apache.org/documentation/Clojure-DSL.html) and is extended by SCP.

Specifikace topologie je možné odeslat přímo do clusteru nečinnosti, aby je bylo možné spustit pomocí příkazu runspec .Topology specifications can be submitted directly to storm cluster for execution via the runspec command.

SCP.NET přidal následující funkce pro definování transakčních topologií:SCP.NET has added the following functions to define Transactional Topologies:

Nové funkceNew Functions ParametryParameters PopisDescription
TX – topolopytx-topolopy název topologietopology-name
Spout – mapaspout-map
Mapa šroubůbolt-map
Definice transakční topologie s názvem topologie,  mapou definice spoutů a mapou definice šroubyDefine a transactional topology with the topology name,  spouts definition map and the bolts definition map
scp-tx-spoutscp-tx-spout Exec – názevexec-name
argsargs
polefields
Definujte transakční Spout.Define a transactional spout. Spustí aplikaci s názvem Exec-Name pomocí argumentů.It runs the application with exec-name using args.

Pole jsou výstupní pole pro SpoutThe fields is the Output Fields for spout
SCP – TX-Batch-šroubscp-tx-batch-bolt Exec – názevexec-name
argsargs
polefields
Definujte transakčního dávkovacího šroubu.Define a transactional Batch Bolt. Spustí aplikaci s názvem Exec-Name pomocí argumentů.It runs the application with exec-name using args.

Pole jsou výstupní pole pro šroub.The Fields is the Output Fields for bolt.
scp-tx-commit-boltscp-tx-commit-bolt Exec – názevexec-name
argsargs
polefields
Definujte hodnotu transakčního potvrzení.Define a transactional commit bolt. Spustí aplikaci s názvem Exec-Name pomocí argumentů.It runs the application with exec-name using args.

Pole jsou výstupní pole pro šroubThe fields is the Output Fields for bolt
nontx-topolopynontx-topolopy název topologietopology-name
Spout – mapaspout-map
Mapa šroubůbolt-map
Definice netransakční topologie s názvem topologie,  mapou definice spoutů a mapou definice šroubyDefine a nontransactional topology with the topology name,  spouts definition map and the bolts definition map
scp-spoutscp-spout Exec – názevexec-name
argsargs
polefields
parametersparameters
Definujte netransakční Spout.Define a nontransactional spout. Spustí aplikaci s názvem Exec-Name pomocí argumentů.It runs the application with exec-name using args.

Pole jsou výstupní pole pro SpoutThe fields is the Output Fields for spout

Parametry jsou volitelné a používají je k určení některých parametrů, například "netransakční transakce. ACK. Enabled".The parameters are optional, using it to specify some parameters such as "nontransactional.ack.enabled".
scp-boltscp-bolt Exec – názevexec-name
argsargs
polefields
parametersparameters
Definování netransakčního šroubuDefine a nontransactional Bolt. Spustí aplikaci s názvem Exec-Name pomocí argumentů.It runs the application with exec-name using args.

Pole jsou výstupní pole pro šroubThe fields is the Output Fields for bolt

Parametry jsou volitelné a používají je k určení některých parametrů, například "netransakční transakce. ACK. Enabled".The parameters are optional, using it to specify some parameters such as "nontransactional.ack.enabled".

SCP.NET má definovaná následující klíčová slova:SCP.NET has the following keywords defined:

klíčová slovaKeywords PopisDescription
: název:name Zadejte název topologie.Define the Topology Name
: topologie:topology Definování topologie pomocí předchozích funkcí a sestavení v nich.Define the Topology using the previous functions and build in ones.
:p:p Definujte pomocný parametr paralelismus pro každý Spout nebo šroub.Define the parallelism hint for each spout or bolt.
: config:config Definovat parametr konfigurace nebo aktualizovat existujícíDefine configure parameter or update the existing ones
:schema:schema Definujte schéma streamu.Define the Schema of Stream.

A často používané parametry:And frequently used parameters:

ParametrParameter PopisDescription
"plugin.name""plugin.name" název souboru exe C# modulu plug-inexe file name of the C# plugin
"plugin.args""plugin.args" argumenty modulu plug-inplugin args
"output.schema""output.schema" Výstupní schémaOutput schema
"netransakční. ACK. Enabled""nontransactional.ack.enabled" Zda je povoleno potvrzení pro netransakční topologiiWhether ack is enabled for nontransactional topology

Příkaz runspec je nasazený společně s bity, použití je například:The runspec command is deployed together with the bits, the usage is like:

.\bin\runSpec.cmd
usage: runSpec [spec-file target-dir [resource-dir] [-cp classpath]]
ex: runSpec examples\HelloWorld\HelloWorld.spec specs examples\HelloWorld\Target

Parametr Resource-dir je nepovinný, je nutné ho zadat, pokud chcete připojit aplikaci jazyka C# a tento adresář obsahuje aplikaci, závislosti a konfigurace.The resource-dir parameter is optional, you need to specify it when you want to plug a C# application, and this directory contains the application, the dependencies, and configurations.

Parametr classpath je také volitelný.The classpath parameter is also optional. Používá se k určení cesty třídy Java, pokud soubor specifikace obsahuje Java Spout nebo šroub.It is used to specify the Java classpath if the spec file contains Java Spout or Bolt.

Různé funkceMiscellaneous Features

Vstupní a výstupní deklarace schématuInput and Output Schema Declaration

Uživatelé mohou nasílat řazené# kolekce členů v procesech jazyka C, platforma musí serializovat řazenou kolekci členů na Byte [], přenést na stranu Java a tato řazená kolekce členů převede do cílů.Users can emit tuples in C# processes, the platform needs to serialize the tuple into byte[], transfer to Java side, and Storm will transfer this tuple to the targets. # Procesy v rámci navazujících součástí dostanou řazené kolekce členů zpátky ze strany Java a převádějí je na původní typy podle platformy. všechny tyto operace jsou pro platformu skryté.Meanwhile in downstream components, C# processes will receive tuples back from java side, and convert it to the original types by platform, all these operations are hidden by the Platform.

Aby bylo možné podporovat serializaci a deserializaci, musí kód uživatele deklarovat schéma vstupů a výstupů.To support the serialization and deserialization, user code needs to declare the schema of the inputs and outputs.

Schéma vstupního/výstupního datového proudu je definováno jako slovník.The input/output stream schema is defined as a dictionary. Klíčem je StreamId.The key is the StreamId. Hodnota je typy sloupců.The value is the Types of the columns. Komponenta může mít deklarované více datových proudů.The component can have multi-streams declared.

public class ComponentStreamSchema
{
    public Dictionary<string, List<Type>> InputStreamSchema { get; set; }
    public Dictionary<string, List<Type>> OutputStreamSchema { get; set; }
    public ComponentStreamSchema(Dictionary<string, List<Type>> input, Dictionary<string, List<Type>> output)
    {
        InputStreamSchema = input;
        OutputStreamSchema = output;
    }
}

V objektu Context máme přidané toto rozhraní API:In Context object, we have the following API added:

public void DeclareComponentSchema(ComponentStreamSchema schema)

Vývojáři musí zajistit, aby se řazené kolekce členů řídily podle schématu definovaného pro tento datový proud, jinak systém vyvolá výjimku za běhu.Developers must ensure that the tuples emitted obey the schema defined for that stream, otherwise the system will throw a runtime exception.

Podpora více proudůMulti-Stream Support

Spojovací bod služby podporuje uživatelský kód pro vygenerování nebo příjem z více různých datových proudů současně.SCP supports user code to emit or receive from multiple distinct streams at the same time. Podpora odráží kontextový objekt, protože metoda Emit přebírá volitelný parametr ID streamu.The support reflects in the Context object as the Emit method takes an optional stream ID parameter.

Byly přidány dvě metody v objektu kontextu SCP.NET.Two methods in the SCP.NET Context object have been added. Slouží k vygenerování řazené kolekce členů nebo řazené kolekce členů k určení StreamId.They are used to emit Tuple or Tuples to specify StreamId. StreamId je řetězec, který musí být konzistentní jak v jazyce C# , tak ve specifikaci definice topologie.The StreamId is a string and it needs to be consistent in both C# and the Topology Definition Spec.

    /* Emit tuple to the specific stream. */
    public abstract void Emit(string streamId, List<object> values);

    /* for non-transactional Spout only */
    public abstract void Emit(string streamId, List<object> values, long seqId);

Vygenerování do neexistujícího datového proudu způsobí výjimky za běhu.The emitting to a non-existing stream causes runtime exceptions.

Seskupení políFields Grouping

Vestavěná pole seskupování v rámci přetvoření nefungují správně v SCP.NET.The built-in Fields Grouping in Storm is not working properly in SCP.NET. Na straně proxy Java jsou všechna pole datových typů skutečně Byte [] a seskupení polí používá k provedení seskupení kód hodnoty hash objektu Byte [].On the Java Proxy side, all the fields data types are actually byte[], and the fields grouping uses the byte[] object hash code to perform the grouping. Bajtový kód hodnoty hash objektu je adresa tohoto objektu v paměti.The byte[] object hash code is the address of this object in memory. Takže seskupení bude chybné pro dva bajty objektů, které sdílejí stejný obsah, ale ne stejnou adresu.So the grouping will be wrong for two byte[] objects that share the same content but not the same address.

SCP.NET přidá přizpůsobenou metodu seskupení a pomocí obsahu Byte [] provede seskupení.SCP.NET adds a customized grouping method, and it uses the content of the byte[] to do the grouping. V souboru spec je syntaxe stejná jako:In SPEC file, the syntax is like:

(bolt-spec
    {
        "spout_test" (scp-field-group :non-tx [0,1])
    }
    …
)

TadyHere,

  1. "SCP-Field-Group" znamená "přizpůsobené seskupení polí implementované spojovacím bod služby"."scp-field-group" means "Customized field grouping implemented by SCP".
  2. ": TX" nebo ": non-TX" znamená, že je transakční topologie.":tx" or ":non-tx" means if it’s transactional topology. Tyto informace potřebujeme, protože počáteční index se liší v topologiích TX vs. non-TX.We need this information since the starting index is different in tx vs. non-tx topologies.
  3. [0, 1] znamená sadu hodnot hash ID polí od 0.[0,1] means a hash set of field Ids, starting from 0.

Hybridní topologieHybrid topology

Nativní přepisování je napsané v jazyce Java.The native Storm is written in Java. A SCP.NET je rozšířila, aby umožnila vývojářům v# jazyce c# psát kód jazyka c pro zpracování obchodní logiky.And SCP.NET has enhanced it to enable C# developers to write C# code to handle their business logic. Podporuje ale také hybridní topologie, které neobsahují jenom C# spoutů/šrouby, ale také Java Spout/šrouby.But it also supports hybrid topologies, which contains not only C# spouts/bolts, but also Java Spout/Bolts.

Zadat Java Spout/šroub v souboru specSpecify Java Spout/Bolt in spec file

V souboru spec se dá také použít spojovací bod služby (SCP-Spout) a spojovací bod služby (SCP-šroub) k určení Java Spoutů a šrouby, tady je příklad:In spec file, "scp-spout" and "scp-bolt" can also be used to specify Java Spouts and Bolts, here is an example:

(spout-spec 
  (microsoft.scp.example.HybridTopology.Generator.)           
  :p 1)

Tady microsoft.scp.example.HybridTopology.Generator je název třídy Java Spout.Here microsoft.scp.example.HybridTopology.Generator is the name of the Java Spout class.

Určení cesty tříd Java v příkazu runSpecSpecify Java Classpath in runSpec Command

Pokud chcete odeslat topologii obsahující Java Spoutů nebo šrouby, musíte nejdřív zkompilovat Java Spoutů nebo šrouby a získat soubory jar.If you want to submit topology containing Java Spouts or Bolts, you need to first compile the Java Spouts or Bolts and get the Jar files. Pak byste měli zadat cestu třídy Java, která obsahuje soubory jar při odesílání topologie.Then you should specify the java classpath that contains the Jar files when submitting topology. Zde naleznete příklad:Here is an example:

bin\runSpec.cmd examples\HybridTopology\HybridTopology.spec specs examples\HybridTopology\net\Target -cp examples\HybridTopology\java\target\*

Tady jsou\příklady\HybridTopologyJava\target\ je složka obsahující soubor JAR Spout/šroubu Java.Here examples\HybridTopology\java\target\ is the folder containing the Java Spout/Bolt Jar file.

Serializace a deserializace mezi Java a C#Serialization and Deserialization between Java and C#

Komponenta SCP zahrnuje stranu Java a stranu# C.SCP component includes Java side and C# side. Aby bylo možné pracovat s nativním Java spoutů/šrouby, je třeba provést serializaci nebo deserializaci mezi stranou Java a# stranou C, jak je znázorněno v následujícím grafu.In order to interact with native Java Spouts/Bolts, Serialization/Deserialization must be carried out between Java side and C# side, as illustrated in the following graph.

Diagram součástí, které se odesílají do komponenty SCP odeslání do komponenty jazyka Java

  1. Serializace na straně jazyka Java a deserializace# v oblasti CSerialization in Java side and Deserialization in C# side

    Nejprve poskytujeme výchozí implementaci serializace na straně Java a deserializaci v oblasti# C.First we provide default implementation for serialization in Java side and deserialization in C# side. Metodu serializace na straně Java lze zadat v souboru specifikace:The serialization method in Java side can be specified in SPEC file:

    (scp-bolt
        {
            "plugin.name" "HybridTopology.exe"
            "plugin.args" ["displayer"]
            "output.schema" {}
            "customized.java.serializer" ["microsoft.scp.storm.multilang.CustomizedInteropJSONSerializer"]
        })
    

    Metoda deserializace na straně c# by měla být specifikována v# uživatelském kódu jazyka c:The deserialization method in C# side should be specified in C# user code:

    Dictionary<string, List<Type>> inputSchema = new Dictionary<string, List<Type>>();
    inputSchema.Add("default", new List<Type>() { typeof(Person) });
    this.ctx.DeclareComponentSchema(new ComponentStreamSchema(inputSchema, null));
    this.ctx.DeclareCustomizedDeserializer(new CustomizedInteropJSONDeserializer());            
    

    Tato výchozí implementace by měla zpracovat většinu případů za předpokladu, že datový typ není příliš složitý.This default implementation should handle most cases provided the data type is not too complex. V některých případech buď vzhledem k tomu, že datový typ uživatele je příliš složitý, nebo protože výkon naší výchozí implementace nesplňuje požadavky uživatele, uživatelé můžou vlastní implementaci připojit.For certain cases, either because the user data type is too complex, or because the performance of our default implementation does not meet the user's requirement, users can plug-in their own implementation.

    Rozhraní serializace na straně Java je definováno jako:The serialize interface in java side is defined as:

    public interface ICustomizedInteropJavaSerializer {
        public void prepare(String[] args);
        public List<ByteBuffer> serialize(List<Object> objectList);
    }
    

    Deserializovat rozhraní na straně C# je definováno jako:The deserialize interface in C# side is defined as:

    veřejné rozhraní ICustomizedInteropCSharpDeserializerpublic interface ICustomizedInteropCSharpDeserializer

    public interface ICustomizedInteropCSharpDeserializer
    {
        List<Object> Deserialize(List<byte[]> dataList, List<Type> targetTypes);
    }
    
  2. Serializace na# straně C a deserializace v jazyce JavaSerialization in C# side and Deserialization in Java side

    Metoda serializace na straně# c by měla být specifikována# v uživatelském kódu jazyka c:The serialization method in C# side should be specified in C# user code:

    this.ctx.DeclareCustomizedSerializer(new CustomizedInteropJSONSerializer()); 
    

    Metoda deserializace na straně Java by měla být specifikována v souboru SPEC:The Deserialization method in Java side should be specified in SPEC file:

    (SCP – Spout(scp-spout

    {
      "plugin.name" "HybridTopology.exe"
      "plugin.args" ["generator"]
      "output.schema" {"default" ["person"]}
      "customized.java.deserializer" ["microsoft.scp.storm.multilang.CustomizedInteropJSONDeserializer" "microsoft.scp.example.HybridTopology.Person"]
    })
    

    Tady je "Microsoft. SCP. CustomizedInteropJSONDeserializer. getlang." je název deserializátoru a "Microsoft. SCP. example. HybridTopology. Person" je cílová třída, na kterou se data deserializovat.Here "microsoft.scp.storm.multilang.CustomizedInteropJSONDeserializer" is the name of Deserializer, and "microsoft.scp.example.HybridTopology.Person" is the target class the data is deserialized to.

    Uživatel může také připojit svou vlastní implementaci serializátoru jazyka# C a deserializaci Java.User can also plug in their own implementation of C# serializer and Java Deserializer. Tento kód je rozhraní serializátoru jazyka# C:This code is the interface for C# serializer:

    public interface ICustomizedInteropCSharpSerializer
    {
        List<byte[]> Serialize(List<object> dataList);
    }
    

    Tento kód je rozhraní pro deserializaci jazyka Java:This code is the interface for Java Deserializer:

    public interface ICustomizedInteropJavaDeserializer {
        public void prepare(String[] targetClassNames);
        public List<Object> Deserialize(List<ByteBuffer> dataList);
    }
    

SCP Host ModeSCP Host Mode

V tomto režimu může uživatel zkompilovat své kódy do knihovny DLL a pomocí SCPHost. exe, který poskytuje spojovací bod služby (SCP) k odeslání topologie.In this mode, user can compile their codes to DLL, and use SCPHost.exe provided by SCP to submit topology. Soubor specifikace vypadá jako tento kód:The spec file looks like this code:

(scp-spout
  {
    "plugin.name" "SCPHost.exe"
    "plugin.args" ["HelloWorld.dll" "Scp.App.HelloWorld.Generator" "Get"]
    "output.schema" {"default" ["sentence"]}
  })

Tady je plugin.name zadaný, jak SCPHost.exe poskytuje sada SCP SDK.Here, plugin.name is specified as SCPHost.exe provided by SCP SDK. SCPHost. exe přijímá tři parametry:SCPHost.exe accepts three parameters:

  1. První z nich je název knihovny DLL, který je "HelloWorld.dll" v tomto příkladu.The first one is the DLL name, which is "HelloWorld.dll" in this example.
  2. Druhá je název třídy, který je "Scp.App.HelloWorld.Generator" v tomto příkladu.The second one is the Class name, which is "Scp.App.HelloWorld.Generator" in this example.
  3. Třetí z nich je název veřejné statické metody, kterou lze vyvolat pro získání instance ISCPPlugin.The third one is the name of a public static method, which can be invoked to get an instance of ISCPPlugin.

V režimu hostitele je kód uživatele kompilován jako knihovna DLL a je vyvoláno platformou SCP.In host mode, user code is compiled as DLL, and is invoked by SCP platform. Proto platforma SCP může získat úplnou kontrolu nad celou logikou zpracování.So SCP platform can get full control of the whole processing logic. Proto doporučujeme, aby naši zákazníci mohli odeslat topologii v hostitelském režimu spojovacího bodu služby, protože můžou zjednodušit vývojové prostředí a zvýšit flexibilitu a lepší zpětnou kompatibilitu pro pozdější vydání.So we recommend our customers to submit topology in SCP host mode since it can simplify the development experience and bring us more flexibility and better backward compatibility for later release as well.

Příklady programování SCPSCP Programming Examples

HellHelloWorld

HelloWorld je jednoduchý příklad pro zobrazení vkusu SCP.NET.HelloWorld is a simple example to show a taste of SCP.NET. Používá netransakční topologii s Spout nazvaný generátora má dvě šrouby označované jako rozdělovač a čítač.It uses a non-transactional topology, with a spout called generator, and two bolts called splitter and counter. Generátor Spout náhodně generuje věty a vygeneruje tyto věty do rozdělovače.The spout generator randomly generates sentences, and emit these sentences to splitter. Rozdělovač šroub rozdělí věty na slova a vygeneruje tato slova na čítač .The bolt splitter splits the sentences to words and emit these words to counter bolt. Hodnota šroubu používá slovník k záznamu počtu výskytů každého slova.The bolt "counter" uses a dictionary to record the occurrence number of each word.

V tomto příkladu jsou dva soubory specifikace: HelloWorld. spec a _HelloWorld EnableAck. spec .There are two spec files, HelloWorld.spec and HelloWorld_EnableAck.spec for this example. V kódu jazyka# C může zjistit, zda je povoleno potvrzení, získáním pluginConf ze strany Java.In the C# code, it can find out whether ack is enabled by getting the pluginConf from Java side.

/* demo how to get pluginConf info */
if (Context.Config.pluginConf.ContainsKey(Constants.NONTRANSACTIONAL_ENABLE_ACK))
{
    enableAck = (bool)(Context.Config.pluginConf[Constants.NONTRANSACTIONAL_ENABLE_ACK]);
}
Context.Logger.Info("enableAck: {0}", enableAck);

Pokud je v Spout povolená možnost ACK, použije se slovník k ukládání řazených kolekcí členů, které nebyly potvrzeny.In the spout, if ack is enabled, a dictionary is used to cache the tuples that have not been acknowledged. Pokud je volána metoda Fail (), dojde k neúspěšnému přehrání řazené kolekce členů:If Fail() is called, the failed tuple is replayed:

public void Fail(long seqId, Dictionary<string, Object> parms)
{
    Context.Logger.Info("Fail, seqId: {0}", seqId);
    if (cachedTuples.ContainsKey(seqId))
    {
        /* get the cached tuple */
        string sentence = cachedTuples[seqId];

        /* replay the failed tuple */
        Context.Logger.Info("Re-Emit: {0}, seqId: {1}", sentence, seqId);
        this.ctx.Emit(Constants.DEFAULT_STREAM_ID, new Values(sentence), seqId);
    }
    else
    {
        Context.Logger.Warn("Fail(), can't find cached tuple for seqId {0}!", seqId);
    }
}

HelloWorldTxHelloWorldTx

Příklad HelloWorldTx ukazuje, jak implementovat transakční topologii.The HelloWorldTx example demonstrates how to implement transactional topology. Má jeden Spout nazvaný generátor, dávkovou hodnotu s názvem částečný početa potvrzovací šroub s názvem Count-suma.It has one spout called generator, a batch bolt called partial-count, and a commit bolt called count-sum. K dispozici jsou také tři předem vytvořené soubory TXT: DataSource0. txt, DataSource1. txta DataSource2. txt.There are also three pre-created txt files: DataSource0.txt, DataSource1.txt, and DataSource2.txt.

V každé transakci generátor Spout náhodně vybere dva soubory z předem vytvořených tří souborů a vygeneruje tyto dva názvy souborů do šroubů s částečným počtem .In each transaction, the spout generator randomly selects two files from the pre-created three files, and emit the two file names to the partial-count bolt. Částečný počet šroubů Získá název souboru z přijaté řazené kolekce členů, pak otevře soubor a spočítá počet slov v tomto souboru a nakonec vygeneruje slovo číslo do pole Count-suma .The bolt partial-count gets the file name from the received tuple, then open the file and count the number of words in this file, and finally emit the word number to the count-sum bolt. Hodnota čítače Count-suma shrnuje celkový počet.The count-sum bolt summarizes the total count.

Chcete-li dosáhnout přesně jedné sémantiky, je nutné, aby součet hodnot šroubů byl posuzovat, zda se jedná o přehrajte transakci.To achieve exactly once semantics, the commit bolt count-sum need to judge whether it is a replayed transaction. V tomto příkladu má statickou členskou proměnnou:In this example, it has a static member variable:

public static long lastCommittedTxId = -1; 

Když je vytvořena instance ISCPBatchBolt, získá txAttempt ze vstupních parametrů:When an ISCPBatchBolt instance is created, it gets the txAttempt from input parameters:

public static CountSum Get(Context ctx, Dictionary<string, Object> parms)
{
    /* for transactional topology, we can get txAttempt from the input parms */
    if (parms.ContainsKey(Constants.STORM_TX_ATTEMPT))
    {
        StormTxAttempt txAttempt = (StormTxAttempt)parms[Constants.STORM_TX_ATTEMPT];
        return new CountSum(ctx, txAttempt);
    }
    else
    {
        throw new Exception("null txAttempt");
    }
}

Když FinishBatch() je volána lastCommittedTxId , bude aktualizována, pokud se nejedná o převedenou transakci.When FinishBatch() is called, the lastCommittedTxId will be updated if it is not a replayed transaction.

public void FinishBatch(Dictionary<string, Object> parms)
{
    /* judge whether it is a replayed transaction? */
    bool replay = (this.txAttempt.TxId <= lastCommittedTxId);

    if (!replay)
    {
        /* If it is not replayed, update the totalCount and lastCommittedTxId value */
        totalCount = totalCount + this.count;
        lastCommittedTxId = this.txAttempt.TxId;
    }
    … …
}

HybridTopologyHybridTopology

Tato topologie obsahuje Java Spout a šroub v jazyce# C.This topology contains a Java Spout and a C# Bolt. Používá výchozí serializaci a deserializaci, která je poskytována platformou SCP.It uses the default serialization and deserialization implementation provided by SCP platform. V tématu HybridTopology. spec v \příkladech HybridTopology složky najdete podrobnosti souboru specifikace a SubmitTopology. bat , jak zadat cestu třídy jazyka Java.See the HybridTopology.spec in examples\HybridTopology folder for the spec file details, and SubmitTopology.bat for how to specify Java classpath.

SCPHostDemoSCPHostDemo

Tento příklad je stejný jako HelloWorld v podstatě.This example is the same as HelloWorld in essence. Jediným rozdílem je, že kód uživatele je kompilován jako knihovna DLL a topologie je odeslána pomocí SCPHost. exe.The only difference is that the user code is compiled as DLL and the topology is submitted by using SCPHost.exe. Podrobnější vysvětlení najdete v části "režim hostitele spojovacího bodu služby".See the section "SCP Host Mode" for more detailed explanation.

Další krokyNext Steps

Příklady topologií Apache Storm vytvořených pomocí spojovacího bodu služby (SCP) najdete v následujících dokumentech:For examples of Apache Storm topologies created using SCP, see the following documents: