Aracılığıyla paylaş


Azure İşlevleri için Apache Kafka çıkış bağlaması

Çıkış bağlaması, bir Azure İşlevleri uygulamasının Kafka konusuna ileti yazmasına olanak tanır.

Önemli

Kafka bağlamaları yalnızca Elastik Premium Plan ve Ayrılmış (App Service) planındaki İşlevler için kullanılabilir. Bunlar yalnızca İşlevler çalışma zamanının 3.x ve sonraki sürümlerinde desteklenir.

Örnek

Bağlamanın kullanımı, işlev uygulamanızda kullanılan C# modalitesine bağlıdır ve bu da aşağıdakilerden biri olabilir:

Yalıtılmış çalışan işlem sınıfı kitaplığı derlenmiş C# işlevi çalışma zamanından yalıtılmış bir işlemde çalışır.

Kullandığınız öznitelikler belirli olay sağlayıcısına bağlıdır.

Aşağıdaki örnekte, HTTP yanıtı ve Kafka çıkışından oluşan özel bir dönüş türü MultipleOutputTypevardır.

[Function("KafkaOutput")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();

    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = message,
        HttpResponse = response
    };
}

sınıfında MultipleOutputType, Kevent Kafka bağlaması için çıkış bağlama değişkenidir.

public class MultipleOutputType
{
    [KafkaOutput("BrokerList",
                "topic",
                Username = "ConfluentCloudUserName",
                Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string Kevent { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

Bir toplu olay göndermek için, aşağıdaki örnekte gösterildiği gibi çıkış türüne bir dize dizisi geçirin:

[Function("KafkaOutputMany")]

public static MultipleOutputTypeForBatch Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");
    var response = req.CreateResponse(HttpStatusCode.OK);

    string[] messages = new string[2];
    messages[0] = "one";
    messages[1] = "two";

    return new MultipleOutputTypeForBatch()
    {
        Kevents = messages,
        HttpResponse = response
    };
}

Dize dizisi, çıkış bağlamasının tanımlandığı sınıfında özellik olarak Kevents tanımlanır:

public class MultipleOutputTypeForBatch
{
    [KafkaOutput("BrokerList",
                 "topic",
                 Username = "ConfluentCloudUserName",
                 Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string[] Kevents { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

Aşağıdaki işlev Kafka çıkış verilerine üst bilgiler ekler:

[Function("KafkaOutputWithHeaders")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();
    string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = kevent,
        HttpResponse = response
    };
}

Çalışan .NET örneklerinin tamamı için kafka uzantısı deposuna bakın.

Not

Eşdeğer bir TypeScript örnekleri kümesi için bkz . Kafka uzantısı deposu

function.json dosyasının belirli özellikleri, bu örneklerde Confluent veya Azure Event Hubs olan olay sağlayıcınıza bağlıdır. Aşağıdaki örneklerde, HTTP isteği tarafından tetiklenen ve istekten Kafka konusuna veri gönderen bir işlev için Kafka çıkış bağlaması gösterilmektedir.

Aşağıdaki function.json, bu örneklerde belirli bir sağlayıcının tetikleyicisini tanımlar:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputKafkaMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "ConfluentCloudUsername",
      "password": "ConfluentCloudPassword",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "res"
    }
  ]
}

Ardından aşağıdaki kod konuya bir ileti gönderir:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message);
    context.bindings.outputKafkaMessage = message;
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: 'Ok'
    };
}

Aşağıdaki kod aynı konuya dizi olarak birden çok ileti gönderir:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');
    
    context.bindings.outputKafkaMessages = ["one", "two"];
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

Aşağıdaki örnekte, aynı Kafka konusuna üst bilgi içeren bir olay iletisinin nasıl gönder adımları gösterilmektedir:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message || (req.body && req.body.message));
    const responseMessage = message
        ? "Message received: " + message + ". The message transfered to the kafka broker."
        : "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
    context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

Çalışan JavaScript örneklerinin tamamı için kafka uzantısı deposuna bakın.

function.json dosyasının belirli özellikleri, bu örneklerde Confluent veya Azure Event Hubs olan olay sağlayıcınıza bağlıdır. Aşağıdaki örneklerde, HTTP isteği tarafından tetiklenen ve istekten Kafka konusuna veri gönderen bir işlev için Kafka çıkış bağlaması gösterilmektedir.

Aşağıdaki function.json, bu örneklerde belirli bir sağlayıcının tetikleyicisini tanımlar:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "Request",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username" : "%ConfluentCloudUserName%",
      "password" : "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "Response"
    }
  ]
}

Ardından aşağıdaki kod konuya bir ileti gönderir:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message

$message

Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

Aşağıdaki kod aynı konuya dizi olarak birden çok ileti gönderir:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

Aşağıdaki örnekte, aynı Kafka konusuna üst bilgi içeren bir olay iletisinin nasıl gönder adımları gösterilmektedir:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
    $message = $Request.Body.Message
}

$kevent = @{
    Offset = 364
    Partition = 0
    Topic = "kafkaeventhubtest1"
    Timestamp = "2022-04-09T03:20:06.591Z"
    Value = $message
    Headers= @(@{
        Key= "test"
        Value= "powershell"
    }
    )
}

Push-OutputBinding -Name Message -Value $kevent

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
    Body = 'ok'
})

Çalışan PowerShell örneklerinin tamamı için kafka uzantısı deposuna bakın.

function.json dosyasının belirli özellikleri, bu örneklerde Confluent veya Azure Event Hubs olan olay sağlayıcınıza bağlıdır. Aşağıdaki örneklerde, HTTP isteği tarafından tetiklenen ve istekten Kafka konusuna veri gönderen bir işlev için Kafka çıkış bağlaması gösterilmektedir.

Aşağıdaki function.json, bu örneklerde belirli bir sağlayıcının tetikleyicisini tanımlar:

{
  "scriptFile": "main.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "direction": "out",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "%ConfluentCloudUserName%",
      "password": "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    }
  ]
}

Ardından aşağıdaki kod konuya bir ileti gönderir:

import logging

import azure.functions as func


def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
    input_msg = req.params.get('message')
    outputMessage.set(input_msg)
    return 'OK'

Aşağıdaki kod aynı konuya dizi olarak birden çok ileti gönderir:

import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json

def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
    outputMessage.set(['one', 'two'])
    return 'OK'

Aşağıdaki örnekte, aynı Kafka konusuna üst bilgi içeren bir olay iletisinin nasıl gönder adımları gösterilmektedir:

import logging

import azure.functions as func
import json

def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    message = req.params.get('message')
    kevent =  { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
    out.set(json.dumps(kevent))
    return 'OK'

Çalışan Python örneklerinin tamamı için kafka uzantısı deposuna bakın.

Çıkış bağlamasını yapılandırmak için kullandığınız ek açıklamalar belirli olay sağlayıcısına bağlıdır.

Aşağıdaki işlev Kafka konusuna bir ileti gönderir.

@FunctionName("KafkaOutput")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");

    // Parse query parameter
    String query = request.getQueryParameters().get("message");
    String message = request.getBody().orElse(query);
    context.getLogger().info("Message:" + message);
    output.setValue(message);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();

Aşağıdaki örnekte, bir Kafka konusuna birden çok ileti gönderme adımları gösterilmektedir.

@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String[]> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");
    String[] messages = new String[2];
    messages[0] = "one";
    messages[1] = "two";
    output.setValue(messages);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}

Bu örnekte çıkış bağlama parametresi dize dizisi olarak değiştirilmiştir.

Son örnekte ve KafkaEntityKafkaHeader sınıfları için kullanılır:

public class KafkaEntity {
    public int Offset;
    public int Partition;
    public String Timestamp;
    public String Topic;
    public String Value;
    public KafkaHeaders Headers[];

    public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
        this.Offset = Offset;
        this.Partition = Partition;
        this.Topic = Topic;
        this.Timestamp = Timestamp;
        this.Value = Value;
        this.Headers = headers;
    }
public class KafkaHeaders{
    public String Key;
    public String Value;

    public KafkaHeaders(String key, String value) {
        this.Key = key;
        this.Value = value;
    }

Aşağıdaki örnek işlev, bir Kafka konusuna üst bilgiler içeren bir ileti gönderir.

@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<KafkaEntity> output,
        final ExecutionContext context) {
            context.getLogger().info("Java HTTP trigger processed a request.");
    
            // Parse query parameter
            String query = request.getQueryParameters().get("message");
            String message = request.getBody().orElse(query);
            KafkaHeaders[] headers = new KafkaHeaders[1];
            headers[0] = new KafkaHeaders("test", "java");
            KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
            output.setValue(kevent);
            return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
        }

Confluent için çalışan java örneklerinin tamamı için Kafka uzantısı deposuna bakın.

Özellikler

hem işlem içi hem de yalıtılmış çalışan işlemi C# kitaplıkları işlev tetikleyicisini Kafka tanımlamak için özniteliğini kullanır.

Aşağıdaki tabloda, bu özniteliği kullanarak ayarlayabileceğiniz özellikler açıklanmaktadır:

Parametre Açıklama
BrokerList (Gerekli) Çıkışın gönderildiği Kafka aracılarının listesi. Daha fazla bilgi için bkz. Bağlan ions.
Konu (Gerekli) Çıkışın gönderildiği konu.
AvroSchema (İsteğe bağlı) Avro protokolü kullanılırken genel bir kaydın şeması.
MaxMessageBytes (İsteğe bağlı) Varsayılan değeri olan, gönderilen çıktı iletisinin en büyük boyutu (MB cinsinden).1
Batchsize (İsteğe bağlı) Varsayılan değeri 10000olan tek bir ileti kümesinde toplu işlenen en fazla ileti sayısı.
EnableIdempotence (İsteğe bağlı) olarak trueayarlandığında, iletilerin tam olarak bir kez ve varsayılan değerle özgün üretim sırasında başarıyla üretildiğini garanti eder false
MessageTimeoutMs (İsteğe bağlı) Milisaniye cinsinden yerel ileti zaman aşımı. Bu değer yalnızca yerel olarak zorlanır ve oluşturulan iletinin başarılı teslimi bekleme süresini varsayılan 300000olarak sınırlar. Sonsuz bir 0 zaman. Bu değer, ileti teslim etmek için kullanılan en uzun süredir (yeniden denemeler dahil). Yeniden deneme sayısı veya ileti zaman aşımı aşıldığında teslim hatası oluşur.
RequestTimeoutMs (İsteğe bağlı) Çıkış isteğinin bildirim zaman aşımı milisaniye cinsinden varsayılan değeri 5000olur.
MaxRetries (İsteğe bağlı) Varsayılan 2değeri olan başarısız bir İleti göndermeyi yeniden deneme sayısı. yeniden deneme, olarak ayarlanmadığı sürece EnableIdempotence yeniden sıralamaya trueneden olabilir.
Authenticationmode (İsteğe bağlı) Basit Kimlik Doğrulaması ve Güvenlik Katmanı (SASL) kimlik doğrulaması kullanılırken kullanılan kimlik doğrulama modu. Desteklenen değerler şunlardır: Gssapi, Plain (varsayılan), ScramSha256, ScramSha512.
Kullanıcı adı (İsteğe bağlı) SASL kimlik doğrulaması için kullanıcı adı. olduğunda AuthenticationModeGssapidesteklenmez. Daha fazla bilgi için bkz. Bağlan ions.
Parola (İsteğe bağlı) SASL kimlik doğrulamasının parolası. olduğunda AuthenticationModeGssapidesteklenmez. Daha fazla bilgi için bkz. Bağlan ions.
Protokol (İsteğe bağlı) Aracılarla iletişim kurarken kullanılan güvenlik protokolü. Desteklenen değerler şunlardır plaintext : (varsayılan), ssl, sasl_plaintext, sasl_ssl.
SslCaLocation (İsteğe bağlı) Aracının sertifikasını doğrulamak için CA sertifika dosyasının yolu.
SslCertificateLocation (İsteğe bağlı) İstemci sertifikasının yolu.
SslKeyLocation (İsteğe bağlı) Kimlik doğrulaması için kullanılan istemcinin özel anahtarının (PEM) yolu.
SslKeyPassword (İsteğe bağlı) İstemci sertifikasının parolası.

Ek Açıklamalar

Ek KafkaOutput açıklama, belirli bir konuya yazan bir işlev oluşturmanıza olanak tanır. Desteklenen seçenekler aşağıdaki öğeleri içerir:

Öğe Açıklama
Adı İşlev kodunda aracılı verileri temsil eden değişkenin adı.
brokerList (Gerekli) Çıkışın gönderildiği Kafka aracılarının listesi. Daha fazla bilgi için bkz. Bağlan ions.
topic (Gerekli) Çıkışın gönderildiği konu.
Datatype İşlevler'in parametre değerini nasıl işlediğini tanımlar. Varsayılan olarak, değer bir dize olarak elde edilir ve İşlevler dizeyi gerçek düz eski Java nesnesine (POJO) seri durumdan çıkarmaya çalışır. olduğunda string, giriş yalnızca bir dize olarak değerlendirilir. olduğunda binary, ileti ikili veri olarak alınır ve İşlevler bunu gerçek parametre türü bayt[] olarak seri durumdan çıkarmaya çalışır.
avroSchema (İsteğe bağlı) Avro protokolü kullanılırken genel bir kaydın şeması. (Şu anda Java için desteklenmiyor.)
maxMessageBytes (İsteğe bağlı) Varsayılan değeri olan, gönderilen çıktı iletisinin en büyük boyutu (MB cinsinden).1
Batchsize (İsteğe bağlı) Varsayılan değeri 10000olan tek bir ileti kümesinde toplu işlenen en fazla ileti sayısı.
enableIdempotence (İsteğe bağlı) olarak trueayarlandığında, iletilerin tam olarak bir kez ve varsayılan değerle özgün üretim sırasında başarıyla üretildiğini garanti eder false
messageTimeoutMs (İsteğe bağlı) Milisaniye cinsinden yerel ileti zaman aşımı. Bu değer yalnızca yerel olarak zorlanır ve oluşturulan iletinin başarılı teslimi bekleme süresini varsayılan 300000olarak sınırlar. Sonsuz bir 0 zaman. Bu, ileti teslim etmek için kullanılan en uzun süredir (yeniden denemeler dahil). Yeniden deneme sayısı veya ileti zaman aşımı aşıldığında teslim hatası oluşur.
requestTimeoutMs (İsteğe bağlı) Çıkış isteğinin bildirim zaman aşımı milisaniye cinsinden varsayılan değeri 5000olur.
maxRetries (İsteğe bağlı) Varsayılan 2değeri olan başarısız bir İleti göndermeyi yeniden deneme sayısı. yeniden deneme, olarak ayarlanmadığı sürece EnableIdempotence yeniden sıralamaya trueneden olabilir.
Authenticationmode (İsteğe bağlı) Basit Kimlik Doğrulaması ve Güvenlik Katmanı (SASL) kimlik doğrulaması kullanılırken kullanılan kimlik doğrulama modu. Desteklenen değerler şunlardır: Gssapi, Plain (varsayılan), ScramSha256, ScramSha512.
Username (İsteğe bağlı) SASL kimlik doğrulaması için kullanıcı adı. olduğunda AuthenticationModeGssapidesteklenmez. Daha fazla bilgi için bkz. Bağlan ions.
Parola (İsteğe bağlı) SASL kimlik doğrulamasının parolası. olduğunda AuthenticationModeGssapidesteklenmez. Daha fazla bilgi için bkz. Bağlan ions.
Protokolü (İsteğe bağlı) Aracılarla iletişim kurarken kullanılan güvenlik protokolü. Desteklenen değerler şunlardır plaintext : (varsayılan), ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (İsteğe bağlı) Aracının sertifikasını doğrulamak için CA sertifika dosyasının yolu.
sslCertificateLocation (İsteğe bağlı) İstemci sertifikasının yolu.
sslKeyLocation (İsteğe bağlı) Kimlik doğrulaması için kullanılan istemcinin özel anahtarının (PEM) yolu.
sslKeyPassword (İsteğe bağlı) İstemci sertifikasının parolası.

Yapılandırma

Aşağıdaki tabloda, function.json dosyasında ayarladığınız bağlama yapılandırma özellikleri açıklanmaktadır.

function.json özelliği Açıklama
type olarak ayarlanmalıdır kafka.
direction olarak ayarlanmalıdır out.
Adı İşlev kodunda aracılı verileri temsil eden değişkenin adı.
brokerList (Gerekli) Çıkışın gönderildiği Kafka aracılarının listesi. Daha fazla bilgi için bkz. Bağlan ions.
topic (Gerekli) Çıkışın gönderildiği konu.
avroSchema (İsteğe bağlı) Avro protokolü kullanılırken genel bir kaydın şeması.
maxMessageBytes (İsteğe bağlı) Varsayılan değeri olan, gönderilen çıktı iletisinin en büyük boyutu (MB cinsinden).1
Batchsize (İsteğe bağlı) Varsayılan değeri 10000olan tek bir ileti kümesinde toplu işlenen en fazla ileti sayısı.
enableIdempotence (İsteğe bağlı) olarak trueayarlandığında, iletilerin tam olarak bir kez ve varsayılan değerle özgün üretim sırasında başarıyla üretildiğini garanti eder false
messageTimeoutMs (İsteğe bağlı) Milisaniye cinsinden yerel ileti zaman aşımı. Bu değer yalnızca yerel olarak zorlanır ve oluşturulan iletinin başarılı teslimi bekleme süresini varsayılan 300000olarak sınırlar. Sonsuz bir 0 zaman. Bu, ileti teslim etmek için kullanılan en uzun süredir (yeniden denemeler dahil). Yeniden deneme sayısı veya ileti zaman aşımı aşıldığında teslim hatası oluşur.
requestTimeoutMs (İsteğe bağlı) Çıkış isteğinin bildirim zaman aşımı milisaniye cinsinden varsayılan değeri 5000olur.
maxRetries (İsteğe bağlı) Varsayılan 2değeri olan başarısız bir İleti göndermeyi yeniden deneme sayısı. yeniden deneme, olarak ayarlanmadığı sürece EnableIdempotence yeniden sıralamaya trueneden olabilir.
Authenticationmode (İsteğe bağlı) Basit Kimlik Doğrulaması ve Güvenlik Katmanı (SASL) kimlik doğrulaması kullanılırken kullanılan kimlik doğrulama modu. Desteklenen değerler şunlardır: Gssapi, Plain (varsayılan), ScramSha256, ScramSha512.
Username (İsteğe bağlı) SASL kimlik doğrulaması için kullanıcı adı. olduğunda AuthenticationModeGssapidesteklenmez. Daha fazla bilgi için bkz. Bağlan ions.
Parola (İsteğe bağlı) SASL kimlik doğrulamasının parolası. olduğunda AuthenticationModeGssapidesteklenmez. Daha fazla bilgi için bkz. Bağlan ions.
Protokolü (İsteğe bağlı) Aracılarla iletişim kurarken kullanılan güvenlik protokolü. Desteklenen değerler şunlardır plaintext : (varsayılan), ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (İsteğe bağlı) Aracının sertifikasını doğrulamak için CA sertifika dosyasının yolu.
sslCertificateLocation (İsteğe bağlı) İstemci sertifikasının yolu.
sslKeyLocation (İsteğe bağlı) Kimlik doğrulaması için kullanılan istemcinin özel anahtarının (PEM) yolu.
sslKeyPassword (İsteğe bağlı) İstemci sertifikasının parolası.

Kullanım

Yerleşik Avrove Protobuf serileştirmesi ile hem anahtarlar hem de değer türleri desteklenir.

Olayın uzaklığı, bölümü ve zaman damgası çalışma zamanında oluşturulur. İşlevin içinde yalnızca değer ve üst bilgiler ayarlanabilir. Konu function.json ayarlanır.

Lütfen yazmaya çalıştığınız Kafka konusuna erişiminiz olduğundan emin olun. Bağlamayı Kafka konusuna erişim ve bağlantı kimlik bilgileriyle yapılandırabilirsiniz.

Premium planda, Kafka çıkışının birden çok örneğe ölçeği genişletebilmesi için çalışma zamanı ölçeği izlemeyi etkinleştirmeniz gerekir. Daha fazla bilgi edinmek için bkz . Çalışma zamanı ölçeklendirmesini etkinleştirme.

Kafka tetikleyicisi için desteklenen host.json ayarlarının tamamı için bkz . host.json ayarları.

Bağlantılar

Tetikleyicileriniz ve bağlamalarınız için gereken tüm bağlantı bilgileri, kodunuzdaki bağlama tanımlarında değil uygulama ayarlarında tutulmalıdır. Bu, kodunuzda hiçbir zaman depolanmaması gereken kimlik bilgileri için geçerlidir.

Önemli

Kimlik bilgileri ayarları bir uygulama ayarına başvurmalıdır. Kod veya yapılandırma dosyalarınızda kimlik bilgilerini sabit kodlamayın. Yerel olarak çalışırken, kimlik bilgileriniz için local.settings.json dosyasını kullanın ve local.settings.json dosyasını yayımlamayın.

Azure'da Confluent tarafından sağlanan yönetilen bir Kafka kümesine bağlanırken, Confluent Cloud ortamınız için aşağıdaki kimlik doğrulama kimlik bilgilerinin tetikleyicinizde veya bağlamanızda ayarlandığından emin olun:

Ayar Önerilen değer Açıklama
BrokerList BootstrapServer adlı BootstrapServer uygulama ayarı, Confluent Bulut ayarları sayfasında bulunan bootstrap sunucusunun değerini içerir. değerine benzer.xyz-xyzxzy.westeurope.azure.confluent.cloud:9092
Kullanıcı adı ConfluentCloudUsername adlı ConfluentCloudUsername uygulama ayarı Confluent Cloud web sitesinden API erişim anahtarını içerir.
Parola ConfluentCloudPassword adlı ConfluentCloudPassword uygulama ayarı Confluent Cloud web sitesinden alınan API gizli dizisini içerir.

Bu ayarlar için kullandığınız dize değerleri, yerel geliştirme sırasında Azure'da veya Values local.settings.json dosyasındakikoleksiyonda uygulama ayarları olarak bulunmalıdır.

Bağlama tanımlarınızda , AuthenticationModeve SslCaLocation değerlerini de ayarlamanız Protocolgerekir.

Sonraki adımlar