Membuat fungsi yang ditentukan pengguna (UDF) di .NET untuk Apache Spark

Pada artikel ini, Anda mempelajari cara menggunakan fungsi yang ditentukan pengguna (UDF) di .NET untuk Apache Spark. UDFs) adalah fitur Spark yang memungkinkan Anda menggunakan fungsi khusus untuk memperluas fungsionalitas bawaan sistem. UDFs mengubah nilai dari satu baris dalam tabel untuk menghasilkan satu nilai output yang sesuai per baris berdasarkan logika yang ditentukan dalam UDF.

Tentukan UDFs

Tinjau definisi UDF berikut:

string s1 = "hello";
Func<Column, Column> udf = Udf<string, string>(
    str => $"{s1} {str}");

UDF mengambil string sebagai input dalam bentuk KolomDataframe) dan mengembalikan dengan stringhello ditambahkan di depan input.

DataFrame df berikut berisi daftar nama:

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

Sekarang mari kita terapkan hal di atas yang ditentukan udf ke DataFrame df:

DataFrame udfResult = df.Select(udf(df["name"]));

DataFrame udfResult berikut adalah hasil dari UDF:

+-------------+
|         name|
+-------------+
|hello Michael|
|   hello Andy|
| hello Justin|
+-------------+

Untuk lebih memahami cara menerapkan UDFs, tinjau fungsi dan contohpembantu UDF di GitHub.

Serialisasi UDF

Karena UDFs adalah fungsi yang perlu dijalankan pada pekerja, mereka harus diserialkan dan dikirim ke pekerja sebagai bagian dari muatan dari pengemudi. Delegasi, yang merupakan referensi ke metode, perlu diserialkan serta targetnya, yang merupakan contoh kelas di mana delegasi saat ini memanggil metode instans. Tinjau contoh kode ini di GitHub untuk mendapatkan pemahaman yang lebih baik tentang bagaimana serialisasi UDF dilakukan.

.NET untuk Apache Spark menggunakan .NET Core, yang tidak mendukung delegasi serialisasi. Sebaliknya, refleksi digunakan untuk serialisasi target di mana delegasi didefinisikan. Ketika beberapa delegasi didefinisikan dalam lingkup umum, mereka memiliki penutupan bersama yang menjadi target refleksi untuk serialisasi.

Contoh serialisasi

Cuplikan kode berikut mendefinisikan dua variabel string yang sedang direferensikan dalam dua delegasi fungsi yang mengembalikan string masing-masing sebagai hasilnya:

using System;

public class C {
    public void M() {
        string s1 = "s1";
        string s2 = "s2";
        Func<string, string> a = str => s1;
        Func<string, string> b = str => s2;
    }
}

Kode C# di atas menghasilkan kode pembongkaran C# berikut (sumber kredit: sharplab.io) dari kompiler:

public class C
{
    [CompilerGenerated]
    private sealed class <>c__DisplayClass0_0
    {
        public string s1;

        public string s2;

        internal string <M>b__0(string str)
        {
            return s1;
        }

        internal string <M>b__1(string str)
        {
            return s2;
        }
    }

    public void M()
    {
        <>c__DisplayClass0_0 <>c__DisplayClass0_ = new <>c__DisplayClass0_0();
        <>c__DisplayClass0_.s1 = "s1";
        <>c__DisplayClass0_.s2 = "s2";
        Func<string, string> func = new Func<string, string>(<>c__DisplayClass0_.<M>b__0);
        Func<string, string> func2 = new Func<string, string>(<>c__DisplayClass0_.<M>b__1);
    }
}

Keduanya func dan func2 berbagi penutupan <>c__DisplayClass0_0yang sama, yang merupakan target yang diserialkan saat membuat serialisasi delegasi func dan func2delegasi. Meskipun Func<string, string> a hanya referensi s1, s2 juga diserialkan ketika byte dikirim ke pekerja.

Hal ini dapat menyebabkan beberapa perilaku tak terduga pada waktu berjalan (seperti dalam kasus menggunakan variabel siaran), itulah sebabnya kami sarankan Anda membatasi visibilitas variabel yang digunakan dalam fungsi ke ruang lingkup fungsi tersebut.

Cuplikan kode berikut adalah cara yang disarankan untuk menerapkan perilaku serialisasi yang diinginkan:

using System;

public class C {
    public void M() {
        {
            string s1 = "s1";
            Func<string, string> a = str => s1;
        }
        {
            string s2 = "s2";
            Func<string, string> b = str => s2;
        }
    }
}

Kode C# di atas menghasilkan kode pembongkaran C# berikut (sumber kredit: sharplab.io) dari kompiler:

public class C
{
    [CompilerGenerated]
    private sealed class <>c__DisplayClass0_0
    {
        public string s1;

        internal string <M>b__0(string str)
        {
            return s1;
        }
    }

    [CompilerGenerated]
    private sealed class <>c__DisplayClass0_1
    {
        public string s2;

        internal string <M>b__1(string str)
        {
            return s2;
        }
    }

    public void M()
    {
        <>c__DisplayClass0_0 <>c__DisplayClass0_ = new <>c__DisplayClass0_0();
        <>c__DisplayClass0_.s1 = "s1";
        Func<string, string> func = new Func<string, string>(<>c__DisplayClass0_.<M>b__0);
        <>c__DisplayClass0_1 <>c__DisplayClass0_2 = new <>c__DisplayClass0_1();
        <>c__DisplayClass0_2.s2 = "s2";
        Func<string, string> func2 = new Func<string, string>(<>c__DisplayClass0_2.<M>b__1);
    }
}

Perhatikan itu func dan func2 tidak lagi berbagi penutupan, dan mereka memiliki penutupan <>c__DisplayClass0_0 terpisah mereka sendiri dan <>c__DisplayClass0_1 masing-masing. Ketika digunakan sebagai target untuk serialisasi, tidak ada yang lain selain variabel yang direferensikan akan diserialkan untuk delegasi. Perilaku ini penting untuk diingat saat menerapkan beberapa UDF dalam lingkup umum.

Beberapa peringatan Spark UDF

  • Nilai null di UDFs dapat memberikan pengecualian. Ini adalah tanggung jawab pengembang untuk menanganinya.
  • UDFs tidak memanfaatkan pengoptimalan yang disediakan oleh fungsi bawaan Spark, jadi disarankan untuk menggunakan fungsi bawaan jika memungkinkan.

Tanya Jawab Umum

Mengapa saya mendapatkan kesalahan System.NotImplementedException: The method or operation is not implemented. atau System.InvalidCastException: Unable to cast object of type 'System.Collections.Hashtable' to type 'System.Collections.Generic.IDictionary ketika mencoba memanggil UDF dengan ArrayType, , MapType, ArrayList, atau HashTable sebagai argumen atau tipe kembali?
Dukungan untuk ArrayType dan MapType tidak disediakan sampai v1.0, sehingga Anda akan mendapatkan kesalahan ini jika menggunakan .NET untuk versi Apache Spark sebelum itu, dan mencoba untuk meneruskan jenis ini baik sebagai argumen ke UDF atau sebagai tipe kembali. ArrayList dan HashTable jenis tidak dapat didukung sebagai jenis pengembalian UDF karena merupakan koleksi non-generik dan karenanya definisi jenis elemennya tidak dapat diberikan kepada Spark.

Langkah berikutnya