Erstellen einer Apache Storm-Topologie in JavaCreate an Apache Storm topology in Java

In diesem Artikel erfahren Sie, wie Sie eine Java-basierte Topologie für Apache Storm erstellen.Learn how to create a Java-based topology for Apache Storm. Hier erstellen Sie eine Storm-Topologie, die eine Anwendung zur Wortzählung implementiert.Here, you create a Storm topology that implements a word-count application. Sie verwenden Apache Maven zum Erstellen und Verpacken des Projekts.You use Apache Maven to build and package the project. Anschließend erfahren Sie, wie Sie die Topologie mit dem Apache Storm Flux-Framework definieren.Then, you learn how to define the topology using the Apache Storm Flux framework.

Nach Abschluss der Schritte in diesem Dokument können Sie die Topologie für Apache Storm in HDInsight bereitstellen.After completing the steps in this document, you can deploy the topology to Apache Storm on HDInsight.

Hinweis

Eine abgeschlossene Version der Storm-Topologiebeispiele, die in diesem Dokument erstellt wurden, finden Sie unter https://github.com/Azure-Samples/hdinsight-java-storm-wordcount.A completed version of the Storm topology examples created in this document is available at https://github.com/Azure-Samples/hdinsight-java-storm-wordcount.

VoraussetzungenPrerequisites

TestumgebungTest environment

Für diesen Artikel wurde ein Computer unter Windows 10 verwendet.The environment used for this article was a computer running Windows 10. Die Befehle wurden an einer Eingabeaufforderung ausgeführt, und die verschiedenen Dateien wurden mit dem Windows-Editor bearbeitet.The commands were executed in a command prompt, and the various files were edited with Notepad.

Geben Sie an einer Eingabeaufforderung die folgenden Befehle ein, um eine Arbeitsumgebung zu erstellen:From a command prompt, enter the commands below to create a working environment:

mkdir C:\HDI
cd C:\HDI

Erstellen eines Maven-ProjektsCreate a Maven project

Geben Sie den folgenden Befehl ein, um ein Maven-Projekt namens WordCount zu erstellen:Enter the following command to create a Maven project named WordCount:

mvn archetype:generate -DarchetypeArtifactId=maven-archetype-quickstart -DgroupId=com.microsoft.example -DartifactId=WordCount -DinteractiveMode=false

cd WordCount
mkdir resources

Dieser Befehl erstellt am aktuellen Speicherort ein neues Verzeichnis namens WordCount, das ein einfaches Maven-Projekt enthält.This command creates a directory named WordCount at the current location, which contains a basic Maven project. Der zweite Befehl ändert das aktuelle Arbeitsverzeichnis in WordCount.The second command changes the present working directory to WordCount. Der dritte Befehl erstellt ein neues Verzeichnis (resources) zur späteren Verwendung.The third command creates a new directory, resources, which will be used later. Das WordCount-Verzeichnis enthält die folgenden Elemente:The WordCount directory contains the following items:

  • pom.xml: Enthält Einstellungen für das Maven-Projekt.pom.xml: Contains settings for the Maven project.
  • src\main\java\com\microsoft\example: Enthält Ihren Anwendungscode.src\main\java\com\microsoft\example: Contains your application code.
  • src\test\java\com\microsoft\example: Enthält Tests für Ihre Anwendung.src\test\java\com\microsoft\example: Contains tests for your application.

Entfernen des generierten BeispielcodesRemove the generated example code

Geben Sie die folgenden Befehle ein, um die generierten Test- und Anwendungsdateien (AppTest.java und App.java) zu löschen:Delete the generated test and application files AppTest.java, and App.java by entering the commands below:

DEL src\main\java\com\microsoft\example\App.java
DEL src\test\java\com\microsoft\example\AppTest.java

Hinzufügen von Maven-RepositorysAdd Maven repositories

HDInsight basiert auf der Hortonworks Data Platform (HDP), daher empfehlen wir, das Hortonworks-Repository zum Herunterladen von Abhängigkeiten für Ihre Apache Storm-Projekte zu verwenden.HDInsight is based on the Hortonworks Data Platform (HDP), so we recommend using the Hortonworks repository to download dependencies for your Apache Storm projects.

Geben Sie den folgenden Befehl ein, um pom.xml zu öffnen:Open pom.xml by entering the command below:

notepad pom.xml

Fügen Sie anschließend nach der Zeile <url> https://maven.apache.org</url> den folgenden XML-Code hinzu:Then add the following XML after the <url>https://maven.apache.org</url> line:

<repositories>
    <repository>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>always</updatePolicy>
            <checksumPolicy>warn</checksumPolicy>
        </releases>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
            <checksumPolicy>fail</checksumPolicy>
        </snapshots>
        <id>HDPReleases</id>
        <name>HDP Releases</name>
        <url>https://repo.hortonworks.com/content/repositories/releases/</url>
        <layout>default</layout>
    </repository>
    <repository>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>always</updatePolicy>
            <checksumPolicy>warn</checksumPolicy>
        </releases>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
            <checksumPolicy>fail</checksumPolicy>
        </snapshots>
        <id>HDPJetty</id>
        <name>Hadoop Jetty</name>
        <url>https://repo.hortonworks.com/content/repositories/jetty-hadoop/</url>
        <layout>default</layout>
    </repository>
</repositories>

Eigenschaften hinzufügenAdd properties

Maven ermöglicht Ihnen das Definieren von Werten auf Projektebene, die als Eigenschaften bezeichnet werden.Maven allows you to define project-level values called properties. Fügen Sie in pom.xml nach der Zeile </repositories> den folgenden Text hinzu:In pom.xml, add the following text after the </repositories> line:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!--
    This is a version of Storm from the Hortonworks repository that is compatible with HDInsight 3.6.
    -->
    <storm.version>1.1.0.2.6.1.9-1</storm.version>
</properties>

Sie können diesen Wert nun in anderen Abschnitten von pom.xml verwenden.You can now use this value in other sections of the pom.xml. Beim Angeben der Version von Storm-Komponenten können Sie z.B. ${storm.version} verwenden, anstatt einen Wert hartzucodieren.For example, when specifying the version of Storm components, you can use ${storm.version} instead of hard coding a value.

Hinzufügen von AbhängigkeitenAdd dependencies

Fügen Sie eine Abhängigkeit für Storm-Komponenten hinzu.Add a dependency for Storm components. Fügen Sie in pom.xml im Abschnitt <dependencies> den folgenden Text hinzu:In pom.xml, add the following text in the <dependencies> section:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
    <!-- keep storm out of the jar-with-dependencies -->
    <scope>provided</scope>
</dependency>

Zum Zeitpunkt der Kompilierung verwendet Maven diese Informationen für die Suche nach storm-core im Maven-Repository.At compile time, Maven uses this information to look up storm-core in the Maven repository. Zuerst wird im Repository auf dem lokalen Computer gesucht.It first looks in the repository on your local computer. Wenn die Dateien nicht vorhanden sind, lädt Maven sie aus dem öffentlichen Maven-Repository herunter und speichert sie im lokalen Repository.If the files aren't there, Maven downloads them from the public Maven repository and stores them in the local repository.

Hinweis

Beachten Sie die Zeile <scope>provided</scope> in diesem Abschnitt.Notice the <scope>provided</scope> line in this section. Diese Einstellung weist Maven an, storm-core aus allen erstellten JAR-Dateien auszuschließen, da es vom System bereitgestellt wird.This setting tells Maven to exclude storm-core from any JAR files that are created, because it is provided by the system.

BuildkonfigurationBuild configuration

Mithilfe von Maven-Plug-Ins können Sie die Buildphasen des Projekts anpassen.Maven plug-ins allow you to customize the build stages of the project. Beispielsweise, wie das Projekt kompiliert oder wie es in eine JAR-Datei verpackt wird.For example, how the project is compiled or how to package it into a JAR file. Fügen Sie in pom.xml direkt über der Zeile </project> den folgenden Text hinzu:In pom.xml, add the following text directly above the </project> line.

<build>
    <plugins>
    </plugins>
    <resources>
    </resources>
</build>

Dieser Abschnitt wird zum Hinzufügen von Plug-Ins, Ressourcen und anderen Optionen für die Buildkonfiguration verwendet.This section is used to add plug-ins, resources, and other build configuration options. Eine vollständige Referenz für die Datei pom.xml finden Sie unter https://maven.apache.org/pom.html.For a full reference of the pom.xml file, see https://maven.apache.org/pom.html.

Hinzufügen von Plug-InsAdd plug-ins

  • Exec Maven-Plug-InExec Maven Plugin

    Für in Java implementierte Apache Storm-Topologien ist das Exec Maven-Plug-In hilfreich, da es Ihnen ermöglicht, die Topologie einfach lokal in Ihrer Entwicklungsumgebung auszuführen.For Apache Storm topologies implemented in Java, the Exec Maven Plugin is useful because it allows you to easily run the topology locally in your development environment. Fügen Sie den folgenden Code im Abschnitt <plugins> der Datei pom.xml hinzu, um das Exec Maven-Plug-In einzubeziehen:Add the following to the <plugins> section of the pom.xml file to include the Exec Maven plugin:

    <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.6.0</version>
        <executions>
            <execution>
            <goals>
                <goal>exec</goal>
            </goals>
            </execution>
        </executions>
        <configuration>
            <executable>java</executable>
            <includeProjectDependencies>true</includeProjectDependencies>
            <includePluginDependencies>false</includePluginDependencies>
            <classpathScope>compile</classpathScope>
            <mainClass>${storm.topology}</mainClass>
            <cleanupDaemonThreads>false</cleanupDaemonThreads> 
        </configuration>
    </plugin>
    
  • Apache Maven Compiler-Plug-InApache Maven Compiler Plugin

    Ein weiteres nützliches Plug-In ist das Apache Maven Compiler-Plug-In, das zum Ändern von Kompilierungsoptionen verwendet wird.Another useful plug-in is the Apache Maven Compiler Plugin, which is used to change compilation options. Ändern Sie die Java-Version, die Maven für die Quelle und das Ziel Ihrer Anwendung verwendet.Change the Java version that Maven uses for the source and target for your application.

    • Für HDInsight 3.4 oder früher legen Sie Java-Version 1.7 für Quelle und Ziel fest.For HDInsight 3.4 or earlier, set the source and target Java version to 1.7.

    • Für HDInsight 3.5 legen Sie Java-Version 1.8 für Quelle und Ziel fest.For HDInsight 3.5, set the source and target Java version to 1.8.

      Fügen Sie folgenden Text im Abschnitt <plugins> der Datei pom.xml hinzu, um das Apache Maven Compiler-Plug-In einzubeziehen.Add the following text in the <plugins> section of the pom.xml file to include the Apache Maven Compiler plugin. Dieses Beispiel legt 1.8 fest, damit die Ziel-HDInsight-Version 3.5 ist.This example specifies 1.8, so the target HDInsight version is 3.5.

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.3</version>
        <configuration>
        <source>1.8</source>
        <target>1.8</target>
        </configuration>
      </plugin>
      

Konfigurieren der RessourcenConfigure resources

Im Abschnitt „resources“ können Sie nicht codebezogene Ressourcen, z.B. Konfigurationsdateien, hinzufügen, die von Komponenten in der Topologie benötigt werden.The resources section allows you to include non-code resources such as configuration files needed by components in the topology. Fügen Sie für dieses Beispiel dem Abschnitt <resources> der Datei pom.xml den folgenden Text hinzu:For this example, add the following text in the <resources> section of the pom.xml file.

<resource>
    <directory>${basedir}/resources</directory>
    <filtering>false</filtering>
    <includes>
        <include>log4j2.xml</include>
    </includes>
</resource>

Dieses Beispiel fügt dem Verzeichnis „resources“ im Stammverzeichnis des Projekts (${basedir}) als Speicherort hinzu, der Ressourcen und die Datei mit dem Namen log4j2.xml enthält.This example adds the resources directory in the root of the project (${basedir}) as a location that contains resources, and includes the file named log4j2.xml. Diese Datei dient zum Konfigurieren, welche Informationen von der Topologie protokolliert werden.This file is used to configure what information is logged by the topology.

Erstellen der TopologieCreate the topology

Eine Java-basierte Apache Storm-Topologie besteht aus drei Komponenten, die Sie als Abhängigkeit erstellen (oder referenzieren) müssen.A Java-based Apache Storm topology consists of three components that you must author (or reference) as a dependency.

  • Spouts: Liest Daten aus externen Quellen und gibt Datenströme in die Topologie aus.Spouts: Reads data from external sources and emits streams of data into the topology.

  • Bolts: Verarbeitet Datenströme, die von Spouts oder anderen Bolts ausgegeben werden, und gibt einen oder mehrere Datenströme aus.Bolts: Performs processing on streams emitted by spouts or other bolts, and emits one or more streams.

  • Topology: Definiert die Anordnung der Spouts und Bolts und stellt den Einstiegspunkt für die Topologie bereit.Topology: Defines how the spouts and bolts are arranged, and provides the entry point for the topology.

Erstellen des SpoutsCreate the spout

Um die Anforderungen für das Einrichten von externen Datenquellen zu verringern, gibt der folgende Spout willkürliche Sätze aus.To reduce requirements for setting up external data sources, the following spout simply emits random sentences. Es handelt sich um eine modifizierte Version eines Spouts, der mit den Storm-Startbeispielenbereitgestellt wird.It is a modified version of a spout that is provided with the Storm-Starter examples. Obwohl in dieser Topologie nur ein Spout verwendet wird, verfügen andere Topologien möglicherweise über mehrere Spouts, die der Topologie Daten aus verschiedenen Quellen zuführen.Although this topology uses only one spout, others may have several that feed data from different sources into the topology.

Geben Sie den folgenden Befehl ein, um die neue Datei RandomSentenceSpout.java zu erstellen und zu öffnen:Enter the command below to create and open a new file RandomSentenceSpout.java:

notepad src\main\java\com\microsoft\example\RandomSentenceSpout.java

Kopieren Sie den folgenden Java-Code, und fügen Sie ihn in die neue Datei ein.Then copy and paste the java code below into the new file. Schließen Sie dann die Datei.Then close the file.

package com.microsoft.example;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

//This spout randomly emits sentences
public class RandomSentenceSpout extends BaseRichSpout {
  //Collector used to emit output
  SpoutOutputCollector _collector;
  //Used to generate a random number
  Random _rand;

  //Open is called when an instance of the class is created
  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  //Set the instance collector to the one passed in
    _collector = collector;
    //For randomness
    _rand = new Random();
  }

  //Emit data to the stream
  @Override
  public void nextTuple() {
  //Sleep for a bit
    Utils.sleep(100);
    //The sentences that are randomly emitted
    String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
        "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
    //Randomly pick a sentence
    String sentence = sentences[_rand.nextInt(sentences.length)];
    //Emit the sentence
    _collector.emit(new Values(sentence));
  }

  //Ack is not implemented since this is a basic example
  @Override
  public void ack(Object id) {
  }

  //Fail is not implemented since this is a basic example
  @Override
  public void fail(Object id) {
  }

  //Declare the output fields. In this case, an sentence
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("sentence"));
  }
}

Hinweis

Ein Beispiel zu einem Spout, der aus einer externen Datenquelle liest, finden Sie in den folgenden Beispielen:For an example of a spout that reads from an external data source, see one of the following examples:

Erstellen des BoltsCreate the bolts

Bolts übernehmen die Datenverarbeitung.Bolts handle the data processing. Bolts können alle Aufgaben übernehmen, beispielsweise Berechnungen, Persistenz oder Kommunikation mit externen Komponenten.Bolts can do anything, for example, computation, persistence, or talking to external components. Diese Topologie verwendet zwei Bolts:This topology uses two bolts:

  • SplitSentence: Unterteilt die von RandomSentenceSpout ausgegebenen Sätze in einzelne Wörter.SplitSentence: Splits the sentences emitted by RandomSentenceSpout into individual words.

  • WordCount: Zählt das Vorkommen der einzelnen Wörter.WordCount: Counts how many times each word has occurred.

SplitSentenceSplitSentence

Geben Sie den folgenden Befehl ein, um die neue Datei SplitSentence.java zu erstellen und zu öffnen:Enter the command below to create and open a new file SplitSentence.java:

notepad src\main\java\com\microsoft\example\SplitSentence.java

Kopieren Sie den folgenden Java-Code, und fügen Sie ihn in die neue Datei ein.Then copy and paste the java code below into the new file. Schließen Sie dann die Datei.Then close the file.

package com.microsoft.example;

import java.text.BreakIterator;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

//There are a variety of bolt types. In this case, use BaseBasicBolt
public class SplitSentence extends BaseBasicBolt {

  //Execute is called to process tuples
  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    //Get the sentence content from the tuple
    String sentence = tuple.getString(0);
    //An iterator to get each word
    BreakIterator boundary=BreakIterator.getWordInstance();
    //Give the iterator the sentence
    boundary.setText(sentence);
    //Find the beginning first word
    int start=boundary.first();
    //Iterate over each word and emit it to the output stream
    for (int end=boundary.next(); end != BreakIterator.DONE; start=end, end=boundary.next()) {
      //get the word
      String word=sentence.substring(start,end);
      //If a word is whitespace characters, replace it with empty
      word=word.replaceAll("\\s+","");
      //if it's an actual word, emit it
      if (!word.equals("")) {
        collector.emit(new Values(word));
      }
    }
  }

  //Declare that emitted tuples contain a word field
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }
}

WordCountWordCount

Geben Sie den folgenden Befehl ein, um die neue Datei WordCount.java zu erstellen und zu öffnen:Enter the command below to create and open a new file WordCount.java:

notepad src\main\java\com\microsoft\example\WordCount.java

Kopieren Sie den folgenden Java-Code, und fügen Sie ihn in die neue Datei ein.Then copy and paste the java code below into the new file. Schließen Sie dann die Datei.Then close the file.

package com.microsoft.example;

import java.util.HashMap;
import java.util.Map;
import java.util.Iterator;

import org.apache.storm.Constants;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.Config;

// For logging
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

//There are a variety of bolt types. In this case, use BaseBasicBolt
public class WordCount extends BaseBasicBolt {
  //Create logger for this class
  private static final Logger logger = LogManager.getLogger(WordCount.class);
  //For holding words and counts
  Map<String, Integer> counts = new HashMap<String, Integer>();
  //How often to emit a count of words
  private Integer emitFrequency;

  // Default constructor
  public WordCount() {
      emitFrequency=5; // Default to 60 seconds
  }

  // Constructor that sets emit frequency
  public WordCount(Integer frequency) {
      emitFrequency=frequency;
  }

  //Configure frequency of tick tuples for this bolt
  //This delivers a 'tick' tuple on a specific interval,
  //which is used to trigger certain actions
  @Override
  public Map<String, Object> getComponentConfiguration() {
      Config conf = new Config();
      conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequency);
      return conf;
  }

  //execute is called to process tuples
  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    //If it's a tick tuple, emit all words and counts
    if(tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
            && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
      for(String word : counts.keySet()) {
        Integer count = counts.get(word);
        collector.emit(new Values(word, count));
        logger.info("Emitting a count of " + count + " for word " + word);
      }
    } else {
      //Get the word contents from the tuple
      String word = tuple.getString(0);
      //Have we counted any already?
      Integer count = counts.get(word);
      if (count == null)
        count = 0;
      //Increment the count and store it
      count++;
      counts.put(word, count);
    }
  }

  //Declare that this emits a tuple containing two fields; word and count
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word", "count"));
  }
}

Definieren der TopologieDefine the topology

Die Topologie verbindet die Spouts und Bolts in einem Diagramm, in dem definiert ist, wie die Daten zwischen den Komponenten verlaufen.The topology ties the spouts and bolts together into a graph, which defines how data flows between the components. Darüber hinaus bietet sie Hinweise zu Parallelität, die von Storm beim Erstellen von Instanzen der Komponenten innerhalb des Clusters verwendet werden.It also provides parallelism hints that Storm uses when creating instances of the components within the cluster.

Nachfolgend sehen Sie eine einfache Abbildung des Komponentendiagramms für diese Topologie.The following image is a basic diagram of the graph of components for this topology.

Diagramm mit der Anordnung von Spouts und Bolts

Geben Sie zum Implementieren der Topologie den folgenden Befehl ein, um die neue Datei WordCountTopology.java zu erstellen und zu öffnen:To implement the topology, enter the command below to create and open a new file WordCountTopology.java:

notepad src\main\java\com\microsoft\example\WordCountTopology.java

Kopieren Sie den folgenden Java-Code, und fügen Sie ihn in die neue Datei ein.Then copy and paste the java code below into the new file. Schließen Sie dann die Datei.Then close the file.

package com.microsoft.example;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

import com.microsoft.example.RandomSentenceSpout;

public class WordCountTopology {

  //Entry point for the topology
  public static void main(String[] args) throws Exception {
  //Used to build the topology
    TopologyBuilder builder = new TopologyBuilder();
    //Add the spout, with a name of 'spout'
    //and parallelism hint of 5 executors
    builder.setSpout("spout", new RandomSentenceSpout(), 5);
    //Add the SplitSentence bolt, with a name of 'split'
    //and parallelism hint of 8 executors
    //shufflegrouping subscribes to the spout, and equally distributes
    //tuples (sentences) across instances of the SplitSentence bolt
    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
    //Add the counter, with a name of 'count'
    //and parallelism hint of 12 executors
    //fieldsgrouping subscribes to the split bolt, and
    //ensures that the same word is sent to the same instance (group by field 'word')
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

    //new configuration
    Config conf = new Config();
    //Set to false to disable debug information when
    // running in production on a cluster
    conf.setDebug(false);

    //If there are arguments, we are running on a cluster
    if (args != null && args.length > 0) {
      //parallelism hint to set the number of workers
      conf.setNumWorkers(3);
      //submit the topology
      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
    //Otherwise, we are running locally
    else {
      //Cap the maximum number of executors that can be spawned
      //for a component to 3
      conf.setMaxTaskParallelism(3);
      //LocalCluster is used to run locally
      LocalCluster cluster = new LocalCluster();
      //submit the topology
      cluster.submitTopology("word-count", conf, builder.createTopology());
      //sleep
      Thread.sleep(10000);
      //shut down the cluster
      cluster.shutdown();
    }
  }
}

Konfigurieren der ProtokollierungConfigure logging

Storm verwendet Apache Log4j 2 zum Protokollieren von Informationen.Storm uses Apache Log4j 2 to log information. Wenn Sie die Protokollierung nicht konfigurieren, gibt die Topologie Diagnoseinformationen aus.If you do not configure logging, the topology emits diagnostic information. Erstellen Sie mithilfe des folgenden Befehls eine Datei namens log4j2.xml im Verzeichnis resources, um zu steuern, was protokolliert wird:To control what is logged, create a file named log4j2.xml in the resources directory by entering the command below:

notepad resources\log4j2.xml

Kopieren Sie den folgenden XML-Text, und fügen Sie ihn in die neue Datei ein.Then copy and paste the XML text below into the new file. Schließen Sie dann die Datei.Then close the file.

<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
<Appenders>
    <Console name="STDOUT" target="SYSTEM_OUT">
        <PatternLayout pattern="%d{HH:mm:ss} [%t] %-5level %logger{36} - %msg%n"/>
    </Console>
</Appenders>
<Loggers>
    <Logger name="com.microsoft.example" level="trace" additivity="false">
        <AppenderRef ref="STDOUT"/>
    </Logger>
    <Root level="error">
        <Appender-Ref ref="STDOUT"/>
    </Root>
</Loggers>
</Configuration>

Hiermit wird eine neue Protokollierung für die com.microsoft.example-Klasse konfiguriert, die die Komponenten in dieser Beispieltopologie enthält.This XML configures a new logger for the com.microsoft.example class, which includes the components in this example topology. Die Ebene für diese Protokollierung wird auf „trace“ festgelegt. Dadurch werden alle Protokollinformationen erfasst, die von Komponenten in dieser Topologie ausgegeben werden.The level is set to trace for this logger, which captures any logging information emitted by components in this topology.

Der Abschnitt <Root level="error"> konfiguriert die Stammebene der Protokollierung (alles, was nicht in com.microsoft.example enthalten ist) so, dass nur Fehlerinformationen protokolliert werden.The <Root level="error"> section configures the root level of logging (everything not in com.microsoft.example) to only log error information.

Weitere Informationen zum Konfigurieren der Protokollierung für Log4j 2 finden Sie unter https://logging.apache.org/log4j/2.x/manual/configuration.html.For more information on configuring logging for Log4j 2, see https://logging.apache.org/log4j/2.x/manual/configuration.html.

Hinweis

Storm-Version 0.10.0 und höher verwenden Log4j 2.x.Storm version 0.10.0 and higher use Log4j 2.x. Ältere Versionen von Storm verwenden Log4j 1.x mit einem anderen Format für die Protokollkonfiguration.Older versions of storm used Log4j 1.x, which used a different format for log configuration. Informationen zur älteren Konfiguration finden Sie unter https://wiki.apache.org/logging-log4j/Log4jXmlFormat.For information on the older configuration, see https://wiki.apache.org/logging-log4j/Log4jXmlFormat.

Lokales Testen der TopologieTest the topology locally

Nach dem Speichern der Dateien verwenden Sie folgenden Befehl, um die Topologie lokal zu testen.After you save the files, use the following command to test the topology locally.

mvn compile exec:java -Dstorm.topology=com.microsoft.example.WordCountTopology

Während der Ausführung zeigt die Topologie zunächst Startinformationen an.As it runs, the topology displays startup information. Der folgende Text ist ein Beispiel der Wortzählungsausgabe:The following text is an example of the word count output:

17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word snow
17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word white
17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 112 for word seven
17:33:27 [Thread-16-count] INFO  com.microsoft.example.WordCount - Emitting a count of 195 for word the
17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 113 for word and
17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 57 for word dwarfs
17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 57 for word snow

Dieses Beispielprotokoll gibt an, dass das Wort „und“ 113 Mal ausgegeben wurde.This example log indicates that the word 'and' has been emitted 113 times. Diese Anzahl steigt weiter an, solange die Topologie ausgeführt wird, da der Spout fortlaufend die gleichen Sätze ausgibt.The count continues to go up as long as the topology runs because the spout continuously emits the same sentences.

Zwischen der Ausgabe von Wörtern und der Anzahl liegt außerdem ein Intervall von fünf Sekunden.There is a 5-second interval between emission of words and counts. Die WordCount-Komponente wird so konfiguriert, dass nur Informationen ausgegeben werden, wenn ein „Tick“-Tupel eingeht.The WordCount component is configured to only emit information when a tick tuple arrives. Es wird angefordert, dass „Tick“-Tupel nur alle fünf Sekunden zugestellt werden.It requests that tick tuples are only delivered every five seconds.

Konvertieren der Topologie in FluxConvert the topology to Flux

Flux ist ein neues Framework von Storm 0.10.0 und höher, mit dem Sie die Konfiguration von der Implementierung trennen können.Flux is a new framework available with Storm 0.10.0 and higher, which allows you to separate configuration from implementation. Ihre Komponenten werden weiterhin in Java definiert, aber die Topologie wird mit einer YAML-Datei definiert.Your components are still defined in Java, but the topology is defined using a YAML file. Sie können eine Standardtopologiedefinition mit Ihrem Projekt verpacken oder eine eigenständige Datei verwenden, wenn Sie die Topologie übermitteln.You can package a default topology definition with your project, or use a standalone file when submitting the topology. Wenn Sie die Topologie an Storm übermitteln, können Sie Umgebungsvariablen oder Konfigurationsdateien verwenden, um die YAML-Topologiedefinition mit Werten aufzufüllen.When submitting the topology to Storm, you can use environment variables or configuration files to populate values in the YAML topology definition.

Die YAML-Datei definiert die für die Topologie zu verwendenden Komponenten und den Datenfluss zwischen ihnen.The YAML file defines the components to use for the topology and the data flow between them. Sie können eine YAML-Datei als Teil der JAR-Datei einfügen, oder Sie können eine externe YAML-Datei verwenden.You can include a YAML file as part of the jar file or you can use an external YAML file.

Weitere Informationen zu Flux finden Sie unter Flux-Framework (https://storm.apache.org/releases/1.0.6/flux.html).For more information on Flux, see Flux framework (https://storm.apache.org/releases/1.0.6/flux.html).

Warnung

Aufgrund eines Fehlers (https://issues.apache.org/jira/browse/STORM-2055) in Storm 1.0.1 müssen Sie möglicherweise eine Storm-Entwicklungsumgebung installieren, um Flux-Topologien lokal auszuführen.Due to a bug (https://issues.apache.org/jira/browse/STORM-2055) with Storm 1.0.1, you may need to install a Storm development environment to run Flux topologies locally.

  1. Zuvor wurde die Topologie durch WordCountTopology.java definiert. Dies ist bei Flux jedoch nicht erforderlich.Previously, WordCountTopology.java defined the topology, but isn't needed with Flux. Löschen Sie die Datei mithilfe des folgenden Befehls:Delete the file with the following command:

    DEL src\main\java\com\microsoft\example\WordCountTopology.java
    
  2. Geben Sie den folgenden Befehl ein, um die neue Datei topology.yaml zu erstellen und zu öffnen:Enter the command below to create and open a new file topology.yaml:

    notepad resources\topology.yaml
    

    Kopieren Sie den folgenden Text, und fügen Sie ihn in die neue Datei ein.Then copy and paste the text below into the new file. Schließen Sie dann die Datei.Then close the file.

    name: "wordcount"       # friendly name for the topology
    
    config:                 # Topology configuration
      topology.workers: 1     # Hint for the number of workers to create
    
    spouts:                 # Spout definitions
    - id: "sentence-spout"
      className: "com.microsoft.example.RandomSentenceSpout"
      parallelism: 1      # parallelism hint
    
    bolts:                  # Bolt definitions
    - id: "splitter-bolt"
      className: "com.microsoft.example.SplitSentence"
      parallelism: 1
    
    - id: "counter-bolt"
      className: "com.microsoft.example.WordCount"
      constructorArgs:
        - 10
      parallelism: 1
    
    streams:                # Stream definitions
    - name: "Spout --> Splitter" # name isn't used (placeholder for logging, UI, etc.)
      from: "sentence-spout"       # The stream emitter
      to: "splitter-bolt"          # The stream consumer
      grouping:                    # Grouping type
        type: SHUFFLE
    
    - name: "Splitter -> Counter"
      from: "splitter-bolt"
      to: "counter-bolt"
      grouping:
        type: FIELDS
        args: ["word"]           # field(s) to group on
    
  3. Geben Sie den folgenden Befehl ein, um pom.xml zu öffnen, und nehmen Sie folgende Änderungen vor:Enter the command below to open pom.xml to make the described revisions below:

    notepad pom.xml
    
    • Fügen Sie im Abschnitt <dependencies> die folgende neue Abhängigkeit hinzu:Add the following new dependency in the <dependencies> section:

      <!-- Add a dependency on the Flux framework -->
      <dependency>
          <groupId>org.apache.storm</groupId>
          <artifactId>flux-core</artifactId>
          <version>${storm.version}</version>
      </dependency>
      
    • Fügen Sie im Abschnitt <plugins> das folgende Plug-In hinzu.Add the following plugin to the <plugins> section. Dieses Plug-In ist für die Erstellung eines Pakets (JAR-Datei) für das Projekt zuständig und wendet beim Erstellen des Pakets einige spezifische Transformationen für Flux an.This plugin handles the creation of a package (jar file) for the project, and applies some transformations specific to Flux when creating the package.

      <!-- build an uber jar -->
      <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-shade-plugin</artifactId>
          <version>3.2.1</version>
          <configuration>
              <transformers>
                  <!-- Keep us from getting a "can't overwrite file error" -->
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer" />
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                  <!-- We're using Flux, so refer to it as main -->
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                      <mainClass>org.apache.storm.flux.Flux</mainClass>
                  </transformer>
              </transformers>
              <!-- Keep us from getting a bad signature error -->
              <filters>
                  <filter>
                      <artifact>*:*</artifact>
                      <excludes>
                          <exclude>META-INF/*.SF</exclude>
                          <exclude>META-INF/*.DSA</exclude>
                          <exclude>META-INF/*.RSA</exclude>
                      </excludes>
                  </filter>
              </filters>
          </configuration>
          <executions>
              <execution>
                  <phase>package</phase>
                  <goals>
                      <goal>shade</goal>
                  </goals>
              </execution>
          </executions>
      </plugin>
      
    • Ändern Sie im Abschnitt exec-maven-plugin <configuration> den Wert für <mainClass> von ${storm.topology} in org.apache.storm.flux.Flux.In the exec-maven-plugin <configuration> section, change the value for <mainClass> from ${storm.topology} to org.apache.storm.flux.Flux. Diese Einstellung ermöglicht Flux, die Ausführung der Topologie lokal in der Entwicklung zu steuern.This setting allows Flux to handle running the topology locally in development.

    • Fügen Sie im Abschnitt <resources> Folgendes zu <includes> hinzu.In the <resources> section, add the following to <includes>. Dieser XML-Code enthält die YAML-Datei, mit der die Topologie als Teil des Projekts definiert wird.This XML includes the YAML file that defines the topology as part of the project.

      <include>topology.yaml</include>
      

Lokales Testen der Flux-TopologieTest the flux topology locally

  1. Geben Sie den folgenden Befehl ein, um die Flux-Topologie unter Verwendung von Maven zu kompilieren und auszuführen:Enter the following command to compile and execute the Flux topology using Maven:

    mvn compile exec:java -Dexec.args="--local -R /topology.yaml"
    

    Warnung

    Wenn in der Topologie Storm 1.0.1-Bits verwendet werden, schlägt dieser Befehl fehl.If your topology uses Storm 1.0.1 bits, this command fails. Dieser Fehler wird verursacht durch https://issues.apache.org/jira/browse/STORM-2055.This failure is caused by https://issues.apache.org/jira/browse/STORM-2055. Installieren Sie stattdessen Storm in Ihrer Entwicklungsumgebung, und führen Sie die folgenden Schritte aus:Instead, install Storm in your development environment and use the following steps:

    Wenn Sie Storm in der Entwicklungsumgebung installiert haben, können Sie stattdessen die folgenden Befehle verwenden:If you have installed Storm in your development environment, you can use the following commands instead:

    mvn compile package
    storm jar target/WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local -R /topology.yaml
    

    Mit dem Parameter --local wird die Topologie im lokalen Modus in Ihrer Entwicklungsumgebung ausgeführt.The --local parameter runs the topology in local mode on your development environment. Der Parameter -R /topology.yaml nutzt die Dateiressource topology.yaml aus der JAR-Datei, um die Topologie zu definieren.The -R /topology.yaml parameter uses the topology.yaml file resource from the jar file to define the topology.

    Während der Ausführung zeigt die Topologie zunächst Startinformationen an.As it runs, the topology displays startup information. Der folgende Text ist ein Beispiel der Ausgabe:The following text is an example of the output:

     17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word snow
     17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word white
     17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 112 for word seven
     17:33:27 [Thread-16-count] INFO  com.microsoft.example.WordCount - Emitting a count of 195 for word the
     17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 113 for word and
     17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 57 for word dwarfs
    

    Es gibt eine Verzögerung von 10 Sekunden zwischen Batches protokollierter Informationen.There is a 10-second delay between batches of logged information.

  2. Erstellen Sie auf der Grundlage des Projekts eine neue YAML-Datei für die Topologie.Create a new topology yaml from the project.

    a.a. Geben Sie den folgenden Befehl ein, um topology.xml zu öffnen:Enter the command below to open topology.xml:

    notepad resources\topology.yaml
    

    b.b. Suchen Sie nach dem folgenden Abschnitt, und ändern Sie den Wert von 10 in 5.Find the following section and change the value of 10 to 5. Durch diese Änderung wird das Intervall zwischen dem Ausgeben der Batches mit der Wortanzahl von 10 Sekunden in 5 Sekunden geändert.This modification changes the interval between emitting batches of word counts from 10 seconds to 5.

    - id: "counter-bolt"
      className: "com.microsoft.example.WordCount"
      constructorArgs:
        - 5
      parallelism: 1  
    

    c.c. Speichern Sie die Datei unter newtopology.yaml.Save file as newtopology.yaml.

  3. Geben Sie den folgenden Befehl ein, um die Topologie auszuführen:To run the topology, enter the following command:

    mvn exec:java -Dexec.args="--local resources/newtopology.yaml"
    

    Oder wenn Sie Storm in der Entwicklungsumgebung installiert haben:Or, if you have Storm on your development environment:

    storm jar target/WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local resources/newtopology.yaml
    

    Bei diesem Befehl wird newtopology.yaml als Topologiedefinition verwendet.This command uses the newtopology.yaml as the topology definition. Da wir den Parameter compile nicht eingebunden haben, verwendet Maven wieder die Version des Projekts, das in den vorherigen Schritten erstellt wurde.Since we didn't include the compile parameter, Maven uses the version of the project built in previous steps.

    Nachdem die Topologie gestartet wurde, sollte erkennbar sein, dass sich der Zeitraum zwischen der Ausgabe der Batches gemäß dem Wert in newtopology.yaml geändert hat.Once the topology starts, you should notice that the time between emitted batches has changed to reflect the value in newtopology.yaml. Sie haben gelernt, dass Sie die Konfiguration über eine YAML-Datei ändern können, ohne die Topologie neu kompilieren zu müssen.So you can see that you can change your configuration through a YAML file without having to recompile the topology.

Weitere Informationen zu diesen und anderen Features des Flux-Frameworks finden Sie unter Flux (https://storm.apache.org/releases/current/flux.html).For more information on these and other features of the Flux framework, see Flux (https://storm.apache.org/releases/current/flux.html).

TridentTrident

Trident ist eine von Storm bereitgestellte hohe Abstraktionsebene.Trident is a high-level abstraction that is provided by Storm. Trident ermöglicht eine statusbehaftete Verarbeitung.It supports stateful processing. Der wichtigste Vorteil von Trident ist die Gewährleistung, dass jede Nachricht, die in die Topologie eingegeben wird, nur einmal verarbeitet wird.The primary advantage of Trident is that it can guarantee that every message that enters the topology is processed only once. Ohne Trident kann Ihre Topologie nur sicherstellen, dass Nachrichten mindestens einmal verarbeitet werden.Without using Trident, your topology can only guarantee that messages are processed at least once. Es gibt noch weitere Unterschiede, z. B. integrierte Komponenten, die Anstelle der Erstellung von Bolts verwendet werden können.There are also other differences, such as built-in components that can be used instead of creating bolts. Tatsächlich werden Bolts durch weniger generische Komponenten wie Filter, Projektionen und Funktionen ersetzt.In fact, bolts are replaced by less-generic components, such as filters, projections, and functions.

Trident-Anwendungen können mithilfe von Maven-Projekten erstellt werden.Trident applications can be created by using Maven projects. Sie verwenden die gleichen Schritte wie weiter oben – nur der Code ist unterschiedlich.You use the same basic steps as presented earlier in this article—only the code is different. Trident kann (derzeit) auch nicht mit dem Flux-Framework verwendet werden.Trident also cannot (currently) be used with the Flux framework.

Weitere Informationen zu Trident finden Sie unter Trident API Overview (in englischer Sprache).For more information about Trident, see the Trident API Overview.

Nächste SchritteNext Steps

Sie haben gelernt, wie Sie eine Apache Storm-Topologie mit Java erstellen.You have learned how to create an Apache Storm topology by using Java. Nun lernen Sie folgende Inhalte:Now learn how to:

Weitere beispielhafte Apache Storm-Topologien finden Sie unter Beispieltopologien für Apache Storm in HDInsight.You can find more example Apache Storm topologies by visiting Example topologies for Apache Storm on HDInsight.