Tutorial: Ingestión de datos en un grupo de datos de SQL Server con trabajos de Spark

Se aplica a: SQL Server 2019 (15.x)

Importante

El complemento Clústeres de macrodatos de Microsoft SQL Server 2019 se va a retirar. La compatibilidad con Clústeres de macrodatos de SQL Server 2019 finalizará el 28 de febrero de 2025. Todos los usuarios existentes de SQL Server 2019 con Software Assurance serán totalmente compatibles con la plataforma, y el software se seguirá conservando a través de actualizaciones acumulativas de SQL Server hasta ese momento. Para más información, consulte la entrada de blog sobre el anuncio y Opciones de macrodatos en la plataforma Microsoft SQL Server.

En este tutorial se muestra cómo usar trabajos de Spark para cargar datos en el grupo de datos de un elemento Clústeres de macrodatos de SQL Server 2019.

En este tutorial, aprenderá a:

  • Crear una tabla externa en el grupo de datos.
  • Crea un trabajo de Spark para cargar datos desde HDFS.
  • Consultar los resultados en la tabla externa.

Sugerencia

Si lo prefiere, puede descargar y ejecutar un script con los comandos de este tutorial. Para obtener instrucciones, vea los Ejemplos de grupos de datos en GitHub.

Requisitos previos

Crear una tabla externa en el grupo de datos

Siga este procedimiento para crear una tabla externa en el grupo de datos denominada web_clickstreams_spark_results. Después, esta tabla se puede usar como una ubicación para ingerir datos en el clúster de macrodatos.

  1. En Azure Data Studio, conéctese a la instancia maestra de SQL Server del clúster de macrodatos. Para obtener más información, vea Conectarse a una instancia maestra de SQL Server.

  2. Haga doble clic en la conexión de la ventana Servidores para mostrar el panel del servidor de la instancia maestra de SQL Server. Seleccione Nueva consulta.

    Consultar una instancia maestra de SQL Server

  3. Cree permisos para el conector MSSQL-Spark.

    USE Sales
    CREATE LOGIN sample_user  WITH PASSWORD ='password123!#' 
    CREATE USER sample_user FROM LOGIN sample_user
    
    -- To create external tables in data pools
    GRANT ALTER ANY EXTERNAL DATA SOURCE TO sample_user;
    
    -- To create external tables
    GRANT CREATE TABLE TO sample_user;
    GRANT ALTER ANY SCHEMA TO sample_user;
    
    -- To view database state for Sales
    GRANT VIEW DATABASE STATE ON DATABASE::Sales TO sample_user;
    
    ALTER ROLE [db_datareader] ADD MEMBER sample_user
    ALTER ROLE [db_datawriter] ADD MEMBER sample_user
    
  4. Cree un origen de datos externo al grupo de datos (si aún no existe).

    USE Sales
    GO
    IF NOT EXISTS(SELECT * FROM sys.external_data_sources WHERE name = 'SqlDataPool')
      CREATE EXTERNAL DATA SOURCE SqlDataPool
      WITH (LOCATION = 'sqldatapool://controller-svc/default');
    
  5. Cree una tabla externa denominada web_clickstreams_spark_results en el grupo de datos.

    USE Sales
    GO
    IF NOT EXISTS(SELECT * FROM sys.external_tables WHERE name = 'web_clickstreams_spark_results')
       CREATE EXTERNAL TABLE [web_clickstreams_spark_results]
       ("wcs_click_date_sk" BIGINT , "wcs_click_time_sk" BIGINT , "wcs_sales_sk" BIGINT , "wcs_item_sk" BIGINT , "wcs_web_page_sk" BIGINT , "wcs_user_sk" BIGINT)
       WITH
       (
          DATA_SOURCE = SqlDataPool,
          DISTRIBUTION = ROUND_ROBIN
       );
    
  6. Cree un inicio de sesión para los grupos de datos y proporcione permisos al usuario.

    EXECUTE( ' Use Sales; CREATE LOGIN sample_user  WITH PASSWORD = ''password123!#'' ;') AT  DATA_SOURCE SqlDataPool;
    
    EXECUTE('Use Sales; CREATE USER sample_user; ALTER ROLE [db_datareader] ADD MEMBER sample_user;  ALTER ROLE [db_datawriter] ADD MEMBER sample_user;') AT DATA_SOURCE SqlDataPool;
    

La creación de la tabla externa del grupo de datos es una operación de bloqueo. El control se devuelve cuando se ha creado la tabla especificada en todos los nodos del grupo de datos de back-end. Si se produce un error durante la operación de creación, se devuelve un mensaje de error al autor de la llamada.

Iniciar un trabajo de streaming de Spark

El paso siguiente es crear un trabajo de streaming de Spark que cargue datos de secuencias de clics web desde el grupo de almacenamiento (HDFS) en la tabla externa que ha creado en el grupo de datos. Estos datos se han agregado a /clickstream_data en Carga de datos de ejemplo en el clúster de macrodatos.

  1. En Azure Data Studio, conéctese a la instancia maestra del clúster de macrodatos. Para obtener más información, vea Conexión a un clúster de macrodatos.

  2. Cree un cuaderno y seleccione Spark | Scala como el kernel.

  3. Ejecute el trabajo de ingesta de Spark.

    1. Configure los parámetros del conector de Spark-SQL.

    Nota

    Si el clúster de macrodatos se implementa con la integración de Active Directory, reemplace el valor de hostname siguiente para incluir el FQDN anexado al nombre del servicio. Por ejemplo, hostname=master-p-svc.<domainName>.

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.{SparkSession, SaveMode, Row, DataFrame}
    
    // Change per your installation
    val user= "username"
    val password= "****"
    val database =  "MyTestDatabase"
    val sourceDir = "/clickstream_data"
    val datapool_table = "web_clickstreams_spark_results"
    val datasource_name = "SqlDataPool"
    val schema = StructType(Seq(
    StructField("wcs_click_date_sk",LongType,true), StructField("wcs_click_time_sk",LongType,true), 
    StructField("wcs_sales_sk",LongType,true), StructField("wcs_item_sk",LongType,true),
    StructField("wcs_web_page_sk",LongType,true), StructField("wcs_user_sk",LongType,true)
    ))
    
    val hostname = "master-p-svc"
    val port = 1433
    val url = s"jdbc:sqlserver://${hostname}:${port};database=${database};user=${user};password=${password};"
    
    1. Defina y ejecute el trabajo de Spark.
      • Cada trabajo tiene dos elementos: readStream y writeStream. A continuación, se crea una trama de datos con el esquema definido anteriormente y, después, se escribe en la tabla externa del grupo de datos.
      import org.apache.spark.sql.{SparkSession, SaveMode, Row, DataFrame}
      
      val df = spark.readStream.format("csv").schema(schema).option("header", true).load(sourceDir)
      val query = df.writeStream.outputMode("append").foreachBatch{ (batchDF: DataFrame, batchId: Long) => 
                batchDF.write
                 .format("com.microsoft.sqlserver.jdbc.spark")
                 .mode("append")
                  .option("url", url)
                  .option("dbtable", datapool_table)
                  .option("user", user)
                  .option("password", password)
                  .option("dataPoolDataSource",datasource_name).save()
               }.start()
      
      query.awaitTermination(40000)
      query.stop()
      

Consultar los datos

En los pasos siguientes, se muestra que el trabajo de streaming de Spark ha cargado los datos de HDFS en el grupo de datos.

  1. Antes de consultar los datos ingeridos, examine el estado de la ejecución de Spark, incluidos el identificador de aplicación de Yarn, la interfaz de usuario de Spark y los registros de controlador. Esta información se mostrará en el cuaderno la primera vez que inicie la aplicación de Spark.

    Detalles de la ejecución de Spark

  2. Vuelva a la ventana de consultas de la instancia maestra de SQL Server que ha abierto al principio de este tutorial.

  3. Ejecute la consulta siguiente para inspeccionar los datos ingeridos.

    USE Sales
    GO
    SELECT count(*) FROM [web_clickstreams_spark_results];
    SELECT TOP 10 * FROM [web_clickstreams_spark_results];
    
  4. Los datos también se pueden consultar en Spark. Por ejemplo, el código siguiente imprime el número de registros de la tabla:

    def df_read(dbtable: String,
                 url: String,
                 dataPoolDataSource: String=""): DataFrame = {
         spark.read
              .format("com.microsoft.sqlserver.jdbc.spark")
              .option("url", url)
              .option("dbtable", dbtable)
              .option("user", user)
              .option("password", password)
              .option("dataPoolDataSource", dataPoolDataSource)
              .load()
              }
    
    val new_df = df_read(datapool_table, url, dataPoolDataSource=datasource_name)
    println("Number of rows is " +  new_df.count)
    

Limpieza

Use este comando para quitar los objetos de la base de datos creados en este tutorial.

DROP EXTERNAL TABLE [dbo].[web_clickstreams_spark_results];

Pasos siguientes

Obtenga información sobre cómo ejecutar un cuaderno de ejemplo en Azure Data Studio: