Tworzenie topologii Apache Storm w języku JavaCreate an Apache Storm topology in Java

Dowiedz się, jak utworzyć topologię opartą na języku Java dla Apache Storm.Learn how to create a Java-based topology for Apache Storm. Tutaj utworzysz topologię burzy, która implementuje aplikację Word-Count.Here, you create a Storm topology that implements a word-count application. Do kompilowania i pakowania projektu służy program Apache Maven .You use Apache Maven to build and package the project. Następnie dowiesz się, jak zdefiniować topologię przy użyciu platformy Apache Storm strumienia .Then, you learn how to define the topology using the Apache Storm Flux framework.

Po wykonaniu kroków opisanych w tym dokumencie można wdrożyć topologię w celu Apache Storm w usłudze HDInsight.After completing the steps in this document, you can deploy the topology to Apache Storm on HDInsight.

Uwaga

Kompletna wersja przykładów topologii burzy utworzonych w tym dokumencie jest dostępna pod https://github.com/Azure-Samples/hdinsight-java-storm-wordcountadresem.A completed version of the Storm topology examples created in this document is available at https://github.com/Azure-Samples/hdinsight-java-storm-wordcount.

Wymagania wstępnePrerequisites

Środowisko testoweTest environment

Środowisko używane na potrzeby tego artykułu było komputerem z systemem Windows 10.The environment used for this article was a computer running Windows 10. Polecenia zostały wykonane w wierszu polecenia, a różne pliki były edytowane w Notatniku.The commands were executed in a command prompt, and the various files were edited with Notepad.

W wierszu polecenia wprowadź poniższe polecenia, aby utworzyć środowisko robocze:From a command prompt, enter the commands below to create a working environment:

mkdir C:\HDI
cd C:\HDI

Tworzenie projektu MavenCreate a Maven project

Wprowadź następujące polecenie, aby utworzyć projekt Maven o nazwie WORDCOUNT:Enter the following command to create a Maven project named WordCount:

mvn archetype:generate -DarchetypeArtifactId=maven-archetype-quickstart -DgroupId=com.microsoft.example -DartifactId=WordCount -DinteractiveMode=false

cd WordCount
mkdir resources

To polecenie tworzy katalog o nazwie WordCount w bieżącej lokalizacji, który zawiera podstawowy projekt Maven.This command creates a directory named WordCount at the current location, which contains a basic Maven project. Drugie polecenie zmienia bieżący katalog roboczy na WordCount.The second command changes the present working directory to WordCount. Trzecie polecenie tworzy nowy katalog, resourcesktóry będzie później używany.The third command creates a new directory, resources, which will be used later. WordCount Katalog zawiera następujące elementy:The WordCount directory contains the following items:

  • pom.xml: Zawiera ustawienia dla projektu Maven.pom.xml: Contains settings for the Maven project.
  • src\main\java\com\microsoft\example: Zawiera kod aplikacji.src\main\java\com\microsoft\example: Contains your application code.
  • src\test\java\com\microsoft\example: Zawiera testy dla aplikacji.src\test\java\com\microsoft\example: Contains tests for your application.

Usuwanie wygenerowanego przykładowego koduRemove the generated example code

Usuń wygenerowane pliki AppTest.javatestowe i aplikacje, a App.java następnie wprowadź poniższe polecenia:Delete the generated test and application files AppTest.java, and App.java by entering the commands below:

DEL src\main\java\com\microsoft\example\App.java
DEL src\test\java\com\microsoft\example\AppTest.java

Dodawanie repozytoriów MavenAdd Maven repositories

Usługa HDInsight jest oparta na platformie Hortonworks Data Platform (HDP), dlatego zalecamy używanie repozytorium Hortonworks do pobierania zależności dla projektów Apache Storm.HDInsight is based on the Hortonworks Data Platform (HDP), so we recommend using the Hortonworks repository to download dependencies for your Apache Storm projects.

Otwórz pom.xml , wprowadzając następujące polecenie:Open pom.xml by entering the command below:

notepad pom.xml

Następnie Dodaj następujący kod XML po <url> https://maven.apache.org</url> wierszu:Then add the following XML after the <url>https://maven.apache.org</url> line:

<repositories>
    <repository>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>always</updatePolicy>
            <checksumPolicy>warn</checksumPolicy>
        </releases>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
            <checksumPolicy>fail</checksumPolicy>
        </snapshots>
        <id>HDPReleases</id>
        <name>HDP Releases</name>
        <url>https://repo.hortonworks.com/content/repositories/releases/</url>
        <layout>default</layout>
    </repository>
    <repository>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>always</updatePolicy>
            <checksumPolicy>warn</checksumPolicy>
        </releases>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
            <checksumPolicy>fail</checksumPolicy>
        </snapshots>
        <id>HDPJetty</id>
        <name>Hadoop Jetty</name>
        <url>https://repo.hortonworks.com/content/repositories/jetty-hadoop/</url>
        <layout>default</layout>
    </repository>
</repositories>

Dodaj właściwościAdd properties

Maven umożliwia definiowanie wartości na poziomie projektu o nazwie Properties.Maven allows you to define project-level values called properties. W pom.xmlprogramie Dodaj następujący tekst </repositories> po wierszu:In pom.xml, add the following text after the </repositories> line:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!--
    This is a version of Storm from the Hortonworks repository that is compatible with HDInsight 3.6.
    -->
    <storm.version>1.1.0.2.6.1.9-1</storm.version>
</properties>

Możesz teraz używać tej wartości w innych sekcjach pom.xml.You can now use this value in other sections of the pom.xml. Na przykład podczas określania wersji składników burzy można użyć ${storm.version} zamiast twardej kodowania wartości.For example, when specifying the version of Storm components, you can use ${storm.version} instead of hard coding a value.

Dodaj zależnościAdd dependencies

Dodaj zależność dla składników burzy.Add a dependency for Storm components. W pom.xmlprogramie Dodaj następujący tekst <dependencies> do sekcji:In pom.xml, add the following text in the <dependencies> section:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
    <!-- keep storm out of the jar-with-dependencies -->
    <scope>provided</scope>
</dependency>

W czasie kompilacji Maven używa tych informacji do wyszukania storm-core w repozytorium Maven.At compile time, Maven uses this information to look up storm-core in the Maven repository. Najpierw szuka repozytorium na komputerze lokalnym.It first looks in the repository on your local computer. Jeśli pliki nie znajdują się tam, Maven pobiera je z publicznego repozytorium Maven i zapisuje je w repozytorium lokalnym.If the files aren't there, Maven downloads them from the public Maven repository and stores them in the local repository.

Uwaga

Zwróć uwagę <scope>provided</scope> na wiersz w tej sekcji.Notice the <scope>provided</scope> line in this section. To ustawienie instruuje Maven, aby wykluczać z tworzonych przez siebie plików jar, ponieważ są one dostarczane przez system.This setting tells Maven to exclude storm-core from any JAR files that are created, because it is provided by the system.

Konfiguracja kompilacjiBuild configuration

Wtyczki Maven umożliwiają dostosowanie etapów kompilacji projektu.Maven plug-ins allow you to customize the build stages of the project. Na przykład sposób kompilowania projektu lub sposobu pakowania go do pliku JAR.For example, how the project is compiled or how to package it into a JAR file. W pom.xmlprogramie Dodaj następujący tekst bezpośrednio </project> nad wierszem.In pom.xml, add the following text directly above the </project> line.

<build>
    <plugins>
    </plugins>
    <resources>
    </resources>
</build>

Ta sekcja służy do dodawania dodatków plug-in, zasobów i innych opcji konfiguracji kompilacji.This section is used to add plug-ins, resources, and other build configuration options. Aby uzyskać pełne odwołanie pom.xml do pliku, zobacz. https://maven.apache.org/pom.htmlFor a full reference of the pom.xml file, see https://maven.apache.org/pom.html.

Dodaj wtyczkiAdd plug-ins

  • Wtyczka exec MavenExec Maven Plugin

    W przypadku topologii Apache Storm wdrożonej w języku Java wtyczka exec Maven jest przydatna, ponieważ umożliwia ona łatwe uruchamianie topologii lokalnie w środowisku deweloperskim.For Apache Storm topologies implemented in Java, the Exec Maven Plugin is useful because it allows you to easily run the topology locally in your development environment. Dodaj następujące elementy do <plugins> sekcji pom.xml pliku, aby uwzględnić wtyczkę exec Maven:Add the following to the <plugins> section of the pom.xml file to include the Exec Maven plugin:

    <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.6.0</version>
        <executions>
            <execution>
            <goals>
                <goal>exec</goal>
            </goals>
            </execution>
        </executions>
        <configuration>
            <executable>java</executable>
            <includeProjectDependencies>true</includeProjectDependencies>
            <includePluginDependencies>false</includePluginDependencies>
            <classpathScope>compile</classpathScope>
            <mainClass>${storm.topology}</mainClass>
            <cleanupDaemonThreads>false</cleanupDaemonThreads> 
        </configuration>
    </plugin>
    
  • Wtyczka kompilatora Apache MavenApache Maven Compiler Plugin

    Kolejną przydatną wtyczką jest wtyczka kompilatora Apache Maven, która służy do zmiany opcji kompilacji.Another useful plug-in is the Apache Maven Compiler Plugin, which is used to change compilation options. Zmień wersję języka Java używaną przez Maven dla źródła i celu dla aplikacji.Change the Java version that Maven uses for the source and target for your application.

    • W przypadku usługi HDInsight __3,4 lub starszej__Ustaw źródłową i docelową wersję języka Java na 1,7.For HDInsight 3.4 or earlier, set the source and target Java version to 1.7.

    • W przypadku usługi HDInsight __3,5__Ustaw źródłową i docelową wersję języka Java na 1,8.For HDInsight 3.5, set the source and target Java version to 1.8.

      Dodaj następujący tekst <plugins> do sekcji pom.xml pliku, aby dołączyć wtyczkę kompilatora Apache Maven.Add the following text in the <plugins> section of the pom.xml file to include the Apache Maven Compiler plugin. W tym przykładzie określono 1,8, więc docelowa wersja usługi HDInsight to 3,5.This example specifies 1.8, so the target HDInsight version is 3.5.

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.3</version>
        <configuration>
        <source>1.8</source>
        <target>1.8</target>
        </configuration>
      </plugin>
      

Konfigurowanie zasobówConfigure resources

Sekcja Resources umożliwia uwzględnienie zasobów niezwiązanych z kodem, takich jak pliki konfiguracji potrzebne przez składniki w topologii.The resources section allows you to include non-code resources such as configuration files needed by components in the topology. W tym przykładzie Dodaj następujący tekst w <resources> sekcji pom.xml pliku.For this example, add the following text in the <resources> section of the pom.xml file.

<resource>
    <directory>${basedir}/resources</directory>
    <filtering>false</filtering>
    <includes>
        <include>log4j2.xml</include>
    </includes>
</resource>

Ten przykład dodaje katalog zasobów w katalogu głównym projektu (${basedir}) jako lokalizację, która zawiera zasoby, i zawiera plik o nazwie. log4j2.xmlThis example adds the resources directory in the root of the project (${basedir}) as a location that contains resources, and includes the file named log4j2.xml. Ten plik służy do konfigurowania informacji, które są rejestrowane przez topologię.This file is used to configure what information is logged by the topology.

Tworzenie topologiiCreate the topology

Topologia Apache Storm oparta na języku Java składa się z trzech składników, które należy utworzyć (lub jako odwołanie) jako zależność.A Java-based Apache Storm topology consists of three components that you must author (or reference) as a dependency.

  • Elementy Spout: Odczytuje dane ze źródeł zewnętrznych i emituje strumienie danych do topologii.Spouts: Reads data from external sources and emits streams of data into the topology.

  • Pioruny: Wykonuje przetwarzanie strumieni emitowanych przez elementy Spout lub inne pioruny i emituje co najmniej jeden strumień.Bolts: Performs processing on streams emitted by spouts or other bolts, and emits one or more streams.

  • Topologia: Definiuje, w jaki sposób elementy Spout i pioruny są rozmieszczone i udostępnia punkt wejścia dla topologii.Topology: Defines how the spouts and bolts are arranged, and provides the entry point for the topology.

Tworzenie elementu SpoutCreate the spout

Aby zmniejszyć wymagania dotyczące konfigurowania zewnętrznych źródeł danych, następujące elementu Spout po prostu emituje losowe zdania.To reduce requirements for setting up external data sources, the following spout simply emits random sentences. Jest to zmodyfikowana wersja elementu Spout, która jest dostarczana z przykładami burzy.It is a modified version of a spout that is provided with the Storm-Starter examples. Mimo że Ta topologia używa tylko jednego elementu spoutu, inne mogą mieć kilka źródeł danych pochodzących z różnych źródła do topologii.Although this topology uses only one spout, others may have several that feed data from different sources into the topology.

Wprowadź poniższe polecenie, aby utworzyć i otworzyć nowy plik RandomSentenceSpout.java:Enter the command below to create and open a new file RandomSentenceSpout.java:

notepad src\main\java\com\microsoft\example\RandomSentenceSpout.java

Następnie skopiuj i wklej poniższy kod Java do nowego pliku.Then copy and paste the java code below into the new file. Następnie zamknij plik.Then close the file.

package com.microsoft.example;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

//This spout randomly emits sentences
public class RandomSentenceSpout extends BaseRichSpout {
  //Collector used to emit output
  SpoutOutputCollector _collector;
  //Used to generate a random number
  Random _rand;

  //Open is called when an instance of the class is created
  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  //Set the instance collector to the one passed in
    _collector = collector;
    //For randomness
    _rand = new Random();
  }

  //Emit data to the stream
  @Override
  public void nextTuple() {
  //Sleep for a bit
    Utils.sleep(100);
    //The sentences that are randomly emitted
    String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
        "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
    //Randomly pick a sentence
    String sentence = sentences[_rand.nextInt(sentences.length)];
    //Emit the sentence
    _collector.emit(new Values(sentence));
  }

  //Ack is not implemented since this is a basic example
  @Override
  public void ack(Object id) {
  }

  //Fail is not implemented since this is a basic example
  @Override
  public void fail(Object id) {
  }

  //Declare the output fields. In this case, an sentence
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("sentence"));
  }
}

Uwaga

Przykład elementu Spout, który odczytuje z zewnętrznego źródła danych, można znaleźć w jednym z następujących przykładów:For an example of a spout that reads from an external data source, see one of the following examples:

Tworzenie piorunówCreate the bolts

Pioruny obsługują przetwarzanie danych.Bolts handle the data processing. Pioruny mogą wykonywać dowolne czynności, na przykład obliczenia, trwałość lub rozmowy z zewnętrznymi składnikami.Bolts can do anything, for example, computation, persistence, or talking to external components. Ta topologia używa dwóch piorunów:This topology uses two bolts:

  • SplitSentence: Dzieli zdania emitowane przez RandomSentenceSpout na poszczególne słowa.SplitSentence: Splits the sentences emitted by RandomSentenceSpout into individual words.

  • WORDCOUNT: Zlicza liczbę przypadków wystąpienia poszczególnych wyrazów.WordCount: Counts how many times each word has occurred.

SplitSentenceSplitSentence

Wprowadź poniższe polecenie, aby utworzyć i otworzyć nowy plik SplitSentence.java:Enter the command below to create and open a new file SplitSentence.java:

notepad src\main\java\com\microsoft\example\SplitSentence.java

Następnie skopiuj i wklej poniższy kod Java do nowego pliku.Then copy and paste the java code below into the new file. Następnie zamknij plik.Then close the file.

package com.microsoft.example;

import java.text.BreakIterator;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

//There are a variety of bolt types. In this case, use BaseBasicBolt
public class SplitSentence extends BaseBasicBolt {

  //Execute is called to process tuples
  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    //Get the sentence content from the tuple
    String sentence = tuple.getString(0);
    //An iterator to get each word
    BreakIterator boundary=BreakIterator.getWordInstance();
    //Give the iterator the sentence
    boundary.setText(sentence);
    //Find the beginning first word
    int start=boundary.first();
    //Iterate over each word and emit it to the output stream
    for (int end=boundary.next(); end != BreakIterator.DONE; start=end, end=boundary.next()) {
      //get the word
      String word=sentence.substring(start,end);
      //If a word is whitespace characters, replace it with empty
      word=word.replaceAll("\\s+","");
      //if it's an actual word, emit it
      if (!word.equals("")) {
        collector.emit(new Values(word));
      }
    }
  }

  //Declare that emitted tuples contain a word field
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }
}

WordCountWordCount

Wprowadź poniższe polecenie, aby utworzyć i otworzyć nowy plik WordCount.java:Enter the command below to create and open a new file WordCount.java:

notepad src\main\java\com\microsoft\example\WordCount.java

Następnie skopiuj i wklej poniższy kod Java do nowego pliku.Then copy and paste the java code below into the new file. Następnie zamknij plik.Then close the file.

package com.microsoft.example;

import java.util.HashMap;
import java.util.Map;
import java.util.Iterator;

import org.apache.storm.Constants;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.Config;

// For logging
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

//There are a variety of bolt types. In this case, use BaseBasicBolt
public class WordCount extends BaseBasicBolt {
  //Create logger for this class
  private static final Logger logger = LogManager.getLogger(WordCount.class);
  //For holding words and counts
  Map<String, Integer> counts = new HashMap<String, Integer>();
  //How often to emit a count of words
  private Integer emitFrequency;

  // Default constructor
  public WordCount() {
      emitFrequency=5; // Default to 60 seconds
  }

  // Constructor that sets emit frequency
  public WordCount(Integer frequency) {
      emitFrequency=frequency;
  }

  //Configure frequency of tick tuples for this bolt
  //This delivers a 'tick' tuple on a specific interval,
  //which is used to trigger certain actions
  @Override
  public Map<String, Object> getComponentConfiguration() {
      Config conf = new Config();
      conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequency);
      return conf;
  }

  //execute is called to process tuples
  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    //If it's a tick tuple, emit all words and counts
    if(tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
            && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
      for(String word : counts.keySet()) {
        Integer count = counts.get(word);
        collector.emit(new Values(word, count));
        logger.info("Emitting a count of " + count + " for word " + word);
      }
    } else {
      //Get the word contents from the tuple
      String word = tuple.getString(0);
      //Have we counted any already?
      Integer count = counts.get(word);
      if (count == null)
        count = 0;
      //Increment the count and store it
      count++;
      counts.put(word, count);
    }
  }

  //Declare that this emits a tuple containing two fields; word and count
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word", "count"));
  }
}

Definiowanie topologiiDefine the topology

Topologia łączy elementy Spout i napływa do grafu, który definiuje sposób przepływu danych między składnikami.The topology ties the spouts and bolts together into a graph, which defines how data flows between the components. Zawiera również wskazówki równoległości, które są używane podczas tworzenia wystąpień składników w klastrze.It also provides parallelism hints that Storm uses when creating instances of the components within the cluster.

Na poniższej ilustracji przedstawiono Podstawowy diagram grafu składników dla tej topologii.The following image is a basic diagram of the graph of components for this topology.

Diagram przedstawiający układ elementy Spout i piorunów

Aby zaimplementować topologię, wprowadź poniższe polecenie, aby utworzyć i otworzyć nowy plik WordCountTopology.java:To implement the topology, enter the command below to create and open a new file WordCountTopology.java:

notepad src\main\java\com\microsoft\example\WordCountTopology.java

Następnie skopiuj i wklej poniższy kod Java do nowego pliku.Then copy and paste the java code below into the new file. Następnie zamknij plik.Then close the file.

package com.microsoft.example;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

import com.microsoft.example.RandomSentenceSpout;

public class WordCountTopology {

  //Entry point for the topology
  public static void main(String[] args) throws Exception {
  //Used to build the topology
    TopologyBuilder builder = new TopologyBuilder();
    //Add the spout, with a name of 'spout'
    //and parallelism hint of 5 executors
    builder.setSpout("spout", new RandomSentenceSpout(), 5);
    //Add the SplitSentence bolt, with a name of 'split'
    //and parallelism hint of 8 executors
    //shufflegrouping subscribes to the spout, and equally distributes
    //tuples (sentences) across instances of the SplitSentence bolt
    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
    //Add the counter, with a name of 'count'
    //and parallelism hint of 12 executors
    //fieldsgrouping subscribes to the split bolt, and
    //ensures that the same word is sent to the same instance (group by field 'word')
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

    //new configuration
    Config conf = new Config();
    //Set to false to disable debug information when
    // running in production on a cluster
    conf.setDebug(false);

    //If there are arguments, we are running on a cluster
    if (args != null && args.length > 0) {
      //parallelism hint to set the number of workers
      conf.setNumWorkers(3);
      //submit the topology
      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
    //Otherwise, we are running locally
    else {
      //Cap the maximum number of executors that can be spawned
      //for a component to 3
      conf.setMaxTaskParallelism(3);
      //LocalCluster is used to run locally
      LocalCluster cluster = new LocalCluster();
      //submit the topology
      cluster.submitTopology("word-count", conf, builder.createTopology());
      //sleep
      Thread.sleep(10000);
      //shut down the cluster
      cluster.shutdown();
    }
  }
}

Konfigurowanie rejestrowaniaConfigure logging

W przypadku korzystania z oprogramowania Apache Log4J 2 do rejestrowania informacji.Storm uses Apache Log4j 2 to log information. Jeśli rejestrowanie nie zostanie skonfigurowane, topologia emituje informacje diagnostyczne.If you do not configure logging, the topology emits diagnostic information. Aby kontrolować to, co jest rejestrowane, Utwórz plik log4j2.xml o nazwie resources w katalogu, wprowadzając poniższe polecenie:To control what is logged, create a file named log4j2.xml in the resources directory by entering the command below:

notepad resources\log4j2.xml

Następnie skopiuj i wklej poniższy tekst XML do nowego pliku.Then copy and paste the XML text below into the new file. Następnie zamknij plik.Then close the file.

<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
<Appenders>
    <Console name="STDOUT" target="SYSTEM_OUT">
        <PatternLayout pattern="%d{HH:mm:ss} [%t] %-5level %logger{36} - %msg%n"/>
    </Console>
</Appenders>
<Loggers>
    <Logger name="com.microsoft.example" level="trace" additivity="false">
        <AppenderRef ref="STDOUT"/>
    </Logger>
    <Root level="error">
        <Appender-Ref ref="STDOUT"/>
    </Root>
</Loggers>
</Configuration>

Ten plik XML konfiguruje nowy Rejestrator dla com.microsoft.example klasy, który obejmuje składniki w tej przykładowej topologii.This XML configures a new logger for the com.microsoft.example class, which includes the components in this example topology. Poziom jest ustawiany na wartość Trace dla tego rejestratora, który przechwytuje wszystkie informacje o rejestrowaniu emitowane przez składniki w tej topologii.The level is set to trace for this logger, which captures any logging information emitted by components in this topology.

Sekcja konfiguruje główny poziom rejestrowania (wszystko nie jest w com.microsoft.example), aby rejestrować tylko informacje o błędach. <Root level="error">The <Root level="error"> section configures the root level of logging (everything not in com.microsoft.example) to only log error information.

Aby uzyskać więcej informacji o konfigurowaniu rejestrowania dla Log4J 2 https://logging.apache.org/log4j/2.x/manual/configuration.html, zobacz.For more information on configuring logging for Log4j 2, see https://logging.apache.org/log4j/2.x/manual/configuration.html.

Uwaga

W wersji 0.10.0 i wyższych użyto Log4J 2. x.Storm version 0.10.0 and higher use Log4j 2.x. Starsze wersje sieci log4j 1. x, które używały innego formatu do konfiguracji dziennika.Older versions of storm used Log4j 1.x, which used a different format for log configuration. Aby uzyskać informacje na temat starszej konfiguracji https://wiki.apache.org/logging-log4j/Log4jXmlFormat, zobacz.For information on the older configuration, see https://wiki.apache.org/logging-log4j/Log4jXmlFormat.

Lokalne testowanie topologiiTest the topology locally

Po zapisaniu plików użyj następującego polecenia, aby przetestować topologię lokalnie.After you save the files, use the following command to test the topology locally.

mvn compile exec:java -Dstorm.topology=com.microsoft.example.WordCountTopology

Po uruchomieniu topologia wyświetla informacje uruchamiania.As it runs, the topology displays startup information. Poniższy tekst to przykład danych wyjściowych zliczania wyrazów:The following text is an example of the word count output:

17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word snow
17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word white
17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 112 for word seven
17:33:27 [Thread-16-count] INFO  com.microsoft.example.WordCount - Emitting a count of 195 for word the
17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 113 for word and
17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 57 for word dwarfs
17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 57 for word snow

Ten przykładowy dziennik wskazuje, że wyrazy "i" zostały wyemitowane 113 razy.This example log indicates that the word 'and' has been emitted 113 times. Liczba jest w dalszym ciągu tak długo, jak działa topologia, ponieważ elementu Spout ciągle emituje te same zdania.The count continues to go up as long as the topology runs because the spout continuously emits the same sentences.

Występuje 5-sekundowy interwał między emisją wyrazów a liczbą.There is a 5-second interval between emission of words and counts. Składnik WORDCOUNT jest skonfigurowany tak, aby emituje informacje tylko po nadejściu krotki.The WordCount component is configured to only emit information when a tick tuple arrives. Żądania, że krotki są dostarczane tylko co pięć sekund.It requests that tick tuples are only delivered every five seconds.

Konwertuj topologię na strumieńConvert the topology to Flux

Strumień jest nową strukturą, która jest dostępna z burzą 0.10.0 i wyższą, która pozwala na oddzielenie konfiguracji od implementacji.Flux is a new framework available with Storm 0.10.0 and higher, which allows you to separate configuration from implementation. Składniki są nadal zdefiniowane w języku Java, ale topologia jest definiowana przy użyciu pliku YAML.Your components are still defined in Java, but the topology is defined using a YAML file. Można spakować domyślną definicję topologii z projektem lub użyć pliku autonomicznego podczas przesyłania topologii.You can package a default topology definition with your project, or use a standalone file when submitting the topology. Podczas przesyłania topologii do burzy, można użyć zmiennych środowiskowych lub plików konfiguracyjnych, aby wypełnić wartości w definicji topologii YAML.When submitting the topology to Storm, you can use environment variables or configuration files to populate values in the YAML topology definition.

Plik YAML definiuje składniki do użycia w topologii i przepływ danych między nimi.The YAML file defines the components to use for the topology and the data flow between them. Możesz dołączyć plik YAML jako część pliku JAR lub użyć zewnętrznego pliku YAML.You can include a YAML file as part of the jar file or you can use an external YAML file.

Aby uzyskać więcej informacji o strumieniu, zobacz temat platforma strumieniowa (https://storm.apache.org/releases/current/flux.html).For more information on Flux, see Flux framework (https://storm.apache.org/releases/current/flux.html).

Ostrzeżenie

Ze względu na usterkę (https://issues.apache.org/jira/browse/STORM-2055) w przypadku burzy 1.0.1 może być konieczne zainstalowanie środowiska programistycznego burzy w celu lokalnego uruchamiania topologii strumieniowej.Due to a bug (https://issues.apache.org/jira/browse/STORM-2055) with Storm 1.0.1, you may need to install a Storm development environment to run Flux topologies locally.

  1. WordCountTopology.java Wcześniej zdefiniowana topologia, ale nie jest wymagana ze strumieniem.Previously, WordCountTopology.java defined the topology, but isn't needed with Flux. Usuń plik za pomocą następującego polecenia:Delete the file with the following command:

    DEL src\main\java\com\microsoft\example\WordCountTopology.java
    
  2. Wprowadź poniższe polecenie, aby utworzyć i otworzyć nowy plik topology.yaml:Enter the command below to create and open a new file topology.yaml:

    notepad resources\topology.yaml
    

    Następnie skopiuj i wklej poniższy tekst do nowego pliku.Then copy and paste the text below into the new file. Następnie zamknij plik.Then close the file.

    name: "wordcount"       # friendly name for the topology
    
    config:                 # Topology configuration
      topology.workers: 1     # Hint for the number of workers to create
    
    spouts:                 # Spout definitions
    - id: "sentence-spout"
      className: "com.microsoft.example.RandomSentenceSpout"
      parallelism: 1      # parallelism hint
    
    bolts:                  # Bolt definitions
    - id: "splitter-bolt"
      className: "com.microsoft.example.SplitSentence"
      parallelism: 1
    
    - id: "counter-bolt"
      className: "com.microsoft.example.WordCount"
      constructorArgs:
        - 10
      parallelism: 1
    
    streams:                # Stream definitions
    - name: "Spout --> Splitter" # name isn't used (placeholder for logging, UI, etc.)
      from: "sentence-spout"       # The stream emitter
      to: "splitter-bolt"          # The stream consumer
      grouping:                    # Grouping type
        type: SHUFFLE
    
    - name: "Splitter -> Counter"
      from: "splitter-bolt"
      to: "counter-bolt"
      grouping:
        type: FIELDS
        args: ["word"]           # field(s) to group on
    
  3. Wprowadź poniższe polecenie, aby otworzyć pom.xml następujące poprawki:Enter the command below to open pom.xml to make the described revisions below:

    notepad pom.xml
    
    • Dodaj następującą nową zależność w <dependencies> sekcji:Add the following new dependency in the <dependencies> section:

      <!-- Add a dependency on the Flux framework -->
      <dependency>
          <groupId>org.apache.storm</groupId>
          <artifactId>flux-core</artifactId>
          <version>${storm.version}</version>
      </dependency>
      
    • Dodaj do <plugins> sekcji następującą wtyczkę.Add the following plugin to the <plugins> section. Ta wtyczka obsługuje tworzenie pakietu (plik JAR) dla projektu i stosuje pewne przekształcenia specyficzne dla strumienia podczas tworzenia pakietu.This plugin handles the creation of a package (jar file) for the project, and applies some transformations specific to Flux when creating the package.

      <!-- build an uber jar -->
      <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-shade-plugin</artifactId>
          <version>3.2.1</version>
          <configuration>
              <transformers>
                  <!-- Keep us from getting a "can't overwrite file error" -->
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer" />
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                  <!-- We're using Flux, so refer to it as main -->
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                      <mainClass>org.apache.storm.flux.Flux</mainClass>
                  </transformer>
              </transformers>
              <!-- Keep us from getting a bad signature error -->
              <filters>
                  <filter>
                      <artifact>*:*</artifact>
                      <excludes>
                          <exclude>META-INF/*.SF</exclude>
                          <exclude>META-INF/*.DSA</exclude>
                          <exclude>META-INF/*.RSA</exclude>
                      </excludes>
                  </filter>
              </filters>
          </configuration>
          <executions>
              <execution>
                  <phase>package</phase>
                  <goals>
                      <goal>shade</goal>
                  </goals>
              </execution>
          </executions>
      </plugin>
      
    • W sekcji exec-Maven-plugin <configuration> Zmień wartość parametru <mainClass> z ${storm.topology} na org.apache.storm.flux.Flux.In the exec-maven-plugin <configuration> section, change the value for <mainClass> from ${storm.topology} to org.apache.storm.flux.Flux. To ustawienie umożliwia strumieniom obsługę uruchamiania topologii lokalnie w środowisku programistycznym.This setting allows Flux to handle running the topology locally in development.

    • W sekcji Dodaj następujące polecenie do <includes>. <resources>In the <resources> section, add the following to <includes>. Ten kod XML zawiera plik YAML, który definiuje topologię w ramach projektu.This XML includes the YAML file that defines the topology as part of the project.

      <include>topology.yaml</include>
      

Lokalne testowanie topologii strumieniowejTest the flux topology locally

  1. Wprowadź następujące polecenie, aby skompilować i uruchomić topologię strumienia przy użyciu Maven:Enter the following command to compile and execute the Flux topology using Maven:

    mvn compile exec:java -Dexec.args="--local -R /topology.yaml"
    

    Ostrzeżenie

    Jeśli topologia używa usługi burza 1.0.1, to polecenie kończy się niepowodzeniem.If your topology uses Storm 1.0.1 bits, this command fails. Ten błąd jest spowodowany https://issues.apache.org/jira/browse/STORM-2055przez.This failure is caused by https://issues.apache.org/jira/browse/STORM-2055. Zamiast tego należy zainstalować burzę w środowisku deweloperskim i wykonać następujące czynności:Instead, install Storm in your development environment and use the following steps:

    Jeśli masz zainstalowaną burzę w środowisku deweloperskim, możesz użyć następujących poleceń:If you have installed Storm in your development environment, you can use the following commands instead:

    mvn compile package
    storm jar target/WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local -R /topology.yaml
    

    --local Parametr uruchamia topologię w trybie lokalnym w środowisku deweloperskim.The --local parameter runs the topology in local mode on your development environment. -R /topology.yaml Parametr używazasobuplikuzplikuJARdotopology.yaml zdefiniowania topologii.The -R /topology.yaml parameter uses the topology.yaml file resource from the jar file to define the topology.

    Po uruchomieniu topologia wyświetla informacje uruchamiania.As it runs, the topology displays startup information. Następujący tekst to przykład danych wyjściowych:The following text is an example of the output:

     17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word snow
     17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word white
     17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 112 for word seven
     17:33:27 [Thread-16-count] INFO  com.microsoft.example.WordCount - Emitting a count of 195 for word the
     17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 113 for word and
     17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 57 for word dwarfs
    

    Występuje 10-sekundowe opóźnienie między partiami zarejestrowanych informacji.There is a 10-second delay between batches of logged information.

  2. Utwórz nowy YAML topologii z projektu.Create a new topology yaml from the project.

    a.a. Wprowadź poniższe polecenie, aby otworzyć topology.xml:Enter the command below to open topology.xml:

    notepad resources\topology.yaml
    

    b.b. Znajdź następującą sekcję i zmień wartość 10 na. 5Find the following section and change the value of 10 to 5. Ta modyfikacja zmienia interwał między emitowaniem partii wyrazów z 10 sekund do 5.This modification changes the interval between emitting batches of word counts from 10 seconds to 5.

    - id: "counter-bolt"
      className: "com.microsoft.example.WordCount"
      constructorArgs:
        - 5
      parallelism: 1  
    

    c.c. Zapisz plik jako newtopology.yaml.Save file as newtopology.yaml.

  3. Aby uruchomić topologię, wprowadź następujące polecenie:To run the topology, enter the following command:

    mvn exec:java -Dexec.args="--local resources/newtopology.yaml"
    

    Lub, jeśli masz burzę w środowisku deweloperskim:Or, if you have Storm on your development environment:

    storm jar target/WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local resources/newtopology.yaml
    

    To polecenie używa newtopology.yaml jako definicji topologii.This command uses the newtopology.yaml as the topology definition. Ponieważ compile parametr nie został uwzględniony, Maven używa wersji projektu skompilowanej w poprzednich krokach.Since we didn't include the compile parameter, Maven uses the version of the project built in previous steps.

    Po rozpoczęciu topologii należy zauważyć, że czas między wyemitowanymi partiami zmienił się w celu odzwierciedlenia wartości w newtopology.yaml.Once the topology starts, you should notice that the time between emitted batches has changed to reflect the value in newtopology.yaml. Pozwala to zobaczyć, że można zmienić konfigurację za pomocą pliku YAML bez konieczności ponownego kompilowania topologii.So you can see that you can change your configuration through a YAML file without having to recompile the topology.

Aby uzyskać więcej informacji na temat tych i innych funkcji platformy strumieniowej, zobacz strumień https://storm.apache.org/releases/current/flux.html) (.For more information on these and other features of the Flux framework, see Flux (https://storm.apache.org/releases/current/flux.html).

PomocTrident

Trident jest abstrakcją wysokiego poziomu, która jest dostarczana przez burzę.Trident is a high-level abstraction that is provided by Storm. Obsługuje przetwarzanie stanowe.It supports stateful processing. Główną zaletą programu Trident jest możliwość zagwarantowania, że każdy komunikat, który przejdzie do topologii, jest przetwarzany tylko raz.The primary advantage of Trident is that it can guarantee that every message that enters the topology is processed only once. Bez korzystania z programu Trident topologia może zagwarantować, że komunikaty są przetwarzane co najmniej raz.Without using Trident, your topology can only guarantee that messages are processed at least once. Istnieją również inne różnice, takie jak wbudowane składniki, które mogą być używane zamiast tworzenia piorunów.There are also other differences, such as built-in components that can be used instead of creating bolts. W rzeczywistości pioruny są zastępowane przez składniki mniejsze niż ogólne, takie jak filtry, projekcje i funkcje.In fact, bolts are replaced by less-generic components, such as filters, projections, and functions.

Aplikacje Trident można tworzyć za pomocą projektów Maven.Trident applications can be created by using Maven projects. Używasz tych samych podstawowych kroków, które przedstawiono wcześniej w tym artykule — tylko kod jest różny.You use the same basic steps as presented earlier in this article—only the code is different. Nie można również (obecnie) używać programu Trident z platformą strumienia.Trident also cannot (currently) be used with the Flux framework.

Aby uzyskać więcej informacji na temat programu Trident, zobacz Omówienie interfejsu API Trident.For more information about Trident, see the Trident API Overview.

Następne krokiNext Steps

Wiesz już, jak utworzyć topologię Apache Storm przy użyciu języka Java.You have learned how to create an Apache Storm topology by using Java. Teraz Dowiedz się, jak:Now learn how to:

Więcej przykładowych topologii Apache Storm można znaleźć, odwiedzając przykładowe topologie Apache Storm w usłudze HDInsight.You can find more example Apache Storm topologies by visiting Example topologies for Apache Storm on HDInsight.