Apache Kafka

구조적 스트리밍을 위한 Apache Kafka 커넥터는 Databricks Runtime 패키지 됩니다. 커넥터를 사용 하 여 kafka kafka 0.10 +에 연결 하 고 커넥터를 사용 하 여 kafka08 kafka 0.8 + (사용 되지 않음)에 연결 합니다.

HDInsight의 kafka 커넥트 Azure Databricks

  1. HDInsight Kafka 클러스터를 만듭니다.

    지침은 Azure Virtual Network를 통해 HDInsight의 kafka에 커넥트를 참조 하세요.

  2. 올바른 주소를 보급 하도록 Kafka broker를 구성 합니다.

    IP 광고에 대해 Kafka 구성의 지침을 따릅니다. Azure Virtual Machines에서 Kafka을 직접 관리 하는 경우 advertised.listeners 브로커의 구성이 호스트의 내부 IP로 설정 되었는지 확인 합니다.

  3. Azure Databricks 클러스터를 만듭니다.

    빠른 시작: Azure Portal를 사용 하 여 Azure Databricks에서 Spark 작업 실행의 지침을 따릅니다.

  4. Azure Databricks 클러스터에 Kafka 클러스터를 피어 링 합니다.

    피어 가상 네트워크의 지침을 따릅니다.

  5. Kafka 노트북을 사용 하 여빠른 시작 및 프로덕션 구조적 스트리밍에 설명 된 시나리오를 테스트 하 여 연결의 유효성을 검사 합니다.

스키마

레코드의 스키마는 다음과 같습니다.

Column 형식
binary
value binary
토픽 문자열
partition Int
offset long
timestamp long
timestampType Int

key및는 value 항상를 사용 하 여 바이트 배열로 deserialize 됩니다 ByteArrayDeserializer . 데이터 프레임 작업 ( cast("string") , udf)을 사용 하 여 키 및 값을 명시적으로 deserialize 합니다.

빠른 시작

정식 WordCount 예부터 살펴보겠습니다. 다음 노트북은 Kafka과 함께 구조적 스트리밍을 사용 하 여 WordCount를 실행 하는 방법을 보여 줍니다.

참고

이 노트북 예제에서는 Kafka 0.10를 사용 합니다. Kafka 0.8을 사용 하려면 형식을로 변경 kafka08 합니다 (즉, .format("kafka08") ).

구조적 스트리밍 노트북을 사용 하는 kafka WordCount

Notebook 가져오기

Configuration

구성 옵션의 comphensive 목록은 Spark 구조적 스트리밍 + Kafka 통합 가이드를 참조 하세요. 시작 하려면 가장 일반적인 구성 옵션의 하위 집합을 참조 하세요.

참고

구조적 스트리밍이 아직 개발 중 이므로이 목록이 최신 상태가 아닐 수 있습니다.

여러 가지 방법으로 구독할 항목을 지정할 수 있습니다. 다음 매개 변수 중 하나만 제공 해야 합니다.

옵션 지원 되는 Kafka 버전 Description
subscribe 쉼표로 구분 된 항목 목록입니다. 0.8, 0.10 구독할 항목 목록입니다.
subscribePattern Java regex 문자열입니다. 0.10 토픽을 구독 하는 데 사용 되는 패턴입니다.
assign JSON 문자열 {"topicA":[0,1],"topic":[2,4]} 입니다. 0.8, 0.10 사용할 특정 topicPartitions.

기타 주목할 만한 구성:

옵션 기본값 지원 되는 Kafka 버전 설명
kafka. 서버 쉼표로 구분 된 호스트: port 목록입니다. 비어 있음 0.8, 0.10 하다 Kafka bootstrap.servers 구성입니다. Kafka의 데이터가 없으면 먼저 broker 주소 목록을 확인 하세요. Broker 주소 목록이 올바르지 않으면 오류가 없을 수 있습니다. 이는 Kafka 클라이언트는 궁극적으로 broker를 사용할 수 있는 것으로 가정 하 고 네트워크 오류가 계속 해 서 다시 시도 될 때 발생 합니다.
failOnDataLoss true 또는 false true 0.10 필드 데이터가 손실 될 수 있는 경우 쿼리를 실패할 지 여부입니다. 삭제 된 토픽, 처리 전 토픽 잘림 등의 많은 시나리오로 인해 쿼리가 Kafka에서 데이터를 영구적으로 읽지 못할 수 있습니다. 데이터가 손실 될 수 있는지 여부를 모두 예측 하려고 합니다. 경우에 따라 거짓 경보를 일으킬 수 있습니다. 예상 대로 작동 하지 않는 경우이 옵션을로 설정 하 고 false , 데이터 손실에도 불구 하 고 쿼리 처리를 계속 하려면이 옵션을로 설정 합니다.
minPartitions 정수 > = 0, 0 = 사용 안 함 0(사용 안 함) 0.10 필드 Kafka에서 읽을 최소 파티션 수입니다. Spark 2.1.0-db2 이상에서는 옵션을 사용 하 여 Kafka에서 읽을 수 있도록 임의의 최소 파티션을 사용 하도록 Spark를 구성할 수 있습니다 minPartitions . 일반적으로 Spark는 Kafka topicPartitions에서 사용 하는 Spark 파티션에 Kafka 매핑을 1-1 합니다. minPartitions옵션을 Kafka topicPartitions 보다 큰 값으로 설정 하면 Spark는 큰 kafka 분할을 더 작은 조각으로 divvy 합니다. 이 옵션은 최대 부하, 데이터 왜곡 및 스트림이 처리 속도를 높이기 위해 지연 되는 시간에 설정할 수 있습니다. 각 트리거에서 Kafka 소비자를 초기화 하는 데 비용이 듭니다 .이는 Kafka에 연결할 때 SSL을 사용 하는 경우 성능에 영향을 줄 수 있습니다.
kafka.group.id Kafka 소비자 그룹 ID입니다. 설정되지 않음 0.10 [선택 사항] Kafka에서 읽는 동안 사용할 그룹 ID입니다. Spark 2.2 이상에서 지원됩니다. 주의해서 사용합니다. 기본적으로 각 쿼리는 데이터를 읽기 위한 고유한 그룹 ID를 생성합니다. 이렇게 하면 각 쿼리에 다른 소비자의 간섭을 받지 않는 자체 소비자 그룹이 있으므로 구독된 토픽의 모든 파티션을 읽을 수 있습니다. 일부 시나리오(예: Kafka 그룹 기반 권한 부여)에서는 권한 있는 특정 그룹 ID를 사용하여 데이터를 읽을 수 있습니다. 필요에 따라 그룹 ID를 설정할 수 있습니다. 그러나 예기치 않은 동작이 발생할 수 있기 때문에 주의해서 이 작업을 수행합니다.

* 그룹 ID가 동일한 쿼리(일괄 처리 및 스트리밍 모두)를 동시에 실행하면 서로 간섭하여 각 쿼리가 데이터의 일부만 읽도록 할 수 있습니다.
* 쿼리를 빠르게 연속해서 시작/다시 시작할 때도 이 문제가 발생할 수 있습니다. 이러한 문제를 최소화하려면 Kafka 소비자 session.timeout.ms 구성을 매우 작게 설정합니다.

다른 선택적 구성은 구조적 스트리밍 Kafka 통합 가이드를 참조하세요.

중요

Kafka 0.10 커넥터에 대해 다음 Kafka 매개 변수를 설정하면 예외가 throw됩니다.

  • group.id: 2.2 미만의 Spark 버전에서는 이 매개 변수를 설정하는 것이 허용되지 않습니다.
  • auto.offset.reset: 대신 원본 옵션을 startingOffsets 설정하여 시작할 위치를 지정합니다. 일관성을 유지하기 위해 구조적 스트리밍(Kafka 소비자와 반대)은 오프셋의 소비를 내부적으로 관리합니다. 이렇게 하면 새 토픽/파티션을 동적으로 구독한 후 데이터를 놓치지 않습니다. startingOffsets 는 새 스트리밍 쿼리를 시작하고 검사점에서 다시 시작하는 경우에만 적용되며 쿼리가 중단된 위치에서 항상 선택합니다.
  • key.deserializer: 키는 항상 를 통해 바이트 배열로 deserialized됩니다. ByteArrayDeserializer DataFrame 작업을 사용하여 키를 명시적으로 deserialize합니다.
  • value.deserializer: 값은 항상 를 사용하는 바이트 배열로 deserialized됩니다. ByteArrayDeserializer DataFrame 작업을 사용하여 값을 명시적으로 deserialize합니다.
  • enable.auto.commit: 이 매개 변수를 설정하는 것은 허용되지 않습니다. Spark는 내부적으로 Kafka 오프셋을 추적하고 오프셋을 커밋하지 않습니다.
  • interceptor.classes: Kafka 원본은 항상 키와 값을 바이트 배열로 읽습니다. 쿼리를 중단할 수 있기 때문에 사용하는 것은 안전하지 ConsumerInterceptor 않습니다.

Kafka Notebook을 통해 프로덕션 구조적 스트리밍

Notebook 가져오기

메트릭

참고

Databricks Runtime 8.1 이상에서 사용할 수 있습니다.

, 및 메트릭을 사용하여 구독된 모든 토픽 중 스트리밍 쿼리가 사용 가능한 최신 오프셋 뒤에 있는 오프셋 수의 평균, 최소 및 최대값을 얻을 수 avgOffsetsBehindLatestmaxOffsetsBehindLatestminOffsetsBehindLatest 있습니다. 대화형으로 메트릭 읽기를참조하세요.

참고

Databricks Runtime 9.1 이상에서 사용할 수 있습니다.

스트리밍 쿼리 프로세스가 구독된 모든 토픽 뒤에 있는 예상 총 바이트 수를 로 얻을 수도 estimatedTotalBytesBehindLatest 있습니다. 이 예상치는 지난 300초 동안 처리된 일괄 처리를 기반으로 합니다. 옵션을 다른 값으로 설정하여 예상 기간을 변경할 수 bytesEstimateWindowLength 있습니다. 예를 들어 10분으로 설정하려면 다음을 수행합니다.

df = spark.readStream \
  .format("kafka") \
  .option("bytesEstimateWindowLength", "10m") // m for minutes, you can also use "600s" for 600 seconds

Notebook에서 스트림을 실행하는 경우 스트리밍 쿼리 진행률 대시보드의 원시 데이터 탭에서 이러한 메트릭을 볼 수 있습니다.

{
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic]]",
    "metrics" : {
      "avgOffsetsBehindLatest" : "4.0",
      "maxOffsetsBehindLatest" : "4",
      "minOffsetsBehindLatest" : "4",
      "estimatedTotalBytesBehindLatest" : "80.0"
    },
  } ]
}

SSL 사용

Kafka에 대한 SSL 연결을 사용하도록 설정하려면 Confluent 설명서 SSL로 암호화 및 인증의지침을 따릅니다. 옵션으로 접두사로 을 하여 설명된 구성을 제공할 수 kafka. 있습니다. 예를 들어 속성에 트러스트 저장소 위치를 kafka.ssl.truststore.location 지정합니다.

다음을 수행하는 것이 좋습니다.

  • Azure Blob Storage 또는 Azure Data Lake Storage Gen2에 인증서를 저장하고 DBFS 탑재 지점을통해 액세스합니다. 클러스터 및 작업 ACL과결합하면 Kafka에 액세스할 수 있는 클러스터로만 인증서에 대한 액세스를 제한할 수 있습니다.
  • 인증서 암호를 비밀 범위비밀로 저장합니다.

경로가 탑재되고 비밀이 저장되면 다음을 수행할 수 있습니다.

df = spark.readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", ...) \
  .option("kafka.ssl.truststore.location", <dbfs-truststore-location>) \
  .option("kafka.ssl.keystore.location", <dbfs-keystore-location>) \
  .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>)) \
  .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))

리소스