Rychlý start: Příjem událostí z Event Hubs pomocí Apache Storm

Apache Storm je distribuovaný výpočetní systém v reálném čase, který zjednodušuje spolehlivé zpracování nevázaných datových proudů. Tato část ukazuje, jak pomocí Azure Event Hubs storm spout přijímat události z Event Hubs. Pomocí Apache Storm můžete události rozdělit mezi několik procesů hostovaných v různých uzlech. Integrace Event Hubs Storm zjednodušuje spotřebu událostí transparentním vytvářením kontrolních bodů průběhu pomocí instalace Zookeeper stormu, správy trvalých kontrolních bodů a paralelních přijetí z Event Hubs.

Další informace o vzorech příjmu Event Hubs najdete v přehledu Event Hubs systému.

Požadavky

Než začnete s rychlým startem, vytvořte obor názvů Event Hubs a centrum událostí. Pomocí Azure Portal vytvořte obor názvů typu Event Hubs a získejte přihlašovací údaje pro správu, které vaše aplikace potřebuje ke komunikaci s centrem událostí. Pokud chcete vytvořit obor názvů a centrum událostí, postupujte podle pokynů v tomto článku.

Vytvoření projektu a přidání kódu

V tomto kurzu se používá instalace HDInsight Storm, která je součástí Event Hubs spout již k dispozici.

  1. Postupujte podle pokynů HDInsight Storm – Začínáme a vytvořte nový cluster HDInsight a připojte se k něj přes Vzdálenou plochu.

  2. Zkopírujte %STORM_HOME%\examples\eventhubspout\eventhubs-storm-spout-0.9-jar-with-dependencies.jar soubor do místního vývojového prostředí. Obsahuje soubor events-storm-spout.

  3. Pomocí následujícího příkazu nainstalujte balíček do místního úložiště Maven. To vám umožní přidat ho jako referenci v projektu Storm v pozdějším kroku.

    mvn install:install-file -Dfile=target\eventhubs-storm-spout-0.9-jar-with-dependencies.jar -DgroupId=com.microsoft.eventhubs -DartifactId=eventhubs-storm-spout -Dversion=0.9 -Dpackaging=jar
    
  4. V Eclipse vytvořte nový projekt Maven (klikněte na File (Soubor), New (Nový) a Project).

    Soubor -> Nový -> Project

  5. Vyberte Použít výchozí umístění pracovního prostoru a pak klikněte na Další.

  6. Vyberte archetyp maven-archetype-quickstart a pak klikněte na Další.

  7. Vložte GroupId (ID skupiny) a ArtifactId (ID artefaktu) a pak klikněte na Finish (Dokončit).

  8. V pom.xml přidejte do uzlu následující <dependency> závislosti.

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.2-incubating</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.microsoft.eventhubs</groupId>
        <artifactId>eventhubs-storm-spout</artifactId>
        <version>0.9</version>
    </dependency>
    <dependency>
        <groupId>com.netflix.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>1.3.3</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
        <scope>provided</scope>
    </dependency>
    
  9. Ve složce src vytvořte soubor s názvem Config.properties a zkopírujte následující obsah a nahraďte receive rule key hodnotami a event hub name :

    eventhubspout.username = ReceiveRule
    eventhubspout.password = {receive rule key}
    eventhubspout.namespace = ioteventhub-ns
    eventhubspout.entitypath = {event hub name}
    eventhubspout.partitions.count = 16
    
    # if not provided, will use storm's zookeeper settings
    # zookeeper.connectionstring=localhost:2181
    
    eventhubspout.checkpoint.interval = 10
    eventhub.receiver.credits = 10
    

    Hodnota eventhub.receiver.credits určuje, kolik událostí se před jejich uvolněním do kanálu Stormu dávkově prodá. Pro zjednodušení tento příklad nastaví tuto hodnotu na 10. V produkčním prostředí by měla být obvykle nastavená na vyšší hodnoty. například 1024.

  10. Vytvořte novou třídu s názvem LoggerBolt s následujícím kódem:

    import java.util.Map;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;
    
    public class LoggerBolt extends BaseRichBolt {
        private OutputCollector collector;
        private static final Logger logger = LoggerFactory
                  .getLogger(LoggerBolt.class);
    
        @Override
        public void execute(Tuple tuple) {
            String value = tuple.getString(0);
            logger.info("Tuple value: " + value);
    
            collector.ack(tuple);
        }
    
        @Override
        public void prepare(Map map, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            this.count = 0;
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // no output fields
        }
    
    }
    

    Tento storm bolt protokoluje obsah přijatých událostí. To lze snadno rozšířit tak, aby ukládaly řazené kolekce členů ve službě úložiště. Příklad [STORM HDInsight s centrem událostí] používá stejný přístup k ukládání dat do Azure Storage a Power BI.

  11. Vytvořte třídu s názvem LogTopology s následujícím kódem:

    import java.io.FileReader;
    import java.util.Properties;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.StormTopology;
    import backtype.storm.topology.TopologyBuilder;
    import com.microsoft.eventhubs.samples.EventCount;
    import com.microsoft.eventhubs.spout.EventHubSpout;
    import com.microsoft.eventhubs.spout.EventHubSpoutConfig;
    
    public class LogTopology {
        protected EventHubSpoutConfig spoutConfig;
        protected int numWorkers;
    
        protected void readEHConfig(String[] args) throws Exception {
            Properties properties = new Properties();
            if (args.length > 1) {
                properties.load(new FileReader(args[1]));
            } else {
                properties.load(EventCount.class.getClassLoader()
                        .getResourceAsStream("Config.properties"));
            }
    
            String username = properties.getProperty("eventhubspout.username");
            String password = properties.getProperty("eventhubspout.password");
            String namespaceName = properties
                    .getProperty("eventhubspout.namespace");
            String entityPath = properties.getProperty("eventhubspout.entitypath");
            String zkEndpointAddress = properties
                    .getProperty("zookeeper.connectionstring"); // opt
            int partitionCount = Integer.parseInt(properties
                    .getProperty("eventhubspout.partitions.count"));
            int checkpointIntervalInSeconds = Integer.parseInt(properties
                    .getProperty("eventhubspout.checkpoint.interval"));
            int receiverCredits = Integer.parseInt(properties
                    .getProperty("eventhub.receiver.credits")); // prefetch count
                                                                // (opt)
            System.out.println("Eventhub spout config: ");
            System.out.println("  partition count: " + partitionCount);
            System.out.println("  checkpoint interval: "
                    + checkpointIntervalInSeconds);
            System.out.println("  receiver credits: " + receiverCredits);
    
            spoutConfig = new EventHubSpoutConfig(username, password,
                    namespaceName, entityPath, partitionCount, zkEndpointAddress,
                    checkpointIntervalInSeconds, receiverCredits);
    
            // set the number of workers to be the same as partition number.
            // the idea is to have a spout and a logger bolt co-exist in one
            // worker to avoid shuffling messages across workers in storm cluster.
            numWorkers = spoutConfig.getPartitionCount();
    
            if (args.length > 0) {
                // set topology name so that sample Trident topology can use it as
                // stream name.
                spoutConfig.setTopologyName(args[0]);
            }
        }
    
        protected StormTopology buildTopology() {
            TopologyBuilder topologyBuilder = new TopologyBuilder();
    
            EventHubSpout eventHubSpout = new EventHubSpout(spoutConfig);
            topologyBuilder.setSpout("EventHubsSpout", eventHubSpout,
                    spoutConfig.getPartitionCount()).setNumTasks(
                    spoutConfig.getPartitionCount());
            topologyBuilder
                    .setBolt("LoggerBolt", new LoggerBolt(),
                            spoutConfig.getPartitionCount())
                    .localOrShuffleGrouping("EventHubsSpout")
                    .setNumTasks(spoutConfig.getPartitionCount());
            return topologyBuilder.createTopology();
        }
    
        protected void runScenario(String[] args) throws Exception {
            boolean runLocal = true;
            readEHConfig(args);
            StormTopology topology = buildTopology();
            Config config = new Config();
            config.setDebug(false);
    
            if (runLocal) {
                config.setMaxTaskParallelism(2);
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("test", config, topology);
                Thread.sleep(5000000);
                localCluster.shutdown();
            } else {
                config.setNumWorkers(numWorkers);
                StormSubmitter.submitTopology(args[0], config, topology);
            }
        }
    
        public static void main(String[] args) throws Exception {
            LogTopology topology = new LogTopology();
            topology.runScenario(args);
        }
    }
    

    Tato třída vytvoří novou Event Hubs spout pomocí vlastností v konfiguračním souboru k vytvoření instance. Je důležité si uvědomit, že tento příklad vytvoří tolik úloh spoutů jako počet oddílů v centru událostí, aby bylo možné použít maximální paralelismus povolený tímto centrem událostí.

Další kroky

Další informace o službě Event Hubs najdete na následujících odkazech: