Crear una topología de Apache Storm en JavaCreate an Apache Storm topology in Java

Aprenda a crear una topología de Apache Storm basada en Java.Learn how to create a Java-based topology for Apache Storm. Aquí, creará una topología de Storm que implemente una aplicación de recuento de palabras.Here, you create a Storm topology that implements a word-count application. Para compilar y empaquetar el proyecto, puede usar Apache Maven.You use Apache Maven to build and package the project. Después, aprende a definir la topología con el marco de trabajo Apache Storm Flux.Then, you learn how to define the topology using the Apache Storm Flux framework.

Después de completar los pasos descritos en este documento, puede implementar la topología en Apache Storm en HDInsight.After completing the steps in this document, you can deploy the topology to Apache Storm on HDInsight.

Nota

Hay una versión completa de los ejemplos de topología de Storm creada en este documento disponible en https://github.com/Azure-Samples/hdinsight-java-storm-wordcount.A completed version of the Storm topology examples created in this document is available at https://github.com/Azure-Samples/hdinsight-java-storm-wordcount.

Requisitos previosPrerequisites

Entorno de pruebaTest environment

El entorno usado en este artículo fue un equipo donde se ejecuta Windows 10.The environment used for this article was a computer running Windows 10. Los comandos se ejecutaron en un símbolo del sistema, y los distintos archivos se editaron con el Bloc de notas.The commands were executed in a command prompt, and the various files were edited with Notepad.

Desde un símbolo del sistema, escriba los siguientes comandos para crear un entorno de trabajo:From a command prompt, enter the commands below to create a working environment:

mkdir C:\HDI
cd C:\HDI

Creación de un proyecto de MavenCreate a Maven project

Especifique el siguiente comando para crear un proyecto de Maven llamado WordCount:Enter the following command to create a Maven project named WordCount:

mvn archetype:generate -DarchetypeArtifactId=maven-archetype-quickstart -DgroupId=com.microsoft.example -DartifactId=WordCount -DinteractiveMode=false

cd WordCount
mkdir resources

Este comando crea un directorio denominado WordCount en la ubicación actual, que contiene un proyecto de Maven básico.This command creates a directory named WordCount at the current location, which contains a basic Maven project. El segundo comando cambia el directorio de trabajo actual a WordCount.The second command changes the present working directory to WordCount. El tercer comando crea un directorio, resources, que se usará más adelante.The third command creates a new directory, resources, which will be used later. El directorio WordCount contiene los siguientes elementos:The WordCount directory contains the following items:

  • pom.xml: Contiene la configuración del proyecto de Maven.pom.xml: Contains settings for the Maven project.
  • src\main\java\com\microsoft\example: Contiene el código de la aplicación.src\main\java\com\microsoft\example: Contains your application code.
  • src\test\java\com\microsoft\example: Contiene pruebas para la aplicación.src\test\java\com\microsoft\example: Contains tests for your application.

Eliminar el código de ejemplo generadoRemove the generated example code

Especifique los siguientes comandos para eliminar los archivos de aplicación y de prueba generados (AppTest.java y App.java):Delete the generated test and application files AppTest.java, and App.java by entering the commands below:

DEL src\main\java\com\microsoft\example\App.java
DEL src\test\java\com\microsoft\example\AppTest.java

Agregar repositorios de MavenAdd Maven repositories

HDInsight se basa en Hortonworks Data Platform (HDP), por lo que se recomienda usar el repositorio de Hortonworks para descargar las dependencias correspondientes a los proyectos de Apache Storm.HDInsight is based on the Hortonworks Data Platform (HDP), so we recommend using the Hortonworks repository to download dependencies for your Apache Storm projects.

Especifique el siguiente comando para abrir pom.xml:Open pom.xml by entering the command below:

notepad pom.xml

A continuación, agregue el siguiente código XML después de la línea <url> https://maven.apache.org</url>:Then add the following XML after the <url>https://maven.apache.org</url> line:

<repositories>
    <repository>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>always</updatePolicy>
            <checksumPolicy>warn</checksumPolicy>
        </releases>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
            <checksumPolicy>fail</checksumPolicy>
        </snapshots>
        <id>HDPReleases</id>
        <name>HDP Releases</name>
        <url>https://repo.hortonworks.com/content/repositories/releases/</url>
        <layout>default</layout>
    </repository>
    <repository>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>always</updatePolicy>
            <checksumPolicy>warn</checksumPolicy>
        </releases>
        <snapshots>
            <enabled>false</enabled>
            <updatePolicy>never</updatePolicy>
            <checksumPolicy>fail</checksumPolicy>
        </snapshots>
        <id>HDPJetty</id>
        <name>Hadoop Jetty</name>
        <url>https://repo.hortonworks.com/content/repositories/jetty-hadoop/</url>
        <layout>default</layout>
    </repository>
</repositories>

Agregar propiedadesAdd properties

Maven permite definir los valores de nivel de proyecto llamados propiedades.Maven allows you to define project-level values called properties. En pom.xml, agregue el siguiente texto después de la línea </repositories>:In pom.xml, add the following text after the </repositories> line:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!--
    This is a version of Storm from the Hortonworks repository that is compatible with HDInsight 3.6.
    -->
    <storm.version>1.1.0.2.6.1.9-1</storm.version>
</properties>

Ahora puede usar este valor en otras secciones de pom.xml.You can now use this value in other sections of the pom.xml. Por ejemplo, al especificar la versión de los componentes de Storm, puede usar ${storm.version} en lugar de codificar un valor de forma rígida.For example, when specifying the version of Storm components, you can use ${storm.version} instead of hard coding a value.

Adición de dependenciasAdd dependencies

Agregue una dependencia para componentes de Storm.Add a dependency for Storm components. En la sección pom.xml, agregue el siguiente texto a la sección <dependencies>:In pom.xml, add the following text in the <dependencies> section:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
    <!-- keep storm out of the jar-with-dependencies -->
    <scope>provided</scope>
</dependency>

En tiempo de compilación, Maven usa esta información para buscar storm-core en el repositorio de Maven.At compile time, Maven uses this information to look up storm-core in the Maven repository. Busca primero en el repositorio del equipo local.It first looks in the repository on your local computer. Si los archivos no están allí, Maven los descarga desde el repositorio de Maven público y los almacena en el repositorio local.If the files aren't there, Maven downloads them from the public Maven repository and stores them in the local repository.

Nota

Observe la línea <scope>provided</scope> en esta sección.Notice the <scope>provided</scope> line in this section. Esta opción indica a Maven que excluya storm-core de los archivos JAR creados, ya que el sistema lo proporciona.This setting tells Maven to exclude storm-core from any JAR files that are created, because it is provided by the system.

Configuración de compilaciónBuild configuration

Los complementos de Maven permiten personalizar las fases de compilación del proyecto,Maven plug-ins allow you to customize the build stages of the project. por ejemplo, cómo se compila el proyecto o cómo este se empaqueta en un archivo JAR.For example, how the project is compiled or how to package it into a JAR file. En pom.xml, agregue el siguiente texto justo encima de la línea </project>.In pom.xml, add the following text directly above the </project> line.

<build>
    <plugins>
    </plugins>
    <resources>
    </resources>
</build>

Esta sección se usa para agregar complementos, recursos y otras opciones de configuración de compilación.This section is used to add plug-ins, resources, and other build configuration options. Para obtener una referencia completa del archivo pom.xml, consulte https://maven.apache.org/pom.html.For a full reference of the pom.xml file, see https://maven.apache.org/pom.html.

Agregar complementosAdd plug-ins

  • Complemento Exec MavenExec Maven Plugin

    En el caso de las topologías de Apache Storm implementadas en Java, el Complemento Exec Maven es útil porque le permite ejecutar fácilmente la topología de manera local en su entorno de desarrollo.For Apache Storm topologies implemented in Java, the Exec Maven Plugin is useful because it allows you to easily run the topology locally in your development environment. Agregue lo siguiente a la sección <plugins> del archivo pom.xml para incluir el complemento Exec Maven:Add the following to the <plugins> section of the pom.xml file to include the Exec Maven plugin:

    <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.6.0</version>
        <executions>
            <execution>
            <goals>
                <goal>exec</goal>
            </goals>
            </execution>
        </executions>
        <configuration>
            <executable>java</executable>
            <includeProjectDependencies>true</includeProjectDependencies>
            <includePluginDependencies>false</includePluginDependencies>
            <classpathScope>compile</classpathScope>
            <mainClass>${storm.topology}</mainClass>
            <cleanupDaemonThreads>false</cleanupDaemonThreads> 
        </configuration>
    </plugin>
    
  • Complemento Apache Maven CompilerApache Maven Compiler Plugin

    Otro complemento útil es el Complemento Apache Maven Compiler, que se usa para cambiar las opciones de compilación.Another useful plug-in is the Apache Maven Compiler Plugin, which is used to change compilation options. Cambie la versión de Java que usa Maven para el origen y destino de la aplicación.Change the Java version that Maven uses for the source and target for your application.

    • Para HDInsight 3.4 o una versión anterior, establezca la versión de origen y destino de Java en 1.7.For HDInsight 3.4 or earlier, set the source and target Java version to 1.7.

    • Para HDInsight 3.5, establezca la versión de origen y destino de Java en 1.8.For HDInsight 3.5, set the source and target Java version to 1.8.

      Agregue el siguiente texto en la sección <plugins> del archivo pom.xml para incluir el complemento Apache Maven Compiler.Add the following text in the <plugins> section of the pom.xml file to include the Apache Maven Compiler plugin. Este ejemplo especifica 1.8, por lo que la versión de HDInsight de destino es 3.5.This example specifies 1.8, so the target HDInsight version is 3.5.

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.3</version>
        <configuration>
        <source>1.8</source>
        <target>1.8</target>
        </configuration>
      </plugin>
      

Configure resourcesConfigure resources

Gracias a la sección de recursos, podrá incluir recursos no basados en códigos, como los archivos de configuración que necesitan los componentes de la topología.The resources section allows you to include non-code resources such as configuration files needed by components in the topology. En este ejemplo, agregue el texto siguiente en la sección <resources> del archivo pom.xml.For this example, add the following text in the <resources> section of the pom.xml file.

<resource>
    <directory>${basedir}/resources</directory>
    <filtering>false</filtering>
    <includes>
        <include>log4j2.xml</include>
    </includes>
</resource>

Este ejemplo agrega el directorio de recursos en la raíz del proyecto (${basedir}) como una ubicación que contiene recursos e incluye el archivo denominado log4j2.xml.This example adds the resources directory in the root of the project (${basedir}) as a location that contains resources, and includes the file named log4j2.xml. Este archivo se utiliza para configurar la información que registra la topología.This file is used to configure what information is logged by the topology.

Creación de la topologíaCreate the topology

Una topología de Apache Storm basada en Java consta de tres componentes que debe crear (o hacer referencia) como una dependencia.A Java-based Apache Storm topology consists of three components that you must author (or reference) as a dependency.

  • Spouts: lee datos de orígenes externos y emite flujos de datos a la topología.Spouts: Reads data from external sources and emits streams of data into the topology.

  • Bolts: realiza el procesamiento en flujos que emite spouts u otros bolts, y emite uno o varios flujos.Bolts: Performs processing on streams emitted by spouts or other bolts, and emits one or more streams.

  • Topología: define cómo se organizan los spouts y bolts, y proporciona el punto de entrada de la topología.Topology: Defines how the spouts and bolts are arranged, and provides the entry point for the topology.

Creación del spoutCreate the spout

Para reducir los requisitos de la configuración de los orígenes de datos externos, el spout siguiente simplemente emite frases aleatorias.To reduce requirements for setting up external data sources, the following spout simply emits random sentences. Se trata de una versión modificada de un spout que se proporciona con los ejemplos de Storm-Starter.It is a modified version of a spout that is provided with the Storm-Starter examples. Aunque esta topología solo usa un spout, otras pueden tener varios que alimenten datos desde orígenes distintos en la topología.Although this topology uses only one spout, others may have several that feed data from different sources into the topology.

Escriba el comando siguiente para crear un archivo RandomSentenceSpout.java y abrirlo:Enter the command below to create and open a new file RandomSentenceSpout.java:

notepad src\main\java\com\microsoft\example\RandomSentenceSpout.java

Luego, copie y pegue el código Java siguiente en el nuevo archivo.Then copy and paste the java code below into the new file. y ciérrelo.Then close the file.

package com.microsoft.example;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

//This spout randomly emits sentences
public class RandomSentenceSpout extends BaseRichSpout {
  //Collector used to emit output
  SpoutOutputCollector _collector;
  //Used to generate a random number
  Random _rand;

  //Open is called when an instance of the class is created
  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  //Set the instance collector to the one passed in
    _collector = collector;
    //For randomness
    _rand = new Random();
  }

  //Emit data to the stream
  @Override
  public void nextTuple() {
  //Sleep for a bit
    Utils.sleep(100);
    //The sentences that are randomly emitted
    String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
        "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
    //Randomly pick a sentence
    String sentence = sentences[_rand.nextInt(sentences.length)];
    //Emit the sentence
    _collector.emit(new Values(sentence));
  }

  //Ack is not implemented since this is a basic example
  @Override
  public void ack(Object id) {
  }

  //Fail is not implemented since this is a basic example
  @Override
  public void fail(Object id) {
  }

  //Declare the output fields. In this case, an sentence
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("sentence"));
  }
}

Nota

Para ver un ejemplo de un spout que lee desde un origen de datos externos, consulte uno de los siguientes ejemplos:For an example of a spout that reads from an external data source, see one of the following examples:

Creación de los boltsCreate the bolts

Los bolts controlan el procesamiento de datos.Bolts handle the data processing. Los bolts pueden hacer cualquier cosa; por ejemplo, cálculo, persistencia o hablar con componentes externos.Bolts can do anything, for example, computation, persistence, or talking to external components. Esta topología utiliza dos bolts:This topology uses two bolts:

  • SplitSentence: divide las frases que emite RandomSentenceSpout en palabras individuales.SplitSentence: Splits the sentences emitted by RandomSentenceSpout into individual words.

  • WordCount: cuenta cuántas veces se ha repetido cada palabra.WordCount: Counts how many times each word has occurred.

SplitSentenceSplitSentence

Escriba el comando siguiente para crear un archivo SplitSentence.java y abrirlo:Enter the command below to create and open a new file SplitSentence.java:

notepad src\main\java\com\microsoft\example\SplitSentence.java

Luego, copie y pegue el código Java siguiente en el nuevo archivo.Then copy and paste the java code below into the new file. y ciérrelo.Then close the file.

package com.microsoft.example;

import java.text.BreakIterator;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

//There are a variety of bolt types. In this case, use BaseBasicBolt
public class SplitSentence extends BaseBasicBolt {

  //Execute is called to process tuples
  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    //Get the sentence content from the tuple
    String sentence = tuple.getString(0);
    //An iterator to get each word
    BreakIterator boundary=BreakIterator.getWordInstance();
    //Give the iterator the sentence
    boundary.setText(sentence);
    //Find the beginning first word
    int start=boundary.first();
    //Iterate over each word and emit it to the output stream
    for (int end=boundary.next(); end != BreakIterator.DONE; start=end, end=boundary.next()) {
      //get the word
      String word=sentence.substring(start,end);
      //If a word is whitespace characters, replace it with empty
      word=word.replaceAll("\\s+","");
      //if it's an actual word, emit it
      if (!word.equals("")) {
        collector.emit(new Values(word));
      }
    }
  }

  //Declare that emitted tuples contain a word field
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }
}

WordCountWordCount

Escriba el comando siguiente para crear un archivo WordCount.java y abrirlo:Enter the command below to create and open a new file WordCount.java:

notepad src\main\java\com\microsoft\example\WordCount.java

Luego, copie y pegue el código Java siguiente en el nuevo archivo.Then copy and paste the java code below into the new file. y ciérrelo.Then close the file.

package com.microsoft.example;

import java.util.HashMap;
import java.util.Map;
import java.util.Iterator;

import org.apache.storm.Constants;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.Config;

// For logging
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

//There are a variety of bolt types. In this case, use BaseBasicBolt
public class WordCount extends BaseBasicBolt {
  //Create logger for this class
  private static final Logger logger = LogManager.getLogger(WordCount.class);
  //For holding words and counts
  Map<String, Integer> counts = new HashMap<String, Integer>();
  //How often to emit a count of words
  private Integer emitFrequency;

  // Default constructor
  public WordCount() {
      emitFrequency=5; // Default to 60 seconds
  }

  // Constructor that sets emit frequency
  public WordCount(Integer frequency) {
      emitFrequency=frequency;
  }

  //Configure frequency of tick tuples for this bolt
  //This delivers a 'tick' tuple on a specific interval,
  //which is used to trigger certain actions
  @Override
  public Map<String, Object> getComponentConfiguration() {
      Config conf = new Config();
      conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequency);
      return conf;
  }

  //execute is called to process tuples
  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    //If it's a tick tuple, emit all words and counts
    if(tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
            && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
      for(String word : counts.keySet()) {
        Integer count = counts.get(word);
        collector.emit(new Values(word, count));
        logger.info("Emitting a count of " + count + " for word " + word);
      }
    } else {
      //Get the word contents from the tuple
      String word = tuple.getString(0);
      //Have we counted any already?
      Integer count = counts.get(word);
      if (count == null)
        count = 0;
      //Increment the count and store it
      count++;
      counts.put(word, count);
    }
  }

  //Declare that this emits a tuple containing two fields; word and count
  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word", "count"));
  }
}

Definición de la topologíaDefine the topology

La topología vincula los spouts y los bolts juntos en un gráfico, que define cómo fluyen los datos entre los componentes.The topology ties the spouts and bolts together into a graph, which defines how data flows between the components. También proporciona sugerencias de paralelismos que utiliza Storm al crear instancias de los componentes dentro del clúster.It also provides parallelism hints that Storm uses when creating instances of the components within the cluster.

La siguiente imagen es un diagrama básico del gráfico de componentes para esta topología.The following image is a basic diagram of the graph of components for this topology.

diagrama que muestra la disposición de los spouts y bolts

Para implementar la topología, escriba el comando siguiente para crear y abrir un nuevo archivo WordCountTopology.java:To implement the topology, enter the command below to create and open a new file WordCountTopology.java:

notepad src\main\java\com\microsoft\example\WordCountTopology.java

Luego, copie y pegue el código Java siguiente en el nuevo archivo.Then copy and paste the java code below into the new file. y ciérrelo.Then close the file.

package com.microsoft.example;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

import com.microsoft.example.RandomSentenceSpout;

public class WordCountTopology {

  //Entry point for the topology
  public static void main(String[] args) throws Exception {
  //Used to build the topology
    TopologyBuilder builder = new TopologyBuilder();
    //Add the spout, with a name of 'spout'
    //and parallelism hint of 5 executors
    builder.setSpout("spout", new RandomSentenceSpout(), 5);
    //Add the SplitSentence bolt, with a name of 'split'
    //and parallelism hint of 8 executors
    //shufflegrouping subscribes to the spout, and equally distributes
    //tuples (sentences) across instances of the SplitSentence bolt
    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
    //Add the counter, with a name of 'count'
    //and parallelism hint of 12 executors
    //fieldsgrouping subscribes to the split bolt, and
    //ensures that the same word is sent to the same instance (group by field 'word')
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

    //new configuration
    Config conf = new Config();
    //Set to false to disable debug information when
    // running in production on a cluster
    conf.setDebug(false);

    //If there are arguments, we are running on a cluster
    if (args != null && args.length > 0) {
      //parallelism hint to set the number of workers
      conf.setNumWorkers(3);
      //submit the topology
      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
    //Otherwise, we are running locally
    else {
      //Cap the maximum number of executors that can be spawned
      //for a component to 3
      conf.setMaxTaskParallelism(3);
      //LocalCluster is used to run locally
      LocalCluster cluster = new LocalCluster();
      //submit the topology
      cluster.submitTopology("word-count", conf, builder.createTopology());
      //sleep
      Thread.sleep(10000);
      //shut down the cluster
      cluster.shutdown();
    }
  }
}

registroConfigure logging

Storm utiliza Apache Log4j 2 para registrar información.Storm uses Apache Log4j 2 to log information. Si no configura el registro, la topología emite información de diagnóstico.If you do not configure logging, the topology emits diagnostic information. Para controlar lo que se registra, cree un archivo denominado log4j2.xml en el directorio resources mediante el comando siguiente:To control what is logged, create a file named log4j2.xml in the resources directory by entering the command below:

notepad resources\log4j2.xml

Luego, copie y pegue el texto XML siguiente en el nuevo archivo.Then copy and paste the XML text below into the new file. y ciérrelo.Then close the file.

<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
<Appenders>
    <Console name="STDOUT" target="SYSTEM_OUT">
        <PatternLayout pattern="%d{HH:mm:ss} [%t] %-5level %logger{36} - %msg%n"/>
    </Console>
</Appenders>
<Loggers>
    <Logger name="com.microsoft.example" level="trace" additivity="false">
        <AppenderRef ref="STDOUT"/>
    </Logger>
    <Root level="error">
        <Appender-Ref ref="STDOUT"/>
    </Root>
</Loggers>
</Configuration>

Este XML configura un nuevo registrador para la clase com.microsoft.example, que incluye los componentes de esta topología de ejemplo.This XML configures a new logger for the com.microsoft.example class, which includes the components in this example topology. El nivel está establecido para realizar un seguimiento de este registrador, que captura cualquier información de registro que generen los componentes de esta topología.The level is set to trace for this logger, which captures any logging information emitted by components in this topology.

La sección <Root level="error"> configura el nivel de registro raíz (todo lo que no esté en com.microsoft.example) para registrar solo la información de los errores.The <Root level="error"> section configures the root level of logging (everything not in com.microsoft.example) to only log error information.

Para más información sobre la configuración del registro para Log4j 2, consulte https://logging.apache.org/log4j/2.x/manual/configuration.html.For more information on configuring logging for Log4j 2, see https://logging.apache.org/log4j/2.x/manual/configuration.html.

Nota

La versión 0.10.0 y superiores de Storm utilizan Log4j 2.x.Storm version 0.10.0 and higher use Log4j 2.x. Las versiones anteriores de Storm usaban Log4j 1.x, que empleaba otro formato de configuración del registro.Older versions of storm used Log4j 1.x, which used a different format for log configuration. Para información sobre la configuración anterior, consulte https://wiki.apache.org/logging-log4j/Log4jXmlFormat.For information on the older configuration, see https://wiki.apache.org/logging-log4j/Log4jXmlFormat.

Prueba de la topología de manera localTest the topology locally

Después de guardar los archivos, use el comando siguiente para probar localmente la topología.After you save the files, use the following command to test the topology locally.

mvn compile exec:java -Dstorm.topology=com.microsoft.example.WordCountTopology

Mientras se ejecuta, la topología muestra información de inicio.As it runs, the topology displays startup information. El siguiente texto es un ejemplo de la salida del recuento de palabras:The following text is an example of the word count output:

17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word snow
17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word white
17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 112 for word seven
17:33:27 [Thread-16-count] INFO  com.microsoft.example.WordCount - Emitting a count of 195 for word the
17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 113 for word and
17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 57 for word dwarfs
17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 57 for word snow

Este registro de ejemplo indica que la palabra «and» se ha emitido 113 veces.This example log indicates that the word 'and' has been emitted 113 times. El recuento continúa aumentando mientras se ejecute la topología porque el spout emite continuamente las mismas frases.The count continues to go up as long as the topology runs because the spout continuously emits the same sentences.

Hay un intervalo de 5 segundos entre la emisión de las palabras y los recuentos.There is a 5-second interval between emission of words and counts. El componente WordCount está configurado para emitir la información solo cuando llegue una tupla de marca.The WordCount component is configured to only emit information when a tick tuple arrives. Solicita que esas tuplas se entreguen solo cada cinco segundos.It requests that tick tuples are only delivered every five seconds.

Conversión de la topología a FluxConvert the topology to Flux

Flux es un nuevo marco de trabajo disponible con Storm 0.10.0 y versiones superiores que permite separar la configuración de la implementación.Flux is a new framework available with Storm 0.10.0 and higher, which allows you to separate configuration from implementation. Los componentes se siguen definiendo en Java, pero la topología se define con un archivo YAML.Your components are still defined in Java, but the topology is defined using a YAML file. Puede empaquetar una definición de topología predeterminada con el proyecto, o bien usar un archivo independiente al enviar la topología.You can package a default topology definition with your project, or use a standalone file when submitting the topology. Al enviar la topología a Storm, puede usar variables de entorno o archivos de configuración para rellenar los valores de la definición de la topología de YAML.When submitting the topology to Storm, you can use environment variables or configuration files to populate values in the YAML topology definition.

El archivo YAML define los componentes que se usarán para la topología y el flujo de datos entre ellos.The YAML file defines the components to use for the topology and the data flow between them. Puede incluir un archivo YAML como parte del archivo jar o puede usar un archivo YAML externo.You can include a YAML file as part of the jar file or you can use an external YAML file.

Para más información sobre la plataforma Flux, consulte Plataforma Flux (https://storm.apache.org/releases/current/flux.html).For more information on Flux, see Flux framework (https://storm.apache.org/releases/current/flux.html).

Advertencia

Debido a un error (https://issues.apache.org/jira/browse/STORM-2055) con Storm 1.0.1, es posible que deba instalar un entorno de desarrollo de Storm para ejecutar topologías de Flux de forma local.Due to a bug (https://issues.apache.org/jira/browse/STORM-2055) with Storm 1.0.1, you may need to install a Storm development environment to run Flux topologies locally.

  1. Anteriormente, WordCountTopology.java definía la topología, pero no es necesario con Flux.Previously, WordCountTopology.java defined the topology, but isn't needed with Flux. Elimine el archivo con el comando siguiente:Delete the file with the following command:

    DEL src\main\java\com\microsoft\example\WordCountTopology.java
    
  2. Escriba el comando siguiente para crear un archivo topology.yaml y abrirlo:Enter the command below to create and open a new file topology.yaml:

    notepad resources\topology.yaml
    

    Luego, copie y pegue el texto siguiente en el nuevo archivo.Then copy and paste the text below into the new file. y ciérrelo.Then close the file.

    name: "wordcount"       # friendly name for the topology
    
    config:                 # Topology configuration
      topology.workers: 1     # Hint for the number of workers to create
    
    spouts:                 # Spout definitions
    - id: "sentence-spout"
      className: "com.microsoft.example.RandomSentenceSpout"
      parallelism: 1      # parallelism hint
    
    bolts:                  # Bolt definitions
    - id: "splitter-bolt"
      className: "com.microsoft.example.SplitSentence"
      parallelism: 1
    
    - id: "counter-bolt"
      className: "com.microsoft.example.WordCount"
      constructorArgs:
        - 10
      parallelism: 1
    
    streams:                # Stream definitions
    - name: "Spout --> Splitter" # name isn't used (placeholder for logging, UI, etc.)
      from: "sentence-spout"       # The stream emitter
      to: "splitter-bolt"          # The stream consumer
      grouping:                    # Grouping type
        type: SHUFFLE
    
    - name: "Splitter -> Counter"
      from: "splitter-bolt"
      to: "counter-bolt"
      grouping:
        type: FIELDS
        args: ["word"]           # field(s) to group on
    
  3. Escriba el siguiente comando para abrir pom.xml para hacer las revisiones que se describen a continuación:Enter the command below to open pom.xml to make the described revisions below:

    notepad pom.xml
    
    • Agregue la siguiente dependencia nueva en la sección <dependencies> :Add the following new dependency in the <dependencies> section:

      <!-- Add a dependency on the Flux framework -->
      <dependency>
          <groupId>org.apache.storm</groupId>
          <artifactId>flux-core</artifactId>
          <version>${storm.version}</version>
      </dependency>
      
    • Agregue el complemento siguiente a la sección <plugins> .Add the following plugin to the <plugins> section. Este complemento administra la creación de un paquete (archivo jar) para el proyecto y aplica algunas transformaciones específicas a Flux cuando se crea el paquete.This plugin handles the creation of a package (jar file) for the project, and applies some transformations specific to Flux when creating the package.

      <!-- build an uber jar -->
      <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-shade-plugin</artifactId>
          <version>3.2.1</version>
          <configuration>
              <transformers>
                  <!-- Keep us from getting a "can't overwrite file error" -->
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer" />
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                  <!-- We're using Flux, so refer to it as main -->
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                      <mainClass>org.apache.storm.flux.Flux</mainClass>
                  </transformer>
              </transformers>
              <!-- Keep us from getting a bad signature error -->
              <filters>
                  <filter>
                      <artifact>*:*</artifact>
                      <excludes>
                          <exclude>META-INF/*.SF</exclude>
                          <exclude>META-INF/*.DSA</exclude>
                          <exclude>META-INF/*.RSA</exclude>
                      </excludes>
                  </filter>
              </filters>
          </configuration>
          <executions>
              <execution>
                  <phase>package</phase>
                  <goals>
                      <goal>shade</goal>
                  </goals>
              </execution>
          </executions>
      </plugin>
      
    • En la sección exec-maven-plugin<configuration>, cambie el valor de <mainClass> de ${storm.topology} a org.apache.storm.flux.Flux.In the exec-maven-plugin <configuration> section, change the value for <mainClass> from ${storm.topology} to org.apache.storm.flux.Flux. Esta opción permite a Flux controlar la ejecución de la topología localmente en el entorno de desarrollo.This setting allows Flux to handle running the topology locally in development.

    • En la sección <resources>, agregue lo siguiente a <includes>.In the <resources> section, add the following to <includes>. Este XML incluye el archivo YAML que define la topología como parte del proyecto.This XML includes the YAML file that defines the topology as part of the project.

      <include>topology.yaml</include>
      

Prueba local de la topologíaTest the flux topology locally

  1. Escriba el comando siguiente para compilar y ejecutar la topología de Flux mediante Maven:Enter the following command to compile and execute the Flux topology using Maven:

    mvn compile exec:java -Dexec.args="--local -R /topology.yaml"
    

    Advertencia

    Este comando produce un error si la topología usa bits de Storm 1.0.1.If your topology uses Storm 1.0.1 bits, this command fails. Este error lo causa https://issues.apache.org/jira/browse/STORM-2055.This failure is caused by https://issues.apache.org/jira/browse/STORM-2055. En su lugar, instale Storm en su entorno de desarrollo y siga los siguientes pasos:Instead, install Storm in your development environment and use the following steps:

    Si tiene instalado Storm en el entorno de desarrollo, puede usar en su lugar los siguientes comandos:If you have installed Storm in your development environment, you can use the following commands instead:

    mvn compile package
    storm jar target/WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local -R /topology.yaml
    

    El parámetro --local ejecuta la topología en modo local en el entorno de desarrollo.The --local parameter runs the topology in local mode on your development environment. El parámetro -R /topology.yaml usa el recurso de archivo topology.yaml del archivo jar para definir la topología.The -R /topology.yaml parameter uses the topology.yaml file resource from the jar file to define the topology.

    Mientras se ejecuta, la topología muestra información de inicio.As it runs, the topology displays startup information. El siguiente texto es un ejemplo de la salida:The following text is an example of the output:

     17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word snow
     17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 56 for word white
     17:33:27 [Thread-12-count] INFO  com.microsoft.example.WordCount - Emitting a count of 112 for word seven
     17:33:27 [Thread-16-count] INFO  com.microsoft.example.WordCount - Emitting a count of 195 for word the
     17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 113 for word and
     17:33:27 [Thread-30-count] INFO  com.microsoft.example.WordCount - Emitting a count of 57 for word dwarfs
    

    Hay un retraso de 10 segundos entre los distintos lotes de la información registrada.There is a 10-second delay between batches of logged information.

  2. Cree una nueva topología YAML a partir del proyecto.Create a new topology yaml from the project.

    a.a. Escriba el comando siguiente para abrir topology.xml:Enter the command below to open topology.xml:

    notepad resources\topology.yaml
    

    b.b. Busque la siguiente sección y cambie el valor de 10 a 5.Find the following section and change the value of 10 to 5. Esta modificación altera el intervalo entre la emisión de lotes de recuentos de palabras de 10 segundos a 5.This modification changes the interval between emitting batches of word counts from 10 seconds to 5.

    - id: "counter-bolt"
      className: "com.microsoft.example.WordCount"
      constructorArgs:
        - 5
      parallelism: 1  
    

    c.c. Guarde el archivo como newtopology.yaml.Save file as newtopology.yaml.

  3. Para ejecutar la topología, escriba el siguiente comando:To run the topology, enter the following command:

    mvn exec:java -Dexec.args="--local resources/newtopology.yaml"
    

    También puede hacer lo siguiente si tiene Storm en el entorno de desarrollo:Or, if you have Storm on your development environment:

    storm jar target/WordCount-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local resources/newtopology.yaml
    

    Este comando usa newtopology.yaml como definición de la topología.This command uses the newtopology.yaml as the topology definition. Puesto que no se incluyó el parámetro compile, Maven usa la versión del proyecto compilada en los pasos anteriores.Since we didn't include the compile parameter, Maven uses the version of the project built in previous steps.

    Una vez iniciada la topología, observará que el tiempo entre los lotes emitidos ha cambiado y ahora refleja el valor de newtopology.yaml.Once the topology starts, you should notice that the time between emitted batches has changed to reflect the value in newtopology.yaml. Esto demuestra que puede cambiar la configuración a través de un archivo YAML sin tener que volver a compilar la topología.So you can see that you can change your configuration through a YAML file without having to recompile the topology.

Para más información sobre estas y otras características del marco de trabajo Flux, consulte Flux (https://storm.apache.org/releases/current/flux.html).For more information on these and other features of the Flux framework, see Flux (https://storm.apache.org/releases/current/flux.html).

TridentTrident

Trident es una abstracción de alto nivel que ofrece Storm.Trident is a high-level abstraction that is provided by Storm. Admite el procesamiento con estado.It supports stateful processing. La principal ventaja de Trident es que puede garantizar que todos los mensajes que entran en la topología se procesan una sola vez.The primary advantage of Trident is that it can guarantee that every message that enters the topology is processed only once. Si no se usa Trident, la topología solo puede garantizar que los mensajes se procesan al menos una vez.Without using Trident, your topology can only guarantee that messages are processed at least once. También hay otras diferencias, como los componentes integrados que se pueden usar en lugar de crear bolts.There are also other differences, such as built-in components that can be used instead of creating bolts. De hecho, los bolts se reemplazan por componentes menos genéricos, como filtros, proyecciones y funciones.In fact, bolts are replaced by less-generic components, such as filters, projections, and functions.

Las aplicaciones de Trident se pueden crear mediante proyectos de Maven.Trident applications can be created by using Maven projects. Siga los mismos pasos básicos que se mostraron anteriormente en este artículo; lo único diferente es el código.You use the same basic steps as presented earlier in this article—only the code is different. Actualmente, tampoco puede usarse Trident con el marco de trabajo Flux.Trident also cannot (currently) be used with the Flux framework.

Para obtener más información sobre Trident, consulte Información general sobre la API de Trident.For more information about Trident, see the Trident API Overview.

Pasos siguientesNext Steps

Ha aprendido a crear una topología de Apache Storm con Java.You have learned how to create an Apache Storm topology by using Java. Ahora obtenga información sobre:Now learn how to:

Puede encontrar más topologías de ejemplo de Apache Storm en Topologías de ejemplo para Apache Storm en HDInsight.You can find more example Apache Storm topologies by visiting Example topologies for Apache Storm on HDInsight.