Zelfstudie: Apache Spark Structured Streaming gebruiken met Apache Kafka op HDInsightTutorial: Use Apache Spark Structured Streaming with Apache Kafka on HDInsight

Deze zelfstudie laat zien hoe u Apache Spark Structured Streaming gebruikt om gegevens te lezen en te schrijven met Apache Kafka in Azure HDInsight.This tutorial demonstrates how to use Apache Spark Structured Streaming to read and write data with Apache Kafka on Azure HDInsight.

Spark Structured Streaming is een streamverwerkingsengine gebaseerd op Spark SQL.Spark Structured Streaming is a stream processing engine built on Spark SQL. Hiermee kunt u streamingberekeningen op dezelfde manier weergeven als batchberekeningen van statische gegevens.It allows you to express streaming computations the same as batch computation on static data.

In deze zelfstudie leert u het volgende:In this tutorial, you learn how to:

  • Een Azure Resource Manager-sjabloon gebruiken om clusters te makenUse an Azure Resource Manager template to create clusters
  • Gebruik Spark Structured Streaming met KafkaUse Spark Structured Streaming with Kafka

Nadat u de stappen in dit document hebt doorlopen, moet u niet vergeten de clusters te verwijderen om overtollige kosten te voorkomen.When you are done with the steps in this document, remember to delete the clusters to avoid excess charges.

VereistenPrerequisites

Belangrijk

Voor de stappen in dit document is een Azure-resourcegroep nodig die zowel een Spark in HDInsight- als een Kafka in HDInsight-cluster bevat.The steps in this document require an Azure resource group that contains both a Spark on HDInsight and a Kafka on HDInsight cluster. Deze clusters bevinden zich beide binnen een Azure Virtual Network, waardoor het Spark-cluster rechtstreeks kan communiceren met het Kafka-cluster.These clusters are both located within an Azure Virtual Network, which allows the Spark cluster to directly communicate with the Kafka cluster.

Voor uw gemak is dit document gekoppeld aan een sjabloon waarmee u alle vereiste Azure-resources kunt maken.For your convenience, this document links to a template that can create all the required Azure resources.

Zie het document Azure HDInsight met behulp van een Azure-netwerk uitbreiden voor meer informatie over het gebruik van HDInsight in een virtueel netwerk.For more information on using HDInsight in a virtual network, see the Extend HDInsight using a virtual network document.

Gestructureerd streamen met Apache KafkaStructured Streaming with Apache Kafka

Spark Structured Streaming is een streamverwerkingsengine gebaseerd op de Spark SQL-engine.Spark Structured Streaming is a stream processing engine built on the Spark SQL engine. Wanneer u Structured Streaming gebruikt, kunt u streamingquery's schrijven op de manier waarop u ook batchquery's schrijft.When using Structured Streaming, you can write streaming queries the same way that you write batch queries.

De volgende codefragmenten laten zien hoe u gegevens kunt lezen uit Kafka om deze daarna op te slaan in een bestand.The following code snippets demonstrate reading from Kafka and storing to file. Het eerste fragment betreft een batchbewerking, terwijl het tweede een streamingbewerking is:The first one is a batch operation, while the second one is a streaming operation:

// Read a batch from Kafka
val kafkaDF = spark.read.format("kafka")
                .option("kafka.bootstrap.servers", kafkaBrokers)
                .option("subscribe", kafkaTopic)
                .option("startingOffsets", "earliest")
                .load()

// Select data and write to file
kafkaDF.select(from_json(col("value").cast("string"), schema) as "trip")
                .write
                .format("parquet")
                .option("path","/example/batchtripdata")
                .option("checkpointLocation", "/batchcheckpoint")
                .save()
// Stream from Kafka
val kafkaStreamDF = spark.readStream.format("kafka")
                .option("kafka.bootstrap.servers", kafkaBrokers)
                .option("subscribe", kafkaTopic)
                .option("startingOffsets", "earliest")
                .load()

// Select data from the stream and write to file
kafkaStreamDF.select(from_json(col("value").cast("string"), schema) as "trip")
                .writeStream
                .format("parquet")
                .option("path","/example/streamingtripdata")
                .option("checkpointLocation", "/streamcheckpoint")
                .start.awaitTermination(30000)

In beide codefragmenten worden gegevens gelezen uit Kafka en weggeschreven naar een bestand.In both snippets, data is read from Kafka and written to file. Dit zijn de verschillende tussen de voorbeelden:The differences between the examples are:

BatchBatch StreamingStreaming
read readStream
write writeStream
save start

De streaming bewerking eveneens awaitTermination(30000), die de stroom niet meer nadat 30.000 ms.The streaming operation also uses awaitTermination(30000), which stops the stream after 30,000 ms.

Als u Structured Streaming wilt gebruiken met Kafka, moet het project afhankelijk zijn van het pakket org.apache.spark : spark-sql-kafka-0-10_2.11.To use Structured Streaming with Kafka, your project must have a dependency on the org.apache.spark : spark-sql-kafka-0-10_2.11 package. De versie van dit pakket moet overeenkomen met de versie van Spark in HDInsight.The version of this package should match the version of Spark on HDInsight. Voor Spark 2.2.0 (beschikbaar in HDInsight 3.6) kunt u de afhankelijkheidsgegevens voor verschillende projecttypen vinden in https://search.maven.org/#artifactdetails%7Corg.apache.spark%7Cspark-sql-kafka-0-10_2.11%7C2.2.0%7Cjar.For Spark 2.2.0 (available in HDInsight 3.6), you can find the dependency information for different project types at https://search.maven.org/#artifactdetails%7Corg.apache.spark%7Cspark-sql-kafka-0-10_2.11%7C2.2.0%7Cjar.

Voor de Jupyter-Notebook gebruikt in combinatie met deze zelfstudie, wordt deze pakketafhankelijkheid geladen met de volgende cel:For the Jupyter Notebook used with this tutorial, the following cell loads this package dependency:

%%configure -f
{
    "conf": {
        "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0",
        "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11"
    }
}

De clusters makenCreate the clusters

Apache Kafka in HDInsight biedt geen toegang tot de Kafka-brokers via het openbare internet.Apache Kafka on HDInsight does not provide access to the Kafka brokers over the public internet. Alles wat gebruikmaakt van Kafka moet zich in hetzelfde Azure Virtual Network bevinden.Anything that uses Kafka must be in the same Azure virtual network. In deze zelfstudie bevinden de Kafka- en Spark-clusters zich in hetzelfde Azure Virtual Network.In this tutorial, both the Kafka and Spark clusters are located in the same Azure virtual network.

Het volgende diagram laat zien hoe de communicatie tussen Spark en Kafka verloopt:The following diagram shows how communication flows between Spark and Kafka:

Diagram van Spark- en Kafka-clusters in een Azure Virtual Network

Notitie

De Kafka-service blijft beperkt tot communicatie binnen het virtuele netwerk.The Kafka service is limited to communication within the virtual network. Andere services in het cluster, zoals SSH en Ambari, zijn toegankelijk via internet.Other services on the cluster, such as SSH and Ambari, can be accessed over the internet. Zie Poorten en URI's die worden gebruikt door HDInsight voor meer informatie over de openbare poorten die beschikbaar zijn voor HDInsight.For more information on the public ports available with HDInsight, see Ports and URIs used by HDInsight.

Gebruik de volgende stappen om eerst een virtueel Azure-netwerk te maken en vervolgens de Kafka- en Spark-clusters:To create an Azure Virtual Network, and then create the Kafka and Spark clusters within it, use the following steps:

  1. Gebruik de volgende knop om u aan te melden bij Azure en de sjabloon in de Azure Portal te openen.Use the following button to sign in to Azure and open the template in the Azure portal.

    Deploy to Azure

    De Azure Resource Manager-sjabloon bevindt zich op https://raw.githubusercontent.com/Azure-Samples/hdinsight-spark-kafka-structured-streaming/master/azuredeploy.json .The Azure Resource Manager template is located at https://raw.githubusercontent.com/Azure-Samples/hdinsight-spark-kafka-structured-streaming/master/azuredeploy.json.

    Met deze sjabloon maakt u de volgende bronnen:This template creates the following resources:

    • Een Kafka in HDInsight 3.6-cluster.A Kafka on HDInsight 3.6 cluster.

    • Een Spark 2.2.0 in HDInsight 3.6-cluster.A Spark 2.2.0 on HDInsight 3.6 cluster.

    • Een Azure Virtual Network dat de HDInsight-clusters bevat.An Azure Virtual Network, which contains the HDInsight clusters.

      Belangrijk

      Voor het Structured Streaming-notebook dat in deze zelfstudie wordt gebruikt, is Spark 2.2.0 in HDInsight 3.6 vereist.The structured streaming notebook used in this tutorial requires Spark 2.2.0 on HDInsight 3.6. Als u een eerdere versie van Spark in HDInsight gebruikt, worden er fouten gegenereerd bij gebruik van de notebook.If you use an earlier version of Spark on HDInsight, you receive errors when using the notebook.

  2. Gebruik de volgende informatie voor het vullen van de vermeldingen in de sectie Aangepaste sjabloon:Use the following information to populate the entries on the Customized template section:

    InstellingSetting ValueValue
    AbonnementSubscription Uw Azure-abonnementYour Azure subscription
    ResourcegroepResource group De resourcegroep die de resources bevat.The resource group that contains the resources.
    LocatieLocation De Azure-regio waarin de bronnen worden gemaakt.The Azure region that the resources are created in.
    Naam Spark-clusterSpark Cluster Name De naam van het Spark-cluster.The name of the Spark cluster. De eerste zes tekens moeten verschillen van de naam van het Kafka-cluster.The first six characters must be different than the Kafka cluster name.
    Naam Kafka-clusterKafka Cluster Name De naam van het Kafka-cluster.The name of the Kafka cluster. De eerste zes tekens moeten verschillen van de naam van het Spark-cluster.The first six characters must be different than the Spark cluster name.
    Gebruikersnaam voor clusteraanmeldingCluster Login User Name De beheerdersnaam voor de clusters.The admin user name for the clusters.
    Wachtwoord voor clusteraanmeldingCluster Login Password Het beheerderswachtwoord voor de clusters.The admin user password for the clusters.
    SSH-gebruikersnaamSSH User Name De SSH-gebruiker die voor de clusters wordt gemaakt.The SSH user to create for the clusters.
    SSH-wachtwoordSSH Password Het wachtwoord voor de SSH-gebruiker.The password for the SSH user.

    Schermafbeelding van de aangepaste sjabloon

  3. Lees de voorwaarden en schakel vervolgens het selectievakje Ik ga akkoord met de bovenstaande voorwaarden in.Read the Terms and Conditions, and then select I agree to the terms and conditions stated above

  4. Schakel tot slot Vastmaken aan dashboard in en selecteer Kopen.Finally, check Pin to dashboard and then select Purchase.

Notitie

Het kan 20 minuten duren voordat het cluster is gemaakt.It can take up to 20 minutes to create the clusters.

Spark Structured Streaming gebruikenUse Spark Structured Streaming

In dit voorbeeld laat zien hoe u Spark Structured Streaming met Kafka in HDInsight.This example demonstrates how to use Spark Structured Streaming with Kafka on HDInsight. Het maakt gebruik van gegevens op de gegevens over taxi's, die wordt geleverd door de New York City.It uses data on taxi trips, which is provided by New York City. De gegevensset die worden gebruikt door dit notitieblok afkomstig is van 2016 groen Taxi reisgegevens.The data set used by this notebook is from 2016 Green Taxi Trip Data.

  1. Verzamel informatie over de host.Gather host information. Gebruik de curl en jq verkrijgen van uw Kafka-ZooKeeper en hosts informatie broker onderstaande opdrachten.Use the curl and jq commands below to obtain your Kafka ZooKeeper and broker hosts information. De opdrachten zijn ontworpen voor een Windows-opdrachtprompt, kleine variaties er nodig zijn voor andere omgevingen.The commands are designed for a Windows command prompt, slight variations will be needed for other environments. Vervang KafkaCluster met de naam van uw Kafka-cluster en KafkaPassword met het wachtwoord voor clusteraanmelding.Replace KafkaCluster with the name of your Kafka cluster, and KafkaPassword with the cluster login password. Vervang ook C:\HDI\jq-win64.exe met het werkelijke pad naar de jq-installatie.Also, replace C:\HDI\jq-win64.exe with the actual path to your jq installation. Voer de opdrachten in een Windows-opdrachtprompt en sla de uitvoer voor gebruik in latere stappen.Enter the commands in a Windows command prompt and save the output for use in later steps.

    set CLUSTERNAME=KafkaCluster
    set PASSWORD=KafkaPassword
    
    curl -u admin:%PASSWORD% -G "https://%CLUSTERNAME%.azurehdinsight.net/api/v1/clusters/%CLUSTERNAME%/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" | C:\HDI\jq-win64.exe -r "["""\(.host_components[].HostRoles.host_name):2181"""] | join(""",""")"
    
    curl -u admin:%PASSWORD% -G "https://%CLUSTERNAME%.azurehdinsight.net/api/v1/clusters/%CLUSTERNAME%/services/KAFKA/components/KAFKA_BROKER" | C:\HDI\jq-win64.exe -r "["""\(.host_components[].HostRoles.host_name):9092"""] | join(""",""")"
    
  2. Maak vanuit uw webbrowser verbinding met de Jupyter-notebook in uw Spark-cluster.In your web browser, connect to the Jupyter notebook on your Spark cluster. In de volgende URL vervangt u CLUSTERNAME door de naam van uw Spark-cluster:In the following URL, replace CLUSTERNAME with the name of your Spark cluster:

     https://CLUSTERNAME.azurehdinsight.net/jupyter
    

    Typ desgevraagd de cluster-aanmelding (beheerder) en het cluster-wachtwoord die u hebt gebruikt bij het maken van het cluster.When prompted, enter the cluster login (admin) and password used when you created the cluster.

  3. Selecteer New > Spark om een notitieblok te maken.Select New > Spark to create a notebook.

  4. Pakketten die worden gebruikt door de Notebook door te voeren van de volgende informatie in een cel Notebook worden geladen.Load packages used by the Notebook by entering the following information in a Notebook cell. Voer de opdracht uit met behulp van CTRL + ENTER.Run the command by using CTRL + ENTER.

    %%configure -f
    {
        "conf": {
            "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0",
            "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11"
        }
    }
    
  5. De Kafka-onderwerp maken.Create the Kafka topic. De onderstaande opdracht bewerken door te vervangen YOUR_ZOOKEEPER_HOSTS hosten met de Zookeeper informatie in de eerste stap hebt uitgepakt.Edit the command below by replacing YOUR_ZOOKEEPER_HOSTS with the Zookeeper host information extracted in the first step. Voer de opdracht in de bewerkte in uw Jupyter-Notebook maken de tripdata onderwerp.Enter the edited command in your Jupyter Notebook to create the tripdata topic.

    %%bash
    export KafkaZookeepers="YOUR_ZOOKEEPER_HOSTS"
    
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic tripdata --zookeeper $KafkaZookeepers
    
  6. Gegevens over taxi trips ophalen.Retrieve data on taxi trips. Voer de opdracht in de volgende cel om gegevens op de gegevens over taxi's in New York City te laden.Enter the command in the next cell to load data on taxi trips in New York City. De gegevens zijn geladen in een dataframe en vervolgens het gegevensframe wordt weergegeven als de uitvoer van de cel.The data is loaded into a dataframe and then the dataframe is displayed as the cell output.

    import spark.implicits._
    
    // Load the data from the New York City Taxi data REST API for 2016 Green Taxi Trip Data
    val url="https://data.cityofnewyork.us/resource/pqfs-mqru.json"
    val result = scala.io.Source.fromURL(url).mkString
    
    // Create a dataframe from the JSON data
    val taxiDF = spark.read.json(Seq(result).toDS)
    
    // Display the dataframe containing trip data
    taxiDF.show()
    
  7. De gegevens van Kafka broker hosts ingesteld.Set the Kafka broker hosts information. Vervang YOUR_KAFKA_BROKER_HOSTS met de broker hosts informatie die u in stap 1 hebt uitgepakt.Replace YOUR_KAFKA_BROKER_HOSTS with the broker hosts information you extracted in step 1. Voer de opdracht in bewerkt in de volgende Jupyter-Notebook cel.Enter the edited command in the next Jupyter Notebook cell.

    // The Kafka broker hosts and topic used to write to Kafka
    val kafkaBrokers="YOUR_KAFKA_BROKER_HOSTS"
    val kafkaTopic="tripdata"
    
    println("Finished setting Kafka broker and topic configuration.")
    
  8. De gegevens verzenden naar Kafka.Send the data to Kafka. In de volgende opdracht de vendorid veld wordt gebruikt als de waarde van de sleutel voor het Kafka-bericht.In the following command, the vendorid field is used as the key value for the Kafka message. De sleutel wordt gebruikt door Kafka worden gebruikt bij het partitioneren van gegevens.The key is used by Kafka when partitioning data. Alle velden zijn opgeslagen in het Kafka-bericht als een JSON-tekenreekswaarde.All of the fields are stored in the Kafka message as a JSON string value. Voer de volgende opdracht in Jupyter om op te slaan van de gegevens met Kafka met behulp van een batchquery.Enter the following command in Jupyter to save the data to Kafka using a batch query.

    // Select the vendorid as the key and save the JSON string as the value.
    val query = taxiDF.selectExpr("CAST(vendorid AS STRING) as key", "to_JSON(struct(*)) AS value").write.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("topic", kafkaTopic).save()
    
    println("Data sent to Kafka")
    
  9. Declareer een schema.Declare a schema. De volgende opdracht laat zien hoe u een schema gebruikt bij het lezen van JSON-gegevens van kafka.The following command demonstrates how to use a schema when reading JSON data from kafka. Voer de opdracht in de volgende Jupyter cel.Enter the command in your next Jupyter cell.

    // Import bits useed for declaring schemas and working with JSON data
    import org.apache.spark.sql._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._
    
    // Define a schema for the data
    val schema = (new StructType).add("dropoff_latitude", StringType).add("dropoff_longitude", StringType).add("extra", StringType).add("fare_amount", StringType).add("improvement_surcharge", StringType).add("lpep_dropoff_datetime", StringType).add("lpep_pickup_datetime", StringType).add("mta_tax", StringType).add("passenger_count", StringType).add("payment_type", StringType).add("pickup_latitude", StringType).add("pickup_longitude", StringType).add("ratecodeid", StringType).add("store_and_fwd_flag", StringType).add("tip_amount", StringType).add("tolls_amount", StringType).add("total_amount", StringType).add("trip_distance", StringType).add("trip_type", StringType).add("vendorid", StringType)
    // Reproduced here for readability
    //val schema = (new StructType)
    //   .add("dropoff_latitude", StringType)
    //   .add("dropoff_longitude", StringType)
    //   .add("extra", StringType)
    //   .add("fare_amount", StringType)
    //   .add("improvement_surcharge", StringType)
    //   .add("lpep_dropoff_datetime", StringType)
    //   .add("lpep_pickup_datetime", StringType)
    //   .add("mta_tax", StringType)
    //   .add("passenger_count", StringType)
    //   .add("payment_type", StringType)
    //   .add("pickup_latitude", StringType)
    //   .add("pickup_longitude", StringType)
    //   .add("ratecodeid", StringType)
    //   .add("store_and_fwd_flag", StringType)
    //   .add("tip_amount", StringType)
    //   .add("tolls_amount", StringType)
    //   .add("total_amount", StringType)
    //   .add("trip_distance", StringType)
    //   .add("trip_type", StringType)
    //   .add("vendorid", StringType)
    
    println("Schema declared")
    
  10. Selecteer gegevens en de stroom start.Select data and start the stream. De volgende opdracht laat zien hoe u voor het ophalen van gegevens van kafka met behulp van een batchquery en klikt u vervolgens de resultaten uit naar schrijven HDFS op het Spark-cluster.The following command demonstrates how to retrieve data from kafka using a batch query, and then write the results out to HDFS on the Spark cluster. In dit voorbeeld wordt de select haalt het bericht (in het waardeveld) van Kafka en wordt het schema op toegepast.In this example, the select retrieves the message (value field) from Kafka and applies the schema to it. De gegevens worden vervolgens naar HDFS (WASB of ADL) in parquet-indeling geschreven.The data is then written to HDFS (WASB or ADL) in parquet format. Voer de opdracht in de volgende Jupyter cel.Enter the command in your next Jupyter cell.

    // Read a batch from Kafka
    val kafkaDF = spark.read.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets", "earliest").load()
    
    // Select data and write to file
    val query = kafkaDF.select(from_json(col("value").cast("string"), schema) as "trip").write.format("parquet").option("path","/example/batchtripdata").option("checkpointLocation", "/batchcheckpoint").save()
    
    println("Wrote data to file")
    
  11. U kunt controleren dat de bestanden zijn gemaakt met de opdracht in de volgende Jupyter cel.You can verify that the files were created by entering the command in your next Jupyter cell. Geeft een lijst van de bestanden in de /example/batchtripdata directory.It lists the files in the /example/batchtripdata directory.

    %%bash
    hdfs dfs -ls /example/batchtripdata
    
  12. Hoewel het vorige voorbeeld gebruikt een batchquery, geeft de volgende opdracht laat zien hoe u hetzelfde doen met behulp van een streaming-query.While the previous example used a batch query, the following command demonstrates how to do the same thing using a streaming query. Voer de opdracht in de volgende Jupyter cel.Enter the command in your next Jupyter cell.

    // Stream from Kafka
    val kafkaStreamDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets", "earliest").load()
    
    // Select data from the stream and write to file
    kafkaStreamDF.select(from_json(col("value").cast("string"), schema) as "trip").writeStream.format("parquet").option("path","/example/streamingtripdata").option("checkpointLocation", "/streamcheckpoint").start.awaitTermination(30000)
    println("Wrote data to file")
    
  13. Voer de volgende cel om te controleren of de bestanden zijn geschreven door de streaming-query.Run the following cell to verify that the files were written by the streaming query.

    %%bash
    hdfs dfs -ls /example/streamingtripdata
    

Resources opschonenClean up resources

Als u de in deze zelfstudie gemaakte resources wilt opschonen, kunt u de resourcegroep verwijderen.To clean up the resources created by this tutorial, you can delete the resource group. Als u de resourcegroep verwijdert, worden ook het bijbehorende HDInsight-cluster en eventuele andere resources die aan de resourcegroep zijn gekoppeld, verwijderd.Deleting the resource group also deletes the associated HDInsight cluster, and any other resources associated with the resource group.

Ga als volgt te werk om de resourcegroep te verwijderen in Azure Portal:To remove the resource group using the Azure portal:

  1. Vouw het menu aan de linkerkant in Azure Portal uit om het menu met services te openen en kies Resourcegroepen om de lijst met resourcegroepen weer te geven.In the Azure portal, expand the menu on the left side to open the menu of services, and then choose Resource Groups to display the list of your resource groups.
  2. Zoek de resourcegroep die u wilt verwijderen en klik met de rechtermuisknop op de knop Meer (... ) aan de rechterkant van de vermelding.Locate the resource group to delete, and then right-click the More button (...) on the right side of the listing.
  3. Selecteer Resourcegroep verwijderen en bevestig dit.Select Delete resource group, and then confirm.

Waarschuwing

De facturering voor het gebruik van HDInsight-clusters begint zodra er een cluster is gemaakt en stopt als een cluster wordt verwijderd.HDInsight cluster billing starts once a cluster is created and stops when the cluster is deleted. De facturering wordt pro-rato per minuut berekend, dus u moet altijd uw cluster verwijderen wanneer het niet meer wordt gebruikt.Billing is pro-rated per minute, so you should always delete your cluster when it is no longer in use.

Door een Kafka in HDInsight-cluster te verwijderen, worden alle gegevens verwijderd die zijn opgeslagen in Kafka.Deleting a Kafka on HDInsight cluster deletes any data stored in Kafka.

Volgende stappenNext steps

In deze zelfstudie hebt u geleerd hoe u met Apache Spark Structured Streaming gegevens kunt lezen uit en wegschrijven naar Apache Kafka in HDInsight.In this tutorial, you learned how to use Apache Spark Structured Streaming to write and read data from Apache Kafka on HDInsight. Gebruik de volgende koppeling voor informatie over hoe u Apache Storm gebruikt met Kafka.Use the following link to learn how to use Apache Storm with Kafka.