Share via


Apache Kafka-Trigger für Azure Functions

Sie können den Apache Kafka-Trigger in Azure Functions verwenden, um Ihren Funktionscode als Reaktion auf Nachrichten in Kafka-Themen auszuführen. Sie können außerdem eine Kafka-Ausgabebindung verwenden, um aus Ihrer Funktion in ein Thema zu schreiben. Ausführliche Informationen zur Einrichtung und Konfiguration finden Sie unter Übersicht über Apache Kafka-Bindungen für Azure Functions.

Wichtig

Kafka-Bindungen sind für Functions nur im Elastic Premium-Plan im Dedicated-Plan (App Service) verfügbar. Sie werden nur in Version 3.x und höher der Functions-Laufzeit unterstützt.

Beispiel

Die Verwendung des Triggers hängt von der C#-Modalität ab, die in Ihrer Funktions-App verwendet wird. Dies kann einer der folgenden Modi sein:

Eine Klassenbibliothek in einem isolierten Workerprozess ist eine kompilierte C#-Funktion, die in einem von der Runtime isolierten Prozess ausgeführt wird.

Die von Ihnen verwendeten Attribute hängen vom jeweiligen Ereignisanbieter ab.

Das folgende Beispiel zeigt eine C#-Funktion, die die Kafka-Nachricht als Kafka-Ereignis liest und protokolliert:

[Function("KafkaTrigger")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}

Um Ereignisse in einem Batch zu empfangen, verwenden Sie ein Zeichenfolgenarray als Eingabe, wie im folgenden Beispiel gezeigt:

[Function("KafkaTriggerMany")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default",
                  IsBatched = true)] string[] events, FunctionContext context)
{
    foreach (var kevent in events)
    {
        var logger = context.GetLogger("KafkaFunction");
        logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
    }

Die folgende Funktion protokolliert die Nachricht und die Header für das Kafka-Ereignis:

[Function("KafkaTriggerWithHeaders")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var eventJsonObject = JObject.Parse(eventData);
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
    var headersJArr = eventJsonObject["Headers"] as JArray;
    logger.LogInformation("Headers for this event: ");
    foreach (JObject header in headersJArr)
    {
        logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");

    }
}

Eine vollständige Sammlung funktionierender .NET-Beispiele finden Sie im Kafka-Erweiterungsrepository.

Hinweis

Eine entsprechende Sammlung von TypeScript-Beispielen finden Sie im Kafka-Erweiterungsrepository.

Die spezifischen Eigenschaften der Datei „function.json“ hängen von Ihrem Ereignisanbieter ab, der in diesen Beispielen entweder Confluent oder Azure Event Hubs ist. Die folgenden Beispiele zeigen einen Kafka-Trigger für eine Funktion, die eine Kafka-Nachricht liest und protokolliert.

Die folgende Datei „function.json“ definiert den Trigger für den spezifischen Anbieter:

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "topic": "topic",
            "brokerList": "%BrokerList%",
            "username": "%ConfluentCloudUserName%",
            "password": "%ConfluentCloudPassword%",
            "protocol": "saslSsl",
            "authenticationMode": "plain",
            "consumerGroup" : "$Default",
            "dataType": "string"
        }
    ]
}

Der folgende Code wird ausgeführt, wenn die Funktion ausgelöst wird:

module.exports = async function (context, event) {
    // context.log.info(event)
    context.log.info(`JavaScript Kafka trigger function called for message ${event.Value}`);
};

Legen Sie zum Empfangen von Ereignissen in einem Batch den cardinality-Wert in der Datei „function.json“ auf many fest, wie in den folgenden Beispielen gezeigt:

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%"
        }
    ]
}

Der folgende Code analysiert dann das Array der Ereignisse und protokolliert die Ereignisdaten:

module.exports = async function (context, events) {
    function print(event) {
        var eventJson = JSON.parse(event)
        context.log.info(`JavaScript Kafka trigger function called for message ${eventJson.Value}`);
    }
    events.map(print);
};

Der folgende Code protokolliert außerdem die Headerdaten:

module.exports = async function (context, event) {
  function print(kevent) {
    var keventJson = JSON.parse(kevent)
    context.log.info(`JavaScript Kafka trigger function called for message ${keventJson.Value}`);
    context.log.info(`Headers for this message:`)
    let headers =  keventJson.Headers;
    headers.forEach(element => {
        context.log.info(`Key: ${element.Key} Value:${Buffer.from(element.Value, 'base64')}`) 
    });
  }
  event.map(print);
};

Sie können ein generisches Avro-Schema für das an den Trigger übergebene Ereignis definieren. Die folgende Datei „function.json“ definiert den Trigger für den spezifischen Anbieter mit einem generischen Avro-Schema:

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaAvroGenericSingle",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "authenticationMode" : "PLAIN",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

Der folgende Code wird ausgeführt, wenn die Funktion ausgelöst wird:

module.exports = async function (context, event) {
    context.log.info(`JavaScript Kafka trigger function called for message ${JSON.stringify(event)}`);
};

Eine vollständige Sammlung funktionierender JavaScript-Beispiele finden Sie im Kafka-Erweiterungsrepository.

Die spezifischen Eigenschaften der Datei „function.json“ hängen von Ihrem Ereignisanbieter ab, der in diesen Beispielen entweder Confluent oder Azure Event Hubs ist. Die folgenden Beispiele zeigen einen Kafka-Trigger für eine Funktion, die eine Kafka-Nachricht liest und protokolliert.

Die folgende Datei „function.json“ definiert den Trigger für den spezifischen Anbieter:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

Der folgende Code wird ausgeführt, wenn die Funktion ausgelöst wird:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

Legen Sie zum Empfangen von Ereignissen in einem Batch den cardinality-Wert in der Datei „function.json“ auf many fest, wie in den folgenden Beispielen gezeigt:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

Der folgende Code analysiert dann das Array der Ereignisse und protokolliert die Ereignisdaten:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
    $event = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $event.Value"
}

Der folgende Code protokolliert außerdem die Headerdaten:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

foreach ($kafkaEvent in $kafkaEvents) {
    $kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
    Write-Output "Headers for this message:"
    foreach ($header in $kevent.Headers) {
        $DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
        $Key = $header.Key
        Write-Output "Key: $Key Value: $DecodedValue"
    }
}

Sie können ein generisches Avro-Schema für das an den Trigger übergebene Ereignis definieren. Die folgende Datei „function.json“ definiert den Trigger für den spezifischen Anbieter mit einem generischen Avro-Schema:

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaEvent",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

Der folgende Code wird ausgeführt, wenn die Funktion ausgelöst wird:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

Eine vollständige Sammlung funktionierender PowerShell-Beispiele finden Sie im Kafka-Erweiterungsrepository.

Die spezifischen Eigenschaften der Datei „function.json“ hängen von Ihrem Ereignisanbieter ab, der in diesen Beispielen entweder Confluent oder Azure Event Hubs ist. Die folgenden Beispiele zeigen einen Kafka-Trigger für eine Funktion, die eine Kafka-Nachricht liest und protokolliert.

Die folgende Datei „function.json“ definiert den Trigger für den spezifischen Anbieter:

{
      "scriptFile": "main.py",
      "bindings": [
        {
          "type": "kafkaTrigger",
          "name": "kevent",
          "topic": "topic",
          "brokerList": "%BrokerList%",
          "username": "%ConfluentCloudUserName%",
          "password": "%ConfluentCloudPassword%",
          "consumerGroup" : "functions",
          "protocol": "saslSsl",
          "authenticationMode": "plain"
        }
    ]
}

Der folgende Code wird ausgeführt, wenn die Funktion ausgelöst wird:

import logging
from azure.functions import KafkaEvent

def main(kevent : KafkaEvent):
    logging.info(kevent.get_body().decode('utf-8'))
    logging.info(kevent.metadata)

Legen Sie zum Empfangen von Ereignissen in einem Batch den cardinality-Wert in der Datei „function.json“ auf many fest, wie in den folgenden Beispielen gezeigt:

{
      "scriptFile": "main.py",
      "bindings": [
        {
            "type" : "kafkaTrigger",
            "direction": "in",
            "name" : "kevents",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "topic" : "message_python",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "dataType": "string",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "BrokerList" : "%BrokerList%"    
        }
    ]
}

Der folgende Code analysiert dann das Array der Ereignisse und protokolliert die Ereignisdaten:

import logging
import typing
from azure.functions import KafkaEvent

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        logging.info(event.get_body())

Der folgende Code protokolliert außerdem die Headerdaten:

import logging
import typing
from azure.functions import KafkaEvent
import json
import base64

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        event_dec = event.get_body().decode('utf-8')
        event_json = json.loads(event_dec)
        logging.info("Python Kafka trigger function called for message " + event_json["Value"])
        headers = event_json["Headers"]
        for header in headers:
            logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii')))

Sie können ein generisches Avro-Schema für das an den Trigger übergebene Ereignis definieren. Die folgende Datei „function.json“ definiert den Trigger für den spezifischen Anbieter mit einem generischen Avro-Schema:

{
  "scriptFile": "main.py",
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaTriggerAvroGeneric",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

Der folgende Code wird ausgeführt, wenn die Funktion ausgelöst wird:

import logging
from azure.functions import KafkaEvent

def main(kafkaTriggerAvroGeneric : KafkaEvent):
    logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
    logging.info(kafkaTriggerAvroGeneric.metadata)

Eine vollständige Sammlung funktionierender Python-Beispiele finden Sie im Kafka-Erweiterungsrepository.

Die Anmerkungen, die Sie zum Konfigurieren Ihres Triggers verwenden, hängen vom jeweiligen Ereignisanbieter ab.

Das folgende Beispiel zeigt eine Java-Funktion, die den Inhalt des Kafka-Ereignisses liest und protokolliert:

@FunctionName("KafkaTrigger")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string"
         ) String kafkaEventData,
        final ExecutionContext context) {
        context.getLogger().info(kafkaEventData);
}

Verwenden Sie zum Empfangen von Ereignissen in einem Batch eine Eingabezeichenfolge als Array, wie im folgenden Beispiel gezeigt:

@FunctionName("KafkaTriggerMany")
public void runMany(
        @KafkaTrigger(
            name = "kafkaTriggerMany",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            cardinality = Cardinality.MANY,
            dataType = "string"
         ) String[] kafkaEvents,
        final ExecutionContext context) {
        for (String kevent: kafkaEvents) {
            context.getLogger().info(kevent);
        }    
}

Die folgende Funktion protokolliert die Nachricht und die Header für das Kafka-Ereignis:

@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string",
            cardinality = Cardinality.MANY
         ) List<String> kafkaEvents,
        final ExecutionContext context) {
            Gson gson = new Gson(); 
            for (String keventstr: kafkaEvents) {
                KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
                context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
                context.getLogger().info("Headers for the message:");
                for (KafkaHeaders header : kevent.Headers) {
                    String decodedValue = new String(Base64.getDecoder().decode(header.Value));
                    context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);                    
                }                
            }
        }

Sie können ein generisches Avro-Schema für das an den Trigger übergebene Ereignis definieren. Die folgende Funktion definiert einen Trigger für den spezifischen Anbieter mit einem generischen Avro-Schema:

private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";

@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
        @KafkaTrigger(
                name = "kafkaAvroGenericSingle",
                topic = "topic",
                brokerList="%BrokerList%",
                consumerGroup="$Default",
                username = "ConfluentCloudUsername",
                password = "ConfluentCloudPassword",
                avroSchema = schema,
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                protocol = BrokerProtocol.SASLSSL) Payment payment,
        final ExecutionContext context) {
    context.getLogger().info(payment.toString());
}

Eine vollständige Sammlung funktionierender Java-Beispiele für Confluent finden Sie im Kafka-Erweiterungsrepository.

Attribute

Sowohl C#-Bibliotheken des Typs In-Process als auch C#-Bibliotheken des Typs Isolierter Workerprozess verwenden KafkaTriggerAttribute zum Definieren des Funktionstriggers.

In der folgenden Tabelle werden die Eigenschaften erläutert, die mithilfe des Triggerattributs festgelegt werden können:

Parameter BESCHREIBUNG
BrokerList (Erforderlich) Die Liste der vom Trigger überwachten Kafka-Broker. Weitere Informationen finden Sie unter Verbindungen.
Thema (Erforderlich) Das Thema, das vom Trigger überwacht wird.
ConsumerGroup (Optional) Kafka-Consumergruppe, die vom Trigger verwendet wird.
AvroSchema (Optional) Das Schema eines generischen Datensatzes, wenn das Avro-Protokoll verwendet wird.
AuthenticationMode (Optional) Der Authentifizierungsmodus bei Verwendung der SASL-Authentifizierung (Simple Authentication and Security Layer). Unterstützt werden die Werte Gssapi, Plain (Standardwert), ScramSha256 und ScramSha512.
Benutzername (Optional) Der Benutzername für die SASL-Authentifizierung. Wird nicht unterstützt, wenn AuthenticationMode den Wert Gssapi aufweist. Weitere Informationen finden Sie unter Verbindungen.
Kennwort (Optional) Das Kennwort für die SASL-Authentifizierung. Wird nicht unterstützt, wenn AuthenticationMode den Wert Gssapi aufweist. Weitere Informationen finden Sie unter Verbindungen.
Protokoll (Optional) Das Sicherheitsprotokoll, das beim Kommunizieren mit Brokern verwendet wird. Unterstützt werden die Werte plaintext (Standardwert), ssl, sasl_plaintext und sasl_ssl.
SslCaLocation (Optional) Der Pfad zur Zertifikatdatei der Zertifizierungsstelle zum Überprüfen des Zertifikats des Brokers.
SslCertificateLocation (Optional) Der Pfad zum Zertifikat des Clients.
SslKeyLocation (Optional) Der Pfad zum privaten Schlüssel (PEM) des Clients, der für die Authentifizierung verwendet wird.
SslKeyPassword (Optional) Das Kennwort für das Zertifikat des Clients.

Anmerkungen

Mit der KafkaTrigger-Anmerkung können Sie eine Funktion erstellen, die ausgeführt wird, wenn ein Thema empfangen wird. Unterstützte Optionen umfassen die folgenden Elemente:

Element BESCHREIBUNG
name (Erforderlich) Der Name der Variablen, die die Warteschlangen- oder Themanachricht im Funktionscode darstellt.
brokerList (Erforderlich) Die Liste der vom Trigger überwachten Kafka-Broker. Weitere Informationen finden Sie unter Verbindungen.
topic (Erforderlich) Das Thema, das vom Trigger überwacht wird.
cardinality (Optional) Gibt die Kardinalität der Triggereingabe an. Die unterstützte Werte lauten ONE (Standardwert) und MANY. Verwenden Sie ONE, wenn die Eingabe eine einzelne Nachricht ist und MANY, wenn die Eingabe ein Array von Nachrichten ist. Wenn Sie MANY verwenden, müssen Sie auch einen dataType festlegen.
dataType Definiert, wie Functions den Parameterwert verarbeitet. Standardmäßig wird der Wert als Zeichenfolge abgerufen, und Functions versucht, die Zeichenfolge in das tatsächliche POJO (Plain-Old Java Object) zu deserialisieren. Bei string wird die Eingabe nur als Zeichenfolge behandelt. Bei binary wird die Nachricht als Binärdaten empfangen, und Functions versucht, sie in einen tatsächlichen Parametertyp byte[] zu deserialisieren.
consumerGroup (Optional) Kafka-Consumergruppe, die vom Trigger verwendet wird.
avroSchema (Optional) Das Schema eines generischen Datensatzes, wenn das Avro-Protokoll verwendet wird.
authenticationMode (Optional) Der Authentifizierungsmodus bei Verwendung der SASL-Authentifizierung (Simple Authentication and Security Layer). Unterstützt werden die Werte Gssapi, Plain (Standardwert), ScramSha256 und ScramSha512.
username (Optional) Der Benutzername für die SASL-Authentifizierung. Wird nicht unterstützt, wenn AuthenticationMode den Wert Gssapi aufweist. Weitere Informationen finden Sie unter Verbindungen.
password (Optional) Das Kennwort für die SASL-Authentifizierung. Wird nicht unterstützt, wenn AuthenticationMode den Wert Gssapi aufweist. Weitere Informationen finden Sie unter Verbindungen.
protocol (Optional) Das Sicherheitsprotokoll, das beim Kommunizieren mit Brokern verwendet wird. Unterstützt werden die Werte plaintext (Standardwert), ssl, sasl_plaintext und sasl_ssl.
sslCaLocation (Optional) Der Pfad zur Zertifikatdatei der Zertifizierungsstelle zum Überprüfen des Zertifikats des Brokers.
sslCertificateLocation (Optional) Der Pfad zum Zertifikat des Clients.
sslKeyLocation (Optional) Der Pfad zum privaten Schlüssel (PEM) des Clients, der für die Authentifizierung verwendet wird.
sslKeyPassword (Optional) Das Kennwort für das Zertifikat des Clients.

Konfiguration

Die folgende Tabelle gibt Aufschluss über die Bindungskonfigurationseigenschaften, die Sie in der Datei function.json festlegen.

function.json-Eigenschaft BESCHREIBUNG
type (Erforderlich) Muss auf kafkaTrigger festgelegt sein.
direction (Erforderlich) Muss auf in festgelegt sein.
name (Erforderlich) Der Name der Variablen, die die Brokerdaten im Funktionscode darstellt.
brokerList (Erforderlich) Die Liste der vom Trigger überwachten Kafka-Broker. Weitere Informationen finden Sie unter Verbindungen.
topic (Erforderlich) Das Thema, das vom Trigger überwacht wird.
cardinality (Optional) Gibt die Kardinalität der Triggereingabe an. Die unterstützte Werte lauten ONE (Standardwert) und MANY. Verwenden Sie ONE, wenn die Eingabe eine einzelne Nachricht ist und MANY, wenn die Eingabe ein Array von Nachrichten ist. Wenn Sie MANY verwenden, müssen Sie auch einen dataType festlegen.
dataType Definiert, wie Functions den Parameterwert verarbeitet. Standardmäßig wird der Wert als Zeichenfolge abgerufen, und Functions versucht, die Zeichenfolge in das tatsächliche POJO (Plain-Old Java Object) zu deserialisieren. Bei string wird die Eingabe nur als Zeichenfolge behandelt. Bei binary wird die Nachricht als Binärdaten empfangen, und Functions versucht, sie in einen tatsächlichen Parametertyp byte[] zu deserialisieren.
consumerGroup (Optional) Kafka-Consumergruppe, die vom Trigger verwendet wird.
avroSchema (Optional) Das Schema eines generischen Datensatzes, wenn das Avro-Protokoll verwendet wird.
authenticationMode (Optional) Der Authentifizierungsmodus bei Verwendung der SASL-Authentifizierung (Simple Authentication and Security Layer). Unterstützt werden die Werte Gssapi, Plain (Standardwert), ScramSha256 und ScramSha512.
username (Optional) Der Benutzername für die SASL-Authentifizierung. Wird nicht unterstützt, wenn AuthenticationMode den Wert Gssapi aufweist. Weitere Informationen finden Sie unter Verbindungen.
password (Optional) Das Kennwort für die SASL-Authentifizierung. Wird nicht unterstützt, wenn AuthenticationMode den Wert Gssapi aufweist. Weitere Informationen finden Sie unter Verbindungen.
protocol (Optional) Das Sicherheitsprotokoll, das beim Kommunizieren mit Brokern verwendet wird. Unterstützt werden die Werte plaintext (Standardwert), ssl, sasl_plaintext und sasl_ssl.
sslCaLocation (Optional) Der Pfad zur Zertifikatdatei der Zertifizierungsstelle zum Überprüfen des Zertifikats des Brokers.
sslCertificateLocation (Optional) Der Pfad zum Zertifikat des Clients.
sslKeyLocation (Optional) Der Pfad zum privaten Schlüssel (PEM) des Clients, der für die Authentifizierung verwendet wird.
sslKeyPassword (Optional) Das Kennwort für das Zertifikat des Clients.

Verwendung

Kafka-Ereignisse werden derzeit als Zeichenfolgen und Zeichenfolgenarrays unterstützt, die JSON-Nutzdaten sind.

Kafka-Nachrichten werden der Funktion als Zeichenfolgen und Zeichenfolgenarrays übergeben, die JSON-Nutzdaten sind.

In einem Premium-Plan müssen Sie die Laufzeitskalierungsüberwachung für die Kafka-Ausgabe aktivieren, um auf mehrere Instanzen aufzuskalieren. Weitere Informationen finden Sie unter Aktivieren der Laufzeitskalierung.

Sie können das Feature Testen/Ausführen auf der Seite Programmieren und testen im Azure-Portal nicht verwenden, um mit Kafka-Triggern zu arbeiten. Sie müssen stattdessen Testereignisse direkt an das Thema senden, das vom Trigger überwacht wird.

Eine vollständige Liste der unterstützten host.json-Einstellungen für den Kafka-Trigger finden Sie unter host.json-Einstellungen.

Verbindungen

Alle von Ihren Triggern und Bindungen benötigten Verbindungsinformationen sollten in den Anwendungseinstellungen und nicht in den Bindungsdefinitionen in Ihrem Code verwaltet werden. Dies gilt auch für Anmeldeinformationen, die niemals in Ihrem Code gespeichert werden sollten.

Wichtig

Einstellungen für Anmeldeinformationen müssen auf eine Anwendungseinstellung verweisen. Stellen Sie keine hartcodierten Anmeldeinformationen in Ihrem Code oder Ihren Konfigurationsdateien bereit. Wenn die Ausführung lokal erfolgt, verwenden Sie die Datei local.settings.json für Ihre Anmeldeinformationen, und veröffentlichen Sie die Datei „local.settings.json“ nicht.

Wenn Sie eine Verbindung mit einem von Confluent in Azure bereitgestellten verwalteten Kafka-Cluster herstellen, stellen Sie sicher, dass die folgenden Authentifizierungsanmeldeinformationen für Ihre Confluent Cloud-Umgebung in Ihrem Trigger oder Ihrer Bindung festgelegt sind:

Einstellung Empfohlener Wert BESCHREIBUNG
BrokerList BootstrapServer Die App-Einstellung namens BootstrapServer enthält den Wert des Bootstrapservers, der auf der Seite mit den Confluent Cloud-Einstellungen gefunden wurde. Der Wert ähnelt xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
Benutzername ConfluentCloudUsername Die App-Einstellung namens ConfluentCloudUsername enthält den API-Zugriffsschlüssel von der Confluent Cloud-Website.
Kennwort ConfluentCloudPassword Die App-Einstellung namens ConfluentCloudPassword enthält das API-Geheimnis, das von der Confluent Cloud-Website abgerufen wurde.

Die für diese Einstellungen verwendeten Zeichenfolgenwerte müssen während der lokalen Entwicklung als Anwendungseinstellungen in Azure oder in der Values-Sammlung in der Datei local.settings.json vorhanden sein.

Sie sollten auch Protocol, AuthenticationMode und SslCaLocation in Ihren Bindungsdefinitionen festlegen.

Nächste Schritte