Ejercicio: Creación del productor de Kafka

Completado

Ahora que se han implementado los clústeres de Kafka y Spark, se agregará un productor de Kafka al nodo principal de Kafka. Este productor es un simulador de precio de acciones que genera precios de acciones artificiales.

Descarga del ejemplo

  1. En el explorador de Internet, vaya a https://github.com/Azure/hdinsight-mslearn y descargue o clone el ejemplo localmente si todavía no lo ha hecho en un módulo anterior.
  2. Abra el archivo Spark Structured Streaming\python-producer-simulator-template.py localmente.

Recuperación de las direcciones URL de agente de Kafka

A continuación, debe recuperar las direcciones URL de los agentes de Kafka mediante ssh en el nodo principal y agregarlas al archivo de Python.

  1. Para conectarse al nodo principal del clúster de Apache Kafka, debe usar ssh en el nodo. El método recomendado para conectarse consiste en usar Azure Cloud Shell en Azure Portal. En Azure Portal, haga clic en el botón Azure Cloud Shell de la barra de herramientas superior y seleccione Bash. También puede usar un símbolo del sistema habilitado para ssh, como Git Bash.

  2. Si no ha usado Azure Cloud Shell antes, se muestra una notificación en la que se indica que no se ha montado ningún almacenamiento. Seleccione la suscripción de Azure en el cuadro Suscripción y haga clic en Crear almacenamiento.

  3. En el símbolo del sistema en la nube, pegue el comando siguiente. Reemplace sshuser por el nombre de usuario de SSH. Reemplace kafka-mslearn-stock por el nombre del clúster de Apache Kafka; recuerde que debe incluir -ssh después del nombre del clúster.

    ssh sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net
    
  4. Cuando se conecte por primera vez al clúster, es posible que el cliente SSH muestre una advertencia de que no se puede establecer la autenticidad del host. Cuando se le solicite, escriba yes (sí) y presione Entrar para agregar el host a la lista de servidores de confianza de su cliente SSH.

  5. Cuando se le solicite, escriba la contraseña del usuario de SSH.

    Una vez que se haya conectado, verá información similar al texto siguiente:

        Welcome to Ubuntu 16.04.6 LTS (GNU/Linux 4.15.0-1063-azure x86_64)
    
        * Documentation:  https://help.ubuntu.com
        * Management:     https://landscape.canonical.com
        * Support:        https://ubuntu.com/advantage
    
        * Overheard at KubeCon: "microk8s.status just blew my mind".
    
            https://microk8s.io/docs/commands#microk8s.status
    
        0 packages can be updated.
        0 updates are security updates.
    
    
    
        Welcome to Kafka on HDInsight.
    
    
        The programs included with the Ubuntu system are free software;
        the exact distribution terms for each program are described in the
        individual files in /usr/share/doc/*/copyright.
    
        Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by
        applicable law.
    
        To run a command as administrator (user "root"), use "sudo <command>".
        See "man sudo_root" for details.
    
  6. Instale jq, un procesador JSON de línea de comandos. Esta utilidad se usa para analizar documentos JSON y es útil para analizar la información de host. En la conexión SSH abierta, escriba el siguiente comando para instalar jq:

    sudo apt -y install jq
    
  7. Configure una variable de contraseña. Reemplace PASSWORD por la contraseña de inicio de sesión del clúster y, después, escriba el comando:

    export password='PASSWORD'
    
  8. Extraiga el nombre del clúster con las mayúsculas y minúsculas correctas. Las mayúsculas y minúsculas reales del nombre del clúster pueden no ser como cabría esperar, dependen de la forma en que se haya creado el clúster. Este comando obtendrá las mayúsculas y minúsculas reales y después las almacenará en una variable. Escriba el comando siguiente:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    Este comando no tiene respuesta.

  9. Para establecer una variable de entorno con la información de host de Zookeeper, use el comando siguiente. El comando recupera todos los hosts de Zookeeper y, a continuación, devuelve solo las dos primeras entradas. Esto se debe a que quiere cierta redundancia en caso de que un host sea inaccesible.

    export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    

    Nota

    Este comando requiere acceso a Ambari. Si el clúster se encuentra detrás de un grupo de seguridad de red, ejecute este comando desde una máquina que pueda acceder a Ambari.

    Este comando tampoco tiene respuesta.

  10. Para comprobar que la variable de entorno se ha establecido correctamente, use el comando siguiente:

    echo $KAFKAZKHOSTS
    

    Este comando devuelve información similar al texto siguiente:

    zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181

  11. Para establecer una variable de entorno con la información de host del agente de Apache Kafka, use el comando siguiente:

    export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Nota

    Este comando requiere acceso a Ambari. Si el clúster se encuentra detrás de un grupo de seguridad de red, ejecute este comando desde una máquina que pueda acceder a Ambari.

    Este comando no tiene ninguna salida.

  12. Para comprobar que la variable de entorno se ha establecido correctamente, use el comando siguiente:

    echo $KAFKABROKERS
    

    Este comando devuelve información similar al texto siguiente:

    wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092

  13. Copie uno de los valores de agente de Kafka devueltos en el paso anterior en la línea 19 del archivo python-producer-simulator-template.py e inclúyalo entre comillas simples, por ejemplo:

    kafkaBrokers = ['wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092']
    
  14. Guarde el archivo python-producer-simulator-template-simulator-template.py.

  15. De nuevo en la ventana de conexión de ssh, use el comando siguiente para crear un tema.

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stockVals --zookeeper $KAFKAZKHOSTS
    

Este comando se conecta a Zookeeper mediante la información de host almacenada en $KAFKAZKHOSTS. Después crea un tema de Apache Kafka denominado stockVals, para que coincida con el nombre del tema en python-producer-simulator-template.py.

Copia del archivo de Python en el nodo principal y ejecución para transmitir datos

  1. En una ventana nueva de Git, navegue hasta la ubicación del archivo python-producer-simulator-template.py y cópielo del equipo local en el nodo principal mediante el comando siguiente. Reemplace kafka-mslearn-stock por el nombre del clúster de Apache Kafka; recuerde que debe incluir -ssh después del nombre del clúster.

    scp python-producer-simulator-template.py sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net:
    

    Cuando se le pregunte si quiere continuar con la conexión, escriba Sí. Después, en el símbolo del sistema, escriba la contraseña del clúster. Una vez que se haya transferido el archivo, se mostrará la salida siguiente.

    python-producer-simulator-template.py    100% 1896    71.9KB/s   00:00
    
  2. Ahora vuelva al símbolo del sistema de Azure donde ha recuperado la información del agente y ejecute el comando siguiente para instalar Kafka:

    sudo pip install kafka-python
    

    Después de que Kafka se instale correctamente, se muestra la salida siguiente.

    Installing collected packages: kafka-python
    Successfully installed kafka-python-1.4.7
    
  3. En la misma ventana, instale las solicitudes con el comando siguiente:

    sudo apt-get install python-requests
    
  4. Cuando se le pregunte "Después de esta operación, se usarán 4327 kB de espacio en disco adicional. ¿Desea continuar? [S/n]", escriba S.

    Cuando las solicitudes se instalen correctamente, se mostrará una salida similar a la siguiente.

    Setting up python-urllib3 (1.13.1-2ubuntu0.16.04.3) ...
    Setting up python-requests (2.9.1-3ubuntu0.1) ...
    
  5. En la misma ventana, use el comando siguiente para ejecutar el archivo de Python.

    python python-producer-simulator-template.py
    

    Debería ver un resultado similar al siguiente:

    No loops argument provided. Default loops are 1000
    Running in simulated mode
    [
    {
        "symbol": "MSFT",
        "size": 355,
        "price": 147.205,
        "time": 1578029521022
    },
    {
        "symbol": "BA",
        "size": 345,
        "price": 352.607,
        "time": 1578029521022
    },
    {
        "symbol": "JNJ",
        "size": 58,
        "price": 142.043,
        "time": 1578029521022
    },
    {
        "symbol": "F",
        "size": 380,
        "price": 8.545,
        "time": 1578029521022
    },
    {
        "symbol": "TSLA",
        "size": 442,
        "price": 329.342,
        "time": 1578029521022
    },
    {
        "symbol": "BAC",
        "size": 167,
        "price": 32.921,
        "time": 1578029521022
    },
    {
        "symbol": "GE",
        "size": 222,
        "price": 11.115,
        "time": 1578029521022
    },
    {
        "symbol": "MMM",
        "size": 312,
        "price": 174.643,
        "time": 1578029521022
    },
    {
        "symbol": "INTC",
        "size": 483,
        "price": 54.978,
        "time": 1578029521022
    },
    {
        "symbol": "WMT",
        "size": 387,
        "price": 120.355,
        "time": 1578029521022
    }
    ]
    stockVals
    2
    0
    stockVals
    1
    0
    stockVals
    3
    0
    stockVals
    2
    1
    stockVals
    7
    0
    stockVals
    7
    1
    stockVals
    1
    1
    stockVals
    4
    0
    stockVals
    4
    1
    stockVals
    1
    2
    

Esta salida proporciona los precios simulados de las acciones enumeradas en el archivo python-producer-simulated-template.py, seguido del tema, la partición y el desplazamiento del mensaje en el tema. Puede ver que cada vez que se desencadena el productor (cada segundo), se genera un nuevo lote de precios de acciones y cada mensaje nuevo se agrega a una partición en un desplazamiento concreto.