Esercizio - Eseguire lo streaming dei dati Kafka in un notebook di Jupyter e definire finestre temporali per i dati

Completato

Il cluster Kafka sta scrivendo i dati nel log per una successiva elaborazione tramite Spark Structured Streaming.

Nell'esempio clonato è incluso un notebook di Spark, che deve essere caricato nel cluster Spark per l'uso.

Caricare il notebook Python nel cluster Spark

  1. Nel portale di Azure fare clic su Home > Cluster HDInsight e quindi selezionare il cluster Spark appena creato (non il cluster Kafka).

  2. Nel riquadro Dashboard cluster fare clic su Notebook di Jupyter.

    Opening a Jupyter notebook

  3. Quando vengono richieste le credenziali, immettere un nome utente di amministratore e la password definita durante la creazione dei cluster. Verrà visualizzato il sito Web di Jupyter.

  4. Fare clic su PySpark e nella pagina PySpark fare clic su Carica.

  5. Passare al percorso in cui l'esempio è stato scaricato da GitHub, selezionare il file RealTimeStocks.ipynb, fare clic su Apri, quindi su Carica e infine fare clic su Aggiorna nel browser Internet.

  6. Dopo il caricamento del notebook nella cartella PySpark, fare clic su RealTimeStocks.ipynb per aprire il notebook nel browser.

  7. Eseguire la prima cella nel notebook posizionando il cursore nella cella e quindi premendo MAIUSC+INVIO.

    La cella Configure Libraries and Packages risulta completata quando viene visualizzato il messaggio di avvio dell'applicazione Spark insieme a informazioni aggiuntive, come illustrato nella didascalia della schermata seguente.

    Configuring libraries in a Jupyter notebook

  8. Nella riga .option("kafka.bootstrap.servers", "") della cella Set-up Connection to Kafka immettere il broker Kafka all'interno del secondo set di virgolette. Ad esempio, .option("kafka.bootstrap.servers", "wn0-kafka.mdlamldganads.gx.internal.cloudapp.net:9092") e quindi premere MAIUSC+INVIO per eseguire la cella.

    La cella Set-up Connection to Kafka risulta completata quando viene visualizzato il messaggio seguente: inputDf: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]. Per la lettura dei dati, Spark usa l'API readStream.

    Set-up a connection to Kafka

  9. Selezionare la cella Read from Kafka into Streaming Dataframe e quindi premere MAIUSC+INVIO per eseguirla.

    La cella risulta completata quando viene visualizzato il messaggio seguente: stockDf: org.apache.spark.sql.DataFrame = [symbol: string, time: string ... 2 more fields]

    Read from Kafka into Streaming Dataframe

  10. Selezionare la cella Output Streaming Dataframe to Console e quindi premere MAIUSC+INVIO per eseguirla.

    La cella risulta completata quando vengono visualizzate informazioni simili a quelle riportate di seguito. L'output mostra il valore di ogni cella passata nel microbatch e viene riportato un batch al secondo.

    Output a Streaming Dataframe to a Console

  11. Selezionare la cella Windowed Stock Min / Max e quindi premere MAIUSC+INVIO per eseguirla.

    La cella risulta completata quando fornisce il prezzo massimo e minimo per ogni titolo nella finestra di 4 secondi, come definita nella cella. Come spiegato in un'unità precedente, la disponibilità di informazioni su finestre temporali specifiche è uno dei vantaggi che è possibile ottenere usando Spark Structured Streaming.

    An example of a using a minimum and maximum aggregate function

  12. Selezionare la cella Collect all values for a stock e quindi premere MAIUSC+INVIO per eseguirla.

    La cella risulta completata quando restituisce una tabella dei valori dei titoli azionari nella tabella. Il valore di outputMode è "complete" e quindi tutti i dati sono visualizzati.

    An example of a using a total aggregate function

In questa unità è stato caricato un notebook di Jupyter in un cluster Spark, è stata stabilita una connessione al cluster Kafka, sono stati passati i dati di streaming creati dal file producer Python al notebook di Spark, è stata definita una finestra temporale per i dati di streaming e sono stati visualizzati i valori massimi e minimi dei titoli azionari in tale finestra. Sono stati infine visualizzati tutti i valori dei titoli nella tabella. La generazione del flusso strutturato con Spark e Kafka è stata completata.