Share via


Déclencheur Apache Kafka pour Azure Functions

Vous pouvez utiliser le déclencheur Apache Kafka dans Azure Functions pour exécuter votre code de fonction en réponse à des messages dans des rubriques Kafka. Vous pouvez également utiliser une liaison de sortie Kafka pour écrire de votre fonction vers une rubrique. Pour plus d’informations sur l’installation et la configuration, consultez Vue d’ensemble des liaisons Apache Kafka pour Azure Functions.

Important

Les liaisons Kafka sont disponibles uniquement pour les fonctions sur le plan Elastic Premium et le plan (App Service) dédié. Elles sont uniquement prises en charge sur la version 3.x et les versions ultérieures du runtime Functions.

Exemple

L’utilisation du déclencheur dépend de la modalité C# utilisée dans votre application de fonction, qui peut être l’un des modes suivants :

Une bibliothèque de classes de processus Worker isolé est une fonction C# compilée exécutée dans un processus Worker isolé du runtime.

Les attributs que vous utilisez dépendent du fournisseur d’événements.

L’exemple suivant montre une fonction C# qui lit et journalise le message Kafka en tant qu’événement Kafka :

[Function("KafkaTrigger")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}

Pour recevoir des événements dans un lot, utilisez un tableau de chaînes comme entrée, tel qu’indiqué dans l’exemple suivant :

[Function("KafkaTriggerMany")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default",
                  IsBatched = true)] string[] events, FunctionContext context)
{
    foreach (var kevent in events)
    {
        var logger = context.GetLogger("KafkaFunction");
        logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
    }

La fonction suivante enregistre le message et les en-têtes de l’événement Kafka :

[Function("KafkaTriggerWithHeaders")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var eventJsonObject = JObject.Parse(eventData);
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
    var headersJArr = eventJsonObject["Headers"] as JArray;
    logger.LogInformation("Headers for this event: ");
    foreach (JObject header in headersJArr)
    {
        logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");

    }
}

Pour obtenir un ensemble complet d’exemples .NET opérationnels, consultez le dépôt d’extensions Kafka.

Notes

Pour obtenir un ensemble équivalent d’exemples TypeScript, consultez le référentiel d’extensions Kafka.

Les propriétés spécifiques du fichier function.json dépendent de votre fournisseur d’événements, qui dans ces exemples est Confluent ou Azure Event Hubs. Les exemples suivants illustrent un déclencheur Kafka pour une fonction qui lit et enregistre un message Kafka.

Le fichier function.json suivant définit le déclencheur pour le fournisseur spécifique :

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "topic": "topic",
            "brokerList": "%BrokerList%",
            "username": "%ConfluentCloudUserName%",
            "password": "%ConfluentCloudPassword%",
            "protocol": "saslSsl",
            "authenticationMode": "plain",
            "consumerGroup" : "$Default",
            "dataType": "string"
        }
    ]
}

Le code suivant s’exécute ensuite lorsque la fonction est déclenchée :

module.exports = async function (context, event) {
    // context.log.info(event)
    context.log.info(`JavaScript Kafka trigger function called for message ${event.Value}`);
};

Pour recevoir des événements dans un lot, affectez la valeur many à cardinality dans le fichier function.json, comme indiqué dans les exemples suivants :

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%"
        }
    ]
}

Le code suivant analyse ensuite le tableau d’événements et enregistre les données d’événement :

module.exports = async function (context, events) {
    function print(event) {
        var eventJson = JSON.parse(event)
        context.log.info(`JavaScript Kafka trigger function called for message ${eventJson.Value}`);
    }
    events.map(print);
};

Le code suivant enregistre également les données d’en-tête :

module.exports = async function (context, event) {
  function print(kevent) {
    var keventJson = JSON.parse(kevent)
    context.log.info(`JavaScript Kafka trigger function called for message ${keventJson.Value}`);
    context.log.info(`Headers for this message:`)
    let headers =  keventJson.Headers;
    headers.forEach(element => {
        context.log.info(`Key: ${element.Key} Value:${Buffer.from(element.Value, 'base64')}`) 
    });
  }
  event.map(print);
};

Vous pouvez définir un schéma Avro générique pour l’événement passé au déclencheur. Le fichier function.json suivant définit le déclencheur pour le fournisseur spécifique avec un schéma Avro générique :

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaAvroGenericSingle",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "authenticationMode" : "PLAIN",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

Le code suivant s’exécute ensuite lorsque la fonction est déclenchée :

module.exports = async function (context, event) {
    context.log.info(`JavaScript Kafka trigger function called for message ${JSON.stringify(event)}`);
};

Pour obtenir un ensemble complet d’exemples JavaScript opérationnels, consultez le dépôt d’extensions Kafka.

Les propriétés spécifiques du fichier function.json dépendent de votre fournisseur d’événements, qui dans ces exemples est Confluent ou Azure Event Hubs. Les exemples suivants illustrent un déclencheur Kafka pour une fonction qui lit et enregistre un message Kafka.

Le fichier function.json suivant définit le déclencheur pour le fournisseur spécifique :

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

Le code suivant s’exécute ensuite lorsque la fonction est déclenchée :

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

Pour recevoir des événements dans un lot, affectez la valeur many à cardinality dans le fichier function.json, comme indiqué dans les exemples suivants :

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

Le code suivant analyse ensuite le tableau d’événements et enregistre les données d’événement :

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
    $event = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $event.Value"
}

Le code suivant enregistre également les données d’en-tête :

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

foreach ($kafkaEvent in $kafkaEvents) {
    $kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
    Write-Output "Headers for this message:"
    foreach ($header in $kevent.Headers) {
        $DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
        $Key = $header.Key
        Write-Output "Key: $Key Value: $DecodedValue"
    }
}

Vous pouvez définir un schéma Avro générique pour l’événement passé au déclencheur. Le fichier function.json suivant définit le déclencheur pour le fournisseur spécifique avec un schéma Avro générique :

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaEvent",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

Le code suivant s’exécute ensuite lorsque la fonction est déclenchée :

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

Pour obtenir un ensemble complet d’exemples PowerShell opérationnels, consultez le dépôt d’extensions Kafka.

Les propriétés spécifiques du fichier function.json dépendent de votre fournisseur d’événements, qui dans ces exemples est Confluent ou Azure Event Hubs. Les exemples suivants illustrent un déclencheur Kafka pour une fonction qui lit et enregistre un message Kafka.

Le fichier function.json suivant définit le déclencheur pour le fournisseur spécifique :

{
      "scriptFile": "main.py",
      "bindings": [
        {
          "type": "kafkaTrigger",
          "name": "kevent",
          "topic": "topic",
          "brokerList": "%BrokerList%",
          "username": "%ConfluentCloudUserName%",
          "password": "%ConfluentCloudPassword%",
          "consumerGroup" : "functions",
          "protocol": "saslSsl",
          "authenticationMode": "plain"
        }
    ]
}

Le code suivant s’exécute ensuite lorsque la fonction est déclenchée :

import logging
from azure.functions import KafkaEvent

def main(kevent : KafkaEvent):
    logging.info(kevent.get_body().decode('utf-8'))
    logging.info(kevent.metadata)

Pour recevoir des événements dans un lot, affectez la valeur many à cardinality dans le fichier function.json, comme indiqué dans les exemples suivants :

{
      "scriptFile": "main.py",
      "bindings": [
        {
            "type" : "kafkaTrigger",
            "direction": "in",
            "name" : "kevents",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "topic" : "message_python",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "dataType": "string",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "BrokerList" : "%BrokerList%"    
        }
    ]
}

Le code suivant analyse ensuite le tableau d’événements et enregistre les données d’événement :

import logging
import typing
from azure.functions import KafkaEvent

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        logging.info(event.get_body())

Le code suivant enregistre également les données d’en-tête :

import logging
import typing
from azure.functions import KafkaEvent
import json
import base64

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        event_dec = event.get_body().decode('utf-8')
        event_json = json.loads(event_dec)
        logging.info("Python Kafka trigger function called for message " + event_json["Value"])
        headers = event_json["Headers"]
        for header in headers:
            logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii')))

Vous pouvez définir un schéma Avro générique pour l’événement passé au déclencheur. Le fichier function.json suivant définit le déclencheur pour le fournisseur spécifique avec un schéma Avro générique :

{
  "scriptFile": "main.py",
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaTriggerAvroGeneric",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

Le code suivant s’exécute ensuite lorsque la fonction est déclenchée :

import logging
from azure.functions import KafkaEvent

def main(kafkaTriggerAvroGeneric : KafkaEvent):
    logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
    logging.info(kafkaTriggerAvroGeneric.metadata)

Pour obtenir un ensemble complet d’exemples Python opérationnels, consultez le dépôt d’extensions Kafka.

Les annotations que vous utilisez pour configurer votre déclencheur dépendent du fournisseur d’événements spécifique.

L’exemple suivant montre une fonction Java qui lit et enregistre le contenu de l’événement Kafka :

@FunctionName("KafkaTrigger")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string"
         ) String kafkaEventData,
        final ExecutionContext context) {
        context.getLogger().info(kafkaEventData);
}

Pour recevoir des événements dans un lot, utilisez une chaîne d’entrée en tant que tableau, comme indiqué dans l’exemple suivant :

@FunctionName("KafkaTriggerMany")
public void runMany(
        @KafkaTrigger(
            name = "kafkaTriggerMany",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            cardinality = Cardinality.MANY,
            dataType = "string"
         ) String[] kafkaEvents,
        final ExecutionContext context) {
        for (String kevent: kafkaEvents) {
            context.getLogger().info(kevent);
        }    
}

La fonction suivante enregistre le message et les en-têtes de l’événement Kafka :

@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string",
            cardinality = Cardinality.MANY
         ) List<String> kafkaEvents,
        final ExecutionContext context) {
            Gson gson = new Gson(); 
            for (String keventstr: kafkaEvents) {
                KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
                context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
                context.getLogger().info("Headers for the message:");
                for (KafkaHeaders header : kevent.Headers) {
                    String decodedValue = new String(Base64.getDecoder().decode(header.Value));
                    context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);                    
                }                
            }
        }

Vous pouvez définir un schéma Avro générique pour l’événement passé au déclencheur. La fonction suivante définit un déclencheur pour le fournisseur spécifique avec un schéma Avro générique :

private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";

@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
        @KafkaTrigger(
                name = "kafkaAvroGenericSingle",
                topic = "topic",
                brokerList="%BrokerList%",
                consumerGroup="$Default",
                username = "ConfluentCloudUsername",
                password = "ConfluentCloudPassword",
                avroSchema = schema,
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                protocol = BrokerProtocol.SASLSSL) Payment payment,
        final ExecutionContext context) {
    context.getLogger().info(payment.toString());
}

Pour obtenir un ensemble complet d’exemples Java opérationnels pour Confluent, consultez le dépôt d’extensions Kafka.

Attributs

Les bibliothèques C# in-process et de processus Worker isolé utilisent l’attribut KafkaTriggerAttribute pour définir le déclencheur de fonction.

Le tableau suivant décrit les propriétés que vous pouvez définir à l’aide de cet attribut de déclencheur :

Paramètre Description
BrokerList (Obligatoire) Liste des répartiteurs Kafka supervisés par le déclencheur. Pour plus d’informations, consultez Connexions .
Rubrique (Obligatoire) Rubrique supervisée par le déclencheur.
ConsumerGroup (Facultatif) Groupe de consommateurs Kafka utilisé par le déclencheur.
AvroSchema (Facultatif) Schéma d’un enregistrement générique lors de l’utilisation du protocole Avro.
AuthenticationMode (Facultatif) Mode d’authentification lors de l’utilisation de l’authentification SASL (Simple Authentication and Security Layer). Les valeurs prises en charge sont Gssapi, Plain (par défaut), ScramSha256, ScramSha512.
Nom d’utilisateur (Facultatif) Nom d’utilisateur pour l’authentification SASL. Non pris en charge quand AuthenticationMode est Gssapi. Pour plus d’informations, consultez Connexions .
Mot de passe (Facultatif) Mot de passe pour l’authentification SASL. Non pris en charge quand AuthenticationMode est Gssapi. Pour plus d’informations, consultez Connexions .
Protocole (Facultatif) Protocole de sécurité utilisé lors de la communication avec les répartiteurs. Les valeurs prises en charge sont plaintext, ssl (par défaut), sasl_plaintext, sasl_ssl.
SslCaLocation (Facultatif) Chemin du fichier de certificat d’autorité de certification pour vérifier le certificat du répartiteur.
SslCertificateLocation (Facultatif) Chemin du certificat du client.
SslKeyLocation (Facultatif) Chemin de la clé privée du client (PEM) utilisée pour l’authentification.
SslKeyPassword (Facultatif) Mot de passe pour le certificat du client.

Annotations

L’annotation KafkaTrigger vous permet de créer une fonction qui s’exécute lorsqu’une rubrique est reçue. Les options prises en charge incluent les éléments suivants :

Élément Description
name (Obligatoire) Nom de la variable qui représente le message de la file d’attente ou de la rubrique dans le code de la fonction.
brokerList (Obligatoire) Liste des répartiteurs Kafka supervisés par le déclencheur. Pour plus d’informations, consultez Connexions .
topic (Obligatoire) Rubrique supervisée par le déclencheur.
cardinalité (Facultatif) Indique la cardinalité de l’entrée du déclencheur. Les valeurs prises en charge sont ONE (valeur par défaut) et MANY. Utilisez ONE quand l’entrée est un message unique et MANY lorsque l’entrée est un tableau de messages. Lorsque vous utilisez MANY, vous devez également définir un dataType.
dataType Définit comment Functions gère la valeur du paramètre. Par défaut, la valeur est obtenue sous la forme d’une chaîne, et Functions tente de désérialiser la chaîne en objet POJO (Plain-Old Java Object) réel. Quand string, l’entrée est traitée comme une simple chaîne. Quand binary, le message est reçu sous forme de données binaires, et Functions tente de le désérialiser en type de paramètre réel byte[].
consumerGroup (Facultatif) Groupe de consommateurs Kafka utilisé par le déclencheur.
avroSchema (Facultatif) Schéma d’un enregistrement générique lors de l’utilisation du protocole Avro.
authenticationMode (Facultatif) Mode d’authentification lors de l’utilisation de l’authentification SASL (Simple Authentication and Security Layer). Les valeurs prises en charge sont Gssapi, Plain (par défaut), ScramSha256, ScramSha512.
username (Facultatif) Nom d’utilisateur pour l’authentification SASL. Non pris en charge quand AuthenticationMode est Gssapi. Pour plus d’informations, consultez Connexions .
mot de passe (Facultatif) Mot de passe pour l’authentification SASL. Non pris en charge quand AuthenticationMode est Gssapi. Pour plus d’informations, consultez Connexions .
protocol (Facultatif) Protocole de sécurité utilisé lors de la communication avec les répartiteurs. Les valeurs prises en charge sont plaintext, ssl (par défaut), sasl_plaintext, sasl_ssl.
sslCaLocation (Facultatif) Chemin du fichier de certificat d’autorité de certification pour vérifier le certificat du répartiteur.
sslCertificateLocation (Facultatif) Chemin du certificat du client.
sslKeyLocation (Facultatif) Chemin de la clé privée du client (PEM) utilisée pour l’authentification.
sslKeyPassword (Facultatif) Mot de passe pour le certificat du client.

Configuration

Le tableau suivant décrit les propriétés de configuration de liaison que vous définissez dans le fichier function.json.

Propriété function.json Description
type (Obligatoire) Doit être défini sur kafkaTrigger.
direction (Obligatoire) Doit être défini sur in.
name (Obligatoire) Nom de la variable qui représente les données réparties dans le code de la fonction.
brokerList (Obligatoire) Liste des répartiteurs Kafka supervisés par le déclencheur. Pour plus d’informations, consultez Connexions .
topic (Obligatoire) Rubrique supervisée par le déclencheur.
cardinalité (Facultatif) Indique la cardinalité de l’entrée du déclencheur. Les valeurs prises en charge sont ONE (valeur par défaut) et MANY. Utilisez ONE quand l’entrée est un message unique et MANY lorsque l’entrée est un tableau de messages. Lorsque vous utilisez MANY, vous devez également définir un dataType.
dataType Définit comment Functions gère la valeur du paramètre. Par défaut, la valeur est obtenue sous la forme d’une chaîne, et Functions tente de désérialiser la chaîne en objet POJO (Plain-Old Java Object) réel. Quand string, l’entrée est traitée comme une simple chaîne. Quand binary, le message est reçu sous forme de données binaires, et Functions tente de le désérialiser en type de paramètre réel byte[].
consumerGroup (Facultatif) Groupe de consommateurs Kafka utilisé par le déclencheur.
avroSchema (Facultatif) Schéma d’un enregistrement générique lors de l’utilisation du protocole Avro.
authenticationMode (Facultatif) Mode d’authentification lors de l’utilisation de l’authentification SASL (Simple Authentication and Security Layer). Les valeurs prises en charge sont Gssapi, Plain (par défaut), ScramSha256, ScramSha512.
username (Facultatif) Nom d’utilisateur pour l’authentification SASL. Non pris en charge quand AuthenticationMode est Gssapi. Pour plus d’informations, consultez Connexions .
mot de passe (Facultatif) Mot de passe pour l’authentification SASL. Non pris en charge quand AuthenticationMode est Gssapi. Pour plus d’informations, consultez Connexions .
protocol (Facultatif) Protocole de sécurité utilisé lors de la communication avec les répartiteurs. Les valeurs prises en charge sont plaintext, ssl (par défaut), sasl_plaintext, sasl_ssl.
sslCaLocation (Facultatif) Chemin du fichier de certificat d’autorité de certification pour vérifier le certificat du répartiteur.
sslCertificateLocation (Facultatif) Chemin du certificat du client.
sslKeyLocation (Facultatif) Chemin de la clé privée du client (PEM) utilisée pour l’authentification.
sslKeyPassword (Facultatif) Mot de passe pour le certificat du client.

Utilisation

Les événements Kafka sont actuellement pris en charge en tant que chaînes et tableaux de chaînes qui sont des charges utiles JSON.

Les messages Kafka sont transmis à la fonction en tant que chaînes et tableaux de chaînes qui sont des charges utiles JSON.

Dans un plan Premium, vous devez activer la supervision de l’échelle de runtime pour que la sortie Kafka puisse subir un scale-out vers plusieurs instances. Pour plus d’informations, consultez Activer la mise à l’échelle du runtime.

Vous ne pouvez pas utiliser la fonctionnalité Tester/exécuter de la page Code + Test dans le portail Azure pour utiliser les déclencheurs Kafka. À la place, vous devez envoyer des événements de test directement à la rubrique monitorée par le déclencheur.

Pour obtenir un ensemble complet de paramètres host.json pris en charge pour le déclencheur Kafka, consultez Paramètres host.json.

Connexions

Toutes les informations de connexion requises par vos déclencheurs et liaisons doivent être conservées dans des paramètres d’application, et non dans les définitions de liaison de votre code. Cela est valable pour les informations d’identification, qui ne doivent jamais être stockées dans votre code.

Important

Les paramètres d’informations d’identification doivent référencer un paramètre d’application. Ne codez pas d’informations d’identification en dur dans vos fichiers de code ou de configuration. Lors de l’exécution locale, utilisez le fichier local.settings.json pour vos informations d’identification, et ne publiez pas le fichier local.settings.json.

Lors de la connexion à un cluster Kafka managé fourni par Confluent dans Azure, veillez à ce que les informations d’identification d’authentification suivantes pour votre environnement Confluent Cloud soient définies dans votre déclencheur ou liaison :

Paramètre Valeur recommandée Description
BrokerList BootstrapServer Le paramètre d’application nommé BootstrapServer contient la valeur du serveur de démarrage trouvé sur la page de paramètres Confluent Cloud. La valeur est semblable à xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
Nom d’utilisateur ConfluentCloudUsername Le paramètre d’application nommé ConfluentCloudUsername contient la clé d’accès à l’API du site web de Confluent Cloud.
Mot de passe ConfluentCloudPassword Le paramètre d’application nommé ConfluentCloudPassword contient le secret de l’API obtenu sur le site web de Confluent Cloud.

Les valeurs de chaîne que vous utilisez pour ces paramètres doivent être présentes en tant que paramètres d’application dans Azure ou dans la collection Values dans le fichier local.settings.json file pendant le développement local.

Vous devez également définir Protocol, AuthenticationMode et SslCaLocation dans vos définitions de liaison.

Étapes suivantes