Rychlý start: Příjem událostí z Event Hubs pomocí Apache Storm
Apache Storm je distribuovaný výpočetní systém v reálném čase, který zjednodušuje spolehlivé zpracování nevázaných datových proudů. Tato část ukazuje, jak pomocí Azure Event Hubs storm spout přijímat události z Event Hubs. Pomocí Apache Storm můžete události rozdělit mezi několik procesů hostovaných v různých uzlech. Integrace Event Hubs Storm zjednodušuje spotřebu událostí transparentním vytvářením kontrolních bodů průběhu pomocí instalace Zookeeper stormu, správy trvalých kontrolních bodů a paralelních přijetí z Event Hubs.
Další informace o vzorech příjmu Event Hubs najdete v přehledu Event Hubs systému.
Požadavky
Než začnete s rychlým startem, vytvořte obor názvů Event Hubs a centrum událostí. Pomocí Azure Portal vytvořte obor názvů typu Event Hubs a získejte přihlašovací údaje pro správu, které vaše aplikace potřebuje ke komunikaci s centrem událostí. Pokud chcete vytvořit obor názvů a centrum událostí, postupujte podle pokynů v tomto článku.
Vytvoření projektu a přidání kódu
V tomto kurzu se používá instalace HDInsight Storm, která je součástí Event Hubs spout již k dispozici.
Postupujte podle pokynů HDInsight Storm – Začínáme a vytvořte nový cluster HDInsight a připojte se k něj přes Vzdálenou plochu.
Zkopírujte
%STORM_HOME%\examples\eventhubspout\eventhubs-storm-spout-0.9-jar-with-dependencies.jarsoubor do místního vývojového prostředí. Obsahuje soubor events-storm-spout.Pomocí následujícího příkazu nainstalujte balíček do místního úložiště Maven. To vám umožní přidat ho jako referenci v projektu Storm v pozdějším kroku.
mvn install:install-file -Dfile=target\eventhubs-storm-spout-0.9-jar-with-dependencies.jar -DgroupId=com.microsoft.eventhubs -DartifactId=eventhubs-storm-spout -Dversion=0.9 -Dpackaging=jarV Eclipse vytvořte nový projekt Maven (klikněte na File (Soubor), New (Nový) a Project).

Vyberte Použít výchozí umístění pracovního prostoru a pak klikněte na Další.
Vyberte archetyp maven-archetype-quickstart a pak klikněte na Další.
Vložte GroupId (ID skupiny) a ArtifactId (ID artefaktu) a pak klikněte na Finish (Dokončit).
V pom.xml přidejte do uzlu následující
<dependency>závislosti.<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>0.9.2-incubating</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.microsoft.eventhubs</groupId> <artifactId>eventhubs-storm-spout</artifactId> <version>0.9</version> </dependency> <dependency> <groupId>com.netflix.curator</groupId> <artifactId>curator-framework</artifactId> <version>1.3.3</version> <exclusions> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> <scope>provided</scope> </dependency>Ve složce src vytvořte soubor s názvem Config.properties a zkopírujte následující obsah a nahraďte
receive rule keyhodnotami aevent hub name:eventhubspout.username = ReceiveRule eventhubspout.password = {receive rule key} eventhubspout.namespace = ioteventhub-ns eventhubspout.entitypath = {event hub name} eventhubspout.partitions.count = 16 # if not provided, will use storm's zookeeper settings # zookeeper.connectionstring=localhost:2181 eventhubspout.checkpoint.interval = 10 eventhub.receiver.credits = 10Hodnota eventhub.receiver.credits určuje, kolik událostí se před jejich uvolněním do kanálu Stormu dávkově prodá. Pro zjednodušení tento příklad nastaví tuto hodnotu na 10. V produkčním prostředí by měla být obvykle nastavená na vyšší hodnoty. například 1024.
Vytvořte novou třídu s názvem LoggerBolt s následujícím kódem:
import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; public class LoggerBolt extends BaseRichBolt { private OutputCollector collector; private static final Logger logger = LoggerFactory .getLogger(LoggerBolt.class); @Override public void execute(Tuple tuple) { String value = tuple.getString(0); logger.info("Tuple value: " + value); collector.ack(tuple); } @Override public void prepare(Map map, TopologyContext context, OutputCollector collector) { this.collector = collector; this.count = 0; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // no output fields } }Tento storm bolt protokoluje obsah přijatých událostí. To lze snadno rozšířit tak, aby ukládaly řazené kolekce členů ve službě úložiště. Příklad [STORM HDInsight s centrem událostí] používá stejný přístup k ukládání dat do Azure Storage a Power BI.
Vytvořte třídu s názvem LogTopology s následujícím kódem:
import java.io.FileReader; import java.util.Properties; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.StormTopology; import backtype.storm.topology.TopologyBuilder; import com.microsoft.eventhubs.samples.EventCount; import com.microsoft.eventhubs.spout.EventHubSpout; import com.microsoft.eventhubs.spout.EventHubSpoutConfig; public class LogTopology { protected EventHubSpoutConfig spoutConfig; protected int numWorkers; protected void readEHConfig(String[] args) throws Exception { Properties properties = new Properties(); if (args.length > 1) { properties.load(new FileReader(args[1])); } else { properties.load(EventCount.class.getClassLoader() .getResourceAsStream("Config.properties")); } String username = properties.getProperty("eventhubspout.username"); String password = properties.getProperty("eventhubspout.password"); String namespaceName = properties .getProperty("eventhubspout.namespace"); String entityPath = properties.getProperty("eventhubspout.entitypath"); String zkEndpointAddress = properties .getProperty("zookeeper.connectionstring"); // opt int partitionCount = Integer.parseInt(properties .getProperty("eventhubspout.partitions.count")); int checkpointIntervalInSeconds = Integer.parseInt(properties .getProperty("eventhubspout.checkpoint.interval")); int receiverCredits = Integer.parseInt(properties .getProperty("eventhub.receiver.credits")); // prefetch count // (opt) System.out.println("Eventhub spout config: "); System.out.println(" partition count: " + partitionCount); System.out.println(" checkpoint interval: " + checkpointIntervalInSeconds); System.out.println(" receiver credits: " + receiverCredits); spoutConfig = new EventHubSpoutConfig(username, password, namespaceName, entityPath, partitionCount, zkEndpointAddress, checkpointIntervalInSeconds, receiverCredits); // set the number of workers to be the same as partition number. // the idea is to have a spout and a logger bolt co-exist in one // worker to avoid shuffling messages across workers in storm cluster. numWorkers = spoutConfig.getPartitionCount(); if (args.length > 0) { // set topology name so that sample Trident topology can use it as // stream name. spoutConfig.setTopologyName(args[0]); } } protected StormTopology buildTopology() { TopologyBuilder topologyBuilder = new TopologyBuilder(); EventHubSpout eventHubSpout = new EventHubSpout(spoutConfig); topologyBuilder.setSpout("EventHubsSpout", eventHubSpout, spoutConfig.getPartitionCount()).setNumTasks( spoutConfig.getPartitionCount()); topologyBuilder .setBolt("LoggerBolt", new LoggerBolt(), spoutConfig.getPartitionCount()) .localOrShuffleGrouping("EventHubsSpout") .setNumTasks(spoutConfig.getPartitionCount()); return topologyBuilder.createTopology(); } protected void runScenario(String[] args) throws Exception { boolean runLocal = true; readEHConfig(args); StormTopology topology = buildTopology(); Config config = new Config(); config.setDebug(false); if (runLocal) { config.setMaxTaskParallelism(2); LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("test", config, topology); Thread.sleep(5000000); localCluster.shutdown(); } else { config.setNumWorkers(numWorkers); StormSubmitter.submitTopology(args[0], config, topology); } } public static void main(String[] args) throws Exception { LogTopology topology = new LogTopology(); topology.runScenario(args); } }Tato třída vytvoří novou Event Hubs spout pomocí vlastností v konfiguračním souboru k vytvoření instance. Je důležité si uvědomit, že tento příklad vytvoří tolik úloh spoutů jako počet oddílů v centru událostí, aby bylo možné použít maximální paralelismus povolený tímto centrem událostí.
Další kroky
Další informace o službě Event Hubs najdete na následujících odkazech: