Snabbstart: Ta emot händelser från Event Hubs med Apache Storm

Apache Storm är ett distribuerat beräkningssystem i realtid som förenklar tillförlitlig bearbetning av obundna dataströmmar. Det här avsnittet visar hur du använder Azure Event Hubs Storm-spout för att ta emot händelser från Event Hubs. Med Apache Storm kan du dela upp händelser över flera processer som finns på olika noder. Den Event Hubs integreringen med Storm förenklar händelseförbrukningen genom att transparent kontrollera förloppet med stormens Zookeeper-installation, hantera beständiga kontrollpunkter och parallella tar emot från Event Hubs.

Mer information om hur Event Hubs kan ta emot mönster finns i Event Hubs översikt.

Förutsättningar

Innan du börjar med snabbstarten skapar du en Event Hubs namnrymd och en händelsehubb. Använd Azure Portal för att skapa ett namnområde av Event Hubs och hämta de autentiseringsuppgifter för hantering som programmet behöver för att kommunicera med händelsehubben. Om du behöver skapa ett namnområde och en händelsehubb följer du anvisningarna i den här artikeln.

Skapa projekt och lägga till kod

Den här självstudien använder en HDInsight Storm-installation, som levereras med Event Hubs-spout som redan är tillgänglig.

  1. Följ proceduren HDInsight Storm – Kom igång för att skapa ett nytt HDInsight-kluster och ansluta till det via Fjärrskrivbord.

  2. Kopiera filen %STORM_HOME%\examples\eventhubspout\eventhubs-storm-spout-0.9-jar-with-dependencies.jar till din lokala utvecklingsmiljö. Detta innehåller events-storm-spout.

  3. Använd följande kommando för att installera paketet i det lokala Maven-arkivet. På så sätt kan du lägga till den som en referens i Storm-projektet i ett senare steg.

    mvn install:install-file -Dfile=target\eventhubs-storm-spout-0.9-jar-with-dependencies.jar -DgroupId=com.microsoft.eventhubs -DartifactId=eventhubs-storm-spout -Dversion=0.9 -Dpackaging=jar
    
  4. I Eclipse skapar du ett nytt Maven-projekt (klicka på Arkiv och sedan på Nytt och sedan Project).

    File -> New -> Project

  5. Välj Använd standardplats för arbetsyta och klicka sedan på Nästa

  6. Välj arketypen maven-archetype-quickstart och klicka sedan på Nästa

  7. Infoga ett GroupId och ArtifactId och klicka sedan på Slutför

  8. I pom.xml lägger du till följande beroenden i <dependency> noden.

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.2-incubating</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.microsoft.eventhubs</groupId>
        <artifactId>eventhubs-storm-spout</artifactId>
        <version>0.9</version>
    </dependency>
    <dependency>
        <groupId>com.netflix.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>1.3.3</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
        <scope>provided</scope>
    </dependency>
    
  9. I mappen src skapar du en fil med namnet Config.properties och kopierar följande innehåll och ersätter receive rule key värdena och event hub name :

    eventhubspout.username = ReceiveRule
    eventhubspout.password = {receive rule key}
    eventhubspout.namespace = ioteventhub-ns
    eventhubspout.entitypath = {event hub name}
    eventhubspout.partitions.count = 16
    
    # if not provided, will use storm's zookeeper settings
    # zookeeper.connectionstring=localhost:2181
    
    eventhubspout.checkpoint.interval = 10
    eventhub.receiver.credits = 10
    

    Värdet för eventhub.receiver.credits avgör hur många händelser som batchas innan de släpps till Storm-pipelinen. För enkelhetens skull anger det här exemplet värdet till 10. I produktion bör den vanligtvis anges till högre värden. till exempel 1024.

  10. Skapa en ny klass med namnet LoggerBolt med följande kod:

    import java.util.Map;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;
    
    public class LoggerBolt extends BaseRichBolt {
        private OutputCollector collector;
        private static final Logger logger = LoggerFactory
                  .getLogger(LoggerBolt.class);
    
        @Override
        public void execute(Tuple tuple) {
            String value = tuple.getString(0);
            logger.info("Tuple value: " + value);
    
            collector.ack(tuple);
        }
    
        @Override
        public void prepare(Map map, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            this.count = 0;
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // no output fields
        }
    
    }
    

    Den här Storm-bulten loggar innehållet i de mottagna händelserna. Detta kan enkelt utökas till att lagra tupplar i en lagringstjänst. [HdInsight Storm med Event Hub-exemplet använder] samma metod för att lagra data i Azure Storage och Power BI.

  11. Skapa en klass med namnet LogTopology med följande kod:

    import java.io.FileReader;
    import java.util.Properties;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.StormTopology;
    import backtype.storm.topology.TopologyBuilder;
    import com.microsoft.eventhubs.samples.EventCount;
    import com.microsoft.eventhubs.spout.EventHubSpout;
    import com.microsoft.eventhubs.spout.EventHubSpoutConfig;
    
    public class LogTopology {
        protected EventHubSpoutConfig spoutConfig;
        protected int numWorkers;
    
        protected void readEHConfig(String[] args) throws Exception {
            Properties properties = new Properties();
            if (args.length > 1) {
                properties.load(new FileReader(args[1]));
            } else {
                properties.load(EventCount.class.getClassLoader()
                        .getResourceAsStream("Config.properties"));
            }
    
            String username = properties.getProperty("eventhubspout.username");
            String password = properties.getProperty("eventhubspout.password");
            String namespaceName = properties
                    .getProperty("eventhubspout.namespace");
            String entityPath = properties.getProperty("eventhubspout.entitypath");
            String zkEndpointAddress = properties
                    .getProperty("zookeeper.connectionstring"); // opt
            int partitionCount = Integer.parseInt(properties
                    .getProperty("eventhubspout.partitions.count"));
            int checkpointIntervalInSeconds = Integer.parseInt(properties
                    .getProperty("eventhubspout.checkpoint.interval"));
            int receiverCredits = Integer.parseInt(properties
                    .getProperty("eventhub.receiver.credits")); // prefetch count
                                                                // (opt)
            System.out.println("Eventhub spout config: ");
            System.out.println("  partition count: " + partitionCount);
            System.out.println("  checkpoint interval: "
                    + checkpointIntervalInSeconds);
            System.out.println("  receiver credits: " + receiverCredits);
    
            spoutConfig = new EventHubSpoutConfig(username, password,
                    namespaceName, entityPath, partitionCount, zkEndpointAddress,
                    checkpointIntervalInSeconds, receiverCredits);
    
            // set the number of workers to be the same as partition number.
            // the idea is to have a spout and a logger bolt co-exist in one
            // worker to avoid shuffling messages across workers in storm cluster.
            numWorkers = spoutConfig.getPartitionCount();
    
            if (args.length > 0) {
                // set topology name so that sample Trident topology can use it as
                // stream name.
                spoutConfig.setTopologyName(args[0]);
            }
        }
    
        protected StormTopology buildTopology() {
            TopologyBuilder topologyBuilder = new TopologyBuilder();
    
            EventHubSpout eventHubSpout = new EventHubSpout(spoutConfig);
            topologyBuilder.setSpout("EventHubsSpout", eventHubSpout,
                    spoutConfig.getPartitionCount()).setNumTasks(
                    spoutConfig.getPartitionCount());
            topologyBuilder
                    .setBolt("LoggerBolt", new LoggerBolt(),
                            spoutConfig.getPartitionCount())
                    .localOrShuffleGrouping("EventHubsSpout")
                    .setNumTasks(spoutConfig.getPartitionCount());
            return topologyBuilder.createTopology();
        }
    
        protected void runScenario(String[] args) throws Exception {
            boolean runLocal = true;
            readEHConfig(args);
            StormTopology topology = buildTopology();
            Config config = new Config();
            config.setDebug(false);
    
            if (runLocal) {
                config.setMaxTaskParallelism(2);
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("test", config, topology);
                Thread.sleep(5000000);
                localCluster.shutdown();
            } else {
                config.setNumWorkers(numWorkers);
                StormSubmitter.submitTopology(args[0], config, topology);
            }
        }
    
        public static void main(String[] args) throws Exception {
            LogTopology topology = new LogTopology();
            topology.runScenario(args);
        }
    }
    

    Den här klassen skapar en Event Hubs med hjälp av egenskaperna i konfigurationsfilen för att instansiera den. Det är viktigt att observera att det här exemplet skapar lika många aktiviteter som antalet partitioner i händelsehubben för att kunna använda den maximala parallellitet som tillåts av händelsehubben.

Nästa steg

Du kan lära dig mer om Event Hubs genom att gå till följande länkar: