Leer datos compartidos mediante el uso compartido abierto de Delta Sharing (para destinatarios)

En este artículo se describe cómo leer los datos que se han compartido con usted mediante el protocolo de uso compartido abierto de Delta Sharing. En el uso compartido abierto, usted usa un archivo de credenciales que el proveedor de datos ha compartido con un miembro de su equipo para obtener un acceso de lectura seguro a los datos compartidos. El acceso persiste siempre y cuando las credenciales sean válidas y el proveedor siga compartiendo los datos. Los proveedores administran la expiración y la rotación de credenciales. Las actualizaciones de los datos están disponibles casi en tiempo real. Puede leer y realizar copias de los datos compartidos, pero no puede modificar los datos de origen.

Nota:

Si los datos se han compartido con usted usando Delta Sharing de Databricks a Databricks, no necesitará un archivo de credenciales para acceder a los datos, y este artículo no se le aplica. Para obtener instrucciones, consulte Lectura de datos compartidos usando Delta Sharing de Databricks a Databricks (para destinatarios).

En las secciones siguientes se describe cómo usar Azure Databricks, Apache Spark, Pandas y Power BI para acceder a datos compartidos y leerlos mediante el archivo de credenciales. Para obtener una lista completa de los conectores de Delta Sharing e información sobre cómo usarlos, consulte la documentación de orígenes abiertos de Delta Sharing. Si tiene problemas para acceder a los datos compartidos, póngase en contacto con el proveedor de datos.

Nota:

Las integraciones de asociados, salvo se indique lo contrario, la proporcionan terceros y es preciso tener una cuenta con el proveedor adecuado para poder usar sus productos y servicios. Aunque se hace todo lo posible para que el contenido de Databricks esté actualizado, no nos hacemos responsables de las integraciones ni de la exactitud del contenido de las páginas de integración de los asociados. Póngase en contacto con los proveedores adecuados para tratar cualquier aspecto concreto de sus integraciones.

Antes de empezar

Un miembro de su equipo debe descargar el archivo de credenciales compartido por el proveedor de datos. Consulte Obtener acceso en el modelo de uso compartido abierto.

Deberían usar un canal seguro para compartir ese archivo o su ubicación con usted.

Azure Databricks: Lectura de datos compartidos mediante conectores de uso compartido abierto

Esta sección describe cómo usar un conector de uso compartido abierto para acceder a datos compartidos usando un cuaderno en su área de trabajo de Azure Databricks. Usted u otro miembro de su equipo almacena el archivo de credenciales en DBFS, después lo usa para autenticarse en la cuenta de Azure Databricks del proveedor de datos y leer los datos que compartió con usted.

Nota:

El proveedor de datos está usando el uso compartido de Databricks a Databricks y no compartió un archivo de credenciales con usted, tiene que acceder a los datos mediante el catálogo de Unity. Para obtener instrucciones, consulte Lectura de datos compartidos usando Delta Sharing de Databricks a Databricks (para destinatarios).

En este ejemplo, creará un cuaderno con varias celdas que puede ejecutar de forma independiente. En su lugar, podría agregar los comandos del cuaderno a la misma celda y ejecutarlos en una secuencia.

Paso 1: Almacenar el archivo de credenciales en DBFS (instrucciones de Python)

En este paso, usted usa un cuaderno de Python en Azure Databricks para almacenar el archivo de credenciales para que los usuarios de su equipo puedan acceder a los datos compartidos.

Pase al siguiente paso si usted o alguien de su equipo ya han almacenado el archivo de credenciales en DBFS.

  1. En un editor de texto, abra el archivo de credenciales.

  2. En el área de trabajo de Azure Databricks, haga clic en Nuevo > Cuaderno.

    • Escriba un nombre.
    • Establezca el idioma predeterminado del cuaderno en Python.
    • Seleccione un clúster para adjuntarlo al cuaderno.
    • Haga clic en Crear.

    El cuaderno se abre en el editor de cuadernos.

  3. Para usar Python o Pandas para acceder a los datos compartidos, instale el conector de Python de Delta Sharing. En el editor de cuadernos, pegue el siguiente comando:

    %sh pip install delta-sharing
    
  4. Ejecute la celda.

    La biblioteca de Python delta-sharing se instala en el clúster si aún no está instalada.

  5. En una nueva celda, pegue el siguiente comando, que carga el contenido del archivo de credenciales en una carpeta de DBFS. Reemplace las variables como se muestra a continuación:

    • <dbfs-path>: ruta de acceso a la carpeta donde desea guardar el archivo de credenciales

    • <credential-file-contents>: el contenido del archivo de credenciales. No se trata de una ruta de acceso al archivo, sino del contenido copiado del mismo.

      El archivo de credenciales contiene JSON, que define tres campos: shareCredentialsVersion, endpoint y bearerToken.

      %scala
      dbutils.fs.put("<dbfs-path>/config.share","""
      <credential-file-contents>
      """)
      
  6. Ejecute la celda.

    Una vez cargado el archivo de credenciales, puede eliminar esta celda. Todos los usuarios del área de trabajo pueden leer el archivo de credenciales de DBFS y dicho archivo está disponible en DBFS en todos los clústeres y almacenes de SQL de su área de trabajo. Para eliminar la celda, haga clic en x en el menú de acciones de celda Acciones de celda en el extremo derecho.

Paso 2: Usar un cuaderno para enumerar y leer tablas compartidas

En este paso, se enumeran las tablas del recurso compartido, o el conjunto de tablas y particiones compartidas, y se consulta una tabla.

  1. Con Python, enumere las tablas del recurso compartido.

    En una nueva celda, pegue el siguiente comando. Reemplace <dbfs-path> por la ruta de acceso que se creó en el Paso 1: Almacenar el archivo de credenciales en DBFS (instrucciones de Python).

    Cuando se ejecute el código, Python leerá el archivo de credenciales de DBFS en el clúster. Acceda a los datos almacenados en DBFS en la ruta de acceso /dbfs/.

    import delta_sharing
    
    client = delta_sharing.SharingClient(f"/dbfs/<dbfs-path>/config.share")
    
    client.list_all_tables()
    
  2. Ejecute la celda.

    El resultado es una matriz de tablas, junto con metadatos para cada tabla. En la salida siguiente se muestran dos tablas:

    Out[10]: [Table(name='example_table', share='example_share_0', schema='default'), Table(name='other_example_table', share='example_share_0', schema='default')]
    

    Si la salida está vacía o no contiene las tablas que espera, póngase en contacto con el proveedor de datos.

  3. Consulta de una tabla compartida.

    • Usando Scala:

      En una nueva celda, pegue el siguiente comando. Cuando se ejecuta el código, el archivo de credenciales se lee de DBFS a través de JVM.

      Reemplace las variables como se muestra a continuación:

      • <profile-path>: ruta de acceso de DBFS del archivo de credenciales. Por ejemplo, /<dbfs-path>/config.share.
      • <share-name>: valor de share= para la tabla.
      • <schema-name>: valor de schema= para la tabla.
      • <table-name>: valor de name= para la tabla.
      %scala
          spark.read.format("deltaSharing")
          .load("<profile-path>#<share-name>.<schema-name>.<table-name>").limit(10);
      

      Ejecute la celda. Cada vez que cargue la tabla compartida, verá datos nuevos del origen.

    • Usando SQL:

      Para consultar los datos mediante SQL, cree una tabla local en el área de trabajo a partir de la tabla compartida y, después, consulte la tabla local. Los datos compartidos no se guardan ni almacenan en caché en la tabla local. Cada vez que se consulta la tabla local, se ve el estado actual de los datos compartidos.

      En una nueva celda, pegue el siguiente comando.

      Reemplace las variables como se muestra a continuación:

      • <local-table-name>: nombre de la tabla local.
      • <profile-path>: ubicación del archivo de credenciales.
      • <share-name>: valor de share= para la tabla.
      • <schema-name>: valor de schema= para la tabla.
      • <table-name>: valor de name= para la tabla.
      %sql
      DROP TABLE IF EXISTS table_name;
      
      CREATE TABLE <local-table-name> USING deltaSharing LOCATION "<profile-path>#<share-name>.<schema-name>.<table-name>";
      
      SELECT * FROM <local-table-name> LIMIT 10;
      

      Al ejecutar el comando, los datos compartidos se consultan directamente. Como prueba, se consulta la tabla y se devuelven los diez primeros resultados.

    Si la salida está vacía o no contiene los datos que espera, póngase en contacto con el proveedor de datos.

Apache Spark: Lectura de datos compartidos

Siga estos pasos para acceder a los datos compartidos usando Spark 3.x o superior.

En estas instrucciones se supone que tiene acceso al archivo de credenciales compartido por el proveedor de datos. Consulte Obtener acceso en el modelo de uso compartido abierto.

Instalación de los conectores de Python y Spark de Delta Sharing

Para acceder a los metadatos relacionados con los datos compartidos, como la lista de tablas compartidas con usted, haga lo siguiente. Este ejemplo utiliza Python.

  1. Instale el conector de Python delta-sharing:

    pip install delta-sharing
    
  2. Instale el conector de Apache Spark.

Enumeración de tablas compartidas mediante Spark

Enumera las tablas del recurso compartido. En el ejemplo siguiente, reemplace <profile-path> por la ubicación del archivo de credenciales.

import delta_sharing

client = delta_sharing.SharingClient(f"<profile-path>/config.share")

client.list_all_tables()

El resultado es una matriz de tablas, junto con metadatos para cada tabla. En la salida siguiente se muestran dos tablas:

Out[10]: [Table(name='example_table', share='example_share_0', schema='default'), Table(name='other_example_table', share='example_share_0', schema='default')]

Si la salida está vacía o no contiene las tablas que espera, póngase en contacto con el proveedor de datos.

Acceso a datos compartidos mediante Spark

Ejecute lo siguiente, sustituyendo estas variables:

  • <profile-path>: ubicación del archivo de credenciales.
  • <share-name>: valor de share= para la tabla.
  • <schema-name>: valor de schema= para la tabla.
  • <table-name>: valor de name= para la tabla.
  • <version-as-of>: opcional. Versión de la tabla para cargar los datos. Solo funciona si el proveedor de datos comparte el historial de la tabla. Requiere delta-sharing-spark 0.5.0 o superior.
  • <timestamp-as-of>: opcional. Cargue los datos en la versión anterior o en la marca de tiempo especificada. Solo funciona si el proveedor de datos comparte el historial de la tabla. Requiere delta-sharing-spark 0.6.0 o superior.

Python

delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", version=<version-as-of>)

spark.read.format("deltaSharing")\
.option("versionAsOf", <version-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))

delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", timestamp=<timestamp-as-of>)

spark.read.format("deltaSharing")\
.option("timestampAsOf", <timestamp-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))

Scala

Ejecute lo siguiente, sustituyendo estas variables:

  • <profile-path>: ubicación del archivo de credenciales.
  • <share-name>: valor de share= para la tabla.
  • <schema-name>: valor de schema= para la tabla.
  • <table-name>: valor de name= para la tabla.
  • <version-as-of>: opcional. Versión de la tabla para cargar los datos. Solo funciona si el proveedor de datos comparte el historial de la tabla. Requiere delta-sharing-spark 0.5.0 o superior.
  • <timestamp-as-of>: opcional. Cargue los datos en la versión anterior o en la marca de tiempo especificada. Solo funciona si el proveedor de datos comparte el historial de la tabla. Requiere delta-sharing-spark 0.6.0 o superior.
spark.read.format("deltaSharing")
.option("versionAsOf", <version-as-of>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
.limit(10)

spark.read.format("deltaSharing")
.option("timestampAsOf", <version-as-of>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
.limit(10)

Acceso a una fuente de distribución de datos de cambios compartida mediante Spark

Si el historial de tablas se ha compartido con usted y la fuente de distribución de datos modificados (CDF) está habilitada en la tabla de origen, puede acceder a la fuente de distribución de datos modificados ejecutando lo siguiente, reemplazando estas variables. Requiere delta-sharing-spark 0.5.0 o superior.

Se debe proporcionar uno y solo un parámetro de inicio.

  • <profile-path>: ubicación del archivo de credenciales.
  • <share-name>: valor de share= para la tabla.
  • <schema-name>: valor de schema= para la tabla.
  • <table-name>: valor de name= para la tabla.
  • <starting-version>: opcional. Versión inicial de la consulta, inclusiva. Especifíquelo como Long.
  • <ending-version>: opcional. Versión final de la consulta, inclusiva. Si no se proporciona la versión final, la API usa la versión de tabla más reciente.
  • <starting-timestamp>: opcional. La marca de tiempo inicial de la consulta, se convierte en una versión creada mayor o igual que esta marca de tiempo. Especifíquelo como una cadena con el formato yyyy-mm-dd hh:mm:ss[.fffffffff].
  • <ending-timestamp>: opcional. La marca de tiempo final de la consulta, esta se convierte en una versión creada inferior o igual a esta marca de tiempo. Especifíquelo como una cadena con el formato yyyy-mm-dd hh:mm:ss[.fffffffff].

Python

delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_version=<starting-version>,
  ending_version=<ending-version>)

delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_timestamp=<starting-timestamp>,
  ending_timestamp=<ending-timestamp>)

spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("statingVersion", <starting-version>)\
.option("endingVersion", <ending-version>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("startingTimestamp", <starting-timestamp>)\
.option("endingTimestamp", <ending-timestamp>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

Scala

spark.read.format("deltaSharing").option("readChangeFeed", "true")
.option("statingVersion", <starting-version>)
.option("endingVersion", <ending-version>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

spark.read.format("deltaSharing").option("readChangeFeed", "true")
.option("startingTimestamp", <starting-timestamp>)
.option("endingTimestamp", <ending-timestamp>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

Si la salida está vacía o no contiene los datos que espera, póngase en contacto con el proveedor de datos.

Acceso a una tabla compartida mediante Spark Structured Streaming

Si el historial de tablas se comparte con usted, puede transmitir los datos compartidos. Requiere delta-sharing-spark 0.6.0 o superior.

Opciones admitidas:

  • ignoreDeletes: omitir las transacciones que eliminan los datos.
  • ignoreChanges: volver a procesar las actualizaciones si los archivos se volvieron a escribir en la tabla de origen debido a una operación de cambio de datos como UPDATE, MERGE INTO, DELETE (dentro de las particiones) o OVERWRITE. Todavía se pueden emitir filas sin cambios. Por tanto, los consumidores de nivel inferior deben ser capaces de controlar los duplicados. Las eliminaciones no se propagan de bajada. ignoreChanges subsumes ignoreDeletes. Por lo tanto, si usa ignoreChanges, el flujo no se interrumpirá mediante eliminaciones o actualizaciones de la tabla de origen.
  • startingVersion: la versión de la tabla compartida desde la que se va a iniciar. El origen de streaming leerá todos los cambios de tabla a partir de esta versión (inclusive).
  • startingTimestamp: marca de tiempo desde la que empezar. El origen de streaming leerá todos los cambios de tabla confirmados en la marca de tiempo o después (inclusive). Ejemplo: "2023-01-01 00:00:00.0".
  • maxFilesPerTrigger: el número de archivos nuevos por considerar en cada microlote.
  • maxBytesPerTrigger: la cantidad de datos que se procesan en cada microlote. Esta opción establece un "máximo flexible", lo que significa que un lote procesa aproximadamente esta cantidad de datos y puede procesar más que el límite para que la consulta de flujo avance en los casos en que la unidad de entrada más pequeña sea mayor que este límite.
  • readChangeFeed: se lee por streaming la fuente de datos modificados de la tabla compartida.

Opciones no admitidas:

  • Trigger.availableNow

Ejemplos de consultas de Structured Streaming

Scala
spark.readStream.format("deltaSharing")
.option("startingVersion", 0)
.option("ignoreChanges", true)
.option("maxFilesPerTrigger", 10)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
Python
spark.readStream.format("deltaSharing")\
.option("startingVersion", 0)\
.option("ignoreDeletes", true)\
.option("maxBytesPerTrigger", 10000)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

Consulte también Streaming en Azure Databricks.

Lectura de tablas con vectores de eliminación o asignación de columnas habilitados

Importante

Esta característica está en versión preliminar pública.

Los vectores de eliminación son una característica de optimización de almacenamiento que el proveedor puede habilitar en tablas Delta compartidas. Consulte ¿Qué son los vectores de eliminación?.

Azure Databricks también admite la asignación de columnas para tablas Delta. Consulte Cambio de nombre y eliminación de columnas con la asignación de columnas de Delta Lake.

Si el proveedor ha compartido una tabla con vectores de eliminación o asignación de columna habilitada, puede leer la tabla mediante el proceso que ejecuta delta-sharing-spark 3.1 o superior. Si usa clústeres de Databricks, puede realizar lecturas por lotes mediante un clúster que ejecute Databricks Runtime 14.1 o superior. Las consultas de CDF y streaming requieren Databricks Runtime 14.2 o superior.

Puede realizar consultas por lotes tal como están, ya que pueden resolver automáticamente responseFormat en función de las características de tabla de la tabla compartida.

Para leer una fuente de distribución de datos modificado (CDF) o para realizar consultas de streaming en tablas compartidas con vectores de eliminación o asignación de columnas habilitadas, debe establecer la opción adicional responseFormat=delta.

En los siguientes ejemplos se muestran consultas por lotes, CDF y streaming:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
        .builder()
        .appName("...")
        .master("...")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .getOrCreate()

val tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"

// Batch query
spark.read.format("deltaSharing").load(tablePath)

// CDF query
spark.read.format("deltaSharing")
  .option("readChangeFeed", "true")
  .option("responseFormat", "delta")
  .option("startingVersion", 1)
  .load(tablePath)

// Streaming query
spark.readStream.format("deltaSharing").option("responseFormat", "delta").load(tablePath)

Pandas: Lectura de datos compartidos

Siga estos pasos para acceder a los datos compartidos en Pandas 0.25.3 o posterior.

En estas instrucciones se supone que tiene acceso al archivo de credenciales compartido por el proveedor de datos. Consulte Obtener acceso en el modelo de uso compartido abierto.

Instale el conector de Python de Delta Sharing

Para acceder a los metadatos relacionados con los datos compartidos, como la lista de tablas compartidas con usted, debe instalar el conector delta-sharing de Python.

pip install delta-sharing

Enumeración de tablas compartidas mediante pandas

Para enumerar las tablas del recurso compartido, ejecute lo siguiente, sustituyendo <profile-path>/config.share por la ubicación del archivo de credenciales.

import delta_sharing

client = delta_sharing.SharingClient(f"<profile-path>/config.share")

client.list_all_tables()

Si la salida está vacía o no contiene las tablas que espera, póngase en contacto con el proveedor de datos.

Acceso a datos compartidos mediante pandas

Para acceder a los datos compartidos en pandas usando Python, ejecute lo siguiente, reemplazando las variables como se indica a continuación:

  • <profile-path>: ubicación del archivo de credenciales.
  • <share-name>: valor de share= para la tabla.
  • <schema-name>: valor de schema= para la tabla.
  • <table-name>: valor de name= para la tabla.
import delta_sharing
delta_sharing.load_as_pandas(f"<profile-path>#<share-name>.<schema-name>.<table-name>")

Acceso a una fuente de distribución de datos de cambios compartida mediante pandas

Para acceder a la fuente de distribución de datos de una tabla compartida en pandas usando Python ejecute lo siguiente, reemplazando las variables como se indica a continuación. Es posible que la fuente de distribución de datos de cambios no esté disponible, dependiendo de si el proveedor de datos compartió o no la fuente de distribución de datos de cambios para la tabla.

  • <starting-version>: opcional. Versión inicial de la consulta, inclusiva.
  • <ending-version>: opcional. Versión final de la consulta, inclusiva.
  • <starting-timestamp>: opcional. Marca de tiempo inicial de la consulta. Se convierte en una versión creada mayor o igual que esta marca de tiempo.
  • <ending-timestamp>: opcional. Marca de tiempo final de la consulta. Se convierte en una versión creada inferior o igual que esta marca de tiempo.
import delta_sharing
delta_sharing.load_table_changes_as_pandas(
  f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_version=<starting-version>,
  ending_version=<starting-version>)

delta_sharing.load_table_changes_as_pandas(
  f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_timestamp=<starting-timestamp>,
  ending_timestamp=<ending-timestamp>)

Si la salida está vacía o no contiene los datos que espera, póngase en contacto con el proveedor de datos.

Power BI: Lectura de datos compartidos

El conector de Delta Sharing de Power BI le permite descubrir, analizar y visualizar conjuntos de datos compartidos con usted a través del protocolo abierto de Delta Sharing.

Requisitos

Conexión a Databricks

Para conectarse a Azure Databricks usando el conector de Delta Sharing, haga lo siguiente:

  1. Abra el archivo de credenciales compartido con un editor de texto para recuperar la URL del punto de conexión y el token.
  2. Abra Power BI Desktop.
  3. En el menú Obtener datos, busque Delta Sharing.
  4. Seleccione el conector y haga clic en Conectar.
  5. Escriba la dirección URL del punto de conexión que copió del archivo de credenciales en el campo Delta Sharing Server URL (Dirección URL de Delta Sharing).
  6. Opcionalmente, en la pestaña Opciones avanzadas, establezca un límite de filas para el número máximo de filas que puede descargar. De forma predeterminada, se establece en 1 millón de filas.
  7. Haga clic en Aceptar.
  8. En Autenticación, copie el token que recuperó del archivo de credenciales en Token de portador.
  9. Haga clic en Conectar.

Limitaciones del conector de Delta Sharing de Power BI

El conector de Delta Sharing de Power BI tiene las siguientes limitaciones:

  • Los datos que carga el conector deben ajustarse a la memoria de la máquina. Para garantizarlo, el conector limita el número de filas importadas al Límite de filas que haya establecido en la pestaña Opciones avanzadas de Power BI Desktop.

Solicitud de credenciales nuevas

Si la dirección URL de activación de credenciales o las credenciales descargadas se pierden, dañan o ponen en peligro, o sus credenciales expiran sin que el proveedor le envíe unas nuevas, póngase en contacto con el proveedor para solicitar credenciales nuevas.