다음을 통해 공유


Event Hubs 및 .NET을 사용하여 Atlas Kafka topics 메시지 보내기 및 받기

이 빠른 시작에서는 Atlas Kafka topics 이벤트를 보내고 받는 방법을 설명합니다. Azure Event HubsAzure.Messaging.EventHubs .NET 라이브러리를 사용합니다.

필수 구성 요소

Event Hubs를 사용하는 경우 이 빠른 시작을 완료하기 전에 Event Hubs 개요 를 참조하세요.

이 빠른 시작을 수행하려면 다음과 같은 특정 필수 구성 요소가 필요합니다.

  • Microsoft Azure 구독. Event Hubs를 비롯한 Azure 서비스를 사용하려면 Azure 구독이 필요합니다. Azure 계정이 없는 경우 평가판 에 등록하거나 계정을 만들 때 MSDN 구독자 혜택을 사용할 수 있습니다.
  • Microsoft Visual Studio 2022. Event Hubs 클라이언트 라이브러리는 C# 8.0에 도입된 새로운 기능을 사용합니다. 이전 C# 버전에서 라이브러리를 계속 사용할 수 있지만 새 구문을 사용할 수 없습니다. 전체 구문을 사용하려면 .NET Core SDK 3.0 이상 및 언어 버전을 로 설정하여 컴파일하는 latest것이 좋습니다. Visual Studio 2019 이전의 Visual Studio 버전을 사용하는 경우 C# 8.0 프로젝트를 빌드하는 데 필요한 도구가 없습니다. 무료 Community 버전을 포함한 Visual Studio 2022는 여기에서 다운로드할 수 있습니다.
  • 활성 Microsoft Purview 계정입니다.
  • 메시지를 보내고 받도록 Microsoft Purview 계정으로 구성된 Event Hubs:

Microsoft Purview에 메시지 게시

Event Hubs Kafka 토픽을 통해 Microsoft Purview에 이벤트를 보내는 .NET Core 콘솔 애플리케이션을 만들어 보겠습니다. ATLAS_HOOK.

Microsoft Purview에 메시지를 게시하려면 관리되는 Event Hubs 또는 후크 구성이 있는 하나 이상의 Event Hubs가 필요합니다.

Visual Studio 프로젝트 만들기

다음으로 Visual Studio에서 C# .NET 콘솔 애플리케이션을 만듭니다.

  1. Visual Studio를 시작합니다.
  2. 시작 창에서 새 프로젝트> 만들기콘솔 앱(.NET Framework)을 선택합니다. .NET 버전 4.5.2 이상이 필요합니다.
  3. 프로젝트 이름PurviewKafkaProducer를 입력합니다.
  4. 만들기를 선택하여 프로젝트를 만듭니다.

콘솔 애플리케이션 만들기

  1. Visual Studio 2022를 시작합니다.
  2. 새 프로젝트 만들기를 선택합니다.
  3. 새 프로젝트 만들기 대화 상자에서 다음 단계를 수행합니다. 이 대화 상자가 표시되지 않으면 메뉴에서 파일을 선택하고 새로 만들기를 선택한 다음 프로젝트를 선택합니다.
    1. 프로그래밍 언어로 C# 을 선택합니다.
    2. 애플리케이션 유형에 대해 콘솔 을 선택합니다.
    3. 결과 목록에서 콘솔 앱(.NET Core) 을 선택합니다.
    4. 그런 후 다음을 선택합니다.

Event Hubs NuGet 패키지 추가

  1. 메뉴에서 도구>NuGet 패키지 관리자>패키지 관리자 콘솔 을 선택합니다.

  2. 다음 명령을 실행하여 Azure.Messaging.EventHubs NuGet 패키지 및 Azure.Messaging.EventHubs.Producer NuGet 패키지를 설치합니다.

    Install-Package Azure.Messaging.EventHubs
    
    Install-Package Azure.Messaging.EventHubs.Producer
    

이벤트 허브에 메시지를 보내는 코드 작성

  1. Program.cs 파일의 맨 위에 다음 using 문을 추가합니다.

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
  2. Event Hubs Program 연결 문자열 및 Event Hubs 이름에 대한 상수를 클래스에 추가합니다.

    private const string connectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
    private const string eventHubName = "<EVENT HUB NAME>";
    
  3. 메서드를 Main 다음 async Main 메서드로 바꾸고 을 async ProduceMessage 추가하여 Microsoft Purview에 메시지를 푸시합니다. 자세한 내용은 코드의 주석을 참조하세요.

        static async Task Main()
        {
            // Read from the default consumer group: $Default
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
    
     		/ Create an event producer client to add events in the event hub
            EventHubProducerClient producer = new EventHubProducerClient(ehubNamespaceConnectionString, eventHubName);
    
     		await ProduceMessage(producer);
        }
    
     	static async Task ProduceMessage(EventHubProducerClient producer)
    
        {
     		// Create a batch of events 
     		using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
     		// Add events to the batch. An event is a represented by a collection of bytes and metadata. 
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<First event>")));
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Second event>")));
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Third event>")));
    
     		// Use the producer client to send the batch of events to the event hub
     		await producerClient.SendAsync(eventBatch);
     		Console.WriteLine("A batch of 3 events has been published.");
    
     	}
    
  4. 프로젝트를 만듭니다. 오류가 없는지 확인합니다.

  5. 프로그램을 실행하고 확인 메시지를 기다립니다.

    참고

    자세한 정보가 포함된 전체 소스 코드는 GitHub에서 이 파일을 참조하세요.

엔터티 JSON 만들기 메시지를 사용하여 두 개의 열이 있는 sql 테이블을 만드는 샘플 코드

	
	{
    "msgCreatedBy":"nayenama",
    "message":{
        "type":"ENTITY_CREATE_V2",
        "user":"admin",
        "entities":{
            "entities":[
                {
                    "typeName":"azure_sql_table",
                    "attributes":{
                        "owner":"admin",
                        "temporary":false,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
                        "name":"SalesOrderTable",
                        "description":"Sales Order Table added via Kafka"
                    },
                    "relationshipAttributes":{
                        "columns":[
                            {
                                "guid":"-1102395743156037",
                                "typeName":"azure_sql_column",
                                "uniqueAttributes":{
                                    "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID"
                                }
                            },
                            {
                                "guid":"-1102395743156038",
                                "typeName":"azure_sql_column",
                                "uniqueAttributes":{
                                    "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate"
                                }
                            }
                        ]
                    },
                    "guid":"-1102395743156036",
                    "version":0
                }
            ],
            "referredEntities":{
                "-1102395743156037":{
                    "typeName":"azure_sql_column",
                    "attributes":{
                        "owner":null,
                        "userTypeId":61,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID",
                        "precision":23,
                        "length":8,
                        "description":"Sales Order ID",
                        "scale":3,
                        "name":"OrderID",
                        "data_type":"int"
                    },
                    "relationshipAttributes":{
                        "table":{
                            "guid":"-1102395743156036",
                            "typeName":"azure_sql_table",
                            "entityStatus":"ACTIVE",
                            "displayText":"SalesOrderTable",
                            "uniqueAttributes":{
                                "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
                            }
                        }
                    },
                    "guid":"-1102395743156037",
                    "version":2
                },
                "-1102395743156038":{
                    "typeName":"azure_sql_column",
                    "attributes":{
                        "owner":null,
                        "userTypeId":61,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate",
                        "description":"Sales Order Date",
                        "scale":3,
                        "name":"OrderDate",
                        "data_type":"datetime"
                    },
                    "relationshipAttributes":{
                        "table":{
                            "guid":"-1102395743156036",
                            "typeName":"azure_sql_table",
                            "entityStatus":"ACTIVE",
                            "displayText":"SalesOrderTable",
                            "uniqueAttributes":{
                                "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
                            }
                        }
                    },
                    "guid":"-1102395743156038",
                    "status":"ACTIVE",
                    "createdBy":"ServiceAdmin",
                    "version":0
                }
            }
        }
    },
    "version":{
        "version":"1.0.0"
    },
    "msgCompressionKind":"NONE",
    "msgSplitIdx":1,
    "msgSplitCount":1
}


Microsoft Purview 메시지 받기

다음으로 이벤트 프로세서를 사용하여 이벤트 허브에서 메시지를 수신하는 .NET Core 콘솔 애플리케이션을 작성하는 방법을 알아봅니다. 이벤트 프로세서는 이벤트 허브에서 영구 검사점 및 병렬 수신을 관리합니다. 이렇게 하면 이벤트를 받는 프로세스가 간소화됩니다. Microsoft Purview에서 메시지를 받으려면 ATLAS_ENTITIES 이벤트 허브를 사용해야 합니다.

Microsoft Purview에서 메시지를 받으려면 관리되는 Event Hubs 또는 Event Hubs 알림 구성이 필요합니다.

경고

Event Hubs SDK는 사용 가능한 최신 버전의 Storage API를 사용합니다. 해당 버전은 Stack Hub 플랫폼에서 반드시 사용할 수 있는 것은 아닙니다. Azure Stack Hub에서 이 코드를 실행하는 경우 사용 중인 특정 버전을 대상으로 지정하지 않는 한 런타임 오류가 발생합니다. Azure Blob Storage 검사점 저장소로 사용하는 경우 Azure Stack Hub 빌드 및 코드에서 지원되는 Azure Storage API 버전을 검토하고 해당 버전을 대상으로 지정합니다.

스토리지 서비스의 사용 가능한 가장 높은 버전은 버전 2019-02-02입니다. 기본적으로 Event Hubs SDK 클라이언트 라이브러리는 Azure에서 사용 가능한 가장 높은 버전(SDK 릴리스 시 2019-07-07)을 사용합니다. Azure Stack Hub 버전 2005를 사용하는 경우 이 섹션의 단계를 따르는 것 외에도 Storage 서비스 API 버전 2019-02-02를 대상으로 하는 코드를 추가해야 합니다. 특정 Storage API 버전을 대상으로 지정하는 방법을 알아보려면 GitHub에서 이 샘플을 참조하세요.

Azure Storage 및 Blob 컨테이너 만들기

Azure Storage를 검사점 저장소로 사용합니다. 다음 단계를 사용하여 Azure Storage 계정을 만듭니다.

  1. Azure Storage 계정을 생성합니다

  2. Blob 컨테이너 만들기

  3. 스토리지 계정에 대한 연결 문자열 가져오기

    연결 문자열과 컨테이너 이름을 기록해 둡다. 수신 코드에서 사용합니다.

수신기에 대한 Visual Studio 프로젝트 만들기

  1. 솔루션 탐색기 창에서 EventHubQuickStart 솔루션을 선택하고 길게 누르거나 마우스 오른쪽 단추로 클릭하고 추가를 가리킨 다음 새 프로젝트를 선택합니다.
  2. 콘솔 앱(.NET Core)을 선택하고 다음을 선택합니다.
  3. 프로젝트 이름PurviewKafkaConsumer를 입력하고 만들기를 선택합니다.

Event Hubs NuGet 패키지 추가

  1. 메뉴에서 도구>NuGet 패키지 관리자>패키지 관리자 콘솔 을 선택합니다.

  2. 다음 명령을 실행하여 Azure.Messaging.EventHubs NuGet 패키지를 설치합니다.

    Install-Package Azure.Messaging.EventHubs
    
  3. 다음 명령을 실행하여 Azure.Messaging.EventHubs.Processor NuGet 패키지를 설치합니다.

    Install-Package Azure.Messaging.EventHubs.Processor
    

Main 메서드 업데이트

  1. Program.cs 파일의 맨 위에 다음 using 문을 추가합니다.

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
  2. Event Hubs Program 연결 문자열 및 이벤트 허브 이름의 클래스에 상수를 추가합니다. 대괄호 안에 있는 자리 표시자를 이벤트 허브 및 스토리지 계정(액세스 키 - 기본 연결 문자열)을 만들 때 얻은 실제 값으로 바꿉니다. 이 {Event Hubs namespace connection string} 이벤트 허브 문자열이 아닌 네임스페이스 수준 연결 문자열인지 확인합니다.

        private const string ehubNamespaceConnectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
        private const string eventHubName = "<EVENT HUB NAME>";
        private const string blobStorageConnectionString = "<AZURE STORAGE CONNECTION STRING>";
        private const string blobContainerName = "<BLOB CONTAINER NAME>";
    

    Microsoft Purview에 메시지를 보낼 때 ATLAS_ENTITIES 이벤트 허브 이름으로 사용합니다.

  3. 메서드를 Main 다음 async Main 메서드로 바꿉다. 자세한 내용은 코드의 주석을 참조하세요.

        static async Task Main()
        {
            // Read from the default consumer group: $Default
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
    
            // Create a blob container client that the event processor will use 
            BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
            // Create an event processor client to process events in the event hub
            EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
    
            // Register handlers for processing events and handling errors
            processor.ProcessEventAsync += ProcessEventHandler;
            processor.ProcessErrorAsync += ProcessErrorHandler;
    
            // Start the processing
            await processor.StartProcessingAsync();
    
            // Wait for 10 seconds for the events to be processed
            await Task.Delay(TimeSpan.FromSeconds(10));
    
            // Stop the processing
            await processor.StopProcessingAsync();
        }    
    
  4. 이제 클래스에 다음 이벤트 및 오류 처리기 메서드를 추가합니다.

        static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
        {
            // Write the body of the event to the console window
            Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
    
            // Update checkpoint in the blob storage so that the app receives only new events the next time it's run
            await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
        {
            // Write details about the error to the console window
            Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
            Console.WriteLine(eventArgs.Exception.Message);
            return Task.CompletedTask;
        }    
    
  5. 프로젝트를 만듭니다. 오류가 없는지 확인합니다.

    참고

    자세한 정보가 포함된 전체 소스 코드는 GitHub에서 이 파일을 참조하세요.

  6. 수신기 애플리케이션을 실행합니다.

Microsoft Purview에서 받은 메시지의 예

{
	"version":
		{"version":"1.0.0",
		 "versionParts":[1]
		},
		 "msgCompressionKind":"NONE",
		 "msgSplitIdx":1,
		 "msgSplitCount":1,
		 "msgSourceIP":"10.244.155.5",
		 "msgCreatedBy":
		 "",
		 "msgCreationTime":1618588940869,
		 "message":{
			"type":"ENTITY_NOTIFICATION_V2",
			"entity":{
				"typeName":"azure_sql_table",
					"attributes":{
						"owner":"admin",
						"createTime":0,
						"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
						"name":"SalesOrderTable",
						"description":"Sales Order Table"
						},
						"guid":"ead5abc7-00a4-4d81-8432-d5f6f6f60000",
						"status":"ACTIVE",
						"displayText":"SalesOrderTable"
					},
					"operationType":"ENTITY_UPDATE",
					"eventTime":1618588940567
				}
}

다음 단계

GitHub에서 더 많은 예제를 확인하세요.