Scrivere da Apache Storm a HDFS in HDInsight

Informazioni su come usare Storm per scrivere dati in un archivio compatibile con HDFS usato da Apache Storm in HDInsight. HDInsight può usare sia Archiviazione di Azure che Azure Data Lake Store come archivio compatibile con HDFS. Storm offre un componente HdfsBolt che scrive i dati in HDFS. Questo documento contiene informazioni su come eseguire operazioni di scrittura da HdfsBolt in entrambi gli archivi.

Importante

La topologia di esempio usata in questo documento si basa su componenti inclusi con Storm in HDInsight. Può essere necessario apportare delle modifiche per usare Azure Data Lake Store con altri cluster Apache Storm.

Ottenere il codice

Il progetto contenente questa topologia è disponibile per il download da https://github.com/Azure-Samples/hdinsight-storm-azure-data-lake-store.

Per compilare questo progetto, è necessaria la seguente configurazione per l'ambiente di sviluppo:

  • Java JDK 1.8 o versione successiva. Per HDInsight 3.5 o una versione successiva è necessario Java 8.

  • Maven 3.x

Le variabili di ambiente seguenti possono essere impostate quando si installa Java e l'JDK nella workstation di sviluppo. È tuttavia necessario verificare che esistano e che contengano i valori corretti per il sistema in uso.

  • JAVA_HOME - deve puntare alla directory dove è installato JDK.
  • PATH: deve contenere i percorsi seguenti:

    • JAVA_HOME o il percorso equivalente.
    • JAVA_HOME\bin o il percorso equivalente.
    • Directory in cui è installato Maven.

Procedura per usare HdfsBolt con HDInsight

Importante

Prima di usare HdfsBolt con Storm in HDInsight, è necessario usare l'azione script per copiare i file .jar richiesti in extpath per Storm. Per altre informazioni, vedere la sezione Configurare il cluster.

HdfsBolt usa lo schema file specificato dall'utente per capire come scrivere in HDFS. Per HDInsight, usare uno dei seguenti schemi:

  • wasb://: usato con l'account di archiviazione di Azure.
  • adl://: usato con Azure Data Lake Store.

La seguente tabella riporta esempi di utilizzo dello schema di file in diversi scenari:

Schema Note
wasb:/// L'account di archiviazione predefinito è un contenitore BLOB nell'account di archiviazione di Azure.
adl:/// L'account di archiviazione predefinito è una directory in Azure Data Lake Store. Durante la creazione del cluster, si specifica la directory in Data Lake Store che rappresenta la radice dell'HDFS del cluster. Ad esempio, la directory /clusters/myclustername/.
wasb://CONTAINER@ACCOUNT.blob.core.windows.net/ Un account di archiviazione di Azure non predefinito (aggiuntivo) associato al cluster.
adl://STORENAME/ La radice di Data Lake Store usata dal cluster. Questo schema permette di accedere ai dati situati all'esterno della directory che contiene il file system del cluster.

Per altre informazioni, vedere il riferimento HdfsBolt su Apache.org.

Configurazione di esempio

Il seguente YAML è un estratto del file resources/writetohdfs.yaml incluso nell'esempio. Questo file definisce la topologia di Storm usando il framework Flux per Apache Storm.

components:
  - id: "syncPolicy"
    className: "org.apache.storm.hdfs.bolt.sync.CountSyncPolicy"
    constructorArgs:
      - 1000

  - id: "rotationPolicy"
    className: "org.apache.storm.hdfs.bolt.rotation.NoRotationPolicy"

  - id: "fileNameFormat"
    className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat"
    configMethods:
      - name: "withPath"
        args: ["${hdfs.write.dir}"]
      - name: "withExtension"
        args: [".txt"]

  - id: "recordFormat"
    className: "org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat"
    configMethods:
      - name: "withFieldDelimiter"
        args: ["|"]

# spout definitions
spouts:
  - id: "tick-spout"
    className: "com.microsoft.example.TickSpout"
    parallelism: 1


# bolt definitions
bolts:
  - id: "hdfs-bolt"
    className: "org.apache.storm.hdfs.bolt.HdfsBolt"
    configMethods:
      - name: "withConfigKey"
        args: ["hdfs.config"]
      - name: "withFsUrl"
        args: ["${hdfs.url}"]
      - name: "withFileNameFormat"
        args: [ref: "fileNameFormat"]
      - name: "withRecordFormat"
        args: [ref: "recordFormat"]
      - name: "withRotationPolicy"
        args: [ref: "rotationPolicy"]
      - name: "withSyncPolicy"
        args: [ref: "syncPolicy"]

Questo YAML definisce i seguenti elementi:

  • syncPolicy: specifica quando i file vengono sincronizzati/allineati con il file system. In questo esempio, ogni 1000 tuple.
  • fileNameFormat: specifica il modello di percorso e nome del file da usare per la scrittura di file. In questo esempio, il percorso viene fornito in fase di runtime usando un filtro e l'estensione è .txt.
  • recordFormat: specifica il formato interno dei file scritti. In questo esempio, i campi sono delimitati dal carattere |.
  • rotationPolicy: specifica quando ruotare i file. In questo esempio, non viene eseguita alcuna rotazione.
  • hdfs-bolt: usa i componenti precedenti come parametri di configurazione per la classe HdfsBolt.

Per altre informazioni sul framework Flux, vedere https://storm.apache.org/releases/1.1.0/flux.html.

Configurare il cluster

Per impostazione predefinita, Storm in HDInsight non include i componenti che HdfsBolt usa per comunicare con Archiviazione di Azure o Data Lake Store nel classpath di Storm. Usare la seguente azione script per aggiungere questi componenti alla directory extlib per Storm nel cluster:

| URI script | Nodi per l'applicazione| Parametri | | https://000aarperiscus.blob.core.windows.net/certs/stormextlib.sh | Nimbus, Supervisore | Nessuno |

Per informazioni sull'uso di questo script con il cluster, vedere il documento Customize HDInsight clusters using script actions (Personalizzare i cluster HDInsight con le azioni script).

Compilare e creare il pacchetto della topologia

  1. Scaricare il progetto di esempio da https://github.com/Azure-Samples/hdinsight-storm-azure-data-lake-store nell'ambiente di sviluppo.

  2. Da un prompt dei comandi, un terminale o una sessione shell, modificare le directory impostandole sulla directory principale del progetto scaricato. Per compilare e creare la topologia, usare il seguente comando:

     mvn compile package
    

    Al termine della compilazione e creazione, ci sarà una nuova directory denominata target, che contiene un file denominato StormToHdfs-1.0-SNAPSHOT.jar. Questo file contiene la topologia compilata.

Distribuire ed eseguire la topologia

  1. Usare il comando seguente per copiare la topologia nel cluster HDInsight. Sostituire USER con il nome utente SSH usato durante la creazione del cluster. Sostituire CLUSTERNAME con il nome del cluster.

     scp target\StormToHdfs-1.0-SNAPSHOT.jar USER@CLUSTERNAME-ssh.azurehdinsight.net:StormToHdfs1.0-SNAPSHOT.jar
    

    Quando richiesto, immettere la password usata durante la creazione dell'utente SSH per il cluster. Se è stata usata una chiave pubblica anziché una password, può essere necessario usare il parametro -i per specificare il percorso della chiave privata corrispondente.

    Nota

    Per altre informazioni sull'uso di scp con HDInsight, vedere Use SSH with HDInsight (Uso di SSH con HDInsight).

  2. Al termine del caricamento, usare il comando seguente per connettersi al cluster HDInsight tramite SSH. Sostituire USER con il nome utente SSH usato durante la creazione del cluster. Sostituire CLUSTERNAME con il nome del cluster.

     ssh USER@CLUSTERNAME-ssh.azurehdinsight.net
    

    Quando richiesto, immettere la password usata durante la creazione dell'utente SSH per il cluster. Se è stata usata una chiave pubblica anziché una password, può essere necessario usare il parametro -i per specificare il percorso della chiave privata corrispondente.

    Per altre informazioni, vedere Usare SSH con HDInsight.

  3. Dopo la connessione, usare il comando seguente per creare un file denominato dev.properties:

     nano dev.properties
    
  4. Usare il testo seguente come contenuto del file dev.properties:

     hdfs.write.dir: /stormdata/
     hdfs.url: wasb:///
    

    Importante

    In questo esempio si presuppone che il cluster usi un account di archiviazione di Azure come archivio predefinito. Se il cluster usa Azure Data Lake Store, usare invece hdfs.url: adl:///.

    Per salvare il file, usare CTRL + X, poi Y__e infine premere __Invio. I valori in questo file impostano l'URL di Data Lake Store e il nome della directory in cui vengono scritti i dati.

  5. Per avviare la topologia, usare il comando seguente:

     storm jar StormToHdfs-1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote -R /writetohdfs.yaml --filter dev.properties
    

    Questo comando avvia la topologia usando il framework Flux, inviandolo al nodo Nimbus del cluster. La topologia viene definita mediante il file writetohdfs.yaml incluso nel file con estensione jar. Il file dev.properties viene passato come filtro e la topologia legge i dati contenuti nel file.

Visualizzare i dati di output

Per visualizzare i dati, usare il comando seguente:

hdfs dfs -ls /stormdata/

Viene visualizzato un elenco dei file creati dalla topologia.

Nell'elenco seguente è riportato un esempio dei dati restituiti dai comandi precedenti:

Found 30 items
-rw-r-----+  1 sshuser sshuser       5120 2017-03-03 19:13 /stormdata/hdfs-bolt-3-0-1488568403092.txt
-rw-r-----+  1 sshuser sshuser       5120 2017-03-03 19:13 /stormdata/hdfs-bolt-3-1-1488568404567.txt
-rw-r-----+  1 sshuser sshuser       5120 2017-03-03 19:13 /stormdata/hdfs-bolt-3-10-1488568408678.txt
-rw-r-----+  1 sshuser sshuser       5120 2017-03-03 19:13 /stormdata/hdfs-bolt-3-11-1488568411636.txt
-rw-r-----+  1 sshuser sshuser       5120 2017-03-03 19:13 /stormdata/hdfs-bolt-3-12-1488568411884.txt
-rw-r-----+  1 sshuser sshuser       5120 2017-03-03 19:13 /stormdata/hdfs-bolt-3-13-1488568412603.txt
-rw-r-----+  1 sshuser sshuser       5120 2017-03-03 19:13 /stormdata/hdfs-bolt-3-14-1488568415055.txt

Arrestare la topologia

Le topologie Storm vengono eseguite fino all'arresto o all'eliminazione del cluster. Per arrestare la topologia, usare il seguente comando:

storm kill hdfswriter

Eliminare il cluster

Avviso

La fatturazione dei cluster HDInsight viene calcolata al minuto, indipendentemente dal fatto che siano in uso o meno. Assicurarsi di eliminare il cluster dopo aver finito di usarlo. Per altre informazioni, vedere l'articolo su come eliminare un cluster HDInsight.

Passaggi successivi

Dopo aver appreso come usare Storm per scrivere in Archiviazione di Azure e Azure Data Lake Store, è possibile vedere altri esempi di Storm per HDInsight.