Получение событий из Центров событий с помощью Apache StormReceive events from Event Hubs using Apache Storm

Apache Storm — это распределенная система вычислений в реальном времени, упрощающая надежную обработку неограниченных потоков данных.Apache Storm is a distributed real-time computation system that simplifies reliable processing of unbounded streams of data. В этом разделе показано использование spout Storm Центров событий Azure для приема событий из Центров событий.This section shows how to use an Azure Event Hubs Storm spout to receive events from Event Hubs. С помощью Apache Storm можно разделить события между несколькими процессами, размещенными в разных узлах.Using Apache Storm, you can split events across multiple processes hosted in different nodes. Интеграция Центров событий с помощью Storm упрощает использование событий путем прозрачного определения контрольных точек в ходе выполнения с помощью установки Storm Zookeeper, управляя постоянными контрольными точками и одновременно облегчает получение от Центров событий.The Event Hubs integration with Storm simplifies event consumption by transparently checkpointing its progress using Storm's Zookeeper installation, managing persistent checkpoints and parallel receives from Event Hubs.

Дополнительные сведения о шаблонах получения данных в Центры событий см. в статье Общие сведения о Центрах событий.For more information about Event Hubs receive patterns, see the Event Hubs overview.

Технические условияPrerequisites

Прежде чем приступить к краткому руководству, создать пространство имен концентраторов событий и концентратора событий.Before you start with the quickstart, create an Event Hubs namespace and an event hub. Используйте портала Azure создать пространство имен типа концентраторов событий и получение учетных данных управления, необходимых приложению для взаимодействия с концентратором событий.Use the Azure portal to create a namespace of type Event Hubs, and obtain the management credentials your application needs to communicate with the event hub. Чтобы создать пространство имен и концентратор событий, выполните инструкции из этой статьи.To create a namespace and an event hub, follow the procedure in this article.

Создание проекта и добавление кодаCreate project and add code

В данном руководстве используется установка программы HDInsight Storm, которая поставляется вместе с уже доступным Spout Центров событий.This tutorial uses an HDInsight Storm installation, which comes with the Event Hubs spout already available.

  1. Следуя процедурам, приведенным в разделе HDInsight Storm — начало работы , создайте новый кластер HDInsight и подключитесь к нему через удаленный рабочий стол.Follow the HDInsight Storm - Get Started procedure to create a new HDInsight cluster, and connect to it via Remote Desktop.

  2. Скопируйте файл %STORM_HOME%\examples\eventhubspout\eventhubs-storm-spout-0.9-jar-with-dependencies.jar в локальную среду разработки.Copy the %STORM_HOME%\examples\eventhubspout\eventhubs-storm-spout-0.9-jar-with-dependencies.jar file to your local development environment. Он содержит пакет events-storm-spout.This contains the events-storm-spout.

  3. Для установки пакета в локальный репозиторий Maven выполните следующую команду.Use the following command to install the package into the local Maven store. Это позволит добавить его в качестве ссылки в проекте Storm позже.This enables you to add it as a reference in the Storm project in a later step.

    mvn install:install-file -Dfile=target\eventhubs-storm-spout-0.9-jar-with-dependencies.jar -DgroupId=com.microsoft.eventhubs -DartifactId=eventhubs-storm-spout -Dversion=0.9 -Dpackaging=jar
    
  4. В Eclipse создайте проект Maven (щелкните File (Файл), New (Создать), а затем Project (Проект)).In Eclipse, create a new Maven project (click File, then New, then Project).

    File (Файл) > New (Создать) > Project (Проект)

  5. Выберите параметр Use default Workspace location (Использовать расположение рабочей области по умолчанию), а затем нажмите кнопку Next (Далее).Select Use default Workspace location, then click Next

  6. Выберите архетип maven-archetype-quickstart и нажмите кнопку Next (Далее).Select the maven-archetype-quickstart archetype, then click Next

  7. Вставьте параметры GroupId и ArtifactId, а затем нажмите кнопку Finish (Готово).Insert a GroupId and ArtifactId, then click Finish

  8. В файле pom.xml добавьте следующие зависимости в узел <dependency>:In pom.xml, add the following dependencies in the <dependency> node.

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.2-incubating</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.microsoft.eventhubs</groupId>
        <artifactId>eventhubs-storm-spout</artifactId>
        <version>0.9</version>
    </dependency>
    <dependency>
        <groupId>com.netflix.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>1.3.3</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
        <scope>provided</scope>
    </dependency>
    
  9. В папке src создайте файл с именем Config.properties и скопируйте следующее содержимое, заменив значения receive rule key и event hub name:In the src folder, create a file called Config.properties and copy the following content, substituting the receive rule key and event hub name values:

    eventhubspout.username = ReceiveRule
    eventhubspout.password = {receive rule key}
    eventhubspout.namespace = ioteventhub-ns
    eventhubspout.entitypath = {event hub name}
    eventhubspout.partitions.count = 16
    
    # if not provided, will use storm's zookeeper settings
    # zookeeper.connectionstring=localhost:2181
    
    eventhubspout.checkpoint.interval = 10
    eventhub.receiver.credits = 10
    

    Значение для eventhub.receiver.credits определяет, сколько событий являются пакетными перед их выпуском в конвейер Storm.The value for eventhub.receiver.credits determines how many events are batched before releasing them to the Storm pipeline. Для простоты в этом примере используется значение 10.For the sake of simplicity, this example sets this value to 10. В рабочей среде обычно указывается более высокое значение, например, 1024.In production, it should usually be set to higher values; for example, 1024.

  10. Создайте новый класс с именем LoggerBolt с использованием следующего кода:Create a new class called LoggerBolt with the following code:

    import java.util.Map;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;
    
    public class LoggerBolt extends BaseRichBolt {
        private OutputCollector collector;
        private static final Logger logger = LoggerFactory
                  .getLogger(LoggerBolt.class);
    
        @Override
        public void execute(Tuple tuple) {
            String value = tuple.getString(0);
            logger.info("Tuple value: " + value);
    
            collector.ack(tuple);
        }
    
        @Override
        public void prepare(Map map, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            this.count = 0;
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // no output fields
        }
    
    }
    

    Этот Storm Bolt регистрирует содержимое полученного события.This Storm bolt logs the content of the received events. Эго можно легко расширить для хранения кортежей в службе хранилища.This can easily be extended to store tuples in a storage service. В примере использования HDInsight Storm с концентратором событий для хранения данных в службе хранилища Azure и Power BI применяется одинаковый подход.The HDInsight Storm with Event Hub example uses this same approach to store data into Azure Storage and Power BI.

  11. Создайте класс с названием LogTopology и со следующим кодом:Create a class called LogTopology with the following code:

    import java.io.FileReader;
    import java.util.Properties;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.StormTopology;
    import backtype.storm.topology.TopologyBuilder;
    import com.microsoft.eventhubs.samples.EventCount;
    import com.microsoft.eventhubs.spout.EventHubSpout;
    import com.microsoft.eventhubs.spout.EventHubSpoutConfig;
    
    public class LogTopology {
        protected EventHubSpoutConfig spoutConfig;
        protected int numWorkers;
    
        protected void readEHConfig(String[] args) throws Exception {
            Properties properties = new Properties();
            if (args.length > 1) {
                properties.load(new FileReader(args[1]));
            } else {
                properties.load(EventCount.class.getClassLoader()
                        .getResourceAsStream("Config.properties"));
            }
    
            String username = properties.getProperty("eventhubspout.username");
            String password = properties.getProperty("eventhubspout.password");
            String namespaceName = properties
                    .getProperty("eventhubspout.namespace");
            String entityPath = properties.getProperty("eventhubspout.entitypath");
            String zkEndpointAddress = properties
                    .getProperty("zookeeper.connectionstring"); // opt
            int partitionCount = Integer.parseInt(properties
                    .getProperty("eventhubspout.partitions.count"));
            int checkpointIntervalInSeconds = Integer.parseInt(properties
                    .getProperty("eventhubspout.checkpoint.interval"));
            int receiverCredits = Integer.parseInt(properties
                    .getProperty("eventhub.receiver.credits")); // prefetch count
                                                                // (opt)
            System.out.println("Eventhub spout config: ");
            System.out.println("  partition count: " + partitionCount);
            System.out.println("  checkpoint interval: "
                    + checkpointIntervalInSeconds);
            System.out.println("  receiver credits: " + receiverCredits);
    
            spoutConfig = new EventHubSpoutConfig(username, password,
                    namespaceName, entityPath, partitionCount, zkEndpointAddress,
                    checkpointIntervalInSeconds, receiverCredits);
    
            // set the number of workers to be the same as partition number.
            // the idea is to have a spout and a logger bolt co-exist in one
            // worker to avoid shuffling messages across workers in storm cluster.
            numWorkers = spoutConfig.getPartitionCount();
    
            if (args.length > 0) {
                // set topology name so that sample Trident topology can use it as
                // stream name.
                spoutConfig.setTopologyName(args[0]);
            }
        }
    
        protected StormTopology buildTopology() {
            TopologyBuilder topologyBuilder = new TopologyBuilder();
    
            EventHubSpout eventHubSpout = new EventHubSpout(spoutConfig);
            topologyBuilder.setSpout("EventHubsSpout", eventHubSpout,
                    spoutConfig.getPartitionCount()).setNumTasks(
                    spoutConfig.getPartitionCount());
            topologyBuilder
                    .setBolt("LoggerBolt", new LoggerBolt(),
                            spoutConfig.getPartitionCount())
                    .localOrShuffleGrouping("EventHubsSpout")
                    .setNumTasks(spoutConfig.getPartitionCount());
            return topologyBuilder.createTopology();
        }
    
        protected void runScenario(String[] args) throws Exception {
            boolean runLocal = true;
            readEHConfig(args);
            StormTopology topology = buildTopology();
            Config config = new Config();
            config.setDebug(false);
    
            if (runLocal) {
                config.setMaxTaskParallelism(2);
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("test", config, topology);
                Thread.sleep(5000000);
                localCluster.shutdown();
            } else {
                config.setNumWorkers(numWorkers);
                StormSubmitter.submitTopology(args[0], config, topology);
            }
        }
    
        public static void main(String[] args) throws Exception {
            LogTopology topology = new LogTopology();
            topology.runScenario(args);
        }
    }
    

    Этот класс создает новую воронку Центров событий, используя свойства в файле конфигурации для создания ее экземпляра.This class creates a new Event Hubs spout, using the properties in the configuration file to instantiate it. Важно отметить, что в данном примере создается такое же количество задач spout, сколько и разделов в концентраторе событий, чтобы обеспечить максимальный параллелизм, допустимый для этого концентратора событий.It is important to note that this example creates as many spouts tasks as the number of partitions in the event hub, in order to use the maximum parallelism allowed by that event hub.

Дальнейшие действияNext steps

Дополнительные сведения о Центрах событий см. в следующих источниках:You can learn more about Event Hubs by visiting the following links: