Megosztás a következőn keresztül:


Modellek létrehozása automatizált gépi tanulással (előzetes verzió)

Az automatizált gépi Tanulás (AutoML) olyan technikákat és eszközöket foglal magában, amelyek célja a gépi tanulási modellek betanítási folyamatának egyszerűsítése és a gépi tanulási modellek optimalizálása minimális emberi beavatkozással. Az AutoML elsődleges célja, hogy leegyszerűsítse és felgyorsítsa a legmegfelelőbb gépi tanulási modell és hiperparaméterek kiválasztását egy adott adatkészlethez, amely általában jelentős szakértelmet és számítási erőforrásokat igényel. A Fabric-keretrendszeren belül az adattudósok a flaml.AutoML modul segítségével automatizálhatják gépi tanulási munkafolyamataik különböző aspektusait.

Ebben a cikkben belevágunk az AutoML-próbaverziók közvetlenül kódból történő generálásának folyamatába Egy Spark-adatkészlet használatával. Emellett megismerjük az adatok Pandas-adatkeretté való átalakításának módszereit, és bemutatjuk a kísérletezési kísérletek párhuzamosításának technikáit.

Fontos

Ez a funkció előzetes verzióban érhető el.

Előfeltételek

  • Microsoft Fabric-előfizetés lekérése. Vagy regisztráljon egy ingyenes Microsoft Fabric-próbaverzióra.

  • Jelentkezzen be a Microsoft Fabricbe.

  • A kezdőlap bal oldalán található élménykapcsolóval válthat a Synapse Adattudomány felületre.

    Képernyőkép a felületváltó menüjéről, amelyen látható, hogy hol válassza ki a Adattudomány.

  • Hozzon létre egy új Fabric-környezetet , vagy győződjön meg arról, hogy a Fabric Runtime 1.2 -en (Spark 3.4 (vagy újabb) és Delta 2.4-en fut.
  • Hozzon létre egy új jegyzetfüzetet.
  • Csatolja a jegyzetfüzetet egy tóházhoz. A jegyzetfüzet bal oldalán válassza a Hozzáadás lehetőséget egy meglévő tóház hozzáadásához, vagy hozzon létre egy újat.

Adatok betöltése és előkészítése

Ebben a szakaszban meg fogjuk adni az adatok letöltési beállításait, majd mentjük őket a lakehouse-ba.

Adatok letöltése

Ez a kódblokk letölti az adatokat egy távoli forrásból, és menti azokat a lakehouse-ba

import os
import requests

IS_CUSTOM_DATA = False  # if TRUE, dataset has to be uploaded manually

if not IS_CUSTOM_DATA:
    # Specify the remote URL where the data is hosted
    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
    
    # List of data files to download
    file_list = ["churn.csv"]
    
    # Define the download path within the lakehouse
    download_path = "/lakehouse/default/Files/churn/raw"
    
    # Check if the lakehouse directory exists; if not, raise an error
    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
    
    # Create the download directory if it doesn't exist
    os.makedirs(download_path, exist_ok=True)
    
    # Download each data file if it doesn't already exist in the lakehouse
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    
    print("Downloaded demo data files into lakehouse.")

Adatok betöltése Spark-adatkeretbe

Az alábbi kódblokk betölti az adatokat a CSV-fájlból egy Spark DataFrame-be, és gyorsítótárazza azokat a hatékony feldolgozás érdekében.

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("Files/churn/raw/churn.csv")
    .cache()
)

Ez a kód feltételezi, hogy az adatfájl le lett töltve, és a megadott elérési úton található. Beolvassa a CSV-fájlt egy Spark DataFrame-be, kikövetkessze a sémát, és gyorsítótárazza azt a későbbi műveletek során a gyorsabb hozzáférés érdekében.

Az adatok előkészítése

Ebben a szakaszban adattisztítást és szolgáltatásfejlesztést végzünk az adathalmazon.

Az adatok tisztítása

Először definiálunk egy függvényt az adatok tisztításához, amely magában foglalja a hiányzó adatokkal rendelkező sorok elvetését, az ismétlődő sorok adott oszlopok alapján történő eltávolítását és a szükségtelen oszlopok elvetését.

# Define a function to clean the data
def clean_data(df):
    # Drop rows with missing data across all columns
    df = df.dropna(how="all")
    # Drop duplicate rows based on 'RowNumber' and 'CustomerId'
    df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
    # Drop columns: 'RowNumber', 'CustomerId', 'Surname'
    df = df.drop('RowNumber', 'CustomerId', 'Surname')
    return df

# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")

# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)

A clean_data függvény segít biztosítani, hogy az adathalmaz ne tartalmazhasson hiányzó értékeket és ismétlődéseket, miközben eltávolítja a szükségtelen oszlopokat.

Jellemzőkiemelés

Következő lépésként egy gyakori kódolással létrehozzuk a "Geography" és a "Gender" oszlophoz tartozó dummy oszlopokat.

# Import PySpark functions
from pyspark.sql import functions as F

# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
    "*",
    F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
    F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
    F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
    F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
    F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)

# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")

Itt egy gyakori kódolást használunk a kategorikus oszlopok bináris dummy oszlopokká alakításához, így alkalmasak gépi tanulási algoritmusokhoz.

Tisztított adatok megjelenítése

Végül megjelenítjük a megtisztított és funkcióalapú adathalmazt a megjelenítési függvény használatával.


display(df_clean)

Ezzel a lépéssel megvizsgálhatja az eredményként kapott DataFrame-et az alkalmazott átalakításokkal.

Mentés a lakehouse-ba

Most mentjük a megtisztított és funkcióalapú adatkészletet a lakehouse-ba.

# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")

Itt a megtisztított és átalakított PySpark DataFrame-et fogjuk, df_cleanés egy "churn_data_clean" nevű Delta-táblázatként mentjük a tóházban. A Delta formátumot használjuk az adathalmaz hatékony verziószámozásához és kezeléséhez. Ez mode("overwrite") biztosítja, hogy az azonos nevű meglévő táblák felülíródnak, és létrejön a tábla új verziója.

Teszt- és betanítási adatkészletek létrehozása

Ezután létrehozzuk a teszt- és betanítási adatkészleteket a megtisztított és funkcióalapú adatokból.

A megadott kódszakaszban egy megtisztított és funkcióalapú adatkészletet töltünk be a lakehouse-ból Delta formátummal, felosztjuk betanítási és tesztelési csoportokra 80-20 arányban, és előkészítjük az adatokat a gépi tanuláshoz. Ez az előkészítés magában foglalja a PySpark ML-ből való importálást a VectorAssembler funkcióoszlopok egyetlen "funkciók" oszlopba való kombinálásához. Ezt követően átalakítjuk a VectorAssembler betanítási train_data és tesztelési adathalmazokat, így a "Kilépés" célváltozót és test_data a funkcióvektorokat tartalmazó DataFrame-eket. Ezek az adathalmazok készen állnak a gépi tanulási modellek létrehozására és értékelésére.

# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler

# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")

# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)

# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]

# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]

Alapmodell betanítása

A featurizált adatok használatával betanítunk egy alapszintű gépi tanulási modellt, konfiguráljuk az MLflow-t a kísérletkövetéshez, meghatározunk egy előrejelzési függvényt a metrikák kiszámításához, végül pedig megtekintjük és naplózzuk az eredményül kapott ROC AUC-pontszámot.

Naplózási szint beállítása

Itt úgy konfiguráljuk a naplózási szintet, hogy a Synapse.ml tár szükségtelen kimenetét letiltsa, így a naplók tisztábbak maradnak.

import logging
 
logging.getLogger('synapse.ml').setLevel(logging.ERROR)

MLflow konfigurálása

Ebben a szakaszban az MLflow-t konfiguráljuk a kísérletkövetéshez. A kísérlet nevét "automl_sample" értékre állítjuk a futtatások rendszerezéséhez. Emellett engedélyezzük az automatikus naplózást is, biztosítva, hogy a modellparaméterek, metrikák és összetevők automatikusan bekerüljenek az MLflow-ba.

import mlflow

# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)

A modell betanítása és kiértékelése

Végül betanítottunk egy LightGBMClassifier modellt a megadott betanítási adatokra. A modell a bináris besoroláshoz és az egyensúlytalanság kezeléséhez szükséges beállításokkal van konfigurálva. Ezt a betanított modellt használva előrejelzéseket készítünk a tesztadatokról. Kinyerjük a pozitív osztály előrejelzett valószínűségeit és a valódi címkéket a tesztadatokból. Ezt követően kiszámítjuk a ROC AUC-pontszámot a sklearn függvényével roc_auc_score .

from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score

# Assuming you have already defined 'train_data' and 'test_data'

with mlflow.start_run(run_name="default") as run:
    # Create a LightGBMClassifier model with specified settings
    model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited")
    
    # Fit the model to the training data
    model = model.fit(train_data)

    # Get the predictions
    predictions = model.transform(test_data)

    # Extract the predicted probabilities for the positive class
    y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()

    # Extract the true labels from the 'test_data' DataFrame
    y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()

    # Compute the ROC AUC score
    roc_auc = roc_auc_score(y_true, y_pred)

    # Log the ROC AUC score with MLflow
    mlflow.log_metric("ROC_AUC", roc_auc)

    # Print or log the ROC AUC score
    print("ROC AUC Score:", roc_auc)

Innen láthatjuk, hogy az eredményül kapott modell 84%-os ROC AUC-pontszámot ér el.

AutoML-próbaverzió létrehozása a FLAML használatával

Ebben a szakaszban létrehozunk egy AutoML-próbaverziót a FLAML-csomag használatával, konfiguráljuk a próbabeállításokat, átalakítjuk a Spark-adathalmazt Egy Pandasra Spark-adatkészleten, futtatjuk az AutoML-próbaverziót, és megtekintjük az eredményként kapott metrikákat.

Az AutoML-próbaverzió konfigurálása

Itt importáljuk a szükséges osztályokat és modulokat a FLAML-csomagból, és létrehozunk egy AutoML-példányt, amely a gépi tanulási folyamat automatizálására szolgál.

# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark

# Create an AutoML instance
automl = AutoML()

Beállítások konfigurálása

Ebben a szakaszban definiáljuk az AutoML-próbaverzió konfigurációs beállításait.

# Define AutoML settings
settings = {
    "time_budget": 250,         # Total running time in seconds
    "metric": 'roc_auc',       # Optimization metric (ROC AUC in this case)
    "task": 'classification',  # Task type (classification)
    "log_file_name": 'flaml_experiment.log',  # FLAML log file
    "seed": 41,                # Random seed
    "force_cancel": True,      # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"      # MLflow experiment name
}

Konvertálás Pandasra a Sparkban

Ha Spark-alapú adatkészlettel szeretné futtatni az AutoML-t, a függvény használatával pandas-sá kell konvertálnunk a to_pandas_on_spark Spark-adathalmazon. Ez lehetővé teszi, hogy a FLAML hatékonyan működjön együtt az adatokkal.

# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)

Az AutoML próbaverziójának futtatása

Most végrehajtjuk az AutoML-próbaverziót. Beágyazott MLflow-futtatás használatával követjük nyomon a kísérletet a meglévő MLflow-futtatási környezetben. Az AutoML-próba a Spark-adathalmazon (df_automl) lévő Pandason történik, a célváltozóval "Exited és a megadott beállításokat a rendszer átadja a függvénynek a fit konfigurációhoz.

'''The main flaml automl API'''

with mlflow.start_run(nested=True):
    automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)

Az eredményként kapott metrikák megtekintése

Ebben az utolsó szakaszban lekérjük és megjelenítjük az AutoML-próba eredményeit. Ezek a metrikák betekintést nyújtanak az AutoML-modell teljesítményébe és konfigurációjába az adott adathalmazon.

# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))

Az AutoML-próbaverzió párhuzamosítása az Apache Sparkkal

Azokban az esetekben, amikor az adathalmaz egyetlen csomópontba fér el, és a Spark előnyeit szeretné kihasználni több párhuzamos AutoML-próba egyidejű futtatásához, kövesse az alábbi lépéseket:

Konvertálás Pandas-adatkeretté

A párhuzamosítás engedélyezéséhez az adatokat először Pandas DataFrame-gé kell konvertálni.

pandas_df = train_raw.toPandas()

Ebben az esetben a train_raw Spark DataFrame-et egy Pandas DataFrame-mé pandas_df alakítjuk át, amely alkalmassá teszi a párhuzamos feldolgozásra.

Párhuzamosítási beállítások konfigurálása

A Spark-alapú párhuzamosság engedélyezésére True van beállítvause_spark. Alapértelmezés szerint a FLAML végrehajtónként egy próbaverziót indít el. Az argumentum használatával n_concurrent_trials testre szabhatja az egyidejű kísérletek számát.

settings = {
    "time_budget": 250,           # Total running time in seconds
    "metric": 'roc_auc',         # Optimization metric (ROC AUC in this case)
    "task": 'classification',    # Task type (classification)
    "seed": 41,                  # Random seed
    "use_spark": True,           # Enable Spark-based parallelism
    "n_concurrent_trials": 3,    # Number of concurrent trials to run
    "force_cancel": True,        # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"  # MLflow experiment name

}

Ezekben a beállításokban azt adhatja meg, hogy a Sparkot a párhuzamossághoz a következő beállítással use_spark szeretnénk Truehasználni: . Az egyidejű kísérletek számát is 3-ra állítjuk, ami azt jelenti, hogy három próba párhuzamosan fog futni a Sparkban.

Ha többet szeretne megtudni az AutoML-nyomvonalak párhuzamosításáról, tekintse meg a párhuzamos Spark-feladatok FLAML-dokumentációját.

Az AutoML-próba futtatása párhuzamosan

Most a megadott beállításokkal párhuzamosan futtatjuk az AutoML-próbaverziót. Beágyazott MLflow-futtatás használatával követjük nyomon a kísérletet a meglévő MLflow-futtatási környezetben.

'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
    automl.fit(dataframe=pandas_df, label='Exited', **settings)

Ezzel végrehajtja az AutoML-próbaverziót, és engedélyezve van a párhuzamosítás. Az dataframe argumentum a Pandas DataFrame pandas_dfértékre van állítva, és a rendszer más beállításokat ad át a fit függvénynek a párhuzamos végrehajtáshoz.

Metrikák megtekintése

A párhuzamos AutoML-próba futtatása után kérje le és jelenítse meg az eredményeket, beleértve a legjobb hiperparaméter-konfigurációt, a ROC AUC-t az érvényesítési adatokon, valamint a legjobban teljesítő futtatás betanítási időtartamát.

''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))

Következő lépések