Usare Apache Kafka (anteprima) con Storm in HDInsight

Informazioni su come usare Apache Storm per leggere e scrivere in Apache Kafka. Questo esempio illustra anche come salvare i dati da una topologia di Storm nel file system compatibile con HDFS usato da HDInsight.

Nota

La procedura descritta in questo documento permette di creare un gruppo di risorse di Azure che contiene sia un cluster Storm in HDInsight che un cluster Kafka in HDInsight. Entrambi questi cluster si trovano all'interno di una rete virtuale di Azure, che consente al cluster Storm di comunicare direttamente con il cluster Kafka.

Al termine della procedura descritta in questo documento, eliminare i cluster per evitare costi supplementari.

Ottenere il codice

Il codice per l'esempio usato in questo documento è disponibile all'indirizzo https://github.com/Azure-Samples/hdinsight-storm-java-kafka.

Per compilare questo progetto, è necessaria la seguente configurazione per l'ambiente di sviluppo:

  • Java JDK 1.8 o versione successiva. Per HDInsight 3.5 o una versione successiva è necessario Java 8.

  • Maven 3.x

  • Un client SSH (sono necessari i comandi ssh e scp). Per altre informazioni, vedere Usare SSH con HDInsight.

  • Un editor di testo o ambiente IDE.

Le variabili di ambiente seguenti possono essere impostate quando si installa Java e l'JDK nella workstation di sviluppo. È tuttavia necessario verificare che esistano e che contengano i valori corretti per il sistema in uso.

  • JAVA_HOME: deve puntare alla directory in cui è installato JDK.
  • PATH: deve contenere i percorsi seguenti:

    • JAVA_HOME o il percorso equivalente.
    • JAVA_HOME\bin o il percorso equivalente.
    • Directory in cui è installato Maven.

Creare i cluster

Apache Kafka in HDInsight non fornisce l'accesso ai broker Kafka tramite Internet pubblico. Tutto ciò che comunica con Kafka deve trovarsi nella stessa rete virtuale di Azure dei nodi del cluster Kafka. Per questo esempio, i cluster Storm e Kafka si trovano entrambi in una rete virtuale di Azure. Il diagramma seguente illustra il flusso delle comunicazioni tra i cluster:

Diagramma dei cluster Storm e Kafka in una rete virtuale di Azure

Nota

Altri servizi nel cluster, ad esempio SSH e Ambari, sono accessibili tramite Internet. Per altre informazioni sulle porte pubbliche disponibili con HDInsight, vedere Porte e URI usati da HDInsight.

Anche se è possibile creare manualmente cluster Storm e Kafka e una rete virtuale di Azure, è più semplice usare un modello di Azure Resource Manager. Seguire questa procedura per distribuire cluster Storm e Kafka e una rete virtuale di Azure nella sottoscrizione di Azure.

  1. Usare il pulsante seguente per accedere ad Azure e aprire il modello nel portale di Azure.

    Deploy to Azure

    Il modello di Azure Resource Manager è disponibile all'indirizzo https://hditutorialdata.blob.core.windows.net/armtemplates/create-linux-based-kafka-storm-cluster-in-vnet-v1.json. Crea le risorse seguenti:

    • Gruppo di risorse di Azure
    • Rete virtuale di Azure
    • Account di archiviazione di Azure
    • Kafka in HDInsight versione 3.6 con tre nodi di lavoro
    • Storm in HDInsight versione 3.6 con tre nodi di lavoro
    Avviso

    Per garantire la disponibilità di Kafka in HDInsight, il cluster deve contenere almeno tre nodi del ruolo di lavoro. Questo modello crea un cluster Kafka contenente tre nodi di lavoro.

  2. Usare le linee guida seguenti per popolare le voci nel pannello Distribuzione personalizzata:

    Distribuzione personalizzata di HDInsight

    • Gruppo di risorse: creare un gruppo o selezionarne uno esistente. Questo gruppo contiene il cluster HDInsight.

    • Località: scegliere una località geograficamente vicina.

    • Base Cluster Name (Nome di base del cluster): questo valore viene usato come nome di base per i cluster Storm e Kafka. Ad esempio, se si immette hdi viene creato un cluster Storm denominato storm-hdi e un cluster Kafka denominato kafka-hdi.

    • Cluster Login User Name (Nome utente di accesso del cluster): nome utente amministratore per i cluster Storm e Kafka.

    • Cluster Login Password (Password di accesso del cluster): password amministratore per i cluster Storm e Kafka.

    • SSH User Name (Nome utente SSH): utente SSH da creare per i cluster Storm e Kafka.

    • SSH Password (Password SSH): password dell'utente SSH per i cluster Storm e Kafka.

  3. Leggere le Condizioni e quindi selezionare Accetto le condizioni riportate sopra.

  4. Selezionare infine Aggiungi al dashboard e quindi Acquista. La creazione dei cluster richiede circa 20 minuti.

Dopo avere creato le risorse, viene visualizzato il pannello del gruppo di risorse.

Pannello Gruppo di risorse per la rete virtuale e i cluster

Importante

Si noti che i nomi dei cluster HDInsight sono storm-BASENAME e kafka-BASENAME, dove BASENAME è il nome specificato per il modello. Questi nomi verranno usati nei passaggi successivi per la connessione ai cluster.

Informazioni sul codice

Questo progetto contiene due topologie:

  • KafkaWriter: questa topologia, definita dal file writer.yaml, scrive frasi casuali in Kafka usando il KafkaBolt fornito con Apache Storm.

    Questa topologia usa un componente SentenceSpout personalizzato per generare frasi casuali.

  • KafkaReader: questa topologia, definita dal file reader.yaml, legge i dati da Kafka usando il KafkaSpout fornito con Apache Storm e quindi registra i dati in stdout.

    Questa topologia usa iStorm HdfsBolt per scrivere dati nell'archivio predefinito per il cluster Storm.

    Flux

Le topologie vengono definite tramite Flux. Flux è stato introdotto in Storm 0.10.x e consente di separare la configurazione della topologia dal codice. Per le topologie che fanno uso del framework Flux, la topologia viene definita in un file YAML. Il file YAML può essere incluso come parte della topologia. Può essere anche un file autonomo usato quando si invia la topologia. Flux supporta anche la sostituzione delle variabili in fase di esecuzione, caratteristica che viene usata in questo esempio.

I parametri seguenti vengono impostati in fase di esecuzione per queste topologie:

  • ${kafka.topic}: nome dell'argomento Kafka usato dalla topologie per la lettura/scrittura.

  • ${kafka.broker.hosts}: host in cui vengono eseguiti i broker di Kafka. Le informazioni sui broker vengono usate da KafkaBolt durante la scrittura in Kafka.

  • ${kafka.zookeeper.hosts}: host in cui viene eseguito Zookeeper nel cluster di Kafka.

Per altre informazioni sulle topologie di Flux, vedere https://storm.apache.org/releases/1.1.0/flux.html.

Scaricare e compilare il progetto

  1. Nell'ambiente di sviluppo scaricare il progetto dall'indirizzo https://github.com/Azure-Samples/hdinsight-storm-java-kafka, aprire una riga di comando e passare al percorso in cui è stato scaricato il progetto.

  2. Dalla directory hdinsight-storm-java-kafka usare il comando seguente per compilare il progetto e creare un pacchetto per la distribuzione:

    mvn clean package
    

    Il processo del pacchetto crea un file denominato KafkaTopology-1.0-SNAPSHOT.jar nella directory target.

  3. Usare i comandi seguenti per copiare il pacchetto nel cluster Storm in HDInsight. Sostituire USERNAME con il nome utente SSH per il cluster. Sostituire BASENAME con il nome di base usato durante la creazione del cluster.

    scp ./target/KafkaTopology-1.0-SNAPSHOT.jar USERNAME@storm-BASENAME-ssh.azurehdinsight.net:KafkaTopology-1.0-SNAPSHOT.jar
    

    Quando richiesto, immettere la password usata durante la creazione del cluster.

Configurare la topologia

  1. Usare uno dei metodi seguenti per individuare gli host del broker Kafka:

    $creds = Get-Credential -UserName "admin" -Message "Enter the HDInsight login"
    $clusterName = Read-Host -Prompt "Enter the Kafka cluster name"
    $resp = Invoke-WebRequest -Uri "https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER" `
        -Credential $creds
    $respObj = ConvertFrom-Json $resp.Content
    $brokerHosts = $respObj.host_components.HostRoles.host_name
    ($brokerHosts -join ":9092,") + ":9092"
    
    curl -su admin -G "https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/KAFKA/components/KAFKA_BROKER" | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")'
    
    Importante

    Bash di esempio presuppone che $CLUSTERNAME contenga il nome del cluster HDInsight. Presuppone anche che jq sia installato. Quando richiesto, immettere la password dell'account di accesso al cluster.

    Il valore restituito è simile al testo seguente:

     wn0-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:9092,wn1-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:9092,wn2-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:9092
    

    Salvare questo valore, perché verrà usato in un secondo momento.

  2. Usare uno dei metodi seguenti per individuare gli host di Kafka Zookeeper:

    $creds = Get-Credential -UserName "admin" -Message "Enter the HDInsight login"
    $clusterName = Read-Host -Prompt "Enter the Kafka cluster name"
    $resp = Invoke-WebRequest -Uri "https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" `
        -Credential $creds
    $respObj = ConvertFrom-Json $resp.Content
    $zookeeperHosts = $respObj.host_components.HostRoles.host_name
    ($zookeeperHosts -join ":2181,") + ":2181"
    
    curl -su admin -G "https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")'
    
    Importante

    Bash di esempio presuppone che $CLUSTERNAME contenga il nome del cluster HDInsight. Presuppone anche che jq sia installato. Quando richiesto, immettere la password dell'account di accesso al cluster.

    Il valore restituito è simile al testo seguente:

     zk0-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:2181,zk2-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:2181,zk3-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:2181
    

    Salvare questo valore, perché verrà usato in un secondo momento.

  3. Modificare il file dev.properties nella radice del progetto. Aggiungere le informazioni di host di Broker e Zookeeper per le righe corrispondenti in questo file. Nell'esempio seguente viene configurato con i valori di esempio dei passaggi precedenti:

     kafka.zookeeper.hosts: zk0-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:2181,zk2-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:2181,zk3-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:2181
     kafka.broker.hosts: wn0-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:9092,wn1-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:9092,wn2-kafka.53qqkiavjsoeloiq3y1naf4hzc.ex.internal.cloudapp.net:9092
     kafka.topic: stormtopic
    
  4. Salvare il file dev.properties e quindi usare il comando seguente per caricarlo nel cluster Storm:

    scp dev.properties USERNAME@storm-BASENAME-ssh.azurehdinsight.net:KafkaTopology-1.0-SNAPSHOT.jar
    

    Sostituire USERNAME con il nome utente SSH per il cluster. Sostituire BASENAME con il nome di base usato durante la creazione del cluster.

Avviare il writer

  1. Usare quanto segue per connettersi al cluster Storm tramite SSH. Sostituire USERNAME con il nome utente SSH usato durante la creazione del cluster. Sostituire BASENAME con il nome di base usato durante la creazione del cluster.

    ssh USERNAME@storm-BASENAME-ssh.azurehdinsight.net
    

    Quando richiesto, immettere la password usata durante la creazione del cluster.

    Per altre informazioni, vedere Usare SSH con HDInsight.

  2. Dalla connessione SSH, usare il comando seguente per creare un argomento Kafka usato dalla topologia:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stormtopic --zookeeper $KAFKAZKHOSTS
    

    Sostituire $KAFKAZKHOSTS con l'informazione host di Zookeeper recuperata nella sezione precedente.

  3. Dalla connessione SSH al cluster Storm usare il comando seguente per avviare la topologia del writer:

    storm jar KafkaTopology-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote -R /writer.yaml --filter dev.properties
    

    I parametri usati con questo comando sono i seguenti:

    • org.apache.storm.flux.Flux: usa Flux per configurare ed eseguire questa topologia.

    • --remote: invia la topologia a Nimbus. La topologia viene distribuita ai nodi del ruolo di lavoro nel cluster.

    • -R /writer.yaml: usa il file writer.yaml per configurare la topologia. -R indica che questa risorsa è inclusa nel file JAR. È nella radice del file JAR, quindi /writer.yaml è il relativo percorso.

    • --filter: consente di compilare le voci nella topologia writer.yaml con i valori nel file dev.properties. Ad esempio, il valore della voce kafka.topic nel file viene usato per sostituire la voce ${kafka.topic} nella definizione della topologia.

  4. Dopo aver avviato la topologia, usare il comando seguente per verificare che nell'argomento Kafka vengano scritti i dati:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper $KAFKAZKHOSTS --from-beginning --topic stormtopic
    

    Sostituire $KAFKAZKHOSTS con l'informazione host di Zookeeper recuperata nella sezione precedente.

    Questo comando usa uno script fornito con Kafka per monitorare l'argomento. Dopo qualche secondo dovrebbe iniziare a restituire frasi casuali che sono state scritte nell'argomento. L'output è simile all'esempio seguente:

     i am at two with nature             
     an apple a day keeps the doctor away
     snow white and the seven dwarfs     
     the cow jumped over the moon        
     an apple a day keeps the doctor away
     an apple a day keeps the doctor away
     the cow jumped over the moon        
     an apple a day keeps the doctor away
     an apple a day keeps the doctor away
     four score and seven years ago      
     snow white and the seven dwarfs     
     snow white and the seven dwarfs     
     i am at two with nature             
     an apple a day keeps the doctor away
    

    Usare CTRL+C per arrestare lo script.

Avviare il reader

  1. Dalla sessione SSH nel cluster Storm usare il comando seguente per avviare la topologia del reader:

    storm jar KafkaTopology-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote -R /reader.yaml -e
    
  2. Dopo aver avviato la topologia, aprire l'interfaccia utente di Storm. Questa interfaccia utente Web è disponibile all'indirizzo https://storm-BASENAME.azurehdinsight.net/stormui. Sostituire BASENAME con il nome di base usato durante la creazione del cluster.

    Quando richiesto, usare il nome dell'account di accesso amministratore, la cui impostazione predefinita è admin, e la password usati durante la creazione del cluster. Verrà visualizzata una pagina Web simile all'immagine seguente:

    Interfaccia utente di Storm

  3. Dall'interfaccia utente di Storm, selezionare il collegamento kafka-reader nella sezione Topology Summary (Riepilogo topologia) per visualizzare informazioni sulla topologia kafka-reader.

    Sezione di riepilogo della topologia dell'interfaccia utente Web di Storm

  4. Selezionare il collegamento logger-bolt nella sezione Bolts (All time) (Bolt - Intero periodo) per visualizzare informazioni sulle istanze del componente logger-bolt.

    Collegamento logger-bolt nella sezione dei bolt

  5. Nella sezione Executors (Esecutori) selezionare un collegamento nella colonna Port (Porta) per visualizzare le informazioni di registrazione su questa istanza del componente.

    Collegamento esecutori

    Il log contiene un elenco dei dati letti dall'argomento Kafka. Le informazioni contenute nel log sono simili all'esempio seguente:

     2016-11-04 17:47:14.907 c.m.e.LoggerBolt [INFO] Received data: four score and seven years ago
     2016-11-04 17:47:14.907 STDIO [INFO] the cow jumped over the moon
     2016-11-04 17:47:14.908 c.m.e.LoggerBolt [INFO] Received data: the cow jumped over the moon
     2016-11-04 17:47:14.911 STDIO [INFO] snow white and the seven dwarfs
     2016-11-04 17:47:14.911 c.m.e.LoggerBolt [INFO] Received data: snow white and the seven dwarfs
     2016-11-04 17:47:14.932 STDIO [INFO] snow white and the seven dwarfs
     2016-11-04 17:47:14.932 c.m.e.LoggerBolt [INFO] Received data: snow white and the seven dwarfs
     2016-11-04 17:47:14.969 STDIO [INFO] an apple a day keeps the doctor away
     2016-11-04 17:47:14.970 c.m.e.LoggerBolt [INFO] Received data: an apple a day keeps the doctor away
    

Arrestare le topologie

Dalla sessione SSH nel cluster Storm usare i comandi seguenti per arrestare le topologie di Storm:

storm kill kafka-writer
storm kill kafka-reader

Eliminare il cluster

Avviso

La fatturazione dei cluster HDInsight viene calcolata al minuto, indipendentemente dal fatto che siano in uso o meno. Assicurarsi di eliminare il cluster dopo aver finito di usarlo. Per altre informazioni, vedere l'articolo su come eliminare un cluster HDInsight.

Le procedure illustrate in questo documento creano entrambi i cluster nello stesso gruppo di risorse di Azure. È quindi possibile eliminare il gruppo di risorse dal portale di Azure. Se si elimina il gruppo di risorse, tutte le risorse create seguendo questo documento vengono rimosse.

Passaggi successivi

Per altri esempi di topologie che possono essere usate con Storm in HDInsight, vedere Esempi di topologie e componenti Storm per Apache Storm in HDInsight.

Per informazioni sulla distribuzione e sul monitoraggio di topologie in HDInsight basato su Linux, vedere Distribuzione e gestione di topologie Apache Storm in HDInsight basato su Linux.