Java'da Apache Storm topolojisi oluşturma

Daha fazla bilgi için Java tabanlı topoloji Apache Storm. Sözcük sayısı uygulaması uygulayan bir Storm topolojisi oluşturabilirsiniz. Apache Maven kullanarak projeyi derler ve paketlersiniz. Ardından, Apache Storm Flux çerçevesini kullanarak topolojiyi tanımlamayı öğrenirsiniz.

Bu belgede yer alan adımları tamamladıktan sonra topolojiyi HDInsight'Apache Storm dağıtabilirsiniz.

Not

Bu belgede oluşturulan Storm topolojisi örneklerinin tamamlanmış bir sürümü şu https://github.com/Azure-Samples/hdinsight-java-storm-wordcount şekildedir: .

Önkoşullar

Test ortamı

Bu makale için kullanılan ortam, Windows 10. Komutlar bir komut isteminde yürütülür ve çeşitli dosyalar komut istemiyle Not Defteri.

Çalışma ortamı oluşturmak için bir komut isteminde aşağıdaki komutları girin:

mkdir C:\HDI
cd C:\HDI

Maven projesi oluşturma

WordCount adlı bir Maven projesi oluşturmak için aşağıdaki komutu girin:

mvn archetype:generate -DarchetypeArtifactId=maven-archetype-quickstart -DgroupId=com.microsoft.example -DartifactId=WordCount -DinteractiveMode=false

cd WordCount
mkdir resources

Bu komut, temel bir WordCount Maven projesi içeren geçerli konumda adlı bir dizin oluşturur. İkinci komut, mevcut çalışma dizinini olarak WordCount değiştirir. Üçüncü komut, daha sonra kullanılacak resources olan yeni bir dizin oluşturur. Dizin WordCount aşağıdaki öğeleri içerir:

  • pom.xml: Maven projesinin ayarlarını içerir.
  • src\main\java\com\microsoft\example: Uygulama kodunuzu içerir.
  • src\test\java\com\microsoft\example: Uygulamanıza testler içerir.

Oluşturulan örnek kodu kaldırma

Oluşturulan test ve uygulama dosyalarını ve AppTest.java aşağıdaki App.java komutları girerek silin:

DEL src\main\java\com\microsoft\example\App.java
DEL src\test\java\com\microsoft\example\AppTest.java

Maven depoları ekleme

HDInsight, Hortonworks Veri Platformu'na (HDP) dayalıdır, bu nedenle projenizin bağımlılıklarını indirmek için Hortonworks Apache Storm öneririz.

Aşağıdaki pom.xml komutu girerek açın:

notepad pom.xml

Ardından satırın sonra aşağıdaki XML'sini <url>https://maven.apache.org</url> ekleyin:

<repositories>
    <repository>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>always</updatePolicy>
            <checksumPolicy>warn</checksumPolicy>
        </releases>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
            <checksumPolicy>fail</checksumPolicy>
        </snapshots>
        <id>HDPReleases</id>
        <name>HDP Releases</name>
        <url>https://repo.hortonworks.com/content/repositories/releases/</url>
        <layout>default</layout>
    </repository>
    <repository>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>always</updatePolicy>
            <checksumPolicy>warn</checksumPolicy>
        </releases>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
            <checksumPolicy>fail</checksumPolicy>
        </snapshots>
        <id>HDPJetty</id>
        <name>Hadoop Jetty</name>
        <url>https://repo.hortonworks.com/content/repositories/jetty-hadoop/</url>
        <layout>default</layout>
    </repository>
</repositories>

Özellik ekleme

Maven, özellikler adlı proje düzeyi değerleri tanımlamaya olanak sağlar. içinde, pom.xml satırın ardından aşağıdaki metni </repositories> ekleyin:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!--
    This is a version of Storm from the Hortonworks repository that is compatible with HDInsight 3.6.
    -->
    <storm.version>1.1.0.2.6.1.9-1</storm.version>
</properties>

Artık bu değeri diğer bölümlerinde pom.xml kullanabilirsiniz. Örneğin Storm bileşenlerinin sürümünü belirtirken bir değeri sabit ${storm.version} kodlamak yerine kullanabilirsiniz.

Bağımlılık ekleme

Storm bileşenleri için bir bağımlılık ekleyin. içinde pom.xml bölümüne aşağıdaki metni <dependencies> ekleyin:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
    <!-- keep storm out of the jar-with-dependencies -->
    <scope>provided</scope>
</dependency>

Derleme zamanında Maven, Maven deposunda arama storm-core yapmak için bu bilgileri kullanır. İlk olarak yerel bilgisayarınızda depoya bakarak. Dosyalar orada yoksa Maven bunları genel Maven deposundan indirir ve yerel depoda depolar.

Not

Bu <scope>provided</scope> bölümdeki satıra dikkat edin. Bu ayar, sistem tarafından sağlanmış olduğundan Maven'a storm-core'ı oluşturulan jar dosyalarından dışlamalarını söyler.

Yapı yapılandırması

Maven eklentileri, projenin derleme aşamalarını özelleştirmenize olanak sağlar. Örneğin, projenin nasıl derlenmiş olduğu veya bir JAR dosyasında paketlenmiş olduğu. içinde, pom.xml aşağıdaki metni doğrudan satırın üzerine </project> ekleyin.

<build>
    <plugins>
    </plugins>
    <resources>
    </resources>
</build>

Bu bölüm eklentileri, kaynakları ve diğer derleme yapılandırma seçeneklerini eklemek için kullanılır. Dosyanın tam başvurusu pom.xml için bkz. https://maven.apache.org/pom.html .

Eklentiler ekleme

  • Exec Maven Eklentisi

    Java Apache Storm uygulanan topolojiler için Exec Maven Eklentisi faydalıdır çünkü geliştirme ortamınıza topolojiyi kolayca yerel olarak çalıştırmanıza olanak sağlar. <plugins>Exec Maven eklentisini pom.xml eklemek için aşağıdakileri dosyanın bölümüne ekleyin:

    <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.6.0</version>
        <executions>
            <execution>
                <goals>
                    <goal>exec</goal>
                </goals>
            </execution>
        </executions>
        <configuration>
            <executable>java</executable>
            <includeProjectDependencies>true</includeProjectDependencies>
            <includePluginDependencies>false</includePluginDependencies>
            <classpathScope>compile</classpathScope>
            <mainClass>${storm.topology}</mainClass>
            <cleanupDaemonThreads>false</cleanupDaemonThreads>
        </configuration>
    </plugin>
    
  • Apache Maven Derleyici Eklentisi

    Bir diğer kullanışlı eklenti de Apache Maven Compiler Plugin derleme seçeneklerini değiştirmek için kullanılan eklentisidir. Maven'ın uygulamanıza yönelik kaynak ve hedef için kullandığı Java sürümünü değiştirme.

    • HDInsight 3.4 veya önceki sürümler için kaynak ve hedef Java sürümünü 1.7 olarak ayarlayın.

    • HDInsight 3.5 için kaynak ve hedef Java sürümünü 1.8 olarak ayarlayın.

    <plugins>Apache Maven Derleyicisi eklentisini pom.xml eklemek için dosyanın bölümüne aşağıdaki metni ekleyin. Bu örnek 1.8'i belirtir, dolayısıyla hedef HDInsight sürümü 3.5'tir.

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.8.1</version>
      <configuration>
              <source>1.8</source>
              <target>1.8</target>
      </configuration>
    </plugin>
    

Kaynakları yapılandırma

Kaynaklar bölümü, topolojiye bileşenler tarafından gereken yapılandırma dosyaları gibi kod dışı kaynakları dahil etmek için izin verir. Bu örnek için dosyanın bölümüne <resources> aşağıdaki metni pom.xml ekleyin. Sonra dosyayı kaydedin ve kapatın.

<resource>
    <directory>${basedir}/resources</directory>
    <filtering>false</filtering>
    <includes>
            <include>log4j2.xml</include>
    </includes>
</resource>

Bu örnek, proje kökünde resources dizinini ( ) kaynakları içeren bir konum olarak ekler ${basedir} ve adlı dosyayı log4j2.xml içerir. Bu dosya, topoloji tarafından hangi bilgilerin günlüğe kaydedileceğini yapılandırmak için kullanılır.

Topolojiyi oluşturma

Java tabanlı bir Apache Storm topolojisi, bağımlılık olarak yazmanız (veya göndermeniz) gereken üç bileşenden oluşur.

  • Spouts: Dış kaynaklardan verileri okur ve veri akışlarını topolojiye yalıtır.

  • Boltlar: Spout'lar veya diğer boltlar tarafından yayılan akışlarda işleme yapar ve bir veya daha fazla akış yayır.

  • Topoloji: Spout'ların ve boltların nasıl düzen olduğunu tanımlar ve topoloji için giriş noktasını sağlar.

Spout oluşturma

Dış veri kaynaklarını ayarlama gereksinimlerini azaltmak için aşağıdaki spout yalnızca rastgele cümleler yayır. Storm-Starter örnekleriyle birlikte sağlanan spout'un değiştirilmiş bir sürümüdür. Bu topoloji bir spout kullanıyor olsa da, diğerlerinde farklı kaynaklardan topolojiye veri akışı yapılan çeşitli veriler olabilir.

Yeni bir dosya oluşturmak ve açmak için aşağıdaki komutu RandomSentenceSpout.java girin:

notepad src\main\java\com\microsoft\example\RandomSentenceSpout.java

Ardından aşağıdaki java kodunu kopyalayıp yeni dosyaya yapıştırın. Ardından dosyayı kapatın.

package com.microsoft.example;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

//This spout randomly emits sentences
public class RandomSentenceSpout extends BaseRichSpout {
  //Collector used to emit output
  SpoutOutputCollector _collector;
  //Used to generate a random number
  Random _rand;

  //Open is called when an instance of the class is created
  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  //Set the instance collector to the one passed in
    _collector = collector;
    //For randomness
    _rand = new Random();
  }

  //Emit data to the stream
  @Override
  public void nextTuple() {
  //Sleep for a bit
    Utils.sleep(100);
    //The sentences that are randomly emitted
    String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
        "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
    //Randomly pick a sentence
    String sentence = sentences[_rand.nextInt(sentences.length)];
    //Emit the sentence
    _collector.emit(new Values(sentence));
  }

  //Ack is not implemented since this is a basic example
  @Override
  public void ack(Object id) {
  }

  //Fail is not implemented since this is a basic example
  @Override
  public void fail(Object id) {
  }

  //Declare the output fields. In this case, an sentence
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("sentence"));
  }
}

Not

Dış veri kaynağından okunan spout örneği için aşağıdaki örneklerden birini okuyun:

Boltları oluşturma

Boltlar veri işlemeyi ele almaktadır. Boltlar hesaplama, kalıcılık veya dış bileşenlerle konuşma gibi her şeyi yapar. Bu topoloji iki bolt kullanır:

  • SplitSentence: RandomSentenceSpout tarafından yayılan cümleleri tek tek sözcüklere böler.

  • WordCount: Her sözcüğün kaç kez olduğunu sayar.

SplitSentence

Yeni bir dosya oluşturmak ve açmak için aşağıdaki komutu SplitSentence.java girin:

notepad src\main\java\com\microsoft\example\SplitSentence.java

Ardından aşağıdaki java kodunu kopyalayıp yeni dosyaya yapıştırın. Ardından dosyayı kapatın.

package com.microsoft.example;

import java.text.BreakIterator;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

//There are a variety of bolt types. In this case, use BaseBasicBolt
public class SplitSentence extends BaseBasicBolt {

  //Execute is called to process tuples
  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    //Get the sentence content from the tuple
    String sentence = tuple.getString(0);
    //An iterator to get each word
    BreakIterator boundary=BreakIterator.getWordInstance();
    //Give the iterator the sentence
    boundary.setText(sentence);
    //Find the beginning first word
    int start=boundary.first();
    //Iterate over each word and emit it to the output stream
    for (int end=boundary.next(); end != BreakIterator.DONE; start=end, end=boundary.next()) {
      //get the word
      String word=sentence.substring(start,end);
      //If a word is whitespace characters, replace it with empty
      word=word.replaceAll("\\s+","");
      //if it's an actual word, emit it
      if (!word.equals("")) {
        collector.emit(new Values(word));
      }
    }
  }

  //Declare that emitted tuples contain a word field
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }
}

WordCount

Yeni bir dosya oluşturmak ve açmak için aşağıdaki komutu WordCount.java girin:

notepad src\main\java\com\microsoft\example\WordCount.java

Ardından aşağıdaki java kodunu kopyalayıp yeni dosyaya yapıştırın. Ardından dosyayı kapatın.

package com.microsoft.example;

import java.util.HashMap;
import java.util.Map;
import java.util.Iterator;

import org.apache.storm.Constants;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.Config;

// For logging
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

//There are a variety of bolt types. In this case, use BaseBasicBolt
public class WordCount extends BaseBasicBolt {
  //Create logger for this class
  private static final Logger logger = LogManager.getLogger(WordCount.class);
  //For holding words and counts
  Map<String, Integer> counts = new HashMap<String, Integer>();
  //How often to emit a count of words
  private Integer emitFrequency;

  // Default constructor
  public WordCount() {
      emitFrequency=5; // Default to 60 seconds
  }

  // Constructor that sets emit frequency
  public WordCount(Integer frequency) {
      emitFrequency=frequency;
  }

  //Configure frequency of tick tuples for this bolt
  //This delivers a 'tick' tuple on a specific interval,
  //which is used to trigger certain actions
  @Override
  public Map<String, Object> getComponentConfiguration() {
      Config conf = new Config();
      conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequency);
      return conf;
  }

  //execute is called to process tuples
  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    //If it's a tick tuple, emit all words and counts
    if(tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
            && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
      for(String word : counts.keySet()) {
        Integer count = counts.get(word);
        collector.emit(new Values(word, count));
        logger.info("Emitting a count of " + count + " for word " + word);
      }
    } else {
      //Get the word contents from the tuple
      String word = tuple.getString(0);
      //Have we counted any already?
      Integer count = counts.get(word);
      if (count == null)
        count = 0;
      //Increment the count and store it
      count++;
      counts.put(word, count);
    }
  }

  //Declare that this emits a tuple containing two fields; word and count
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word", "count"));
  }
}

Topolojiyi tanımlama

Topoloji, spout'ları ve boltları bir grafiye bağlar. Grafik, bileşenler arasında veri akışını tanımlar. Ayrıca Storm'ın küme içindeki bileşenlerin örneklerini oluştururken kullandığı paralellik ipuçları sağlar.

Aşağıdaki görüntü, bu topoloji için bileşenlerin grafı temel bir diyagramıdır.

spout'lar ve boltlar düzenlemeyi gösteren diyagram

Topolojiyi uygulamak için aşağıdaki komutu girerek yeni bir dosya oluşturun ve WordCountTopology.java açın:

notepad src\main\java\com\microsoft\example\WordCountTopology.java

Ardından aşağıdaki java kodunu kopyalayıp yeni dosyaya yapıştırın. Ardından dosyayı kapatın.

package com.microsoft.example;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

import com.microsoft.example.RandomSentenceSpout;

public class WordCountTopology {

  //Entry point for the topology
  public static void main(String[] args) throws Exception {
  //Used to build the topology
    TopologyBuilder builder = new TopologyBuilder();
    //Add the spout, with a name of 'spout'
    //and parallelism hint of 5 executors
    builder.setSpout("spout", new RandomSentenceSpout(), 5);
    //Add the SplitSentence bolt, with a name of 'split'
    //and parallelism hint of 8 executors
    //shufflegrouping subscribes to the spout, and equally distributes
    //tuples (sentences) across instances of the SplitSentence bolt
    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
    //Add the counter, with a name of 'count'
    //and parallelism hint of 12 executors
    //fieldsgrouping subscribes to the split bolt, and
    //ensures that the same word is sent to the same instance (group by field 'word')
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

    //new configuration
    Config conf = new Config();
    //Set to false to disable debug information when
    // running in production on a cluster
    conf.setDebug(false);

    //If there are arguments, we are running on a cluster
    if (args != null && args.length > 0) {
      //parallelism hint to set the number of workers
      conf.setNumWorkers(3);
      //submit the topology
      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
    //Otherwise, we are running locally
    else {
      //Cap the maximum number of executors that can be spawned
      //for a component to 3
      conf.setMaxTaskParallelism(3);
      //LocalCluster is used to run locally
      LocalCluster cluster = new LocalCluster();
      //submit the topology
      cluster.submitTopology("word-count", conf, builder.createTopology());
      //sleep
      Thread.sleep(10000);
      //shut down the cluster
      cluster.shutdown();
    }
  }
}

Günlüğü yapılandırma

Storm, bilgileri günlüğe almak için Apache Log4j 2 kullanır. Günlüğü yapılandırmazsanız topoloji tanılama bilgilerini yalıtır. Günlüğe kaydedilenleri kontrol etmek için aşağıdaki komutu log4j2.xml girerek resources dizininde adlı bir dosya oluşturun:

notepad resources\log4j2.xml

Ardından aşağıdaki XML metnini kopyalayıp yeni dosyaya yapıştırın. Ardından dosyayı kapatın.

<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
    <Appenders>
        <Console name="STDOUT" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Logger name="com.microsoft.example" level="trace" additivity="false">
            <AppenderRef ref="STDOUT"/>
        </Logger>
        <Root level="error">
            <Appender-Ref ref="STDOUT"/>
        </Root>
    </Loggers>
</Configuration>

Bu XML, bu örnek topolojide bileşenleri içeren com.microsoft.example sınıfı için yeni bir günlükçer yapılandırıyor. Düzey, bu topolojide bileşenler tarafından yayılan tüm günlük bilgilerini yakalayan bu günlükç için izleme olarak ayarlanır.

bölümü, yalnızca hata bilgilerini günlüğe kaydetmek için günlük kaydının kök düzeyini (içinde yer alan her <Root level="error"> com.microsoft.example şey) yapılandırıyor.

Log4j 2 için günlüğü yapılandırma hakkında daha fazla bilgi için bkz. https://logging.apache.org/log4j/2.x/manual/configuration.html .

Not

Storm sürüm 0.10.0 ve üzerinde Log4j 2.x kullanın. Storm'un eski sürümlerinde günlük yapılandırması için farklı bir biçim kullanılan Log4j 1.x kullanılmıştır. Eski yapılandırma hakkında bilgi için bkz. https://cwiki.apache.org/confluence/display/LOGGINGLOG4J/Log4jXmlFormat .

Topolojiyi yerel olarak test etmek

Dosyaları kaydeddikten sonra, topolojiyi yerel olarak test etmek için aşağıdaki komutu kullanın.

mvn compile exec:java -Dstorm.topology=com.microsoft.example.WordCountTopology

Topoloji, çalışırken başlangıç bilgilerini görüntüler. Aşağıdaki metin, sözcük sayısı çıkışının bir örneğidir:

17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word snow
17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word white
17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 112 for word seven
17:33:27 [Thread-16-count] INFO  com.microsoft.example.WordCount - Emitting a count of 195 for word the
17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 113 for word and
17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 57 for word dwarfs
17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 57 for word snow

Bu örnek günlük, 've' sözcüğü 113 kez yayıldı. Topoloji çalıştırılacağı sürece sayı artmaya devam eder. Bu artış, spout'ın sürekli olarak aynı cümleleri yayma nedenidir.

Sözcüklerin ve sayıların emisyonu arasında 5 saniyelik bir aralık vardır. WordCount bileşeni, yalnızca bir değer işareti gruplama geldiğinde bilgi ya da bilgi yayma için yapılandırılır. Tıklama işaretlerini yalnızca beş saniyede bir teslim etmelerini talep ediyor.

Topolojiyi Flux'a dönüştürme

Flux, Storm 0.10.0 ve üzerinde kullanılabilen yeni bir çerçevedir. Flux, yapılandırmayı uygulamasından ayırmaya olanak sağlar. Bileşenleriniz Java'da hala tanımlanmıştır, ancak topoloji bir YAML dosyası kullanılarak tanımlanır. Varsayılan topoloji tanımını projeniz ile paketlebilirsiniz veya topolojiyi göndererek tek başına bir dosya kullanabilirsiniz. Topolojiyi Storm'a göndererek YAML topolojisi tanım değerlerini doldurmak için ortam değişkenlerini veya yapılandırma dosyalarını kullanın.

YAML dosyası, topoloji için kullanmak üzere bileşenleri ve aralarındaki veri akışını tanımlar. Jar dosyasının bir parçası olarak bir YAML dosyası dahilebilirsiniz. Veya bir dış YAML dosyası kullanabilirsiniz.

Flux hakkında daha fazla bilgi için bkz. Flux çerçevesi ( https://storm.apache.org/releases/current/flux.html) .

Uyarı

Bir hata nedeniyle https://issues.apache.org/jira/browse/STORM-2055) ( Storm 1.0.1 ile, Flux topolojilerini yerel olarak çalıştırmak için bir Storm geliştirme ortamı yüklemeniz gerekir.

  1. Daha önce WordCountTopology.java topolojiyi tanımladı ama Flux ile gerekli değildi. Aşağıdaki komutla dosyayı silin:

    DEL src\main\java\com\microsoft\example\WordCountTopology.java
    
  2. Yeni bir dosya oluşturmak ve açmak için aşağıdaki komutu topology.yaml girin:

    notepad resources\topology.yaml
    

    Ardından aşağıdaki metni kopyalayıp yeni dosyaya yapıştırın. Ardından dosyayı kapatın.

    name: "wordcount"       # friendly name for the topology
    
    config:                 # Topology configuration
           topology.workers: 1     # Hint for the number of workers to create
    
    spouts:                 # Spout definitions
    - id: "sentence-spout"
           className: "com.microsoft.example.RandomSentenceSpout"
           parallelism: 1      # parallelism hint
    
    bolts:                  # Bolt definitions
    - id: "splitter-bolt"
           className: "com.microsoft.example.SplitSentence"
           parallelism: 1
    
    - id: "counter-bolt"
           className: "com.microsoft.example.WordCount"
           constructorArgs:
             - 10
           parallelism: 1
    
    streams:                # Stream definitions
    - name: "Spout --> Splitter" # name isn't used (placeholder for logging, UI, etc.)
           from: "sentence-spout"       # The stream emitter
           to: "splitter-bolt"          # The stream consumer
           grouping:                    # Grouping type
             type: SHUFFLE
    
    - name: "Splitter -> Counter"
           from: "splitter-bolt"
           to: "counter-bolt"
           grouping:
             type: FIELDS
             args: ["word"]           # field(s) to group on
    
  3. Aşağıda açıklanan düzeltmeleri yapmak için pom.xml açmak için aşağıdaki komutu girin:

    notepad pom.xml
    
    1. bölümüne aşağıdaki yeni bağımlılığı <dependencies> ekleyin:

      <!-- Add a dependency on the Flux framework -->
      <dependency>
          <groupId>org.apache.storm</groupId>
          <artifactId>flux-core</artifactId>
          <version>${storm.version}</version>
      </dependency>
      
    2. Aşağıdaki eklentiyi bölümüne <plugins> ekleyin. Bu eklenti, proje için paket (jar dosyası) oluşturmayı ele almaktadır ve paketi oluştururken Flux'a özgü bazı dönüştürmeleri uygular.

      <!-- build an uber jar -->
      <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-shade-plugin</artifactId>
          <version>3.2.1</version>
          <configuration>
              <transformers>
                  <!-- Keep us from getting a "can't overwrite file error" -->
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer" />
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                  <!-- We're using Flux, so refer to it as main -->
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                      <mainClass>org.apache.storm.flux.Flux</mainClass>
                  </transformer>
              </transformers>
              <!-- Keep us from getting a bad signature error -->
              <filters>
                  <filter>
                      <artifact>*:*</artifact>
                      <excludes>
                          <exclude>META-INF/*.SF</exclude>
                          <exclude>META-INF/*.DSA</exclude>
                          <exclude>META-INF/*.RSA</exclude>
                      </excludes>
                  </filter>
              </filters>
          </configuration>
          <executions>
              <execution>
                  <phase>package</phase>
                  <goals>
                      <goal>shade</goal>
                  </goals>
              </execution>
          </executions>
      </plugin>
      
    3. Exec Maven Eklentisi bölümünde bölümüne gidin <configuration> > <mainClass> ve olarak ${storm.topology} org.apache.storm.flux.Flux değişir. Bu ayar, Flux'ın topolojiyi geliştirmede yerel olarak çalıştırmayı işlemesi için izin verir.

    4. bölümüne <resources> aşağıdakini <includes> ekleyin. Bu XML, projenin bir parçası olarak topolojiyi tanımlayan YAML dosyasını içerir.

      <include>topology.yaml</include>
      

Akış topolojisi yerel olarak test

  1. Maven kullanarak Flux topolojisi derlemek ve yürütmek için aşağıdaki komutu girin:

    mvn compile exec:java -Dexec.args="--local -R /topology.yaml"
    

    Uyarı

    Topolojiniz Storm 1.0.1 bitlerini kullanıyorsa bu komut başarısız olur. Bu hatanın https://issues.apache.org/jira/browse/STORM-2055 nedeni. Bunun yerine, geliştirme ortamınıza Storm yükleyin ve aşağıdaki adımları kullanın:

    Geliştirme ortamınıza Storm yüklemişse,bunun yerine aşağıdaki komutları kullanabilirsiniz:

    mvn compile package
    storm jar target/WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local -R /topology.yaml
    

    parametresi, --local topolojiyi geliştirme ortamınız üzerinde yerel modda çalıştırır. parametresi, -R /topology.yaml topology.yaml topolojiyi tanımlamak için jar dosyasındaki dosya kaynağını kullanır.

    Topoloji, çalışırken başlangıç bilgilerini görüntüler. Aşağıdaki metin çıkışın bir örneğidir:

    17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word snow
    17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word white
    17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 112 for word seven
    17:33:27 [Thread-16-count] INFO  com.microsoft.example.WordCount - Emitting a count of 195 for word the
    17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 113 for word and
    17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 57 for word dwarfs
    

    Günlüğe kaydedilen bilgi toplu işleri arasında 10 saniyelik bir gecikme olur.

  2. Projeden yeni bir topoloji yaml oluşturun.

    1. açmak için aşağıdaki komutu topology.xml girin:
    notepad resources\topology.yaml
    
    1. Aşağıdaki bölümü bulun ve değerini olarak 10 5 değiştirebilirsiniz. Bu değişiklik, 10 saniyeden 5'e kadar sözcük sayılarından toplu işler yayma aralığını değiştirir.
    - id: "counter-bolt"
           className: "com.microsoft.example.WordCount"
           constructorArgs:
             - 5
           parallelism: 1  
    
    1. Dosyayı olarak newtopology.yaml kaydedin.
  3. Topolojiyi çalıştırmak için aşağıdaki komutu girin:

    mvn exec:java -Dexec.args="--local resources/newtopology.yaml"
    

    Veya geliştirme ortamınız üzerinde Storm varsa:

    storm jar target/WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local resources/newtopology.yaml
    

    Bu komut newtopology.yaml topoloji tanımı olarak kullanır. parametresini dahil etme compile adımız, Maven önceki adımlarda yerleşik olarak projenin sürümünü kullanır.

    Topoloji başladıktan sonra, yayılan toplu işler arasındaki sürenin içinde değerini yansıtacak şekilde değiştiğini fark etmek newtopology.yaml gerekir. Bu nedenle, topolojiyi yeniden derlemek zorunda kalmadan yaml dosyası aracılığıyla yapılandırmanızı değiştirebilirsiniz.

Bu özellikler ve Flux çerçevesinin diğer özellikleri hakkında daha fazla bilgi için bkz. Flux ( https://storm.apache.org/releases/current/flux.html) .

Trident

Trident, Storm tarafından sağlanan üst düzey bir soyutlamadır. Durum işlemeyi destekler. Trident'in birincil avantajı, topolojiye giren her iletinin yalnızca bir kez işlendiğinden emin olmaktır. Trident'i kullanmadan topolojiniz iletilerin en az bir kez işlendiğinden emin olabilir. Ayrıca, bolt oluşturmak yerine kullanılan yerleşik bileşenler gibi başka farklar da vardır. Boltlar filtreler, projeksiyonlar ve işlevler gibi daha az genel bileşenlerle değiştirilir.

Trident uygulamaları Maven projeleri kullanılarak oluşturulabilir. Bu makalede daha önce sunulan temel adımların aynısını kullanırsiniz; yalnızca kod farklıdır. Trident ayrıca (şu anda) Flux çerçevesiyle birlikte kullanılamaz.

Trident hakkında daha fazla bilgi için bkz. Trident API'sini Genel Bakış.

Sonraki Adımlar

Java kullanarak veri topolojisi Apache Storm öğrendinsiniz. Şimdi şunları yapmayı öğrenin:

HDInsight'taApache Storm için örnek topolojiler'i ziyaret ederek daha fazla Apache Storm bulabilirsiniz.