Membuat topologi Apache Storm di Java

Pelajari bagaimana membuat topologi berbasis Java untuk Apache Storm. Anda membuat topologi Storm yang mengimplementasikan aplikasi hitungan kata. Anda menggunakan Apache Maven untuk membangun dan mengemas memaketkan proyek. Kemudian, Anda belajar bagaimana mendefinisikan topologi menggunakan kerangka kerja Fluks Apache Storm.

Setelah menyelesaikan langkah-langkah dalam dokumen ini, Anda dapat menyebarkan topologi ke Apache Storm di HDInsight.

Catatan

Versi lengkap contoh topologi Storm yang dibuat dalam dokumen ini tersedia di https://github.com/Azure-Samples/hdinsight-java-storm-wordcount.

Prasyarat

Lingkungan uji

Lingkungan yang digunakan dalam artikel ini adalah komputer yang menjalankan Windows 10. Perintah dijalankan dalam perintah, dan berbagai file diedit dengan Notepad.

Dari perintah, masukkan perintah berikut untuk membuat lingkungan kerja:

mkdir C:\HDI
cd C:\HDI

Membuat proyek Maven

Masukkan perintah berikut untuk membuat proyek Maven bernama Wordcount:

mvn archetype:generate -DarchetypeArtifactId=maven-archetype-quickstart -DgroupId=com.microsoft.example -DartifactId=WordCount -DinteractiveMode=false

cd WordCount
mkdir resources

Perintah ini membuat direktori bernama WordCount pada lokasi saat ini, yang berisi proyek dasar Maven. Perintah kedua mengubah direktori kerja menjadi WordCount. Perintah ketiga membuat direktori baru, resources, yang akan digunakan nanti. Direktori WordCount memuat item berikut ini:

  • pom.xml: Berisi pengaturan untuk proyek Maven.
  • src\main\java\com\microsoft\example: Berisi kode aplikasi Anda.
  • src\test\java\com\microsoft\example: Berisi pengujian untuk aplikasi Anda.

Menghapus kode contoh yang dihasilkan

Menghapus file pengujian dan aplikasi yang dihasilkan AppTest.java, dan App.java dengan memasukkan perintah di bawah:

DEL src\main\java\com\microsoft\example\App.java
DEL src\test\java\com\microsoft\example\AppTest.java

Menambahkan repositori Maven

HDInsight didasarkan pada Hortonworks Data Platform (HDP), jadi kami sarankan menggunakan repositori Hortonworks untuk mengunduh dependensi untuk proyek Apache Storm Anda.

Buka pom.xml dengan memasukkan perintah berikut:

notepad pom.xml

Lalu tambahkan XML berikut setelah baris <url>https://maven.apache.org</url>:

<repositories>
    <repository>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>always</updatePolicy>
            <checksumPolicy>warn</checksumPolicy>
        </releases>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
            <checksumPolicy>fail</checksumPolicy>
        </snapshots>
        <id>HDPReleases</id>
        <name>HDP Releases</name>
        <url>https://repo.hortonworks.com/content/repositories/releases/</url>
        <layout>default</layout>
    </repository>
    <repository>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>always</updatePolicy>
            <checksumPolicy>warn</checksumPolicy>
        </releases>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
            <checksumPolicy>fail</checksumPolicy>
        </snapshots>
        <id>HDPJetty</id>
        <name>Hadoop Jetty</name>
        <url>https://repo.hortonworks.com/content/repositories/jetty-hadoop/</url>
        <layout>default</layout>
    </repository>
</repositories>

Menambahkan properti

Maven memungkinkan Anda menentukan nilai tingkat proyek yang disebut properti. Dalam pom.xml, tambahkan teks berikut setelah baris </repositories>:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!--
    This is a version of Storm from the Hortonworks repository that is compatible with HDInsight 3.6.
    -->
    <storm.version>1.1.0.2.6.1.9-1</storm.version>
</properties>

Anda sekarang dapat menggunakan nilai ini di bagian lain dari pom.xml. Misalnya, saat menentukan versi komponen Storm, Anda dapat menggunakan ${storm.version} alih-alih pengodean keras nilai.

Menambahkan dependensi

Tambahkan dependensi untuk komponen Storm. Di pom.xml, tambahkan teks berikut di bagian <dependencies>:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
    <!-- keep storm out of the jar-with-dependencies -->
    <scope>provided</scope>
</dependency>

Pada waktu kompilasi, Maven menggunakan informasi ini untuk mencari storm-core di repositori Maven. Pertama-tama terlihat di repositori di komputer lokal Anda. Jika file tidak ada di sana, Maven mengunduhnya dari repositori Maven publik dan menyimpannya di repositori lokal.

Catatan

Perhatikan garis <scope>provided</scope> di bagian ini. Pengaturan ini memberi tahu Maven untuk mengecualikan storm-core dari file JAR apa pun yang dibuat, karena disediakan oleh sistem.

Membangun konfigurasi

Plug-in Maven memungkinkan Anda menyesuaikan tahap build proyek. Misalnya, bagaimana proyek dikompilasi atau cara mengemasnya ke dalam file JAR. Dalam pom.xml, tambahkan teks berikut tepat di atas </project> garis.

<build>
    <plugins>
    </plugins>
    <resources>
    </resources>
</build>

Bagian ini digunakan untuk menambahkan plug-in, sumber daya, dan opsi konfigurasi build lainnya. Untuk referensi lengkap pom.xml file, lihat https://maven.apache.org/pom.html.

Menambahkan plug-in

  • Plugin Exec Maven

    Untuk topologi Apache Storm yang diterapkan di Java, Plugin Exec Maven berguna karena memungkinkan Anda untuk dengan mudah menjalankan topologi secara lokal di lingkungan pengembangan Anda. Tambahkan yang berikut ke bagian <plugins> file pom.xml untuk menyertakan plugin Exec Maven:

    <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.6.0</version>
        <executions>
            <execution>
                <goals>
                    <goal>exec</goal>
                </goals>
            </execution>
        </executions>
        <configuration>
            <executable>java</executable>
            <includeProjectDependencies>true</includeProjectDependencies>
            <includePluginDependencies>false</includePluginDependencies>
            <classpathScope>compile</classpathScope>
            <mainClass>${storm.topology}</mainClass>
            <cleanupDaemonThreads>false</cleanupDaemonThreads>
        </configuration>
    </plugin>
    
  • Plugin Kompilator Apache Maven

    Plug-in lain yang berguna adalah Apache Maven Compiler Plugin, yang digunakan untuk mengubah opsi kompilasi. Ubah versi Java yang digunakan Maven untuk sumber dan target untuk aplikasi Anda.

    • Untuk HDInsight 3.4 atau yang lebih lama, tetapkan versi Java sumber dan target ke 1.7.

    • Untuk HDInsight 3.5, tetapkan sumber dan target versi Java ke 1.8.

    Tambahkan teks berikut di bagian <plugins> file pom.xml untuk menyertakan plugin Kompilator Apache Maven. Contoh ini menentukan 1.8, sehingga versi target HDInsight adalah 3.5.

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.8.1</version>
      <configuration>
              <source>1.8</source>
              <target>1.8</target>
      </configuration>
    </plugin>
    

Mengonfigurasi sumber daya

Bagian sumber daya memungkinkan Anda untuk menyertakan sumber daya non-kode seperti file konfigurasi yang diperlukan oleh komponen dalam topologi. Untuk contoh ini, tambahkan teks berikut di bagian <resources>file pom.xml. Kemudian simpan dan tutup file.

<resource>
    <directory>${basedir}/resources</directory>
    <filtering>false</filtering>
    <includes>
            <include>log4j2.xml</include>
    </includes>
</resource>

Contoh ini menambahkan direktori sumber daya di akar proyek (${basedir}) sebagai lokasi yang berisi sumber daya, dan menyertakan file bernama log4j2.xml. File ini digunakan untuk mengonfigurasi informasi apa yang dicatat oleh topologi.

Membuat topologi

Topologi Apache Storm berbasis Java terdiri dari tiga komponen yang harus Anda tulis (atau referensi) sebagai dependensi.

  • Spout: Membaca data dari sumber eksternal dan memancarkan aliran data ke dalam topologi.

  • Bolt: Apakah pemrosesan pada aliran yang dipancarkan oleh cerat atau baut lainnya, dan memancarkan satu atau beberapa aliran.

  • Topologi: Mendefinisikan bagaimana cerat dan baut diatur, dan menyediakan titik masuk untuk topologi.

Membuat spout

Untuk mengurangi persyaratan untuk menyiapkan sumber data eksternal, spout berikut ini hanya memancarkan kalimat acak. Ini adalah versi yang dimodifikasi dari spout yang disediakan dengan contoh Storm-Starter. Meskipun topologi ini menggunakan satu spout, yang lain mungkin memiliki beberapa data yang mengumpan data dari sumber yang berbeda ke dalam topologi.

Masukkan perintah di bawah untuk membuat dan membuka file baru RandomSentenceSpout.java:

notepad src\main\java\com\microsoft\example\RandomSentenceSpout.java

Kemudian, salin dan tempel kode java di bawah ke file baru. Lalu tutup file.

package com.microsoft.example;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

//This spout randomly emits sentences
public class RandomSentenceSpout extends BaseRichSpout {
  //Collector used to emit output
  SpoutOutputCollector _collector;
  //Used to generate a random number
  Random _rand;

  //Open is called when an instance of the class is created
  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  //Set the instance collector to the one passed in
    _collector = collector;
    //For randomness
    _rand = new Random();
  }

  //Emit data to the stream
  @Override
  public void nextTuple() {
  //Sleep for a bit
    Utils.sleep(100);
    //The sentences that are randomly emitted
    String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
        "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
    //Randomly pick a sentence
    String sentence = sentences[_rand.nextInt(sentences.length)];
    //Emit the sentence
    _collector.emit(new Values(sentence));
  }

  //Ack is not implemented since this is a basic example
  @Override
  public void ack(Object id) {
  }

  //Fail is not implemented since this is a basic example
  @Override
  public void fail(Object id) {
  }

  //Declare the output fields. In this case, an sentence
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("sentence"));
  }
}

Catatan

Untuk contoh spout yang berbunyi dari sumber data eksternal, lihat salah satu contoh berikut ini:

Membuat bolt

Bolt menangani pemrosesan data. Bolt dapat melakukan apa saja, misalnya, komputasi, kegigihan, atau berbicara dengan komponen eksternal. Topologi ini menggunakan dua bolt:

  • SplitSentence: Membagi kalimat yang dipancarkan oleh RandomSentenceSpout menjadi kata-kata individual.

  • WordCount: Menghitung berapa kali setiap kata telah terjadi.

SplitSentence

Masukkan perintah di bawah untuk membuat dan membuka file baru SplitSentence.java:

notepad src\main\java\com\microsoft\example\SplitSentence.java

Kemudian, salin dan tempel kode java di bawah ke file baru. Lalu tutup file.

package com.microsoft.example;

import java.text.BreakIterator;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

//There are a variety of bolt types. In this case, use BaseBasicBolt
public class SplitSentence extends BaseBasicBolt {

  //Execute is called to process tuples
  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    //Get the sentence content from the tuple
    String sentence = tuple.getString(0);
    //An iterator to get each word
    BreakIterator boundary=BreakIterator.getWordInstance();
    //Give the iterator the sentence
    boundary.setText(sentence);
    //Find the beginning first word
    int start=boundary.first();
    //Iterate over each word and emit it to the output stream
    for (int end=boundary.next(); end != BreakIterator.DONE; start=end, end=boundary.next()) {
      //get the word
      String word=sentence.substring(start,end);
      //If a word is whitespace characters, replace it with empty
      word=word.replaceAll("\\s+","");
      //if it's an actual word, emit it
      if (!word.equals("")) {
        collector.emit(new Values(word));
      }
    }
  }

  //Declare that emitted tuples contain a word field
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }
}

WordCount

Masukkan perintah di bawah untuk membuat dan membuka file baru WordCount.java:

notepad src\main\java\com\microsoft\example\WordCount.java

Kemudian, salin dan tempel kode java di bawah ke file baru. Lalu tutup file.

package com.microsoft.example;

import java.util.HashMap;
import java.util.Map;
import java.util.Iterator;

import org.apache.storm.Constants;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.Config;

// For logging
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

//There are a variety of bolt types. In this case, use BaseBasicBolt
public class WordCount extends BaseBasicBolt {
  //Create logger for this class
  private static final Logger logger = LogManager.getLogger(WordCount.class);
  //For holding words and counts
  Map<String, Integer> counts = new HashMap<String, Integer>();
  //How often to emit a count of words
  private Integer emitFrequency;

  // Default constructor
  public WordCount() {
      emitFrequency=5; // Default to 60 seconds
  }

  // Constructor that sets emit frequency
  public WordCount(Integer frequency) {
      emitFrequency=frequency;
  }

  //Configure frequency of tick tuples for this bolt
  //This delivers a 'tick' tuple on a specific interval,
  //which is used to trigger certain actions
  @Override
  public Map<String, Object> getComponentConfiguration() {
      Config conf = new Config();
      conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequency);
      return conf;
  }

  //execute is called to process tuples
  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    //If it's a tick tuple, emit all words and counts
    if(tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
            && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
      for(String word : counts.keySet()) {
        Integer count = counts.get(word);
        collector.emit(new Values(word, count));
        logger.info("Emitting a count of " + count + " for word " + word);
      }
    } else {
      //Get the word contents from the tuple
      String word = tuple.getString(0);
      //Have we counted any already?
      Integer count = counts.get(word);
      if (count == null)
        count = 0;
      //Increment the count and store it
      count++;
      counts.put(word, count);
    }
  }

  //Declare that this emits a tuple containing two fields; word and count
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word", "count"));
  }
}

Menentukan topologi

Topologi mengikat spout dan bolt bersama-sama menjadi grafik. Grafik menentukan bagaimana data mengalir di antara komponen. Grafik ini juga menyediakan petunjuk paralelisme yang digunakan Storm saat membuat contoh komponen dalam klaster.

Gambar berikut adalah diagram dasar grafik komponen untuk topologi ini.

diagram memperlihatkan pengaturan spout dan bolt

Untuk mengimplementasikan topologi, masukkan perintah di bawah ini untuk membuat dan membuka file baru WordCountTopology.java:

notepad src\main\java\com\microsoft\example\WordCountTopology.java

Kemudian, salin dan tempel kode java di bawah ke file baru. Lalu tutup file.

package com.microsoft.example;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

import com.microsoft.example.RandomSentenceSpout;

public class WordCountTopology {

  //Entry point for the topology
  public static void main(String[] args) throws Exception {
  //Used to build the topology
    TopologyBuilder builder = new TopologyBuilder();
    //Add the spout, with a name of 'spout'
    //and parallelism hint of 5 executors
    builder.setSpout("spout", new RandomSentenceSpout(), 5);
    //Add the SplitSentence bolt, with a name of 'split'
    //and parallelism hint of 8 executors
    //shufflegrouping subscribes to the spout, and equally distributes
    //tuples (sentences) across instances of the SplitSentence bolt
    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
    //Add the counter, with a name of 'count'
    //and parallelism hint of 12 executors
    //fieldsgrouping subscribes to the split bolt, and
    //ensures that the same word is sent to the same instance (group by field 'word')
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

    //new configuration
    Config conf = new Config();
    //Set to false to disable debug information when
    // running in production on a cluster
    conf.setDebug(false);

    //If there are arguments, we are running on a cluster
    if (args != null && args.length > 0) {
      //parallelism hint to set the number of workers
      conf.setNumWorkers(3);
      //submit the topology
      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
    //Otherwise, we are running locally
    else {
      //Cap the maximum number of executors that can be spawned
      //for a component to 3
      conf.setMaxTaskParallelism(3);
      //LocalCluster is used to run locally
      LocalCluster cluster = new LocalCluster();
      //submit the topology
      cluster.submitTopology("word-count", conf, builder.createTopology());
      //sleep
      Thread.sleep(10000);
      //shut down the cluster
      cluster.shutdown();
    }
  }
}

Mengonfigurasi pengelogan

Storm menggunakan Apache Log4j 2 untuk mencatat informasi. Jika Anda tidak mengonfigurasi pengelogan, topologi akan memancarkan informasi diagnostik. Untuk mengontrol apa yang dicatat, buat file bernama log4j2.xml di direktori resources dengan memasukkan perintah di bawah ini:

notepad resources\log4j2.xml

Kemudian, salin dan tempel teks XML di bawah ke file baru. Lalu tutup file.

<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
    <Appenders>
        <Console name="STDOUT" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Logger name="com.microsoft.example" level="trace" additivity="false">
            <AppenderRef ref="STDOUT"/>
        </Logger>
        <Root level="error">
            <Appender-Ref ref="STDOUT"/>
        </Root>
    </Loggers>
</Configuration>

XML ini mengonfigurasi pencatat baru untuk kelas com.microsoft.example, yang mencakup komponen dalam contoh topologi ini. Tingkat ini diatur untuk melacak pencatat ini, yang menangkap informasi pencatatan yang dipancarkan oleh komponen dalam topologi ini.

Bagian <Root level="error"> ini mengonfigurasi tingkat akar pengelogan (semuanya tidak ada di com.microsoft.example) untuk hanya mencatat informasi kesalahan.

Untuk informasi selengkapnya tentang mengonfigurasi pengelogan untuk Log4j 2, lihat https://logging.apache.org/log4j/2.x/manual/configuration.html.

Catatan

Storm versi 0.10.0 dan lebih tinggi menggunakan Log4j 2.x. Versi storm yang lebih lama menggunakan Log4j 1.x, yang menggunakan format berbeda untuk konfigurasi log. Untuk informasi tentang konfigurasi yang lebih lama, lihat https://cwiki.apache.org/confluence/display/LOGGINGLOG4J/Log4jXmlFormat.

Menguji topologi secara lokal

Setelah Anda menyimpan file, gunakan perintah berikut untuk menguji topologi secara lokal.

mvn compile exec:java -Dstorm.topology=com.microsoft.example.WordCountTopology

Saat berjalan, topologi menampilkan informasi startup. Teks berikut adalah contoh output hitungan kata:

17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word snow
17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word white
17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 112 for word seven
17:33:27 [Thread-16-count] INFO  com.microsoft.example.WordCount - Emitting a count of 195 for word the
17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 113 for word and
17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 57 for word dwarfs
17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 57 for word snow

Contoh log ini menunjukkan bahwa kata 'dan' telah dipancarkan sebanyak 113 kali. Jumlahnya terus meningkat selama topologi berjalan. Peningkatan ini karena spout terus memancarkan kalimat yang sama.

Ada interval 5 detik antara pemancaran kata dan hitungan. Komponen WordCount dikonfigurasi untuk hanya memancarkan informasi saat tuple centang tiba. Komponen ini meminta agar tuple centang hanya dikirimkan setiap lima detik.

Mengonversikan topologi menjadi Fluks

Fluks adalah kerangka kerja baru yang tersedia dengan Storm 0.10.0 dan yang lebih tinggi. Fluks memungkinkan Anda memisahkan konfigurasi dari implementasi. Komponen Anda masih didefinisikan dalam Java, tetapi topologi didefinisikan menggunakan file YAML. Anda dapat mengemas definisi topologi default dengan proyek Anda, atau menggunakan file mandiri saat mengirimkan topologi. Saat mengirimkan topologi ke Storm, gunakan variabel lingkungan atau file konfigurasi untuk mengisi nilai definisi topologi YAML.

File YAML mendefinisikan komponen yang digunakan untuk topologi dan aliran data di antara mereka. Anda dapat menyertakan file YAML sebagai bagian dari file jar. Atau Anda dapat menggunakan file YAML eksternal.

Untuk informasi selengkapnya tentang Fluks, lihat kerangka kerja Fluks (https://storm.apache.org/releases/current/flux.html).

Peringatan

Karena bug (https://issues.apache.org/jira/browse/STORM-2055) dengan Storm 1.0.1, Anda mungkin perlu memasang lingkungan pengembangan Storm untuk menjalankan topologi Fluks secara lokal.

  1. Sebelumnya, WordCountTopology.java mendefinisikan topologi, tetapi tidak diperlukan dengan Fluks. Hapus file dengan perintah berikut:

    DEL src\main\java\com\microsoft\example\WordCountTopology.java
    
  2. Masukkan perintah di bawah untuk membuat dan membuka file baru topology.yaml:

    notepad resources\topology.yaml
    

    Kemudian, salin dan tempel teks di bawah ke file baru. Lalu tutup file.

    name: "wordcount"       # friendly name for the topology
    
    config:                 # Topology configuration
           topology.workers: 1     # Hint for the number of workers to create
    
    spouts:                 # Spout definitions
    - id: "sentence-spout"
           className: "com.microsoft.example.RandomSentenceSpout"
           parallelism: 1      # parallelism hint
    
    bolts:                  # Bolt definitions
    - id: "splitter-bolt"
           className: "com.microsoft.example.SplitSentence"
           parallelism: 1
    
    - id: "counter-bolt"
           className: "com.microsoft.example.WordCount"
           constructorArgs:
             - 10
           parallelism: 1
    
    streams:                # Stream definitions
    - name: "Spout --> Splitter" # name isn't used (placeholder for logging, UI, etc.)
           from: "sentence-spout"       # The stream emitter
           to: "splitter-bolt"          # The stream consumer
           grouping:                    # Grouping type
             type: SHUFFLE
    
    - name: "Splitter -> Counter"
           from: "splitter-bolt"
           to: "counter-bolt"
           grouping:
             type: FIELDS
             args: ["word"]           # field(s) to group on
    
  3. Masukkan perintah di bawah ini untuk membuka pom.xml agar membuat revisi yang dijelaskan di bawah ini:

    notepad pom.xml
    
    1. Tambahkan dependensi baru berikut ini di bagian <dependencies>:

      <!-- Add a dependency on the Flux framework -->
      <dependency>
          <groupId>org.apache.storm</groupId>
          <artifactId>flux-core</artifactId>
          <version>${storm.version}</version>
      </dependency>
      
    2. Tambahkan plugin berikut ke bagian <plugins>. Plugin ini menangani pembuatan paket (file jar) untuk proyek, dan menerapkan beberapa transformasi khusus untuk Fluks saat membuat paket.

      <!-- build an uber jar -->
      <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-shade-plugin</artifactId>
          <version>3.2.1</version>
          <configuration>
              <transformers>
                  <!-- Keep us from getting a "can't overwrite file error" -->
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer" />
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                  <!-- We're using Flux, so refer to it as main -->
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                      <mainClass>org.apache.storm.flux.Flux</mainClass>
                  </transformer>
              </transformers>
              <!-- Keep us from getting a bad signature error -->
              <filters>
                  <filter>
                      <artifact>*:*</artifact>
                      <excludes>
                          <exclude>META-INF/*.SF</exclude>
                          <exclude>META-INF/*.DSA</exclude>
                          <exclude>META-INF/*.RSA</exclude>
                      </excludes>
                  </filter>
              </filters>
          </configuration>
          <executions>
              <execution>
                  <phase>package</phase>
                  <goals>
                      <goal>shade</goal>
                  </goals>
              </execution>
          </executions>
      </plugin>
      
    3. Untuk bagian Plugin Exec Maven, navigasikan ke <configuration> > <mainClass> dan ubah ${storm.topology} ke org.apache.storm.flux.Flux. Pengaturan ini memungkinkan Fluks untuk menangani menjalankan topologi secara lokal dalam pengembangan.

    4. Di bagian <resources>, tambahkan yang berikut ke <includes>. XML ini mencakup file YAML yang mendefinisikan topologi sebagai bagian dari proyek.

      <include>topology.yaml</include>
      

Menguji topologi fluks secara lokal

  1. Masukkan perintah berikut untuk mengompilasi dan menjalankan topologi Fluks menggunakan Maven:

    mvn compile exec:java -Dexec.args="--local -R /topology.yaml"
    

    Peringatan

    Jika topologi Anda menggunakan Storm 1.0.1 bit, perintah ini gagal. Kegagalan ini disebabkan oleh https://issues.apache.org/jira/browse/STORM-2055. Sebagai gantinya, pasang Storm di lingkungan pengembangan Anda dan gunakan langkah-langkah berikut:

    Jika Anda telah memasang Storm di lingkungan pengembangan, Anda dapat menggunakan perintah berikut sebagai gantinya:

    mvn compile package
    storm jar target/WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local -R /topology.yaml
    

    Parameter --local tersebut menjalankan topologi dalam mode lokal pada lingkungan pengembangan Anda. Parameter -R /topology.yaml tersebut menggunakan sumber daya file dari file jar untuk menentukan topology.yaml topologi.

    Saat berjalan, topologi menampilkan informasi startup. Teks berikut adalah contoh output hitungan kata:

    17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word snow
    17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word white
    17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 112 for word seven
    17:33:27 [Thread-16-count] INFO  com.microsoft.example.WordCount - Emitting a count of 195 for word the
    17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 113 for word and
    17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 57 for word dwarfs
    

    Ada penundaan 10 detik antara batch informasi yang dicatat.

  2. Membuat yaml topologi baru dari proyek.

    1. Masukkan perintah di bawah ini untuk membuka topology.xml:
    notepad resources\topology.yaml
    
    1. Temukan bagian berikut dan ubah nilai 10 menjadi 5. Modifikasi ini mengubah interval antara memancarkan batch kata dihitung dari 10 detik menjadi 5.
    - id: "counter-bolt"
           className: "com.microsoft.example.WordCount"
           constructorArgs:
             - 5
           parallelism: 1  
    
    1. Simpan file sebagai newtopology.yaml.
  3. Untuk menjalankan topologi, masukkan perintah berikut:

    mvn exec:java -Dexec.args="--local resources/newtopology.yaml"
    

    Atau, jika Anda memiliki Storm di lingkungan pengembangan Anda:

    storm jar target/WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local resources/newtopology.yaml
    

    Perintah ini menggunakan newtopology.yaml sebagai definisi topologi. Karena kami tidak menyertakan parametercompile, Maven menggunakan versi proyek yang dibangun di langkah-langkah sebelumnya.

    Setelah topologi dimulai, Anda harus memperhatikan bahwa waktu antara batch yang dipancarkan telah berubah untuk mencerminkan nilai dalam newtopology.yaml. Jadi Anda dapat melihat bahwa Anda dapat mengubah konfigurasi Anda melalui file YAML tanpa harus mengolah ulang topologi.

Untuk informasi selengkapnya tentang ini dan fitur lain dari kerangka kerja Flux, lihat Flux (https://storm.apache.org/releases/current/flux.html).

Trident

Trident adalah abstraksi tingkat tinggi yang disediakan oleh Storm. Trident mendukung pemrosesan berstatus. Keuntungan utama Trident adalah Trident menjamin bahwa setiap pesan yang masuk ke topologi diproses hanya sekali. Tanpa menggunakan Trident, topologi Anda hanya dapat menjamin bahwa pesan diproses setidaknya sekali. Ada juga perbedaan lain, seperti komponen bawaan yang dapat digunakan alih-alih membuat baut. Bolt digantikan oleh komponen yang kurang generik, seperti filter, proyeksi, dan fungsi.

Aplikasi Trident dapat dibuat dengan menggunakan proyek Maven. Anda menggunakan langkah-langkah dasar yang sama seperti yang disajikan sebelumnya di artikel ini—hanya kode yang berbeda. Trident juga tidak dapat (saat ini) digunakan dengan kerangka kerja Fluks.

Untuk informasi selengkapnya tentang Trident, lihat Ikhtisar Trident API.

Langkah berikutnya

Anda telah belajar cara membuat topologi Apache Storm dengan menggunakan Java. Sekarang, pelajari cara:

Anda dapat menemukan lebih banyak contoh topologi Apache Storm dengan mengunjungi Contoh topologi untuk Apache Storm di HDInsight.