Apache Kafka

Les connecteurs Apache Kafka pour la diffusion structurée sont empaquetés dans Databricks Runtime. Le connecteur permet de kafka se connecter à Kafka 0.10 + et au kafka08 connecteur pour se connecter à Kafka 0,8 + (déconseillé).

Connecter Kafka sur HDInsight pour Azure Databricks

  1. Créez un cluster HDInsight Kafka.

    pour obtenir des instructions, consultez Connecter à Kafka sur HDInsight via un réseau virtuel Azure .

  2. Configurez les courtiers Kafka pour qu’ils publient l’adresse correcte.

    Suivez les instructions de la procédure configurer Kafka pour la publication d’adresses IP. Si vous gérez vous-même Kafka sur des machines virtuelles Azure, assurez-vous que la advertised.listeners Configuration des courtiers est définie sur l’adresse IP interne des hôtes.

  3. Créez un cluster Azure Databricks.

    Suivez les instructions du Guide de démarrage rapide : exécuter un travail Spark sur Azure Databricks à l’aide du portail Azure.

  4. Homologuez le cluster Kafka au cluster Azure Databricks.

    Suivez les instructions dans réseaux virtuels homologues.

  5. Validez la connexion en testant les scénarios décrits dans démarrage rapide et flux structuré de production avec le bloc-notes Kafka.

schéma

Le schéma des enregistrements est le suivant :

Colonne Type
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int

Les key et value sont toujours désérialisés en tant que tableaux d’octets avec le ByteArrayDeserializer . Utilisez les opérations tableau ( cast("string") , fonctions définies par l’utilisateur) pour désérialiser explicitement les clés et les valeurs.

Démarrage rapide

Commençons par un exemple canonique WordCount. Le bloc-notes suivant montre comment exécuter WordCount à l’aide de la diffusion en continu structurée avec Kafka.

Notes

Cet exemple de bloc-notes utilise Kafka 0,10. Pour utiliser Kafka 0,8, modifiez le format en kafka08 (autrement dit, .format("kafka08") ).

Kafka WordCount avec bloc-notes de diffusion structurée

Obtenir le notebook

Configuration

Pour obtenir la liste comphensive des options de configuration, consultez le Guide d’intégration de Spark Structured streaming + Kafka. Pour commencer, voici un sous-ensemble des options de configuration les plus courantes.

Notes

Comme la diffusion en continu structurée est toujours en cours de développement, cette liste n’est peut-être pas à jour.

Il existe plusieurs façons de spécifier les rubriques auxquelles s’abonner. Vous ne devez fournir qu’un seul de ces paramètres :

Option Valeur Versions de Kafka prises en charge Description
subscribe Liste de rubriques séparées par des virgules. 0,8, 0,10 Liste de rubriques à laquelle s’abonner.
subscribePattern Chaîne Regex Java. 0.10 Modèle utilisé pour s’abonner à une ou plusieurs rubriques.
assign Chaîne JSON {"topicA":[0,1],"topic":[2,4]} . 0,8, 0,10 TopicPartitions spécifique à consommer.

Autres configurations notables :

Option Valeur Valeur par défaut Versions de Kafka prises en charge Description
Kafka. bootstrap. Servers Liste séparée par des virgules d’hôte : port. empty 0,8, 0,10 Souhaitée Configuration de Kafka bootstrap.servers . Si vous ne trouvez aucune donnée de la part de Kafka, vérifiez d’abord la liste des adresses du Broker. Si la liste d’adresses du Broker est incorrecte, il se peut qu’il n’y ait pas d’erreurs. Cela est dû au fait que le client Kafka part du principe que les courtiers seront disponibles finalement et en cas d’erreurs réseau.
failOnDataLoss true ou false. true 0.10 Facultatif Indique s’il faut faire échouer la requête lorsqu’il est possible que des données soient perdues. Les requêtes peuvent échouer de façon permanente pour lire des données à partir de Kafka en raison de nombreux scénarios, tels que des rubriques supprimées, une troncation de rubrique avant le traitement, etc. Nous essayons d’estimer en toute prudence si les données sont susceptibles d’être perdues ou non. Parfois, cela peut provoquer de fausses alarmes. Définissez cette option sur false si elle ne fonctionne pas comme prévu, ou si vous souhaitez que la requête continue le traitement malgré la perte de données.
minPartitions Entier >= 0, 0 = désactivé. 0 (désactivé) 0.10 Facultatif Nombre minimal de partitions à lire à partir de Kafka. Avec Spark 2.1.0-DB2 et versions ultérieures, vous pouvez configurer Spark pour utiliser un minimum arbitraire de partitions à lire à partir de Kafka à l’aide de l' minPartitions option. Normalement, Spark a un mappage de 1-1 de Kafka topicPartitions sur des partitions Spark consommant de Kafka. Si vous affectez minPartitions à l’option une valeur supérieure à celle de votre Kafka topicPartitions, Spark divvy des partitions Kafka volumineuses à des pièces plus petites. Cette option peut être définie aux heures des pics de charge, à la distorsion des données et à mesure que votre flux est en retard pour augmenter la vitesse de traitement. Il s’agit d’un coût d’initialisation des consommateurs Kafka à chaque déclencheur, ce qui peut avoir un impact sur les performances si vous utilisez SSL lors de la connexion à Kafka.
kafka.group.id ID de groupe de consommateurs Kafka. non défini 0.10 Facultatif ID de groupe à utiliser lors de la lecture à partir de Kafka. Pris en charge dans Spark 2.2 +. Utilisez cette fonction avec précaution. Par défaut, chaque requête génère un ID de groupe unique pour la lecture des données. Cela permet de s’assurer que chaque requête possède son propre groupe de consommateurs qui ne rencontre pas d’interférence de la part d’un autre consommateur et peut donc lire toutes les partitions de ses rubriques abonnées. Dans certains scénarios (par exemple, l’autorisation basée sur le groupe Kafka), vous pouvez utiliser des ID de groupe autorisés spécifiques pour lire les données. Vous pouvez éventuellement définir l’ID de groupe. Toutefois, faites cela avec une extrême prudence, car cela peut provoquer un comportement inattendu.

* L’exécution simultanée de requêtes (à la fois, batch et streaming) avec le même ID de groupe peut interférer entre elles en provoquant la lecture d’une partie des données par chaque requête.
* Cela peut également se produire lorsque des requêtes sont démarrées/redémarrées rapidement. Pour réduire de tels problèmes, définissez la configuration du consommateur Kafka session.timeout.ms sur très petite.

Consultez Guide d’intégration Kafka de la diffusion en continu structurée pour d’autres configurations facultatives.

Important

Vous ne devez pas définir les paramètres Kafka suivants pour le connecteur Kafka 0,10, car il lève une exception :

  • group.id: La définition de ce paramètre n’est pas autorisée pour les versions Spark inférieures à 2,2.
  • auto.offset.reset: À la place, définissez l’option source startingOffsets pour spécifier où démarrer. Pour maintenir la cohérence, la diffusion en continu structurée (par opposition au consommateur Kafka) gère la consommation des décalages en interne. Cela vous permet de vous assurer que vous ne manquez pas les données après vous être abonné de manière dynamique aux nouvelles rubriques/partitions. startingOffsets s’applique uniquement lorsque vous démarrez une nouvelle requête de streaming et que la reprise à partir d’un point de contrôle reprend toujours l’emplacement où la requête s’est arrêtée.
  • key.deserializer: Les clés sont toujours désérialisées en tant que tableaux d’octets avec ByteArrayDeserializer . Utilisez les opérations tableau pour désérialiser explicitement les clés.
  • value.deserializer: Les valeurs sont toujours désérialisées en tant que tableaux d’octets avec ByteArrayDeserializer . Utilisez les opérations tableau pour désérialiser explicitement les valeurs.
  • enable.auto.commit: La définition de ce paramètre n’est pas autorisée. Spark effectue le suivi des décalages Kafka en interne et ne valide aucun décalage.
  • interceptor.classes: La source Kafka lit toujours les clés et les valeurs en tant que tableaux d’octets. Il n’est pas possible d’utiliser en toute sécurité ConsumerInterceptor , car elle risque de rompre la requête.

Production Structured streaming avec bloc- notes Kafka

Obtenir le notebook

Mesures

Notes

Disponible dans Databricks Runtime 8,1 et versions ultérieures.

Vous pouvez obtenir le nombre moyen, minimal et maximal de décalages consommés parmi toutes les rubriques de la dernière version du processus de requête en continu ( https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reading-metrics-interactively ) en tant avgOffsetsBehindLatest que maxOffsetsBehindLatest mesures, et minOffsetsBehindLatest . Si vous exécutez le flux dans un Notebook, vous pouvez voir ces métriques sous l’onglet données brutes dans le tableau de bord progression des requêtes de streaming:

{
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic]]",
    "metrics" : {
      "avgOffsetsBehindLatest" : "4.0",
      "maxOffsetsBehindLatest" : "4",
      "minOffsetsBehindLatest" : "4"
    },
  } ]
}

Utiliser SSL

Pour activer les connexions SSL à Kafka, suivez les instructions du chiffrement de la documentation confluent et de l' authentification avec SSL. Vous pouvez fournir les configurations décrites ici, avec kafka. comme options. Par exemple, vous spécifiez l’emplacement du magasin d’approbations dans la propriété kafka.ssl.truststore.location .

Nous vous recommandons :

  • stockez vos certificats dans le stockage d’objets Blob Azure ou Azure Data Lake Storage Gen2 et accédez-y à l’aide d’un point de montage DBFS. Associé aux ACL de cluster et de travail, vous pouvez restreindre l’accès aux certificats uniquement aux clusters qui peuvent accéder à Kafka.
  • Stockez vos mots de passe de certificat en tant que secrets dans une étendue secrète.

Une fois les chemins d’accès montés et les secrets stockés, vous pouvez effectuer les opérations suivantes :

df = spark.readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", ...) \
  .option("kafka.ssl.truststore.location", <dbfs-truststore-location>) \
  .option("kafka.ssl.keystore.location", <dbfs-keystore-location>) \
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>)) \
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))

Ressources