Create user-defined functions (UDF) in .NET for Apache Spark

In this article, you learn how to use user-defined functions (UDFs) in .NET for Apache Spark. UDFs are a Spark feature that allows you to use custom functions to extend the system's built-in functionality. A UDF transforms values from a single row of a table into a single corresponding output value per row, based on the logic defined in the UDF.

Define UDFs

Review the following UDF definition:

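// Requires: using System; using Microsoft.Spark.Sql; using static Microsoft.Spark.Sql.Functions;
// s1 is captured by the lambda, so it travels with the UDF to the workers.
string s1 = "hello";
Func<Column, Column> udf = Udf<string, string>(
    str => $"{s1} {str}");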

The UDF takes a string as input (in the form of a Column of a DataFrame) and returns a string with hello prepended to the input.

The following DataFrame df contains a list of names:

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

Now apply the previously defined udf to the DataFrame df:

DataFrame udfResult = df.Select(udf(df["name"]));

The following DataFrame udfResult is the result of the UDF:

+-------------+
|         name|
+-------------+
|hello Michael|
|   hello Andy|
| hello Justin|
+-------------+
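
To make the one-value-in, one-value-out behavior concrete, the following sketch applies the same lambda to an in-memory list with LINQ instead of a Spark DataFrame. It is only a local analogy: no Spark session is involved, and the names RowWiseExample and Greet are hypothetical, chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class RowWiseExample
{
    // Hypothetical helper: applies the UDF's lambda to each name,
    // producing exactly one output value per input value.
    public static List<string> Greet(IEnumerable<string> names)
    {
        string s1 = "hello";
        Func<string, string> greet = str => $"{s1} {str}";
        return names.Select(greet).ToList();
    }

    public static void Main()
    {
        // Mirrors the rows of the DataFrame df shown earlier.
        foreach (string row in Greet(new[] { "Michael", "Andy", "Justin" }))
        {
            Console.WriteLine(row);
        }
    }
}
```

In Spark, the same lambda runs on the workers for each row of the name column rather than in a local loop.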

To better understand how to implement UDFs, review the UDF helper functions and examples on GitHub.

UDF serialization

Because UDFs are functions that need to be executed on workers, they have to be serialized and sent to the workers as part of the payload from the driver. Both the delegate, which is a reference to the method, and its target, which is the class instance on which the delegate invokes the instance method, need to be serialized. Review this code example in GitHub to get a better understanding of how UDF serialization is done.

.NET for Apache Spark uses .NET Core, which doesn't support serializing delegates. Instead, reflection is used to serialize the target where the delegate is defined. When multiple delegates are defined in a common scope, they have a shared closure that becomes the target of reflection for serialization.

Serialization example

The following code snippet defines two string variables, each referenced by a function delegate that returns the respective string:

using System;

public class C {
    public void M() {
        string s1 = "s1";
        string s2 = "s2";
        Func<string, string> a = str => s1;
        Func<string, string> b = str => s2;
    }
}

The compiler lowers the above C# code to the following, shown here as decompiled C# (credit: sharplab.io):

public class C
{
    [CompilerGenerated]
    private sealed class <>c__DisplayClass0_0
    {
        public string s1;

        public string s2;

        internal string <M>b__0(string str)
        {
            return s1;
        }

        internal string <M>b__1(string str)
        {
            return s2;
        }
    }

    public void M()
    {
        <>c__DisplayClass0_0 <>c__DisplayClass0_ = new <>c__DisplayClass0_0();
        <>c__DisplayClass0_.s1 = "s1";
        <>c__DisplayClass0_.s2 = "s2";
        Func<string, string> func = new Func<string, string>(<>c__DisplayClass0_.<M>b__0);
        Func<string, string> func2 = new Func<string, string>(<>c__DisplayClass0_.<M>b__1);
    }
}

Both func and func2 (the compiled forms of a and b) share the same closure <>c__DisplayClass0_0, which is the target that is serialized when serializing either delegate. Even though Func<string, string> a references only s1, s2 is also serialized when the bytes are sent to the workers.
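
You can observe the shared closure at run time by inspecting each delegate's Target with reflection. The following is a minimal sketch, assuming the closure layout that current C# compilers generate (as in the decompiled output above). The names SharedClosureExample, Build, and CapturedFields are hypothetical, and the code only inspects the target object; it does not reproduce how .NET for Apache Spark serializes it.

```csharp
using System;
using System.Linq;

public static class SharedClosureExample
{
    // Returns two delegates that capture different locals declared in the same scope.
    public static (Func<string, string> A, Func<string, string> B) Build()
    {
        string s1 = "s1";
        string s2 = "s2";
        Func<string, string> a = str => s1;
        Func<string, string> b = str => s2;
        return (a, b);
    }

    // Hypothetical helper: lists the public field names on the delegate's target
    // (its closure object). Returns an empty array when there is no target.
    public static string[] CapturedFields(Delegate d)
    {
        var target = d.Target;
        if (target == null) return Array.Empty<string>();
        return target.GetType().GetFields()
            .Select(f => f.Name)
            .OrderBy(n => n, StringComparer.Ordinal)
            .ToArray();
    }

    public static void Main()
    {
        var (a, b) = Build();
        // Both delegates point at the same closure instance.
        Console.WriteLine(ReferenceEquals(a.Target, b.Target));
        // The closure behind a carries s2 as well as s1.
        Console.WriteLine(string.Join(", ", CapturedFields(a)));
    }
}
```

Under that assumption, the target of a exposes both s1 and s2, which is why serializing a also carries s2.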

This can lead to unexpected behavior at run time (for example, when using broadcast variables), which is why we recommend that you restrict the visibility of the variables used in a function to that function's scope.

The following code snippet is the recommended way to implement the desired serialization behavior:

using System;

public class C {
    public void M() {
        {
            string s1 = "s1";
            Func<string, string> a = str => s1;
        }
        {
            string s2 = "s2";
            Func<string, string> b = str => s2;
        }
    }
}

The compiler lowers the above C# code to the following, shown here as decompiled C# (credit: sharplab.io):

public class C
{
    [CompilerGenerated]
    private sealed class <>c__DisplayClass0_0
    {
        public string s1;

        internal string <M>b__0(string str)
        {
            return s1;
        }
    }

    [CompilerGenerated]
    private sealed class <>c__DisplayClass0_1
    {
        public string s2;

        internal string <M>b__1(string str)
        {
            return s2;
        }
    }

    public void M()
    {
        <>c__DisplayClass0_0 <>c__DisplayClass0_ = new <>c__DisplayClass0_0();
        <>c__DisplayClass0_.s1 = "s1";
        Func<string, string> func = new Func<string, string>(<>c__DisplayClass0_.<M>b__0);
        <>c__DisplayClass0_1 <>c__DisplayClass0_2 = new <>c__DisplayClass0_1();
        <>c__DisplayClass0_2.s2 = "s2";
        Func<string, string> func2 = new Func<string, string>(<>c__DisplayClass0_2.<M>b__1);
    }
}

Notice that func and func2 no longer share a closure. Each has its own closure, <>c__DisplayClass0_0 and <>c__DisplayClass0_1 respectively. When either closure is used as the target for serialization, only the variables that the delegate references are serialized. Keep this behavior in mind when implementing multiple UDFs in a common scope.
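
The effect of the extra scopes can be checked in the same way, by comparing the delegates' Target objects. This is a sketch under the same assumption about the compiler-generated closure layout; ScopedClosureExample, Build, and CapturedFields are hypothetical names. The delegates are declared outside the blocks only so that they can be returned and compared; the captured variables stay inside their own scopes.

```csharp
using System;
using System.Linq;

public static class ScopedClosureExample
{
    // Each captured variable lives in its own block, so each lambda gets its own closure.
    public static (Func<string, string> A, Func<string, string> B) Build()
    {
        Func<string, string> a;
        Func<string, string> b;
        {
            string s1 = "s1";
            a = str => s1;
        }
        {
            string s2 = "s2";
            b = str => s2;
        }
        return (a, b);
    }

    // Hypothetical helper: lists the public field names on the delegate's target.
    public static string[] CapturedFields(Delegate d)
    {
        var target = d.Target;
        if (target == null) return Array.Empty<string>();
        return target.GetType().GetFields().Select(f => f.Name).ToArray();
    }

    public static void Main()
    {
        var (a, b) = Build();
        // The two delegates now have distinct closure instances.
        Console.WriteLine(ReferenceEquals(a.Target, b.Target));
        Console.WriteLine(string.Join(", ", CapturedFields(a)));
        Console.WriteLine(string.Join(", ", CapturedFields(b)));
    }
}
```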

Some Spark UDF caveats

  • Null values in UDFs can throw exceptions. It's the responsibility of the developer to handle them.
  • UDFs don't leverage the optimizations provided by Spark's built-in functions, so it's recommended to use built-in functions where possible.
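
As an illustration of the first caveat, the following sketch shows a UDF body that guards against null input before calling a method on it. It is plain C# that runs without Spark; NullSafeUdfExample and SafeUpper are hypothetical names, and returning null for a null input is only one possible policy. Under those assumptions, the delegate could be wrapped with Udf<string, string>(...) in the same way as the UDF defined earlier.

```csharp
using System;

public static class NullSafeUdfExample
{
    // Hypothetical UDF body: calling str.ToUpperInvariant() on a null value would
    // throw a NullReferenceException, so null is passed through unchanged instead.
    public static readonly Func<string, string> SafeUpper =
        str => str == null ? null : str.ToUpperInvariant();

    public static void Main()
    {
        Console.WriteLine(SafeUpper("andy"));
        // A null input yields a null output rather than an exception.
        Console.WriteLine(SafeUpper(null) == null);
    }
}
```

Whether to return null, a default value, or something else for null input depends on what downstream queries expect.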

FAQs

Why do I get the error System.NotImplementedException: The method or operation is not implemented. or System.InvalidCastException: Unable to cast object of type 'System.Collections.Hashtable' to type 'System.Collections.Generic.IDictionary when trying to call a UDF with ArrayType, MapType, ArrayList, or HashTable as argument or return type?
Support for ArrayType and MapType was not provided until v1.0, so you get this error if you use an earlier version of .NET for Apache Spark and pass these types to a UDF as arguments or use them as its return type. ArrayList and HashTable cannot be supported as UDF return types because they are non-generic collections, so their element type definitions cannot be provided to Spark.

Next steps