Usare il cluster HDInsight Spark per analizzare i dati in Data Lake Store

In questa esercitazione si usa un notebook Jupyter disponibile con i cluster HDInsight Spark per eseguire un processo che legge i dati da un account Data Lake Store.

Prerequisiti

Preparare i dati

Nota

Non è necessario eseguire questo passaggio se è stato creato il cluster HDInsight con Data Lake Store come spazio di archiviazione predefinito. I processi di creazione del cluster aggiunge alcuni dati di esempio nell'account di Data Lake Store specificato durante la creazione del cluster. Passare alla sezione Usare il cluster e HDInsight Spark con Data Lake Store.

Se è stato creato un cluster HDInsight con Data Lake Store come risorsa di archiviazione aggiuntiva e un BLOB di Archiviazione di Azure come risorsa di archiviazione predefinita, è necessario copiare prima di tutto alcuni dati di esempio nell'account Data Lake Store. È possibile usare i dati di esempio dal BLOB di Archiviazione di Azure associati al cluster HDInsight. Per farlo, è possibile usare lo strumento ADLCopy . Scaricare e installare lo strumento dal collegamento.

  1. Aprire un prompt dei comandi e passare alla directory in cui è installato AdlCopy, in genere %HOMEPATH%\Documents\adlcopy.

  2. Eseguire il comando seguente per copiare un BLOB specifico dal contenitore di origine a un'istanza di Archivio Data Lake:

     AdlCopy /source https://<source_account>.blob.core.windows.net/<source_container>/<blob name> /dest swebhdfs://<dest_adls_account>.azuredatalakestore.net/<dest_folder>/ /sourcekey <storage_account_key_for_storage_container>
    

    Copiare il file di dati di esempio HVAC.csv disponibile nel percorso /HdiSamples/HdiSamples/SensorSampleData/hvac/ nell'account Azure Data Lake Store. Il frammento di codice dovrebbe essere simile al seguente:

     AdlCopy /Source https://mydatastore.blob.core.windows.net/mysparkcluster/HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv /dest swebhdfs://mydatalakestore.azuredatalakestore.net/hvac/ /sourcekey uJUfvD6cEvhfLoBae2yyQf8t9/BpbWZ4XoYj4kAS5Jf40pZaMNf0q6a8yqTxktwVgRED4vPHeh/50iS9atS5LQ==
    
    Avviso

    Assicurarsi che nel nome del file e nel percorso le maiuscole siano corrette.

  3. Verrà richiesto di immettere le credenziali per la sottoscrizione di Azure in cui si trova l'account Azure Data Lake Store. L'output visualizzato sarà simile al seguente:

     Initializing Copy.
     Copy Started.
     100% data copied.
     Copy Completed. 1 file copied.
    

    Il file di dati (HVAC.csv) sarà copiato in una cartella denominata /hvac nell'account Data Lake Store.

Usare un cluster HDInsight Spark con Data Lake Store

  1. Dalla Schermata iniziale del portale di Azurefare clic sul riquadro del cluster Spark (se è stato aggiunto sulla Schermata iniziale). È anche possibile passare al cluster da Esplora tutto > Cluster HDInsight.

  2. Dal pannello del cluster Spark fare clic su Collegamenti rapidi e dal pannello Dashboard cluster fare clic su Notebook di Jupyter. Se richiesto, immettere le credenziali per il cluster.

    Nota

    È anche possibile raggiungere il notebook di Jupyter per il cluster aprendo l'URL seguente nel browser. Sostituire CLUSTERNAME con il nome del cluster:

    https://CLUSTERNAME.azurehdinsight.net/jupyter

  3. Creare un nuovo notebook. Fare clic su Nuovo e quindi su PySpark.

    Creare un nuovo notebook Jupyter

  4. Poiché il notebook è stato creato tramite il kernel PySpark, non è necessario creare contesti in modo esplicito. I contesti Spark e Hive vengono creati automaticamente quando si esegue la prima cella di codice. È possibile iniziare con l'importazione dei tipi necessari per questo scenario. A tale scopo incollare il frammento di codice seguente in una cella e premere MAIUSC+INVIO.

     from pyspark.sql.types import *
    

    Ogni volta che viene eseguito un processo in Jupyter, il titolo della finestra del Web browser visualizzerà lo stato (Occupato) accanto al titolo del notebook. È anche visibile un cerchio pieno accanto al testo PySpark nell'angolo in alto a destra. Dopo il completamento del processo, viene visualizzato un cerchio vuoto.

    Stato di un processo del notebook Jupyter

  5. Caricare i dati di esempio in una tabella temporanea mediante il file HVAC.csv che è stato copiato nell'account Data Lake Store. È possibile accedere ai dati dell'account di Archivio Data Lake mediante il seguente modello di URL.

    • Se è presente un Data Lake Store come risorsa di archiviazione predefinita, il file HVAC.csv sarà disponibile in un percorso analogo all'URL seguente:

        adl://<data_lake_store_name>.azuredatalakestore.net/<cluster_root>/HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv
      

      In alternativa, è possibile anche usare un formato abbreviato, ad esempio il seguente:

        adl:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv
      
    • Se è presente un Data Lake Store come risorsa di archiviazione aggiuntiva, il file HVAC.csv sarà disponibile nel percorso in cui è stato copiato, ad esempio:

        adl://<data_lake_store_name>.azuredatalakestore.net/<path_to_file>
      

      In una cella vuota incollare l'esempio di codice seguente, sostituire MYDATALAKESTORE con il nome dell'account Data Lake Store e premere MAIUSC + INVIO. Questo esempio di codice registra i dati in una tabella temporanea denominata hvac.

        # Load the data. The path below assumes Data Lake Store is default storage for the Spark cluster
        hvacText = sc.textFile("adl://MYDATALAKESTORE.azuredatalakestore.net/cluster/mysparkcluster/HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")
      
        # Create the schema
        hvacSchema = StructType([StructField("date", StringType(), False),StructField("time", StringType(), False),StructField("targettemp", IntegerType(), False),StructField("actualtemp", IntegerType(), False),StructField("buildingID", StringType(), False)])
      
        # Parse the data in hvacText
        hvac = hvacText.map(lambda s: s.split(",")).filter(lambda s: s[0] != "Date").map(lambda s:(str(s[0]), str(s[1]), int(s[2]), int(s[3]), str(s[6]) ))
      
        # Create a data frame
        hvacdf = sqlContext.createDataFrame(hvac,hvacSchema)
      
        # Register the data fram as a table to run queries against
        hvacdf.registerTempTable("hvac")
      
  6. Dato che si usa un kernel PySpark, è ora possibile eseguire direttamente una query SQL sulla tabella temporanea hvac appena creata usando il comando Magic %%sql. Per altre informazioni sul magic %%sql e sugli altri magic disponibili con il kernel PySpark, vedere Kernel disponibili per i notebook di Jupyter con cluster Spark in HDInsight.

     %%sql
     SELECT buildingID, (targettemp - actualtemp) AS temp_diff, date FROM hvac WHERE date = \"6/1/13\"
    
  7. Una volta che il processo viene completato correttamente, per impostazione predefinita viene visualizzato l'output tabulare seguente.

    Tabella di output dei risultati della query

    È anche possibile visualizzare i risultati in altri formati. Ad esempio, un grafico ad area per lo stesso output apparirebbe come segue.

    Grafico ad area dei risultati della query

  8. Al termine dell'esecuzione dell'applicazione, è necessario arrestare il notebook per rilasciare le risorse. A tale scopo, dal menu File del notebook fare clic su Close and Halt (Chiudi e interrompi). Questa operazione consente di arrestare e chiudere il notebook.

Passaggi successivi