Tutorial: Utilizar a Transmissão em Fluxo Estruturada do Apache Spark com o Apache Kafka no HDInsight

Este tutorial demonstra como usar o Apache Spark Structured Streaming para ler e gravar dados com o Apache Kafka no Azure HDInsight.

O Spark Structured Streaming é um mecanismo de processamento de fluxo construído no Spark SQL. Permite-lhe expressar computações de transmissão em fluxo, tal como a computação em lotes o faz em dados estáticos.

Neste tutorial, irá aprender a:

  • Usar um modelo do Azure Resource Manager para criar clusters
  • Usar o Spark Structured Streaming com Kafka

Quando terminar as etapas neste documento, lembre-se de excluir os clusters para evitar cobranças excessivas.

Pré-requisitos

  • jq, um processador JSON de linha de comando. Consulte https://stedolan.github.io/jq/.

  • Familiaridade com o uso de Notebooks Jupyter com o Spark no HDInsight. Para obter mais informações, consulte o documento Carregar dados e executar consultas com o Apache Spark no HDInsight .

  • Familiaridade com a linguagem de programação Scala. O código utilizado neste tutorial está escrito em Scala.

  • Familiaridade com a criação de tópicos do Kafka. Para obter mais informações, consulte o documento de início rápido do Apache Kafka no HDInsight.

Importante

Os passos neste documento requerem um grupo de recursos do Azure que contém um cluster do Spark no HDInsight e um cluster do Kafka no HDInsight. Estes dois clusters estão localizados numa Rede Virtual do Azure, o que permite que o cluster do Spark comunique diretamente com o cluster do Kafka.

Para sua comodidade, este documento tem uma ligação para o modelo que pode criar todos os recursos do Azure necessários.

Para obter mais informações sobre como usar o HDInsight em uma rede virtual, consulte o documento Planejar uma rede virtual para o HDInsight .

Streaming estruturado com Apache Kafka

A transmissão em Fluxo Estruturada do Spark é um motor de processamento de fluxos incorporado no motor SQL do Spark. Ao usar o Structured Streaming, você pode escrever consultas de streaming da mesma forma que escreve consultas em lote.

Os fragmentos de código seguintes demonstram a leitura a partir do Kafka e o armazenamento num ficheiro. A primeira é uma operação em lote, enquanto que a segunda é uma operação de transmissão em fluxo:

// Read a batch from Kafka
val kafkaDF = spark.read.format("kafka")
                .option("kafka.bootstrap.servers", kafkaBrokers)
                .option("subscribe", kafkaTopic)
                .option("startingOffsets", "earliest")
                .load()

// Select data and write to file
kafkaDF.select(from_json(col("value").cast("string"), schema) as "trip")
                .write
                .format("parquet")
                .option("path","/example/batchtripdata")
                .option("checkpointLocation", "/batchcheckpoint")
                .save()
// Stream from Kafka
val kafkaStreamDF = spark.readStream.format("kafka")
                .option("kafka.bootstrap.servers", kafkaBrokers)
                .option("subscribe", kafkaTopic)
                .option("startingOffsets", "earliest")
                .load()

// Select data from the stream and write to file
kafkaStreamDF.select(from_json(col("value").cast("string"), schema) as "trip")
                .writeStream
                .format("parquet")
                .option("path","/example/streamingtripdata")
                .option("checkpointLocation", "/streamcheckpoint")
                .start.awaitTermination(30000)

Em ambos os fragmentos, os dados são lidos a partir do Kafka e escritos num ficheiro. As diferenças entre os exemplos são:

Batch Transmissão
read readStream
write writeStream
save start

A operação de streaming também usa awaitTermination(30000), que para o fluxo após 30.000 ms.

Para utilizar a Transmissão em Fluxo Estruturada com o Kafka, o projeto tem de ter uma dependência no pacote org.apache.spark : spark-sql-kafka-0-10_2.11. A versão deste pacote deve corresponder à versão do Spark no HDInsight. Para o Spark 2.4 (disponível no HDInsight 4.0), você pode encontrar as informações de dependência para diferentes tipos de projeto em https://search.maven.org/#artifactdetails%7Corg.apache.spark%7Cspark-sql-kafka-0-10_2.11%7C2.2.0%7Cjar.

Para o Jupyter Notebook usado com este tutorial, a célula a seguir carrega essa dependência de pacote:

%%configure -f
{
    "conf": {
        "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0",
        "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11"
    }
}

Criar os clusters

O Apache Kafka no HDInsight não fornece acesso aos corretores Kafka pela internet pública. Tudo o que utilize Kafka tem de estar na mesma rede virtual do Azure. Neste tutorial, os clusters do Kafka e do Spark estão localizados na mesma rede virtual do Azure.

O diagrama seguinte mostra como a comunicação flui entre o Spark e o Kafka:

Diagram of Spark and Kafka clusters in an Azure virtual network.

Nota

O serviço Kafka está limitado à comunicação na rede virtual. Outros serviços em cluster, como SSH e Ambari, podem ser acedidos através da Internet. Para obter mais informações sobre as portas públicas disponíveis com o HDInsight, veja Portas e URIs utilizados pelo HDInsight.

Para criar uma Rede Virtual do Azure e, em seguida, criar os clusters do Kafka e do Spark na mesma, utilize os passos abaixo:

  1. Utilize o botão seguinte para iniciar sessão no Azure e abrir o modelo no Portal do Azure.

    Deploy to Azure button for new cluster

    O modelo do Azure Resource Manager está localizado em https://raw.githubusercontent.com/Azure-Samples/hdinsight-spark-kafka-structured-streaming/master/azuredeploy.json.

    Este modelo cria os seguintes recursos:

    • Um Kafka no cluster HDInsight 4.0 ou 5.0.

    • Um Spark 2.4 ou 3.1 no cluster HDInsight 4.0 ou 5.0.

    • Uma Rede Virtual do Azure, que contém os clusters do HDInsight.

      Importante

      O notebook de streaming estruturado usado neste tutorial requer o Spark 2.4 ou 3.1 no HDInsight 4.0 ou 5.0. Se utilizar uma versão anterior do Spark no HDInsight, irá receber mensagens de erro ao utilizar o bloco de notas.

  2. Utilize as seguintes informações para preencher as entradas da secção Modelo personalizado:

    Definição Value
    Subscrição a subscrição do Azure
    Grupo de recursos O grupo de recursos que contém os recursos.
    Localização A região do Azure na qual os recursos são criados.
    Nome de Cluster do Spark O nome do cluster do Spark. Os primeiros seis carateres devem ser diferentes do nome do cluster do Kafka.
    Nome do Cluster do Kafka O nome do cluster do Kafka. Os primeiros seis carateres devem ser diferentes do nome do cluster do Spark.
    Nome de Utilizador de Início de Sessão do Cluster O nome de utilizador administrador para os clusters.
    Palavra-passe de Início de Sessão do Cluster A palavra-passe de utilizador administrador para os clusters.
    Nome de Utilizador SSH O nome de utilizador SSH para os clusters.
    Palavra-passe do SSH A palavra-passe do utilizador SSH.

    Screenshot of the customized template.

  3. Leia os Termos e Condições e, em seguida, selecione Concordo com os termos e condições mencionados acima.

  4. Selecione Comprar.

Nota

A criação dos clusters pode demorar até 20 minutos.

Usar o Spark Structured Streaming

Este exemplo demonstra como usar o Spark Structured Streaming com Kafka no HDInsight. Ele usa dados sobre viagens de táxi, que são fornecidos pela cidade de Nova York. O conjunto de dados utilizado por este caderno é de 2016 Green Taxi Trip Data.

  1. Reúna informações do host. Use os comandos curl e jq abaixo para obter suas informações sobre o Kafka ZooKeeper e os hosts do corretor. Os comandos são projetados para um prompt de comando do Windows, pequenas variações serão necessárias para outros ambientes. Substitua KafkaCluster pelo nome do cluster Kafka e KafkaPassword pela senha de login do cluster. Além disso, substitua C:\HDI\jq-win64.exe pelo caminho real para sua instalação jq. Insira os comandos em um prompt de comando do Windows e salve a saída para uso em etapas posteriores.

    REM Enter cluster name in lowercase
    
    set CLUSTERNAME=KafkaCluster
    set PASSWORD=KafkaPassword
    
    curl -u admin:%PASSWORD% -G "https://%CLUSTERNAME%.azurehdinsight.net/api/v1/clusters/%CLUSTERNAME%/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" | C:\HDI\jq-win64.exe -r "["""\(.host_components[].HostRoles.host_name):2181"""] | join(""",""")"
    
    curl -u admin:%PASSWORD% -G "https://%CLUSTERNAME%.azurehdinsight.net/api/v1/clusters/%CLUSTERNAME%/services/KAFKA/components/KAFKA_BROKER" | C:\HDI\jq-win64.exe -r "["""\(.host_components[].HostRoles.host_name):9092"""] | join(""",""")"
    
  2. Em um navegador da Web, navegue até https://CLUSTERNAME.azurehdinsight.net/jupyter, onde CLUSTERNAME é o nome do cluster. Quando lhe for pedido, introduza o início de sessão do cluster (admin) e a palavra-passe utilizada quando criou o cluster.

  3. Selecione Novo > Spark para criar um bloco de anotações.

  4. O streaming do Spark tem microbatching, o que significa que os dados vêm como lotes e executores executados nos lotes de dados. Se o executor tiver tempo limite ocioso menor do que o tempo necessário para processar o lote, os executores serão constantemente adicionados e removidos. Se o tempo limite de inatividade dos executores for maior do que a duração do lote, o executor nunca será removido. Portanto, recomendamos que você desative a alocação dinâmica definindo spark.dynamicAllocation.enabled como false ao executar aplicativos de streaming.

    Carregue os pacotes usados pelo Bloco de Anotações inserindo as seguintes informações em uma célula do Bloco de Anotações. Execute o comando usando CTRL + ENTER.

    %%configure -f
    {
        "conf": {
            "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0",
            "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11",
            "spark.dynamicAllocation.enabled": false
        }
    }
    
  5. Crie o tópico Kafka. Edite o comando abaixo substituindo YOUR_ZOOKEEPER_HOSTS pelas informações do host do Zookeeper extraídas na primeira etapa. Digite o comando editado no seu Jupyter Notebook para criar o tripdata tópico.

    %%bash
    export KafkaZookeepers="YOUR_ZOOKEEPER_HOSTS"
    
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic tripdata --zookeeper $KafkaZookeepers
    
  6. Recupere dados sobre viagens de táxi. Digite o comando na próxima célula para carregar dados sobre viagens de táxi em Nova York. Os dados são carregados em um dataframe e, em seguida, o dataframe é exibido como a saída da célula.

    import spark.implicits._
    
    // Load the data from the New York City Taxi data REST API for 2016 Green Taxi Trip Data
    val url="https://data.cityofnewyork.us/resource/pqfs-mqru.json"
    val result = scala.io.Source.fromURL(url).mkString
    
    // Create a dataframe from the JSON data
    val taxiDF = spark.read.json(Seq(result).toDS)
    
    // Display the dataframe containing trip data
    taxiDF.show()
    
  7. Defina as informações de hosts do corretor Kafka. Substitua YOUR_KAFKA_BROKER_HOSTS pelo broker hospeda as informações extraídas na etapa 1. Insira o comando editado na próxima célula do Jupyter Notebook.

    // The Kafka broker hosts and topic used to write to Kafka
    val kafkaBrokers="YOUR_KAFKA_BROKER_HOSTS"
    val kafkaTopic="tripdata"
    
    println("Finished setting Kafka broker and topic configuration.")
    
  8. Envie os dados para Kafka. No comando a seguir, o vendorid campo é usado como o valor-chave para a mensagem Kafka. A chave é usada por Kafka ao particionar dados. Todos os campos são armazenados na mensagem Kafka como um valor de cadeia de caracteres JSON. Digite o seguinte comando no Jupyter para salvar os dados no Kafka usando uma consulta em lote.

    // Select the vendorid as the key and save the JSON string as the value.
    val query = taxiDF.selectExpr("CAST(vendorid AS STRING) as key", "to_JSON(struct(*)) AS value").write.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("topic", kafkaTopic).save()
    
    println("Data sent to Kafka")
    
  9. Declare um esquema. O comando a seguir demonstra como usar um esquema ao ler dados JSON do kafka. Digite o comando na próxima célula do Jupyter.

    // Import bits useed for declaring schemas and working with JSON data
    import org.apache.spark.sql._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._
    
    // Define a schema for the data
    val schema = (new StructType).add("dropoff_latitude", StringType).add("dropoff_longitude", StringType).add("extra", StringType).add("fare_amount", StringType).add("improvement_surcharge", StringType).add("lpep_dropoff_datetime", StringType).add("lpep_pickup_datetime", StringType).add("mta_tax", StringType).add("passenger_count", StringType).add("payment_type", StringType).add("pickup_latitude", StringType).add("pickup_longitude", StringType).add("ratecodeid", StringType).add("store_and_fwd_flag", StringType).add("tip_amount", StringType).add("tolls_amount", StringType).add("total_amount", StringType).add("trip_distance", StringType).add("trip_type", StringType).add("vendorid", StringType)
    // Reproduced here for readability
    //val schema = (new StructType)
    //   .add("dropoff_latitude", StringType)
    //   .add("dropoff_longitude", StringType)
    //   .add("extra", StringType)
    //   .add("fare_amount", StringType)
    //   .add("improvement_surcharge", StringType)
    //   .add("lpep_dropoff_datetime", StringType)
    //   .add("lpep_pickup_datetime", StringType)
    //   .add("mta_tax", StringType)
    //   .add("passenger_count", StringType)
    //   .add("payment_type", StringType)
    //   .add("pickup_latitude", StringType)
    //   .add("pickup_longitude", StringType)
    //   .add("ratecodeid", StringType)
    //   .add("store_and_fwd_flag", StringType)
    //   .add("tip_amount", StringType)
    //   .add("tolls_amount", StringType)
    //   .add("total_amount", StringType)
    //   .add("trip_distance", StringType)
    //   .add("trip_type", StringType)
    //   .add("vendorid", StringType)
    
    println("Schema declared")
    
  10. Selecione os dados e inicie o fluxo. O comando a seguir demonstra como recuperar dados do Kafka usando uma consulta em lotes. E, em seguida, escreva os resultados no HDFS no cluster do Spark. Neste exemplo, o select recupera a mensagem (campo de valor) de Kafka e aplica o esquema a ela. Os dados são então gravados no HDFS (WASB ou ADL) em formato parquet. Digite o comando na próxima célula do Jupyter.

    // Read a batch from Kafka
    val kafkaDF = spark.read.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets", "earliest").load()
    
    // Select data and write to file
    val query = kafkaDF.select(from_json(col("value").cast("string"), schema) as "trip").write.format("parquet").option("path","/example/batchtripdata").option("checkpointLocation", "/batchcheckpoint").save()
    
    println("Wrote data to file")
    
  11. Você pode verificar se os arquivos foram criados digitando o comando em sua próxima célula Jupyter. Ele lista os /example/batchtripdata arquivos no diretório.

    %%bash
    hdfs dfs -ls /example/batchtripdata
    
  12. Enquanto o exemplo anterior usava uma consulta em lotes, o comando a seguir demonstra como fazer a mesma coisa usando uma consulta de streaming. Digite o comando na próxima célula do Jupyter.

    // Stream from Kafka
    val kafkaStreamDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets", "earliest").load()
    
    // Select data from the stream and write to file
    kafkaStreamDF.select(from_json(col("value").cast("string"), schema) as "trip").writeStream.format("parquet").option("path","/example/streamingtripdata").option("checkpointLocation", "/streamcheckpoint").start.awaitTermination(30000)
    println("Wrote data to file")
    
  13. Execute a célula a seguir para verificar se os arquivos foram gravados pela consulta de streaming.

    %%bash
    hdfs dfs -ls /example/streamingtripdata
    

Clean up resources (Limpar recursos)

Para limpar os recursos criados por este tutorial, pode eliminar o grupo de recursos. A exclusão do grupo de recursos também exclui o cluster HDInsight associado. E quaisquer outros recursos associados ao grupo de recursos.

Para remover o grupo de recursos através do Portal do Azure:

  1. No portal do Azure, expanda o menu no lado esquerdo para abrir o menu de serviços e escolha Grupos de Recursos para exibir a lista de seus grupos de recursos.
  2. Encontre o grupo de recursos a eliminar e, em seguida, clique com o botão direito do rato em Mais (...) no lado direito da lista.
  3. Selecione Eliminar grupo de recursos e, em seguida, confirme.

Aviso

A faturação do cluster do HDInsight tem início quando o cluster é criado e termina quando é eliminado. A faturação é rateada por minuto, pelo que deve sempre eliminar o cluster quando deixar de ser utilizado.

Eliminar um cluster do Kafka no HDInsight elimina quaisquer dados armazenados no Kafka.