Esercitazione: Usare lo streaming strutturato Spark con Kafka in HDInsightTutorial: Use Spark Structured Streaming with Kafka on HDInsight

Questa esercitazione illustra come usare lo streaming strutturato Spark per leggere e scrivere dati con Apache Kafka in Azure HDInsight.This tutorial demonstrates how to use Spark Structured Streaming to read and write data with Apache Kafka on Azure HDInsight.

Lo streaming strutturato Spark è un motore di elaborazione del flusso basato su Spark SQL.Spark structured streaming is a stream processing engine built on Spark SQL. Consente di esprimere i calcoli di streaming come il calcolo di batch in dati statici.It allows you to express streaming computations the same as batch computation on static data.

In questa esercitazione si apprenderà come:In this tutorial, you learn how to:

  • Streaming strutturato con KafkaStructured Streaming with Kafka
  • Creare cluster Kafka e SparkCreate Kafka and Spark clusters
  • Caricare il notebook in SparkUpload the notebook to Spark
  • Usare il notebookUse the notebook
  • Pulire le risorseClean up resources

Al termine della procedura descritta in questo documento, eliminare i cluster per evitare costi supplementari.When you are done with the steps in this document, remember to delete the clusters to avoid excess charges.

prerequisitiPrerequisites

Importante

La procedura descritta in questo documento richiede che venga creato un gruppo di risorse di Azure che contenga sia un cluster Spark in HDInsight che un cluster Kafka in HDInsight.The steps in this document require an Azure resource group that contains both a Spark on HDInsight and a Kafka on HDInsight cluster. Entrambi questi cluster si trovano all'interno di una rete virtuale di Azure, che consente al cluster Spark di comunicare direttamente con il cluster Kafka.These clusters are both located within an Azure Virtual Network, which allows the Spark cluster to directly communicate with the Kafka cluster.

Per comodità, questo documento si collega a un modello in grado di creare tutte le risorse di Azure necessarie.For your convenience, this document links to a template that can create all the required Azure resources.

Per altre informazioni sull'uso di HDInsight in una rete virtuale, vedere il documento Estendere Azure HDInsight usando Rete virtuale di Azure.For more information on using HDInsight in a virtual network, see the Extend HDInsight using a virtual network document.

Streaming strutturato con KafkaStructured Streaming with Kafka

Lo streaming strutturato Spark è un motore di elaborazione del flusso basato sul motore SQL di Spark.Spark Structured Streaming is a stream processing engine built on the Spark SQL engine. Quando si usa lo streaming strutturato, è possibile scrivere query di streaming nello stesso modo in cui si scrivono le query batch.When using Structured Streaming, you can write streaming queries the same way that you write batch queries.

I frammenti di codice seguenti illustrano la lettura da Kafka e l'archiviazione in file.The following code snippets demonstrate reading from Kafka and storing to file. La prima è un'operazione batch, mentre la seconda è un'operazione di flusso:The first one is a batch operation, while the second one is a streaming operation:

// Read a batch from Kafka
val kafkaDF = spark.read.format("kafka")
                .option("kafka.bootstrap.servers", kafkaBrokers)
                .option("subscribe", kafkaTopic)
                .option("startingOffsets", "earliest")
                .load()
// Select data and write to file
kafkaDF.select(from_json(col("value").cast("string"), schema) as "trip")
                .write
                .format("parquet")
                .option("path","/example/batchtripdata")
                .option("checkpointLocation", "/batchcheckpoint")
                .save()
// Stream from Kafka
val kafkaStreamDF = spark.readStream.format("kafka")
                .option("kafka.bootstrap.servers", kafkaBrokers)
                .option("subscribe", kafkaTopic)
                .option("startingOffsets", "earliest")
                .load()
// Select data from the stream and write to file
kafkaStreamDF.select(from_json(col("value").cast("string"), schema) as "trip")
                .writeStream
                .format("parquet")
                .option("path","/example/streamingtripdata")
                .option("checkpointLocation", "/streamcheckpoint")
                .start.awaitTermination(30000)

In entrambi i frammenti di codice, i dati vengono letti da Kafka e scritti nel file.In both snippets, data is read from Kafka and written to file. Le differenze tra gli esempi sono:The differences between the examples are:

BatchBatch StreamingStreaming
read readStream
write writeStream
save start

L'operazione di streaming usa anche awaitTermination(30000), che arresta il flusso dopo 30000 ms.The streaming operation also uses awaitTermination(30000), which stops the stream after 30000 ms.

Per usare lo streaming strutturato con Kafka, il progetto deve avere una dipendenza sul pacchetto org.apache.spark : spark-sql-kafka-0-10_2.11.To use Structured Streaming with Kafka, your project must have a dependency on the org.apache.spark : spark-sql-kafka-0-10_2.11 package. La versione di questo pacchetto deve corrispondere alla versione di Spark in HDInsight.The version of this package should match the version of Spark on HDInsight. Per Spark 2.2.0 (disponibile in HDInsight 3.6), è possibile trovare le informazioni sulle dipendenze per diversi tipi di progetto in https://search.maven.org/#artifactdetails%7Corg.apache.spark%7Cspark-sql-kafka-0-10_2.11%7C2.2.0%7Cjar.For Spark 2.2.0 (available in HDInsight 3.6), you can find the dependency information for different project types at https://search.maven.org/#artifactdetails%7Corg.apache.spark%7Cspark-sql-kafka-0-10_2.11%7C2.2.0%7Cjar.

Per Jupyter Notebook incluso con questa esercitazione, la cella seguente carica questa dipendenza dal pacchetto:For the Jupyter Notebook provided with this tutorial, the following cell loads this package dependency:

%%configure -f
{
    "conf": {
        "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0",
        "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11"
    }
}

Creare i clusterCreate the clusters

Apache Kafka in HDInsight non fornisce l'accesso ai broker Kafka tramite Internet pubblico.Apache Kafka on HDInsight does not provide access to the Kafka brokers over the public internet. Qualunque elemento che usa Kafka deve trovarsi nella stessa rete virtuale di Azure.Anything that uses Kafka must be in the same Azure virtual network. In questa esercitazione i cluster Kafka e Spark si trovano nella stessa rete virtuale di Azure.In this tutorial, both the Kafka and Spark clusters are located in the same Azure virtual network.

Il diagramma seguente illustra il flusso delle comunicazioni tra Spark e Kafka:The following diagram shows how communication flows between Spark and Kafka:

Diagramma dei cluster Spark e Kafka in una rete virtuale di Azure

Nota

Il servizio Kafka è limitato alle comunicazioni all'interno della rete virtuale.The Kafka service is limited to communication within the virtual network. Altri servizi nel cluster, ad esempio SSH e Ambari, sono accessibili tramite Internet.Other services on the cluster, such as SSH and Ambari, can be accessed over the internet. Per altre informazioni sulle porte pubbliche disponibili con HDInsight, vedere Porte e URI usati da HDInsight.For more information on the public ports available with HDInsight, see Ports and URIs used by HDInsight.

Per creare una Rete virtuale di Microsoft Azure e quindi crearvi i cluster Kafka e Spark, seguire questa procedura:To create an Azure Virtual Network, and then create the Kafka and Spark clusters within it, use the following steps:

  1. Usare il pulsante seguente per accedere ad Azure e aprire il modello nel portale di Azure.Use the following button to sign in to Azure and open the template in the Azure portal.

    Deploy to Azure

    Il modello di Azure Resource Manager è disponibile in https://raw.githubusercontent.com/Azure-Samples/hdinsight-spark-kafka-structured-streaming/master/azuredeploy.json.The Azure Resource Manager template is located at https://raw.githubusercontent.com/Azure-Samples/hdinsight-spark-kafka-structured-streaming/master/azuredeploy.json.

    Questo modello crea le risorse seguenti:This template creates the following resources:

    • Kafka nel cluster HDInsight 3.6.A Kafka on HDInsight 3.6 cluster.
    • Spark 2.2.0 nel cluster HDInsight 3.6.A Spark 2.2.0 on HDInsight 3.6 cluster.
    • Una rete virtuale Azure contenente i cluster HDInsight.An Azure Virtual Network, which contains the HDInsight clusters.

      Importante

      Il notebook di streaming strutturato che è stato usato in questa esercitazione richiede Spark 2.2.0 in HDInsight 3.6.The structured streaming notebook used in this tutorial requires Spark 2.2.0 on HDInsight 3.6. Se si usa una versione precedente di Spark in HDInsight, si ricevono errori durante l'uso del notebook.If you use an earlier version of Spark on HDInsight, you receive errors when using the notebook.

  2. Usare le informazioni seguenti per popolare le voci nella sezione Modello personalizzato:Use the following information to populate the entries on the Customized template section:

    ImpostazioneSetting ValoreValue
    SottoscrizioneSubscription Sottoscrizione di AzureYour Azure subscription
    Gruppo di risorseResource group Gruppo di risorse che contiene le risorse.The resource group that contains the resources.
    LocalitàLocation Area di Azure in cui vengono create le risorse.The Azure region that the resources are created in.
    Nome del cluster SparkSpark Cluster Name Nome del cluster Spark.The name of the Spark cluster. I primi sei caratteri devono essere diversi dal nome di cluster Kafka.The first six characters must be different than the Kafka cluster name.
    Nome del cluster KafkaKafka Cluster Name Nome del cluster Kafka.The name of the Kafka cluster. I primi sei caratteri devono essere diversi dal nome di cluster Spark.The first six characters must be different than the Spark cluster name.
    Nome utente dell'account di accesso del clusterCluster Login User Name Nome utente dell'amministratore per i cluster.The admin user name for the clusters.
    Password di accesso al clusterCluster Login Password Password dell'utente amministratore per i cluster.The admin user password for the clusters.
    Nome utente SSHSSH User Name Utente SSH da creare per i cluster.The SSH user to create for the clusters.
    Password SSHSSH Password Password per l'utente SSH.The password for the SSH user.

    Screenshot del modello personalizzato

  3. Leggere le Condizioni e quindi selezionare Accetto le condizioni riportate sopra.Read the Terms and Conditions, and then select I agree to the terms and conditions stated above

  4. Selezionare infine Aggiungi al dashboard e quindi Acquista.Finally, check Pin to dashboard and then select Purchase.

Nota

La creazione dei cluster può richiedere fino a 20 minuti.It can take up to 20 minutes to create the clusters.

Caricare il notebookUpload the notebook

Per caricare il notebook dal progetto nel cluster Spark su HDInsight, seguire questa procedura:To upload the notebook from the project to your Spark on HDInsight cluster, use the following steps:

  1. Scaricare il progetto da https://github.com/Azure-Samples/hdinsight-spark-kafka-structured-streaming.Download the project from https://github.com/Azure-Samples/hdinsight-spark-kafka-structured-streaming.

  2. Nel Web browser connettersi al notebook Jupyter nel cluster Spark.In your web browser, connect to the Jupyter notebook on your Spark cluster. Nell'URL seguente sostituire CLUSTERNAME con il nome del cluster Spark:In the following URL, replace CLUSTERNAME with the name of your Spark cluster:

     https://CLUSTERNAME.azurehdinsight.net/jupyter
    

    Quando richiesto, immettere l'account di accesso (amministratore) e la password usati durante la creazione del cluster.When prompted, enter the cluster login (admin) and password used when you created the cluster.

  3. Usare il pulsante Carica in alto a destra nella pagina per caricare il file spark-structured-streaming-kafka.ipynb nel cluster.From the upper right side of the page, use the Upload button to upload the spark-structured-streaming-kafka.ipynb file to the cluster. Selezionare Apri per avviare il caricamento.Select Open to start the upload.

    Usare il pulsante di caricamento per selezionare e caricare un notebook

    Selezionare il file KafkaStreaming.ipynb

  4. Trovare la voce spark-structured-streaming-kafka.ipynb nell'elenco dei notebook e selezionare il pulsante Carica accanto.Find the spark-structured-streaming-kafka.ipynb entry in the list of notebooks, and select Upload button beside it.

    Per caricare Notebook, usare il pulsante di caricamento accanto alla voce KafkaStreaming.ipynb

Usare il notebookUse the notebook

Dopo aver caricato i file, selezionare la voce spark-structured-streaming-kafka.ipynb per aprire il notebook.Once the files have been uploaded, select the spark-structured-streaming-kafka.ipynb entry to open the notebook. Per informazioni sull'uso dello streaming strutturato di Spark con Kafka su HDInsight, seguire le istruzioni disponibili nel notebook.To learn how to use Spark structured streaming with Kafka on HDInsight, follow the instructions in the notebook.

Pulire le risorseClean up resources

Per pulire le risorse create da questa esercitazione, eliminare il gruppo di risorse.To clean up the resources created by this tutorial, you can delete the resource group. Se si elimina il gruppo di risorse, vengono eliminati anche il cluster HDInsight associato e tutte le altre risorse correlate al gruppo di risorse.Deleting the resource group also deletes the associated HDInsight cluster, and any other resources associated with the resource group.

Per rimuovere il gruppo di risorse usando il portale di Azure:To remove the resource group using the Azure portal:

  1. Nel portale di Azure espandere il menu a sinistra per aprire il menu dei servizi e quindi scegliere Gruppi di risorse per visualizzare l'elenco dei gruppi di risorse.In the Azure portal, expand the menu on the left side to open the menu of services, and then choose Resource Groups to display the list of your resource groups.
  2. Individuare il gruppo di risorse da eliminare e quindi fare clic con il pulsante destro del mouse su Altro (...) a destra dell'elenco.Locate the resource group to delete, and then right-click the More button (...) on the right side of the listing.
  3. Scegliere Elimina gruppo di risorse e quindi confermare.Select Delete resource group, and then confirm.

Avviso

La fatturazione del cluster HDInsight inizia dopo la creazione del cluster e si interrompe solo quando questo viene eliminato.HDInsight cluster billing starts once a cluster is created and stops when the cluster is deleted. La fatturazione avviene con tariffa oraria, perciò si deve sempre eliminare il cluster in uso quando non lo si usa più.Billing is pro-rated per minute, so you should always delete your cluster when it is no longer in use.

Se si elimina un cluster Kafka su HDInsight vengono eliminati anche eventuali dati archiviati in Kafka.Deleting a Kafka on HDInsight cluster deletes any data stored in Kafka.

Passaggi successiviNext steps

In questa esercitazione è stato descritto come usare lo streaming strutturato Spark per scrivere e leggere i dati da Kafka in HDInsight.In this tutorial, you learned how to use Spark Structured Streaming to write and read data from Kafka on HDInsight. Usare il collegamento seguente per informazioni su come usare Storm con Kafka.Use the following link to learn how to use Storm with Kafka.