Canalizaciones y actividades en Azure Data Factory

SE APLICA A: Azure Data Factory Azure Synapse Analytics

Este artículo ayuda a conocer las canalizaciones y actividades de Azure Data Factory y a usarlas para construir flujos de trabajo controlados por datos de un extremo para los escenarios de procesamiento de datos y movimiento de datos.

Información general

Una factoría de datos puede tener una o más canalizaciones. Una canalización es una agrupación lógica de actividades que realizan una tarea. Por ejemplo, una canalización puede contener un conjunto de actividades que ingieren y limpian los datos de registro y, a continuación, inician un flujo de datos de asignación para analizar los datos de registro. La canalización permite administrar las actividades como un conjunto en lugar de hacerlo individualmente. Puede implementar y programar la canalización en lugar de las actividades de forma independiente.

Las actividades de una canalización definen las acciones que se van a realizar en los datos. Por ejemplo, puede utilizar una actividad de copia para copiar datos de SQL Server en una instancia de Azure Blob Storage. A continuación, use una actividad de flujo de datos o una actividad de cuaderno de Databricks para procesar y transformar datos de Blob Storage a un grupo de Azure Synapse Analytics sobre el que se crean las soluciones de informes de inteligencia empresarial.

Data Factory tiene tres agrupaciones de actividades: actividades de movimiento de datos, actividades de transformación de datos y actividades de control. Una actividad puede tomar diversos conjuntos de datos, o ninguno, y generar uno o varios conjuntos de datos. En el siguiente diagrama se muestra la relación entre la canalización, la actividad y el conjunto de datos en Data Factory:

Relación entre el conjunto de datos, la actividad y la canalización

Un conjunto de datos de entrada representa la entrada para una actividad de la canalización y un conjunto de datos de salida representa la salida de la actividad. Los conjuntos de datos identifican datos en distintos almacenes de datos, como tablas, archivos, carpetas y documentos. Después de crear un conjunto de datos, puede usarlo con las actividades de una canalización. Por ejemplo, un conjunto de datos puede ser un conjunto de datos de entrada y salida de una actividad de copia o una actividad de HDInsightHive. Para obtener más información sobre los conjuntos de datos, vea el artículo Conjuntos de datos en Azure Data Factory.

Actividades de movimiento de datos

Copiar actividad en Data Factory realiza una copia de los datos de un almacén de datos de origen a uno receptor. Data Factory admite los almacenes de datos que se muestran en la tabla de esta sección. Se pueden escribir datos desde cualquier origen en todos los tipos de receptores. Haga clic en un almacén de datos para obtener información sobre cómo copiar datos a un almacén como origen o destino.

Category Almacén de datos Se admite como origen Se admite como receptor Compatible con IR de Azure Compatible con IR autohospedado
Azure Almacenamiento de blobs de Azure
  Índice de Azure Cognitive Search
  Azure Cosmos DB (API de SQL)
  API de Azure Cosmos DB para MongoDB
  Azure Data Explorer
  Azure Data Lake Storage Gen1
  Azure Data Lake Storage Gen2
  Azure Database for MariaDB
  Azure Database for MySQL
  Azure Database para PostgreSQL
  Azure Databricks Delta Lake
  Azure File Storage
  Azure SQL Database
  Instancia administrada de Azure SQL
  Azure Synapse Analytics
  Azure Table Storage
Base de datos Amazon Redshift
  DB2
  Drill
  Google BigQuery
  Greenplum
  HBase
  Hive
  Apache Impala
  Informix
  MariaDB
  Microsoft Access
  MySQL
  Netezza
  Oracle
  Phoenix
  PostgreSQL
  Presto
  SAP Business Warehouse a través de Open Hub
  SAP Business Warehouse a través de expresión multidimensional
  SAP HANA
  SAP Table
  Snowflake
  Spark
  SQL Server
  Sybase
  Teradata
  Vertica
NoSQL Cassandra
  Couchbase (versión preliminar)
  MongoDB
  MongoDB Atlas
Archivo Amazon S3
  Almacenamiento compatible con Amazon S3
  Sistema de archivos
  FTP
  Google Cloud Storage
  HDFS
  Oracle Cloud Storage
  SFTP
Protocolo genérico HTTP genérico
  OData genérico
  ODBC genérico
  REST genérico
Servicios y aplicaciones Servicio web de Amazon Marketplace
  Concur (versión preliminar)
  Dataverse
  Dynamics 365
  Dynamics AX
  Dynamics CRM
  Google AdWords
  HubSpot
  Jira
  Magento (versión preliminar)
  Marketo (versión preliminar)
  Microsoft 365
  Oracle Eloqua (versión preliminar)
  Oracle Responsys (versión preliminar)
  Oracle Service Cloud (versión preliminar)
  PayPal (versión preliminar)
  QuickBooks (versión preliminar)
  Salesforce
  Servicio de Salesforce en la nube
  Nube de marketing de Salesforce
  SAP Cloud for Customer (C4C)
  SAP ECC
  ServiceNow
Lista de SharePoint Online
  Shopify (versión preliminar)
  Square (versión preliminar)
  Tabla web (tabla HTML)
  Xero
  Zoho (versión preliminar)

Nota

Si un conector está marcado como versión preliminar, puede probarlo y enviarnos sus comentarios. Si quiere tener una dependencia de los conectores de versión preliminar de la solución, póngase en contacto con el servicio de soporte técnico de Azure.

Para obtener más información, consulte el artículo Información general de la actividad de copia.

Actividades de transformación de datos

Azure Data Factory admite las siguientes actividades de transformación que se pueden agregar a las canalizaciones tanto individualmente como encadenadas a otra actividad.

Actividad de transformación de datos Entorno de procesos
Flujo de datos Azure Databricks administrado por Azure Data Factory
Función de Azure Azure Functions
Hive HDInsight [Hadoop]
Pig HDInsight [Hadoop]
de Hadoop HDInsight [Hadoop]
Hadoop Streaming HDInsight [Hadoop]
Spark HDInsight [Hadoop]
Actividades de Estudio de Azure Machine Learning (clásico): ejecución de lotes y recurso de actualización Azure VM
Procedimiento almacenado Azure SQL, Azure Synapse Analytics o SQL Server
U-SQL Análisis con Azure Data Lake
Actividad personalizada Azure Batch
Databricks Notebook Azure Databricks
Actividad de Jar en Databricks Azure Databricks
Actividad de Python en Databricks Azure Databricks

Para obtener más información, consulte el artículo sobre las actividades de transformación de datos.

Actividades de flujo de control

Se admiten las siguientes actividades de flujo de control:

Actividad de control Descripción
Append Variable Agrega un valor a una variable de matriz existente.
Execute Pipeline La actividad de ejecución de canalización permite que una canalización de Data Factory invoque otra canalización.
Filter Aplicar una expresión de filtro a una matriz de entrada
For Each La actividad ForEach define un flujo de control repetido en la canalización. Esta actividad se usa para iterar una colección y ejecuta las actividades especificadas en un bucle. La implementación del bucle de esta actividad es similar a la estructura del bucle ForEach de los lenguajes de programación.
Get Metadata (Obtener metadatos) La actividad GetMetadata se puede usar para recuperar metadatos de cualquier dato en Azure Data Factory.
Actividad If Condition La condición If puede usarse para crear una rama basada en una condición que evalúa como true o false. La actividad de la condición IF proporciona la misma funcionalidad que proporciona una instrucción If en lenguajes de programación. Evalúa un conjunto de actividades cuando la condición se evalúa como true y otro conjunto de actividades cuando la condición se evalúa como false..
Actividad Lookup La actividad de búsqueda puede usarse para leer o buscar un registro, un nombre de tabla o un valor de cualquier origen externo. Además, las actividades posteriores pueden hacer referencia a esta salida.
Set Variable Establece el valor de una variable existente.
Actividad Until Implementa el bucle Do-Until, que es similar a la estructura de bucle Do-Until de los lenguajes de programación. Ejecuta un conjunto de actividades en un bucle hasta que la condición asociada a la actividad la evalúa como "true". Puede especificar un valor de tiempo de espera para la actividad Until en Data Factory.
Actividad de validación Asegúrese de que una canalización solo continúa la ejecución si existe un conjunto datos de referencia, cumple los criterios especificados o se ha alcanzado el tiempo de espera.
Actividad Wait Cuando use una actividad Wait en una canalización, esta espera durante el tiempo especificado antes de continuar con la ejecución de actividades sucesivas.
Actividad web La actividad Web puede usarse para llamar a un punto de conexión REST personalizado desde una canalización de Data Factory. Puede pasar conjuntos de datos y servicios vinculados que la actividad consumirá y a los que tendrá acceso.
Actividad de webhook Use la actividad de webhook para llamar a un punto de conexión y pasar una dirección URL de devolución de llamada. La ejecución de canalización espera a que la devolución de llamada se invoque antes de continuar con la siguiente actividad.

JSON de canalización

Aquí encontrará cómo se define una canalización en formato JSON:

{
    "name": "PipelineName",
    "properties":
    {
        "description": "pipeline description",
        "activities":
        [
        ],
        "parameters": {
        },
        "concurrency": <your max pipeline concurrency>,
        "annotations": [
        ]
    }
}
Etiqueta Descripción Tipo Obligatorio
name Nombre de la canalización. Especifique un nombre que represente la acción que realizará la canalización.
  • Número máximo de caracteres: 140
  • Debe empezar por una letra, un número o un carácter de subrayado (_)
  • No se permiten los caracteres siguientes: ".", "+", "?", "/", "<",">","*"," %"," &",":"," "
String
description Especifique el texto que describe para qué se usa la canalización. String No
activities La sección activities puede contener una o más actividades definidas. Consulte la sección JSON de actividades para obtener más información sobre el elemento JSON de actividades. Array
parámetros La sección parámetros puede tener uno o varios de los parámetros definidos dentro de la canalización, lo que hace que la canalización sea flexible para su reutilización. List No
simultaneidad Número máximo de ejecuciones simultáneas que puede tener la canalización. De forma predeterminada, no hay ningún máximo. Si se alcanza el límite de simultaneidad, las ejecuciones de canalización adicionales se ponen en cola hasta que se completan las anteriores. Number No
annotations Lista de etiquetas asociadas a la canalización Array No

Actividad de JSON

La sección activities puede contener una o más actividades definidas. Existen dos tipos principales de actividades: Actividades de ejecución y de control.

Actividades de ejecución

Las actividades de ejecución incluyen las actividades de movimiento de datos y de transformación de datos. Tienen la siguiente estructura de nivel superior:

{
    "name": "Execution Activity Name",
    "description": "description",
    "type": "<ActivityType>",
    "typeProperties":
    {
    },
    "linkedServiceName": "MyLinkedService",
    "policy":
    {
    },
    "dependsOn":
    {
    }
}

En la tabla siguiente se describen las propiedades en la definición JSON de la actividad:

Etiqueta Descripción Obligatorio
name Nombre de la actividad. Especifique un nombre que represente la acción que realizará la actividad.
  • Número máximo de caracteres: 55
  • Debe empezar por una letra, un número o un carácter de subrayado (_)
  • No se permiten los caracteres siguientes: ".", "+", "?", "/", "<",">","*"," %"," &",":"," "
description Texto que describe para qué se usa la actividad.
type Tipo de la actividad. Consulte las secciones Actividades de movimiento de datos, Actividades de transformación de datos y Actividades de control para ver los diferentes tipos de actividades.
linkedServiceName Nombre del servicio vinculado utilizado por la actividad.

Una actividad puede requerir que especifique el servicio vinculado que enlaza con el entorno de procesos necesario.
Sí para la actividad de HDInsight, la actividad de puntuación por lotes de Estudio de Azure Machine Learning (clásico) y la actividad de procedimiento almacenado.

No para todos los demás
typeProperties Las propiedades en la sección typeProperties dependen de cada tipo de actividad. Para ver las propiedades de tipo de una actividad, haga clic en vínculos a la actividad de la sección anterior. No
policy Directivas que afectan al comportamiento en tiempo de ejecución de la actividad. Esta propiedad incluye un comportamiento de tiempo de espera y reintento. Si no se especifica, se usan los valores predeterminados. Para obtener más información, consulte la sección Directiva de actividades. No
dependsOn Esta propiedad se utiliza para definir las dependencias de actividad, y cómo las actividades siguientes dependen de actividades anteriores. Para obtener más información, consulte Dependencia de actividades. No

Directiva de actividades

Las directivas afectan al comportamiento en tiempo de ejecución de una actividad, lo que proporciona las opciones de capacidad de configuración. Las directivas de actividades solo están disponibles para las actividades de ejecución.

Definición de directiva de actividades de JSON

{
    "name": "MyPipelineName",
    "properties": {
      "activities": [
        {
          "name": "MyCopyBlobtoSqlActivity",
          "type": "Copy",
          "typeProperties": {
            ...
          },
         "policy": {
            "timeout": "00:10:00",
            "retry": 1,
            "retryIntervalInSeconds": 60,
            "secureOutput": true
         }
        }
      ],
        "parameters": {
           ...
        }
    }
}
Nombre JSON Descripción Valores permitidos Obligatorio
timeout Especifica el tiempo de espera para que se ejecute la actividad. TimeSpan No. El tiempo de espera predeterminado es de 7 días.
retry Número máximo de reintentos Entero No. El valor predeterminado es 0.
retryIntervalInSeconds El retraso entre reintentos, en segundos. Entero No. El valor predeterminado es 30 segundos.
secureOutput Cuando se establece en true, la salida de la actividad se considera segura y no se registra para la supervisión. Boolean No. El valor predeterminado es False.

Actividad de control

Las actividades de control tienen la siguiente estructura de nivel superior:

{
    "name": "Control Activity Name",
    "description": "description",
    "type": "<ActivityType>",
    "typeProperties":
    {
    },
    "dependsOn":
    {
    }
}
Etiqueta Descripción Obligatorio
name Nombre de la actividad. Especifique un nombre que represente la acción que realizará la actividad.
  • Número máximo de caracteres: 55
  • Debe empezar por una letra, un número o un carácter de subrayado (_)
  • No se permiten los caracteres siguientes: ".", "+", "?", "/", "<",">","*"," %"," &",":"," "
    description Texto que describe para qué se usa la actividad.
    type Tipo de la actividad. Consulte las secciones Actividades de movimiento de datos, Actividades de transformación de datos y Actividades de control para ver los diferentes tipos de actividades.
    typeProperties Las propiedades en la sección typeProperties dependen de cada tipo de actividad. Para ver las propiedades de tipo de una actividad, haga clic en vínculos a la actividad de la sección anterior. No
    dependsOn Esta propiedad se utiliza para definir las dependencias de actividad, y cómo las actividades siguientes dependen de actividades anteriores. Para obtener más información, consulte Dependencia de actividades. No

    Dependencia de actividades

    La dependencia de actividades define cómo las actividades siguientes dependen de las actividades anteriores, lo que determina la condición de si se debe continuar ejecutando la tarea siguiente. Una actividad puede depender de una o varias actividades anteriores con distintas condiciones de dependencia.

    Las distintas condiciones de dependencia son: Correcto, Error, Omitido, Completado.

    Por ejemplo, si una canalización tiene la actividad A -> actividad B, los distintos escenarios que pueden ocurrir son:

    • La actividad B tiene una condición de dependencia en la actividad A con correcto: la actividad B solo se ejecuta si la actividad A tiene un estado final correcto.
    • La actividad B tiene una condición de dependencia en la actividad A con error: la actividad B solo se ejecuta si la actividad A tiene un estado final de error.
    • La actividad B tiene una condición de dependencia en la actividad A con completado: la actividad B se ejecuta si la actividad A tiene un estado final correcto o de error.
    • La actividad B tiene una condición de dependencia de la actividad A con omitido: la actividad B se ejecuta si la actividad A tiene un estado final de omitido. La omisión se produce en el escenario de la actividad X -> actividad Y -> actividad Z, donde cada actividad se ejecuta solo si la actividad anterior se realiza correctamente. Si se produce un error en la actividad X, la actividad Y tiene un estado de "Omitido" porque nunca se ejecuta. De forma similar, la actividad Z también tiene un estado de "Omitido".

    Ejemplo: La actividad 2 depende de que la actividad 1 se realice correctamente

    {
        "name": "PipelineName",
        "properties":
        {
            "description": "pipeline description",
            "activities": [
             {
                "name": "MyFirstActivity",
                "type": "Copy",
                "typeProperties": {
                },
                "linkedServiceName": {
                }
            },
            {
                "name": "MySecondActivity",
                "type": "Copy",
                "typeProperties": {
                },
                "linkedServiceName": {
                },
                "dependsOn": [
                {
                    "activity": "MyFirstActivity",
                    "dependencyConditions": [
                        "Succeeded"
                    ]
                }
              ]
            }
          ],
          "parameters": {
           }
        }
    }
    
    

    Canalización de copia de ejemplo

    En la canalización de ejemplo siguiente, hay una actividad del tipo Copy in the actividades . En este ejemplo, la actividad de copia copia datos de Azure Blob Storage a una base de datos de Azure SQL Database.

    {
      "name": "CopyPipeline",
      "properties": {
        "description": "Copy data from a blob to Azure SQL table",
        "activities": [
          {
            "name": "CopyFromBlobToSQL",
            "type": "Copy",
            "inputs": [
              {
                "name": "InputDataset"
              }
            ],
            "outputs": [
              {
                "name": "OutputDataset"
              }
            ],
            "typeProperties": {
              "source": {
                "type": "BlobSource"
              },
              "sink": {
                "type": "SqlSink",
                "writeBatchSize": 10000,
                "writeBatchTimeout": "60:00:00"
              }
            },
            "policy": {
              "retry": 2,
              "timeout": "01:00:00"
            }
          }
        ]
      }
    }
    

    Tenga en cuenta los siguientes puntos:

    • En la sección de actividades, solo hay una actividad con type establecido en Copy.
    • La entrada de la actividad está establecida en InputDataset, mientras que la salida está establecida en OutputDataset. Vea el artículo Conjuntos de datos para definir conjuntos de datos en JSON.
    • En la sección typeProperties, BlobSource se especifica como el tipo de origen y SqlSink como el tipo de receptor. En la sección Actividades de movimiento de datos, haga clic en el almacén de datos que quiere usar como origen o receptor para obtener más información sobre cómo mover datos con ese almacén de datos como origen o destino.

    Para obtener un tutorial completo de creación de esta canalización, consulte el Inicio rápido: Create a data factory (Crear una factoría de datos).

    Canalización de transformación de ejemplo

    En la canalización de ejemplo siguiente, hay una actividad del tipo HDInsightHive in the actividades . En este ejemplo, el actividad de HDInsight Hive transforma los datos de Azure Blob Storage mediante la ejecución de un archivo de script de Hive en un clúster de Azure HDInsight Hadoop.

    {
        "name": "TransformPipeline",
        "properties": {
            "description": "My first Azure Data Factory pipeline",
            "activities": [
                {
                    "type": "HDInsightHive",
                    "typeProperties": {
                        "scriptPath": "adfgetstarted/script/partitionweblogs.hql",
                        "scriptLinkedService": "AzureStorageLinkedService",
                        "defines": {
                            "inputtable": "wasb://adfgetstarted@<storageaccountname>.blob.core.windows.net/inputdata",
                            "partitionedtable": "wasb://adfgetstarted@<storageaccountname>.blob.core.windows.net/partitioneddata"
                        }
                    },
                    "inputs": [
                        {
                            "name": "AzureBlobInput"
                        }
                    ],
                    "outputs": [
                        {
                            "name": "AzureBlobOutput"
                        }
                    ],
                    "policy": {
                        "retry": 3
                    },
                    "name": "RunSampleHiveActivity",
                    "linkedServiceName": "HDInsightOnDemandLinkedService"
                }
            ]
        }
    }
    

    Tenga en cuenta los siguientes puntos:

    • En la sección de actividades, solo hay una actividad con type establecido en HDInsightHive.
    • El archivo de script de Hive, partitionweblogs.hql, se almacena en la cuenta de Azure Storage (especificada por la propiedad scriptLinkedService, llamada AzureStorageLinkedService) y en una carpeta de script en el contenedor adfgetstarted.
    • La sección defines se usa para especificar la configuración de runtime n que se pasa al script de Hive como valores de configuración de Hive (por ejemplo, ${hiveconf:inputtable}, ${hiveconf:partitionedtable}).

    La sección typeProperties es diferente para cada actividad de transformación. Para obtener información sobre las propiedades de tipo compatibles con una actividad de transformación, haga clic en la actividad de transformación en Actividades de transformación de datos.

    Para obtener un tutorial completo de creación de esta canalización, consulte Tutorial: transform data using Spark (Transformación de datos con Spark).

    Varias actividades en una canalización

    Las dos canalizaciones de ejemplo anteriores solo tienen una actividad, pero se pueden tener más de una actividad en una canalización. Si tiene varias actividades en una canalización y las actividades siguientes no son dependientes de actividades anteriores, las actividades se pueden ejecutar en paralelo.

    Puede encadenar dos actividades con la dependencia de actividades, que define cómo las actividades siguientes dependen de las actividades anteriores, lo que determina la condición de si se debe continuar ejecutando la tarea siguiente. Una actividad puede depender de una o más actividades anteriores con distintas condiciones de dependencia.

    Programación de canalizaciones

    Las canalizaciones se programan mediante desencadenadores. Hay diferentes tipos de desencadenadores (desencadenador Scheduler, que permite que las canalizaciones se desencadenen según una programación de reloj, así como el desencadenador manual, que desencadena canalizaciones a petición). Para obtener más información sobre los desencadenadores, consulte el artículo Ejecución y desencadenadores de canalización.

    Para hacer que el desencadenador dé inicio a una ejecución de canalización, debe incluir una referencia de canalización de la canalización en particular en la definición del desencadenador. Las canalizaciones y los desencadenadores tienen una relación "de n a m". Varios desencadenadores pueden dar comienzo a una única canalización y el mismo desencadenador puede iniciar varias canalizaciones. Una vez definido el desencadenador, debe iniciar el desencadenador para que comience a desencadenar la canalización. Para obtener más información sobre los desencadenadores, consulte el artículo Ejecución y desencadenadores de canalización.

    Por ejemplo, supongamos que tiene un desencadenador Scheduler, "Trigger A" que quiere que inicie la canalización, "MyCopyPipeline". El desencadenador se define tal como se muestra en el ejemplo siguiente:

    Definición de TriggerA

    {
      "name": "TriggerA",
      "properties": {
        "type": "ScheduleTrigger",
        "typeProperties": {
          ...
          }
        },
        "pipeline": {
          "pipelineReference": {
            "type": "PipelineReference",
            "referenceName": "MyCopyPipeline"
          },
          "parameters": {
            "copySourceName": "FileSource"
          }
        }
      }
    }
    

    Pasos siguientes

    Consulte los siguientes tutoriales para obtener instrucciones paso a paso sobre cómo crear canalizaciones con actividades: