Bagikan melalui


Membuat model dengan ML Otomatis (pratinjau)

Automated Pembelajaran Mesin (AutoML) mencakup serangkaian teknik dan alat yang dirancang untuk menyederhanakan proses pelatihan dan mengoptimalkan model pembelajaran mesin dengan intervensi manusia minimal. Tujuan utama AutoML adalah untuk menyederhanakan dan mempercepat pemilihan model pembelajaran mesin dan hiperparameter yang paling sesuai untuk himpunan data tertentu, tugas yang biasanya menuntut keahlian dan sumber daya komputasi yang cukup besar. Dalam kerangka kerja Fabric, ilmuwan data dapat memanfaatkan flaml.AutoML modul untuk mengotomatiskan berbagai aspek alur kerja pembelajaran mesin mereka.

Dalam artikel ini, kita akan mempelajari proses pembuatan uji coba AutoML langsung dari kode menggunakan himpunan data Spark. Selain itu, kami akan menjelajahi metode untuk mengonversi data ini menjadi kerangka data Pandas dan mendiskusikan teknik untuk paralelisasi uji coba eksperimen Anda.

Penting

Fitur ini dalam pratinjau.

Prasyarat

  • Buat lingkungan Fabric baru atau pastikan Anda berjalan pada Fabric Runtime 1.2 (Spark 3.4 (atau lebih tinggi) dan Delta 2.4)
  • Buat buku catatan baru.
  • Lampirkan buku catatan Anda ke lakehouse. Di sisi kiri buku catatan Anda, pilih Tambahkan untuk menambahkan lakehouse yang sudah ada atau buat yang baru.

Memuat dan menyiapkan data

Di bagian ini, kita akan menentukan pengaturan unduhan untuk data lalu menyimpannya ke lakehouse.

Mengunduh data

Blok kode ini mengunduh data dari sumber jarak jauh dan menyimpannya ke lakehouse

import os
import requests

IS_CUSTOM_DATA = False  # if TRUE, dataset has to be uploaded manually

if not IS_CUSTOM_DATA:
    # Specify the remote URL where the data is hosted
    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
    
    # List of data files to download
    file_list = ["churn.csv"]
    
    # Define the download path within the lakehouse
    download_path = "/lakehouse/default/Files/churn/raw"
    
    # Check if the lakehouse directory exists; if not, raise an error
    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
    
    # Create the download directory if it doesn't exist
    os.makedirs(download_path, exist_ok=True)
    
    # Download each data file if it doesn't already exist in the lakehouse
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    
    print("Downloaded demo data files into lakehouse.")

Memuat data ke dalam dataframe Spark

Blok kode berikut memuat data dari file CSV ke dalam Spark DataFrame dan menyimpannya untuk pemrosesan yang efisien.

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("Files/churn/raw/churn.csv")
    .cache()
)

Kode ini mengasumsikan bahwa file data telah diunduh dan terletak di jalur yang ditentukan. Ini membaca file CSV ke dalam Spark DataFrame, menyimpulkan skema, dan menyimpannya untuk akses yang lebih cepat selama operasi berikutnya.

Menyiapkan data

Di bagian ini, kita akan melakukan pembersihan data dan rekayasa fitur pada himpunan data.

Bersihkan data

Pertama, kita menentukan fungsi untuk membersihkan data, yang mencakup menghilangkan baris dengan data yang hilang, menghapus baris duplikat berdasarkan kolom tertentu, dan menghilangkan kolom yang tidak perlu.

# Define a function to clean the data
def clean_data(df):
    # Drop rows with missing data across all columns
    df = df.dropna(how="all")
    # Drop duplicate rows based on 'RowNumber' and 'CustomerId'
    df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
    # Drop columns: 'RowNumber', 'CustomerId', 'Surname'
    df = df.drop('RowNumber', 'CustomerId', 'Surname')
    return df

# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")

# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)

Fungsi ini clean_data membantu memastikan himpunan data bebas dari nilai yang hilang dan duplikat saat menghapus kolom yang tidak perlu.

Rekayasa fitur

Selanjutnya, kami melakukan rekayasa fitur dengan membuat kolom dummy untuk kolom 'Geografi' dan 'Gender' menggunakan pengodean satu panas.

# Import PySpark functions
from pyspark.sql import functions as F

# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
    "*",
    F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
    F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
    F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
    F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
    F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)

# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")

Di sini, kami menggunakan pengodean satu panas untuk mengonversi kolom kategoris menjadi kolom dummy biner, membuatnya cocok untuk algoritma pembelajaran mesin.

Menampilkan data yang dibersihkan

Terakhir, kami menampilkan himpunan data yang dibersihkan dan direkayasa fitur menggunakan fungsi tampilan.


display(df_clean)

Langkah ini memungkinkan Anda memeriksa DataFrame yang dihasilkan dengan transformasi yang diterapkan.

Simpan ke lakehouse

Sekarang, kita akan menyimpan himpunan data yang dibersihkan dan direkayasa fitur ke lakehouse.

# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")

Di sini, kami mengambil PySpark DataFrame yang dibersihkan dan diubah, df_clean, dan menyimpannya sebagai tabel Delta bernama "churn_data_clean" di lakehouse. Kami menggunakan format Delta untuk penerapan versi dan manajemen himpunan data yang efisien. mode("overwrite") memastikan bahwa tabel yang ada dengan nama yang sama ditimpa, dan versi baru tabel dibuat.

Membuat himpunan data pengujian dan pelatihan

Selanjutnya, kita akan membuat himpunan data pengujian dan pelatihan dari data yang dibersihkan dan direkayasa fitur.

Di bagian kode yang disediakan, kami memuat himpunan data yang dibersihkan dan direkayasa fitur dari lakehouse menggunakan format Delta, membaginya menjadi set pelatihan dan pengujian dengan rasio 80-20, dan menyiapkan data untuk pembelajaran mesin. Persiapan ini melibatkan impor VectorAssembler dari PySpark ML untuk menggabungkan kolom fitur ke dalam satu kolom "fitur". Selanjutnya, kami menggunakan VectorAssembler untuk mengubah himpunan data pelatihan dan pengujian, menghasilkan train_data dan test_data DataFrame yang berisi variabel target "Keluar" dan vektor fitur. Himpunan data ini sekarang siap digunakan dalam membangun dan mengevaluasi model pembelajaran mesin.

# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler

# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")

# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)

# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]

# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]

Melatih model garis besar

Dengan menggunakan data yang ditampilkan, kita akan melatih model pembelajaran mesin dasar, mengonfigurasi MLflow untuk pelacakan eksperimen, menentukan fungsi prediksi untuk perhitungan metrik, dan akhirnya, melihat dan mencatat skor ROC AUC yang dihasilkan.

Set tingkat pengelogan

Di sini, kami mengonfigurasi tingkat pengelogan untuk menekan output yang tidak perlu dari pustaka Synapse.ml, menjaga log lebih bersih.

import logging
 
logging.getLogger('synapse.ml').setLevel(logging.ERROR)

Mengonfigurasi MLflow

Di bagian ini, kami mengonfigurasi MLflow untuk pelacakan eksperimen. Kami mengatur nama eksperimen ke "automl_sample" untuk mengatur eksekusi. Selain itu, kami mengaktifkan pengelogan otomatis, memastikan bahwa parameter model, metrik, dan artefak secara otomatis dicatat ke MLflow.

import mlflow

# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)

Melatih dan mengevaluasi model

Terakhir, kami melatih model LightGBMClassifier pada data pelatihan yang disediakan. Model dikonfigurasi dengan pengaturan yang diperlukan untuk klasifikasi biner dan penanganan ketidakseimbangan. Kami kemudian menggunakan model terlatih ini untuk membuat prediksi pada data pengujian. Kami mengekstrak probabilitas yang diprediksi untuk kelas positif dan label sebenarnya dari data pengujian. Setelah itu, kami menghitung skor ROC AUC menggunakan fungsi sklearn roc_auc_score .

from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score

# Assuming you have already defined 'train_data' and 'test_data'

with mlflow.start_run(run_name="default") as run:
    # Create a LightGBMClassifier model with specified settings
    model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited")
    
    # Fit the model to the training data
    model = model.fit(train_data)

    # Get the predictions
    predictions = model.transform(test_data)

    # Extract the predicted probabilities for the positive class
    y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()

    # Extract the true labels from the 'test_data' DataFrame
    y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()

    # Compute the ROC AUC score
    roc_auc = roc_auc_score(y_true, y_pred)

    # Log the ROC AUC score with MLflow
    mlflow.log_metric("ROC_AUC", roc_auc)

    # Print or log the ROC AUC score
    print("ROC AUC Score:", roc_auc)

Dari sini, kita dapat melihat bahwa model yang dihasilkan mencapai skor ROC AUC 84%.

Membuat uji coba AutoML dengan FLAML

Di bagian ini, kita akan membuat uji coba AutoML menggunakan paket FLAML, mengonfigurasi pengaturan uji coba, mengonversi himpunan data Spark ke Pandas pada himpunan data Spark, menjalankan uji coba AutoML, dan melihat metrik yang dihasilkan.

Mengonfigurasi uji coba AutoML

Di sini, kami mengimpor kelas dan modul yang diperlukan dari paket FLAML dan membuat instans AutoML, yang akan digunakan untuk mengotomatiskan alur pembelajaran mesin.

# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark

# Create an AutoML instance
automl = AutoML()

Konfigurasikan pengaturan

Di bagian ini, kami menentukan pengaturan konfigurasi untuk uji coba AutoML.

# Define AutoML settings
settings = {
    "time_budget": 250,         # Total running time in seconds
    "metric": 'roc_auc',       # Optimization metric (ROC AUC in this case)
    "task": 'classification',  # Task type (classification)
    "log_file_name": 'flaml_experiment.log',  # FLAML log file
    "seed": 41,                # Random seed
    "force_cancel": True,      # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"      # MLflow experiment name
}

Mengonversi ke Panda di Spark

Untuk menjalankan AutoML dengan himpunan data berbasis Spark, kita perlu mengonversinya ke Pandas pada himpunan data Spark menggunakan fungsi .to_pandas_on_spark Ini memungkinkan FLAML untuk bekerja dengan data secara efisien.

# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)

Jalankan uji coba AutoML

Sekarang, kami menjalankan uji coba AutoML. Kami menggunakan eksekusi MLflow berlapis untuk melacak eksperimen dalam konteks eksekusi MLflow yang ada. Uji coba AutoML dilakukan pada Panda pada himpunan data Spark (df_automl) dengan variabel target "Exited dan pengaturan yang ditentukan diteruskan ke fit fungsi untuk konfigurasi.

'''The main flaml automl API'''

with mlflow.start_run(nested=True):
    automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)

Menampilkan metrik yang dihasilkan

Di bagian akhir ini, kami mengambil dan menampilkan hasil uji coba AutoML. Metrik ini memberikan wawasan tentang performa dan konfigurasi model AutoML pada himpunan data yang diberikan.

# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))

Paralelkan uji coba AutoML Anda dengan Apache Spark

Dalam skenario di mana himpunan data Anda dapat masuk ke dalam satu simpul dan Anda ingin memanfaatkan kekuatan Spark untuk menjalankan beberapa uji coba AutoML paralel secara bersamaan, Anda dapat mengikuti langkah-langkah berikut:

Mengonversi ke dataframe Pandas

Untuk mengaktifkan paralelisasi, data Anda harus terlebih dahulu dikonversi menjadi Pandas DataFrame.

pandas_df = train_raw.toPandas()

Di sini, kami mengonversi train_raw Spark DataFrame menjadi Pandas DataFrame bernama pandas_df agar cocok untuk pemrosesan paralel.

Mengonfigurasi pengaturan paralelisasi

Atur use_spark ke True untuk mengaktifkan paralelisme berbasis Spark. Secara default, FLAML akan meluncurkan satu uji coba per eksekutor. Anda dapat menyesuaikan jumlah uji coba bersamaan dengan menggunakan n_concurrent_trials argumen .

settings = {
    "time_budget": 250,           # Total running time in seconds
    "metric": 'roc_auc',         # Optimization metric (ROC AUC in this case)
    "task": 'classification',    # Task type (classification)
    "seed": 41,                  # Random seed
    "use_spark": True,           # Enable Spark-based parallelism
    "n_concurrent_trials": 3,    # Number of concurrent trials to run
    "force_cancel": True,        # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"  # MLflow experiment name

}

Dalam pengaturan ini, kami menentukan bahwa kami ingin menggunakan Spark untuk paralelisme dengan mengatur use_spark ke True. Kami juga menetapkan jumlah uji coba bersamaan menjadi 3, yang berarti bahwa tiga uji coba akan berjalan secara paralel pada Spark.

Untuk mempelajari selengkapnya tentang cara menyejajarkan jejak AutoML, Anda dapat mengunjungi dokumentasi FLAML untuk pekerjaan Spark paralel.

Jalankan uji coba AutoML secara paralel

Sekarang, kita akan menjalankan uji coba AutoML secara paralel dengan pengaturan yang ditentukan. Kami akan menggunakan eksekusi MLflow berlapis untuk melacak eksperimen dalam konteks eksekusi MLflow yang ada.

'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
    automl.fit(dataframe=pandas_df, label='Exited', **settings)

Ini sekarang akan menjalankan uji coba AutoML dengan paralelisasi diaktifkan. Argumen dataframe diatur ke Pandas DataFrame pandas_df, dan pengaturan lainnya diteruskan ke fit fungsi untuk eksekusi paralel.

Melihat metrik

Setelah menjalankan uji coba AutoML paralel, ambil dan tampilkan hasilnya, termasuk konfigurasi hyperparameter terbaik, ROC AUC pada data validasi, dan durasi pelatihan eksekusi berkinerja terbaik.

''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))

Langkah berikutnya