Java 'da Apache Storm topolojisi oluşturmaCreate an Apache Storm topology in Java

Apache Stormiçin Java tabanlı topoloji oluşturmayı öğrenin.Learn how to create a Java-based topology for Apache Storm. Burada, bir sözcük sayısı uygulaması uygulayan bir fırtınası topolojisi oluşturursunuz.Here, you create a Storm topology that implements a word-count application. Projeyi derlemek ve paketlemek için Apache Maven 'yi kullanırsınız.You use Apache Maven to build and package the project. Daha sonra, Apache Storm Flox çerçevesini kullanarak topolojiyi nasıl tanımlayacağınızı öğreneceksiniz.Then, you learn how to define the topology using the Apache Storm Flux framework.

Bu belgedeki adımları tamamladıktan sonra, HDInsight üzerinde Apache Storm için topolojiyi dağıtabilirsiniz.After completing the steps in this document, you can deploy the topology to Apache Storm on HDInsight.

Not

Bu belgede oluşturulan bir fırtınası topolojisi örneklerinin tamamlanmış bir sürümü https://github.com/Azure-Samples/hdinsight-java-storm-wordcount' de kullanılabilir.A completed version of the Storm topology examples created in this document is available at https://github.com/Azure-Samples/hdinsight-java-storm-wordcount.

ÖnkoşullarPrerequisites

Test ortamıTest environment

Bu makale için kullanılan ortam, Windows 10 çalıştıran bir bilgisayardır.The environment used for this article was a computer running Windows 10. Komutlar bir komut isteminde yürütülürler ve çeşitli dosyalar Notepad ile düzenlendi.The commands were executed in a command prompt, and the various files were edited with Notepad.

Bir komut isteminden, çalışan bir ortam oluşturmak için aşağıdaki komutları girin:From a command prompt, enter the commands below to create a working environment:

mkdir C:\HDI
cd C:\HDI

Maven projesi oluşturmaCreate a Maven project

WORDCOUNTadlı bir Maven projesi oluşturmak için aşağıdaki komutu girin:Enter the following command to create a Maven project named WordCount:

mvn archetype:generate -DarchetypeArtifactId=maven-archetype-quickstart -DgroupId=com.microsoft.example -DartifactId=WordCount -DinteractiveMode=false

cd WordCount
mkdir resources

Bu komut, temel bir Maven projesi içeren geçerli konumda WordCount adlı bir dizin oluşturur.This command creates a directory named WordCount at the current location, which contains a basic Maven project. İkinci komut, mevcut çalışma dizinini WordCountolarak değiştirir.The second command changes the present working directory to WordCount. Üçüncü komut daha sonra kullanılacak yeni bir dizin resourcesoluşturur.The third command creates a new directory, resources, which will be used later. WordCount dizin aşağıdaki öğeleri içerir:The WordCount directory contains the following items:

  • pom.xml: Maven projesinin ayarlarını Içerir.pom.xml: Contains settings for the Maven project.
  • src\main\java\com\microsoft\example: uygulama kodunuzu Içerir.src\main\java\com\microsoft\example: Contains your application code.
  • src\test\java\com\microsoft\example: uygulamanız için testler Içerir.src\test\java\com\microsoft\example: Contains tests for your application.

Oluşturulan örnek kodu kaldırınRemove the generated example code

Oluşturulan test ve uygulama dosyalarını AppTest.javasilin ve aşağıdaki komutları girerek App.java:Delete the generated test and application files AppTest.java, and App.java by entering the commands below:

DEL src\main\java\com\microsoft\example\App.java
DEL src\test\java\com\microsoft\example\AppTest.java

Maven depoları eklemeAdd Maven repositories

HDInsight, Hortonçalışmalar veri platformunu (HDP) temel alır, bu nedenle Apache Storm projeleriniz için bağımlılıkları indirmek üzere Hortonlıs deposunu kullanmanızı öneririz.HDInsight is based on the Hortonworks Data Platform (HDP), so we recommend using the Hortonworks repository to download dependencies for your Apache Storm projects.

Aşağıdaki komutu girerek pom.xml açın:Open pom.xml by entering the command below:

notepad pom.xml

Sonra <url> https://maven.apache.org</url> satırdan sonra aşağıdaki XML 'i ekleyin:Then add the following XML after the <url>https://maven.apache.org</url> line:

<repositories>
    <repository>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>always</updatePolicy>
            <checksumPolicy>warn</checksumPolicy>
        </releases>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
            <checksumPolicy>fail</checksumPolicy>
        </snapshots>
        <id>HDPReleases</id>
        <name>HDP Releases</name>
        <url>https://repo.hortonworks.com/content/repositories/releases/</url>
        <layout>default</layout>
    </repository>
    <repository>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>always</updatePolicy>
            <checksumPolicy>warn</checksumPolicy>
        </releases>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
            <checksumPolicy>fail</checksumPolicy>
        </snapshots>
        <id>HDPJetty</id>
        <name>Hadoop Jetty</name>
        <url>https://repo.hortonworks.com/content/repositories/jetty-hadoop/</url>
        <layout>default</layout>
    </repository>
</repositories>

Özellik EkleAdd properties

Maven, özellikler olarak adlandırılan proje düzeyi değerlerini tanımlamanızı sağlar.Maven allows you to define project-level values called properties. pom.xml, </repositories> satırından sonra aşağıdaki metni ekleyin:In pom.xml, add the following text after the </repositories> line:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!--
    This is a version of Storm from the Hortonworks repository that is compatible with HDInsight 3.6.
    -->
    <storm.version>1.1.0.2.6.1.9-1</storm.version>
</properties>

Artık bu değeri pom.xmldiğer bölümlerinde kullanabilirsiniz.You can now use this value in other sections of the pom.xml. Örneğin, fırtınası bileşenleri sürümünü belirtirken, bir değeri sabit bir şekilde kodlamak yerine ${storm.version} kullanabilirsiniz.For example, when specifying the version of Storm components, you can use ${storm.version} instead of hard coding a value.

Bağımlılık EkleAdd dependencies

Fırtınası bileşenleri için bir bağımlılık ekleyin.Add a dependency for Storm components. pom.xml, <dependencies> bölümüne aşağıdaki metni ekleyin:In pom.xml, add the following text in the <dependencies> section:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
    <!-- keep storm out of the jar-with-dependencies -->
    <scope>provided</scope>
</dependency>

Derleme zamanında Maven, Maven deposunda storm-core aramak için bu bilgileri kullanır.At compile time, Maven uses this information to look up storm-core in the Maven repository. İlk olarak yerel bilgisayarınızdaki depoya bakar.It first looks in the repository on your local computer. Dosyalar orada yoksa Maven bunları ortak Maven deposundan indirir ve bunları yerel depoda depolar.If the files aren't there, Maven downloads them from the public Maven repository and stores them in the local repository.

Not

Bu bölümdeki <scope>provided</scope> satırına dikkat edin.Notice the <scope>provided</scope> line in this section. Bu ayar, Maven 'yi, sistem tarafından sağlandığı için oluşturulan tüm JAR dosyalarından fırtınası çekirdeğini hariç tutmasını söyler.This setting tells Maven to exclude storm-core from any JAR files that are created, because it is provided by the system.

Derleme yapılandırmasıBuild configuration

Maven eklentileri projenin derleme aşamalarını özelleştirmenizi sağlar.Maven plug-ins allow you to customize the build stages of the project. Örneğin, projenin nasıl derlendiğini veya bir JAR dosyasına nasıl paketleneceğini öğrenin.For example, how the project is compiled or how to package it into a JAR file. pom.xml, aşağıdaki metni doğrudan </project> satırının üstüne ekleyin.In pom.xml, add the following text directly above the </project> line.

<build>
    <plugins>
    </plugins>
    <resources>
    </resources>
</build>

Bu bölüm eklenti, kaynak ve diğer derleme yapılandırma seçeneklerini eklemek için kullanılır.This section is used to add plug-ins, resources, and other build configuration options. pom.xml dosyanın tam bir başvurusu için bkz. https://maven.apache.org/pom.html.For a full reference of the pom.xml file, see https://maven.apache.org/pom.html.

Eklentiler eklemeAdd plug-ins

  • Exec Maven eklentisiExec Maven Plugin

    Java 'da uygulanan Apache Storm topolojileri için, geliştirme ortamınızda yerel olarak topolojiyi kolayca çalıştırmanıza olanak sağladığından, Exec Maven eklentisi faydalıdır.For Apache Storm topologies implemented in Java, the Exec Maven Plugin is useful because it allows you to easily run the topology locally in your development environment. Exec Maven eklentisini dahil etmek için pom.xml dosyasının <plugins> bölümüne aşağıdakini ekleyin:Add the following to the <plugins> section of the pom.xml file to include the Exec Maven plugin:

    <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.6.0</version>
        <executions>
            <execution>
                <goals>
                    <goal>exec</goal>
                </goals>
            </execution>
        </executions>
        <configuration>
            <executable>java</executable>
            <includeProjectDependencies>true</includeProjectDependencies>
            <includePluginDependencies>false</includePluginDependencies>
            <classpathScope>compile</classpathScope>
            <mainClass>${storm.topology}</mainClass>
            <cleanupDaemonThreads>false</cleanupDaemonThreads>
        </configuration>
    </plugin>
    
  • Apache Maven derleyicisi eklentisiApache Maven Compiler Plugin

    Diğer bir faydalı eklenti, derleme seçeneklerini değiştirmek için kullanılan Apache Maven derleyicisieklentisidir.Another useful plug-in is the Apache Maven Compiler Plugin, which is used to change compilation options. Maven 'nin uygulamanızın kaynağı ve hedefi için kullandığı Java sürümünü değiştirin.Change the Java version that Maven uses for the source and target for your application.

    • HDInsight 3,4 veya önceki sürümlerde, kaynak ve hedef Java sürümünü __1,7__olarak ayarlayın.For HDInsight 3.4 or earlier, set the source and target Java version to 1.7.

    • HDInsight __3,5__için, kaynak ve hedef Java sürümünü __1,8__olarak ayarlayın.For HDInsight 3.5, set the source and target Java version to 1.8.

    Apache Maven derleyicisi eklentisini dahil etmek için pom.xml dosyasının <plugins> bölümüne aşağıdaki metni ekleyin.Add the following text in the <plugins> section of the pom.xml file to include the Apache Maven Compiler plugin. Bu örnek 1,8 belirtir, bu nedenle hedef HDInsight sürümü 3,5 ' dir.This example specifies 1.8, so the target HDInsight version is 3.5.

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.8.1</version>
      <configuration>
              <source>1.8</source>
              <target>1.8</target>
      </configuration>
    </plugin>
    

Kaynakları yapılandırmaConfigure resources

Kaynaklar bölümü, topolojideki bileşenlere gereken yapılandırma dosyaları gibi kod olmayan kaynakları dahil etmenize olanak tanır.The resources section allows you to include non-code resources such as configuration files needed by components in the topology. Bu örnek için, pom.xml dosyasının <resources> bölümüne aşağıdaki metni ekleyin.For this example, add the following text in the <resources> section of the pom.xml file. Sonra dosyayı kaydedip kapatın.Then save and close the file.

<resource>
    <directory>${basedir}/resources</directory>
    <filtering>false</filtering>
    <includes>
            <include>log4j2.xml</include>
    </includes>
</resource>

Bu örnek, proje kökündeki kaynak dizinini (${basedir}) kaynakları içeren bir konum olarak ekler ve log4j2.xmladlı dosyayı içerir.This example adds the resources directory in the root of the project (${basedir}) as a location that contains resources, and includes the file named log4j2.xml. Bu dosya, topoloji tarafından hangi bilgilerin günlüğe kaydedileceğini yapılandırmak için kullanılır.This file is used to configure what information is logged by the topology.

Topolojiyi oluşturmaCreate the topology

Java tabanlı Apache Storm topolojisi, bir bağımlılık olarak yazmak (veya başvuru yapmanız gereken üç bileşenden oluşur).A Java-based Apache Storm topology consists of three components that you must author (or reference) as a dependency.

  • Spout: dış kaynaklardaki verileri okur ve veri akışlarını topolojiye yayar.Spouts: Reads data from external sources and emits streams of data into the topology.

  • Cıvatalar: spolar veya diğer cıvatları tarafından yayılan akışlar üzerinde işleme gerçekleştirir ve bir veya daha fazla akış yayar.Bolts: Performs processing on streams emitted by spouts or other bolts, and emits one or more streams.

  • Topoloji: Spout ve cıvatların nasıl düzenlendiğini tanımlar ve topoloji için giriş noktası sağlar.Topology: Defines how the spouts and bolts are arranged, and provides the entry point for the topology.

Spout oluşturmaCreate the spout

Dış veri kaynaklarını ayarlamaya yönelik gereksinimleri azaltmak için aşağıdaki Spout rastgele cümleler yayar.To reduce requirements for setting up external data sources, the following spout simply emits random sentences. Bu, fırtınası-Starter örnekleriile birlikte sunulan bir Spout 'ın değiştirilmiş sürümüdür.It's a modified version of a spout that is provided with the Storm-Starter examples. Bu topoloji yalnızca bir Spout kullanıyor olsa da, diğerleri farklı kaynaklardan alınan çeşitli akış verilerine topolojiye sahip olabilir.Although this topology uses only one spout, others may have several that feed data from different sources into the topology.

RandomSentenceSpout.javayeni bir dosya oluşturmak ve açmak için aşağıdaki komutu girin:Enter the command below to create and open a new file RandomSentenceSpout.java:

notepad src\main\java\com\microsoft\example\RandomSentenceSpout.java

Ardından aşağıdaki Java kodunu kopyalayıp yeni dosyaya yapıştırın.Then copy and paste the java code below into the new file. Sonra dosyayı kapatın.Then close the file.

package com.microsoft.example;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

//This spout randomly emits sentences
public class RandomSentenceSpout extends BaseRichSpout {
  //Collector used to emit output
  SpoutOutputCollector _collector;
  //Used to generate a random number
  Random _rand;

  //Open is called when an instance of the class is created
  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  //Set the instance collector to the one passed in
    _collector = collector;
    //For randomness
    _rand = new Random();
  }

  //Emit data to the stream
  @Override
  public void nextTuple() {
  //Sleep for a bit
    Utils.sleep(100);
    //The sentences that are randomly emitted
    String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
        "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
    //Randomly pick a sentence
    String sentence = sentences[_rand.nextInt(sentences.length)];
    //Emit the sentence
    _collector.emit(new Values(sentence));
  }

  //Ack is not implemented since this is a basic example
  @Override
  public void ack(Object id) {
  }

  //Fail is not implemented since this is a basic example
  @Override
  public void fail(Object id) {
  }

  //Declare the output fields. In this case, an sentence
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("sentence"));
  }
}

Not

Dış veri kaynağından okuyan bir Spout örneği için aşağıdaki örneklerden birine bakın:For an example of a spout that reads from an external data source, see one of the following examples:

Cıvatları oluşturmaCreate the bolts

Cıvatalar, veri işlemeyi işler.Bolts handle the data processing. Cıvatalar, örneğin hesaplama, kalıcılık veya dış bileşenlere konuşuyor gibi her şeyi gerçekleştirebilir.Bolts can do anything, for example, computation, persistence, or talking to external components. Bu topoloji iki cıvatları kullanır:This topology uses two bolts:

  • Splitcümlesini: Rasgelesentencespout tarafından tek tek sözcüklere yayılan cümleler böler.SplitSentence: Splits the sentences emitted by RandomSentenceSpout into individual words.

  • WORDCOUNT: her bir sözcüğün kaç kez oluştuğunu sayar.WordCount: Counts how many times each word has occurred.

SplitcümlesiSplitSentence

SplitSentence.javayeni bir dosya oluşturmak ve açmak için aşağıdaki komutu girin:Enter the command below to create and open a new file SplitSentence.java:

notepad src\main\java\com\microsoft\example\SplitSentence.java

Ardından aşağıdaki Java kodunu kopyalayıp yeni dosyaya yapıştırın.Then copy and paste the java code below into the new file. Sonra dosyayı kapatın.Then close the file.

package com.microsoft.example;

import java.text.BreakIterator;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

//There are a variety of bolt types. In this case, use BaseBasicBolt
public class SplitSentence extends BaseBasicBolt {

  //Execute is called to process tuples
  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    //Get the sentence content from the tuple
    String sentence = tuple.getString(0);
    //An iterator to get each word
    BreakIterator boundary=BreakIterator.getWordInstance();
    //Give the iterator the sentence
    boundary.setText(sentence);
    //Find the beginning first word
    int start=boundary.first();
    //Iterate over each word and emit it to the output stream
    for (int end=boundary.next(); end != BreakIterator.DONE; start=end, end=boundary.next()) {
      //get the word
      String word=sentence.substring(start,end);
      //If a word is whitespace characters, replace it with empty
      word=word.replaceAll("\\s+","");
      //if it's an actual word, emit it
      if (!word.equals("")) {
        collector.emit(new Values(word));
      }
    }
  }

  //Declare that emitted tuples contain a word field
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }
}

WordCountWordCount

WordCount.javayeni bir dosya oluşturmak ve açmak için aşağıdaki komutu girin:Enter the command below to create and open a new file WordCount.java:

notepad src\main\java\com\microsoft\example\WordCount.java

Ardından aşağıdaki Java kodunu kopyalayıp yeni dosyaya yapıştırın.Then copy and paste the java code below into the new file. Sonra dosyayı kapatın.Then close the file.

package com.microsoft.example;

import java.util.HashMap;
import java.util.Map;
import java.util.Iterator;

import org.apache.storm.Constants;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.Config;

// For logging
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

//There are a variety of bolt types. In this case, use BaseBasicBolt
public class WordCount extends BaseBasicBolt {
  //Create logger for this class
  private static final Logger logger = LogManager.getLogger(WordCount.class);
  //For holding words and counts
  Map<String, Integer> counts = new HashMap<String, Integer>();
  //How often to emit a count of words
  private Integer emitFrequency;

  // Default constructor
  public WordCount() {
      emitFrequency=5; // Default to 60 seconds
  }

  // Constructor that sets emit frequency
  public WordCount(Integer frequency) {
      emitFrequency=frequency;
  }

  //Configure frequency of tick tuples for this bolt
  //This delivers a 'tick' tuple on a specific interval,
  //which is used to trigger certain actions
  @Override
  public Map<String, Object> getComponentConfiguration() {
      Config conf = new Config();
      conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequency);
      return conf;
  }

  //execute is called to process tuples
  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    //If it's a tick tuple, emit all words and counts
    if(tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
            && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
      for(String word : counts.keySet()) {
        Integer count = counts.get(word);
        collector.emit(new Values(word, count));
        logger.info("Emitting a count of " + count + " for word " + word);
      }
    } else {
      //Get the word contents from the tuple
      String word = tuple.getString(0);
      //Have we counted any already?
      Integer count = counts.get(word);
      if (count == null)
        count = 0;
      //Increment the count and store it
      count++;
      counts.put(word, count);
    }
  }

  //Declare that this emits a tuple containing two fields; word and count
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word", "count"));
  }
}

Topolojiyi tanımlamaDefine the topology

Topoloji, biriktirmelerin ve cıvatları, verilerin bileşenler arasında nasıl akacağını tanımlayan bir grafikte birleştirir.The topology ties the spouts and bolts together into a graph, which defines how data flows between the components. Ayrıca, bir küme içindeki bileşenlerin örneklerini oluştururken, fırtınası tarafından kullanılan paralellik ipuçları da sağlar.It also provides parallelism hints that Storm uses when creating instances of the components within the cluster.

Aşağıdaki görüntü, bu topolojinin bileşen grafiğinin temel bir diyagramıdır.The following image is a basic diagram of the graph of components for this topology.

Spout ve cıvatları düzenlemeyi gösteren diyagram

Topolojiyi uygulamak için, aşağıdaki komutu girerek yeni bir dosya oluşturun WordCountTopology.javaaçın:To implement the topology, enter the command below to create and open a new file WordCountTopology.java:

notepad src\main\java\com\microsoft\example\WordCountTopology.java

Ardından aşağıdaki Java kodunu kopyalayıp yeni dosyaya yapıştırın.Then copy and paste the java code below into the new file. Sonra dosyayı kapatın.Then close the file.

package com.microsoft.example;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

import com.microsoft.example.RandomSentenceSpout;

public class WordCountTopology {

  //Entry point for the topology
  public static void main(String[] args) throws Exception {
  //Used to build the topology
    TopologyBuilder builder = new TopologyBuilder();
    //Add the spout, with a name of 'spout'
    //and parallelism hint of 5 executors
    builder.setSpout("spout", new RandomSentenceSpout(), 5);
    //Add the SplitSentence bolt, with a name of 'split'
    //and parallelism hint of 8 executors
    //shufflegrouping subscribes to the spout, and equally distributes
    //tuples (sentences) across instances of the SplitSentence bolt
    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
    //Add the counter, with a name of 'count'
    //and parallelism hint of 12 executors
    //fieldsgrouping subscribes to the split bolt, and
    //ensures that the same word is sent to the same instance (group by field 'word')
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

    //new configuration
    Config conf = new Config();
    //Set to false to disable debug information when
    // running in production on a cluster
    conf.setDebug(false);

    //If there are arguments, we are running on a cluster
    if (args != null && args.length > 0) {
      //parallelism hint to set the number of workers
      conf.setNumWorkers(3);
      //submit the topology
      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
    //Otherwise, we are running locally
    else {
      //Cap the maximum number of executors that can be spawned
      //for a component to 3
      conf.setMaxTaskParallelism(3);
      //LocalCluster is used to run locally
      LocalCluster cluster = new LocalCluster();
      //submit the topology
      cluster.submitTopology("word-count", conf, builder.createTopology());
      //sleep
      Thread.sleep(10000);
      //shut down the cluster
      cluster.shutdown();
    }
  }
}

Günlüğe kaydetmeyi yapılandırmaConfigure logging

Fırtınası, bilgileri günlüğe kaydetmek için Apache Log4J 2 kullanır.Storm uses Apache Log4j 2 to log information. Günlüğe kaydetmeyi yapılandırmazsanız, topoloji tanılama bilgilerini yayar.If you don't configure logging, the topology emits diagnostic information. Günlüğe nelerin kaydedildiğini denetlemek için, aşağıdaki komutu girerek resources dizininde log4j2.xml adlı bir dosya oluşturun:To control what is logged, create a file named log4j2.xml in the resources directory by entering the command below:

notepad resources\log4j2.xml

Sonra aşağıdaki XML metnini kopyalayıp yeni dosyaya yapıştırın.Then copy and paste the XML text below into the new file. Sonra dosyayı kapatın.Then close the file.

<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
    <Appenders>
        <Console name="STDOUT" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Logger name="com.microsoft.example" level="trace" additivity="false">
            <AppenderRef ref="STDOUT"/>
        </Logger>
        <Root level="error">
            <Appender-Ref ref="STDOUT"/>
        </Root>
    </Loggers>
</Configuration>

Bu XML, bu örnek topolojideki bileşenleri içeren com.microsoft.example sınıfı için yeni bir günlükçü yapılandırır.This XML configures a new logger for the com.microsoft.example class, which includes the components in this example topology. Bu, bu topolojideki bileşenler tarafından yayılan günlüğe kaydetme bilgilerini yakalayan bu günlükçü için izleme olarak ayarlanır.The level is set to trace for this logger, which captures any logging information emitted by components in this topology.

<Root level="error"> bölümü, günlük kaydetme düzeyini (com.microsoft.examplebulunmayan her şey) yalnızca hata bilgilerini günlüğe kaydetmek üzere yapılandırır.The <Root level="error"> section configures the root level of logging (everything not in com.microsoft.example) to only log error information.

Log4J 2 için günlüğü yapılandırma hakkında daha fazla bilgi için bkz. https://logging.apache.org/log4j/2.x/manual/configuration.html.For more information on configuring logging for Log4j 2, see https://logging.apache.org/log4j/2.x/manual/configuration.html.

Not

Fırtınası sürümü 0.10.0 ve üzeri Log4J 2. x kullanır.Storm version 0.10.0 and higher use Log4j 2.x. Daha eski fırtınası sürümleri, günlük yapılandırması için farklı bir biçim kullanan 1. x Log4J kullandı.Older versions of storm used Log4j 1.x, which used a different format for log configuration. Eski yapılandırma hakkında daha fazla bilgi için bkz. https://wiki.apache.org/logging-log4j/Log4jXmlFormat.For information on the older configuration, see https://wiki.apache.org/logging-log4j/Log4jXmlFormat.

Topolojiyi yerel olarak test etmeTest the topology locally

Dosyaları kaydettikten sonra, topolojiyi yerel olarak test etmek için aşağıdaki komutu kullanın.After you save the files, use the following command to test the topology locally.

mvn compile exec:java -Dstorm.topology=com.microsoft.example.WordCountTopology

Çalıştırıldığında, topoloji başlangıç bilgilerini görüntüler.As it runs, the topology displays startup information. Aşağıdaki metin, sözcük sayısı çıkışının bir örneğidir:The following text is an example of the word count output:

17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word snow
17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word white
17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 112 for word seven
17:33:27 [Thread-16-count] INFO  com.microsoft.example.WordCount - Emitting a count of 195 for word the
17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 113 for word and
17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 57 for word dwarfs
17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 57 for word snow

Bu örnek günlük, ' ve ' kelimesinin 113 katı olduğunu gösterir.This example log indicates that the word 'and' has been emitted 113 times. Spout sürekli aynı cümleleri yaydığı için, sayı topoloji çalıştığı sürece çalışmaya devam eder.The count continues to go up as long as the topology runs because the spout continuously emits the same sentences.

Sözcükler ve sayımlar arasında 5 saniyelik bir Aralık vardır.There's a 5-second interval between emission of words and counts. WORDCOUNT bileşeni yalnızca bir değer tanımlama grubu geldiğinde bilgileri göstermek üzere yapılandırılmıştır.The WordCount component is configured to only emit information when a tick tuple arrives. Değer tanımlama gruplarının yalnızca beş saniyede bir teslim edildiğini ister.It requests that tick tuples are only delivered every five seconds.

Topolojiyi Flox 'e DönüştürConvert the topology to Flux

Flox , 0.10.0 ve üzeri bir sürümü kullanarak, yapılandırmayı uygulamadan ayırmanızı sağlayan yeni bir çerçevedir.Flux is a new framework available with Storm 0.10.0 and higher, which allows you to separate configuration from implementation. Bileşenleriniz hala Java 'da tanımlanmıştır, ancak topoloji bir YAML dosyası kullanılarak tanımlanır.Your components are still defined in Java, but the topology is defined using a YAML file. Projeniz ile varsayılan bir topoloji tanımını paketleyebilir veya topolojiyi gönderirken tek başına bir dosya kullanabilirsiniz.You can package a default topology definition with your project, or use a standalone file when submitting the topology. Topolojiyi fırtınası 'ya gönderirken, YAML topolojisi tanımındaki değerleri doldurmak için ortam değişkenlerini veya yapılandırma dosyalarını kullanabilirsiniz.When submitting the topology to Storm, you can use environment variables or configuration files to populate values in the YAML topology definition.

YAML dosyası, topoloji ve aralarında veri akışı için kullanılacak bileşenleri tanımlar.The YAML file defines the components to use for the topology and the data flow between them. Jar dosyasının bir parçası olarak bir YAML dosyası ekleyebilirsiniz veya bir dış YAML dosyası kullanabilirsiniz.You can include a YAML file as part of the jar file or you can use an external YAML file.

Flox hakkında daha fazla bilgi için bkz. Flox Framework (https://storm.apache.org/releases/current/flux.html).For more information on Flux, see Flux framework (https://storm.apache.org/releases/current/flux.html).

Uyarı

Bir hata nedeniyle ( fırtınası 1.0.1 ile https://issues.apache.org/jira/browse/STORM-2055), Flox topolojilerini yerel olarak çalıştırmak Için bir fırtınası geliştirme ortamı yüklemeniz gerekebilir.Due to a bug (https://issues.apache.org/jira/browse/STORM-2055) with Storm 1.0.1, you may need to install a Storm development environment to run Flux topologies locally.

  1. Daha önce WordCountTopology.java topoloji tanımladı, ancak Flox ile gerekli değildir.Previously, WordCountTopology.java defined the topology, but isn't needed with Flux. Aşağıdaki komutla dosyayı silin:Delete the file with the following command:

    DEL src\main\java\com\microsoft\example\WordCountTopology.java
    
  2. topology.yamlyeni bir dosya oluşturmak ve açmak için aşağıdaki komutu girin:Enter the command below to create and open a new file topology.yaml:

    notepad resources\topology.yaml
    

    Ardından aşağıdaki metni kopyalayıp yeni dosyaya yapıştırın.Then copy and paste the text below into the new file. Sonra dosyayı kapatın.Then close the file.

    name: "wordcount"       # friendly name for the topology
    
    config:                 # Topology configuration
           topology.workers: 1     # Hint for the number of workers to create
    
    spouts:                 # Spout definitions
    - id: "sentence-spout"
           className: "com.microsoft.example.RandomSentenceSpout"
           parallelism: 1      # parallelism hint
    
    bolts:                  # Bolt definitions
    - id: "splitter-bolt"
           className: "com.microsoft.example.SplitSentence"
           parallelism: 1
    
    - id: "counter-bolt"
           className: "com.microsoft.example.WordCount"
           constructorArgs:
             - 10
           parallelism: 1
    
    streams:                # Stream definitions
    - name: "Spout --> Splitter" # name isn't used (placeholder for logging, UI, etc.)
           from: "sentence-spout"       # The stream emitter
           to: "splitter-bolt"          # The stream consumer
           grouping:                    # Grouping type
             type: SHUFFLE
    
    - name: "Splitter -> Counter"
           from: "splitter-bolt"
           to: "counter-bolt"
           grouping:
             type: FIELDS
             args: ["word"]           # field(s) to group on
    
  3. Aşağıda açıklanan düzeltmeleri yapmak üzere pom.xml açmak için aşağıdaki komutu girin:Enter the command below to open pom.xml to make the described revisions below:

    notepad pom.xml
    
    1. <dependencies> bölümüne aşağıdaki yeni bağımlılığı ekleyin:Add the following new dependency in the <dependencies> section:

      <!-- Add a dependency on the Flux framework -->
      <dependency>
          <groupId>org.apache.storm</groupId>
          <artifactId>flux-core</artifactId>
          <version>${storm.version}</version>
      </dependency>
      
    2. <plugins> bölümüne aşağıdaki eklentiyi ekleyin.Add the following plugin to the <plugins> section. Bu eklenti, proje için bir paket (jar dosyası) oluşturmayı işler ve paketi oluştururken Flox 'e özgü bazı dönüştürmeleri uygular.This plugin handles the creation of a package (jar file) for the project, and applies some transformations specific to Flux when creating the package.

      <!-- build an uber jar -->
      <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-shade-plugin</artifactId>
          <version>3.2.1</version>
          <configuration>
              <transformers>
                  <!-- Keep us from getting a "can't overwrite file error" -->
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer" />
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                  <!-- We're using Flux, so refer to it as main -->
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                      <mainClass>org.apache.storm.flux.Flux</mainClass>
                  </transformer>
              </transformers>
              <!-- Keep us from getting a bad signature error -->
              <filters>
                  <filter>
                      <artifact>*:*</artifact>
                      <excludes>
                          <exclude>META-INF/*.SF</exclude>
                          <exclude>META-INF/*.DSA</exclude>
                          <exclude>META-INF/*.RSA</exclude>
                      </excludes>
                  </filter>
              </filters>
          </configuration>
          <executions>
              <execution>
                  <phase>package</phase>
                  <goals>
                      <goal>shade</goal>
                  </goals>
              </execution>
          </executions>
      </plugin>
      
    3. Exec Maven eklentisi bölümünde <configuration> > <mainClass> gidin ve ${storm.topology} org.apache.storm.flux.Fluxolarak değiştirin.For the Exec Maven Plugin section, navigate to <configuration> > <mainClass> and change ${storm.topology} to org.apache.storm.flux.Flux. Bu ayar, akıcı x 'in, geliştirme sırasında topolojiyi yerel olarak işlemesini sağlar.This setting allows Flux to handle running the topology locally in development.

    4. <resources> bölümünde, <includes>için aşağıdakini ekleyin.In the <resources> section, add the following to <includes>. Bu XML, projenin bir parçası olarak topolojiyi tanımlayan YAML dosyasını içerir.This XML includes the YAML file that defines the topology as part of the project.

      <include>topology.yaml</include>
      

Flox topolojisini yerel olarak test etmeTest the flux topology locally

  1. Maven kullanarak Flox topolojisini derlemek ve yürütmek için aşağıdaki komutu girin:Enter the following command to compile and execute the Flux topology using Maven:

    mvn compile exec:java -Dexec.args="--local -R /topology.yaml"
    

    Uyarı

    Topolojiniz fırtınası 1.0.1 bitleri kullanıyorsa, bu komut başarısız olur.If your topology uses Storm 1.0.1 bits, this command fails. Bu hata neden https://issues.apache.org/jira/browse/STORM-2055.This failure is caused by https://issues.apache.org/jira/browse/STORM-2055. Bunun yerine, geliştirme ortamınıza fırtınası 'yı yükledikten sonra aşağıdaki adımları kullanın:Instead, install Storm in your development environment and use the following steps:

    Geliştirme ortamınıza fırtınası yüklediyseniz, bunun yerine aşağıdaki komutları kullanabilirsiniz:If you have installed Storm in your development environment, you can use the following commands instead:

    mvn compile package
    storm jar target/WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local -R /topology.yaml
    

    --local parametresi, topolojiyi geliştirme ortamınızda yerel modda çalıştırır.The --local parameter runs the topology in local mode on your development environment. -R /topology.yaml parametresi, topolojiyi tanımlamak için jar dosyasındaki topology.yaml dosya kaynağını kullanır.The -R /topology.yaml parameter uses the topology.yaml file resource from the jar file to define the topology.

    Çalıştırıldığında, topoloji başlangıç bilgilerini görüntüler.As it runs, the topology displays startup information. Aşağıdaki metin, çıktının bir örneğidir:The following text is an example of the output:

    17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word snow
    17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word white
    17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 112 for word seven
    17:33:27 [Thread-16-count] INFO  com.microsoft.example.WordCount - Emitting a count of 195 for word the
    17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 113 for word and
    17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 57 for word dwarfs
    

    Günlüğe kaydedilen bilgilerin toplu işleri arasında 10 saniyelik bir gecikme vardır.There's a 10-second delay between batches of logged information.

  2. Projeden yeni bir topoloji YAML oluşturun.Create a new topology yaml from the project.

    1. topology.xmlaçmak için aşağıdaki komutu girin:Enter the command below to open topology.xml:
    notepad resources\topology.yaml
    
    1. Aşağıdaki bölümü bulun ve 10 değerini 5olarak değiştirin.Find the following section and change the value of 10 to 5. Bu değişiklik, sözcük sayısı, 10 saniyeden 5 ' e kadar olan yayma toplu işleri arasındaki aralığı değiştirir.This modification changes the interval between emitting batches of word counts from 10 seconds to 5.
    - id: "counter-bolt"
           className: "com.microsoft.example.WordCount"
           constructorArgs:
             - 5
           parallelism: 1  
    
    1. Dosyayı newtopology.yamlolarak kaydedin.Save file as newtopology.yaml.
  3. Topolojiyi çalıştırmak için aşağıdaki komutu girin:To run the topology, enter the following command:

    mvn exec:java -Dexec.args="--local resources/newtopology.yaml"
    

    Ya da, geliştirme ortamınızda bir fırtınası varsa:Or, if you have Storm on your development environment:

    storm jar target/WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local resources/newtopology.yaml
    

    Bu komut, newtopology.yaml topoloji tanımı olarak kullanır.This command uses the newtopology.yaml as the topology definition. compile parametresini içermedik, Maven önceki adımlarda oluşturulan projenin sürümünü kullanıyor.Since we didn't include the compile parameter, Maven uses the version of the project built in previous steps.

    Topoloji başladıktan sonra, yayılan toplu işler arasındaki sürenin newtopology.yamldeğeri yansıtacak şekilde değiştiğini fark etmelisiniz.Once the topology starts, you should notice that the time between emitted batches has changed to reflect the value in newtopology.yaml. Bu nedenle, topolojiyi yeniden derlemek zorunda kalmadan bir YAML dosyası aracılığıyla yapılandırmanızı değiştirebilmenizi sağlayabilirsiniz.So you can see that you can change your configuration through a YAML file without having to recompile the topology.

Bu ve akışkan x çerçevesinin diğer özellikleri hakkında daha fazla bilgi için bkz. Flox (https://storm.apache.org/releases/current/flux.html).For more information on these and other features of the Flux framework, see Flux (https://storm.apache.org/releases/current/flux.html).

ÇatalTrident

Trident , fırtınası tarafından sunulan üst düzey bir soyutlamadır.Trident is a high-level abstraction that is provided by Storm. Durum bilgisi olan işlemeyi destekler.It supports stateful processing. Trident 'nin birincil avantajı, topolojiye giren her iletinin yalnızca bir kez işlendiğini garanti edebileceğinizin güvencesidir.The primary advantage of Trident is that it can guarantee that every message that enters the topology is processed only once. Trident kullanılmadan, topolojiniz yalnızca iletilerin en az bir kez işlenmesini garanti edebilir.Without using Trident, your topology can only guarantee that messages are processed at least once. Ayrıca, cıvatları oluşturmak yerine kullanılabilecek yerleşik bileşenler gibi başka farklılıklar da vardır.There are also other differences, such as built-in components that can be used instead of creating bolts. Aslında, cıvatalar, filtreler, tahminler ve işlevler gibi daha az genel bileşenlerle değiştirilmiştir.In fact, bolts are replaced by less-generic components, such as filters, projections, and functions.

Trident uygulamaları Maven projeleri kullanılarak oluşturulabilir.Trident applications can be created by using Maven projects. Bu makalede daha önce sunulan temel adımların aynısını kullanırsınız; yalnızca kod farklıdır.You use the same basic steps as presented earlier in this article—only the code is different. Trident, Flox çerçevesiyle de (Şu anda) kullanılamaz.Trident also can't (currently) be used with the Flux framework.

Trident hakkında daha fazla bilgi için bkz. Trident API 'Sine genel bakış.For more information about Trident, see the Trident API Overview.

Sonraki AdımlarNext Steps

Java kullanarak Apache Storm topolojisi oluşturmayı öğrendiniz.You've learned how to create an Apache Storm topology by using Java. Şimdi şunları yapmayı öğrenirsiniz:Now learn how to:

HDInsight üzerinde Apache Storm Için örnek topolojileriniziyaret ederek daha fazla örnek Apache Storm topolojiden ulaşabilirsiniz.You can find more example Apache Storm topologies by visiting Example topologies for Apache Storm on HDInsight.