Vytvoření topologie Apache Storm v jazyce JavaCreate an Apache Storm topology in Java

Naučte se vytvořit topologii založenou na jazyce Java pro Apache Storm.Learn how to create a Java-based topology for Apache Storm. Tady vytvoříte topologii, která implementuje aplikaci počtu slov.Here, you create a Storm topology that implements a word-count application. Použijte Apache Maven k sestavení a zabalení projektu.You use Apache Maven to build and package the project. Pak se naučíte, jak definovat topologii pomocí rozhraní Apache Stormch toků .Then, you learn how to define the topology using the Apache Storm Flux framework.

Po dokončení kroků v tomto dokumentu můžete tuto topologii nasadit do Apache Storm ve službě HDInsight.After completing the steps in this document, you can deploy the topology to Apache Storm on HDInsight.

Poznámka

V tomto dokumentu jsou k dispozici kompletní verze ukázek topologie, které https://github.com/Azure-Samples/hdinsight-java-storm-wordcountjsou vytvořeny v tomto dokumentu.A completed version of the Storm topology examples created in this document is available at https://github.com/Azure-Samples/hdinsight-java-storm-wordcount.

PožadavkyPrerequisites

Testovací prostředíTest environment

Prostředí použité pro tento článek bylo počítač se systémem Windows 10.The environment used for this article was a computer running Windows 10. Příkazy byly provedeny v příkazovém řádku a různé soubory byly upraveny pomocí poznámkového bloku.The commands were executed in a command prompt, and the various files were edited with Notepad.

Z příkazového řádku zadejte níže uvedené příkazy pro vytvoření funkčního prostředí:From a command prompt, enter the commands below to create a working environment:

mkdir C:\HDI
cd C:\HDI

Vytvoření projektu MavenCreate a Maven project

Zadejte následující příkaz a vytvořte tak projekt Maven s názvem WORDCOUNT:Enter the following command to create a Maven project named WordCount:

mvn archetype:generate -DarchetypeArtifactId=maven-archetype-quickstart -DgroupId=com.microsoft.example -DartifactId=WordCount -DinteractiveMode=false

cd WordCount
mkdir resources

Tento příkaz vytvoří adresář s názvem WordCount v aktuálním umístění, který obsahuje základní projekt Maven.This command creates a directory named WordCount at the current location, which contains a basic Maven project. Druhý příkaz změní stávající pracovní adresář na WordCount.The second command changes the present working directory to WordCount. Třetí příkaz vytvoří nový adresář, resourceskterý bude později použit.The third command creates a new directory, resources, which will be used later. WordCount Adresář obsahuje následující položky:The WordCount directory contains the following items:

  • pom.xml: Obsahuje nastavení pro projekt Maven.pom.xml: Contains settings for the Maven project.
  • src\main\java\com\microsoft\example: Obsahuje kód vaší aplikace.src\main\java\com\microsoft\example: Contains your application code.
  • src\test\java\com\microsoft\example: Obsahuje testy pro vaši aplikaci.src\test\java\com\microsoft\example: Contains tests for your application.

Odebrat generovaný ukázkový kódRemove the generated example code

Odstraňte vygenerované soubory AppTest.javatestů a aplikace a App.java zadáním následujících příkazů:Delete the generated test and application files AppTest.java, and App.java by entering the commands below:

DEL src\main\java\com\microsoft\example\App.java
DEL src\test\java\com\microsoft\example\AppTest.java

Přidat úložiště MavenAdd Maven repositories

HDInsight je založený na HDP (Hortonworks data Platform), proto doporučujeme použít úložiště Hortonworks ke stažení závislostí pro vaše projekty Apache Storm.HDInsight is based on the Hortonworks Data Platform (HDP), so we recommend using the Hortonworks repository to download dependencies for your Apache Storm projects.

Otevřete pom.xml zadáním následujícího příkazu:Open pom.xml by entering the command below:

notepad pom.xml

Pak přidejte následující XML za <url> https://maven.apache.org</url> řádek:Then add the following XML after the <url>https://maven.apache.org</url> line:

<repositories>
    <repository>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>always</updatePolicy>
            <checksumPolicy>warn</checksumPolicy>
        </releases>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
            <checksumPolicy>fail</checksumPolicy>
        </snapshots>
        <id>HDPReleases</id>
        <name>HDP Releases</name>
        <url>https://repo.hortonworks.com/content/repositories/releases/</url>
        <layout>default</layout>
    </repository>
    <repository>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>always</updatePolicy>
            <checksumPolicy>warn</checksumPolicy>
        </releases>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
            <checksumPolicy>fail</checksumPolicy>
        </snapshots>
        <id>HDPJetty</id>
        <name>Hadoop Jetty</name>
        <url>https://repo.hortonworks.com/content/repositories/jetty-hadoop/</url>
        <layout>default</layout>
    </repository>
</repositories>

Přidat vlastnostiAdd properties

Maven umožňuje definovat hodnoty na úrovni projektu s názvem Properties (vlastnosti).Maven allows you to define project-level values called properties. Do pom.xmlpřidejte následující text </repositories> za řádek:In pom.xml, add the following text after the </repositories> line:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!--
    This is a version of Storm from the Hortonworks repository that is compatible with HDInsight 3.6.
    -->
    <storm.version>1.1.0.2.6.1.9-1</storm.version>
</properties>

Tuto hodnotu teď můžete použít v dalších oddílech pom.xml.You can now use this value in other sections of the pom.xml. Například při určení verze nenáročné komponenty můžete použít ${storm.version} místo pevného kódování hodnoty.For example, when specifying the version of Storm components, you can use ${storm.version} instead of hard coding a value.

Přidat závislostiAdd dependencies

Přidejte závislost pro součásti pro zaplavení.Add a dependency for Storm components. V pom.xmlpřidejte do <dependencies> oddílu následující text:In pom.xml, add the following text in the <dependencies> section:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
    <!-- keep storm out of the jar-with-dependencies -->
    <scope>provided</scope>
</dependency>

V době kompilace Maven používá tyto informace k vyhledání storm-core v úložišti Maven.At compile time, Maven uses this information to look up storm-core in the Maven repository. Nejprve se podíváme na úložiště v místním počítači.It first looks in the repository on your local computer. Pokud tam soubory nejsou, Maven je stáhne z veřejného úložiště Maven a uloží je do místního úložiště.If the files aren't there, Maven downloads them from the public Maven repository and stores them in the local repository.

Poznámka

Všimněte si <scope>provided</scope> , že řádek v této části.Notice the <scope>provided</scope> line in this section. Toto nastavení určuje, že Maven vyloučí ze všech vytvořených souborů JAR, aby nedošlo k tomu, že je k dispozici v systému.This setting tells Maven to exclude storm-core from any JAR files that are created, because it is provided by the system.

Konfigurace sestaveníBuild configuration

Moduly plug-in Maven umožňují přizpůsobit fáze sestavení projektu.Maven plug-ins allow you to customize the build stages of the project. Například způsob kompilace projektu nebo jeho zabalení do souboru JAR.For example, how the project is compiled or how to package it into a JAR file. Do pom.xmlpřidejte následující text přímo </project> nad řádek.In pom.xml, add the following text directly above the </project> line.

<build>
    <plugins>
    </plugins>
    <resources>
    </resources>
</build>

Tato část slouží k přidání modulů plug-in, prostředků a dalších možností konfigurace sestavení.This section is used to add plug-ins, resources, and other build configuration options. Úplný odkaz na pom.xml soubor naleznete v tématu https://maven.apache.org/pom.html.For a full reference of the pom.xml file, see https://maven.apache.org/pom.html.

Přidat moduly plug-inAdd plug-ins

  • Modul plug-in Maven execExec Maven Plugin

    U Apache Storm topologií implementovaných v jazyce Java je modul plug-in exec Maven užitečný, protože umožňuje snadno spustit topologii místně ve vašem vývojovém prostředí.For Apache Storm topologies implemented in Java, the Exec Maven Plugin is useful because it allows you to easily run the topology locally in your development environment. Do <plugins> částipom.xml souboru přidejte následující obsah, aby zahrnoval modul plug-in exec Maven:Add the following to the <plugins> section of the pom.xml file to include the Exec Maven plugin:

    <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.6.0</version>
        <executions>
            <execution>
            <goals>
                <goal>exec</goal>
            </goals>
            </execution>
        </executions>
        <configuration>
            <executable>java</executable>
            <includeProjectDependencies>true</includeProjectDependencies>
            <includePluginDependencies>false</includePluginDependencies>
            <classpathScope>compile</classpathScope>
            <mainClass>${storm.topology}</mainClass>
            <cleanupDaemonThreads>false</cleanupDaemonThreads> 
        </configuration>
    </plugin>
    
  • Modul plug-in kompilátoru Apache MavenApache Maven Compiler Plugin

    Dalším užitečným modulem plug-in je modul plug-in pro Apache Maven, který se používá ke změně možností kompilace.Another useful plug-in is the Apache Maven Compiler Plugin, which is used to change compilation options. Změňte verzi Java, kterou Maven používá pro zdroj a cíl vaší aplikace.Change the Java version that Maven uses for the source and target for your application.

    • Pro HDInsight __3,4 nebo starší__nastavte zdrojovou a cílovou verzi Java na 1,7.For HDInsight 3.4 or earlier, set the source and target Java version to 1.7.

    • V případě HDInsight __3,5__nastavte zdrojovou a cílovou verzi Java na 1,8.For HDInsight 3.5, set the source and target Java version to 1.8.

      Do <plugins> částipom.xml souboru přidejte následující text, který bude zahrnovat modul plug-in Maven pro Apache.Add the following text in the <plugins> section of the pom.xml file to include the Apache Maven Compiler plugin. Tento příklad určuje 1,8, takže cílová verze HDInsight je 3,5.This example specifies 1.8, so the target HDInsight version is 3.5.

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.3</version>
        <configuration>
        <source>1.8</source>
        <target>1.8</target>
        </configuration>
      </plugin>
      

Konfigurace prostředkůConfigure resources

V části Resources (prostředky) můžete zahrnout nekódové zdroje, jako jsou třeba konfigurační soubory, které jsou potřeba součástmi v topologii.The resources section allows you to include non-code resources such as configuration files needed by components in the topology. V tomto příkladu přidejte následující text do <resources> části pom.xml souboru.For this example, add the following text in the <resources> section of the pom.xml file.

<resource>
    <directory>${basedir}/resources</directory>
    <filtering>false</filtering>
    <includes>
        <include>log4j2.xml</include>
    </includes>
</resource>

Tento příklad přidá adresář prostředků do kořenového adresáře projektu (${basedir}) jako umístění, které obsahuje prostředky a obsahuje soubor s názvem. log4j2.xmlThis example adds the resources directory in the root of the project (${basedir}) as a location that contains resources, and includes the file named log4j2.xml. Tento soubor slouží ke konfiguraci informací, které jsou protokolovány pomocí topologie.This file is used to configure what information is logged by the topology.

Vytvoření topologieCreate the topology

Apache Storm topologie založené na jazyce Java se skládá ze tří komponent, které musíte vytvořit (nebo referenční) jako závislost.A Java-based Apache Storm topology consists of three components that you must author (or reference) as a dependency.

  • Spoutů: Čte data z externích zdrojů a vysílá proudy dat do topologie.Spouts: Reads data from external sources and emits streams of data into the topology.

  • Šrouby: Provádí zpracování streamů emitovaných spoutů nebo jiným šrouby a vysílá jeden nebo více datových proudů.Bolts: Performs processing on streams emitted by spouts or other bolts, and emits one or more streams.

  • Topologie: Definuje, jak jsou uspořádány spoutů a šrouby, a poskytuje vstupní bod pro topologii.Topology: Defines how the spouts and bolts are arranged, and provides the entry point for the topology.

Vytvoření SpoutCreate the spout

Aby se snížily požadavky na nastavení externích zdrojů dat, následující Spout jednoduše vygeneruje náhodné věty.To reduce requirements for setting up external data sources, the following spout simply emits random sentences. Jedná se o upravenou verzi Spout, která je k dispozici v příkladech pro zaplavování -Starter.It is a modified version of a spout that is provided with the Storm-Starter examples. I když tato topologie používá jenom jeden Spout, ostatní můžou mít několik datových kanálů z různých zdrojů do topologie.Although this topology uses only one spout, others may have several that feed data from different sources into the topology.

Zadáním následujícího příkazu vytvořte a otevřete nový soubor RandomSentenceSpout.java:Enter the command below to create and open a new file RandomSentenceSpout.java:

notepad src\main\java\com\microsoft\example\RandomSentenceSpout.java

Pak zkopírujte a vložte kód Java níže do nového souboru.Then copy and paste the java code below into the new file. Pak soubor zavřete.Then close the file.

package com.microsoft.example;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

//This spout randomly emits sentences
public class RandomSentenceSpout extends BaseRichSpout {
  //Collector used to emit output
  SpoutOutputCollector _collector;
  //Used to generate a random number
  Random _rand;

  //Open is called when an instance of the class is created
  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  //Set the instance collector to the one passed in
    _collector = collector;
    //For randomness
    _rand = new Random();
  }

  //Emit data to the stream
  @Override
  public void nextTuple() {
  //Sleep for a bit
    Utils.sleep(100);
    //The sentences that are randomly emitted
    String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
        "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
    //Randomly pick a sentence
    String sentence = sentences[_rand.nextInt(sentences.length)];
    //Emit the sentence
    _collector.emit(new Values(sentence));
  }

  //Ack is not implemented since this is a basic example
  @Override
  public void ack(Object id) {
  }

  //Fail is not implemented since this is a basic example
  @Override
  public void fail(Object id) {
  }

  //Declare the output fields. In this case, an sentence
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("sentence"));
  }
}

Poznámka

Příklad Spout, který čte z externího zdroje dat, naleznete v jednom z následujících příkladů:For an example of a spout that reads from an external data source, see one of the following examples:

Vytvořit šroubyCreate the bolts

Šrouby zpracuje zpracování dat.Bolts handle the data processing. Šrouby může dělat cokoli, například výpočet, trvalost nebo vzmluvit s externími součástmi.Bolts can do anything, for example, computation, persistence, or talking to external components. Tato topologie používá dvě šrouby:This topology uses two bolts:

  • SplitSentence: Rozdělí věty vypouštěné RandomSentenceSpoutmi do jednotlivých slov.SplitSentence: Splits the sentences emitted by RandomSentenceSpout into individual words.

  • WordCount: Spočítá počet výskytů jednotlivých slov.WordCount: Counts how many times each word has occurred.

SplitSentenceSplitSentence

Zadáním následujícího příkazu vytvořte a otevřete nový soubor SplitSentence.java:Enter the command below to create and open a new file SplitSentence.java:

notepad src\main\java\com\microsoft\example\SplitSentence.java

Pak zkopírujte a vložte kód Java níže do nového souboru.Then copy and paste the java code below into the new file. Pak soubor zavřete.Then close the file.

package com.microsoft.example;

import java.text.BreakIterator;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

//There are a variety of bolt types. In this case, use BaseBasicBolt
public class SplitSentence extends BaseBasicBolt {

  //Execute is called to process tuples
  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    //Get the sentence content from the tuple
    String sentence = tuple.getString(0);
    //An iterator to get each word
    BreakIterator boundary=BreakIterator.getWordInstance();
    //Give the iterator the sentence
    boundary.setText(sentence);
    //Find the beginning first word
    int start=boundary.first();
    //Iterate over each word and emit it to the output stream
    for (int end=boundary.next(); end != BreakIterator.DONE; start=end, end=boundary.next()) {
      //get the word
      String word=sentence.substring(start,end);
      //If a word is whitespace characters, replace it with empty
      word=word.replaceAll("\\s+","");
      //if it's an actual word, emit it
      if (!word.equals("")) {
        collector.emit(new Values(word));
      }
    }
  }

  //Declare that emitted tuples contain a word field
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }
}

WordCountWordCount

Zadáním následujícího příkazu vytvořte a otevřete nový soubor WordCount.java:Enter the command below to create and open a new file WordCount.java:

notepad src\main\java\com\microsoft\example\WordCount.java

Pak zkopírujte a vložte kód Java níže do nového souboru.Then copy and paste the java code below into the new file. Pak soubor zavřete.Then close the file.

package com.microsoft.example;

import java.util.HashMap;
import java.util.Map;
import java.util.Iterator;

import org.apache.storm.Constants;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.Config;

// For logging
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

//There are a variety of bolt types. In this case, use BaseBasicBolt
public class WordCount extends BaseBasicBolt {
  //Create logger for this class
  private static final Logger logger = LogManager.getLogger(WordCount.class);
  //For holding words and counts
  Map<String, Integer> counts = new HashMap<String, Integer>();
  //How often to emit a count of words
  private Integer emitFrequency;

  // Default constructor
  public WordCount() {
      emitFrequency=5; // Default to 60 seconds
  }

  // Constructor that sets emit frequency
  public WordCount(Integer frequency) {
      emitFrequency=frequency;
  }

  //Configure frequency of tick tuples for this bolt
  //This delivers a 'tick' tuple on a specific interval,
  //which is used to trigger certain actions
  @Override
  public Map<String, Object> getComponentConfiguration() {
      Config conf = new Config();
      conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequency);
      return conf;
  }

  //execute is called to process tuples
  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    //If it's a tick tuple, emit all words and counts
    if(tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
            && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
      for(String word : counts.keySet()) {
        Integer count = counts.get(word);
        collector.emit(new Values(word, count));
        logger.info("Emitting a count of " + count + " for word " + word);
      }
    } else {
      //Get the word contents from the tuple
      String word = tuple.getString(0);
      //Have we counted any already?
      Integer count = counts.get(word);
      if (count == null)
        count = 0;
      //Increment the count and store it
      count++;
      counts.put(word, count);
    }
  }

  //Declare that this emits a tuple containing two fields; word and count
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word", "count"));
  }
}

Definování topologieDefine the topology

Topologie spojuje spoutů a šrouby dohromady do grafu, který definuje způsob toku dat mezi komponentami.The topology ties the spouts and bolts together into a graph, which defines how data flows between the components. Poskytuje také pomocný parametr paralelismus, který využívá při vytváření instancí součástí v clusteru.It also provides parallelism hints that Storm uses when creating instances of the components within the cluster.

Následující obrázek je základní diagram grafu komponent pro tuto topologii.The following image is a basic diagram of the graph of components for this topology.

Diagram znázorňující uspořádání spoutů a šrouby

Chcete-li implementovat topologii, zadejte následující příkaz pro vytvoření a otevření nového souboru WordCountTopology.java:To implement the topology, enter the command below to create and open a new file WordCountTopology.java:

notepad src\main\java\com\microsoft\example\WordCountTopology.java

Pak zkopírujte a vložte kód Java níže do nového souboru.Then copy and paste the java code below into the new file. Pak soubor zavřete.Then close the file.

package com.microsoft.example;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

import com.microsoft.example.RandomSentenceSpout;

public class WordCountTopology {

  //Entry point for the topology
  public static void main(String[] args) throws Exception {
  //Used to build the topology
    TopologyBuilder builder = new TopologyBuilder();
    //Add the spout, with a name of 'spout'
    //and parallelism hint of 5 executors
    builder.setSpout("spout", new RandomSentenceSpout(), 5);
    //Add the SplitSentence bolt, with a name of 'split'
    //and parallelism hint of 8 executors
    //shufflegrouping subscribes to the spout, and equally distributes
    //tuples (sentences) across instances of the SplitSentence bolt
    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
    //Add the counter, with a name of 'count'
    //and parallelism hint of 12 executors
    //fieldsgrouping subscribes to the split bolt, and
    //ensures that the same word is sent to the same instance (group by field 'word')
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

    //new configuration
    Config conf = new Config();
    //Set to false to disable debug information when
    // running in production on a cluster
    conf.setDebug(false);

    //If there are arguments, we are running on a cluster
    if (args != null && args.length > 0) {
      //parallelism hint to set the number of workers
      conf.setNumWorkers(3);
      //submit the topology
      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
    //Otherwise, we are running locally
    else {
      //Cap the maximum number of executors that can be spawned
      //for a component to 3
      conf.setMaxTaskParallelism(3);
      //LocalCluster is used to run locally
      LocalCluster cluster = new LocalCluster();
      //submit the topology
      cluster.submitTopology("word-count", conf, builder.createTopology());
      //sleep
      Thread.sleep(10000);
      //shut down the cluster
      cluster.shutdown();
    }
  }
}

Konfigurovat protokolováníConfigure logging

K protokolování informací používá protokol Apache log4j 2 .Storm uses Apache Log4j 2 to log information. Pokud protokolování nenakonfigurujete, vygeneruje tato topologie diagnostické informace.If you do not configure logging, the topology emits diagnostic information. Chcete-li řídit, co je zaznamenáno, log4j2.xml vytvořte soubor resources s názvem v adresáři zadáním následujícího příkazu:To control what is logged, create a file named log4j2.xml in the resources directory by entering the command below:

notepad resources\log4j2.xml

Potom zkopírujte a vložte text XML níže do nového souboru.Then copy and paste the XML text below into the new file. Pak soubor zavřete.Then close the file.

<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
<Appenders>
    <Console name="STDOUT" target="SYSTEM_OUT">
        <PatternLayout pattern="%d{HH:mm:ss} [%t] %-5level %logger{36} - %msg%n"/>
    </Console>
</Appenders>
<Loggers>
    <Logger name="com.microsoft.example" level="trace" additivity="false">
        <AppenderRef ref="STDOUT"/>
    </Logger>
    <Root level="error">
        <Appender-Ref ref="STDOUT"/>
    </Root>
</Loggers>
</Configuration>

Tento kód XML nakonfiguruje nový protokolovací nástroj com.microsoft.example pro třídu, která obsahuje komponenty v této ukázkové topologii.This XML configures a new logger for the com.microsoft.example class, which includes the components in this example topology. Úroveň je nastavená na trasování pro tento protokolovací nástroj, který zachycuje všechny informace o protokolování vydávané součástmi v této topologii.The level is set to trace for this logger, which captures any logging information emitted by components in this topology.

V <Root level="error"> části se konfiguruje kořenová úroveň protokolování (vše není v com.microsoft.example), aby se zaprotokoloval jenom informace o chybě.The <Root level="error"> section configures the root level of logging (everything not in com.microsoft.example) to only log error information.

Další informace o konfiguraci protokolování pro log4j 2 najdete v tématu https://logging.apache.org/log4j/2.x/manual/configuration.html.For more information on configuring logging for Log4j 2, see https://logging.apache.org/log4j/2.x/manual/configuration.html.

Poznámka

Ne0.10.0 verze a vyšší použití log4j 2. x.Storm version 0.10.0 and higher use Log4j 2.x. Starší verze systému log4j používaly 1. x, které používaly jiný formát pro konfiguraci protokolu.Older versions of storm used Log4j 1.x, which used a different format for log configuration. Informace o starší konfiguraci najdete v tématu https://wiki.apache.org/logging-log4j/Log4jXmlFormat.For information on the older configuration, see https://wiki.apache.org/logging-log4j/Log4jXmlFormat.

Místní otestování topologieTest the topology locally

Po uložení souborů použijte následující příkaz k otestování topologie místně.After you save the files, use the following command to test the topology locally.

mvn compile exec:java -Dstorm.topology=com.microsoft.example.WordCountTopology

Při spuštění topologie zobrazí informace o spuštění.As it runs, the topology displays startup information. Následující text je příkladem výstupu počtu slov:The following text is an example of the word count output:

17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word snow
17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word white
17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 112 for word seven
17:33:27 [Thread-16-count] INFO  com.microsoft.example.WordCount - Emitting a count of 195 for word the
17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 113 for word and
17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 57 for word dwarfs
17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 57 for word snow

Tento ukázkový protokol indikuje, že slovo a bylo vygenerováno 113 krát.This example log indicates that the word 'and' has been emitted 113 times. Počet pokračuje v provozu, dokud se topologie spustí, protože Spout průběžně generuje stejné věty.The count continues to go up as long as the topology runs because the spout continuously emits the same sentences.

Mezi emisemi slov a počtů je interval 5 sekund.There is a 5-second interval between emission of words and counts. Komponenta WORDCOUNT je nakonfigurována tak, aby vygenerovala pouze informace, když dorazí do řazené kolekce členů.The WordCount component is configured to only emit information when a tick tuple arrives. Vyžádá, aby se řazené kolekce členů prodávaly jenom každých pět sekund.It requests that tick tuples are only delivered every five seconds.

Převod topologie na tokConvert the topology to Flux

Tok je nová architektura dostupná s využitím 0.10.0 a vyšší, která umožňuje oddělit konfiguraci od implementace.Flux is a new framework available with Storm 0.10.0 and higher, which allows you to separate configuration from implementation. Vaše komponenty jsou pořád definované v jazyce Java, ale topologie je definovaná pomocí souboru YAML.Your components are still defined in Java, but the topology is defined using a YAML file. Můžete zabalit výchozí definici topologie s vaším projektem nebo při odesílání topologie použít samostatný soubor.You can package a default topology definition with your project, or use a standalone file when submitting the topology. Při odesílání topologie do prostředí můžete pomocí proměnných prostředí nebo konfiguračních souborů naplnit hodnoty v definici topologie YAML.When submitting the topology to Storm, you can use environment variables or configuration files to populate values in the YAML topology definition.

Soubor YAML definuje komponenty, které se mají použít pro topologii a tok dat mezi nimi.The YAML file defines the components to use for the topology and the data flow between them. Soubor YAML můžete zahrnout jako součást souboru jar nebo můžete použít externí soubor YAML.You can include a YAML file as part of the jar file or you can use an external YAML file.

Další informace o toku najdete v tématu tok rozhraní (https://storm.apache.org/releases/current/flux.html).For more information on Flux, see Flux framework (https://storm.apache.org/releases/current/flux.html).

Varování

Kvůli chybě (https://issues.apache.org/jira/browse/STORM-2055) s využitím systému 1.0.1 se může stát, že budete muset nainstalovat vývojové prostředí s více systémy, aby bylo možné místně spouštět topologie toků.Due to a bug (https://issues.apache.org/jira/browse/STORM-2055) with Storm 1.0.1, you may need to install a Storm development environment to run Flux topologies locally.

  1. WordCountTopology.java Dřív definovala topologii, ale u toku není potřeba.Previously, WordCountTopology.java defined the topology, but isn't needed with Flux. Odstraňte soubor pomocí následujícího příkazu:Delete the file with the following command:

    DEL src\main\java\com\microsoft\example\WordCountTopology.java
    
  2. Zadáním následujícího příkazu vytvořte a otevřete nový soubor topology.yaml:Enter the command below to create and open a new file topology.yaml:

    notepad resources\topology.yaml
    

    Pak zkopírujte a vložte text uvedený níže do nového souboru.Then copy and paste the text below into the new file. Pak soubor zavřete.Then close the file.

    name: "wordcount"       # friendly name for the topology
    
    config:                 # Topology configuration
      topology.workers: 1     # Hint for the number of workers to create
    
    spouts:                 # Spout definitions
    - id: "sentence-spout"
      className: "com.microsoft.example.RandomSentenceSpout"
      parallelism: 1      # parallelism hint
    
    bolts:                  # Bolt definitions
    - id: "splitter-bolt"
      className: "com.microsoft.example.SplitSentence"
      parallelism: 1
    
    - id: "counter-bolt"
      className: "com.microsoft.example.WordCount"
      constructorArgs:
        - 10
      parallelism: 1
    
    streams:                # Stream definitions
    - name: "Spout --> Splitter" # name isn't used (placeholder for logging, UI, etc.)
      from: "sentence-spout"       # The stream emitter
      to: "splitter-bolt"          # The stream consumer
      grouping:                    # Grouping type
        type: SHUFFLE
    
    - name: "Splitter -> Counter"
      from: "splitter-bolt"
      to: "counter-bolt"
      grouping:
        type: FIELDS
        args: ["word"]           # field(s) to group on
    
  3. Zadejte následující příkaz, který se pom.xml otevře, aby byly popsané revize uvedené níže:Enter the command below to open pom.xml to make the described revisions below:

    notepad pom.xml
    
    • Do <dependencies> oddílu přidejte následující novou závislost:Add the following new dependency in the <dependencies> section:

      <!-- Add a dependency on the Flux framework -->
      <dependency>
          <groupId>org.apache.storm</groupId>
          <artifactId>flux-core</artifactId>
          <version>${storm.version}</version>
      </dependency>
      
    • Do <plugins> oddílu přidejte následující modul plug-in.Add the following plugin to the <plugins> section. Tento modul plug-in zpracovává vytvoření balíčku (souboru jar) pro projekt a při vytváření balíčku aplikuje některé transformace specifické pro tok.This plugin handles the creation of a package (jar file) for the project, and applies some transformations specific to Flux when creating the package.

      <!-- build an uber jar -->
      <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-shade-plugin</artifactId>
          <version>3.2.1</version>
          <configuration>
              <transformers>
                  <!-- Keep us from getting a "can't overwrite file error" -->
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer" />
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                  <!-- We're using Flux, so refer to it as main -->
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                      <mainClass>org.apache.storm.flux.Flux</mainClass>
                  </transformer>
              </transformers>
              <!-- Keep us from getting a bad signature error -->
              <filters>
                  <filter>
                      <artifact>*:*</artifact>
                      <excludes>
                          <exclude>META-INF/*.SF</exclude>
                          <exclude>META-INF/*.DSA</exclude>
                          <exclude>META-INF/*.RSA</exclude>
                      </excludes>
                  </filter>
              </filters>
          </configuration>
          <executions>
              <execution>
                  <phase>package</phase>
                  <goals>
                      <goal>shade</goal>
                  </goals>
              </execution>
          </executions>
      </plugin>
      
    • V části exec-Maven-plugin <configuration> změňte hodnotu pro <mainClass> z ${storm.topology} na org.apache.storm.flux.Flux.In the exec-maven-plugin <configuration> section, change the value for <mainClass> from ${storm.topology} to org.apache.storm.flux.Flux. Toto nastavení umožňuje, aby tok zpracovával místně probíhající topologii.This setting allows Flux to handle running the topology locally in development.

    • V části přidejte následující do <includes>. <resources>In the <resources> section, add the following to <includes>. Tento kód XML obsahuje soubor YAML, který definuje topologii v rámci projektu.This XML includes the YAML file that defines the topology as part of the project.

      <include>topology.yaml</include>
      

Místní testování topologie tokůTest the flux topology locally

  1. Zadejte následující příkaz pro zkompilování a spuštění topologie toků pomocí Maven:Enter the following command to compile and execute the Flux topology using Maven:

    mvn compile exec:java -Dexec.args="--local -R /topology.yaml"
    

    Varování

    Pokud vaše topologie používá 1.0.1 bitů, tento příkaz se nezdařil.If your topology uses Storm 1.0.1 bits, this command fails. Tato chyba je způsobena https://issues.apache.org/jira/browse/STORM-2055nástrojem.This failure is caused by https://issues.apache.org/jira/browse/STORM-2055. Místo toho nainstalujte do vývojového prostředí vše a použijte následující postup:Instead, install Storm in your development environment and use the following steps:

    Pokud máte ve svém vývojovém prostředí nainstalovanoupráci, můžete místo toho použít následující příkazy:If you have installed Storm in your development environment, you can use the following commands instead:

    mvn compile package
    storm jar target/WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local -R /topology.yaml
    

    --local Parametr spustí topologii v místním režimu ve vývojovém prostředí.The --local parameter runs the topology in local mode on your development environment. -R /topology.yaml Parametrtopology.yaml používá soubor prostředků ze souboru jar k definování topologie.The -R /topology.yaml parameter uses the topology.yaml file resource from the jar file to define the topology.

    Při spuštění topologie zobrazí informace o spuštění.As it runs, the topology displays startup information. Následující text je příklad výstupu:The following text is an example of the output:

     17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word snow
     17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word white
     17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 112 for word seven
     17:33:27 [Thread-16-count] INFO  com.microsoft.example.WordCount - Emitting a count of 195 for word the
     17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 113 for word and
     17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 57 for word dwarfs
    

    Mezi dávkami protokolovaných informací je prodleva o délce 10 sekund.There is a 10-second delay between batches of logged information.

  2. Vytvoří novou topologii YAML z projektu.Create a new topology yaml from the project.

    a.a. Zadejte následující příkaz, který chcete topology.xmlotevřít:Enter the command below to open topology.xml:

    notepad resources\topology.yaml
    

    b.b. Vyhledejte následující část a změňte hodnotu 10 na. 5Find the following section and change the value of 10 to 5. Tato změna změní interval mezi vygenerováním dávek slov od 10 sekund do 5.This modification changes the interval between emitting batches of word counts from 10 seconds to 5.

    - id: "counter-bolt"
      className: "com.microsoft.example.WordCount"
      constructorArgs:
        - 5
      parallelism: 1  
    

    c.c. Uložte soubor jako newtopology.yaml.Save file as newtopology.yaml.

  3. Pokud chcete spustit topologii, zadejte následující příkaz:To run the topology, enter the following command:

    mvn exec:java -Dexec.args="--local resources/newtopology.yaml"
    

    Nebo, pokud máte ve svém vývojovém prostředí vše:Or, if you have Storm on your development environment:

    storm jar target/WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local resources/newtopology.yaml
    

    Tento příkaz používá newtopology.yaml jako definici topologie.This command uses the newtopology.yaml as the topology definition. Vzhledem k compile tomu, že jsme parametr nezahrnuli, používá Maven verzi projektu sestavené v předchozích krocích.Since we didn't include the compile parameter, Maven uses the version of the project built in previous steps.

    Po spuštění topologie byste si měli všimnout, že se změnil čas mezi vygenerovanými dávkami tak, aby odrážel newtopology.yamlhodnotu v.Once the topology starts, you should notice that the time between emitted batches has changed to reflect the value in newtopology.yaml. Takže vidíte, že můžete změnit konfiguraci prostřednictvím souboru YAML, aniž byste museli znovu kompilovat topologii.So you can see that you can change your configuration through a YAML file without having to recompile the topology.

Další informace o těchto a dalších funkcích rozhraní toků najdete v tématu tok (https://storm.apache.org/releases/current/flux.html).For more information on these and other features of the Flux framework, see Flux (https://storm.apache.org/releases/current/flux.html).

TridentTrident

Trident je abstrakce na vysoké úrovni, která je poskytována pomocí průchozího.Trident is a high-level abstraction that is provided by Storm. Podporuje stavové zpracování.It supports stateful processing. Primární výhodou Trident je, že může zaručit, že se každá zpráva, která vstoupí do topologie, zpracovává jenom jednou.The primary advantage of Trident is that it can guarantee that every message that enters the topology is processed only once. Bez použití Trident může vaše topologie zaručit, že se zprávy zpracovávají aspoň jednou.Without using Trident, your topology can only guarantee that messages are processed at least once. K dispozici jsou také jiné rozdíly, například integrované komponenty, které lze použít místo vytvoření šrouby.There are also other differences, such as built-in components that can be used instead of creating bolts. Ve skutečnosti jsou šrouby nahrazeny méně obecnými součástmi, jako jsou filtry, projekce a funkce.In fact, bolts are replaced by less-generic components, such as filters, projections, and functions.

Aplikace Trident lze vytvořit pomocí projektů Maven.Trident applications can be created by using Maven projects. Použijete stejný základní postup, jak je uvedeno výše v tomto článku – pouze kód je jiný.You use the same basic steps as presented earlier in this article—only the code is different. Trident také nelze (aktuálně) použít s rozhraním toků.Trident also cannot (currently) be used with the Flux framework.

Další informace o Trident naleznete v tématu Přehled rozhraní Trident API.For more information about Trident, see the Trident API Overview.

Další krokyNext Steps

Zjistili jste, jak vytvořit topologii Apache Storm pomocí jazyka Java.You have learned how to create an Apache Storm topology by using Java. Teď se dozvíte, jak:Now learn how to:

Další příklady Apache Storm topologií najdete v tématu Příklady topologií pro Apache Storm v HDInsight.You can find more example Apache Storm topologies by visiting Example topologies for Apache Storm on HDInsight.