Handling Device Telemetry into MongoDB using Azure IoT Hub, Stream Analytics, and Azure Functions

By Theo van Kraay, Data and AI Solution Architect at Microsoft

Microsoft’s Azure cloud platform offers an array of services for building highly scalable and managed IoT solutions. These include IoT Hub, Azure Stream Analytics, and Azure Functions, which will be included in this tutorial.

Azure is also the home of Microsoft’s globally distributed, massively scalable, multi-model database, Cosmos DB. Among many other features, Cosmos DB has the ability to interact with multiple client APIs, by projecting its internal type system into different data models at the wire/protocol level. In this tutorial, we explore the MongoDB API.

For this example, we will use the IoT Hub samples found here. To use this sample, carry out the following:

  1. Create an IoT Hub using the instructions here.
  2. Register an IoT Device by following the instructions here (note that for an alternative/simpler method than using the CLI, simply follow the 3 steps highlighted in the below image)

  1. Configure your client sample to send telemetry to IoT Hub using instructions here (you will need the device connection string mentioned below). For now, don’t run the code. First, we will create a Stream Analytics job with the purpose of pushing telemetry data into MongoDB.

To create an Azure Stream Analytics job with an IoT Hub as your input, use the quick start guide section here. Be sure to use the IoT Hub you created earlier when configuring the input.

However, instead of creating a blob output (you can skip that part in the quick start) we will be creating a function output instead. For the Stream Analytics query, create something like the following (instead of what is in the quick start guide):

 SELECT *
INTO FunctionOutput
FROM IoTHubInput
HAVING temperature > 27

This should filter only messages that have a temperature greater than 27. Note that there are many more complex temporal queries that you can execute using Stream Analytics (see samples here), but for this tutorial we are just using a simple example. Now that the Stream Analytics Job is created, as MongoDB is not supported in Stream Analytics as a native output target, we need to create an Azure Function that will write data to Mongo DB. To setup the ability to create an Azure Function using .NET in Visual Studio, use the tutorial here (be sure to select “Http Trigger” as the type of function). Some sample C# code to use for writing data to MongoDB (where database name is "mongodb" and collection name is "data") is below. Note: you will need to install MongoDB.Driver using manage NuGet. Also note that for production, you should ensure you have a sound partition key strategy for the data being inserted into MongoDB, when backed by Cosmos DB. For instructions on creating a Cosmos DB collection using the MongoDB API, see here.

 using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using MongoDB.Bson;
using MongoDB.Driver;
using MongoDB.Bson.Serialization;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.Collections.Generic;

namespace MongoFunctionHTTP
{
    public static class FunctionMongo
    {
        [FunctionName("FunctionMongo")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req, ILogger log)
        {
            //req coming from stream analytics would be in something like this format:

            //[{"temperature":34.053544718331445,"humidity":73.798910581413153},
            //{"temperature":25.041046617571752,"humidity":76.803241836281131}]

            log.LogInformation("C# HTTP trigger function processed a request.");
            const string connectionString = "<connection string of mongodb here>";

            // Create a MongoClient object by using the connection string
            var client = new MongoClient(connectionString);

            //Use the MongoClient to access the server
            var database = client.GetDatabase("mongodb");

            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            var messages = JsonConvert.DeserializeObject<List<object>>(requestBody);

            foreach (var message in messages)
            {
                Console.WriteLine("message: " + message);
                BsonDocument document;
                var collection = database.GetCollection<BsonDocument>("data");
                document = BsonSerializer.Deserialize<BsonDocument>(message.ToString());
                collection.InsertOne(document);
            }

            return messages != null
                ? (ActionResult)new OkObjectResult($"Hello, {messages}")
                : new BadRequestObjectResult("Please pass a name on the query string or in the request body");

        }
    }
}

Note that the connection string for MongoDB should look something like the below:

 mongodb://mongo:r4YHXQwiqBUKADOrcvuqEGW0g==@mongodatabase.documents.azure.com:10255/?ssl=true&replicaSet=globaldb

When you have published your function to Azure, in the Stream Analytics pane, select Overview > Outputs > Add. To add a new output, select Azure Function for the sink option (you should name it “FunctionOutput” – as per the output name we referenced in the Stream Analytics query earlier). You will notice a warning: “Please make sure that the Minimum TLS version is set to 1.0 on your Azure Functions before you start your ASA job”. To comply with this, go to your Azure Function and edit SSL settings (they will save automatically when changed).

 

When you have completed the above, you should be able to start your Stream Analytics Job, and run your sample IoT Telemetry code.

Resources

  • If you’d like to learn more about Azure Cosmos DB, visit ‪here.
  • To learn more about Cosmos DB uses cases, visit here.
  • For the latest news and announcements on Cosmos DB, please follow us @AzureCosmosDB and #CosmosDB.