Apache Kafka trigger for Azure Functions

You can use the Apache Kafka trigger in Azure Functions to run your function code in response to messages in Kafka topics. You can also use a Kafka output binding to write from your function to a topic. For setup and configuration details, see Apache Kafka bindings for Azure Functions overview.

Important

Kafka bindings are only available for Functions on the Elastic Premium plan and Dedicated (App Service) plan. They're only supported on version 3.x and later versions of the Functions runtime.

Example

The usage of the trigger depends on the C# modality used in your function app, which can be one of the following modes:

A compiled C# function that uses an isolated worker process class library runs in a process that's isolated from the runtime.

The attributes you use depend on the specific event provider.

The following example shows a C# function that reads and logs the Kafka message as a Kafka event:

[Function("KafkaTrigger")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}

To receive events in a batch, use a string array as input, as shown in the following example:

[Function("KafkaTriggerMany")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default",
                  IsBatched = true)] string[] events, FunctionContext context)
{
    // Create the logger once instead of once per event.
    var logger = context.GetLogger("KafkaFunction");
    foreach (var kevent in events)
    {
        logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
    }
}

The following function logs the message and headers for the Kafka event:

[Function("KafkaTriggerWithHeaders")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var eventJsonObject = JObject.Parse(eventData);
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
    var headersJArr = eventJsonObject["Headers"] as JArray;
    logger.LogInformation("Headers for this event: ");
    foreach (JObject header in headersJArr)
    {
        logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");

    }
}

For a complete set of working .NET examples, see the Kafka extension repository.

Note

For an equivalent set of TypeScript examples, see the Kafka extension repository.

The specific properties of the function.json file depend on your event provider, which in these examples is either Confluent or Azure Event Hubs. The following examples show a Kafka trigger for a function that reads and logs a Kafka message.

The following function.json defines the trigger for the specific provider:

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "topic": "topic",
            "brokerList": "%BrokerList%",
            "username": "%ConfluentCloudUserName%",
            "password": "%ConfluentCloudPassword%",
            "protocol": "saslSsl",
            "authenticationMode": "plain",
            "consumerGroup" : "$Default",
            "dataType": "string"
        }
    ]
}

The following code then runs when the function is triggered:

module.exports = async function (context, event) {
    // context.log.info(event)
    context.log.info(`JavaScript Kafka trigger function called for message ${event.Value}`);
};

To receive events in a batch, set the cardinality value to many in the function.json file, as shown in the following examples:

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%"
        }
    ]
}

The following code then parses the array of events and logs the event data:

module.exports = async function (context, events) {
    function print(event) {
        var eventJson = JSON.parse(event)
        context.log.info(`JavaScript Kafka trigger function called for message ${eventJson.Value}`);
    }
    events.map(print);
};

The following code also logs the header data:

module.exports = async function (context, events) {
  // Parse one event, then log its value and its Base64-encoded headers.
  function print(kevent) {
    const keventJson = JSON.parse(kevent);
    context.log.info(`JavaScript Kafka trigger function called for message ${keventJson.Value}`);
    context.log.info(`Headers for this message:`);
    const headers = keventJson.Headers;
    headers.forEach(element => {
      context.log.info(`Key: ${element.Key} Value: ${Buffer.from(element.Value, 'base64').toString('utf8')}`);
    });
  }
  events.forEach(print);
};

You can define a generic Avro schema for the event passed to the trigger. The following function.json defines the trigger for the specific provider with a generic Avro schema:

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaAvroGenericSingle",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "authenticationMode" : "PLAIN",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

The following code then runs when the function is triggered:

module.exports = async function (context, event) {
    context.log.info(`JavaScript Kafka trigger function called for message ${JSON.stringify(event)}`);
};

For a complete set of working JavaScript examples, see the Kafka extension repository.

The specific properties of the function.json file depend on your event provider, which in these examples is either Confluent or Azure Event Hubs. The following examples show a Kafka trigger for a function that reads and logs a Kafka message.

The following function.json defines the trigger for the specific provider:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

The following code then runs when the function is triggered:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

To receive events in a batch, set the cardinality value to many in the function.json file, as shown in the following examples:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

The following code then parses the array of events and logs the event data:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

$kafkaEvents
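foreach ($kafkaEvent in $kafkaEvents) {
    # $event is an automatic variable in PowerShell, so use a different name.
    $kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
    # Use a subexpression so that the Value property is expanded inside the string.
    Write-Output "PowerShell Kafka trigger function called for message $($kevent.Value)"
}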

The following code also logs the header data:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

foreach ($kafkaEvent in $kafkaEvents) {
    $kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
    # Use a subexpression so that the Value property is expanded inside the string.
    Write-Output "PowerShell Kafka trigger function called for message $($kevent.Value)"
    Write-Output "Headers for this message:"
    foreach ($header in $kevent.Headers) {
        $DecodedValue = [System.Text.Encoding]::UTF8.GetString([System.Convert]::FromBase64String($header.Value))
        $Key = $header.Key
        Write-Output "Key: $Key Value: $DecodedValue"
    }
}

You can define a generic Avro schema for the event passed to the trigger. The following function.json defines the trigger for the specific provider with a generic Avro schema:

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaEvent",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

The following code then runs when the function is triggered:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

For a complete set of working PowerShell examples, see the Kafka extension repository.

The specific properties of the function.json file depend on your event provider, which in these examples is either Confluent or Azure Event Hubs. The following examples show a Kafka trigger for a function that reads and logs a Kafka message.

The following function.json defines the trigger for the specific provider:

{
      "scriptFile": "main.py",
      "bindings": [
        {
          "type": "kafkaTrigger",
          "name": "kevent",
          "direction": "in",
          "topic": "topic",
          "brokerList": "%BrokerList%",
          "username": "%ConfluentCloudUserName%",
          "password": "%ConfluentCloudPassword%",
          "consumerGroup" : "functions",
          "protocol": "saslSsl",
          "authenticationMode": "plain"
        }
    ]
}

The following code then runs when the function is triggered:

import logging
from azure.functions import KafkaEvent

def main(kevent : KafkaEvent):
    logging.info(kevent.get_body().decode('utf-8'))
    logging.info(kevent.metadata)

To receive events in a batch, set the cardinality value to many in the function.json file, as shown in the following examples:

{
      "scriptFile": "main.py",
      "bindings": [
        {
            "type" : "kafkaTrigger",
            "direction": "in",
            "name" : "kevents",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "topic" : "message_python",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "dataType": "string",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%"
        }
    ]
}

The following code then parses the array of events and logs the event data:

import logging
import typing
from azure.functions import KafkaEvent

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        logging.info(event.get_body())

The following code also logs the header data:

import logging
import typing
from azure.functions import KafkaEvent
import json
import base64

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        event_dec = event.get_body().decode('utf-8')
        event_json = json.loads(event_dec)
        logging.info("Python Kafka trigger function called for message " + event_json["Value"])
        headers = event_json["Headers"]
        for header in headers:
            logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii')))
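
The header handling above can be factored into a small helper that is easy to test without a running broker. The following is a minimal sketch, assuming the payload shape used in the examples on this page: a JSON object with a Value string and a Headers list whose Value fields are Base64 encoded. The name decode_kafka_event and the sample payload are hypothetical and aren't part of the azure.functions library.

```python
import base64
import json


def decode_kafka_event(raw):
    """Parse one Kafka event payload and decode its Base64 header values.

    Hypothetical helper: assumes a JSON object with a "Value" string and an
    optional "Headers" list of {"Key": ..., "Value": <Base64 text>} entries.
    """
    if isinstance(raw, bytes):
        raw = raw.decode("utf-8")
    event = json.loads(raw)
    headers = {}
    for header in event.get("Headers") or []:
        value_bytes = base64.b64decode(header["Value"])
        # Header values are raw bytes; replace undecodable bytes rather than fail.
        headers[header["Key"]] = value_bytes.decode("utf-8", errors="replace")
    return event.get("Value"), headers


# Sample payload built for illustration only.
sample = json.dumps({
    "Value": "hello",
    "Headers": [
        {"Key": "source", "Value": base64.b64encode(b"unit-test").decode("ascii")}
    ],
})
value, headers = decode_kafka_event(sample)
print(value, headers)
```

Inside a trigger function, you could pass event.get_body() to this helper. If a message repeats a header key, this sketch keeps only the last value.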

You can define a generic Avro schema for the event passed to the trigger. The following function.json defines the trigger for the specific provider with a generic Avro schema:

{
  "scriptFile": "main.py",
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaTriggerAvroGeneric",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

The following code then runs when the function is triggered:

import logging
from azure.functions import KafkaEvent

def main(kafkaTriggerAvroGeneric : KafkaEvent):
    logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
    logging.info(kafkaTriggerAvroGeneric.metadata)
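
Because the avroSchema value is an escaped JSON string, it can help to inspect it before you deploy. The following sketch uses only the standard json module to list the fields of the Payment schema from the function.json file above and to check a record against them. The helper names and the type mapping are assumptions for illustration. This isn't an Avro implementation, and it covers only the primitive types that appear in this schema.

```python
import json

# Schema string copied from the function.json example (unescaped once).
AVRO_SCHEMA = (
    '{"type":"record","name":"Payment",'
    '"namespace":"io.confluent.examples.clients.basicavro",'
    '"fields":[{"name":"id","type":"string"},'
    '{"name":"amount","type":"double"},'
    '{"name":"type","type":"string"}]}'
)

# Assumed mapping from the Avro primitive types used here to Python types.
PYTHON_TYPES = {"string": str, "double": float}


def schema_fields(schema_text):
    """Return {field name: Avro type} for a record schema."""
    schema = json.loads(schema_text)
    return {field["name"]: field["type"] for field in schema["fields"]}


def matches_schema(record, schema_text):
    """Check that a dict has exactly the schema's fields with matching types."""
    fields = schema_fields(schema_text)
    if set(record) != set(fields):
        return False
    return all(
        isinstance(record[name], PYTHON_TYPES[avro_type])
        for name, avro_type in fields.items()
    )


print(schema_fields(AVRO_SCHEMA))
print(matches_schema({"id": "p-1", "amount": 12.5, "type": "card"}, AVRO_SCHEMA))
```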

For a complete set of working Python examples, see the Kafka extension repository.

The annotations you use to configure your trigger depend on the specific event provider.

The following example shows a Java function that reads and logs the content of the Kafka event:

@FunctionName("KafkaTrigger")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string"
         ) String kafkaEventData,
        final ExecutionContext context) {
        context.getLogger().info(kafkaEventData);
}

To receive events in a batch, use a string array as input, as shown in the following example:

@FunctionName("KafkaTriggerMany")
public void runMany(
        @KafkaTrigger(
            name = "kafkaTriggerMany",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            cardinality = Cardinality.MANY,
            dataType = "string"
         ) String[] kafkaEvents,
        final ExecutionContext context) {
        for (String kevent: kafkaEvents) {
            context.getLogger().info(kevent);
        }    
}

The following function logs the message and headers for the Kafka event:

@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string",
            cardinality = Cardinality.MANY
         ) List<String> kafkaEvents,
        final ExecutionContext context) {
    Gson gson = new Gson();
    for (String keventstr : kafkaEvents) {
        KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
        context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
        context.getLogger().info("Headers for the message:");
        for (KafkaHeaders header : kevent.Headers) {
            String decodedValue = new String(Base64.getDecoder().decode(header.Value));
            context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);
        }
    }
}

You can define a generic Avro schema for the event passed to the trigger. The following function defines a trigger for the specific provider with a generic Avro schema:

private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";

@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
        @KafkaTrigger(
                name = "kafkaAvroGenericSingle",
                topic = "topic",
                brokerList="%BrokerList%",
                consumerGroup="$Default",
                username = "ConfluentCloudUsername",
                password = "ConfluentCloudPassword",
                avroSchema = schema,
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                protocol = BrokerProtocol.SASLSSL) Payment payment,
        final ExecutionContext context) {
    context.getLogger().info(payment.toString());
}

For a complete set of working Java examples for Confluent, see the Kafka extension repository.

Attributes

Both in-process and isolated worker process C# libraries use the KafkaTriggerAttribute to define the function trigger.

The following table explains the properties you can set using this trigger attribute:

Parameter Description
BrokerList (Required) The list of Kafka brokers monitored by the trigger. See Connections for more information.
Topic (Required) The topic monitored by the trigger.
ConsumerGroup (Optional) Kafka consumer group used by the trigger.
AvroSchema (Optional) Schema of a generic record when using the Avro protocol.
AuthenticationMode (Optional) The authentication mode when using Simple Authentication and Security Layer (SASL) authentication. The supported values are Gssapi, Plain (default), ScramSha256, ScramSha512.
Username (Optional) The username for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information.
Password (Optional) The password for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information.
Protocol (Optional) The security protocol used when communicating with brokers. The supported values are plaintext (default), ssl, sasl_plaintext, sasl_ssl.
SslCaLocation (Optional) Path to CA certificate file for verifying the broker's certificate.
SslCertificateLocation (Optional) Path to the client's certificate.
SslKeyLocation (Optional) Path to client's private key (PEM) used for authentication.
SslKeyPassword (Optional) Password for client's certificate.

Annotations

The KafkaTrigger annotation allows you to create a function that runs when a message is received on a topic. Supported options include the following elements:

Element Description
name (Required) The name of the variable that represents the queue or topic message in function code.
brokerList (Required) The list of Kafka brokers monitored by the trigger. See Connections for more information.
topic (Required) The topic monitored by the trigger.
cardinality (Optional) Indicates the cardinality of the trigger input. The supported values are ONE (default) and MANY. Use ONE when the input is a single message and MANY when the input is an array of messages. When you use MANY, you must also set a dataType.
dataType Defines how Functions handles the parameter value. By default, the value is obtained as a string, and Functions tries to deserialize the string to a plain-old Java object (POJO). When set to string, the input is treated as just a string. When set to binary, the message is received as binary data, and Functions tries to deserialize it to a byte[] parameter type.
consumerGroup (Optional) Kafka consumer group used by the trigger.
avroSchema (Optional) Schema of a generic record when using the Avro protocol.
authenticationMode (Optional) The authentication mode when using Simple Authentication and Security Layer (SASL) authentication. The supported values are Gssapi, Plain (default), ScramSha256, ScramSha512.
username (Optional) The username for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information.
password (Optional) The password for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information.
protocol (Optional) The security protocol used when communicating with brokers. The supported values are plaintext (default), ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (Optional) Path to CA certificate file for verifying the broker's certificate.
sslCertificateLocation (Optional) Path to the client's certificate.
sslKeyLocation (Optional) Path to client's private key (PEM) used for authentication.
sslKeyPassword (Optional) Password for client's certificate.

Configuration

The following table explains the binding configuration properties that you set in the function.json file.

function.json property Description
type (Required) Must be set to kafkaTrigger.
direction (Required) Must be set to in.
name (Required) The name of the variable that represents the brokered data in function code.
brokerList (Required) The list of Kafka brokers monitored by the trigger. See Connections for more information.
topic (Required) The topic monitored by the trigger.
cardinality (Optional) Indicates the cardinality of the trigger input. The supported values are ONE (default) and MANY. Use ONE when the input is a single message and MANY when the input is an array of messages. When you use MANY, you must also set a dataType.
dataType Defines how Functions handles the parameter value. By default, the value is obtained as a string, and Functions tries to deserialize the string to a plain-old Java object (POJO). When set to string, the input is treated as just a string. When set to binary, the message is received as binary data, and Functions tries to deserialize it to a byte[] parameter type.
consumerGroup (Optional) Kafka consumer group used by the trigger.
avroSchema (Optional) Schema of a generic record when using the Avro protocol.
authenticationMode (Optional) The authentication mode when using Simple Authentication and Security Layer (SASL) authentication. The supported values are Gssapi, Plain (default), ScramSha256, ScramSha512.
username (Optional) The username for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information.
password (Optional) The password for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information.
protocol (Optional) The security protocol used when communicating with brokers. The supported values are plaintext (default), ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (Optional) Path to CA certificate file for verifying the broker's certificate.
sslCertificateLocation (Optional) Path to the client's certificate.
sslKeyLocation (Optional) Path to client's private key (PEM) used for authentication.
sslKeyPassword (Optional) Password for client's certificate.
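
The rules in this table can be checked mechanically before deployment. The following is a minimal sketch and isn't part of the Functions tooling. The validate_kafka_trigger function is hypothetical, and it encodes only the required properties and the cardinality rule stated in the table.

```python
import json

REQUIRED = ("type", "direction", "name", "brokerList", "topic")


def validate_kafka_trigger(binding):
    """Return a list of problems found in one function.json binding dict."""
    problems = [
        f"missing required property: {key}"
        for key in REQUIRED
        if key not in binding
    ]
    if binding.get("type", "kafkaTrigger") != "kafkaTrigger":
        problems.append("type must be kafkaTrigger")
    if binding.get("direction", "in") != "in":
        problems.append("direction must be in")
    # The table states that MANY requires a dataType.
    is_many = str(binding.get("cardinality", "ONE")).upper() == "MANY"
    if is_many and "dataType" not in binding:
        problems.append("cardinality MANY requires dataType")
    return problems


# Sample function.json content for illustration; dataType is left out on purpose.
function_json = json.loads("""
{
  "bindings": [
    {
      "type": "kafkaTrigger",
      "name": "kevents",
      "direction": "in",
      "topic": "topic",
      "brokerList": "%BrokerList%",
      "cardinality": "MANY"
    }
  ]
}
""")

for binding in function_json["bindings"]:
    print(validate_kafka_trigger(binding))
```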

Usage

Kafka events are currently supported as strings and string arrays that are JSON payloads.

Kafka messages are passed to the function as strings and string arrays that are JSON payloads.
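
Because a function can receive either one JSON string or an array of JSON strings, depending on the cardinality setting, a small normalizing helper can keep the processing code the same in both cases. This is a sketch under the assumption that each payload is a JSON object with a Value property, as in the examples on this page. The name iter_event_values is hypothetical.

```python
import json


def iter_event_values(payload):
    """Yield the "Value" of each event from a single payload or a batch.

    Hypothetical helper: accepts one JSON string or a list of JSON strings.
    """
    batch = [payload] if isinstance(payload, (str, bytes)) else payload
    for raw in batch:
        yield json.loads(raw)["Value"]


# Sample payloads built for illustration only.
single = '{"Value": "first"}'
many = ['{"Value": "a"}', '{"Value": "b"}']
print(list(iter_event_values(single)))
print(list(iter_event_values(many)))
```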

In a Premium plan, you must enable runtime scale monitoring for the Kafka trigger to be able to scale out to multiple instances. To learn more, see Enable runtime scaling.

You can't use the Test/Run feature of the Code + Test page in the Azure portal to work with Kafka triggers. You must instead send test events directly to the topic being monitored by the trigger.

For a complete set of supported host.json settings for the Kafka trigger, see host.json settings.

Connections

All connection information required by your triggers and bindings should be maintained in application settings and not in the binding definitions in your code. This is especially true for credentials, which should never be stored in your code.

Important

Credential settings must reference an application setting. Don't hard-code credentials in your code or configuration files. When running locally, use the local.settings.json file for your credentials, and don't publish the local.settings.json file.

When connecting to a managed Kafka cluster provided by Confluent in Azure, make sure that the following authentication credentials for your Confluent Cloud environment are set in your trigger or binding:

Setting Recommended value Description
BrokerList BootstrapServer App setting named BootstrapServer contains the value of the bootstrap server found on the Confluent Cloud settings page. The value resembles xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
Username ConfluentCloudUsername App setting named ConfluentCloudUsername contains the API access key from the Confluent Cloud website.
Password ConfluentCloudPassword App setting named ConfluentCloudPassword contains the API secret obtained from the Confluent Cloud website.

The string values you use for these settings must be present as application settings in Azure or in the Values collection in the local.settings.json file during local development.
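
During local development, a quick check that every %Name% reference in a binding has a matching entry in the Values collection can catch missing settings early. The following sketch is illustrative only and doesn't reproduce how the Functions host resolves settings. The helper name and the sample values are assumptions.

```python
import re

# Matches app setting references written as %Name%.
PLACEHOLDER = re.compile(r"%([^%]+)%")


def missing_settings(binding, values):
    """Return sorted setting names referenced as %Name% but absent from values."""
    referenced = set()
    for prop in binding.values():
        if isinstance(prop, str):
            referenced.update(PLACEHOLDER.findall(prop))
    return sorted(referenced - set(values))


# Shape of local.settings.json; the values are placeholders, not real credentials.
local_settings = {
    "IsEncrypted": False,
    "Values": {
        "BrokerList": "xyz-xyzxzy.westeurope.azure.confluent.cloud:9092",
        "ConfluentCloudUserName": "<api-key>",
    },
}

binding = {
    "type": "kafkaTrigger",
    "brokerList": "%BrokerList%",
    "username": "%ConfluentCloudUserName%",
    "password": "%ConfluentCloudPassword%",
}

print(missing_settings(binding, local_settings["Values"]))
```

In this sample, the password setting is reported as missing because it has no entry in the Values collection.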

You should also set the Protocol, AuthenticationMode, and SslCaLocation in your binding definitions.

Next steps