Hadoop Binary Streaming and F# MapReduce

As mentioned in my previous post Hadoop Streaming not only supports text streaming, but it also supports Binary Streaming. As such I wanted to put together a sample that supports processing Office documents. As before the code can be downloaded from:

https://code.msdn.microsoft.com/Hadoop-Streaming-and-F-f2e76850

Putting together this sample involved a bit more work than the text streaming case as one needed to put together an implementation of the Java classes to support binary streaming; namely FileInputFormat and RecordReader. The purpose of these implementations is to support Binary Streaming of the document such that it is not split on the usual line boundaries. More on the Java code later.

The implemented Java classes are written, with a key value type pairing of <Text, BytesWritable>, which writes out the data for the mapper in the following format:

  • Filename
  • Tab character
  • UTF8 Encoded Document
  • Linefeed character

The Mapper code will get called with this format for each document in the specified input directories.

Map and Reduce Classes

The goal of the sample code is to support a Map and Reduce with the following prototypes:

 Map : WordprocessingDocument –> seq<string * obj>
 Reduce : string -> seq<string> -> obj option

The idea is the Mapper takes in a Word document and projects this into a sequence of keys and values. The Reducer, as in the text streaming case, takes in a key value pair and returns an optional reduced value.

The use of the obj type is, as in the text streaming case, to support serializing the output values.

As an example, I have put together a MapReduce to process Word documents, where the word document is mapped into the number of pages per author and where multiple authors are credited with the same number of pages.

The sample Mapper code is as follows:

module OfficeWordPageMapper =

    let dc = XNamespace.Get("https://purl.org/dc/elements/1.1/")

    let cp = XNamespace.Get("https://schemas.openxmlformats.org/package/2006/metadata/core-properties")

    let unknownAuthor = "unknown author"

    let getAuthors (document:WordprocessingDocument) =

        let coreFilePropertiesXDoc = XElement.Load(document.CoreFilePropertiesPart.GetStream());

          

        // Take the first dc:creator element and split based on a ";"

        let creators = coreFilePropertiesXDoc.Elements(dc + "creator")

        if Seq.isEmpty creators then

            [| unknownAuthor |]

        else

            let creator = (Seq.head creators).Value

            if String.IsNullOrWhiteSpace(creator) then

                [| unknownAuthor |]

            else

                creator.Split(';')

    let getPages (document:WordprocessingDocument) =

        // return page count

        Int32.Parse(document.ExtendedFilePropertiesPart.Properties.Pages.Text)

    // Map the data from input name/value to output name/value

    let Map (document:WordprocessingDocument) =

        let pages = getPages document

        (getAuthors document)

        |> Seq.map (fun author -> (author, pages))

As you can see the majority code is needed to pull out the document properties. If one wanted to process the words within the document one would use the following code:

 document.MainDocumentPart.Document.Body.InnerText

To run this code it is worth noting there is a dependency on installing the Open XML SDK 2.0 for Microsoft Office.

Finally, the Reducer code is as follows:

module OfficePageReducer =

    let Reduce (key:string) (values:seq<string>) =

        let totalPages =

            values |>

            Seq.fold (fun pages value -> pages + Int32.Parse(value)) 0

        Some(totalPages)

Again, as in the text streaming application, both the mapper and the reducer are executables that read the input from StdIn and emit the output to StdOut. In the case of the mapper the console application will need to do multiple Console.ReadByte() calls to get the data, and then perform a Console.WriteLine() to emit the output. The reducer will do a Console.ReadLine() to get the data, and perform a Console.WriteLine() to emit the output.

The previous post covers the schematics of creating console applications for F#; so I will not cover this again but assume the same program structure.

Mapper Executable

As previously mentioned the purpose of the Mapper code is to perform Input Format Parsing, Projection, and Filtering. In the Mapper for a Word document the bytes are used to create a WordprocessingDocument, with invalid documents ignored, these are then projected into a key/value sequence using the Map function:

module Controller =

    let Run (args:string array) =    

    

        // Define what arguments are expected

        let defs = [

            {ArgInfo.Command="input"; Description="Input File"; Required=false };

            {ArgInfo.Command="output"; Description="Output File"; Required=false } ]

        // Parse Arguments into a Dictionary

        let parsedArgs = Arguments.ParseArgs args defs        

        

        // Ensure Standard Input/Output and allow for debug configuration

        let builder = new StringBuilder()

        let encoder = Text.UTF8Encoding()

        use reader =

            if parsedArgs.ContainsKey("input") then

                builder.Append(Path.GetFileName(parsedArgs.["input"])) |> ignore

                File.Open(Path.GetFullPath(parsedArgs.["input"]), FileMode.Open) :> Stream

            else

                let stream = new MemoryStream()

                // Ignore bytes until one hits a tab

                let rec readTab() =

                    let inByte = Console.OpenStandardInput().ReadByte()

                    if inByte <> 0x09 then                        

                        builder.Append(encoder.GetString([| (byte)inByte |])) |> ignore

                        readTab()

                readTab()

                // Copy the rest of the stream and truncate the last linefeed char

                Console.OpenStandardInput().CopyTo(stream)

                if (stream.Length > 0L) then

                    stream.SetLength(stream.Length - 1L)

                stream.Position <- 0L

                stream :> Stream

        use writer =

            if parsedArgs.ContainsKey("output") then

                new StreamWriter(Path.GetFullPath(parsedArgs.["output"]))

            else

                new StreamWriter(Console.OpenStandardOutput(), AutoFlush = true)

        

        let filename = builder.ToString()

        // Combine the name/value output into a string

        let outputCollector (outputKey, outputValue) =

            let output = sprintf "%s\t%O" outputKey outputValue

            writer.WriteLine(output)

        

        // Check we do not have a null document

        if (reader.Length > 0L) then

            try

                // Get access to the word processing document from the input stream

                use document = WordprocessingDocument.Open(reader, false)

                // Process the word document with the mapper

                OfficeWordPageMapper.Map document

                |> Seq.iter (fun value -> outputCollector value)

        

                // close document

                document.Close()

            with

            | :? System.IO.FileFormatException ->

                // Ignore invalid files formats

                ()

        // Close the streams

        reader.Close()

        writer.Close()

There are a few things of note in the code.

When pulling out the filename of the document that is being processed, it is assumed that UTF8 encoding has been used and that a Tab character is used as a delimiter between the filename and the documents bytes.

In building the Stream that is to be used for creating the WordprocessingDocument a MemoryStream is used. There are several reasons for this. Firstly one needs to remove the last Newline character from the Stream, and secondly when creating a WordprocessingDocument a Stream is needed that supports Seek operations; unfortunately this is not the case for StdIn.

At the moment the code does not use the Filename. However in future posts I will extend the code to also support processing PDF documents.

Reducer Executable

After running the Mapper, the data being parsed into the Reducer will be a key/value pair delimited with a Tab. Using the above Map, a sample input dataset for the Reducer would be:

Brad Sarsfield 44

Carl Nolan 1
Marie West 1

Carl Nolan 9

Thus in this case, the code for the Reducer will be the same as in the text streaming case:

module Controller =

    let Run (args:string array) =

        // Define what arguments are expected

        let defs = [

            {ArgInfo.Command="input"; Description="Input File"; Required=false };

            {ArgInfo.Command="output"; Description="Output File"; Required=false } ]

        // Parse Arguments into a Dictionary

        let parsedArgs = Arguments.ParseArgs args defs

        // Ensure Standard Input/Output and allow for debug configuration

        use reader =

            if parsedArgs.ContainsKey("input") then

                new StreamReader(Path.GetFullPath(parsedArgs.["input"]))

            else

                new StreamReader(Console.OpenStandardInput())

        use writer =

            if parsedArgs.ContainsKey("output") then

                new StreamWriter(Path.GetFullPath(parsedArgs.["output"]))

            else

                new StreamWriter(Console.OpenStandardOutput(), AutoFlush = true)

        

        // Combine the name/value output into a string

        let outputCollector outputKey outputValue =            

            let output = sprintf "%s\t%O" outputKey outputValue

            writer.WriteLine(output)

        // Read the next line of the input stream

        let readLine() =

            reader.ReadLine()

        // Parse the input into the required name/value pair

        let parseLine (input:string) =

            let keyValue = input.Split('\t')

            (keyValue.[0].Trim(), keyValue.[1].Trim())

        // Converts a input line into an option

        let getInput() =

            let input = readLine()

            if not(String.IsNullOrWhiteSpace(input)) then

                Some(parseLine input)

            else

                None

        // Creates a sequence of the input based on the provided key

        let lastInput = ref None

        let continueDo = ref false

        let inputsByKey key firstValue = seq {

            // Yield any value from previous read

            yield firstValue

            continueDo := true

            while !continueDo do

                match getInput() with

                | Some(input) when (fst input) = key ->

                    // Yield found value and remainder of sequence

                    yield (snd input)                    

                | Some(input) ->

                    // Have a value but different key

                    lastInput := Some(fst input, snd input)

                    continueDo := false

                | None ->

                    // Have no more entries

                    lastInput := None

                    continueDo := false

        }

        // Controls the calling of the reducer

        let rec processInput (input:(string*string) option) =

            if input.IsSome then

                let key = fst input.Value

                let value = OfficePageReducer.Reduce key (inputsByKey key (snd input.Value))

                if value.IsSome then

                    outputCollector key value.Value

                if lastInput.contents.IsSome then

                    processInput lastInput.contents

        processInput (getInput())

Once run, the reduced output would be:

Brad Sarsfield 44

Carl Nolan 10

Marie West 1

Once again the only complexity in the code is the fact the Seq.groupBy function cannot be used. Also, as in the text streaming case there is a fair amount of code controlling the input and output streams for calling the Map and Reduce functions, that can be reused for all Hadoop Binary Streaming jobs.

Running the Executables

In the case of Binary Streaming the command parameters are a little different to the text streaming case:

C:\Apps\dist\hadoop.cmd jar lib/hadoop-streaming-ms.jar

-input "/office/documents"

-output "/office/authors"

-mapper "..\..\jars\FSharp.Hadoop.MapperOffice.exe"

-combiner "..\..\jars\FSharp.Hadoop.ReducerOffice.exe"

-reducer "..\..\jars\FSharp.Hadoop.ReducerOffice.exe"

-file "C:\Users\carlnol\Projects\FSharp.Hadoop.MapReduce\FSharp.Hadoop.MapperOffice\bin\Release\FSharp.Hadoop.MapperOffice.exe"

-file "C:\Users\carlnol\Projects\FSharp.Hadoop.MapReduce\FSharp.Hadoop.ReducerOffice\bin\Release\FSharp.Hadoop.ReducerOffice.exe"

-file "C:\Users\carlnol\Projects\FSharp.Hadoop.MapReduce\FSharp.Hadoop.MapReduce\bin\Release\FSharp.Hadoop.MapReduce.dll"

-file "C:\Users\carlnol\Projects\FSharp.Hadoop.MapReduce\FSharp.Hadoop.Utilities\bin\Release\FSharp.Hadoop.Utilities.dll"

-inputformat com.microsoft.hadoop.mapreduce.lib.input.BinaryDocumentWithNameInputFormat

The first difference is the use of the hadoop-streaming-ms.jar. This file contains the InputFormat class specified by the inputformat parameter; the later Java Classes section discusses how this is created. This is needed to support Binary Streaming.

One other difference to my previous text streaming case is the use of the Reducer class as a Combiner. As the output types from the mapper are the same as the reducer then this is possible.

Tester Application

For completeness I have included the base code for the tester application; albeit the full code is included in the download. The code is very similar to the tester application mentioned in my previous post, with the exception of how the mapper executable is called:

module MapReduceConsole =

        

    let Run args =

        // Define what arguments are expected

        let defs = [            

            {ArgInfo.Command="input"; Description="Input File"; Required=true };

            {ArgInfo.Command="output"; Description="Output File"; Required=true };

            {ArgInfo.Command="tempPath"; Description="Temp File Path"; Required=true };

            {ArgInfo.Command="mapper"; Description="Mapper EXE"; Required=true };

            {ArgInfo.Command="reducer"; Description="Reducer EXE"; Required=true }; ]

        // Parse Arguments into a Dictionary

        let parsedArgs = Arguments.ParseArgs args defs

        Arguments.DisplayArgs parsedArgs

        // define the executables

        let mapperExe = Path.GetFullPath(parsedArgs.["mapper"])

        let reducerExe = Path.GetFullPath(parsedArgs.["reducer"])

        Console.WriteLine()

        Console.WriteLine (sprintf "The Mapper file is:\t%O" mapperExe)

        Console.WriteLine (sprintf "The Reducer file is:\t%O" reducerExe)

        // Get the file names

        let inputpath = Path.GetFullPath(parsedArgs.["input"])

        let outputfile = Path.GetFullPath(parsedArgs.["output"])

        let tempPath = Path.GetFullPath(parsedArgs.["tempPath"])

        let tempFile = Path.Combine(tempPath, Path.GetFileName(outputfile))

        let mappedfile = Path.ChangeExtension(tempFile, "mapped")

        let reducefile = Path.ChangeExtension(tempFile, "reduced")

        Console.WriteLine()

        Console.WriteLine (sprintf "The input path is:\t\t%O" inputpath)

        Console.WriteLine (sprintf "The mapped temp file is:\t%O" mappedfile)

        Console.WriteLine (sprintf "The reduced temp file is:\t%O" reducefile)

        Console.WriteLine (sprintf "The output file is:\t\t%O" outputfile)

        // Give the user an option to continue

        Console.WriteLine()

        Console.WriteLine("Hit ENTER to continue...")

        Console.ReadLine() |> ignore

        let CHUNK = 1024

        let buffer:byte array = Array.zeroCreate CHUNK

        // Call the mapper with the input file

        let mapperProcessLoop inputfile =

            use mapper = new Process()

            mapper.StartInfo.FileName <- mapperExe

            mapper.StartInfo.UseShellExecute <- false

            mapper.StartInfo.RedirectStandardInput <- true

            mapper.StartInfo.RedirectStandardOutput <- true

            mapper.Start() |> ignore

            use mapperInput = mapper.StandardInput.BaseStream

            use mapperOutput = mapper.StandardOutput

        

            // Map the reader to a background thread so processing can happen in parallel

            let taskMapperFunc() =

                use mapperWriter = File.AppendText(mappedfile)

                while not mapperOutput.EndOfStream do

                    mapperWriter.WriteLine(mapperOutput.ReadLine())

            let taskMapperWriting = Task.Factory.StartNew(Action(taskMapperFunc))

            // Pass the file into the mapper process and close input stream when done          

            use mapperReader = new BinaryReader(File.OpenRead(inputfile))

            let rec readLoop() =

                let bytesread = mapperReader.Read(buffer, 0, CHUNK)

                if bytesread > 0 then

                    mapperInput.Write(buffer, 0, bytesread)

                    readLoop()

            // Write out a Filename/Tab/Document/LineFeed

            let filename = Path.GetFileName(inputfile)

            let encoding = Text.UTF8Encoding();

            mapperInput.Write(encoding.GetBytes(filename), 0, encoding.GetByteCount(filename))

            mapperInput.Write([| 0x09uy |], 0, 1)

            readLoop()

            mapperInput.Write([| 0x0Auy |], 0, 1)

            mapperInput.Close()

            taskMapperWriting.Wait()

            mapperOutput.Close()

            mapper.WaitForExit()

            let result = match mapper.ExitCode with | 0 -> true | _ -> false

            mapper.Close()

            result

      

        let mapperProcess() =

            Console.WriteLine "Mapper Processing Starting..."  

            // Create the output file

            if File.Exists(mappedfile) then File.Delete(mappedfile)

            use mapperWriter = File.CreateText(mappedfile)

            mapperWriter.Close()

            // function to determine if valid document extension

            let isValidFile inputfile =

                if String.Equals(Path.GetExtension(inputfile), ".docx", StringComparison.InvariantCultureIgnoreCase) ||

                   String.Equals(Path.GetExtension(inputfile), ".pdf", StringComparison.InvariantCultureIgnoreCase) then

                    true

                else

                    false

            // Process the files in the directory

            Directory.GetFiles(inputpath)

            |> Array.filter isValidFile

            |> Array.fold (fun result inputfile -> result && (mapperProcessLoop inputfile)) true

        // Sort the mapped file by the first field - mimic the role of Hadoop

        let hadoopProcess() =

            Console.WriteLine "Hadoop Processing Starting..."

            let unsortedValues = seq {

                use reader = new StreamReader(File.OpenRead(mappedfile))

                while not reader.EndOfStream do

                    let input = reader.ReadLine()

                    let keyValue = input.Split('\t')

                    yield (keyValue.[0].Trim(), keyValue.[1].Trim())

                reader.Close()

                }

            use writer = File.CreateText(reducefile)

            unsortedValues

            |> Seq.sortBy fst

            |> Seq.iter (fun (key, value) -> writer.WriteLine (sprintf "%O\t%O" key value))

            writer.Close()

        

        // Finally call the reducer process

        let reducerProcess() =

            use reducer = new Process()

            reducer.StartInfo.FileName <- reducerExe

            reducer.StartInfo.UseShellExecute <- false

            reducer.StartInfo.RedirectStandardInput <- true

            reducer.StartInfo.RedirectStandardOutput <- true

            reducer.Start() |> ignore

            use reducerInput = reducer.StandardInput

            use reducerOutput = reducer.StandardOutput

        

            // Map the reader to a background thread so processing can happen in parallel

            Console.WriteLine "Reducer Processing Starting..."

            let taskReducerFunc() =

                use reducerWriter = File.CreateText(outputfile)

                while not reducerOutput.EndOfStream do

                    reducerWriter.WriteLine(reducerOutput.ReadLine())

            let taskReducerWriting = Task.Factory.StartNew(Action(taskReducerFunc))

            // Pass the file into the mapper process and close input stream when done

            use reducerReader = new StreamReader(File.OpenRead(reducefile))

            while not reducerReader.EndOfStream do

                reducerInput.WriteLine(reducerReader.ReadLine())

            reducerInput.Close()

            taskReducerWriting.Wait()

            reducerOutput.Close()

            reducer.WaitForExit()

            let result = match reducer.ExitCode with | 0 -> true | _ -> false

            reducer.Close()

            result

        // Finish test

        if mapperProcess() then

            Console.WriteLine "Mapper Processing Complete..."  

            hadoopProcess()

            Console.WriteLine "Hadoop Processing Complete..."

            if reducerProcess() then

                Console.WriteLine "Reducer Processing Complete..."

                Console.WriteLine "Processing Complete..."     

                   

        Console.ReadLine() |> ignore

When calling the mapper it is no longer the case that the mapper executable is called once, with each line being redirected into the executables StdIn. In the binary case the mapper executable is called once for each document, where for each document the data is then redirected into the executables StdIn. As mentioned the format defined is the filename, delimitated with a Tab, followed by the documents bytes, and finally the Newline character:

// Pass the file into the mapper process and close input stream when done          

use mapperReader = new BinaryReader(File.OpenRead(inputfile))

let rec readLoop() =

    let bytesread = mapperReader.Read(buffer, 0, CHUNK)

    if bytesread > 0 then

        mapperInput.Write(buffer, 0, bytesread)

        readLoop()

// Write out a Filename/Tab/Document/LineFeed

let filename = Path.GetFileName(inputfile)

let encoding = Text.UTF8Encoding();

mapperInput.Write(encoding.GetBytes(filename), 0, encoding.GetByteCount(filename))

mapperInput.Write([| 0x09uy |], 0, 1)

readLoop()

mapperInput.Write([| 0x0Auy |], 0, 1)

This code is code for each document found in the directory specified in the input argument.

The previous post discussed the threading and stream processing needed for testing in a little more detail.

Java Classes

To complete the post here is the full listing for the Java class implementations:

FileInputFormat

package com.microsoft.hadoop.mapreduce.lib.input;

import java.io.IOException;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.io.BytesWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.FileSplit;

import org.apache.hadoop.mapred.FileInputFormat;

import org.apache.hadoop.mapred.InputSplit;

import org.apache.hadoop.mapred.JobConf;

import org.apache.hadoop.mapred.RecordReader;

import org.apache.hadoop.mapred.Reporter;

import org.apache.hadoop.mapreduce.InputFormat;

import org.apache.hadoop.mapreduce.JobContext;

public class BinaryDocumentWithNameInputFormat

    extends FileInputFormat<Text, BytesWritable> {

    public BinaryDocumentWithNameInputFormat() {

        super();

    }

    

    protected boolean isSplitable(FileSystem fs, Path filename) {

        return false;

    }

    @Override

    public RecordReader<Text, BytesWritable> getRecordReader(

            InputSplit split, JobConf job, Reporter reporter) throws IOException {

        

        return new BinaryDocumentWithNameRecordReader((FileSplit) split, job);        

    }

}

RecordReader

package com.microsoft.hadoop.mapreduce.lib.input;

import java.io.IOException;

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.BytesWritable;

import org.apache.hadoop.io.DataOutputBuffer;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.JobConf;

import org.apache.hadoop.mapred.RecordReader;

import org.apache.hadoop.mapred.FileSplit;

import org.apache.hadoop.mapred.Reporter;

public class BinaryDocumentWithNameRecordReader implements RecordReader<Text, BytesWritable> {

    

    private FileSplit fileSplit;

    private Configuration conf;

    private boolean processed = false;

    

    public BinaryDocumentWithNameRecordReader(FileSplit fileSplit, Configuration conf)

        throws IOException {

        this.fileSplit = fileSplit;

        this.conf = conf;

    }

    @Override

    public Text createKey() {

        return new Text();

    }

    @Override

    public BytesWritable createValue() {

        return new BytesWritable();

    }

      

    @Override

    public long getPos() throws IOException {

        return this.processed ? this.fileSplit.getLength() : 0;

    }

    

    @Override

    public float getProgress() throws IOException {

        return this.processed ? 1.0f : 0.0f;

    }

    

    @Override

    public boolean next(Text key, BytesWritable value) throws IOException {

        if (!this.processed) {

            byte[] contents = new byte[(int) this.fileSplit.getLength()];

            Path file = this.fileSplit.getPath();

            FileSystem fs = file.getFileSystem(this.conf);

            FSDataInputStream in = null;

            

            try {

                in = fs.open(file);

                in.readFully(contents, 0, contents.length);

                

                key.set(file.getName());

                value.set(contents, 0, contents.length);

            }

            finally {

                in.close();

            }

            

            this.processed = true;

            return true;

        }

        else {

            return false;

        }

    }

    @Override

    public void close() throws IOException {

    }

}

Once the Java classes have been compiled they need to be merged into a copy of the hadoop-streaming.jar file. In my case I have created a file called hadoop-streaming-ms.jar. This file is a copy of the base file into which I have copied the classes, as the JAR file is just a ZIP file; although one can also use the JAR tool. One just has to remember to use the package path:

com\microsoft\hadoop\mapreduce\lib\input

To use this new streaming package the file needs to be copied to the Hadoop install lib directory; in my case:

C:\Apps\dist\lib

If you download the code there is also an implementation of these classes that support a key value type pairing of <NullWritable, BytesWritable>; rather than the <Text, BytesWritable> key value type pairing that is used to pass in the documents filename. This can used used if the document’s filename is not needed.

Conclusion

Hopefully the code is once again useful if you intend to write any Binary Streaming applications of documents. If you download the sample code, in addition to support for processing of Microsoft Word documents there is support for processing PDF documents.

Written by Carl Nolan