Apache Pulsar에서 스트림

Important

이 기능은 공개 미리 보기 상태입니다.

Databricks Runtime 14.1 이상에서는 구조적 스트리밍을 사용하여 Azure Databricks의 Apache Pulsar에서 데이터를 스트리밍할 수 있습니다.

구조적 스트리밍은 Pulsar 원본에서 읽은 데이터에 대해 정확히 한 번 처리 의미 체계를 제공합니다.

구문 예제

다음은 구조적 스트리밍을 사용하여 Pulsar에서 읽는 기본 예제입니다.

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

항목을 지정하려면 항상 다음 옵션 중 하나를 제공해야 service.url 합니다.

  • topic
  • topics
  • topicsPattern

전체 옵션 목록은 펄서 스트리밍 읽기에 대한 옵션 구성을 참조 하세요.

Pulsar에 인증

Azure Databricks는 Pulsar에 대한 truststore 및 키 저장소 인증을 지원합니다. Databricks는 구성 세부 정보를 저장할 때 비밀을 사용하는 것이 좋습니다.

스트림 구성 중에 다음 옵션을 설정할 수 있습니다.

  • pulsar.client.authPluginClassName
  • pulsar.client.authParams
  • pulsar.client.useKeyStoreTls
  • pulsar.client.tlsTrustStoreType
  • pulsar.client.tlsTrustStorePath
  • pulsar.client.tlsTrustStorePassword

스트림에서 다음을 PulsarAdmin사용하는 경우 다음도 설정합니다.

  • pulsar.admin.authPluginClassName
  • pulsar.admin.authParams

다음 예제에서는 인증 옵션을 구성하는 방법을 보여 줍니다.

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

펄서 스키마

Pulsar에서 읽은 레코드의 스키마는 토픽의 스키마를 인코딩하는 방법에 따라 달라집니다.

  • Avro 또는 JSON 스키마가 있는 토픽의 경우 결과 Spark DataFrame에서 필드 이름 및 필드 형식이 유지됩니다.
  • 스키마가 없거나 Pulsar의 단순 데이터 형식이 있는 토픽의 경우 페이로드가 열에 value 로드됩니다.
  • 판독기에서 스키마가 다른 여러 항목을 읽도록 구성된 경우 원시 콘텐츠를 열에 로드하도록 value 설정합니다allowDifferentTopicSchemas.

펄서 레코드에는 다음과 같은 메타데이터 필드가 있습니다.

Column Type
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

펄서 스트리밍 읽기에 대한 옵션 구성

모든 옵션은 구문을 사용하여 .option("<optionName>", "<optionValue>") 구조적 스트리밍 읽기의 일부로 구성됩니다. 옵션을 사용하여 인증을 구성할 수도 있습니다. 펄서 인증을 참조하세요.

다음 표에서는 Pulsar에 필요한 구성에 대해 설명합니다. 옵션 topictopicstopicsPattern중 하나만 지정해야 합니다.

옵션 기본값 설명
service.url 없음 Pulsar serviceUrl 서비스에 대한 펄서 구성입니다.
topic 없음 사용할 토픽의 토픽 이름 문자열입니다.
topics 없음 사용할 항목의 쉼표로 구분된 목록입니다.
topicsPattern 없음 사용할 토픽과 일치하는 Java regex 문자열입니다.

다음 표에서는 Pulsar에 지원되는 다른 옵션에 대해 설명합니다.

옵션 기본값 설명
predefinedSubscription 없음 Spark 애플리케이션 진행률을 추적하기 위해 커넥터에서 사용하는 미리 정의된 구독 이름입니다.
subscriptionPrefix 없음 Spark 애플리케이션 진행률을 추적하기 위해 임의 구독을 생성하기 위해 커넥터에서 사용하는 접두사입니다.
pollTimeoutMs 120000 Pulsar에서 메시지를 읽기 위한 시간 제한(밀리초)입니다.
waitingForNonExistedTopic false 커넥터가 원하는 토픽이 생성될 때까지 기다려야 하는지 여부입니다.
failOnDataLoss true 데이터가 손실될 때 쿼리에 실패할지 여부를 제어합니다(예: 토픽이 삭제되거나 보존 정책으로 인해 메시지가 삭제됨).
allowDifferentTopicSchemas false 스키마가 다른 여러 항목을 읽는 경우 이 매개 변수를 사용하여 자동 스키마 기반 토픽 값 역직렬화를 해제합니다. 이 경우 원시 값만 반환됩니다 true.
startingOffsets latest 이 경우 latest판독기는 실행을 시작한 후 최신 레코드를 읽습니다. 이 경우 earliest판독기는 가장 빠른 오프셋에서 읽습니다. 사용자는 특정 오프셋을 지정하는 JSON 문자열을 지정할 수도 있습니다.
maxBytesPerTrigger 없음 마이크로배치당 처리하려는 최대 바이트 수의 소프트 제한입니다. 이 항목이 지정되면 admin.url 지정해야 합니다.
admin.url 없음 펄서 serviceHttpUrl 구성입니다. 지정된 경우에만 maxBytesPerTrigger 필요합니다.

다음 패턴을 사용하여 Pulsar 클라이언트, 관리자 및 판독기 구성을 지정할 수도 있습니다.

패턴 구성 옵션에 연결
pulsar.client.* 펄서 클라이언트 구성
pulsar.admin.* Pulsar 관리자 구성
pulsar.reader.* 펄서 판독기 구성

시작 오프셋 JSON 생성

메시지 ID를 수동으로 생성하여 특정 오프셋을 지정하고 이를 JSON으로 옵션에 startingOffsets 전달할 수 있습니다. 다음 코드 예제에서는 이 구문을 보여 줍니다.

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()