Hadoop .Net HDFS File Access (Archived)

Updated post can be found here: http://blogs.msdn.com/b/carlnol/archive/2013/02/08/hdinsight-net-hdfs-file-access.aspx

If you grab the latest installment of Microsoft Distribution of Hadoop you will notice, in addition to the C library, a Managed C++ solution for HDFS file access. This solution now enables one to consume HDFS files from within a .Net environment.

The purpose of this post is first to ensure folks know about the new Windows HDFS Managed library (WinHdfsManaged), provided alongside the native C library, and secondly to give a few samples of its usage from C#.

Class Structure

Let’s start with a simple class diagram of the Win HDFS Managed library:

WinHdfsManagedModel

The main premise is that the HdfsFileSystem is your starting point, from which one can acquire a HdfsFileHandle or a HdfsFileStream. From the HdfsFileHandle you can perform operations analogous to normal HDFS file operations. From the HdfsFileStream you can perform operations one would normally expect when working with .Net Streams.

So let’s run through some sample file operations.

Directory Operations

As in all operations one firstly needs to get a connection to the HDFS cluster. This is achieved by calling a Connect() method and specifying the host, name or IP address, and access port:

Create File System Access

using (HdfsFileSystem hdfsSystem = HdfsFileSystem.Connect("127.0.0.1", 9000))
{
    ...
}

Once one has the connection one can then easily perform a directory traversal to enquire into the files and directories:

List Directory Structure

Action<string> processDirectory = null;
processDirectory = (looppath) =>
{
    using (HdfsFileInfoEntries entries = hdfsSystem.ListDirectory(looppath))
    {
        foreach (HdfsFileInfoEntry entry in entries.Entries)
        {
            string kind = entry.Kind == HdfsFileInfoEntryKind.Directory ? "Directory" : "\tFile";
            Console.WriteLine(string.Format(@"{0}:""{1}"", Modified/Accessed:""{2:G}, {3:G}"", Owner:""{4}""", kind, entry.Name, entry.LastModified, entry.LastAccessed, entry.Owner));
            if (entry.Kind == HdfsFileInfoEntryKind.Directory)
            {
                processDirectory(entry.Name);
            }
        }
    }
};
processDirectory(hdfspath)

Here is a sample output created from the test application:

Directory:"hdfs://127.0.0.1:9000/user/isotope/qwanchi", Modified/Accessed:"30/01/2012 20:46:38, 01/01/1970 00:00:00", Owner:"isotope"
        File:"hdfs://127.0.0.1:9000/user/isotope/qwanchi/MobileSampleData.txt", Modified/Accessed:"30/01/2012 20:46:38, 30/01/2012 20:46:38", Owner:"isotope"
Directory:"hdfs://127.0.0.1:9000/user/isotope/qwanchi/duplicate", Modified/Accessed:"30/01/2012 20:46:38, 01/01/1970 00:00:00", Owner:"isotope"
        File:"hdfs://127.0.0.1:9000/user/isotope/qwanchi/duplicate/testdata.txt", Modified/Accessed:"30/01/2012 20:46:38, 30/01/2012 20:46:38", Owner:"isotope"
        File:"hdfs://127.0.0.1:9000/user/isotope/qwanchi/testdata.txt", Modified/Accessed:"28/01/2012 20:46:38, 29/01/2012 20:46:38", Owner:"isotope"

In addition to getting directory information one can also query on a file or directory directly:

 

 

Get Path Information

hdfsSystem.SetWorkingDirectory(hdfspath);
using (HdfsFileInfoEntry pathinfo = hdfsSystem.GetPathInfo(hdfspath))
{
    if (pathinfo != null)
    {
        string kind = pathinfo.Kind == HdfsFileInfoEntryKind.Directory ? "Directory" : "\tFile";
        Console.WriteLine(string.Format(@"{0}:""{1}"", Modified/Accessed:""{2:G}, {3:G}"", Owner:""{4}""", kind, pathinfo.Name, pathinfo.LastModified, pathinfo.LastAccessed, pathinfo.Owner));
    }
}

The HdfsFileSystem class also supports other operations such as copying and moving files, file renaming, deleting files, modifying security, checking a file exists, etc. The copy and move operations support copying and moving these files between systems.

So now onto creating and reading files.

Reading Files

Processing HDFS files is not that dissimilar from normal .Net file operations. Once one has opened a file for reading, operations are available for operations such as reading a byte, line, or block of bytes:

Reading Stream File Data

using (HdfsFileStream file = hdfsSystem.OpenFileStream(filename, HdfsFileAccess.Write, chunksize))
{
    file.Write(dataBytes, 0, data.Length);
    file.WriteByte((byte)47);
    file.Flush();
}

The OpenFile operations support parameter overrides for the file block size and replication factors, whereas a value of zero implies the default values will be used.

The HdfsFileHandle operations are very similar:

Reading File Data

using (HdfsFileHandle file = hdfsSystem.OpenFileForRead(filename))
{
    byte[] newDataBytes = new byte[dataLen];
    file.ReadBytes(newDataBytes, 0, newDataBytes.Length);
    Console.Write(Encoding.UTF8.GetString(newDataBytes));

    Console.Write((char)file.ReadByte());
    Console.WriteLine(file.ReadLine());
}

If one wants to read the full contents of a file into a second Stream, the HdfsFileStream makes this a simple process:

Reading a File by Stream

using (HdfsFileStream hdfsStream = hdfsSystem.OpenFileStream(localhdfsfilename, HdfsFileAccess.Read))
{
    using (FileStream fileStream = new FileStream(localfilestream, FileMode.Create, FileAccess.Write))
    {
        hdfsStream.CopyTo(fileStream);
    }
}

There are other options available for reading the full contents of a file. The first option is to perform a ReadLine() until a null is returned, processed using a StreamReader:

Writing a HDFS to Local Stream

using (StreamReader reader = new StreamReader(hdfsSystem.OpenFileStream(localhdfsfilename, HdfsFileAccess.Read, chunksize)))
{
    using (StreamWriter writer = new StreamWriter(localfileline, false, Encoding.UTF8))
    {
        string line;
        while ((line = reader.ReadLine()) != null)
        {
            writer.WriteLine(line);
        }
    }
}

The HdfsFileHandle operations are very similar:

Reading a File by Line

using (HdfsFileHandle file = hdfsSystem.OpenFileForRead(filename))
{
    String line;
    while ((line = file.ReadLine()) != null)
    {
        Console.WriteLine(line);
    }
}

.

Alternatively, for more efficient reading of files, one can read the blocks of data into a byte array:

Reading a File in Bytes

using (HdfsFileStream file = hdfsSystem.OpenFileStream(filename, HdfsFileAccess.Read))
{
    while ((chunk = file.Read(readBytes, 0, chunksize)) > 0)
    {
        Console.Write(Encoding.UTF8.GetString(readBytes, 0, chunk));
    }
}

Other operations that are supported are PositionalReadByte(), PositionalReadBytes(), and Seek(). These operations allow reading the contents of a file from specific positions.

One final sample worth noting is copying a HDFS file to a local file using byte reads:

Writing a HDFS to Local File

using (HdfsFileHandle file = hdfsSystem.OpenFileForRead(localhdfsfilename, chunksize))
{
    using (FileStream stream = new FileStream(localfilewrite, FileMode.Create, FileAccess.Write))
    {
        while ((chunk = file.ReadBytes(readBytes, 0, chunksize)) > 0)
        {
            stream.Write(readBytes, 0, chunk);
        }
    }
}

The reason a chunk size is specified in this case is to sync the size being used for HDFS file access to the byte array used for writing the local file.

If one has a Stream reference one can also get the associated file information:

Get File Information

HdfsFileInfoEntry fileinfo = file.GetInformation();
if (fileinfo != null)
{
    Console.WriteLine(string.Format(@"'{0}', Modified/Accessed:""{1:G}, {2:G}"", Owner:""{3}""", fileinfo.Name, fileinfo.LastModified, fileinfo.LastAccessed, fileinfo.Owner));
}

Also one can modify the file properties:

Modifying File Properties

file.Chown("isotope", null);
file.SetTimes(DateTime.Now.AddDays(-2), DateTime.Now.AddDays(-1));

So now onto writing files.

Writing Files

As in the case for reading, writing operations are supported for writing a byte, line, and block of bytes:

Writing File Stream Data

using (HdfsFileStream file = hdfsSystem.OpenFileStream(filename, HdfsFileAccess.Write, chunksize))
{
    file.Write(dataBytes, 0, data.Length);
    file.WriteByte((byte)47);
    file.Flush();
}

The chunk size when opening a file is set to correspond to the size of the buffer used for writing the data.

The HdfsFileHandle operations are very similar:

Writing File Data

string data = "I am some unstructured data.\nThat will be written.\n";
byte[] dataBytes = Encoding.UTF8.GetBytes(data);
int dataLen = dataBytes.Length;

using (HdfsFileHandle file = hdfsSystem.OpenFileForWrite(filename, chunksize, 0, 0))
{
    file.WriteBytes(dataBytes, 0, data.Length);
    file.WriteByte((byte)9);
    file.WriteLine("This is an inserted line.");

    file.Flush();
}

As in the reading case, if one wants to copy a file from the local file system to an HDFS file one would write:

Writing a Local to HDFS Stream

using (HdfsFileStream file = hdfsSystem.OpenFileStream(localhdfsfilename, HdfsFileAccess.Write, chunksize))
{
    using (FileStream stream = new FileStream(localfilepath, FileMode.Open, FileAccess.Read))
    {
        while ((chunk = stream.Read(localbytes, 0, chunksize)) > 0)
        {
            file.Write(localbytes, 0, chunk);
        }
    }

    file.Flush();
}

All one has to do is read, in byte chunks, data from the local file and write the corresponding bytes to the HDFS file.

Of course one can also use the CopyTo operation:

CopyTo Local to HDFS Stream

using (HdfsFileStream file = hdfsSystem.OpenFileStream(localhdfsfilename, HdfsFileAccess.Write, chunksize))
{
    using (FileStream stream = new FileStream(localfilepath, FileMode.Open, FileAccess.Read))
    {
        stream.CopyTo(file);
    }
}

The HdfsFileHandle operations are again very similar:

Writing a Local to HDFS File

using (HdfsFileHandle file = hdfsSystem.OpenFileForWrite(localhdfsfilename, chunksize, 0, 0))
{
    using (FileStream stream = new FileStream(localfilepath, FileMode.Open, FileAccess.Read))
    {
        while ((chunk = stream.Read(localbytes, 0, chunksize)) > 0)
        {
            file.WriteBytes(localbytes, 0, chunk);
        }
    }

    file.Flush();
}

A quick word is warranted on appending to a file. Although the API currently supports open files for Append, this is only supported in Hadoop version 1.0.0 and above.

Building the Library

The code for the managed and unmanaged libraries for HDFS file access can be found in the folder:

C:\Apps\dist\contrib\WinLibHdfs

The download not only consists of the compiled libraries but also the full source code and sample C# application that this post is based upon. You can compile the source or just use the delivered assemblies.

One final word is warranted about environment variables.

As the C library being used by the Managed wrapper is actually calling Java code, one needs to define some additional directories in the Path and CLASSPATH environment variables.

For the Path one needs to include the following directories for the Java SDK:

C:\Apps\java\openjdk7\jre\bin\server;C:\Apps\java\openjdk7\jre\bin

For the CLASSPATH (if the environment is not defined it will need to be created) one needs to include the following directories for the Isotope JAR files:

C:\Apps\dist\hadoop-core-0.20.203.1-SNAPSHOT.jar;C:\Apps\dist\lib\commons-logging-1.1.1.jar;C:\Apps\dist\lib\commons-configuration-1.6.jar;C:\Apps\dist\lib\commons-lang-2.4.jar

Once configured you are good to go. If one does get a File Not Found exception then chances are that your environment variables are not configured correctly.

Happy .Net Coding!