Een Apache Storm topologie maken in JavaCreate an Apache Storm topology in Java

Meer informatie over het maken van een Java-topologie voor Apache Storm.Learn how to create a Java-based topology for Apache Storm. Hier maakt u een storm-topologie waarmee een toepassing met woorden tellen wordt geïmplementeerd.Here, you create a Storm topology that implements a word-count application. U gebruikt Apache Maven om het project te bouwen en op te pakken.You use Apache Maven to build and package the project. Vervolgens leert u hoe u de topologie kunt definiëren met behulp van het Apache Storme stroom raamwerk.Then, you learn how to define the topology using the Apache Storm Flux framework.

Nadat u de stappen in dit document hebt voltooid, kunt u de topologie implementeren voor Apache Storm op HDInsight.After completing the steps in this document, you can deploy the topology to Apache Storm on HDInsight.

Notitie

Een voltooide versie van de Storm-topologie-voor beelden die in dit document https://github.com/Azure-Samples/hdinsight-java-storm-wordcountzijn gemaakt, is beschikbaar op.A completed version of the Storm topology examples created in this document is available at https://github.com/Azure-Samples/hdinsight-java-storm-wordcount.

VereistenPrerequisites

Test omgevingTest environment

De omgeving die voor dit artikel wordt gebruikt, is een computer met Windows 10.The environment used for this article was a computer running Windows 10. De opdrachten zijn uitgevoerd in een opdracht prompt en de verschillende bestanden zijn bewerkt met Klad blok.The commands were executed in a command prompt, and the various files were edited with Notepad.

Voer vanaf een opdracht prompt de onderstaande opdrachten in om een werk omgeving te maken:From a command prompt, enter the commands below to create a working environment:

mkdir C:\HDI
cd C:\HDI

Een Maven-project makenCreate a Maven project

Voer de volgende opdracht in om een Maven-project met de naam WordCountte maken:Enter the following command to create a Maven project named WordCount:

mvn archetype:generate -DarchetypeArtifactId=maven-archetype-quickstart -DgroupId=com.microsoft.example -DartifactId=WordCount -DinteractiveMode=false

cd WordCount
mkdir resources

Met deze opdracht maakt u een WordCount map met de naam op de huidige locatie, die een Basic Maven-project bevat.This command creates a directory named WordCount at the current location, which contains a basic Maven project. Met de tweede opdracht wordt de huidige werkmap gewijzigd WordCountin.The second command changes the present working directory to WordCount. Met de derde opdracht maakt u een nieuwe resourcesmap, die later wordt gebruikt.The third command creates a new directory, resources, which will be used later. De WordCount map bevat de volgende items:The WordCount directory contains the following items:

  • pom.xml: Bevat instellingen voor het project maven.pom.xml: Contains settings for the Maven project.
  • src\main\java\com\microsoft\example: Bevat de code van uw toepassing.src\main\java\com\microsoft\example: Contains your application code.
  • src\test\java\com\microsoft\example: Bevat tests voor uw toepassing.src\test\java\com\microsoft\example: Contains tests for your application.

De gegenereerde voorbeeld code verwijderenRemove the generated example code

Verwijder de gegenereerde test-en AppTest.javatoepassings bestanden App.java en voer de volgende opdrachten in:Delete the generated test and application files AppTest.java, and App.java by entering the commands below:

DEL src\main\java\com\microsoft\example\App.java
DEL src\test\java\com\microsoft\example\AppTest.java

Maven-opslag plaatsen toevoegenAdd Maven repositories

HDInsight is gebaseerd op het Hortonworks data platform (HDP), daarom raden we u aan de Hortonworks-opslag plaats te gebruiken voor het downloaden van afhankelijkheden voor uw Apache Storm-projecten.HDInsight is based on the Hortonworks Data Platform (HDP), so we recommend using the Hortonworks repository to download dependencies for your Apache Storm projects.

Open pom.xml door de volgende opdracht in te voeren:Open pom.xml by entering the command below:

notepad pom.xml

Voeg vervolgens de volgende XML toe na <url> https://maven.apache.org</url> de regel:Then add the following XML after the <url>https://maven.apache.org</url> line:

<repositories>
    <repository>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>always</updatePolicy>
            <checksumPolicy>warn</checksumPolicy>
        </releases>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
            <checksumPolicy>fail</checksumPolicy>
        </snapshots>
        <id>HDPReleases</id>
        <name>HDP Releases</name>
        <url>https://repo.hortonworks.com/content/repositories/releases/</url>
        <layout>default</layout>
    </repository>
    <repository>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>always</updatePolicy>
            <checksumPolicy>warn</checksumPolicy>
        </releases>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
            <checksumPolicy>fail</checksumPolicy>
        </snapshots>
        <id>HDPJetty</id>
        <name>Hadoop Jetty</name>
        <url>https://repo.hortonworks.com/content/repositories/jetty-hadoop/</url>
        <layout>default</layout>
    </repository>
</repositories>

Eigenschappen toevoegenAdd properties

Met maven kunt u waarden op project niveau met de naam eigenschappen definiëren.Maven allows you to define project-level values called properties. Voeg pom.xmlin de volgende tekst toe na de </repositories> regel:In pom.xml, add the following text after the </repositories> line:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!--
    This is a version of Storm from the Hortonworks repository that is compatible with HDInsight 3.6.
    -->
    <storm.version>1.1.0.2.6.1.9-1</storm.version>
</properties>

U kunt deze waarde nu in andere secties van de pom.xmlgebruiken.You can now use this value in other sections of the pom.xml. Wanneer u bijvoorbeeld de versie van Storm-onderdelen opgeeft, kunt u in ${storm.version} plaats van een waarde vaste code ring gebruiken.For example, when specifying the version of Storm components, you can use ${storm.version} instead of hard coding a value.

Afhankelijkheden toevoegenAdd dependencies

Een afhankelijkheid voor Storm-onderdelen toevoegen.Add a dependency for Storm components. Voeg pom.xmlin de <dependencies> sectie de volgende tekst toe:In pom.xml, add the following text in the <dependencies> section:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
    <!-- keep storm out of the jar-with-dependencies -->
    <scope>provided</scope>
</dependency>

Tijdens het compileren gebruikt maven deze informatie om op te storm-core zoeken in de Maven-opslag plaats.At compile time, Maven uses this information to look up storm-core in the Maven repository. Het controleert eerst op de opslag plaats op uw lokale computer.It first looks in the repository on your local computer. Als de bestanden niet aanwezig zijn, worden ze door maven gedownload uit de open bare maven-opslag plaats en opgeslagen in de lokale opslag plaats.If the files aren't there, Maven downloads them from the public Maven repository and stores them in the local repository.

Notitie

Let op <scope>provided</scope> de regel in deze sectie.Notice the <scope>provided</scope> line in this section. Met deze instelling wordt aangegeven dat maven Storm-kern moet worden uitgesloten van alle JAR-bestanden die worden gemaakt, omdat deze worden verschaft door het systeem.This setting tells Maven to exclude storm-core from any JAR files that are created, because it is provided by the system.

Configuratie makenBuild configuration

Met maven-invoeg toepassingen kunt u de build-fasen van het project aanpassen.Maven plug-ins allow you to customize the build stages of the project. Bijvoorbeeld hoe het project wordt gecompileerd of hoe het moet worden ingepakt in een JAR-bestand.For example, how the project is compiled or how to package it into a JAR file. Voeg pom.xmlin de volgende tekst toe boven de </project> regel.In pom.xml, add the following text directly above the </project> line.

<build>
    <plugins>
    </plugins>
    <resources>
    </resources>
</build>

Deze sectie wordt gebruikt om invoeg toepassingen, resources en andere configuratie opties voor het maken van een toepassing toe te voegen.This section is used to add plug-ins, resources, and other build configuration options. pom.xml Zie https://maven.apache.org/pom.htmlvoor een volledige referentie van het bestand.For a full reference of the pom.xml file, see https://maven.apache.org/pom.html.

Invoeg toepassingen toevoegenAdd plug-ins

  • Exec maven-invoeg toepassingExec Maven Plugin

    Voor Apache Storm topologieën die in Java zijn geïmplementeerd, is de exec maven-invoeg toepassing handig omdat u de topologie eenvoudig lokaal kunt uitvoeren in uw ontwikkel omgeving.For Apache Storm topologies implemented in Java, the Exec Maven Plugin is useful because it allows you to easily run the topology locally in your development environment. Voeg het volgende toe aan <plugins> de sectie van pom.xml het bestand voor het toevoegen van de exec maven-invoeg toepassing:Add the following to the <plugins> section of the pom.xml file to include the Exec Maven plugin:

    <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.6.0</version>
        <executions>
            <execution>
            <goals>
                <goal>exec</goal>
            </goals>
            </execution>
        </executions>
        <configuration>
            <executable>java</executable>
            <includeProjectDependencies>true</includeProjectDependencies>
            <includePluginDependencies>false</includePluginDependencies>
            <classpathScope>compile</classpathScope>
            <mainClass>${storm.topology}</mainClass>
            <cleanupDaemonThreads>false</cleanupDaemonThreads> 
        </configuration>
    </plugin>
    
  • Apache Maven compiler-invoeg toepassingApache Maven Compiler Plugin

    Een andere handige invoeg toepassing is de Apache Maven compiler-invoeg toepassing, die wordt gebruikt om de compilatie opties te wijzigen.Another useful plug-in is the Apache Maven Compiler Plugin, which is used to change compilation options. Wijzig de Java-versie die maven gebruikt voor de bron en het doel voor uw toepassing.Change the Java version that Maven uses for the source and target for your application.

    • Voor HDInsight __3,4 of eerder__stelt u de bron-en doel-Java-versie in op 1,7.For HDInsight 3.4 or earlier, set the source and target Java version to 1.7.

    • Voor HDInsight __3,5__stelt u de bron-en doel-Java-versie in op 1,8.For HDInsight 3.5, set the source and target Java version to 1.8.

      Voeg de volgende tekst toe aan <plugins> de sectie van pom.xml het bestand, waarin de invoeg toepassing Apache Maven compiler wordt vermeld.Add the following text in the <plugins> section of the pom.xml file to include the Apache Maven Compiler plugin. In dit voor beeld wordt 1,8 opgegeven, dus is de HDInsight-versie van het doel 3,5.This example specifies 1.8, so the target HDInsight version is 3.5.

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.3</version>
        <configuration>
        <source>1.8</source>
        <target>1.8</target>
        </configuration>
      </plugin>
      

Resources configurerenConfigure resources

In het gedeelte resources kunt u niet-code resources toevoegen, zoals configuratie bestanden die nodig zijn voor onderdelen in de topologie.The resources section allows you to include non-code resources such as configuration files needed by components in the topology. Voor dit voor beeld voegt u de volgende tekst toe <resources> aan de sectie pom.xml van het bestand.For this example, add the following text in the <resources> section of the pom.xml file.

<resource>
    <directory>${basedir}/resources</directory>
    <filtering>false</filtering>
    <includes>
        <include>log4j2.xml</include>
    </includes>
</resource>

In dit voor beeld wordt de map resources in de hoofdmap van het${basedir}project () toegevoegd als een locatie die bronnen bevat, en het log4j2.xmlbestand met de naam.This example adds the resources directory in the root of the project (${basedir}) as a location that contains resources, and includes the file named log4j2.xml. Dit bestand wordt gebruikt om te configureren welke gegevens worden geregistreerd door de topologie.This file is used to configure what information is logged by the topology.

De topologie makenCreate the topology

Een op Java gebaseerde Apache Storm topologie bestaat uit drie onderdelen die u als een afhankelijkheid (of referentie) moet maken.A Java-based Apache Storm topology consists of three components that you must author (or reference) as a dependency.

  • Spouts: Gegevens uit externe bronnen worden gelezen en gegevens stromen worden verzonden naar de topologie.Spouts: Reads data from external sources and emits streams of data into the topology.

  • Bouten: Voert de verwerking uit van stromen die worden verzonden door spouts of andere schichten, en levert een of meer streams.Bolts: Performs processing on streams emitted by spouts or other bolts, and emits one or more streams.

  • Topologie: Hiermee wordt gedefinieerd hoe de spouts en schichten worden gerangschikt en wordt het toegangs punt voor de topologie geboden.Topology: Defines how the spouts and bolts are arranged, and provides the entry point for the topology.

De Spout makenCreate the spout

Om de vereisten voor het instellen van externe gegevens bronnen te reduceren, worden in de volgende Spout alleen wille keurige zinnen gegeven.To reduce requirements for setting up external data sources, the following spout simply emits random sentences. Het is een gewijzigde versie van een Spout die wordt meegeleverd met de Storm-starter-voor beelden.It is a modified version of a spout that is provided with the Storm-Starter examples. Hoewel in deze topologie slechts één Spout wordt gebruikt, kunnen andere gegevens van andere bronnen in de topologie worden gefeedd.Although this topology uses only one spout, others may have several that feed data from different sources into the topology.

Voer de onderstaande opdracht in om een nieuw bestand RandomSentenceSpout.javate maken en te openen:Enter the command below to create and open a new file RandomSentenceSpout.java:

notepad src\main\java\com\microsoft\example\RandomSentenceSpout.java

Kopieer en plak de Java-code hieronder in het nieuwe bestand.Then copy and paste the java code below into the new file. Sluit het bestand.Then close the file.

package com.microsoft.example;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

//This spout randomly emits sentences
public class RandomSentenceSpout extends BaseRichSpout {
  //Collector used to emit output
  SpoutOutputCollector _collector;
  //Used to generate a random number
  Random _rand;

  //Open is called when an instance of the class is created
  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  //Set the instance collector to the one passed in
    _collector = collector;
    //For randomness
    _rand = new Random();
  }

  //Emit data to the stream
  @Override
  public void nextTuple() {
  //Sleep for a bit
    Utils.sleep(100);
    //The sentences that are randomly emitted
    String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
        "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
    //Randomly pick a sentence
    String sentence = sentences[_rand.nextInt(sentences.length)];
    //Emit the sentence
    _collector.emit(new Values(sentence));
  }

  //Ack is not implemented since this is a basic example
  @Override
  public void ack(Object id) {
  }

  //Fail is not implemented since this is a basic example
  @Override
  public void fail(Object id) {
  }

  //Declare the output fields. In this case, an sentence
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("sentence"));
  }
}

Notitie

Zie een van de volgende voor beelden voor een voor beeld van een Spout dat van een externe gegevens bron wordt gelezen:For an example of a spout that reads from an external data source, see one of the following examples:

De bouten makenCreate the bolts

Bouten verwerken de gegevens verwerking.Bolts handle the data processing. Schichten kunnen alles doen, bijvoorbeeld berekening, persistentie of praten met externe onderdelen.Bolts can do anything, for example, computation, persistence, or talking to external components. In deze topologie worden twee schichten gebruikt:This topology uses two bolts:

  • SplitSentence: Splitst de zinnen die worden verzonden door RandomSentenceSpout in afzonderlijke woorden.SplitSentence: Splits the sentences emitted by RandomSentenceSpout into individual words.

  • WordCount: Telt het aantal keren dat elk woord heeft plaatsgevonden.WordCount: Counts how many times each word has occurred.

SplitSentenceSplitSentence

Voer de onderstaande opdracht in om een nieuw bestand SplitSentence.javate maken en te openen:Enter the command below to create and open a new file SplitSentence.java:

notepad src\main\java\com\microsoft\example\SplitSentence.java

Kopieer en plak de Java-code hieronder in het nieuwe bestand.Then copy and paste the java code below into the new file. Sluit het bestand.Then close the file.

package com.microsoft.example;

import java.text.BreakIterator;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

//There are a variety of bolt types. In this case, use BaseBasicBolt
public class SplitSentence extends BaseBasicBolt {

  //Execute is called to process tuples
  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    //Get the sentence content from the tuple
    String sentence = tuple.getString(0);
    //An iterator to get each word
    BreakIterator boundary=BreakIterator.getWordInstance();
    //Give the iterator the sentence
    boundary.setText(sentence);
    //Find the beginning first word
    int start=boundary.first();
    //Iterate over each word and emit it to the output stream
    for (int end=boundary.next(); end != BreakIterator.DONE; start=end, end=boundary.next()) {
      //get the word
      String word=sentence.substring(start,end);
      //If a word is whitespace characters, replace it with empty
      word=word.replaceAll("\\s+","");
      //if it's an actual word, emit it
      if (!word.equals("")) {
        collector.emit(new Values(word));
      }
    }
  }

  //Declare that emitted tuples contain a word field
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }
}

WordCountWordCount

Voer de onderstaande opdracht in om een nieuw bestand WordCount.javate maken en te openen:Enter the command below to create and open a new file WordCount.java:

notepad src\main\java\com\microsoft\example\WordCount.java

Kopieer en plak de Java-code hieronder in het nieuwe bestand.Then copy and paste the java code below into the new file. Sluit het bestand.Then close the file.

package com.microsoft.example;

import java.util.HashMap;
import java.util.Map;
import java.util.Iterator;

import org.apache.storm.Constants;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.Config;

// For logging
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

//There are a variety of bolt types. In this case, use BaseBasicBolt
public class WordCount extends BaseBasicBolt {
  //Create logger for this class
  private static final Logger logger = LogManager.getLogger(WordCount.class);
  //For holding words and counts
  Map<String, Integer> counts = new HashMap<String, Integer>();
  //How often to emit a count of words
  private Integer emitFrequency;

  // Default constructor
  public WordCount() {
      emitFrequency=5; // Default to 60 seconds
  }

  // Constructor that sets emit frequency
  public WordCount(Integer frequency) {
      emitFrequency=frequency;
  }

  //Configure frequency of tick tuples for this bolt
  //This delivers a 'tick' tuple on a specific interval,
  //which is used to trigger certain actions
  @Override
  public Map<String, Object> getComponentConfiguration() {
      Config conf = new Config();
      conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequency);
      return conf;
  }

  //execute is called to process tuples
  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    //If it's a tick tuple, emit all words and counts
    if(tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
            && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
      for(String word : counts.keySet()) {
        Integer count = counts.get(word);
        collector.emit(new Values(word, count));
        logger.info("Emitting a count of " + count + " for word " + word);
      }
    } else {
      //Get the word contents from the tuple
      String word = tuple.getString(0);
      //Have we counted any already?
      Integer count = counts.get(word);
      if (count == null)
        count = 0;
      //Increment the count and store it
      count++;
      counts.put(word, count);
    }
  }

  //Declare that this emits a tuple containing two fields; word and count
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word", "count"));
  }
}

De topologie definiërenDefine the topology

De topologie verbindt de spouts en schichten samen in een grafiek, die definieert hoe gegevens stromen tussen de onderdelen.The topology ties the spouts and bolts together into a graph, which defines how data flows between the components. Het biedt ook parallellisme-hints die storm gebruiken bij het maken van exemplaren van de onderdelen in het cluster.It also provides parallelism hints that Storm uses when creating instances of the components within the cluster.

De volgende afbeelding is een basis diagram van de grafiek van onderdelen voor deze topologie.The following image is a basic diagram of the graph of components for this topology.

diagram van de spouts-en bouten-indeling

Als u de topologie wilt implementeren, voert u de onderstaande opdracht in om een nieuw WordCountTopology.javabestand te maken en te openen:To implement the topology, enter the command below to create and open a new file WordCountTopology.java:

notepad src\main\java\com\microsoft\example\WordCountTopology.java

Kopieer en plak de Java-code hieronder in het nieuwe bestand.Then copy and paste the java code below into the new file. Sluit het bestand.Then close the file.

package com.microsoft.example;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

import com.microsoft.example.RandomSentenceSpout;

public class WordCountTopology {

  //Entry point for the topology
  public static void main(String[] args) throws Exception {
  //Used to build the topology
    TopologyBuilder builder = new TopologyBuilder();
    //Add the spout, with a name of 'spout'
    //and parallelism hint of 5 executors
    builder.setSpout("spout", new RandomSentenceSpout(), 5);
    //Add the SplitSentence bolt, with a name of 'split'
    //and parallelism hint of 8 executors
    //shufflegrouping subscribes to the spout, and equally distributes
    //tuples (sentences) across instances of the SplitSentence bolt
    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
    //Add the counter, with a name of 'count'
    //and parallelism hint of 12 executors
    //fieldsgrouping subscribes to the split bolt, and
    //ensures that the same word is sent to the same instance (group by field 'word')
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

    //new configuration
    Config conf = new Config();
    //Set to false to disable debug information when
    // running in production on a cluster
    conf.setDebug(false);

    //If there are arguments, we are running on a cluster
    if (args != null && args.length > 0) {
      //parallelism hint to set the number of workers
      conf.setNumWorkers(3);
      //submit the topology
      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
    //Otherwise, we are running locally
    else {
      //Cap the maximum number of executors that can be spawned
      //for a component to 3
      conf.setMaxTaskParallelism(3);
      //LocalCluster is used to run locally
      LocalCluster cluster = new LocalCluster();
      //submit the topology
      cluster.submitTopology("word-count", conf, builder.createTopology());
      //sleep
      Thread.sleep(10000);
      //shut down the cluster
      cluster.shutdown();
    }
  }
}

Logboek registratie configurerenConfigure logging

Storm gebruikt Apache Log4j 2 om informatie te registreren.Storm uses Apache Log4j 2 to log information. Als u geen logboek registratie configureert, verzendt de topologie diagnostische gegevens.If you do not configure logging, the topology emits diagnostic information. Als u wilt bepalen wat er wordt geregistreerd, maakt log4j2.xml u een resources bestand met de naam in de map door de volgende opdracht in te voeren:To control what is logged, create a file named log4j2.xml in the resources directory by entering the command below:

notepad resources\log4j2.xml

Kopieer en plak de onderstaande XML-tekst in het nieuwe bestand.Then copy and paste the XML text below into the new file. Sluit het bestand.Then close the file.

<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
<Appenders>
    <Console name="STDOUT" target="SYSTEM_OUT">
        <PatternLayout pattern="%d{HH:mm:ss} [%t] %-5level %logger{36} - %msg%n"/>
    </Console>
</Appenders>
<Loggers>
    <Logger name="com.microsoft.example" level="trace" additivity="false">
        <AppenderRef ref="STDOUT"/>
    </Logger>
    <Root level="error">
        <Appender-Ref ref="STDOUT"/>
    </Root>
</Loggers>
</Configuration>

Met deze XML wordt een nieuwe logboek registratie voor com.microsoft.example de klasse geconfigureerd, waaronder de onderdelen in deze voorbeeld topologie.This XML configures a new logger for the com.microsoft.example class, which includes the components in this example topology. Het niveau is ingesteld op tracering voor deze logger, waarmee logboek gegevens worden vastgelegd die worden gegenereerd door onderdelen in deze topologie.The level is set to trace for this logger, which captures any logging information emitted by components in this topology.

De <Root level="error"> sectie configureert het basis niveau van logboek registratie (alles niet in com.microsoft.example) om alleen fout gegevens te registreren.The <Root level="error"> section configures the root level of logging (everything not in com.microsoft.example) to only log error information.

Zie https://logging.apache.org/log4j/2.x/manual/configuration.htmlvoor meer informatie over het configureren van logboek registratie voor Log4j 2.For more information on configuring logging for Log4j 2, see https://logging.apache.org/log4j/2.x/manual/configuration.html.

Notitie

Storm-versie 0.10.0 en hoger gebruiken Log4j 2. x.Storm version 0.10.0 and higher use Log4j 2.x. Oudere versies van Storm gebruiken Log4j 1. x, die een andere indeling hebben gebruikt voor de logboek configuratie.Older versions of storm used Log4j 1.x, which used a different format for log configuration. Zie https://wiki.apache.org/logging-log4j/Log4jXmlFormatvoor meer informatie over de oudere configuratie.For information on the older configuration, see https://wiki.apache.org/logging-log4j/Log4jXmlFormat.

De topologie lokaal testenTest the topology locally

Nadat u de bestanden hebt opgeslagen, gebruikt u de volgende opdracht om de topologie lokaal te testen.After you save the files, use the following command to test the topology locally.

mvn compile exec:java -Dstorm.topology=com.microsoft.example.WordCountTopology

Terwijl deze wordt uitgevoerd, wordt in de topologie opstart gegevens weer gegeven.As it runs, the topology displays startup information. De volgende tekst is een voor beeld van de uitvoer van het aantal woorden:The following text is an example of the word count output:

17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word snow
17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word white
17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 112 for word seven
17:33:27 [Thread-16-count] INFO  com.microsoft.example.WordCount - Emitting a count of 195 for word the
17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 113 for word and
17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 57 for word dwarfs
17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 57 for word snow

In dit voorbeeld logboek wordt aangegeven dat het woord ' en ' 113 keer is verzonden.This example log indicates that the word 'and' has been emitted 113 times. De telling blijft bestaan, zolang de topologie wordt uitgevoerd, omdat de Spout continu dezelfde zinnen verzendt.The count continues to go up as long as the topology runs because the spout continuously emits the same sentences.

Er is een interval van vijf seconden tussen de uitstoot van woorden en tellingen.There is a 5-second interval between emission of words and counts. Het WordCount -onderdeel is zo geconfigureerd dat alleen gegevens worden verzonden wanneer een Tick tuple arriveert.The WordCount component is configured to only emit information when a tick tuple arrives. Er wordt gevraagd dat Tick-Tuples elke vijf seconden worden geleverd.It requests that tick tuples are only delivered every five seconds.

De topologie omzetten naar stroomConvert the topology to Flux

Stroom is een nieuw Framework dat beschikbaar is met Storm 0.10.0 en hoger, waarmee u de configuratie kunt scheiden van implementatie.Flux is a new framework available with Storm 0.10.0 and higher, which allows you to separate configuration from implementation. Uw onderdelen zijn nog in Java gedefinieerd, maar de topologie wordt gedefinieerd met behulp van een YAML-bestand.Your components are still defined in Java, but the topology is defined using a YAML file. U kunt een standaard topologie definitie met uw project inpakken of een zelfstandig bestand gebruiken bij het verzenden van de topologie.You can package a default topology definition with your project, or use a standalone file when submitting the topology. Bij het indienen van de topologie voor Storm kunt u omgevings variabelen of configuratie bestanden gebruiken om waarden in de YAML-topologie definitie in te vullen.When submitting the topology to Storm, you can use environment variables or configuration files to populate values in the YAML topology definition.

Het YAML-bestand definieert de onderdelen die moeten worden gebruikt voor de topologie en de gegevens stroom ertussen.The YAML file defines the components to use for the topology and the data flow between them. U kunt een YAML-bestand opnemen als onderdeel van het jar-bestand of u kunt een extern YAML-bestand gebruiken.You can include a YAML file as part of the jar file or you can use an external YAML file.

Zie voor meer informatie over stroom stroom Framework (https://storm.apache.org/releases/current/flux.html).For more information on Flux, see Flux framework (https://storm.apache.org/releases/current/flux.html).

Waarschuwing

Als gevolg van een bug https://issues.apache.org/jira/browse/STORM-2055) ( met storm 1.0.1 moet u mogelijk een storm- ontwikkel omgeving installeren om de stroom topologie lokaal uit te voeren.Due to a bug (https://issues.apache.org/jira/browse/STORM-2055) with Storm 1.0.1, you may need to install a Storm development environment to run Flux topologies locally.

  1. Voorheen, WordCountTopology.java de topologie gedefinieerd, maar is niet nodig met stroom.Previously, WordCountTopology.java defined the topology, but isn't needed with Flux. Verwijder het bestand met de volgende opdracht:Delete the file with the following command:

    DEL src\main\java\com\microsoft\example\WordCountTopology.java
    
  2. Voer de onderstaande opdracht in om een nieuw bestand topology.yamlte maken en te openen:Enter the command below to create and open a new file topology.yaml:

    notepad resources\topology.yaml
    

    Kopieer en plak vervolgens de onderstaande tekst in het nieuwe bestand.Then copy and paste the text below into the new file. Sluit het bestand.Then close the file.

    name: "wordcount"       # friendly name for the topology
    
    config:                 # Topology configuration
      topology.workers: 1     # Hint for the number of workers to create
    
    spouts:                 # Spout definitions
    - id: "sentence-spout"
      className: "com.microsoft.example.RandomSentenceSpout"
      parallelism: 1      # parallelism hint
    
    bolts:                  # Bolt definitions
    - id: "splitter-bolt"
      className: "com.microsoft.example.SplitSentence"
      parallelism: 1
    
    - id: "counter-bolt"
      className: "com.microsoft.example.WordCount"
      constructorArgs:
        - 10
      parallelism: 1
    
    streams:                # Stream definitions
    - name: "Spout --> Splitter" # name isn't used (placeholder for logging, UI, etc.)
      from: "sentence-spout"       # The stream emitter
      to: "splitter-bolt"          # The stream consumer
      grouping:                    # Grouping type
        type: SHUFFLE
    
    - name: "Splitter -> Counter"
      from: "splitter-bolt"
      to: "counter-bolt"
      grouping:
        type: FIELDS
        args: ["word"]           # field(s) to group on
    
  3. Voer de onderstaande opdracht in om pom.xml de hieronder beschreven revisies te openen:Enter the command below to open pom.xml to make the described revisions below:

    notepad pom.xml
    
    • Voeg de volgende nieuwe afhankelijkheden toe <dependencies> aan de sectie:Add the following new dependency in the <dependencies> section:

      <!-- Add a dependency on the Flux framework -->
      <dependency>
          <groupId>org.apache.storm</groupId>
          <artifactId>flux-core</artifactId>
          <version>${storm.version}</version>
      </dependency>
      
    • Voeg de volgende invoeg toepassing toe <plugins> aan de sectie.Add the following plugin to the <plugins> section. Deze invoeg toepassing verwerkt het maken van een pakket (jar-bestand) voor het project en past enkele trans formaties toe die specifiek zijn voor stroom bij het maken van het pakket.This plugin handles the creation of a package (jar file) for the project, and applies some transformations specific to Flux when creating the package.

      <!-- build an uber jar -->
      <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-shade-plugin</artifactId>
          <version>3.2.1</version>
          <configuration>
              <transformers>
                  <!-- Keep us from getting a "can't overwrite file error" -->
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer" />
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                  <!-- We're using Flux, so refer to it as main -->
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                      <mainClass>org.apache.storm.flux.Flux</mainClass>
                  </transformer>
              </transformers>
              <!-- Keep us from getting a bad signature error -->
              <filters>
                  <filter>
                      <artifact>*:*</artifact>
                      <excludes>
                          <exclude>META-INF/*.SF</exclude>
                          <exclude>META-INF/*.DSA</exclude>
                          <exclude>META-INF/*.RSA</exclude>
                      </excludes>
                  </filter>
              </filters>
          </configuration>
          <executions>
              <execution>
                  <phase>package</phase>
                  <goals>
                      <goal>shade</goal>
                  </goals>
              </execution>
          </executions>
      </plugin>
      
    • Wijzig in de sectie exec-maven-plugin <configuration> de org.apache.storm.flux.Fluxwaarde voor <mainClass> van ${storm.topology} in.In the exec-maven-plugin <configuration> section, change the value for <mainClass> from ${storm.topology} to org.apache.storm.flux.Flux. Met deze instelling kan stroom worden verwerkt om de topologie lokaal uit te voeren in de ontwikkeling.This setting allows Flux to handle running the topology locally in development.

    • Voeg in <resources> de sectie het volgende toe aan <includes>.In the <resources> section, add the following to <includes>. Deze XML bevat het YAML-bestand dat de topologie definieert als onderdeel van het project.This XML includes the YAML file that defines the topology as part of the project.

      <include>topology.yaml</include>
      

De stroom topologie lokaal testenTest the flux topology locally

  1. Voer de volgende opdracht in om de stroom topologie te compileren en uit te voeren met maven:Enter the following command to compile and execute the Flux topology using Maven:

    mvn compile exec:java -Dexec.args="--local -R /topology.yaml"
    

    Waarschuwing

    Als uw topologie Storm 1.0.1-bits gebruikt, mislukt deze opdracht.If your topology uses Storm 1.0.1 bits, this command fails. Deze fout wordt veroorzaakt door https://issues.apache.org/jira/browse/STORM-2055.This failure is caused by https://issues.apache.org/jira/browse/STORM-2055. Installeer in plaats daarvan Storm in uw ontwikkel omgeving en gebruik de volgende stappen:Instead, install Storm in your development environment and use the following steps:

    Als u Storm hebt geïnstalleerd in uw ontwikkel omgeving, kunt u in plaats daarvan de volgende opdrachten gebruiken:If you have installed Storm in your development environment, you can use the following commands instead:

    mvn compile package
    storm jar target/WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local -R /topology.yaml
    

    Met --local de para meter wordt de topologie in de lokale modus uitgevoerd op uw ontwikkel omgeving.The --local parameter runs the topology in local mode on your development environment. De -R /topology.yaml para meter gebruikt topology.yaml de bestands resource uit het jar-bestand om de topologie te definiëren.The -R /topology.yaml parameter uses the topology.yaml file resource from the jar file to define the topology.

    Terwijl deze wordt uitgevoerd, wordt in de topologie opstart gegevens weer gegeven.As it runs, the topology displays startup information. De volgende tekst is een voor beeld van de uitvoer:The following text is an example of the output:

     17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word snow
     17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word white
     17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 112 for word seven
     17:33:27 [Thread-16-count] INFO  com.microsoft.example.WordCount - Emitting a count of 195 for word the
     17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 113 for word and
     17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 57 for word dwarfs
    

    Er is een vertraging van 10 seconden tussen batches van geregistreerde gegevens.There is a 10-second delay between batches of logged information.

  2. Een nieuwe topologie yaml maken op basis van het project.Create a new topology yaml from the project.

    a.a. Voer de onderstaande opdracht in om topology.xmlte openen:Enter the command below to open topology.xml:

    notepad resources\topology.yaml
    

    b.b. Zoek de volgende sectie en wijzig de waarde van 10 in 5.Find the following section and change the value of 10 to 5. Deze wijziging wijzigt het interval tussen het verzenden van batches van woorden van 10 seconden naar 5.This modification changes the interval between emitting batches of word counts from 10 seconds to 5.

    - id: "counter-bolt"
      className: "com.microsoft.example.WordCount"
      constructorArgs:
        - 5
      parallelism: 1  
    

    c.c. Bestand opslaan als newtopology.yaml.Save file as newtopology.yaml.

  3. Voer de volgende opdracht in om de topologie uit te voeren:To run the topology, enter the following command:

    mvn exec:java -Dexec.args="--local resources/newtopology.yaml"
    

    Of, als u Storm hebt in uw ontwikkelings omgeving:Or, if you have Storm on your development environment:

    storm jar target/WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local resources/newtopology.yaml
    

    Met deze opdracht wordt newtopology.yaml de topologie definitie gebruikt.This command uses the newtopology.yaml as the topology definition. Omdat we de compile para meter niet hebben opgenomen, gebruikt maven de versie van het project dat in de vorige stappen is gemaakt.Since we didn't include the compile parameter, Maven uses the version of the project built in previous steps.

    Zodra de topologie is gestart, ziet u dat de tijd tussen verzonden batches is gewijzigd, zodat deze overeenkomt met newtopology.yamlde waarde in.Once the topology starts, you should notice that the time between emitted batches has changed to reflect the value in newtopology.yaml. Zo kunt u zien dat u uw configuratie kunt wijzigen via een YAML-bestand zonder dat u de topologie opnieuw hoeft te compileren.So you can see that you can change your configuration through a YAML file without having to recompile the topology.

Zie voor meer informatie over deze en andere functies van het stroom raamwerk stroom (https://storm.apache.org/releases/current/flux.html).For more information on these and other features of the Flux framework, see Flux (https://storm.apache.org/releases/current/flux.html).

TridentTrident

Trident is een abstracte abstractie op hoog niveau die wordt verschaft door storm.Trident is a high-level abstraction that is provided by Storm. Het ondersteunt stateful verwerking.It supports stateful processing. Het belangrijkste voor deel van Trident is dat het ervoor kan zorgen dat elk bericht dat de topologie invult, slechts één keer wordt verwerkt.The primary advantage of Trident is that it can guarantee that every message that enters the topology is processed only once. Zonder gebruik van Trident kan uw topologie alleen garanderen dat berichten ten minste één keer worden verwerkt.Without using Trident, your topology can only guarantee that messages are processed at least once. Er zijn ook andere verschillen, zoals ingebouwde onderdelen die kunnen worden gebruikt in plaats van bouten te maken.There are also other differences, such as built-in components that can be used instead of creating bolts. In feite worden grendels vervangen door minder algemene onderdelen, zoals filters, prognoses en functies.In fact, bolts are replaced by less-generic components, such as filters, projections, and functions.

Trident-toepassingen kunnen worden gemaakt met behulp van Maven-projecten.Trident applications can be created by using Maven projects. U gebruikt dezelfde basis stappen zoals eerder in dit artikel wordt weer gegeven. alleen de code wijkt af.You use the same basic steps as presented earlier in this article—only the code is different. Trident kan ook niet (momenteel) worden gebruikt met het stroom kader.Trident also cannot (currently) be used with the Flux framework.

Zie de TRIDENT API overview(Engelstalig) voor meer informatie over Trident.For more information about Trident, see the Trident API Overview.

Volgende stappenNext Steps

U hebt geleerd hoe u een Apache Storm topologie maakt met behulp van Java.You have learned how to create an Apache Storm topology by using Java. Lees nu het volgende:Now learn how to:

U vindt meer voorbeeld Apache Storm topologieën door te bezoeken voorbeeld topologieën voor Apache Storm op HDInsight.You can find more example Apache Storm topologies by visiting Example topologies for Apache Storm on HDInsight.