Send events to or receive events from Azure Event Hubs using .NET Core

Event Hubs is a service that processes large amounts of event data (telemetry) from connected devices and applications. After you collect data into Event Hubs, you can store the data using a storage cluster or transform it using a real-time analytics provider. This large-scale event collection and processing capability is a key component of modern application architectures, including the Internet of Things (IoT). For a detailed overview of Event Hubs, see Event Hubs overview and Event Hubs features.

This tutorial shows how to create .NET Core applications in C# to send events to or receive events from an event hub.

Note

You can download this quickstart as a sample from GitHub, replace the EventHubConnectionString and EventHubName strings with your event hub values, and run it. Alternatively, you can follow the steps in this tutorial to create your own.

Prerequisites

Send events

This section shows you how to create a .NET Core console application to send events to an event hub.

Create a console application

Start Visual Studio. From the File menu, click New, and then click Project. Create a .NET Core console application.

Add the Event Hubs NuGet package

Add the Microsoft.Azure.EventHubs .NET Core library NuGet package to your project by following these steps:

  1. Right-click the newly created project and select Manage NuGet Packages.
  2. Click the Browse tab, then search for "Microsoft.Azure.EventHubs" and select the Microsoft.Azure.EventHubs package. Click Install to complete the installation, then close this dialog box.

Write code to send messages to the event hub

  1. Add the following using statements to the top of the Program.cs file:

    using Microsoft.Azure.EventHubs;
    using System.Text;
    using System.Threading.Tasks;
    
  2. Add constants to the Program class for the Event Hubs connection string and entity path (individual event hub name). Replace the placeholders in brackets with the values that you obtained when creating the event hub. Make sure that the {Event Hubs connection string} is the namespace-level connection string, not the event hub-level connection string.

    private static EventHubClient eventHubClient;
    private const string EventHubConnectionString = "{Event Hubs connection string}";
    private const string EventHubName = "{Event Hub path/name}";
    
  3. Add a new method named MainAsync to the Program class, as follows:

    private static async Task MainAsync(string[] args)
    {
        // Creates an EventHubsConnectionStringBuilder object from the connection string, and sets the EntityPath.
        // Typically, the connection string should have the entity path in it, but this simple scenario
        // uses the connection string from the namespace.
        var connectionStringBuilder = new EventHubsConnectionStringBuilder(EventHubConnectionString)
        {
            EntityPath = EventHubName
        };
    
        eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
    
        await SendMessagesToEventHub(100);
    
        await eventHubClient.CloseAsync();
    
        Console.WriteLine("Press ENTER to exit.");
        Console.ReadLine();
    }
    
  4. Add a new method named SendMessagesToEventHub to the Program class, as follows:

    // Uses the event hub client to send numMessagesToSend messages to the event hub.
    private static async Task SendMessagesToEventHub(int numMessagesToSend)
    {
        for (var i = 0; i < numMessagesToSend; i++)
        {
            try
            {
                var message = $"Message {i}";
                Console.WriteLine($"Sending message: {message}");
                await eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(message)));
            }
            catch (Exception exception)
            {
                Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
            }
    
            await Task.Delay(10);
        }
    
        Console.WriteLine($"{numMessagesToSend} messages sent.");
    }
    
  5. Add the following code to the Main method in the Program class:

    MainAsync(args).GetAwaiter().GetResult();
    

    Here is what your Program.cs should look like.

     namespace SampleSender
     {
         using System;
         using System.Text;
         using System.Threading.Tasks;
         using Microsoft.Azure.EventHubs;
    
         public class Program
         {
             private static EventHubClient eventHubClient;
             private const string EventHubConnectionString = "{Event Hubs connection string}";
             private const string EventHubName = "{Event Hub path/name}";
    
             public static void Main(string[] args)
             {
                 MainAsync(args).GetAwaiter().GetResult();
             }
    
             private static async Task MainAsync(string[] args)
             {
                 // Creates an EventHubsConnectionStringBuilder object from the connection string, and sets the EntityPath.
                 // Typically, the connection string should have the entity path in it, but for the sake of this simple scenario
                 // we are using the connection string from the namespace.
                 var connectionStringBuilder = new EventHubsConnectionStringBuilder(EventHubConnectionString)
                 {
                     EntityPath = EventHubName
                 };
    
                 eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
    
                 await SendMessagesToEventHub(100);
    
                 await eventHubClient.CloseAsync();
    
                 Console.WriteLine("Press ENTER to exit.");
                 Console.ReadLine();
             }
    
             // Uses the event hub client to send numMessagesToSend messages to the event hub.
             private static async Task SendMessagesToEventHub(int numMessagesToSend)
             {
                 for (var i = 0; i < numMessagesToSend; i++)
                 {
                     try
                     {
                         var message = $"Message {i}";
                         Console.WriteLine($"Sending message: {message}");
                         await eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(message)));
                     }
                     catch (Exception exception)
                     {
                         Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
                     }
    
                     await Task.Delay(10);
                 }
    
                 Console.WriteLine($"{numMessagesToSend} messages sent.");
             }
         }
     }
    
  6. Run the program, and ensure that there are no errors.
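
Step 2 above asks for the namespace-level connection string. One way to tell the two kinds apart is that an event hub-level connection string carries an extra EntityPath segment. The following is a minimal sketch of such a check, assuming the usual semicolon-separated key=value layout; ConnectionStringCheck and HasEntityPath are hypothetical names for illustration and are not part of the Event Hubs library, and the sample values are placeholders rather than real credentials.

```csharp
using System;
using System.Linq;

public static class ConnectionStringCheck
{
    // Returns true when the connection string already names an event hub
    // through an EntityPath segment (hypothetical helper, not an SDK call).
    public static bool HasEntityPath(string connectionString)
    {
        return connectionString
            .Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries)
            .Any(part => part.Trim().StartsWith("EntityPath=", StringComparison.OrdinalIgnoreCase));
    }

    public static void Main()
    {
        // Placeholder values for illustration only.
        var namespaceLevel = "Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123=";
        var hubLevel = namespaceLevel + ";EntityPath=myhub";

        Console.WriteLine(HasEntityPath(namespaceLevel)); // False
        Console.WriteLine(HasEntityPath(hubLevel));       // True
    }
}
```

If the check returns true for the value you pasted into EventHubConnectionString, the string is probably the event hub-level one, and setting EntityPath again in MainAsync would be redundant.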

Receive events

This section shows how to write a .NET Core console application that receives messages from an event hub using the Event Processor Host. The Event Processor Host is a .NET class that simplifies receiving events from event hubs by managing persistent checkpoints and parallel receives from those event hubs. Using the Event Processor Host, you can split events across multiple receivers, even when hosted in different nodes. This example shows how to use the Event Processor Host for a single receiver.
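
To make the idea of persistent checkpoints concrete, here is a small conceptual sketch. It is not how the Event Processor Host is implemented: the real host keeps its checkpoints in Azure Storage and also coordinates partition ownership across receivers. InMemoryCheckpointStore and CheckpointDemo are hypothetical names, and a partition is modeled as a simple in-memory list.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a checkpoint store.
public class InMemoryCheckpointStore
{
    private readonly Dictionary<string, long> lastSequence = new Dictionary<string, long>();

    public void Save(string partitionId, long sequenceNumber) => lastSequence[partitionId] = sequenceNumber;

    // Returns -1 when the partition has no checkpoint yet.
    public long Load(string partitionId) => lastSequence.TryGetValue(partitionId, out var s) ? s : -1;
}

public static class CheckpointDemo
{
    // Processes every event after the stored checkpoint and records the new position.
    public static List<string> Receive(string partitionId, IReadOnlyList<string> partitionLog, InMemoryCheckpointStore store)
    {
        var processed = new List<string>();
        for (var seq = store.Load(partitionId) + 1; seq < partitionLog.Count; seq++)
        {
            processed.Add(partitionLog[(int)seq]);
            store.Save(partitionId, seq);
        }
        return processed;
    }

    public static void Main()
    {
        var store = new InMemoryCheckpointStore();
        var log = new List<string> { "Message 0", "Message 1" };

        Console.WriteLine(string.Join(", ", Receive("0", log, store))); // Message 0, Message 1

        // A "restarted" receiver resumes after the checkpoint instead of re-reading old events.
        log.Add("Message 2");
        Console.WriteLine(string.Join(", ", Receive("0", log, store))); // Message 2
    }
}
```

Because the position is stored per partition, several receivers could each own a different partition and still resume independently, which is the property the paragraph above describes.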

Note

You can download this quickstart as a sample from GitHub, replace the EventHubConnectionString, EventHubName, StorageAccountName, StorageAccountKey, and StorageContainerName strings with your own values, and run it. Alternatively, you can follow the steps in this tutorial to create your own.

Create a storage account for Event Processor Host

The Event Processor Host is an intelligent agent that simplifies receiving events from Event Hubs by managing persistent checkpoints and parallel receives. For checkpointing, the Event Processor Host requires a storage account. The following steps show how to create a storage account and a blob container, and how to get the account's access key:

  1. In the Azure portal, select Create a resource at the top left of the screen.

  2. Select Storage, then select Storage account - blob, file, table, queue.

  3. On the Create storage account page, take the following steps:

    1. Enter a name for the storage account.

    2. Choose the Azure subscription that contains the event hub.

    3. Select the resource group that has the event hub.

    4. Select a location in which to create the resource.

    5. Select Review + create.

  4. On the Review + create page, review the values, and select Create.

  5. After you see the Deployments Succeeded message, select Go to resource at the top of the page. You can also launch the Storage Account page by selecting your storage account from the resource list.

  6. In the Essentials window, select Blobs.

  7. Select + Container at the top, enter a name for the container, and select OK.

  8. Select Access keys in the left-side menu, and copy the value of key1.

    Save the following values to Notepad or some other temporary location.

    • Name of the storage account
    • Access key for the storage account
    • Name of the container

Create a console application

Start Visual Studio. From the File menu, click New, and then click Project. Create a .NET Core console application.

Add the Event Hubs NuGet package

Add the Microsoft.Azure.EventHubs and Microsoft.Azure.EventHubs.Processor .NET Standard library NuGet packages to your project by following these steps:

  1. Right-click the newly created project and select Manage NuGet Packages.
  2. Click the Browse tab, search for Microsoft.Azure.EventHubs, and then select the Microsoft.Azure.EventHubs package. Click Install to complete the installation, then close this dialog box.
  3. Repeat steps 1 and 2, and install the Microsoft.Azure.EventHubs.Processor package.

Implement the IEventProcessor interface

  1. In Solution Explorer, right-click the project, click Add, and then click Class. Name the new class SimpleEventProcessor.

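  2. Open the SimpleEventProcessor.cs file and make sure the following using statements are at the top of the file, adding any that are missing:

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    using Microsoft.Azure.EventHubs;
    using Microsoft.Azure.EventHubs.Processor;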
    
  3. Implement the IEventProcessor interface. Replace the entire contents of the SimpleEventProcessor class with the following code:

    public class SimpleEventProcessor : IEventProcessor
    {
        public Task CloseAsync(PartitionContext context, CloseReason reason)
        {
            Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
            return Task.CompletedTask;
        }
    
        public Task OpenAsync(PartitionContext context)
        {
            Console.WriteLine($"SimpleEventProcessor initialized. Partition: '{context.PartitionId}'");
            return Task.CompletedTask;
        }
    
        public Task ProcessErrorAsync(PartitionContext context, Exception error)
        {
            Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
            return Task.CompletedTask;
        }
    
        public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        {
            foreach (var eventData in messages)
            {
                var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
                Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
            }
    
            return context.CheckpointAsync();
        }
    }
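
ProcessEventsAsync decodes each body with the Array, Offset, and Count members because the payload is exposed as an ArraySegment<byte>, which may be a window into a larger buffer. The sketch below reproduces that decoding step with standard library types only, so it runs without the Event Hubs package; BodyRoundTrip and Decode are hypothetical names, and the padded buffer is contrived purely to show why the offset matters.

```csharp
using System;
using System.Text;

public static class BodyRoundTrip
{
    // Decodes only the bytes inside the segment, as ProcessEventsAsync does.
    public static string Decode(ArraySegment<byte> body)
    {
        return Encoding.UTF8.GetString(body.Array, body.Offset, body.Count);
    }

    public static void Main()
    {
        // Encode the payload the same way the sender does.
        var payload = Encoding.UTF8.GetBytes("Message 7");

        // Place the payload in the middle of a larger buffer to show why the offset matters.
        var buffer = new byte[payload.Length + 4];
        Array.Copy(payload, 0, buffer, 2, payload.Length);
        var body = new ArraySegment<byte>(buffer, 2, payload.Length);

        Console.WriteLine(Decode(body)); // Message 7
    }
}
```

Decoding the whole backing array instead of the segment would include the padding bytes, which is why the processor passes the offset and count explicitly.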
    

Update the Main method to use SimpleEventProcessor

  1. Add the following using statements to the top of the Program.cs file.

    using Microsoft.Azure.EventHubs;
    using Microsoft.Azure.EventHubs.Processor;
    using System.Threading.Tasks;
    
  2. Add constants to the Program class for the event hub connection string, event hub name, storage account container name, storage account name, and storage account key. Add the following code, replacing the placeholders with their corresponding values:

    private const string EventHubConnectionString = "{Event Hubs connection string}";
    private const string EventHubName = "{Event Hub path/name}";
    private const string StorageContainerName = "{Storage account container name}";
    private const string StorageAccountName = "{Storage account name}";
    private const string StorageAccountKey = "{Storage account key}";
    
    private static readonly string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", StorageAccountName, StorageAccountKey);
    
  3. Add a new method named MainAsync to the Program class, as follows:

    private static async Task MainAsync(string[] args)
    {
        Console.WriteLine("Registering EventProcessor...");
    
        var eventProcessorHost = new EventProcessorHost(
            EventHubName,
            PartitionReceiver.DefaultConsumerGroupName,
            EventHubConnectionString,
            StorageConnectionString,
            StorageContainerName);
    
        // Registers the Event Processor Host and starts receiving messages
        await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>();
    
        Console.WriteLine("Receiving. Press ENTER to stop worker.");
        Console.ReadLine();
    
        // Disposes of the Event Processor Host
        await eventProcessorHost.UnregisterEventProcessorAsync();
    }
    
  4. Add the following line of code to the Main method:

    MainAsync(args).GetAwaiter().GetResult();
    

    Here is what your Program.cs file should look like:

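    namespace SampleEphReceiver
    {
        using System;
        using System.Threading.Tasks;
        using Microsoft.Azure.EventHubs;
        using Microsoft.Azure.EventHubs.Processor;

        public class Program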
        {
            private const string EventHubConnectionString = "{Event Hubs connection string}";
            private const string EventHubName = "{Event Hub path/name}";
            private const string StorageContainerName = "{Storage account container name}";
            private const string StorageAccountName = "{Storage account name}";
            private const string StorageAccountKey = "{Storage account key}";
    
            private static readonly string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", StorageAccountName, StorageAccountKey);
    
            public static void Main(string[] args)
            {
                MainAsync(args).GetAwaiter().GetResult();
            }
    
            private static async Task MainAsync(string[] args)
            {
                Console.WriteLine("Registering EventProcessor...");
    
                var eventProcessorHost = new EventProcessorHost(
                    EventHubName,
                    PartitionReceiver.DefaultConsumerGroupName,
                    EventHubConnectionString,
                    StorageConnectionString,
                    StorageContainerName);
    
                // Registers the Event Processor Host and starts receiving messages
                await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>();
    
                Console.WriteLine("Receiving. Press ENTER to stop worker.");
                Console.ReadLine();
    
                // Disposes of the Event Processor Host
                await eventProcessorHost.UnregisterEventProcessorAsync();
            }
        }
    }
    
  5. Run the program, and ensure that there are no errors.
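
Both Program.cs listings in this tutorial block on MainAsync(args).GetAwaiter().GetResult() from a synchronous Main. If your project compiles with C# 7.1 or later (an assumption about your toolchain, not something this tutorial requires), the entry point itself can be declared async instead. The sketch below shows the shape of that alternative; DoWorkAsync is a hypothetical stand-in for the send or receive logic.

```csharp
using System;
using System.Threading.Tasks;

public static class Program
{
    // Hypothetical stand-in for the tutorial's send or receive logic.
    public static async Task<int> DoWorkAsync()
    {
        await Task.Delay(10);
        return 100;
    }

    // With C# 7.1 or later, Main can return Task and use await directly.
    public static async Task Main(string[] args)
    {
        var sent = await DoWorkAsync();
        Console.WriteLine($"{sent} messages sent.");
    }
}
```

Either form works for a console application; the async entry point simply removes the extra wrapper method.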

Next steps

Read the following articles: