你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

教程:创建并使用 Apache Kafka 服务进行开发

Azure 容器应用允许连接到开发和生产级服务,为应用程序提供各种功能。

本教程介绍如何创建和使用开发 Apache Kafka 服务。

本教程重点介绍了 Azure CLI 命令和 Bicep 模板片段。 如果使用 Bicep,可以将所有片段添加到单个 Bicep 文件,并一次性部署所有模板

  • 创建容器应用环境以部署服务和容器应用
  • 创建 Apache Kafka 服务
  • 设置命令行应用以使用开发 Apache Kafka 服务
  • 部署 kafka-ui 应用以查看应用程序数据
  • 编译最终的 bicep 模板,以使用一致且可预测的模板部署来部署所有资源
  • 使用 azd 模板对所有资源进行单命令部署

先决条件

注意

对于单命令部署,请跳到最后一个azd模板步骤

设置

  1. 为通用值定义变量。

    RESOURCE_GROUP="kafka-dev"
    LOCATION="northcentralus"
    ENVIRONMENT="aca-env"
    KAFKA_SVC="kafka01"
    KAFKA_CLI_APP="kafka-cli-app"
    KAFKA_UI_APP="kafka-ui-app"
    
  2. 登录 Azure。

    az login
    
  3. 将 CLI 升级到最新版本。

    az upgrade
    
  4. 将 Bicep 升级到最新版本。

    az bicep upgrade
    
  5. 添加 containerapp 扩展。

    az extension add --name containerapp --upgrade
    
  6. 注册所需的命名空间。

    az provider register --namespace Microsoft.App
    
    az provider register --namespace Microsoft.OperationalInsights
    

创建容器应用环境

  1. 创建资源组。

    az group create \
        --name "$RESOURCE_GROUP" \
        --location "$LOCATION"
    
  2. 创建容器应用环境。

    az containerapp env create \
      --name "$ENVIRONMENT" \
      --resource-group "$RESOURCE_GROUP" \
      --location "$LOCATION"
    

创建 Apache Kafka 服务

  1. 创建 Apache Kafka 服务。

    ENVIRONMENT_ID=$(az containerapp env show \
      --name "$ENVIRONMENT" \
      --resource-group "$RESOURCE_GROUP" \
      --output tsv \
      --query id)
    
  2. 部署模板。

        az containerapp add-on kafka create \
        --name "$KAFKA_SVC" \
        --resource-group "$RESOURCE_GROUP" \
        --environment "$ENVIRONMENT"
    
  3. 查看 Kafka 实例的日志输出

    使用 logs 命令查看日志消息。

    az containerapp logs show \
        --name $KAFKA_SVC \
        --resource-group $RESOURCE_GROUP \
        --follow --tail 30
    

    容器应用 kafka 服务日志的屏幕截图。

创建应用以测试服务

创建应用时,需将其设置为使用 ./kafka-topics.sh./kafka-console-producer.shkafka-console-consumer.sh 来连接到 Kafka 实例。

  1. 创建绑定到 Kafka 服务的 kafka-cli-app 应用。

    az containerapp create \
        --name "$KAFKA_CLI_APP" \
        --image mcr.microsoft.com/k8se/services/kafka:3.4 \
        --bind "$KAFKA_SVC" \
        --environment "$ENVIRONMENT" \
        --resource-group "$RESOURCE_GROUP" \
        --min-replicas 1 \
        --max-replicas 1 \
        --command "/bin/sleep" "infinity"
    
  2. 运行 CLI exec 命令以连接到测试应用。

    az containerapp exec \
        --name $KAFKA_CLI_APP \
        --resource-group $RESOURCE_GROUP \
        --command /bin/bash
    

    对测试应用使用 --bindserviceBinds 时,连接信息会注入到应用程序环境中。 连接到测试容器后,可以使用 env 命令检查值。

    env | grep "^KAFKA_"
    
    KAFKA_SECURITYPROTOCOL=SASL_PLAINTEXT
    KAFKA_BOOTSTRAPSERVER=kafka01:9092
    KAFKA_HOME=/opt/kafka
    KAFKA_PROPERTIES_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka-user" password="7dw..." user_kafka-user="7dw..." ;
    KAFKA_BOOTSTRAP_SERVERS=kafka01:9092
    KAFKA_SASLUSERNAME=kafka-user
    KAFKA_SASL_USER=kafka-user
    KAFKA_VERSION=3.4.0
    KAFKA_SECURITY_PROTOCOL=SASL_PLAINTEXT
    KAFKA_SASL_PASSWORD=7dw...
    KAFKA_SASLPASSWORD=7dw...
    KAFKA_SASL_MECHANISM=PLAIN
    KAFKA_SASLMECHANISM=PLAIN
    
  3. 使用 kafka-topics.sh 创建事件主题。

    创建 kafka.props 文件。

    echo "security.protocol=$KAFKA_SECURITY_PROTOCOL" >> kafka.props && \
    echo "sasl.mechanism=$KAFKA_SASL_MECHANISM" >> kafka.props && \
    echo "sasl.jaas.config=$KAFKA_PROPERTIES_SASL_JAAS_CONFIG" >> kafka.props
    

    创建 quickstart-events 事件主题。

    /opt/kafka/bin/kafka-topics.sh \
        --create --topic quickstart-events \
        --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS \
        --command-config kafka.props
    # Created topic quickstart-events.
    
    /opt/kafka/bin/kafka-topics.sh \
        --describe --topic quickstart-events \
        --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS \
        --command-config kafka.props
    # Topic: quickstart-events	TopicId: lCkTKmvZSgSUCHozhhvz1Q	PartitionCount: 1	ReplicationFactor: 1	Configs: segment.bytes=1073741824
    # Topic: quickstart-events	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
    
  4. 使用 kafka-console-producer.sh 将事件写入主题。

    /opt/kafka/bin/kafka-console-producer.sh \
        --topic quickstart-events \
        --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS \
        --producer.config kafka.props
    
    > this is my first event
    > this is my second event
    > this is my third event
    > CTRL-C
    

    备注

    ./kafka-console-producer.sh 命令会提示你使用 > 编写事件。 按如下所示编写一些事件,然后点击 CTRL-C 退出。

  5. 使用 kafka-console-consumer.sh 从主题中读取事件。

    /opt/kafka/bin/kafka-console-consumer.sh \
         --topic quickstart-events \
        --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS \
        --from-beginning \
        --consumer.config kafka.props
    
    # this is my first event
    # this is my second event
    # this is my third event
    

容器应用 kafka CLI 输出日志的屏幕截图。

将开发服务与现有应用配合使用

如果已有使用 Apache Kafka 的应用,可以更改加载连接信息的方式。

首先,创建以下环境变量。

KAFKA_HOME=/opt/kafka
KAFKA_PROPERTIES_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka-user" password="7dw..." user_kafka-user="7dw..." ;
KAFKA_BOOTSTRAP_SERVERS=kafka01:9092
KAFKA_SASL_USER=kafka-user
KAFKA_VERSION=3.4.0
KAFKA_SECURITY_PROTOCOL=SASL_PLAINTEXT
KAFKA_SASL_PASSWORD=7dw...
KAFKA_SASL_MECHANISM=PLAIN

借助 CLI(或 Bicep),可以更新应用,添加 --bind $KAFKA_SVC 以使用开发服务。

绑定到开发服务

部署 kafka-ui 以查看和管理 Kafka 实例。

请参阅 Bicep 或 azd 示例。

连接到 kafka 服务的 pgweb 容器应用的屏幕截图。

部署所有资源

如果要一次性部署所有资源,请使用以下示例。

Bicep

以下 Bicep 模板包含本教程中使用的所有资源。

可以创建包含此内容的 kafka-dev.bicep 文件。

targetScope = 'resourceGroup'
param location string = resourceGroup().location
param appEnvironmentName string = 'aca-env'
param kafkaSvcName string = 'kafka01'
param kafkaCliAppName string = 'kafka-cli-app'
param kafkaUiAppName string = 'kafka-ui'

resource logAnalytics 'Microsoft.OperationalInsights/workspaces@2022-10-01' = {
  name: '${appEnvironmentName}-log-analytics'
  location: location
  properties: {
    sku: {
      name: 'PerGB2018'
    }
  }
}

resource appEnvironment 'Microsoft.App/managedEnvironments@2023-04-01-preview' = {
  name: appEnvironmentName
  location: location
  properties: {
    appLogsConfiguration: {
      destination: 'log-analytics'
      logAnalyticsConfiguration: {
        customerId: logAnalytics.properties.customerId
        sharedKey: logAnalytics.listKeys().primarySharedKey
      }
    }
  }
}

resource kafka 'Microsoft.App/containerApps@2023-04-01-preview' = {
  name: kafkaSvcName
  location: location
  properties: {
    environmentId: appEnvironment.id
    configuration: {
      service: {
          type: 'kafka'
      }
    }
  }
}

resource kafkaCli 'Microsoft.App/containerApps@2023-04-01-preview' = {
  name: kafkaCliAppName
  location: location
  properties: {
    environmentId: appEnvironment.id
    template: {
      serviceBinds: [
        {
          serviceId: kafka.id
        }
      ]
      containers: [
        {
          name: 'kafka-cli'
          image: 'mcr.microsoft.com/k8se/services/kafka:3.4'
          command: [ '/bin/sleep', 'infinity' ]
        }
      ]
      scale: {
        minReplicas: 1
        maxReplicas: 1
      }
    }
  }
}

resource kafkaUi 'Microsoft.App/containerApps@2023-04-01-preview' = {
  name: kafkaUiAppName
  location: location
  properties: {
    environmentId: appEnvironment.id
    configuration: {
      ingress: {
        external: true
        targetPort: 8080
      }
    }
    template: {
      serviceBinds: [
        {
          serviceId: kafka.id
          name: 'kafka'
        }
      ]
      containers: [
        {
          name: 'kafka-ui'
          image: 'docker.io/provectuslabs/kafka-ui:latest'
          command: [
            '/bin/sh'
          ]
          args: [
            '-c'
            '''export KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS="$KAFKA_BOOTSTRAP_SERVERS" && \
            export KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG="$KAFKA_PROPERTIES_SASL_JAAS_CONFIG" && \
            export KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM="$KAFKA_SASL_MECHANISM" && \
            export KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL="$KAFKA_SECURITY_PROTOCOL" && \
            java $JAVA_OPTS -jar kafka-ui-api.jar'''
          ]
          resources: {
            cpu: json('1.0')
            memory: '2.0Gi'
          }
        }
      ]
    }
  }
}

output kafkaUiUrl string = 'https://${kafkaUi.properties.configuration.ingress.fqdn}'

output kafkaCliExec string = 'az containerapp exec -n ${kafkaCli.name} -g ${resourceGroup().name} --command /bin/bash'

output kafkaLogs string = 'az containerapp logs show -n ${kafka.name} -g ${resourceGroup().name} --follow --tail 30'

使用 Azure CLI 部署模板。

RESOURCE_GROUP="kafka-dev"
LOCATION="northcentralus"

az group create \
    --name "$RESOURCE_GROUP" \
    --location "$LOCATION"

az deployment group create -g $RESOURCE_GROUP \
    --query 'properties.outputs.*.value' \
    --template-file kafka-dev.bicep

Azure 开发人员 CLI

GitHub 上提供了一个最终模板

使用 azd up 部署模板。

git clone https://github.com/Azure-Samples/aca-dev-service-kafka-azd
cd aca-dev-service-kafka-azd
azd up

清理资源

完成后,运行以下命令以删除包含容器应用资源的资源组。

注意

以下命令删除指定的资源组及其包含的所有资源。 如果指定的资源组中存在本教程范围外的资源,这些资源也会被删除。

az group delete \
    --resource-group $RESOURCE_GROUP