read_kafka 테이블 반환 함수

적용 대상:검사 '예'로 표시 Databricks SQL 검사 '예'로 표시 Databricks Runtime 13.3 LTS 이상

Apache Kafka 클러스터에서 데이터를 읽고 테이블 형식으로 데이터를 반환합니다.

하나 이상의 Kafka 토픽에서 데이터를 읽을 수 있습니다. 일괄 처리 쿼리와 스트리밍 수집을 모두 지원합니다.

구문

read_kafka([option_key => option_value ] [, ...])

인수

이 함수에는 명명된 매개 변수 호출이 필요합니다.

  • option_key: 구성할 옵션의 이름입니다. 점()을 포함하는 옵션에는 백틱(.')을 사용해야 합니다.
  • option_value: 옵션을 설정하는 상수 식입니다. 리터럴 및 스칼라 함수를 허용합니다.

반품

다음 스키마를 사용하여 Apache Kafka 클러스터에서 읽은 레코드:

  • key BINARY: Kafka 레코드의 키입니다.
  • value BINARY NOT NULL: Kafka 레코드의 값입니다.
  • topic STRING NOT NULL: 레코드를 읽은 Kafka 토픽의 이름입니다.
  • partition INT NOT NULL: 레코드를 읽은 Kafka 파티션의 ID입니다.
  • offset BIGINT NOT NULL: Kafka TopicPartition에 있는 레코드의 오프셋 번호입니다.
  • timestamp TIMESTAMP NOT NULL: 레코드의 타임스탬프 값입니다. 열은 timestampType 이 타임스탬프가 해당하는 항목을 정의합니다.
  • timestampType INTEGER NOT NULL: 열에 지정된 timestamp 타임스탬프의 형식입니다.
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: 레코드의 일부로 제공되는 헤더 값입니다(사용하도록 설정된 경우).

예제

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- A more advanced query with security credentials for Kafka.
> SELECT * FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events',
    startingOffsets => 'earliest',
    `kafka.security.protocol` => 'SASL_SSL',
    `kafka.sasl.mechanism` => 'PLAIN',
    `kafka.sasl.jaas.config` =>  'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{USER_NAME}" password="{PASSWORD}";',
  );

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

옵션

Apache Spark 설명서에서 자세한 옵션 목록을 찾을 수 있습니다.

필수 옵션

Kafka 클러스터에 연결하기 위한 아래 옵션을 제공합니다.

옵션
bootstrapServers

유형: String

Kafka 클러스터를 가리키는 호스트/포트 쌍의 쉼표로 구분된 목록입니다.

기본값: 없음

아래 옵션 중 하나만 제공하여 데이터를 가져올 Kafka 토픽을 구성합니다.

옵션
assign

유형: String

사용할 특정 토픽 파티션을 포함하는 JSON 문자열입니다. 예를 들어 '{"topicA":[0,1],"topicB":[2,4]}'topicA의 0번째 및 첫 번째 파티션은 소비됩니다.

기본값: 없음
subscribe

유형: String

읽을 Kafka 항목의 쉼표로 구분된 목록입니다.

기본값: 없음
subscribePattern

유형: String

구독할 정규식 일치 항목입니다.

기본값: 없음

기타 옵션

read_kafka 는 일괄 처리 쿼리 및 스트리밍 쿼리에서 사용할 수 있습니다. 아래 옵션은 적용할 쿼리 유형을 지정합니다.

옵션
endingOffsets

형식: String 쿼리 형식: 일괄 처리만

일괄 처리 쿼리 "latest" 를 위해 최신 레코드를 지정하거나 각 TopicPartition에 대한 끝 오프셋을 지정하는 JSON 문자열까지 읽을 오프셋입니다. JSON -1 에서 오프셋으로 최신 항목을 참조하는 데 사용할 수 있습니다. -2 (가장 이른) 오프셋은 허용되지 않습니다.

기본값: "latest"
endingOffsetsByTimestamp

형식: String 쿼리 형식: 일괄 처리만

각 TopicPartition에 대해 읽을 끝 타임스탬프를 지정하는 JSON 문자열입니다. 타임스탬프는 타임스탬프의 긴 값(예: 밀리초)으로 제공되어야 합니다.1970-01-01 00:00:00 UTC
1686444353000. 타임스탬프가 있는 동작에 대한 자세한 내용은 아래 참고를 참조하세요.
endingOffsetsByTimestamp 는 .보다 endingOffsets우선합니다.

기본값: 없음
endingTimestamp

형식: String 쿼리 형식: 일괄 처리만

이후 타임스탬프의 문자열 값(밀리초)
1970-01-01 00:00:00 UTC예를 들면 다음과 같습니다 "1686444353000". Kafka가 일치하는 오프셋을 반환하지 않으면 오프셋이 최신으로 설정됩니다. 타임스탬프가 있는 동작에 대한 자세한 내용은 아래 참고를 참조하세요. 참고: endingTimestamp 우선 순위 endingOffsetsByTimestamp
endingOffsets.

기본값: 없음
includeHeaders

형식: Boolean 쿼리 형식: 스트리밍 및 일괄 처리

행에 Kafka 헤더를 포함할지 여부입니다.

기본값: false
kafka.<consumer_option>

형식: String 쿼리 형식: 스트리밍 및 일괄 처리

모든 Kafka 소비자별 옵션은 접두사를 사용하여 kafka. 전달할 수 있습니다. 이러한 옵션은 제공될 때 백틱으로 묶어야 합니다. 그렇지 않으면 파서 오류가 발생합니다. Kafka 설명서에서 옵션을 찾을 수 있습니다.

참고: 이 함수를 사용하여 다음 옵션을 설정하면 안 됩니다.
key.deserializer, value.deserializer, bootstrap.serversgroup.id

기본값: 없음
maxOffsetsPerTrigger

형식: Long 쿼리 형식: 스트리밍만

트리거 간격당 처리되는 최대 오프셋 수 또는 행 수에 대한 속도 제한입니다. 지정된 총 오프셋 수는 TopicPartitions 간에 비례적으로 분할됩니다.

기본값: 없음
startingOffsets

형식: String 쿼리 형식: 스트리밍 및 일괄 처리

쿼리가 시작될 때의 시작점이며, "earliest" 이는 가장 빠른 오프셋에서 온 것이며, "latest" 이는 최신 오프셋에서만 발생하거나 각 TopicPartition에 대한 시작 오프셋을 지정하는 JSON 문자열입니다. JSON -2 에서는 오프셋으로 가장 -1 빠른 항목을 최신으로 참조하는 데 사용할 수 있습니다.

참고: 일괄 처리 쿼리의 경우 최신(암시적으로 또는 JSON에서 -1 사용)은 허용되지 않습니다. 스트리밍 쿼리의 경우 새 쿼리가 시작될 때만 적용됩니다. 다시 시작한 스트리밍 쿼리는 쿼리 검사포인트에 정의된 오프셋에서 계속됩니다. 쿼리 중에 새로 검색된 파티션은 가장 일찍 시작됩니다.

기본값: "latest" 스트리밍의 경우, "earliest" 일괄 처리의 경우
startingOffsetsByTimestamp

형식: String 쿼리 형식: 스트리밍 및 일괄 처리

각 TopicPartition에 대한 시작 타임스탬프를 지정하는 JSON 문자열입니다. 타임스탬프는 타임스탬프의 긴 값(예1686444353000: 밀리초1970-01-01 00:00:00 UTC)으로 제공되어야 합니다. 타임스탬프가 있는 동작에 대한 자세한 내용은 아래 참고를 참조하세요. Kafka가 일치하는 오프셋을 반환하지 않으면 동작은 옵션 startingOffsetsByTimestampStrategy의 값을 따릅니다.
startingOffsetsByTimestamp 는 .보다 startingOffsets우선합니다.

참고: 스트리밍 쿼리의 경우 새 쿼리가 시작될 때만 적용됩니다. 다시 시작한 스트리밍 쿼리는 쿼리 검사포인트에 정의된 오프셋에서 계속됩니다. 쿼리 중에 새로 검색된 파티션은 가장 일찍 시작됩니다.

기본값: 없음
startingOffsetsByTimestampStrategy

형식: String 쿼리 형식: 스트리밍 및 일괄 처리

이 전략은 타임스탬프별 지정된 시작 오프셋(전역 또는 파티션당)이 반환된 Kafka 오프셋과 일치하지 않는 경우에 사용됩니다. 사용 가능한 전략은 다음과 같습니다.

* "error": 쿼리 실패
* "latest": Spark가 이러한 파티션의 최신 레코드를 나중에 마이크로 일괄 처리로 읽을 수 있도록 이러한 파티션에 대한 최신 오프셋을 할당합니다.

기본값: "error"
startingTimestamp

형식: String 쿼리 형식: 스트리밍 및 일괄 처리

이후 타임스탬프의 문자열 값(밀리초)
1970-01-01 00:00:00 UTC예를 들면 다음과 같습니다 "1686444353000". 타임스탬프가 있는 동작에 대한 자세한 내용은 아래 참고를 참조하세요. Kafka가 일치하는 오프셋을 반환하지 않으면 동작은 옵션 startingOffsetsByTimestampStrategy의 값을 따릅니다.
startingTimestamp는 우선 startingOffsetsByTimestamp 순위를 하며 .startingOffsets

참고: 스트리밍 쿼리의 경우 새 쿼리가 시작될 때만 적용됩니다. 다시 시작한 스트리밍 쿼리는 쿼리 검사포인트에 정의된 오프셋에서 계속됩니다. 쿼리 중에 새로 검색된 파티션이 가장 일찍 시작됩니다.

기본값: 없음

참고 항목

각 파티션에 대해 반환된 오프셋은 타임스탬프가 해당 파티션의 지정된 타임스탬프보다 크거나 같은 가장 빠른 오프셋입니다. Kafka가 일치하는 오프셋을 반환하지 않는 경우 각 옵션에 대한 설명을 검사 동작은 옵션에 따라 다릅니다.

Spark는 단순히 타임스탬프 정보를 KafkaConsumer.offsetsForTimes전달하며 값에 대한 해석이나 이유를 전달하지 않습니다. 자세한 내용은 KafkaConsumer.offsetsForTimes설명서를 참조하세요. 또한 여기서 타임스탬프의 의미는 Kafka 구성(log.message.timestamp.type)에 따라 달라질 수 있습니다. 자세한 내용은 Apache Kafka 설명서를 참조 하세요.