다음을 통해 공유


Azure IoT 데이터 프로세서 미리 보기 파이프라인에서 데이터 집계

Important

Azure IoT 작업 미리 보기 - Azure Arc에서 지원되는 Azure IoT 작업은 현재 미리 보기로 제공됩니다. 프로덕션 환경에서는 이 미리 보기 소프트웨어를 사용하면 안 됩니다.

베타, 미리 보기로 제공되거나 아직 일반 공급으로 릴리스되지 않은 Azure 기능에 적용되는 약관은 Microsoft Azure 미리 보기에 대한 추가 사용 약관을 참조하세요.

‘집계’ 스테이지는 사용자 정의 시간 동안 센서 데이터 스트리밍에 대한 다운 샘플링 및 일괄 처리 작업을 실행할 수 있는 선택적이고 구성 가능한 중간 파이프라인 스테이지입니다.

집계 스테이지를 사용하여 정의된 동안 메시지를 누적하고 메시지의 속성에서 집계 값을 계산합니다. 이 스테이지는 집계된 값을 각 시간 창의 끝에 있는 단일 메시지의 속성으로 내보냅니다.

  • 각 파이프라인 파티션은 서로 독립적으로 집계를 수행합니다.
  • 스테이지의 출력은 정의된 모든 집계 속성을 포함하는 단일 메시지입니다.
  • 이 스테이지는 다른 모든 속성을 삭제합니다. 그러나 Last, First 또는 Collect함수를 사용하여 스테이지가 집계 중에 삭제하는 속성을 유지할 수 있습니다.
  • 집계 스테이지가 작동하려면 파이프라인의 데이터 원본 스테이지가 들어오는 메시지를 역직렬화해야 합니다.

필수 조건

집계 파이프라인 단계를 구성하고 사용하려면 Azure IoT 데이터 프로세서 미리 보기의 배포된 인스턴스가 필요합니다.

스테이지 구성

집계 스테이지 JSON 구성은 스테이지의 세부 정보를 정의합니다. 스테이지를 작성하려면 양식 기반 UI와 상호 작용하거나 고급 탭에서 JSON 구성을 제공할 수 있습니다.

필드 형식 설명 필수 항목 기본값 예시
이름 문자열 데이터 프로세서 UI에 표시할 이름입니다. - Calculate Aggregate
설명 문자열 집계 스테이지가 수행하는 작업에 대한 알기 쉬운 설명입니다. 아니요 Aggregation over temperature
시간 창 집계가 실행되는 기간을 지정합니다. - 10s
속성 > 함수 열거형 사용할 집계 함수입니다. - Sum
속성 > InputPath1 Path 함수를 적용할 들어오는 메시지의 속성에 대한 경로입니다. - .payload.temperature
속성 > OutputPath2 Path 결과를 저장할 나가는 메시지의 위치에 대한 경로입니다. - .payload.temperature.average

하나의 집계 스테이지에서 여러 속성 구성을 정의할 수 있습니다. 예를 들어 온도의 합계를 계산하고 압력의 평균을 계산합니다.

1입력 경로:

  • 입력 경로 속성 값의 데이터 형식은 정의된 함수의 형식과 호환되어야 합니다.
  • 여러 집계 구성에서 동일한 입력 경로를 제공하여 동일한 입력 경로 속성에 대해 여러 함수를 계산할 수 있습니다. 결과를 덮어쓰지 않도록 출력 경로가 다른지 확인합니다.

2출력 경로:

  • 출력 경로는 입력 경로와 같을 수도 있고 다를 수도 있습니다. 동일한 입력 경로 속성에서 여러 집계를 계산하는 경우 다른 출력 경로를 사용합니다.
  • 집계 값을 덮어쓰지 않도록 고유한 출력 경로를 구성합니다.

Windows

창은 스테이지가 메시지를 누적하는 시간 간격입니다. 창의 끝에서 스테이지는 구성된 함수를 메시지 속성에 적용합니다. 그런 다음 스테이지는 단일 메시지를 내보냅니다.

현재, 스테이지는 ‘연속’ 창만 지원합니다.

연속 창은 일련의 겹치지 않고 연속되는 고정 크기 시간 간격입니다. 창은 고정된 시간에 시작되고 끝납니다.

Diagram that shows 10 second tumbling windows in the aggregate stage.

창의 크기는 스테이지가 메시지를 누적하는 시간 간격을 정의합니다. 창 크기는 기간 공통 패턴을 사용하여 정의합니다.

함수

집계 스테이지에서는 다음 함수를 지원하여 입력 경로에 정의된 메시지 속성에 대한 집계 값을 계산합니다.

함수 설명
Sum 입력 메시지에서 속성 값의 합계를 계산합니다.
평균 입력 메시지에서 속성 값의 평균을 계산합니다.
Count 창에서 속성이 나타나는 횟수를 계산합니다.
Min 입력 메시지에서 속성 값의 최소값을 계산합니다.
최대 입력 메시지에서 속성 값의 최대값을 계산합니다.
마지막 입력 메시지에서 속성 값의 최신 값을 반환합니다.
First 입력 메시지에서 속성 값의 첫 번째 값을 반환합니다.
수집 입력 메시지에서 속성의 값을 모두 반환합니다.

다음 표에는 각 함수에서 지원하는 메시지 데이터 형식이 나와 있습니다.

함수 정수 Float 문자열 DateTime Array Object 이진
Sum
평균
Count
Min
Max
마지막
First
수집

샘플 구성

다음 JSON 예제에서는 전체 집계 스테이지 구성을 보여줍니다.

{ 
    "displayName":"downSample", 
    "description":"Calculate average for production tags", 
    "window": 
    { 
        "type":"tumbling", 
        "size":"10s" 
    }, 
    "properties": 
    [ 
        { 
            "function":"average", 
            "inputPath": ".payload.temperature", 
            "outputPath":".payload.temperature_avg" 
        }, 
        {  
            "function":"collect",  
            "inputPath": ".payload.temperature", 
            "outputPath":".payload.temperature_all"  
        },  
        {  
            "function":"average",  
            "inputPath":".payload.pressure", 
            "outputPath":".payload.pressure"                  
        },  
        {  
            "function":"last",  
            "inputPath":".systemProperties", 
            "outputPath": ".systemProperties" 
        } 
    ] 
}

구성은 10초 동안 계산하는 집계 스테이지를 정의합니다.

  • 평균 온도
  • 온도 합계
  • 압력 합계

예시

이 예제에는 이전 구성을 사용하여 생성된 두 개의 샘플 입력 메시지와 한 개의 샘플 출력 메시지가 포함됩니다.

입력 메시지 1:

{ 
    "systemProperties":{ 
        "partitionKey":"foo", 
        "partitionId":5, 
        "timestamp":"2023-01-11T10:02:07Z" 
    }, 
    "qos":1, 
    "topic":"/assets/foo/tags/bar", 
    "properties":{ 
        "responseTopic":"outputs/foo/tags/bar", 
        "contentType": "application/json" 
    }, 
    "payload":{ 
        "humidity": 10, 
        "temperature":250, 
        "pressure":30, 
        "runningState": true 
    } 
} 

입력 메시지 2:

{ 
    "systemProperties":{ 
        "partitionKey":"foo", 
        "partitionId":5, 
        "timestamp":"2023-01-11T10:02:07Z" 
    }, 
    "qos":1, 
    "topic":"/assets/foo/tags/bar", 
    "properties":{ 
        "responseTopic":"outputs/foo/tags/bar", 
        "contentType": "application/json" 
    }, 
    "payload":{ 
        "humidity": 11, 
        "temperature":235, 
        "pressure":25, 
        "runningState": true 
    } 
} 

출력 메시지:

{ 
    "systemProperties":{  
        "partitionKey":"foo",  
        "partitionId":5,  
        "timestamp":"2023-01-11T10:02:07Z"  
    }, 
    "payload":{ 
        "temperature_avg":242.5, 
        "temperature_all":[250,235], 
        "pressure":27.5 
    } 
}