June 2014

Volume 29 Number 6

Azure Insider : Telemetry Ingestion and Analysis Using Microsoft Azure Services

Bruno Terkaly | June 2014

Bruno Terkaly and Ricardo Villalobos[The regular columnists Bruno Terkaly and Ricardo Villalobos present a guest columnist for this month’s column. They will return with the next installment.—Ed.]

Every sensor-based device generates telemetry data. Interpreting this data is at the core of its value proposition. In the consumer world, a driver looks at his connected car dashboard to see how his driving style affects fuel consumption and traffic. In the industrial world, comparing the temperature of a machine to the average of others on the factory floor can help an operator identify the risks of failure and perform predictive maintenance.

These scenarios require telemetry data from tens or hundreds of thousands of connected devices. What’s more important, you need to analyze this data to provide meaningful visualizations and insights. When dealing with such large amounts of data, Big Data frameworks such as Hadoop build a solid data-processing foundation that can scale up with the installed base of devices.

In this article, you’ll learn how to create a simple telemetry ingestion architecture using the Microsoft Azure Service Bus. Then you’ll consume and analyze this data in a scalable way using the Microsoft Azure Hadoop service called HDInsight.

Solution Architecture

In previous columns, Bruno Terkaly and Ricardo Villalobos showed how to use the Service Bus to establish a command channel to communicate with a connected object. In this article, I’ll use the Service Bus as a middleware layer to buffer telemetry messages sent by the device.

The devices will communicate directly with the Service Bus to send telemetry messages to a dedicated Topic (see Figure 1). Then one or several subscriptions will de-queue these messages in a worker role and store them as flat files in Blob storage. The Hadoop cluster can then use these input files to perform analysis and calculations.

Basic Flow of Big Data Telemetry Solution
Figure 1 Basic Flow of Big Data Telemetry Solution

This architecture has the benefit of decoupling the various pieces from each other. The Service Bus acts as middleware and can buffer data if the workers can’t read them fast enough. You can monitor the queue length and use that as the basis for auto-scaling the worker tier.

The subscriptions are also useful for performing simple filtering on incoming data and routing it to different back-end processing tiers. For example, you could have an Urgent subscription that sends messages to a real-time alerting system, and use an Everything subscription to capture all data for later analysis.

Because the workers just move data into storage—whether Hadoop Distributed File System (HDFS) or Blob storage—it’s decoupled from the Hadoop processing piece. This can run independently of the incoming data rhythm. You could choose to have a Hadoop cluster running permanently. This lets you process small batches all the time and reduce computational latency. You could also choose to save money by having an HDInsight cluster start just once per day to perform all calculations in one batch. You can also have a mix of the two.

Ingest Telemetry Data Using the Service Bus

The Azure Service Bus offers two protocol choices to send messages to a Topic: HTTP or AMQP. In the case of connected devices, often with limited bandwidth, AMQP has some advantages. It’s an efficient, binary, reliable and portable protocol. It also has libraries for many languages, runtime environments and OSes. This gives you flexibility when connecting your device directly to the Service Bus to send telemetry messages.

To test this approach, I used a Raspberry Pi board to feed temperature and other sensor data, using the Apache Qpid Proton AMQP library. Proton is a bare-bones, portable library you can compile on a variety of environments to send AMQP messages. It’s completely interoperable with the Azure Service Bus. Find more information about the Proton AMQP library at bit.ly/1icc6Ag.

For this example, I’ve compiled the Proton library directly on the Raspberry Pi board. I used the Python bindings to write a simple script to capture sensor readings from the USB serial port and send them to the Azure Service Bus, which you can see in Figure 2.

Figure 2 Python Code in the Raspberry Pi Reading to Capture Sensor Readings

#!/usr/bin/python
import sys
import commands
import re
import uuid
import serial
from proton import *
# Device ID
id = uuid.getnode()
# Topic address
address = "amqps://owner:key@address.servicebus.windows.net/telemetry"
# Open serial port
ser = serial.Serial('/dev/ttyACM0', 9600)
# Create Proton objects
messenger = Messenger()
while True:
  # Read values from Arduino in the form K1:V1_K2:V2_...
  temp = ser.readline().rstrip('\r\n')
  print temp
  # Create AMQP message
  message = Message()
  # Initialize properties
  message.properties = dict()
  message.properties[symbol("did")] = symbol(id)
  # Map string to list, symbolize, create dict and merge
  pairs=map(lambda x:x.split(':'), temp.split('_'))
  symbols = map(lambda x:(symbol(x[0]),int(x[1])), pairs)
  message.properties.update(dict(symbols))
  message.address = address
  messenger.put(message)
  messenger.send()

The Python script directly addresses the Azure Service Bus Topic named “telemetry.” It’s using a connection string that includes the standard Service Bus authentication token and specifies using the AMQP protocol. In a real-world environment, you’d need to use a more sophisticated authentication mechanism to ensure your connection parameters aren’t compromised.

Assume a significant number of these Raspberry devices start gathering data. Each one will send a Device ID (DID) you’ll use again later to calculate average temperatures. In this example, the DID is generated with the UUID module to retrieve the system’s MAC address.

An Arduino Esplora board connected to the Raspberry Pi via USB gathers the readings. The Esplora is an all-in-one board with built-in sensors. That makes it easy to read temperature or other environmental parameters and send them to the serial bus. The Python script at the other end of the USB cable then reads the output values. An example of an Arduino schema that prints sensor values to the serial port is shown in Figure 3.

Figure 3 Arduino Code Gathering Raspberry Pi Readings

void loop()
{
  int celsius = Esplora.readTemperature(DEGREES_C);
  int loudness = Esplora.readMicrophone();
  int light = Esplora.readLightSensor();
  Serial.print("T:");
  Serial.print(celsius);
  Serial.print("_");
  Serial.print("M:");
  Serial.print(loudness);
  Serial.print("_");
  Serial.print("L:");
  Serial.print(light);
  Serial.println();
  // Wait a second
  delay(1000);
}

Select Your Big Data Deployment

You have several choices for which type of Hadoop solution you’ll use for data analysis. The choice of deployment type will dictate how and where you’ll need to aggregate data for analysis.

Azure offers a compelling solution with HDInsight. This exposes the Hadoop framework as a service. This distribution of Hadoop, based on the Hortonworks Data Platform (HDP) for Windows, comes with a connector that lets jobs directly access input data from Azure Blob storage.

This means you don’t have to have the Hadoop cluster up and running in order to receive input files. You can upload files to a Blob storage container that HDInsight will use later. When you analyze a batch of files, you can start the HDInsight cluster in a few minutes, execute a series of jobs for a couple of hours and then shut it down. This translates into lower bills in terms of compute hours.

On the other hand, if you choose to deploy a standard Hadoop distribution such as HDP, or the Cloudera Distribution on Azure virtual machines (VMs), you’ll be responsible for keeping the cluster up-to-date. You’ll also have to have it properly configured for optimal operation. This approach makes sense if you intend to use custom Hadoop components not included in HDInsight, such as HBase, as the storage mechanism.

Save Telemetry Data to Blob Storage

Extracting data from the Azure Service Bus is a simple process. Use a worker role as a “reader” or “listener” for the subscription. Then, accumulate messages into input files HDInsight can use.

First, set up one or several subscriptions on your Azure Service Bus Topic. This gives you some latitude when splitting or routing the data stream, depending on the requirements. At the very least, it’s a good idea to create a “catch-all” subscription to store all incoming messages. You can also use Filters on Azure Service Bus subscriptions. This will create additional streams for specific messages. An example of creating the Topic and Subscriptions using C# and the Azure Service Bus SDK library is shown in Figure 4.

Figure 4 An Azure Service Bus Subscription

var namespaceManager = 
  NamespaceManager.CreateFromConnectionString(connectionString);
// Create the Topic
if (!namespaceManager.TopicExists("telemetry"))
{
  namespaceManager.CreateTopic("telemetry");
}
// Create a "catch-all" Subscription
if (!namespaceManager.SubscriptionExists("telemetry", "all"))
{
  namespaceManager.CreateSubscription("telemetry", "all");
}
// Create an "alerts" subscription
if (!namespaceManager.SubscriptionExists("telemetry", "alert"))
{
  SqlFilter alertFilter = new SqlFilter("type = 99");
  namespaceManager.CreateSubscription("telemetry", 
  "alert", alertFilter);
}

Once you’ve created the Azure Service Bus Subscription, you can receive and save messages. This example uses the CSV format, which is easy to read and understand both by machines and humans. To read the incoming messages as quickly as possible, the worker creates a number of Tasks (there are 10 in this example). It also uses Async methods to read batches of messages, instead of reading them one at a time. The “all” Subscription and “telemetry” topic will receive the messages (see Figure 5).

Figure 5 Receiving Messages from the Subscription and Storing Them in Blob Storage

SubscriptionClient client = 
  SubscriptionClient.CreateFromConnectionString(connectionString, 
  "telemetry", "all", ReceiveMode.ReceiveAndDelete);
List<Task> tasks = new List<Task>();
for (int i = 0; i < NBTASKS; i++)
{
  var id = i; // Closure alert
  Task t = Task.Run(async () =>
  {
    BlobStorageWriter writer = new BlobStorageWriter(id);
    while (true)
    {
      var messages = await client.ReceiveBatchAsync(BATCH_SIZE);
      foreach (var message in messages)
      {
        try
        {
          await writer.WriteOneLine(TelemetryMessage.Stringify(message));
        }
        catch (Exception ex)
        {
          Trace.TraceError(ex.Message);
        }
      }
    }
  });
  tasks.Add(t);
}
Task.WaitAll(tasks.ToArray());

The TelemetryMessage.Stringify method simply returns a line of text in CSV format that contains the telemetry data. It can also extract some useful fields from the Azure Service Bus headers, such as the Message ID or the Enqueued Time.

The job of BlobStorageWriter.WriteOneLine is to write the line directly into a Blob. Because 10 tasks are available in parallel, that same number of Blobs will be affected at once. WriteOneLine also rotates files from time to time for HDInsight to pick them up. I use two parameters to decide when to switch to a new file: the number of lines written to the file and the time since the Blob was created (for example, creating a new file every hour or when it reaches 1,000,000 lines). This method uses asynchronous calls to avoid blocking while writing messages to the Blob stream (see Figure 6).

Figure 6 Write Data from Messages to Azure Blobs

public async Task WriteOneLine(string line)
{
  var bytes = Encoding.UTF8.GetBytes(string.Format("{0}\n", line));
  await destinationStream.WriteAsync(bytes, 0, bytes.Length);
  TimeSpan ts = DateTime.Now - startBlobTime;
  if (++linesWritten > MAX_LINES || ts.TotalSeconds > MAX_SECONDS)
  {
    Trace.TraceInformation(
      "Wrote " + linesWritten + " lines to " + currentBlob.Name);
    GetNextBlob();
    linesWritten = 0;
  }
}

The resulting files contain data extracted from the telemetry messages, as shown:

145268284e8e498282e20b01170634df,test,24,980,21,2014-03-14 13:43:32
dbb52a3cf690467d8401518fc5e266fd,test,24,980,21,2014-03-14 13:43:32
e9b5f508ef8c4d1e8d246162c02e7732,test,24,980,21,2014-03-14 13:43:32

They include the Message ID, Device ID, three of the readings and the date the message was enqueued. This format is easy to parse in the next step.

Analyze the Data Using HDInsight

The most impressive benefit of HDInsight is that you can start a complete Hadoop cluster, run a job and deprovision the cluster directly from the command line. You don’t ever have to log on to a VM or perform any custom configuration. You can provision and manage HDInsight with Windows PowerShell on Windows, or using cross-platform command-line tools on Mac or Linux.

You can download the integrated Azure PowerShell commandlets from bit.ly/1tGirZk. These commandlets include everything you need to manage your Azure infrastructure, including HDInsight clusters. Once you’ve imported your publishing settings and selected the default subscription, you only need one command line to create a new HDInsight cluster:

New-AzureHDInsightCluster -Name "hditelemetry" -Location "North Europe" -DefaultStorageAccountName "telemetry.blob.core.windows.net" -DefaultStorageAccountKey "storage-account-key" -DefaultStorageContainerName "data" -ClusterSizeInNodes 4

This command instructs the HDInsight cluster to use the existing Storage Account and Container as its file system root. This is how it will access all the telemetry data the ingestion process generates. You can also select how many worker nodes the cluster should use, depending on the volume of data and how much parallelism you need.

Once the cluster is up and running, you can enable Remote Desktop access. Doing so lets other users log on to the head node to start an interactive session with standard Hadoop commands and tools. However, it’s much faster to use remote commands, taking advantage of Windows PowerShell to launch Map Reduce, Hive or Pig jobs.

I used a Pig job to calculate the average temperature value. Pig was initially developed at Yahoo. It lets people using Hadoop focus more on analyzing large data sets and spend less time writing mapper and reducer programs. A Pig script typically has three stages:

  1. Load the data you want to manipulate.
  2. Run a series of data transformations (which are translated into a set of mapper and reducer tasks).
  3. Dump the results to screen, or store the results in a file.

The following example shows how you typically achieve this by running the script interactively, in an Exploratory Data Analysis (EDA) phase, with the Pig interpreter:

data = load '/telemetry*.csv' using PigStorage(',') as (id:chararray, did:chararray, temp:int, light:int, mic:int, timestamp:datetime);
data1 = group data by did;
data2 = foreach data1 generate group as did, COUNT(data), AVG(data.temp);
dump data2;

If you type this script directly into the Pig interpreter, it will display a table containing the number of temperature data points and the average measured value for each DID. As you can see, the Pig syntax is quite explicit. The different data manipulation steps are clearly separated:

  • The first load statement is used to load the data from the CSV files, describing the name and types of the input fields.
  • The data is then grouped by DID, or per device.
  • The result dataset is generated with aggregate functions like COUNT and AVG.

Once the script is finalized, you can automate this task with Windows PowerShell. Use the New-AzureHDInsightPigJob­Definition commandlet to initialize a Pig job with the script created. Then you can use Start-AzureHDInsightJob and Wait-AzureHD­InsightJob to start the job and wait for its conclusion (see Figure 7). You can then use Get-AzureHDInsightJobOutput to retrieve the results.

Figure 7 Insert, Analyze and Start Jobs in HDInsight

$PigScript = "data = load '/telemetry*.csv' using PigStorage(',') as (id:chararray, did:chararray, temp:int, light:int, mic:int, timestamp:datetime);" +
"data1 = group data by did;" +
"data2 = foreach data1 generate group as did, COUNT(data), AVG(data.temp);" +
"dump data2;"
# Define a Pig job
$pigJobDefinition = New-AzureHDInsightPigJobDefinition -Query $PigScript
# Start the job
$pigJob = Start-AzureHDInsightJob -Cluster "hditelemetry" -JobDefinition $pigJobDefinition
# Wait for the job to finish
Wait-AzureHDInsightJob -Job $pigJob -WaitTimeoutInSeconds 3600
# Get the job results
Get-AzureHDInsightJobOutput -Cluster "hditelemetry" -JobId $pigJob.JobId –StandardOutput

The displayed result in the command-line console looks like this:

C:\> Get-AzureHDInsightJobOutput -Cluster "hditelemetry" -JobId $pigJob.JobId
(test,29091,24.0)
(49417795060,3942,30.08371385083714)

In this case, there are quite a few test measurements and around 4,000 readings from the Raspberry Pi. The readings average 30 degrees.

Wrapping Up

The Azure Service Bus is a reliable and fast way to gather data from all sorts of devices. In order to store and analyze that data, you need a robust storage and analysis engine. Azure HDInsight abstracts the process of creating and maintaining a Hadoop cluster for this degree of storage. It’s a highly scalable solution you can configure and automate using tools such as Windows PowerShell or the Mac/Linux Azure command-line interface.


Thomas Conte is a technical evangelist for the Microsoft Azure platform in the Developer & Platform Evangelism (DPE) division. His role is to facilitate access to technology for developers, architects, and software partners through code samples, publications, and public speaking. He endeavors to run Microsoft Azure on as many non-Microsoft technologies from the open source world as possible. Follow him at twitter.com/tomconte.

Bruno Terkaly is a developer evangelist for Microsoft. His depth of knowledge comes from years of experience in the field, writing code using a multitude of platforms, languages, frameworks, SDKs, libraries and APIs. He spends time writing code, blogging and giving live presentations on building cloud-based applications, specifically using the Microsoft Azure platform. You can read his blog at blogs.msdn.com/b/brunoterkaly.

Ricardo Villalobos is a seasoned software architect with more than 15 years of experience designing and creating applications for companies in multiple industries. Holding different technical certifications, as well as a master’s degree in business administration from the University of Dallas, he works as a cloud architect in the DPE Globally Engaged Partners team for Microsoft, helping companies worldwide to implement solutions in Microsoft Azure. You can read his blog at blog.ricardovillalobos.com.

Thanks to the following Microsoft technical experts for reviewing this article: Rafael Godinho and Jeremiah Talkar