Serialize data in Hadoop with the Microsoft Avro Library

Contributors
  • Jonathan Gao
  • Andy Pasic
  • Kim Whitlatch
  • Tyson Nevil
  • Larry Franks

This topic shows how to use the Microsoft Avro Library to serialize objects and other data structures into streams in order to persist them to memory, a database, or a file, and also how to deserialize them to recover the original objects.

Note

The information in this document only applies to Windows-based HDInsight clusters.

Apache Avro

The Microsoft Avro Library implements the Apache Avro data serialization system for the Microsoft .NET environment. Apache Avro provides a compact binary data interchange format for serialization. It uses JSON to define a language-agnostic schema that enables language interoperability: data serialized in one language can be read in another. Currently, C, C++, C#, Java, PHP, Python, and Ruby are supported. Detailed information on the format can be found in the Apache Avro Specification. Note that the current version of the Microsoft Avro Library does not support the remote procedure call (RPC) part of this specification.

The serialized representation of an object in the Avro system consists of two parts: the schema and the actual value. The Avro schema uses JSON to describe the language-independent data model of the serialized data, and it is kept alongside the binary representation of the data. Because the schema is separate from the binary representation, each object can be written with no per-value overhead (such as field names or type tags). This makes serialization fast and the representation small.
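The following sketch makes the "no per-value overhead" point concrete. It hand-encodes the SensorData record used in the samples, following the binary encoding rules in the Apache Avro Specification: int and long values are written as zig-zag variable-length integers, and bytes are written as a length followed by the raw data. It is an illustration written for this topic, not code from the Microsoft Avro Library. The class and method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration of Avro binary encoding for the SensorData record
// (Location { Floor:int, Room:int }, Value:bytes). Values are written in schema
// order with no field names or type tags; a reader needs the schema to decode them.
public static class AvroBinarySketch
{
    // Avro encodes int and long values as zig-zag varints.
    public static void WriteLong(List<byte> output, long value)
    {
        // Zig-zag maps signed to unsigned values: 0->0, -1->1, 1->2, -2->3, ...
        ulong n = (ulong)((value << 1) ^ (value >> 63));
        // Emit 7 bits at a time; the high bit is set while more bytes follow.
        while (n >= 0x80)
        {
            output.Add((byte)(n | 0x80));
            n >>= 7;
        }
        output.Add((byte)n);
    }

    // Avro encodes bytes as a long length followed by the raw bytes.
    public static void WriteBytes(List<byte> output, byte[] value)
    {
        WriteLong(output, value.Length);
        output.AddRange(value);
    }

    // A nested record is encoded simply as its fields, in schema order.
    public static byte[] EncodeSensorRecord(int floor, int room, byte[] value)
    {
        var output = new List<byte>();
        WriteLong(output, floor);  // Location.Floor
        WriteLong(output, room);   // Location.Room
        WriteBytes(output, value); // Value
        return output.ToArray();
    }
}
```

For the sample values (Floor = 1, Room = 243, Value = { 1, 2, 3, 4, 5 }), EncodeSensorRecord returns the nine bytes 02 E6 03 0A 01 02 03 04 05. None of these bytes identify a field.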

The Hadoop scenario

The Apache Avro serialization format is widely used in Azure HDInsight and other Apache Hadoop environments. Avro provides a convenient way to represent complex data structures within a Hadoop MapReduce job. The Avro object container file format was designed to support the distributed MapReduce programming model. The key feature that enables this distribution is that the files are “splittable”: a reader can seek to any point in a file and start reading from the next block.
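The following sketch shows, under simplifying assumptions, how a reader that starts at an arbitrary offset can find the next block boundary. In an Avro object container file, a 16-byte sync marker is stored in the file header and repeated after every data block. The class and method names are hypothetical. A real reader would also need the header to learn the schema, the codec, and the marker itself.

```csharp
using System;

// Hypothetical helper illustrating why Avro container files are splittable.
// Every data block ends with the file's 16-byte sync marker, so a reader that
// starts at an arbitrary offset can scan forward to the next marker and resume
// reading at the block boundary that follows it.
public static class SyncMarkerScanner
{
    public const int SyncSize = 16;

    // Returns the offset of the first byte after the next complete sync marker
    // found at or beyond 'start', or -1 if there is no such marker.
    public static int FindNextBlockStart(byte[] data, byte[] syncMarker, int start)
    {
        if (syncMarker.Length != SyncSize)
            throw new ArgumentException("Avro sync markers are 16 bytes long.", nameof(syncMarker));

        for (int i = Math.Max(start, 0); i <= data.Length - SyncSize; i++)
        {
            int matched = 0;
            while (matched < SyncSize && data[i + matched] == syncMarker[matched])
                matched++;
            if (matched == SyncSize)
                return i + SyncSize;
        }
        return -1;
    }
}
```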

Serialization in Avro Library

The Microsoft Avro Library supports two ways of serializing objects:

  • reflection - The JSON schema for the types is automatically built from the data contract attributes of the .NET types to be serialized.
  • generic record - A JSON schema is explicitly specified in a record represented by the AvroRecord class when no .NET types are present to describe the schema for the data to be serialized.
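The following sketch gives a rough illustration of the reflection approach. It derives a record schema from the DataContract and DataMember attributes of a type. It is a hypothetical, simplified helper written for this topic, not the Microsoft Avro Library's implementation. It handles only int, long, string, byte[], and nested data contract types, and the way it combines the contract namespace and name is an assumption. DemoSensorData and DemoLocation are hypothetical types that mirror the SensorData and Location types used in the samples.

```csharp
using System;
using System.Linq;
using System.Reflection;
using System.Runtime.Serialization;

// Hypothetical sketch, NOT the Microsoft Avro Library's implementation: builds an
// Avro record schema from [DataContract]/[DataMember] attributes.
public static class ReflectionSchemaSketch
{
    public static string BuildSchema(Type type)
    {
        var contract = type.GetCustomAttribute<DataContractAttribute>()
            ?? throw new ArgumentException($"{type.Name} has no [DataContract] attribute.");

        // Use the contract name/namespace when given, otherwise the CLR type name.
        string name = contract.Name ?? type.Name;
        string fullName = string.IsNullOrEmpty(contract.Namespace) ? name : contract.Namespace + "." + name;

        // Only properties marked [DataMember] become fields, ordered by DataMember.Order
        // and then by declaration order (approximated with the metadata token).
        var fields = type.GetProperties(BindingFlags.Public | BindingFlags.Instance)
            .Select(p => new { Property = p, Member = p.GetCustomAttribute<DataMemberAttribute>() })
            .Where(x => x.Member != null)
            .OrderBy(x => x.Member.Order)
            .ThenBy(x => x.Property.MetadataToken)
            .Select(x => $"{{\"name\":\"{x.Member.Name ?? x.Property.Name}\",\"type\":{MapType(x.Property.PropertyType)}}}");

        return $"{{\"type\":\"record\",\"name\":\"{fullName}\",\"fields\":[{string.Join(",", fields)}]}}";
    }

    // Maps a CLR type to its Avro schema JSON; nested data contracts become nested records.
    private static string MapType(Type t)
    {
        if (t == typeof(int)) return "\"int\"";
        if (t == typeof(long)) return "\"long\"";
        if (t == typeof(string)) return "\"string\"";
        if (t == typeof(byte[])) return "\"bytes\"";
        if (t.GetCustomAttribute<DataContractAttribute>() != null) return BuildSchema(t);
        throw new NotSupportedException($"{t} is not handled by this sketch.");
    }
}

// Hypothetical types mirroring SensorData and Location from the samples.
[DataContract(Name = "SensorDataValue", Namespace = "Sensors")]
public class DemoSensorData
{
    [DataMember(Name = "Location")] public DemoLocation Position { get; set; }
    [DataMember(Name = "Value")] public byte[] Value { get; set; }
}

[DataContract]
public struct DemoLocation
{
    [DataMember] public int Floor { get; set; }
    [DataMember] public int Room { get; set; }
}
```

BuildSchema(typeof(DemoSensorData)) produces a schema with the same shape as the one written by hand in the generic record samples. It contains a nested Location record, with Floor and Room as int fields, and a Value field of type bytes.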

When the data schema is known to both the writer and the reader of the stream, the data can be sent without its schema. When an Avro object container file is used, the schema is stored within the file. Other parameters, such as the codec used for data compression, can also be specified. These scenarios are described in more detail and illustrated in the code examples below.
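The following sketch shows what "the schema is stored within the file" means at the byte level. It builds an object container file header as laid out in the Apache Avro Specification. The schema and the codec name are stored in the file metadata under the avro.schema and avro.codec keys. The helper is hypothetical and for illustration only. The AvroContainer writers used in the samples are expected to produce a header of this form for you.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical sketch of an Avro object container file header, following the Apache Avro
// specification: 4 magic bytes, a metadata map (schema and codec), and a 16-byte sync marker.
public static class ContainerHeaderSketch
{
    public static byte[] BuildHeader(string schemaJson, string codec, byte[] syncMarker)
    {
        if (syncMarker.Length != 16)
            throw new ArgumentException("The sync marker must be 16 bytes.", nameof(syncMarker));

        // Magic: the ASCII characters 'O', 'b', 'j' followed by the format version byte 1.
        var output = new List<byte> { (byte)'O', (byte)'b', (byte)'j', 1 };

        // File metadata is an Avro map<bytes>: the schema travels with the data, and the
        // codec name ("null", "deflate", ...) tells readers how blocks are compressed.
        var metadata = new List<KeyValuePair<string, byte[]>>
        {
            new KeyValuePair<string, byte[]>("avro.schema", Encoding.UTF8.GetBytes(schemaJson)),
            new KeyValuePair<string, byte[]>("avro.codec", Encoding.UTF8.GetBytes(codec)),
        };

        // A map is encoded as a block count, the key/value pairs, and a terminating zero count.
        WriteLong(output, metadata.Count);
        foreach (var entry in metadata)
        {
            WriteBytes(output, Encoding.UTF8.GetBytes(entry.Key)); // string: length + UTF-8 bytes
            WriteBytes(output, entry.Value);                       // bytes: length + raw bytes
        }
        WriteLong(output, 0);

        // The same sync marker is written after every data block that follows the header.
        output.AddRange(syncMarker);
        return output.ToArray();
    }

    // Zig-zag varint encoding used by Avro for int and long values.
    private static void WriteLong(List<byte> output, long value)
    {
        ulong n = (ulong)((value << 1) ^ (value >> 63));
        while (n >= 0x80)
        {
            output.Add((byte)(n | 0x80));
            n >>= 7;
        }
        output.Add((byte)n);
    }

    private static void WriteBytes(List<byte> output, byte[] value)
    {
        WriteLong(output, value.Length);
        output.AddRange(value);
    }
}
```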

Install Avro Library

The following are required before you install the library:

Note that the Newtonsoft.Json.dll dependency is downloaded automatically when you install the Microsoft Avro Library by using the following procedure.

The Microsoft Avro Library is distributed as a NuGet package that can be installed from Visual Studio via the following procedure:

  1. Select the Project tab -> Manage NuGet Packages...
  2. Search for "Microsoft.Hadoop.Avro" in the Search Online box.
  3. Click the Install button next to Microsoft Azure HDInsight Avro Library.

Note that the Newtonsoft.Json.dll (>=6.0.4) dependency is also downloaded automatically with the Microsoft Avro Library.

You may want to visit the Microsoft Avro Library home page to read the current release notes.

The Microsoft Avro Library source code is available at the Microsoft Avro Library home page.

Compile schemas using Avro Library

The Microsoft Avro Library contains a code generation utility that creates C# types automatically from a previously defined JSON schema. The code generation utility is not distributed as a binary executable, but it can easily be built with the following procedure:

  1. Download the .zip file with the latest version of HDInsight SDK source code from Microsoft .NET SDK For Hadoop. (Click the Download icon, not the Downloads tab.)
  2. Extract the HDInsight SDK to a directory on a machine that has .NET Framework 4 installed and is connected to the Internet, so that the necessary NuGet dependency packages can be downloaded. The following steps assume that the source code is extracted to C:\SDK.
  3. Go to the folder C:\SDK\src\Microsoft.Hadoop.Avro.Tools and run build.bat. (The file calls MSBuild from the 32-bit distribution of the .NET Framework. If you want to use the 64-bit version, edit build.bat as described in the comments inside the file.) Ensure that the build succeeds. (On some systems, MSBuild may produce warnings. These warnings do not affect the utility as long as there are no build errors.)
  4. The compiled utility is located in C:\SDK\Bin\Unsigned\Release\Microsoft.Hadoop.Avro.Tools.

To get familiar with the command-line syntax, execute the following command from the folder where the code generation utility is located:

Microsoft.Hadoop.Avro.Tools help /c:codegen

To test the utility, you can generate C# classes from the sample JSON schema file provided with the source code. Execute the following command:

Microsoft.Hadoop.Avro.Tools codegen /i:C:\SDK\src\Microsoft.Hadoop.Avro.Tools\SampleJSON\SampleJSONSchema.avsc /o:

This should produce two C# files in the current directory: SensorData.cs and Location.cs.

To understand the logic that the code generation utility uses when converting the JSON schema to C# types, see the file GenerationVerification.feature located in C:\SDK\src\Microsoft.Hadoop.Avro.Tools\Doc.

Note that namespaces are extracted from the JSON schema by using the logic described in the file mentioned in the previous paragraph. Namespaces extracted from the schema take precedence over any namespace provided with the /n parameter on the utility command line. To override the namespaces contained within the schema, use the /nf parameter. For example, to change all namespaces from SampleJSONSchema.avsc to my.own.nspace, execute the following command:

Microsoft.Hadoop.Avro.Tools codegen /i:C:\SDK\src\Microsoft.Hadoop.Avro.Tools\SampleJSON\SampleJSONSchema.avsc /o:. /nf:my.own.nspace

Samples

The six examples in this topic illustrate different scenarios supported by the Microsoft Avro Library. The library is designed to work with any stream. For simplicity and consistency, these examples manipulate data through memory streams rather than file streams or databases. The approach taken in a production environment depends on the exact scenario requirements, data source and volume, performance constraints, and other factors.

The first two examples show how to serialize and deserialize data into memory stream buffers by using reflection and generic records. The schema in these two cases is assumed to be shared between the readers and writers out-of-band.

The third and fourth examples show how to serialize and deserialize data by using the Avro object container files. When data is stored in an Avro container file, its schema is always stored with it because the schema must be shared for deserialization.

The sample containing the first four examples can be downloaded from the Azure code samples site.

The fifth example shows how to use a custom compression codec for Avro object container files. A sample containing the code for this example can be downloaded from the Azure code samples site.

The sixth sample shows how to use Avro serialization to upload data to Azure Blob storage and then analyze it by using Hive with an HDInsight (Hadoop) cluster. It can be downloaded from the Azure code samples site.


Sample 1: Serialization with reflection

The Microsoft Avro Library can build the JSON schema for the types automatically, using reflection on the data contract attributes of the C# objects to be serialized. The library creates an IAvroSerializer<T> that uses these attributes to identify the fields to be serialized.

In this example, objects (a SensorData class with a member Location struct) are serialized to a memory stream, and this stream is in turn deserialized. The result is then compared to the initial instance to confirm that the SensorData object recovered is identical to the original.

The schema in this example is assumed to be shared between the readers and writers, so the Avro object container format is not required. For an example of how to serialize and deserialize data into memory buffers by using reflection with the object container format when the schema must be shared with the data, see Serialization using object container files with reflection.

namespace Microsoft.Hadoop.Avro.Sample
{
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Runtime.Serialization;
    using Microsoft.Hadoop.Avro.Container;
    using Microsoft.Hadoop.Avro;

    //Sample class used in serialization samples
    [DataContract(Name = "SensorDataValue", Namespace = "Sensors")]
    internal class SensorData
    {
        [DataMember(Name = "Location")]
        public Location Position { get; set; }

        [DataMember(Name = "Value")]
        public byte[] Value { get; set; }
    }

    //Sample struct used in serialization samples
    [DataContract]
    internal struct Location
    {
        [DataMember]
        public int Floor { get; set; }

        [DataMember]
        public int Room { get; set; }
    }

    //This class contains all methods demonstrating
    //the usage of Microsoft Avro Library
    public class AvroSample
    {

        //Serialize and deserialize sample data set represented as an object using reflection.
        //No explicit schema definition is required - schema of serialized objects is automatically built.
        public void SerializeDeserializeObjectUsingReflection()
        {

            Console.WriteLine("SERIALIZATION USING REFLECTION\n");
            Console.WriteLine("Serializing Sample Data Set...");

            //Create a new AvroSerializer instance for SensorData. The default serialization strategy
            //(AvroDataContractResolver) serializes only members attributed with DataContract/DataMember
            var avroSerializer = AvroSerializer.Create<SensorData>();

            //Create a memory stream buffer
            using (var buffer = new MemoryStream())
            {
                //Create a data set by using sample class and struct
                var expected = new SensorData { Value = new byte[] { 1, 2, 3, 4, 5 }, Position = new Location { Room = 243, Floor = 1 } };

                //Serialize the data to the specified stream
                avroSerializer.Serialize(buffer, expected);


                Console.WriteLine("Deserializing Sample Data Set...");

                //Prepare the stream for deserializing the data
                buffer.Seek(0, SeekOrigin.Begin);

                //Deserialize data from the stream; the result has the same type that was used for serialization
                var actual = avroSerializer.Deserialize(buffer);

                Console.WriteLine("Comparing Initial and Deserialized Data Sets...");

                //Finally, verify that deserialized data matches the original one
                bool isEqual = this.Equal(expected, actual);

                Console.WriteLine("Result of Data Set Identity Comparison is {0}", isEqual);

            }
        }

        //
        //Helper methods
        //

        //Comparing two SensorData objects
        private bool Equal(SensorData left, SensorData right)
        {
            return left.Position.Equals(right.Position) && left.Value.SequenceEqual(right.Value);
        }



        static void Main()
        {

            string sectionDivider = "---------------------------------------- ";

            //Create an instance of AvroSample Class and invoke methods
            //illustrating different serializing approaches
            AvroSample Sample = new AvroSample();

            //Serialization to memory using reflection
            Sample.SerializeDeserializeObjectUsingReflection();

            Console.WriteLine(sectionDivider);
            Console.WriteLine("Press any key to exit.");
            Console.Read();
        }
    }
}
// The example is expected to display the following output:
// SERIALIZATION USING REFLECTION
//
// Serializing Sample Data Set...
// Deserializing Sample Data Set...
// Comparing Initial and Deserialized Data Sets...
// Result of Data Set Identity Comparison is True
// ----------------------------------------
// Press any key to exit.

Sample 2: Serialization with a generic record

A JSON schema can be specified explicitly in a generic record when reflection cannot be used because the data cannot be represented by .NET classes with a data contract. In such cases, the schema for the data may also be dynamic, that is, not known at compile time. Data in comma-separated values (CSV) files is an example of this kind of dynamic scenario, because the schema is unknown until the data is transformed to the Avro format at run time. This method is generally slower than using reflection.
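The dynamic CSV scenario can be sketched as follows: the record schema is assembled from a header row that is only read at run time. This is a hypothetical helper, not part of the Microsoft Avro Library. It maps every column to the Avro string type and assumes that the column names are already valid Avro names. The resulting JSON string has the same form as the Schema constant passed to AvroSerializer.CreateGeneric in the example that follows.

```csharp
using System;
using System.Linq;

// Hypothetical illustration of a dynamic schema: the record layout is only known once a
// CSV header row has been read. Every column is mapped to "string" for simplicity; a real
// conversion would infer or configure column types and validate the names.
public static class CsvSchemaSketch
{
    public static string SchemaFromCsvHeader(string headerLine, string recordName)
    {
        var fields = headerLine
            .Split(',')
            .Select(column => column.Trim())
            .Select(column => $"{{\"name\":\"{column}\",\"type\":\"string\"}}");

        return $"{{\"type\":\"record\",\"name\":\"{recordName}\",\"fields\":[{string.Join(",", fields)}]}}";
    }
}
```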

This example shows how to create and use an AvroRecord to explicitly specify a JSON schema, how to populate it with the data, and then how to serialize and deserialize it. The result is then compared to the initial instance to confirm that the record recovered is identical to the original.

The schema in this example is assumed to be shared between the readers and writers, so the Avro object container format is not required. For an example of how to serialize and deserialize data into memory buffers by using a generic record with the object container format when the schema must be included with the serialized data, see the Serialization using object container files with generic record example.

namespace Microsoft.Hadoop.Avro.Sample
{
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Runtime.Serialization;
    using Microsoft.Hadoop.Avro.Container;
    using Microsoft.Hadoop.Avro.Schema;
    using Microsoft.Hadoop.Avro;

    //This class contains all methods demonstrating
    //the usage of Microsoft Avro Library
    public class AvroSample
    {

        //Serialize and deserialize sample data set by using a generic record.
        //A generic record is a special class with the schema explicitly defined in JSON.
        //All serialized data should be mapped to the fields of the generic record,
        //which in turn will then be serialized.
        public void SerializeDeserializeObjectUsingGenericRecords()
        {
            Console.WriteLine("SERIALIZATION USING GENERIC RECORD\n");
            Console.WriteLine("Defining the Schema and creating Sample Data Set...");

            //Define the schema in JSON
            const string Schema = @"{
                                ""type"":""record"",
                                ""name"":""Microsoft.Hadoop.Avro.Specifications.SensorData"",
                                ""fields"":
                                    [
                                        {
                                            ""name"":""Location"",
                                            ""type"":
                                                {
                                                    ""type"":""record"",
                                                    ""name"":""Microsoft.Hadoop.Avro.Specifications.Location"",
                                                    ""fields"":
                                                        [
                                                            { ""name"":""Floor"", ""type"":""int"" },
                                                            { ""name"":""Room"", ""type"":""int"" }
                                                        ]
                                                }
                                        },
                                        { ""name"":""Value"", ""type"":""bytes"" }
                                    ]
                            }";

            //Create a generic serializer based on the schema
            var serializer = AvroSerializer.CreateGeneric(Schema);
            var rootSchema = serializer.WriterSchema as RecordSchema;

            //Create a memory stream buffer
            using (var stream = new MemoryStream())
            {
                //Create a generic record to represent the data
                dynamic location = new AvroRecord(rootSchema.GetField("Location").TypeSchema);
                location.Floor = 1;
                location.Room = 243;

                dynamic expected = new AvroRecord(serializer.WriterSchema);
                expected.Location = location;
                expected.Value = new byte[] { 1, 2, 3, 4, 5 };

                Console.WriteLine("Serializing Sample Data Set...");

                //Serialize the data
                serializer.Serialize(stream, expected);

                stream.Seek(0, SeekOrigin.Begin);

                Console.WriteLine("Deserializing Sample Data Set...");

                //Deserialize the data into a generic record
                dynamic actual = serializer.Deserialize(stream);

                Console.WriteLine("Comparing Initial and Deserialized Data Sets...");

                //Finally, verify the results
                bool isEqual = expected.Location.Floor.Equals(actual.Location.Floor);
                isEqual = isEqual && expected.Location.Room.Equals(actual.Location.Room);
                isEqual = isEqual && ((byte[])expected.Value).SequenceEqual((byte[])actual.Value);
                Console.WriteLine("Result of Data Set Identity Comparison is {0}", isEqual);
            }
        }

        static void Main()
        {

            string sectionDivider = "---------------------------------------- ";

            //Create an instance of AvroSample class and invoke methods
            //illustrating different serializing approaches
            AvroSample Sample = new AvroSample();

            //Serialization to memory using generic record
            Sample.SerializeDeserializeObjectUsingGenericRecords();

            Console.WriteLine(sectionDivider);
            Console.WriteLine("Press any key to exit.");
            Console.Read();
        }
    }
}
// The example is expected to display the following output:
// SERIALIZATION USING GENERIC RECORD
//
// Defining the Schema and creating Sample Data Set...
// Serializing Sample Data Set...
// Deserializing Sample Data Set...
// Comparing Initial and Deserialized Data Sets...
// Result of Data Set Identity Comparison is True
// ----------------------------------------
// Press any key to exit.

Sample 3: Serialization using object container files and serialization with reflection

This example is similar to the scenario in the first example, where the schema is implicitly specified with reflection. The difference is that here, the schema is not assumed to be known to the reader that deserializes it. The SensorData objects to be serialized and their implicitly specified schema are stored in an Avro object container file represented by the AvroContainer class.

In this example, the data is serialized with SequentialWriter and deserialized with SequentialReader. The results are then compared with the initial instances to confirm that they are identical.
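The sketch below illustrates how a sequential writer can group serialized objects into container file blocks. According to the Apache Avro Specification, each block contains the following, in order:

- an object count
- the size in bytes of the serialized objects
- the objects themselves
- the file's 16-byte sync marker

The helper is hypothetical and assumes the null codec. It also assumes that the second argument passed to SequentialWriter in the example (24) plays the role of objectsPerBlock. That reading of the parameter is not confirmed by this topic.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sketch of block framing in an Avro container file (null codec assumed).
// Each block: object count (long), byte size (long), serialized objects, 16-byte sync marker.
public static class BlockFramingSketch
{
    public static byte[] WriteBlocks(IReadOnlyList<byte[]> serializedObjects, int objectsPerBlock, byte[] syncMarker)
    {
        if (objectsPerBlock <= 0) throw new ArgumentOutOfRangeException(nameof(objectsPerBlock));
        if (syncMarker.Length != 16) throw new ArgumentException("The sync marker must be 16 bytes.", nameof(syncMarker));

        var output = new List<byte>();
        for (int start = 0; start < serializedObjects.Count; start += objectsPerBlock)
        {
            // Take up to objectsPerBlock already-serialized objects for this block.
            var block = serializedObjects.Skip(start).Take(objectsPerBlock).ToList();
            var payload = block.SelectMany(o => o).ToArray();

            WriteLong(output, block.Count);    // number of objects in the block
            WriteLong(output, payload.Length); // size of the (uncompressed) payload
            output.AddRange(payload);
            output.AddRange(syncMarker);       // lets readers find block boundaries
        }
        return output.ToArray();
    }

    // Zig-zag varint encoding used by Avro for long values.
    private static void WriteLong(List<byte> output, long value)
    {
        ulong n = (ulong)((value << 1) ^ (value >> 63));
        while (n >= 0x80)
        {
            output.Add((byte)(n | 0x80));
            n >>= 7;
        }
        output.Add((byte)n);
    }
}
```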

The data in the object container file is compressed with the default Deflate compression codec from .NET Framework 4. See the fifth example in this topic to learn how to use the newer, improved Deflate compression codec available in .NET Framework 4.5.
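For reference, the following sketch compresses and decompresses a block payload with the DeflateStream class from System.IO.Compression. DeflateStream reads and writes raw DEFLATE data (RFC 1951) without a zlib header or checksum. This is the format that the Apache Avro Specification prescribes for the deflate codec. The helper names are hypothetical, and this is not necessarily how the Microsoft Avro Library implements its codecs internally.

```csharp
using System.IO;
using System.IO.Compression;

// Hypothetical helpers: raw DEFLATE (RFC 1951) compression of a block payload,
// the format required for the Avro "deflate" codec.
public static class DeflateBlockSketch
{
    public static byte[] Compress(byte[] data)
    {
        using (var output = new MemoryStream())
        {
            // leaveOpen: true keeps 'output' usable after the DeflateStream is disposed.
            using (var deflate = new DeflateStream(output, CompressionMode.Compress, leaveOpen: true))
            {
                deflate.Write(data, 0, data.Length);
            } // disposing the DeflateStream flushes the final compressed bytes
            return output.ToArray();
        }
    }

    public static byte[] Decompress(byte[] compressed)
    {
        using (var input = new MemoryStream(compressed))
        using (var deflate = new DeflateStream(input, CompressionMode.Decompress))
        using (var output = new MemoryStream())
        {
            deflate.CopyTo(output);
            return output.ToArray();
        }
    }
}
```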

namespace Microsoft.Hadoop.Avro.Sample
{
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Runtime.Serialization;
    using Microsoft.Hadoop.Avro.Container;
    using Microsoft.Hadoop.Avro;

    //Sample class used in serialization samples
    [DataContract(Name = "SensorDataValue", Namespace = "Sensors")]
    internal class SensorData
    {
        [DataMember(Name = "Location")]
        public Location Position { get; set; }

        [DataMember(Name = "Value")]
        public byte[] Value { get; set; }
    }

    //Sample struct used in serialization samples
    [DataContract]
    internal struct Location
    {
        [DataMember]
        public int Floor { get; set; }

        [DataMember]
        public int Room { get; set; }
    }

    //This class contains all methods demonstrating
    //the usage of Microsoft Avro Library
    public class AvroSample
    {

        //Serializes and deserializes the sample data set by using reflection and Avro object container files.
        //Serialized data is compressed with the Deflate codec.
        public void SerializeDeserializeUsingObjectContainersReflection()
        {

            Console.WriteLine("SERIALIZATION USING REFLECTION AND AVRO OBJECT CONTAINER FILES\n");

            //Path for Avro object container file
            string path = "AvroSampleReflectionDeflate.avro";

            //Create a data set by using sample class and struct
            var testData = new List<SensorData>
                    {
                        new SensorData { Value = new byte[] { 1, 2, 3, 4, 5 }, Position = new Location { Room = 243, Floor = 1 } },
                        new SensorData { Value = new byte[] { 6, 7, 8, 9 }, Position = new Location { Room = 244, Floor = 1 } }
                    };

            //Serializing and saving data to file.
            //Creating a memory stream buffer.
            using (var buffer = new MemoryStream())
            {
                Console.WriteLine("Serializing Sample Data Set...");

                //Create a SequentialWriter instance for type SensorData, which can serialize a sequence of SensorData objects to stream.
                //Data will be compressed using the Deflate codec.
                using (var w = AvroContainer.CreateWriter<SensorData>(buffer, Codec.Deflate))
                {
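                    //The second argument (24) is the number of objects written before a sync marker (syncNumberOfObjects)
                    using (var writer = new SequentialWriter<SensorData>(w, 24))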
                    {
                        // Serialize the data to stream by using the sequential writer
                        testData.ForEach(writer.Write);
                    }
                }

                //Save stream to file
                Console.WriteLine("Saving serialized data to file...");
                if (!WriteFile(buffer, path))
                {
                    Console.WriteLine("Error during file operation. Quitting method");
                    return;
                }
            }

            //Reading and deserializing data.
            //Creating a memory stream buffer.
            using (var buffer = new MemoryStream())
            {
                Console.WriteLine("Reading data from file...");

                //Reading data from object container file
                if (!ReadFile(buffer, path))
                {
                    Console.WriteLine("Error during file operation. Quitting method");
                    return;
                }

                Console.WriteLine("Deserializing Sample Data Set...");

                //Prepare the stream for deserializing the data
                buffer.Seek(0, SeekOrigin.Begin);

                //Create a SequentialReader instance for type SensorData, which will deserialize all serialized objects from the given stream.
                //Its Objects property exposes the deserialized objects as an IEnumerable<SensorData> that can be iterated over.
                using (var reader = new SequentialReader<SensorData>(
                    AvroContainer.CreateReader<SensorData>(buffer, true)))
                {
                    var results = reader.Objects;

                    //Finally, verify that deserialized data matches the original one
                    Console.WriteLine("Comparing Initial and Deserialized Data Sets...");
                    int count = 1;
                    var pairs = testData.Zip(results, (serialized, deserialized) => new { expected = serialized, actual = deserialized });
                    foreach (var pair in pairs)
                    {
                        bool isEqual = this.Equal(pair.expected, pair.actual);
                        Console.WriteLine("For Pair {0} result of Data Set Identity Comparison is {1}", count, isEqual);
                        count++;
                    }
                }
            }

            //Delete the file
            RemoveFile(path);
        }

        //
        //Helper methods
        //

        //Comparing two SensorData objects
        private bool Equal(SensorData left, SensorData right)
        {
            return left.Position.Equals(right.Position) && left.Value.SequenceEqual(right.Value);
        }

        //Saving memory stream to a new file with the given path
        private bool WriteFile(MemoryStream InputStream, string path)
        {
            if (!File.Exists(path))
            {
                try
                {
                    using (FileStream fs = File.Create(path))
                    {
                        InputStream.Seek(0, SeekOrigin.Begin);
                        InputStream.CopyTo(fs);
                    }
                    return true;
                }
                catch (Exception e)
                {
                    Console.WriteLine("The following exception was thrown during creation and writing to the file \"{0}\"", path);
                    Console.WriteLine(e.Message);
                    return false;
                }
            }
            else
            {
                Console.WriteLine("Cannot create file \"{0}\". File already exists", path);
                return false;

            }
        }

        //Reading a file content by using the given path to a memory stream
        private bool ReadFile(MemoryStream OutputStream, string path)
        {
            try
            {
                using (FileStream fs = File.Open(path, FileMode.Open))
                {
                    fs.CopyTo(OutputStream);
                }
                return true;
            }
            catch (Exception e)
            {
                Console.WriteLine("The following exception was thrown during reading from the file \"{0}\"", path);
                Console.WriteLine(e.Message);
                return false;
            }
        }

        //Deleting file by using given path
        private void RemoveFile(string path)
        {
            if (File.Exists(path))
            {
                try
                {
                    File.Delete(path);
                }
                catch (Exception e)
                {
                    Console.WriteLine("The following exception was thrown during deleting the file \"{0}\"", path);
                    Console.WriteLine(e.Message);
                }
            }
            else
            {
                Console.WriteLine("Cannot delete file \"{0}\". File does not exist", path);
            }
        }

        static void Main()
        {

            string sectionDivider = "---------------------------------------- ";

            //Create an instance of AvroSample class and invoke methods
            //illustrating different serializing approaches
            AvroSample Sample = new AvroSample();

            //Serialization using reflection to Avro object container file
            Sample.SerializeDeserializeUsingObjectContainersReflection();

            Console.WriteLine(sectionDivider);
            Console.WriteLine("Press any key to exit.");
            Console.Read();
        }
    }
}
// The example is expected to display the following output:
// SERIALIZATION USING REFLECTION AND AVRO OBJECT CONTAINER FILES
//
// Serializing Sample Data Set...
// Saving serialized data to file...
// Reading data from file...
// Deserializing Sample Data Set...
// Comparing Initial and Deserialized Data Sets...
// For Pair 1 result of Data Set Identity Comparison is True
// For Pair 2 result of Data Set Identity Comparison is True
// ----------------------------------------
// Press any key to exit.

Sample 4: Serialization using object container files and serialization with generic record

This example is similar to the scenario in the second example, where the schema is explicitly specified with JSON. The difference is that here, the schema is not assumed to be known to the reader that deserializes it.

The test data set is collected into a list of AvroRecord objects based on an explicitly defined JSON schema and is then stored in an object container file represented by the AvroContainer class. The AvroContainer class creates a writer that serializes the data, uncompressed, to a memory stream, which is then saved to a file. The Codec.Null parameter passed when creating the writer specifies that the data will not be compressed.

The data is then read from the file and deserialized into a collection of objects. This collection is compared to the initial list of Avro records to confirm that they are identical.

namespace Microsoft.Hadoop.Avro.Sample
{
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Runtime.Serialization;
    using Microsoft.Hadoop.Avro.Container;
    using Microsoft.Hadoop.Avro.Schema;
    using Microsoft.Hadoop.Avro;

    //This class contains all methods demonstrating
    //the usage of Microsoft Avro Library
    public class AvroSample
    {

        //Serializes and deserializes a sample data set by using a generic record and Avro object container files.
        //Serialized data is not compressed.
        public void SerializeDeserializeUsingObjectContainersGenericRecord()
        {
            Console.WriteLine("SERIALIZATION USING GENERIC RECORD AND AVRO OBJECT CONTAINER FILES\n");

            //Path for Avro object container file
            string path = "AvroSampleGenericRecordNullCodec.avro";

            Console.WriteLine("Defining the Schema and creating Sample Data Set...");

            //Define the schema in JSON
            const string Schema = @"{
                            ""type"":""record"",
                            ""name"":""Microsoft.Hadoop.Avro.Specifications.SensorData"",
                            ""fields"":
                                [
                                    {
                                        ""name"":""Location"",
                                        ""type"":
                                            {
                                                ""type"":""record"",
                                                ""name"":""Microsoft.Hadoop.Avro.Specifications.Location"",
                                                ""fields"":
                                                    [
                                                        { ""name"":""Floor"", ""type"":""int"" },
                                                        { ""name"":""Room"", ""type"":""int"" }
                                                    ]
                                            }
                                    },
                                    { ""name"":""Value"", ""type"":""bytes"" }
                                ]
                        }";

            //Create a generic serializer based on the schema
            var serializer = AvroSerializer.CreateGeneric(Schema);
            var rootSchema = serializer.WriterSchema as RecordSchema;

            //Create a generic record to represent the data
            var testData = new List<AvroRecord>();

            dynamic expected1 = new AvroRecord(rootSchema);
            dynamic location1 = new AvroRecord(rootSchema.GetField("Location").TypeSchema);
            location1.Floor = 1;
            location1.Room = 243;
            expected1.Location = location1;
            expected1.Value = new byte[] { 1, 2, 3, 4, 5 };
            testData.Add(expected1);

            dynamic expected2 = new AvroRecord(rootSchema);
            dynamic location2 = new AvroRecord(rootSchema.GetField("Location").TypeSchema);
            location2.Floor = 1;
            location2.Room = 244;
            expected2.Location = location2;
            expected2.Value = new byte[] { 6, 7, 8, 9 };
            testData.Add(expected2);

            //Serializing and saving data to file.
            //Create a MemoryStream buffer.
            using (var buffer = new MemoryStream())
            {
                Console.WriteLine("Serializing Sample Data Set...");

                //Create a SequentialWriter instance for generic records, which can serialize a sequence of AvroRecord objects to stream.
                //Data will not be compressed (Null compression codec).
                using (var writer = AvroContainer.CreateGenericWriter(Schema, buffer, Codec.Null))
                {
                    using (var streamWriter = new SequentialWriter<object>(writer, 24))
                    {
                        // Serialize the data to stream by using the sequential writer
                        testData.ForEach(streamWriter.Write);
                    }
                }

                Console.WriteLine("Saving serialized data to file...");

                //Save stream to file
                if (!WriteFile(buffer, path))
                {
                    Console.WriteLine("Error during file operation. Quitting method");
                    return;
                }
            }

            //Reading and deserializing the data.
            //Create a memory stream buffer.
            using (var buffer = new MemoryStream())
            {
                Console.WriteLine("Reading data from file...");

                //Reading data from object container file
                if (!ReadFile(buffer, path))
                {
                    Console.WriteLine("Error during file operation. Quitting method");
                    return;
                }

                Console.WriteLine("Deserializing Sample Data Set...");

                //Prepare the stream for deserializing the data
                buffer.Seek(0, SeekOrigin.Begin);

                //Create a SequentialReader instance for generic records, which will deserialize all serialized objects from the given stream.
                //It allows iterating over the deserialized objects because it implements the IEnumerable<T> interface.
                using (var reader = AvroContainer.CreateGenericReader(buffer))
                {
                    using (var streamReader = new SequentialReader<object>(reader))
                    {
                        var results = streamReader.Objects;

                        Console.WriteLine("Comparing Initial and Deserialized Data Sets...");

                        //Finally, verify the results
                        var pairs = testData.Zip(results, (serialized, deserialized) => new { expected = (dynamic)serialized, actual = (dynamic)deserialized });
                        int count = 1;
                        foreach (var pair in pairs)
                        {
                            bool isEqual = pair.expected.Location.Floor.Equals(pair.actual.Location.Floor);
                            isEqual = isEqual && pair.expected.Location.Room.Equals(pair.actual.Location.Room);
                            isEqual = isEqual && ((byte[])pair.expected.Value).SequenceEqual((byte[])pair.actual.Value);
                            Console.WriteLine("For Pair {0} result of Data Set Identity Comparison is {1}", count, isEqual.ToString());
                            count++;
                        }
                    }
                }
            }

            //Delete the file
            RemoveFile(path);
        }

        //
        //Helper methods
        //

        //Saving memory stream to a new file with the given path
        private bool WriteFile(MemoryStream InputStream, string path)
        {
            if (!File.Exists(path))
            {
                try
                {
                    using (FileStream fs = File.Create(path))
                    {
                        InputStream.Seek(0, SeekOrigin.Begin);
                        InputStream.CopyTo(fs);
                    }
                    return true;
                }
                catch (Exception e)
                {
                    Console.WriteLine("The following exception was thrown during creation and writing to the file \"{0}\"", path);
                    Console.WriteLine(e.Message);
                    return false;
                }
            }
            else
            {
                Console.WriteLine("Can not create file \"{0}\". File already exists", path);
                return false;

            }
        }

        //Reading a file content by using the given path to a memory stream
        private bool ReadFile(MemoryStream OutputStream, string path)
        {
            try
            {
                using (FileStream fs = File.Open(path, FileMode.Open))
                {
                    fs.CopyTo(OutputStream);
                }
                return true;
            }
            catch (Exception e)
            {
                Console.WriteLine("The following exception was thrown during reading from the file \"{0}\"", path);
                Console.WriteLine(e.Message);
                return false;
            }
        }

        //Deleting file by using the given path
        private void RemoveFile(string path)
        {
            if (File.Exists(path))
            {
                try
                {
                    File.Delete(path);
                }
                catch (Exception e)
                {
                    Console.WriteLine("The following exception was thrown during deleting the file \"{0}\"", path);
                    Console.WriteLine(e.Message);
                }
            }
            else
            {
                Console.WriteLine("Can not delete file \"{0}\". File does not exist", path);
            }
        }

        static void Main()
        {

            string sectionDivider = "---------------------------------------- ";

            //Create an instance of the AvroSample class and invoke methods
            //illustrating different serializing approaches
            AvroSample Sample = new AvroSample();

            //Serialization using generic record to Avro object container file
            Sample.SerializeDeserializeUsingObjectContainersGenericRecord();

            Console.WriteLine(sectionDivider);
            Console.WriteLine("Press any key to exit.");
            Console.Read();
        }
    }
}
// The example is expected to display the following output:
// SERIALIZATION USING GENERIC RECORD AND AVRO OBJECT CONTAINER FILES
//
// Defining the Schema and creating Sample Data Set...
// Serializing Sample Data Set...
// Saving serialized data to file...
// Reading data from file...
// Deserializing Sample Data Set...
// Comparing Initial and Deserialized Data Sets...
// For Pair 1 result of Data Set Identity Comparison is True
// For Pair 2 result of Data Set Identity Comparison is True
// ----------------------------------------
// Press any key to exit.

Sample 5: Serialization using object container files with a custom compression codec

The fifth example shows how to use a custom compression codec for Avro object container files. A sample containing the code for this example can be downloaded from the Azure code samples site.

The Avro Specification allows the use of an optional compression codec (in addition to the Null and Deflate defaults). This example does not implement a completely new codec such as Snappy (mentioned as a supported optional codec in the Avro Specification). Instead, it shows how to plug in the .NET Framework 4.5 implementation of the Deflate codec, which is based on the zlib compression library and provides better compression than the default .NET Framework 4 version.
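The effect of switching to the .NET Framework 4.5 Deflate implementation can be seen in isolation with a minimal sketch that uses only System.IO.Compression and does not depend on the Microsoft Avro Library. It compresses a byte array with the DeflateStream constructor overload that takes a CompressionLevel (introduced in .NET Framework 4.5) and then decompresses it again. The DeflateRoundTrip class and its methods are hypothetical helpers for illustration, not part of the sample.

```csharp
using System.IO;
using System.IO.Compression;

// Hypothetical helper: round-trips bytes through the .NET Framework 4.5+ DeflateStream.
public static class DeflateRoundTrip
{
    // Compresses data with the CompressionLevel overload (not available in .NET Framework 4).
    public static byte[] Compress(byte[] data, CompressionLevel level)
    {
        using (var output = new MemoryStream())
        {
            // leaveOpen: true keeps 'output' usable after the DeflateStream is disposed.
            using (var deflate = new DeflateStream(output, level, true))
            {
                deflate.Write(data, 0, data.Length);
            } // Disposing the DeflateStream writes out the final compressed block.

            return output.ToArray();
        }
    }

    // Decompresses a complete Deflate payload produced by Compress.
    public static byte[] Decompress(byte[] compressed)
    {
        using (var input = new MemoryStream(compressed))
        using (var inflate = new DeflateStream(input, CompressionMode.Decompress))
        using (var output = new MemoryStream())
        {
            inflate.CopyTo(output);
            return output.ToArray();
        }
    }
}
```

Because the final compressed block is only written when the DeflateStream is closed or disposed, this is presumably also why CompressionStreamDeflate45.Flush in the sample below closes the wrapped DeflateStream instead of merely flushing it.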

//
// This code needs to be compiled with the parameter Target Framework set as ".NET Framework 4.5"
// to ensure the desired implementation of the Deflate compression algorithm is used.
// Ensure your C# project is set up accordingly.
//

namespace Microsoft.Hadoop.Avro.Sample
{
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.IO;
    using System.IO.Compression;
    using System.Linq;
    using System.Runtime.Serialization;
    using Microsoft.Hadoop.Avro.Container;
    using Microsoft.Hadoop.Avro;

    #region Defining objects for serialization
    //Sample class used in serialization samples
    [DataContract(Name = "SensorDataValue", Namespace = "Sensors")]
    internal class SensorData
    {
        [DataMember(Name = "Location")]
        public Location Position { get; set; }

        [DataMember(Name = "Value")]
        public byte[] Value { get; set; }
    }

    //Sample struct used in serialization samples
    [DataContract]
    internal struct Location
    {
        [DataMember]
        public int Floor { get; set; }

        [DataMember]
        public int Room { get; set; }
    }
    #endregion

    #region Defining custom codec based on .NET Framework V.4.5 Deflate
    //Avro.NET codec class contains two methods,
    //GetCompressedStreamOver(Stream uncompressed) and GetDecompressedStreamOver(Stream compressed),
    //which are the key ones for data compression.
    //To enable a custom codec, one needs to implement these methods for the required codec.

    #region Defining Compression and Decompression Streams
    //DeflateStream (class from System.IO.Compression namespace that implements Deflate algorithm)
    //cannot be directly used for Avro because it does not support vital operations like Seek.
    //Thus one needs to implement two classes inherited from stream
    //(one for compressed and one for decompressed stream)
    //that use Deflate compression and implement all required features.
    internal sealed class CompressionStreamDeflate45 : Stream
    {
        private readonly Stream buffer;
        private DeflateStream compressionStream;

        public CompressionStreamDeflate45(Stream buffer)
        {
            Debug.Assert(buffer != null, "Buffer is not allowed to be null.");

            this.compressionStream = new DeflateStream(buffer, CompressionLevel.Fastest, true);
            this.buffer = buffer;
        }

        public override bool CanRead
        {
            get { return this.buffer.CanRead; }
        }

        public override bool CanSeek
        {
            get { return true; }
        }

        public override bool CanWrite
        {
            get { return this.buffer.CanWrite; }
        }

        public override void Flush()
        {
            this.compressionStream.Close();
        }

        public override long Length
        {
            get { return this.buffer.Length; }
        }

        public override long Position
        {
            get
            {
                return this.buffer.Position;
            }

            set
            {
                this.buffer.Position = value;
            }
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            return this.buffer.Read(buffer, offset, count);
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            return this.buffer.Seek(offset, origin);
        }

        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            this.compressionStream.Write(buffer, offset, count);
        }

        protected override void Dispose(bool disposed)
        {
            base.Dispose(disposed);

            if (disposed)
            {
                this.compressionStream.Dispose();
                this.compressionStream = null;
            }
        }
    }

    internal sealed class DecompressionStreamDeflate45 : Stream
    {
        private readonly DeflateStream decompressed;

        public DecompressionStreamDeflate45(Stream compressed)
        {
            this.decompressed = new DeflateStream(compressed, CompressionMode.Decompress, true);
        }

        public override bool CanRead
        {
            get { return true; }
        }

        public override bool CanSeek
        {
            get { return true; }
        }

        public override bool CanWrite
        {
            get { return false; }
        }

        public override void Flush()
        {
            this.decompressed.Close();
        }

        public override long Length
        {
            get { return this.decompressed.Length; }
        }

        public override long Position
        {
            get
            {
                return this.decompressed.Position;
            }

            set
            {
                throw new NotSupportedException();
            }
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            return this.decompressed.Read(buffer, offset, count);
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }

        protected override void Dispose(bool disposing)
        {
            base.Dispose(disposing);

            if (disposing)
            {
                this.decompressed.Dispose();
            }
        }
    }
    #endregion

    #region Define Codec
    //Define the actual codec class containing the required methods for manipulating streams:
    //GetCompressedStreamOver(Stream uncompressed) and GetDecompressedStreamOver(Stream compressed).
    //Codec class uses classes for compressed and decompressed streams defined above.
    internal sealed class DeflateCodec45 : Codec
    {

        //We merely use different IMPLEMENTATIONS of Deflate, so CodecName remains "deflate"
        public static readonly string CodecName = "deflate";

        public DeflateCodec45()
            : base(CodecName)
        {
        }

        public override Stream GetCompressedStreamOver(Stream decompressed)
        {
            if (decompressed == null)
            {
                throw new ArgumentNullException("decompressed");
            }

            return new CompressionStreamDeflate45(decompressed);
        }

        public override Stream GetDecompressedStreamOver(Stream compressed)
        {
            if (compressed == null)
            {
                throw new ArgumentNullException("compressed");
            }

            return new DecompressionStreamDeflate45(compressed);
        }
    }
    #endregion

    #region Define modified Codec Factory
    //Define modified codec factory to be used in the reader.
    //It will catch the attempt to use "Deflate" and provide a custom codec.
    //For all other cases, it will rely on the base class (CodecFactory).
    internal sealed class CodecFactoryDeflate45 : CodecFactory
    {

        public override Codec Create(string codecName)
        {
            if (codecName == DeflateCodec45.CodecName)
                return new DeflateCodec45();
            else
                return base.Create(codecName);
        }
    }
    #endregion

    #endregion

    #region Sample Class with demonstration methods
    //This class contains methods demonstrating
    //the usage of Microsoft Avro Library
    public class AvroSample
    {

        //Serializes and deserializes sample data set by using reflection and Avro object container files.
        //Serialized data is compressed with the custom compression codec (Deflate of .NET Framework 4.5).
        //
        //This sample uses memory stream for all operations related to serialization, deserialization and
        //object container manipulation, though file stream could be easily used.
        public void SerializeDeserializeUsingObjectContainersReflectionCustomCodec()
        {

            Console.WriteLine("SERIALIZATION USING REFLECTION, AVRO OBJECT CONTAINER FILES AND CUSTOM CODEC\n");

            //Path for Avro object container file
            string path = "AvroSampleReflectionDeflate45.avro";

            //Create a data set by using sample class and struct
            var testData = new List<SensorData>
                    {
                        new SensorData { Value = new byte[] { 1, 2, 3, 4, 5 }, Position = new Location { Room = 243, Floor = 1 } },
                        new SensorData { Value = new byte[] { 6, 7, 8, 9 }, Position = new Location { Room = 244, Floor = 1 } }
                    };

            //Serializing and saving data to file.
            //Creating a memory stream buffer.
            using (var buffer = new MemoryStream())
            {
                Console.WriteLine("Serializing Sample Data Set...");

                //Create a SequentialWriter instance for type SensorData, which can serialize a sequence of SensorData objects to stream.
                //Here the custom codec is introduced. For convenience, the next commented code line shows how to use built-in Deflate.
                //Note that because the sample deals with different IMPLEMENTATIONS of Deflate, built-in and custom codecs are interchangeable
                //in read-write operations.
                //using (var w = AvroContainer.CreateWriter<SensorData>(buffer, Codec.Deflate))
                using (var w = AvroContainer.CreateWriter<SensorData>(buffer, new DeflateCodec45()))
                {
                    using (var writer = new SequentialWriter<SensorData>(w, 24))
                    {
                        // Serialize the data to stream using the sequential writer
                        testData.ForEach(writer.Write);
                    }
                }

                //Save stream to file
                Console.WriteLine("Saving serialized data to file...");
                if (!WriteFile(buffer, path))
                {
                    Console.WriteLine("Error during file operation. Quitting method");
                    return;
                }
            }

            //Reading and deserializing data.
            //Creating a memory stream buffer.
            using (var buffer = new MemoryStream())
            {
                Console.WriteLine("Reading data from file...");

                //Reading data from object container file
                if (!ReadFile(buffer, path))
                {
                    Console.WriteLine("Error during file operation. Quitting method");
                    return;
                }

                Console.WriteLine("Deserializing Sample Data Set...");

                //Prepare the stream for deserializing the data
                buffer.Seek(0, SeekOrigin.Begin);

                //Because of the AvroContainer.CreateReader<T> overload signature, an AvroSerializerSettings instance is required
                //when a codec factory is explicitly specified.
                //You can comment out the line below if you want to use the built-in Deflate codec (see next comment).
                AvroSerializerSettings settings = new AvroSerializerSettings();

                //Create a SequentialReader instance for type SensorData, which will deserialize all serialized objects from the given stream.
                //It allows iterating over the deserialized objects because it implements the IEnumerable<T> interface.
                //Here the custom codec factory is introduced.
                //For convenience, the next commented code line shows how to use built-in Deflate
                //(no explicit Codec Factory parameter is required in this case).
                //Note that because the sample deals with different IMPLEMENTATIONS of Deflate, built-in and custom codecs are interchangeable
                //in read-write operations.
                //using (var reader = new SequentialReader<SensorData>(AvroContainer.CreateReader<SensorData>(buffer, true)))
                using (var reader = new SequentialReader<SensorData>(
                    AvroContainer.CreateReader<SensorData>(buffer, true, settings, new CodecFactoryDeflate45())))
                {
                    var results = reader.Objects;

                    //Finally, verify that deserialized data matches the original one
                    Console.WriteLine("Comparing Initial and Deserialized Data Sets...");
                    bool isEqual;
                    int count = 1;
                    var pairs = testData.Zip(results, (serialized, deserialized) => new { expected = serialized, actual = deserialized });
                    foreach (var pair in pairs)
                    {
                        isEqual = this.Equal(pair.expected, pair.actual);
                        Console.WriteLine("For Pair {0} result of Data Set Identity Comparison is {1}", count, isEqual.ToString());
                        count++;
                    }
                }
            }

            //Delete the file
            RemoveFile(path);
        }
    #endregion

        #region Helper Methods

        //Comparing two SensorData objects
        private bool Equal(SensorData left, SensorData right)
        {
            return left.Position.Equals(right.Position) && left.Value.SequenceEqual(right.Value);
        }

        //Saving memory stream to a new file with the given path
        private bool WriteFile(MemoryStream InputStream, string path)
        {
            if (!File.Exists(path))
            {
                try
                {
                    using (FileStream fs = File.Create(path))
                    {
                        InputStream.Seek(0, SeekOrigin.Begin);
                        InputStream.CopyTo(fs);
                    }
                    return true;
                }
                catch (Exception e)
                {
                    Console.WriteLine("The following exception was thrown during creation and writing to the file \"{0}\"", path);
                    Console.WriteLine(e.Message);
                    return false;
                }
            }
            else
            {
                Console.WriteLine("Can not create file \"{0}\". File already exists", path);
                return false;

            }
        }

        //Reading file content by using the given path to a memory stream
        private bool ReadFile(MemoryStream OutputStream, string path)
        {
            try
            {
                using (FileStream fs = File.Open(path, FileMode.Open))
                {
                    fs.CopyTo(OutputStream);
                }
                return true;
            }
            catch (Exception e)
            {
                Console.WriteLine("The following exception was thrown during reading from the file \"{0}\"", path);
                Console.WriteLine(e.Message);
                return false;
            }
        }

        //Deleting file by using given path
        private void RemoveFile(string path)
        {
            if (File.Exists(path))
            {
                try
                {
                    File.Delete(path);
                }
                catch (Exception e)
                {
                    Console.WriteLine("The following exception was thrown during deleting the file \"{0}\"", path);
                    Console.WriteLine(e.Message);
                }
            }
            else
            {
                Console.WriteLine("Can not delete file \"{0}\". File does not exist", path);
            }
        }
        #endregion

        static void Main()
        {

            string sectionDivider = "---------------------------------------- ";

            //Create an instance of AvroSample Class and invoke methods
            //illustrating different serializing approaches
            AvroSample Sample = new AvroSample();

            //Serialization using reflection to Avro object container file using custom codec
            Sample.SerializeDeserializeUsingObjectContainersReflectionCustomCodec();

            Console.WriteLine(sectionDivider);
            Console.WriteLine("Press any key to exit.");
            Console.Read();
        }
    }
}
// The example is expected to display the following output:
// SERIALIZATION USING REFLECTION, AVRO OBJECT CONTAINER FILES AND CUSTOM CODEC
//
// Serializing Sample Data Set...
// Saving serialized data to file...
// Reading data from file...
// Deserializing Sample Data Set...
// Comparing Initial and Deserialized Data Sets...
// For Pair 1 result of Data Set Identity Comparison is True
// For Pair 2 result of Data Set Identity Comparison is True
// ----------------------------------------
// Press any key to exit.

Sample 6: Using Avro to upload data for the Microsoft Azure HDInsight service

The sixth example illustrates some programming techniques related to interacting with the Azure HDInsight service. A sample containing the code for this example can be downloaded from the Azure code samples site.

The sample does the following:

  • Connects to an existing HDInsight service cluster.
  • Serializes several CSV files and uploads the result to Azure Blob storage. (The CSV files are distributed together with the sample and represent an extract from AMEX Stock historical data distributed by Infochimps for the period 1970-2010.) The sample reads the CSV file data, converts the records to instances of the Stock class, and then serializes them by using reflection. The Stock type definition is generated from a JSON schema by the Microsoft Avro Library code generation utility.
  • Creates a new external table called Stocks in Hive, and links it to the data uploaded in the previous step.
  • Executes a query by using Hive over the Stocks table.

In addition, the sample performs a clean-up procedure before and after performing major operations. During the clean-up, all of the related Azure Blob data and folders are removed, and the Hive table is dropped. You can also invoke the clean-up procedure from the sample command line.
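The CSV-to-object conversion step described above can be sketched as follows. The StockRecord class, its members, and the assumed column layout (exchange, symbol, date, open, high, low, close, volume, adjusted close) are hypothetical; the actual sample generates its Stock type from a JSON schema, and its CSV parsing may differ. The DataContract and DataMember attributes mirror the ones used for SensorData in the previous sample.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Runtime.Serialization;

// Hypothetical stand-in for the generated Stock type; the real generated class may differ.
[DataContract(Name = "Stock", Namespace = "Sample")]
public class StockRecord
{
    [DataMember] public string Symbol { get; set; }
    [DataMember] public DateTime Date { get; set; }
    [DataMember] public double Open { get; set; }
    [DataMember] public double Close { get; set; }
    [DataMember] public long Volume { get; set; }
}

public static class StockCsvParser
{
    // Assumed (hypothetical) column layout:
    // exchange,symbol,date,open,high,low,close,volume,adj_close
    public static IEnumerable<StockRecord> Parse(IEnumerable<string> lines)
    {
        foreach (var line in lines)
        {
            // Skip blank lines and the header row.
            if (string.IsNullOrWhiteSpace(line) ||
                line.StartsWith("exchange", StringComparison.OrdinalIgnoreCase))
            {
                continue;
            }

            var fields = line.Split(',');

            // InvariantCulture keeps number and date parsing independent of machine locale.
            yield return new StockRecord
            {
                Symbol = fields[1],
                Date = DateTime.ParseExact(fields[2], "yyyy-MM-dd", CultureInfo.InvariantCulture),
                Open = double.Parse(fields[3], CultureInfo.InvariantCulture),
                Close = double.Parse(fields[6], CultureInfo.InvariantCulture),
                Volume = long.Parse(fields[7], CultureInfo.InvariantCulture)
            };
        }
    }
}
```

Because Parse yields records lazily, it could be fed from File.ReadLines(path) and written one record at a time with a SequentialWriter<StockRecord>, as in the earlier samples, without loading a whole CSV file into memory.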

The sample has the following prerequisites:

  • An active Microsoft Azure subscription and its subscription ID.
  • A management certificate for the subscription with the corresponding private key. The certificate should be installed in the current user's personal certificate store on the machine used to run the sample.
  • An active HDInsight cluster.
  • An Azure Storage account linked to the HDInsight cluster from the previous prerequisite, together with the corresponding primary or secondary access key.
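Before running the sample, you might want to confirm that the management certificate prerequisite is met. The following minimal sketch looks up a certificate by thumbprint in the current user's personal (My) store and accepts it only if its private key is available. The CertificateCheck class and FindManagementCertificate method are hypothetical helpers, not part of the sample, and the thumbprint is assumed to be supplied by you.

```csharp
using System.Security.Cryptography.X509Certificates;

// Hypothetical helper for checking the management certificate prerequisite.
public static class CertificateCheck
{
    // Returns the matching certificate if it is installed for the current user
    // and has a private key; otherwise returns null.
    public static X509Certificate2 FindManagementCertificate(string thumbprint)
    {
        var store = new X509Store(StoreName.My, StoreLocation.CurrentUser);
        try
        {
            store.Open(OpenFlags.ReadOnly);

            // validOnly: false also returns certificates that fail chain validation,
            // which is typical for self-signed management certificates.
            X509Certificate2Collection matches =
                store.Certificates.Find(X509FindType.FindByThumbprint, thumbprint, false);

            foreach (X509Certificate2 certificate in matches)
            {
                if (certificate.HasPrivateKey)
                {
                    return certificate;
                }
            }

            return null;
        }
        finally
        {
            store.Close();
        }
    }
}
```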

All of the information from the prerequisites should be entered in the sample configuration file before the sample is run. There are two ways to do this:

  • Edit the app.config file in the sample root directory and then build the sample
  • First build the sample, and then edit AvroHDISample.exe.config in the build directory

In both cases, make all edits in the settings section and follow the comments in the file. Run the sample from the command line with the following command (this assumes the .zip file with the sample was extracted to C:\AvroHDISample; if it was not, use the relevant file path):

AvroHDISample run C:\AvroHDISample\Data

To clean up the cluster, run the following command:

AvroHDISample clean
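The two commands above define a small command-line contract: run followed by a data directory, or clean on its own. The following hypothetical sketch shows one way such arguments could be validated; the SampleCommandLine class and its Command enum are illustrative names only, and the real AvroHDISample argument handling may differ.

```csharp
using System;

// Hypothetical sketch of the documented command-line contract:
//   AvroHDISample run <dataDirectory>
//   AvroHDISample clean
public static class SampleCommandLine
{
    public enum Command { Run, Clean, Invalid }

    public static Command Parse(string[] args, out string dataDirectory)
    {
        dataDirectory = null;

        // "run" requires exactly one extra argument: the folder containing the CSV files.
        if (args.Length == 2 && args[0].Equals("run", StringComparison.OrdinalIgnoreCase))
        {
            dataDirectory = args[1];
            return Command.Run;
        }

        // "clean" takes no further arguments.
        if (args.Length == 1 && args[0].Equals("clean", StringComparison.OrdinalIgnoreCase))
        {
            return Command.Clean;
        }

        return Command.Invalid;
    }
}
```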