Connettore del pool SQL dedicato di Azure Synapse per Apache Spark

Introduzione

Il pool SQL dedicato di Azure Synapse Connessione or per Apache Spark in Azure Synapse Analytics consente un trasferimento efficiente di set di dati di grandi dimensioni tra il runtime di Apache Spark e il pool SQL dedicato. Il connettore viene fornito come libreria predefinita con l'area di lavoro Azure Synapse. Il connettore viene implementato usando il Scala linguaggio. Il connettore supporta Scala e Python. Per usare il Connessione or con altre opzioni del linguaggio del notebook, usare il comando Magic spark - %%spark.

A livello generale, il connettore offre le funzionalità seguenti:

  • Leggere dal pool SQL dedicato di Azure Synapse:
    • Leggere set di dati di grandi dimensioni dalle tabelle del pool SQL dedicato di Synapse (interne ed esterne) e dalle viste.
    • Supporto completo per il push down del predicato, in cui i filtri sul dataframe vengono mappati al push down del predicato SQL corrispondente.
    • Supporto per l'eliminazione delle colonne.
    • Supporto per il push delle query.
  • Scrivere nel pool SQL dedicato di Azure Synapse:
    • Inserire dati di volumi di grandi dimensioni in tipi di tabella interni ed esterni.
    • Supporta le preferenze della modalità di salvataggio del dataframe seguenti:
      • Append
      • ErrorIfExists
      • Ignore
      • Overwrite
    • La scrittura in tipo tabella esterna supporta il formato di file parquet e testo delimitato (ad esempio - CSV).
    • Per scrivere dati in tabelle interne, il connettore usa ora l'istruzione COPY anziché l'approccio CETAS/CTAS.
    • Miglioramenti per ottimizzare le prestazioni della velocità effettiva di scrittura end-to-end.
    • Introduce un handle di callback facoltativo (argomento funzione Scala) che i client possono usare per ricevere metriche post-scrittura.
      • Alcuni esempi includono il numero di record, la durata per completare determinate azioni e il motivo dell'errore.

Approccio di orchestrazione

Lettura

A high-level data flow diagram to describe the connector's orchestration of a read request.

Scrivi

A high-level data flow diagram to describe the connector's orchestration of a write request.

Prerequisiti

I prerequisiti, ad esempio la configurazione delle risorse di Azure necessarie e i passaggi per configurarli, sono descritti in questa sezione.

Risorse di Azure

Esaminare e configurare le risorse di Azure dipendenti seguenti:

Preparare il database

Connessione al database del pool SQL dedicato di Synapse ed eseguire le istruzioni di installazione seguenti:

  • Creare un utente di database mappato all'identità utente di Microsoft Entra usata per accedere all'area di lavoro di Azure Synapse.

    CREATE USER [username@domain.com] FROM EXTERNAL PROVIDER;      
    
  • Creare uno schema in cui verranno definite le tabelle, in modo che il Connessione or possa scrivere e leggere correttamente dalle rispettive tabelle.

    CREATE SCHEMA [<schema_name>];
    

Authentication

Autenticazione basata su ID Di Microsoft Entra

L'autenticazione basata su ID Di Microsoft Entra è un approccio di autenticazione integrato. L'utente deve accedere correttamente all'area di lavoro di Azure Synapse Analytics.

Autenticazione di base

Un approccio di autenticazione di base richiede all'utente di configurare e password opzioniusername. Fare riferimento alla sezione Opzioni di configurazione per informazioni sui parametri di configurazione pertinenti per la lettura e la scrittura nelle tabelle nel pool SQL dedicato di Azure Synapse.

Autorizzazione

Azure Data Lake Storage Gen2

Esistono due modi per concedere le autorizzazioni di accesso ad Azure Data Lake Archiviazione Gen2 - account Archiviazione:

  • Ruolo Controllo di accesso basato sui ruoli - Ruolo Collaboratore dati BLOB Archiviazione
    • L'assegnazione di concede all'utente Storage Blob Data Contributor Role le autorizzazioni di lettura, scrittura ed eliminazione dai contenitori BLOB Archiviazione di Azure.
    • Il controllo degli accessi in base al ruolo offre un approccio di controllo grossolano a livello di contenitore.
  • elenchi Controllo di accesso (ACL)
    • L'approccio ACL consente controlli con granularità fine su percorsi e/o file specifici in una determinata cartella.
    • I controlli ACL non vengono applicati se l'utente ha già concesso le autorizzazioni usando l'approccio di controllo degli accessi in base al ruolo.
    • Esistono due tipi generali di autorizzazioni ACL:
      • Autorizzazioni di accesso (applicate a un livello o a un oggetto specifico).
      • Autorizzazioni predefinite (applicate automaticamente per tutti gli oggetti figlio al momento della creazione).
    • Il tipo di autorizzazioni include:
      • Execute consente di attraversare o esplorare le gerarchie di cartelle.
      • Read consente di leggere.
      • Write consente di scrivere.
    • È importante configurare gli elenchi di controllo di accesso in modo che il Connessione or possa scrivere e leggere correttamente dai percorsi di archiviazione.

Nota

  • Se si vogliono eseguire notebook usando le pipeline dell'area di lavoro synapse, è necessario concedere anche le autorizzazioni di accesso elencate in precedenza all'identità gestita predefinita dell'area di lavoro synapse. Il nome predefinito dell'identità gestita dell'area di lavoro corrisponde al nome dell'area di lavoro.

  • Per usare l'area di lavoro Synapse con account di archiviazione protetti, è necessario configurare un endpoint privato gestito dal notebook. L'endpoint privato gestito deve essere approvato dalla sezione dell'account di archiviazione di Private endpoint connections ADLS Gen2 nel Networking riquadro.

Pool SQL dedicato di Azure Synapse

Per abilitare l'interazione riuscita con il pool SQL dedicato di Azure Synapse, è necessaria l'autorizzazione seguente, a meno che non si sia un utente configurato anche come nell'endpoint Active Directory Admin SQL dedicato:

  • Scenario di lettura

    • Concedere all'utente l'utilizzo db_exporter della stored procedure sp_addrolememberdi sistema .

      EXEC sp_addrolemember 'db_exporter', [<your_domain_user>@<your_domain_name>.com];
      
  • Scenario di scrittura

    • Connessione or usa il comando COPY per scrivere dati dalla gestione temporanea alla posizione gestita della tabella interna.
      • Configurare le autorizzazioni necessarie descritte qui.

      • Di seguito è riportato un frammento di accesso rapido dello stesso:

        --Make sure your user has the permissions to CREATE tables in the [dbo] schema
        GRANT CREATE TABLE TO [<your_domain_user>@<your_domain_name>.com];
        GRANT ALTER ON SCHEMA::<target_database_schema_name> TO [<your_domain_user>@<your_domain_name>.com];
        
        --Make sure your user has ADMINISTER DATABASE BULK OPERATIONS permissions
        GRANT ADMINISTER DATABASE BULK OPERATIONS TO [<your_domain_user>@<your_domain_name>.com];
        
        --Make sure your user has INSERT permissions on the target table
        GRANT INSERT ON <your_table> TO [<your_domain_user>@<your_domain_name>.com]
        

Documentazione sull'API

Pool SQL dedicato di Azure Synapse Connessione or per Apache Spark - Documentazione dell'API.

Opzioni di configurazione

Per eseguire correttamente il bootstrap e orchestrare l'operazione di lettura o scrittura, il Connessione or prevede determinati parametri di configurazione. Definizione dell'oggetto: com.microsoft.spark.sqlanalytics.utils.Constants fornisce un elenco di costanti standardizzate per ogni chiave di parametro.

Di seguito è riportato l'elenco delle opzioni di configurazione in base allo scenario di utilizzo:

  • Lettura con l'autenticazione basata su ID Di Microsoft Entra
    • Le credenziali vengono mappate automaticamente e l'utente non è necessario per fornire opzioni di configurazione specifiche.
    • L'argomento nome tabella in tre parti sul synapsesql metodo è necessario per leggere dalla rispettiva tabella nel pool SQL dedicato di Azure Synapse.
  • Leggere con l'autenticazione di base
    • Endpoint SQL dedicato di Azure Synapse
      • Constants.SERVER - Punto finale del pool SQL dedicato di Synapse (FQDN server)
      • Constants.USER - Nome utente SQL.
      • Constants.PASSWORD - Password utente SQL.
    • Endpoint di Azure Data Lake Archiviazione (gen 2) - Cartelle di gestione temporanea
      • Constants.DATA_SOURCE- Archiviazione percorso impostato nel parametro del percorso dell'origine dati viene usato per la gestione temporanea dei dati.
  • Scrivere usando l'autenticazione basata su ID Entra di Microsoft
    • Endpoint SQL dedicato di Azure Synapse
      • Per impostazione predefinita, il Connessione or deduce l'endpoint SQL dedicato di Synapse usando il nome del database impostato nel synapsesql parametro del nome della tabella in tre parti del metodo.
      • In alternativa, gli utenti possono usare l'opzione Constants.SERVER per specificare l'endpoint sql. Verificare che il punto finale ospiti il database corrispondente con lo schema corrispondente.
    • Endpoint di Azure Data Lake Archiviazione (gen 2) - Cartelle di gestione temporanea
      • Per tipo di tabella interna:
        • Configurare o Constants.TEMP_FOLDERConstants.DATA_SOURCE l'opzione .
        • Se l'utente ha scelto di fornire Constants.DATA_SOURCE un'opzione, la cartella di staging verrà derivata usando il location valore di DataSource.
        • Se vengono specificati entrambi, verrà usato il valore dell'opzione Constants.TEMP_FOLDER .
        • In assenza di un'opzione di cartella di staging, il Connessione or ne deriva uno in base alla configurazione di runtime - spark.sqlanalyticsconnector.stagingdir.prefix.
      • Per tipo di tabella esterna:
        • Constants.DATA_SOURCE è un'opzione di configurazione obbligatoria.
        • Il connettore usa il percorso di archiviazione impostato nel parametro location dell'origine dati in combinazione con l'argomento location al synapsesql metodo e deriva il percorso assoluto per rendere persistenti i dati della tabella esterna.
        • Se l'argomento location al synapsesql metodo non viene specificato, il connettore deriva il valore della posizione come <base_path>/dbName/schemaName/tableName.
  • Scrivere usando l'autenticazione di base
    • Endpoint SQL dedicato di Azure Synapse
      • Constants.SERVER - - Punto finale del pool SQL dedicato di Synapse (FQDN server).
      • Constants.USER - Nome utente SQL.
      • Constants.PASSWORD - Password utente SQL.
      • Constants.STAGING_STORAGE_ACCOUNT_KEYassociato a Archiviazione Account che ospita Constants.TEMP_FOLDERS (solo tipi di tabella interni) o Constants.DATA_SOURCE.
    • Endpoint di Azure Data Lake Archiviazione (gen 2) - Cartelle di gestione temporanea
      • Le credenziali di autenticazione di base di SQL non si applicano ai punti finali di archiviazione.
      • Assicurarsi quindi di assegnare autorizzazioni di accesso alle risorse di archiviazione pertinenti, come descritto nella sezione Azure Data Lake Archiviazione Gen2.

Modelli di codice

Questa sezione presenta modelli di codice di riferimento per descrivere come usare e richiamare il pool SQL dedicato di Azure Synapse Connessione or per Apache Spark.

Nota

Uso del Connessione or in Python

  • Il connettore è supportato solo in Python per Spark 3. Per Spark 2.4 (non supportato), è possibile usare l'API del connettore Scala per interagire con il contenuto di un dataframe in PySpark usando DataFrame.createOrReplaceTempView o DataFrame.createOrReplaceGlobalTempView. Vedere sezione - Uso di dati materializzati tra le celle.
  • L'handle di callback non è disponibile in Python.

Leggere dal pool SQL dedicato di Azure Synapse

Richiesta di lettura - synapsesql firma del metodo

synapsesql(tableName:String="") => org.apache.spark.sql.DataFrame

Leggere da una tabella usando l'autenticazione basata su ID Entra di Microsoft

//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Read from existing internal table
val dfToReadFromTable:DataFrame = spark.read.
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>").
    //Three-part table name from where data will be read.
    synapsesql("<database_name>.<schema_name>.<table_name>").
    //Column-pruning i.e., query select column values.
    select("<some_column_1>", "<some_column_5>", "<some_column_n>"). 
    //Push-down filter criteria that gets translated to SQL Push-down Predicates.    
    filter(col("Title").startsWith("E")).
    //Fetch a sample of 10 records 
    limit(10)

//Show contents of the dataframe
dfToReadFromTable.show()

Leggere da una query usando l'autenticazione basata su ID Entra di Microsoft

Nota

Restrizioni durante la lettura dalla query:

  • Impossibile specificare contemporaneamente il nome e la query della tabella.
  • Sono consentite solo query di selezione. Gli sql DDL e DML non sono consentiti.
  • Le opzioni di selezione e filtro nel dataframe non vengono spostate nel pool dedicato SQL quando viene specificata una query.
  • La lettura da una query è disponibile solo in Spark 3.1 e 3.2.
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._


// Read from a query
// Query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
    // Name of the SQL Dedicated Pool or database where to run the query
    // Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
     option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
    //query from which data will be read
    .option(Constants.QUERY, "select <column_name>, count(*) as cnt from <schema_name>.<table_name> group by <column_name>")
    synapsesql()

val dfToReadFromQueryAsArgument:DataFrame = spark.read.
     // Name of the SQL Dedicated Pool or database where to run the query
     // Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
     option(Constants.DATABASE, "<database_name>")
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
    //query from which data will be read
    .synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")


//Show contents of the dataframe
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()

Leggere da una tabella usando l'autenticazione di base

//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Read from existing internal table
val dfToReadFromTable:DataFrame = spark.read.
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the table will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Three-part table name from where data will be read.
    synapsesql("<database_name>.<schema_name>.<table_name>").
    //Column-pruning i.e., query select column values.
    select("<some_column_1>", "<some_column_5>", "<some_column_n>"). 
    //Push-down filter criteria that gets translated to SQL Push-down Predicates.    
    filter(col("Title").startsWith("E")).
    //Fetch a sample of 10 records 
    limit(10)
    

//Show contents of the dataframe
dfToReadFromTable.show()

Leggere da una query usando l'autenticazione di base

//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

// Name of the SQL Dedicated Pool or database where to run the query
// Database can be specified as a Spark Config or as a Constant - Constants.DATABASE
spark.conf.set("spark.sqlanalyticsconnector.dw.database", "<database_name>")

// Read from a query
// Query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
     //Name of the SQL Dedicated Pool or database where to run the query
     //Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
      option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Query where data will be read.  
    option(Constants.QUERY, "select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>" ).
    synapsesql()

val dfToReadFromQueryAsArgument:DataFrame = spark.read.
     //Name of the SQL Dedicated Pool or database where to run the query
     //Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
      option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Query where data will be read.  
    synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")
    

//Show contents of the dataframe
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()

Scrivere nel pool SQL dedicato di Azure Synapse

Richiesta di scrittura - synapsesql firma del metodo

La firma del metodo per la versione del Connessione or compilata per Spark 2.4.8 ha un argomento minore rispetto a quello applicato alla versione di Spark 3.1.2. Di seguito sono riportate le due firme del metodo:

  • Pool di Spark versione 2.4.8
synapsesql(tableName:String, 
           tableType:String = Constants.INTERNAL, 
           location:Option[String] = None):Unit
  • Pool di Spark versione 3.1.2
synapsesql(tableName:String, 
           tableType:String = Constants.INTERNAL, 
           location:Option[String] = None,
           callBackHandle=Option[(Map[String, Any], Option[Throwable])=>Unit]):Unit

Scrivere usando l'autenticazione basata su ID Entra di Microsoft

Di seguito è riportato un modello di codice completo che descrive come usare il Connessione or per gli scenari di scrittura:

//Add required imports
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Define read options for example, if reading from CSV source, configure header and delimiter options.
val pathToInputSource="abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_folder>/<some_dataset>.csv"

//Define read configuration for the input CSV
val dfReadOptions:Map[String, String] = Map("header" -> "true", "delimiter" -> ",")

//Initialize DataFrame that reads CSV data from a given source 
val readDF:DataFrame=spark.
            read.
            options(dfReadOptions).
            csv(pathToInputSource).
            limit(1000) //Reads first 1000 rows from the source CSV input.

//Setup and trigger the read DataFrame for write to Synapse Dedicated SQL Pool.
//Fully qualified SQL Server DNS name can be obtained using one of the following methods:
//    1. Synapse Workspace - Manage Pane - SQL Pools - <Properties view of the corresponding Dedicated SQL Pool>
//    2. From Azure Portal, follow the bread-crumbs for <Portal_Home> -> <Resource_Group> -> <Dedicated SQL Pool> and then go to Connection Strings/JDBC tab. 
//If `Constants.SERVER` is not provided, the value will be inferred by using the `database_name` in the three-part table name argument to the `synapsesql` method.
//Like-wise, if `Constants.TEMP_FOLDER` is not provided, the connector will use the runtime staging directory config (see section on Configuration Options for details).
val writeOptionsWithAADAuth:Map[String, String] = Map(Constants.SERVER -> "<dedicated-pool-sql-server-name>.sql.azuresynapse.net",
                                            Constants.TEMP_FOLDER -> "abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_temp_folder>")

//Setup optional callback/feedback function that can receive post write metrics of the job performed.
var errorDuringWrite:Option[Throwable] = None
val callBackFunctionToReceivePostWriteMetrics: (Map[String, Any], Option[Throwable]) => Unit =
    (feedback: Map[String, Any], errorState: Option[Throwable]) => {
    println(s"Feedback map - ${feedback.map{case(key, value) => s"$key -> $value"}.mkString("{",",\n","}")}")
    errorDuringWrite = errorState
}

//Configure and submit the request to write to Synapse Dedicated SQL Pool (note - default SaveMode is set to ErrorIfExists)
//Sample below is using AAD-based authentication approach; See further examples to leverage SQL Basic auth.
readDF.
    write.
    //Configure required configurations.
    options(writeOptionsWithAADAuth).
    //Choose a save mode that is apt for your use case.
    mode(SaveMode.Overwrite).
    synapsesql(tableName = "<database_name>.<schema_name>.<table_name>", 
                //For external table type value is Constants.EXTERNAL
                tableType = Constants.INTERNAL, 
                //Optional parameter that is used to specify external table's base folder; defaults to `database_name/schema_name/table_name`
                location = None, 
                //Optional parameter to receive a callback.
                callBackHandle = Some(callBackFunctionToReceivePostWriteMetrics))

//If write request has failed, raise an error and fail the Cell's execution.
if(errorDuringWrite.isDefined) throw errorDuringWrite.get

Scrivere usando l'autenticazione di base

Il frammento di codice seguente sostituisce la definizione di scrittura descritta nella sezione Write using Microsoft Entra ID based authentication (Scrittura usando l'autenticazione basata su ID di Microsoft Entra ID) per inviare una richiesta di scrittura usando l'approccio di autenticazione di base di SQL:

//Define write options to use SQL basic authentication
val writeOptionsWithBasicAuth:Map[String, String] = Map(Constants.SERVER -> "<dedicated-pool-sql-server-name>.sql.azuresynapse.net",
                                           //Set database user name
                                           Constants.USER -> "<user_name>",
                                           //Set database user's password
                                           Constants.PASSWORD -> "<user_password>",
                                           //Required only when writing to an external table. For write to internal table, this can be used instead of TEMP_FOLDER option.
                                           Constants.DATA_SOURCE -> "<Name of the datasource as defined in the target database>"
                                           //To be used only when writing to internal tables. Storage path will be used for data staging.
                                           Constants.TEMP_FOLDER -> "abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_temp_folder>")

//Configure and submit the request to write to Synapse Dedicated SQL Pool. 
readDF.
    write.
    options(writeOptionsWithBasicAuth).
    //Choose a save mode that is apt for your use case.
    mode(SaveMode.Overwrite). 
    synapsesql(tableName = "<database_name>.<schema_name>.<table_name>", 
                //For external table type value is Constants.EXTERNAL
                tableType = Constants.INTERNAL,
                //Not required for writing to an internal table 
                location = None,
                //Optional parameter.
                callBackHandle = Some(callBackFunctionToReceivePostWriteMetrics))

In un approccio di autenticazione di base, per leggere i dati da un percorso di archiviazione di origine sono necessarie altre opzioni di configurazione. Il frammento di codice seguente fornisce un esempio per leggere da un'origine dati di Azure Data Lake Archiviazione Gen2 usando le credenziali dell'entità servizio:

//Specify options that Spark runtime must support when interfacing and consuming source data
val storageAccountName="<storageAccountName>"
val storageContainerName="<storageContainerName>"
val subscriptionId="<AzureSubscriptionID>"
val spnClientId="<ServicePrincipalClientID>"
val spnSecretKeyUsedAsAuthCred="<spn_secret_key_value>"
val dfReadOptions:Map[String, String]=Map("header"->"true",
                                "delimiter"->",", 
                                "fs.defaultFS" -> s"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net",
                                s"fs.azure.account.auth.type.$storageAccountName.dfs.core.windows.net" -> "OAuth",
                                s"fs.azure.account.oauth.provider.type.$storageAccountName.dfs.core.windows.net" -> 
                                    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
                                "fs.azure.account.oauth2.client.id" -> s"$spnClientId",
                                "fs.azure.account.oauth2.client.secret" -> s"$spnSecretKeyUsedAsAuthCred",
                                "fs.azure.account.oauth2.client.endpoint" -> s"https://login.microsoftonline.com/$subscriptionId/oauth2/token",
                                "fs.AbstractFileSystem.abfss.impl" -> "org.apache.hadoop.fs.azurebfs.Abfs",
                                "fs.abfss.impl" -> "org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem")
//Initialize the Storage Path string, where source data is maintained/kept.
val pathToInputSource=s"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net/<base_path_for_source_data>/<specific_file (or) collection_of_files>"
//Define data frame to interface with the data source
val df:DataFrame = spark.
            read.
            options(dfReadOptions).
            csv(pathToInputSource).
            limit(100)

Modalità di salvataggio del dataframe supportate

Quando si scrivono dati di origine in una tabella di destinazione nel pool SQL dedicato di Azure Synapse, sono supportate le modalità di salvataggio seguenti:

  • ErrorIfExists (modalità di salvataggio predefinita)
    • Se la tabella di destinazione esiste, la scrittura viene interrotta con un'eccezione restituita al chiamato. In caso contrario, viene creata una nuova tabella con i dati delle cartelle di staging.
  • Ignorare
    • Se la tabella di destinazione esiste, la scrittura ignorerà la richiesta di scrittura senza restituire un errore. In caso contrario, viene creata una nuova tabella con i dati delle cartelle di staging.
  • Sovrascrivere
    • Se la tabella di destinazione esiste, i dati esistenti nella destinazione vengono sostituiti con i dati delle cartelle di staging. In caso contrario, viene creata una nuova tabella con i dati delle cartelle di staging.
  • Aggiungere
    • Se la tabella di destinazione esiste, i nuovi dati vengono accodati. In caso contrario, viene creata una nuova tabella con i dati delle cartelle di staging.

Handle di callback della richiesta di scrittura

La nuova API del percorso di scrittura ha introdotto una funzionalità sperimentale per fornire al client una mappa chiave-valore> delle metriche post-scrittura. Le chiavi per le metriche vengono definite nella nuova definizione oggetto - Constants.FeedbackConstants. Le metriche possono essere recuperate come stringa JSON passando l'handle di callback (a Scala Function). Di seguito è riportata la firma della funzione:

//Function signature is expected to have two arguments - a `scala.collection.immutable.Map[String, Any]` and an Option[Throwable]
//Post-write if there's a reference of this handle passed to the `synapsesql` signature, it will be invoked by the closing process.
//These arguments will have valid objects in either Success or Failure case. In case of Failure the second argument will be a `Some(Throwable)`.
(Map[String, Any], Option[Throwable]) => Unit

Di seguito sono riportate alcune metriche rilevanti (presentate nel caso camel):

  • WriteFailureCause
  • DataStagingSparkJobDurationInMilliseconds
  • NumberOfRecordsStagedForSQLCommit
  • SQLStatementExecutionDurationInMilliseconds
  • rows_processed

Di seguito è riportata una stringa JSON di esempio con metriche post-scrittura:

{
 SparkApplicationId -> <spark_yarn_application_id>,
 SQLStatementExecutionDurationInMilliseconds -> 10113,
 WriteRequestReceivedAtEPOCH -> 1647523790633,
 WriteRequestProcessedAtEPOCH -> 1647523808379,
 StagingDataFileSystemCheckDurationInMilliseconds -> 60,
 command -> "COPY INTO [schema_name].[table_name] ...",
 NumberOfRecordsStagedForSQLCommit -> 100,
 DataStagingSparkJobEndedAtEPOCH -> 1647523797245,
 SchemaInferenceAssertionCompletedAtEPOCH -> 1647523790920,
 DataStagingSparkJobDurationInMilliseconds -> 5252,
 rows_processed -> 100,
 SaveModeApplied -> TRUNCATE_COPY,
 DurationInMillisecondsToValidateFileFormat -> 75,
 status -> Completed,
 SparkApplicationName -> <spark_application_name>,
 ThreePartFullyQualifiedTargetTableName -> <database_name>.<schema_name>.<table_name>,
 request_id -> <query_id_as_retrieved_from_synapse_dedicated_sql_db_query_reference>,
 StagingFolderConfigurationCheckDurationInMilliseconds -> 2,
 JDBCConfigurationsSetupAtEPOCH -> 193,
 StagingFolderConfigurationCheckCompletedAtEPOCH -> 1647523791012,
 FileFormatValidationsCompletedAtEPOCHTime -> 1647523790995,
 SchemaInferenceCheckDurationInMilliseconds -> 91,
 SaveModeRequested -> Overwrite,
 DataStagingSparkJobStartedAtEPOCH -> 1647523791993,
 DurationInMillisecondsTakenToGenerateWriteSQLStatements -> 4
}

Altri esempi di codice

Uso di dati materializzati tra celle

I dataframe createOrReplaceTempView Spark possono essere usati per accedere ai dati recuperati in un'altra cella, registrando una visualizzazione temporanea.

  • Cella in cui vengono recuperati i dati(ad esempio con preferenza lingua notebook come Scala)
    //Necessary imports
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.SaveMode
    import com.microsoft.spark.sqlanalytics.utils.Constants
    import org.apache.spark.sql.SqlAnalyticsConnector._
    
    //Configure options and read from Synapse Dedicated SQL Pool.
    val readDF = spark.read.
        //Set Synapse Dedicated SQL End Point name.
        option(Constants.SERVER, "<synapse-dedicated-sql-end-point>.sql.azuresynapse.net").
        //Set database user name.
        option(Constants.USER, "<user_name>").
        //Set database user's password. 
        option(Constants.PASSWORD, "<user_password>").
        //Set name of the data source definition that is defined with database scoped credentials.
        option(Constants.DATA_SOURCE,"<data_source_name>").
        //Set the three-part table name from which the read must be performed.
        synapsesql("<database_name>.<schema_name>.<table_name>").
        //Optional - specify number of records the DataFrame would read.
        limit(10)
    //Register the temporary view (scope - current active Spark Session)
    readDF.createOrReplaceTempView("<temporary_view_name>")
  • Modificare ora la preferenza per la lingua nel notebook in PySpark (Python) e recuperare i dati dalla visualizzazione registrata <temporary_view_name>
        spark.sql("select * from <temporary_view_name>").show()

Gestione delle risposte

La chiamata synapsesql ha due possibili stati finali: Operazione riuscita o Stato non riuscito. Questa sezione descrive come gestire la risposta della richiesta per ogni scenario.

Leggere la risposta alla richiesta

Al termine, il frammento di risposta di lettura viene visualizzato nell'output della cella. L'errore nella cella corrente annulla anche le esecuzioni successive delle celle. Le informazioni dettagliate sugli errori sono disponibili nei log applicazioni Spark.

Risposta alla richiesta di scrittura

Per impostazione predefinita, viene stampata una risposta di scrittura nell'output della cella. In caso di errore, la cella corrente viene contrassegnata come non riuscita e le successive esecuzioni di celle verranno interrotte. L'altro approccio consiste nel passare l'opzione handle di callback al synapsesql metodo . L'handle di callback fornirà l'accesso a livello di codice alla risposta di scrittura.

Altre considerazioni

  • Durante la lettura dalle tabelle del pool SQL dedicato di Azure Synapse:
    • Valutare la possibilità di applicare i filtri necessari nel dataframe per sfruttare la funzionalità di eliminazione delle colonne di Connessione or.
    • Lo scenario di lettura non supporta la TOP(n-rows) clausola , quando si incorniciano le istruzioni di SELECT query. La scelta di limitare i dati consiste nell'usare la clausola limite(.) del dataframe.
  • Durante la scrittura nelle tabelle del pool SQL dedicato di Azure Synapse:
    • Per i tipi di tabella interni:
      • Le tabelle vengono create con ROUND_ROBIN distribuzione dei dati.
      • I tipi di colonna vengono dedotti dal dataframe che leggerebbe i dati dall'origine. Le colonne stringa vengono mappate a NVARCHAR(4000).
    • Per i tipi di tabella esterni:
      • Il parallelismo iniziale del dataframe determina l'organizzazione dei dati per la tabella esterna.
      • I tipi di colonna vengono dedotti dal dataframe che leggerebbe i dati dall'origine.
    • È possibile ottenere una migliore distribuzione dei dati tra executor ottimizzando il spark.sql.files.maxPartitionBytes parametro e il parametro del repartition dataframe.
    • Quando si scrivono set di dati di grandi dimensioni, è importante tenere conto dell'impatto dell'impostazione del livello di prestazioni DWU che limita le dimensioni delle transazioni.
  • Monitorare le tendenze di utilizzo di Azure Data Lake Archiviazione Gen2 per individuare i comportamenti di limitazione che possono influire sulle prestazioni di lettura e scrittura.

Riferimenti