Modellek létrehozása automatizált gépi tanulással (előzetes verzió)
Az automatizált gépi Tanulás (AutoML) olyan technikákat és eszközöket foglal magában, amelyek célja a gépi tanulási modellek betanítási folyamatának egyszerűsítése és a gépi tanulási modellek optimalizálása minimális emberi beavatkozással. Az AutoML elsődleges célja, hogy leegyszerűsítse és felgyorsítsa a legmegfelelőbb gépi tanulási modell és hiperparaméterek kiválasztását egy adott adatkészlethez, amely általában jelentős szakértelmet és számítási erőforrásokat igényel. A Fabric-keretrendszeren belül az adattudósok a flaml.AutoML
modul segítségével automatizálhatják gépi tanulási munkafolyamataik különböző aspektusait.
Ebben a cikkben belevágunk az AutoML-próbaverziók közvetlenül kódból történő generálásának folyamatába Egy Spark-adatkészlet használatával. Emellett megismerjük az adatok Pandas-adatkeretté való átalakításának módszereit, és bemutatjuk a kísérletezési kísérletek párhuzamosításának technikáit.
Előfeltételek
Microsoft Fabric-előfizetés lekérése. Vagy regisztráljon egy ingyenes Microsoft Fabric-próbaverzióra.
A kezdőlap bal oldalán található élménykapcsolóval válthat a Synapse Adattudomány felületre.
- Hozzon létre egy új Fabric-környezetet , vagy győződjön meg arról, hogy a Fabric Runtime 1.2 -en (Spark 3.4 (vagy újabb) és Delta 2.4-en fut.
- Hozzon létre egy új jegyzetfüzetet.
- Csatolja a jegyzetfüzetet egy tóházhoz. A jegyzetfüzet bal oldalán válassza a Hozzáadás lehetőséget egy meglévő tóház hozzáadásához, vagy hozzon létre egy újat.
Adatok betöltése és előkészítése
Ebben a szakaszban meg fogjuk adni az adatok letöltési beállításait, majd mentjük őket a lakehouse-ba.
Adatok letöltése
Ez a kódblokk letölti az adatokat egy távoli forrásból, és menti azokat a lakehouse-ba
import os
import requests
IS_CUSTOM_DATA = False # if TRUE, dataset has to be uploaded manually
if not IS_CUSTOM_DATA:
# Specify the remote URL where the data is hosted
remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
# List of data files to download
file_list = ["churn.csv"]
# Define the download path within the lakehouse
download_path = "/lakehouse/default/Files/churn/raw"
# Check if the lakehouse directory exists; if not, raise an error
if not os.path.exists("/lakehouse/default"):
raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
# Create the download directory if it doesn't exist
os.makedirs(download_path, exist_ok=True)
# Download each data file if it doesn't already exist in the lakehouse
for fname in file_list:
if not os.path.exists(f"{download_path}/{fname}"):
r = requests.get(f"{remote_url}/{fname}", timeout=30)
with open(f"{download_path}/{fname}", "wb") as f:
f.write(r.content)
print("Downloaded demo data files into lakehouse.")
Adatok betöltése Spark-adatkeretbe
Az alábbi kódblokk betölti az adatokat a CSV-fájlból egy Spark DataFrame-be, és gyorsítótárazza azokat a hatékony feldolgozás érdekében.
df = (
spark.read.option("header", True)
.option("inferSchema", True)
.csv("Files/churn/raw/churn.csv")
.cache()
)
Ez a kód feltételezi, hogy az adatfájl le lett töltve, és a megadott elérési úton található. Beolvassa a CSV-fájlt egy Spark DataFrame-be, kikövetkessze a sémát, és gyorsítótárazza azt a későbbi műveletek során a gyorsabb hozzáférés érdekében.
Az adatok előkészítése
Ebben a szakaszban adattisztítást és szolgáltatásfejlesztést végzünk az adathalmazon.
Az adatok tisztítása
Először definiálunk egy függvényt az adatok tisztításához, amely magában foglalja a hiányzó adatokkal rendelkező sorok elvetését, az ismétlődő sorok adott oszlopok alapján történő eltávolítását és a szükségtelen oszlopok elvetését.
# Define a function to clean the data
def clean_data(df):
# Drop rows with missing data across all columns
df = df.dropna(how="all")
# Drop duplicate rows based on 'RowNumber' and 'CustomerId'
df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
# Drop columns: 'RowNumber', 'CustomerId', 'Surname'
df = df.drop('RowNumber', 'CustomerId', 'Surname')
return df
# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")
# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)
A clean_data
függvény segít biztosítani, hogy az adathalmaz ne tartalmazhasson hiányzó értékeket és ismétlődéseket, miközben eltávolítja a szükségtelen oszlopokat.
Jellemzőkiemelés
Következő lépésként egy gyakori kódolással létrehozzuk a "Geography" és a "Gender" oszlophoz tartozó dummy oszlopokat.
# Import PySpark functions
from pyspark.sql import functions as F
# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
"*",
F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)
# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")
Itt egy gyakori kódolást használunk a kategorikus oszlopok bináris dummy oszlopokká alakításához, így alkalmasak gépi tanulási algoritmusokhoz.
Tisztított adatok megjelenítése
Végül megjelenítjük a megtisztított és funkcióalapú adathalmazt a megjelenítési függvény használatával.
display(df_clean)
Ezzel a lépéssel megvizsgálhatja az eredményként kapott DataFrame-et az alkalmazott átalakításokkal.
Mentés a lakehouse-ba
Most mentjük a megtisztított és funkcióalapú adatkészletet a lakehouse-ba.
# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")
Itt a megtisztított és átalakított PySpark DataFrame-et fogjuk, df_clean
és egy "churn_data_clean" nevű Delta-táblázatként mentjük a tóházban. A Delta formátumot használjuk az adathalmaz hatékony verziószámozásához és kezeléséhez. Ez mode("overwrite")
biztosítja, hogy az azonos nevű meglévő táblák felülíródnak, és létrejön a tábla új verziója.
Teszt- és betanítási adatkészletek létrehozása
Ezután létrehozzuk a teszt- és betanítási adatkészleteket a megtisztított és funkcióalapú adatokból.
A megadott kódszakaszban egy megtisztított és funkcióalapú adatkészletet töltünk be a lakehouse-ból Delta formátummal, felosztjuk betanítási és tesztelési csoportokra 80-20 arányban, és előkészítjük az adatokat a gépi tanuláshoz. Ez az előkészítés magában foglalja a PySpark ML-ből való importálást a VectorAssembler
funkcióoszlopok egyetlen "funkciók" oszlopba való kombinálásához. Ezt követően átalakítjuk a VectorAssembler
betanítási train_data
és tesztelési adathalmazokat, így a "Kilépés" célváltozót és test_data
a funkcióvektorokat tartalmazó DataFrame-eket. Ezek az adathalmazok készen állnak a gépi tanulási modellek létrehozására és értékelésére.
# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler
# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")
# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)
# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]
# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]
Alapmodell betanítása
A featurizált adatok használatával betanítunk egy alapszintű gépi tanulási modellt, konfiguráljuk az MLflow-t a kísérletkövetéshez, meghatározunk egy előrejelzési függvényt a metrikák kiszámításához, végül pedig megtekintjük és naplózzuk az eredményül kapott ROC AUC-pontszámot.
Naplózási szint beállítása
Itt úgy konfiguráljuk a naplózási szintet, hogy a Synapse.ml tár szükségtelen kimenetét letiltsa, így a naplók tisztábbak maradnak.
import logging
logging.getLogger('synapse.ml').setLevel(logging.ERROR)
MLflow konfigurálása
Ebben a szakaszban az MLflow-t konfiguráljuk a kísérletkövetéshez. A kísérlet nevét "automl_sample" értékre állítjuk a futtatások rendszerezéséhez. Emellett engedélyezzük az automatikus naplózást is, biztosítva, hogy a modellparaméterek, metrikák és összetevők automatikusan bekerüljenek az MLflow-ba.
import mlflow
# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)
A modell betanítása és kiértékelése
Végül betanítottunk egy LightGBMClassifier modellt a megadott betanítási adatokra. A modell a bináris besoroláshoz és az egyensúlytalanság kezeléséhez szükséges beállításokkal van konfigurálva. Ezt a betanított modellt használva előrejelzéseket készítünk a tesztadatokról. Kinyerjük a pozitív osztály előrejelzett valószínűségeit és a valódi címkéket a tesztadatokból. Ezt követően kiszámítjuk a ROC AUC-pontszámot a sklearn függvényével roc_auc_score
.
from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score
# Assuming you have already defined 'train_data' and 'test_data'
with mlflow.start_run(run_name="default") as run:
# Create a LightGBMClassifier model with specified settings
model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited")
# Fit the model to the training data
model = model.fit(train_data)
# Get the predictions
predictions = model.transform(test_data)
# Extract the predicted probabilities for the positive class
y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()
# Extract the true labels from the 'test_data' DataFrame
y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()
# Compute the ROC AUC score
roc_auc = roc_auc_score(y_true, y_pred)
# Log the ROC AUC score with MLflow
mlflow.log_metric("ROC_AUC", roc_auc)
# Print or log the ROC AUC score
print("ROC AUC Score:", roc_auc)
Innen láthatjuk, hogy az eredményül kapott modell 84%-os ROC AUC-pontszámot ér el.
AutoML-próbaverzió létrehozása a FLAML használatával
Ebben a szakaszban létrehozunk egy AutoML-próbaverziót a FLAML-csomag használatával, konfiguráljuk a próbabeállításokat, átalakítjuk a Spark-adathalmazt Egy Pandasra Spark-adatkészleten, futtatjuk az AutoML-próbaverziót, és megtekintjük az eredményként kapott metrikákat.
Az AutoML-próbaverzió konfigurálása
Itt importáljuk a szükséges osztályokat és modulokat a FLAML-csomagból, és létrehozunk egy AutoML-példányt, amely a gépi tanulási folyamat automatizálására szolgál.
# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark
# Create an AutoML instance
automl = AutoML()
Beállítások konfigurálása
Ebben a szakaszban definiáljuk az AutoML-próbaverzió konfigurációs beállításait.
# Define AutoML settings
settings = {
"time_budget": 250, # Total running time in seconds
"metric": 'roc_auc', # Optimization metric (ROC AUC in this case)
"task": 'classification', # Task type (classification)
"log_file_name": 'flaml_experiment.log', # FLAML log file
"seed": 41, # Random seed
"force_cancel": True, # Force stop training once time_budget is used up
"mlflow_exp_name": "automl_sample" # MLflow experiment name
}
Konvertálás Pandasra a Sparkban
Ha Spark-alapú adatkészlettel szeretné futtatni az AutoML-t, a függvény használatával pandas-sá kell konvertálnunk a to_pandas_on_spark
Spark-adathalmazon. Ez lehetővé teszi, hogy a FLAML hatékonyan működjön együtt az adatokkal.
# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)
Az AutoML próbaverziójának futtatása
Most végrehajtjuk az AutoML-próbaverziót. Beágyazott MLflow-futtatás használatával követjük nyomon a kísérletet a meglévő MLflow-futtatási környezetben. Az AutoML-próba a Spark-adathalmazon (df_automl
) lévő Pandason történik, a célváltozóval "Exited
és a megadott beállításokat a rendszer átadja a függvénynek a fit
konfigurációhoz.
'''The main flaml automl API'''
with mlflow.start_run(nested=True):
automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)
Az eredményként kapott metrikák megtekintése
Ebben az utolsó szakaszban lekérjük és megjelenítjük az AutoML-próba eredményeit. Ezek a metrikák betekintést nyújtanak az AutoML-modell teljesítményébe és konfigurációjába az adott adathalmazon.
# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))
Az AutoML-próbaverzió párhuzamosítása az Apache Sparkkal
Azokban az esetekben, amikor az adathalmaz egyetlen csomópontba fér el, és a Spark előnyeit szeretné kihasználni több párhuzamos AutoML-próba egyidejű futtatásához, kövesse az alábbi lépéseket:
Konvertálás Pandas-adatkeretté
A párhuzamosítás engedélyezéséhez az adatokat először Pandas DataFrame-gé kell konvertálni.
pandas_df = train_raw.toPandas()
Ebben az esetben a train_raw
Spark DataFrame-et egy Pandas DataFrame-mé pandas_df
alakítjuk át, amely alkalmassá teszi a párhuzamos feldolgozásra.
Párhuzamosítási beállítások konfigurálása
A Spark-alapú párhuzamosság engedélyezésére True
van beállítvause_spark
. Alapértelmezés szerint a FLAML végrehajtónként egy próbaverziót indít el. Az argumentum használatával n_concurrent_trials
testre szabhatja az egyidejű kísérletek számát.
settings = {
"time_budget": 250, # Total running time in seconds
"metric": 'roc_auc', # Optimization metric (ROC AUC in this case)
"task": 'classification', # Task type (classification)
"seed": 41, # Random seed
"use_spark": True, # Enable Spark-based parallelism
"n_concurrent_trials": 3, # Number of concurrent trials to run
"force_cancel": True, # Force stop training once time_budget is used up
"mlflow_exp_name": "automl_sample" # MLflow experiment name
}
Ezekben a beállításokban azt adhatja meg, hogy a Sparkot a párhuzamossághoz a következő beállítással use_spark
szeretnénk True
használni: . Az egyidejű kísérletek számát is 3-ra állítjuk, ami azt jelenti, hogy három próba párhuzamosan fog futni a Sparkban.
Ha többet szeretne megtudni az AutoML-nyomvonalak párhuzamosításáról, tekintse meg a párhuzamos Spark-feladatok FLAML-dokumentációját.
Az AutoML-próba futtatása párhuzamosan
Most a megadott beállításokkal párhuzamosan futtatjuk az AutoML-próbaverziót. Beágyazott MLflow-futtatás használatával követjük nyomon a kísérletet a meglévő MLflow-futtatási környezetben.
'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
automl.fit(dataframe=pandas_df, label='Exited', **settings)
Ezzel végrehajtja az AutoML-próbaverziót, és engedélyezve van a párhuzamosítás. Az dataframe
argumentum a Pandas DataFrame pandas_df
értékre van állítva, és a rendszer más beállításokat ad át a fit
függvénynek a párhuzamos végrehajtáshoz.
Metrikák megtekintése
A párhuzamos AutoML-próba futtatása után kérje le és jelenítse meg az eredményeket, beleértve a legjobb hiperparaméter-konfigurációt, a ROC AUC-t az érvényesítési adatokon, valamint a legjobban teljesítő futtatás betanítási időtartamát.
''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))
Következő lépések
Visszajelzés
https://aka.ms/ContentUserFeedback.
Hamarosan elérhető: 2024-ben fokozatosan kivezetjük a GitHub-problémákat a tartalom visszajelzési mechanizmusaként, és lecseréljük egy új visszajelzési rendszerre. További információ:Visszajelzés küldése és megtekintése a következőhöz: