Interaktivní transformace dat pomocí Apache Sparku v Azure Machine Učení

Transformace dat se stává jedním z nejdůležitějších kroků v projektech strojového učení. Integrace služby Azure Machine Učení s Azure Synapse Analytics poskytuje přístup k fondu Apache Spark založenému na Azure Synapse pro interaktivní transformace dat pomocí azure Machine Učení Notebooks.

V tomto článku se dozvíte, jak provádět transformace dat pomocí

  • Výpočetní prostředí Spark bez serveru
  • Připojený fond Synapse Spark

Požadavky

Než začnete s úkoly transformace dat, přečtěte si o procesu ukládání tajných kódů.

  • Přístupový klíč účtu služby Azure Blob Storage
  • Token sdíleného přístupového podpisu (SAS)
  • Informace o instančním objektu služby Azure Data Lake Storage (ADLS) Gen2

ve službě Azure Key Vault. Potřebujete také vědět, jak zpracovávat přiřazení rolí v účtech úložiště Azure. Tyto koncepty jsou popsány v následujících částech. Pak prozkoumáme podrobnosti o interaktivní transformaci dat pomocí fondů Spark ve službě Azure Machine Učení Notebooks.

Tip

Informace o konfiguraci přiřazení role účtu úložiště Azure nebo o přístupu k datům v účtech úložiště pomocí předávání identity uživatele najdete v tématu Přidání přiřazení rolí v účtech úložiště Azure.

Transformace interaktivních dat pomocí Apache Sparku

Azure Machine Učení nabízí bezserverové výpočetní prostředky Sparku a připojený fond Synapse Spark pro interaktivní transformaci dat pomocí Apache Sparku v Azure Machine Učení Notebooks. Výpočetní prostředí Spark bez serveru nevyžaduje vytvoření prostředků v pracovním prostoru Azure Synapse. Místo toho se plně spravovaný bezserverový výpočetní výkon Sparku zpřístupní přímo v poznámkových blocích Azure Machine Učení. Použití bezserverového výpočetního prostředí Spark je nejjednodušší přístup ke clusteru Spark v Učení Azure Machine.

Výpočetní prostředí Spark bez serveru ve službě Azure Machine Učení Notebooks

Výpočetní prostředí Spark bez serveru je ve výchozím nastavení dostupné ve službě Azure Machine Učení Notebooks. Pokud k němu chcete získat přístup v poznámkovém bloku, vyberte v části Počítač Azure Učení Bezserverový Spark z nabídky Výběr výpočetních prostředků bezserverové výpočetní prostředky.

Uživatelské rozhraní Poznámkové bloky také poskytuje možnosti konfigurace relace Sparku pro výpočetní prostředí Spark bez serveru. Konfigurace relace Sparku:

  1. V horní části obrazovky vyberte Konfigurovat relaci .
  2. V rozevírací nabídce vyberte verzi Apache Sparku.

    Důležité

    Azure Synapse Runtime pro Apache Spark: Oznámení

    • Azure Synapse Runtime pro Apache Spark 3.2:
      • Datum oznámení EOLA: 8. července 2023
      • Datum ukončení podpory: 8. července 2024. Po tomto datu bude modul runtime zakázán.
    • Pro trvalou podporu a optimální výkon doporučujeme migrovat na Apache Spark 3.3.
  3. V rozevírací nabídce vyberte Typ instance. V současné době jsou podporovány následující typy instancí:
    • Standard_E4s_v3
    • Standard_E8s_v3
    • Standard_E16s_v3
    • Standard_E32s_v3
    • Standard_E64s_v3
  4. Zadejte hodnotu časového limitu relace Sparku v minutách.
  5. Vyberte, zda se mají exekutory dynamicky přidělovat .
  6. Vyberte počet exekutorů pro relaci Sparku.
  7. V rozevírací nabídce vyberte velikost exekutoru.
  8. V rozevírací nabídce vyberte velikost ovladače.
  9. Pokud chcete ke konfiguraci relace Sparku použít soubor Conda, zaškrtněte políčko Nahrát soubor conda. Pak vyberte Procházet a zvolte soubor Conda s požadovanou konfigurací relace Sparku.
  10. Přidejte vlastnosti nastavení konfigurace, vstupní hodnoty do textových polí Vlastnost a Hodnota a vyberte Přidat.
  11. Vyberte Použít.
  12. V místní nabídce Konfigurovat novou relaci vyberte Zastavit relaci?

Změny konfigurace relace se zachovají a budou k dispozici pro jinou relaci poznámkového bloku, která se spouští s bezserverovým výpočetním prostředím Spark.

Tip

Pokud používáte balíčky Conda na úrovni relace, můžete zkrátit dobu spuštění relace Sparku, pokud nastavíte konfigurační proměnnou spark.hadoop.aml.enable_cache na hodnotu true. Při prvním spuštění relace s balíčky Conda na úrovni relace obvykle trvá 10 až 15 minut. Následující relace však začíná proměnnou konfigurace nastavenou na true obvykle trvá tři až pět minut.

Import a uspořádání dat z Azure Data Lake Storage (ADLS) Gen2

K datům uloženým v účtech úložiště Azure Data Lake Storage (ADLS) Gen2 můžete přistupovat a měnit jejich uspořádání pomocí abfss:// identifikátorů URI dat, které následují jeden ze dvou mechanismů přístupu k datům:

  • Předávání identity uživatele
  • Přístup k datům založeným na instančním objektu

Tip

Transformace dat s výpočetním prostředím Spark bez serveru a předáním identity uživatele pro přístup k datům v účtu úložiště Azure Data Lake Storage (ADLS) Gen2 vyžaduje nejmenší počet kroků konfigurace.

Zahájení interaktivní transformace dat s předáváním identity uživatele:

  • Ověřte, že identita uživatele má v účtu úložiště Azure Data Lake Storage (ADLS) Gen2 přiřazení role Přispěvatela Přispěvateldat objektů blob služby Storage.

  • Pokud chcete použít bezserverové výpočetní prostředky Sparku, v části Počítač Azure Učení Bezserverové prostředí Spark v nabídce Výběr výpočetních prostředků vyberte bezserverové výpočetní prostředky.

  • Pokud chcete použít připojený fond Synapse Spark, vyberte připojený fond Synapse Spark v části Fondy Synapse Sparku z nabídky Výběr výpočetních prostředků .

  • Tento vzorový kód transformace dat Titanic ukazuje použití identifikátoru URI dat ve formátu abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> s pyspark.pandas a pyspark.ml.feature.Imputer.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

    Poznámka:

    Tento vzorový kód Pythonu používá pyspark.pandas. Tuto možnost podporuje pouze modul runtime Spark verze 3.2 nebo novější.

Uspořádání dat prostřednictvím instančního objektu:

  1. Ověřte, že instanční objekt má v účtu úložiště Azure Data Lake Storage (ADLS) Gen2 přiřazené role Přispěvatel dat přispěvatele a Přispěvateldat objektů blob služby Storage.

  2. Vytvořte tajné kódy služby Azure Key Vault pro ID tenanta instančního objektu, ID klienta a hodnoty tajných kódů klienta.

  3. V nabídce pro výběr výpočetních prostředků vyberte výpočetní prostředí Spark bez serveru v azure Učení Bezserverové výpočetní prostředí nebo v nabídce pro výběr výpočetních prostředků vyberte připojený fond Synapse Spark ve fondechSynapse Spark.

  4. Pokud chcete nastavit ID tenanta instančního objektu, ID klienta a tajný klíč klienta v konfiguraci, a spusťte následující ukázku kódu.

    • Volání get_secret() v kódu závisí na názvu služby Azure Key Vault a na názvech tajných kódů služby Azure Key Vault vytvořených pro ID tenanta instančního objektu, ID klienta a tajný klíč klienta. Nastavte v konfiguraci tyto odpovídající názvy a hodnoty vlastností:

      • Vlastnost ID klienta: fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Vlastnost tajného klíče klienta: fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Vlastnost ID tenanta: fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Hodnota ID tenanta: https://login.microsoftonline.com/<TENANT_ID>/oauth2/token
      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      
      # Set up service principal tenant ID, client ID and secret from Azure Key Vault
      client_id = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_ID_SECRET_NAME>")
      tenant_id = token_library.getSecret("<KEY_VAULT_NAME>", "<TENANT_ID_SECRET_NAME>")
      client_secret = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_SECRET_NAME>")
      
      # Set up service principal which has access of the data
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.auth.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "OAuth"
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          client_id,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          client_secret,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token",
      )
      
  5. Importujte a přeuspořádejte data pomocí identifikátoru URI dat ve formátu abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> , jak je znázorněno v ukázce kódu pomocí dat Titanic.

Import a uspořádání dat ze služby Azure Blob Storage

K datům azure Blob Storage můžete přistupovat buď pomocí přístupového klíče účtu úložiště, nebo pomocí tokenu sdíleného přístupového podpisu (SAS). Tyto přihlašovací údaje byste měli uložit ve službě Azure Key Vault jako tajný klíč a nastavit je jako vlastnosti v konfiguraci relace.

Zahájení interaktivní transformace dat:

  1. Na studio Azure Machine Learning levém panelu vyberte Poznámkové bloky.

  2. V nabídce pro výběr výpočetních prostředků vyberte výpočetní prostředí Spark bez serveru v azure Učení Bezserverové výpočetní prostředí nebo v nabídce pro výběr výpočetních prostředků vyberte připojený fond Synapse Spark ve fondechSynapse Spark.

  3. Konfigurace přístupového klíče účtu úložiště nebo tokenu sdíleného přístupového podpisu (SAS) pro přístup k datům ve službě Azure Machine Učení Notebooks:

    • Pro přístupový klíč nastavte vlastnost fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net , jak je znázorněno v tomto fragmentu kódu:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      access_key = token_library.getSecret("<KEY_VAULT_NAME>", "<ACCESS_KEY_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", access_key
      )
      
    • Pro token SAS nastavte vlastnost fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net , jak je znázorněno v tomto fragmentu kódu:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      sas_token = token_library.getSecret("<KEY_VAULT_NAME>", "<SAS_TOKEN_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net",
          sas_token,
      )
      

      Poznámka:

      Volání get_secret() ve výše uvedených fragmentech kódu vyžadují název služby Azure Key Vault a názvy tajných kódů vytvořených pro přístupový klíč účtu služby Azure Blob Storage nebo token SAS.

  4. Spusťte kód transformace dat ve stejném poznámkovém bloku. Naformátujte identifikátor URI dat jako , podobně jako wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/<PATH_TO_DATA>tento fragment kódu:

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

    Poznámka:

    Tento vzorový kód Pythonu používá pyspark.pandas. Tuto možnost podporuje pouze modul runtime Spark verze 3.2 nebo novější.

Import a uspořádání dat ze služby Azure Machine Učení Datastore

Pokud chcete získat přístup k datům ze služby Azure Machine Učení Datastore, definujte cestu k datům v úložišti dat pomocí formátuazureml://datastores/<DATASTORE_NAME>/paths/<PATH_TO_DATA> URI. Pokud chcete uspořádat data z počítače Azure Učení úložiště dat v relaci poznámkových bloků interaktivně:

  1. V nabídce pro výběr výpočetních prostředků vyberte výpočetní prostředí Spark bez serveru v azure Učení Bezserverové výpočetní prostředí nebo v nabídce pro výběr výpočetních prostředků vyberte připojený fond Synapse Spark ve fondechSynapse Spark.

  2. Tento vzorový kód ukazuje, jak číst a měnit uspořádání dat Titanic ze služby Azure Machine Učení Datastore pomocí azureml:// identifikátoru URI pyspark.pandas úložiště dat a pyspark.ml.feature.Imputer.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "azureml://datastores/workspaceblobstore/paths/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "azureml://datastores/workspaceblobstore/paths/data/wrangled",
        index_col="PassengerId",
    )
    

    Poznámka:

    Tento vzorový kód Pythonu používá pyspark.pandas. Tuto možnost podporuje pouze modul runtime Spark verze 3.2 nebo novější.

Azure Machine Učení úložišti dat mají přístup k datům pomocí přihlašovacích údajů účtu úložiště Azure.

  • přístupový klíč
  • Token SAS
  • instanční objekt

nebo zadejte přístup k datům bez přihlašovacích údajů. V závislosti na typu úložiště dat a základním typu účtu úložiště Azure vyberte příslušný ověřovací mechanismus, který zajistí přístup k datům. Tato tabulka shrnuje mechanismy ověřování pro přístup k datům ve službě Azure Machine Učení úložištích dat:

Storage account type Přístup k datům bez přihlašovacích údajů Mechanismus přístupu k datům Přiřazení rolí
Azure Blob No Přístupový klíč nebo token SAS Nejsou potřeba žádná přiřazení rolí.
Azure Blob Ano Předávání identity uživatele* Identita uživatele by měla mít v účtu služby Azure Blob Storage odpovídající přiřazení rolí.
Azure Data Lake Storage (ADLS) Gen 2 No Instanční objekt Instanční objekt by měl mít v účtu úložiště Azure Data Lake Storage (ADLS) Gen2 odpovídající přiřazení rolí.
Azure Data Lake Storage (ADLS) Gen 2 Ano Předávání identity uživatele Identita uživatele by měla mít v účtu úložiště Azure Data Lake Storage (ADLS) Gen2 odpovídající přiřazení rolí.

* Předávání identity uživatele funguje pro úložiště dat bez přihlašovacích údajů, které odkazují na účty úložiště objektů blob v Azure, pouze pokud není povolené obnovitelné odstranění .

Přístup k datům ve výchozí sdílené složce

Výchozí sdílená složka se připojí k výpočetním prostředkům Sparku bez serveru i k připojeným fondům Synapse Spark.

Screenshot showing use of a file share.

V studio Azure Machine Learning se soubory ve výchozí sdílené složce zobrazují ve stromu adresáře na kartě Soubory. Kód poznámkového bloku může přímo přistupovat k souborům uloženým v této sdílené složce pomocí file:// protokolu spolu s absolutní cestou k souboru bez dalších konfigurací. Tento fragment kódu ukazuje, jak získat přístup k souboru uloženému ve výchozí sdílené složce:

import os
import pyspark.pandas as pd
from pyspark.ml.feature import Imputer

abspath = os.path.abspath(".")
file = "file://" + abspath + "/Users/<USER>/data/titanic.csv"
print(file)
df = pd.read_csv(file, index_col="PassengerId")
imputer = Imputer(
    inputCols=["Age"],
    outputCol="Age").setStrategy("mean") # Replace missing values in Age column with the mean value
df.fillna(value={"Cabin" : "None"}, inplace=True) # Fill Cabin column with value "None" if missing
df.dropna(inplace=True) # Drop the rows which still have any missing value
output_path = "file://" + abspath + "/Users/<USER>/data/wrangled"
df.to_csv(output_path, index_col="PassengerId")

Poznámka:

Tento vzorový kód Pythonu používá pyspark.pandas. Tuto možnost podporuje pouze modul runtime Spark verze 3.2 nebo novější.

Další kroky