Creación de una canalización de datos con Data Collector APICreate a data pipeline with the Data Collector API

Data Collector API de Azure Monitor permite importar los datos de registros personalizados en un área de trabajo de Log Analytics en Azure Monitor.The Azure Monitor Data Collector API allows you to import any custom log data into a Log Analytics workspace in Azure Monitor. Los únicos requisitos son que los datos tengan formato JSON y estén divididos en segmentos de 30 MB o menos.The only requirements are that the data be JSON-formatted and split into 30 MB or less segments. Se trata de un mecanismo completamente flexible que se puede utilizar de muchas maneras: desde datos enviados directamente por la aplicación hasta cargas ad hoc de un solo uso.This is a completely flexible mechanism that can be plugged into in many ways: from data being sent directly from your application, to one-off adhoc uploads. Este artículo describe algunos puntos de partida para un escenario común: la necesidad de cargar los datos almacenados en archivos de forma regular y automatizada.This article will outline some starting points for a common scenario: the need to upload data stored in files on a regular, automated basis. Aunque la canalización que se presenta aquí no será la de mayor rendimiento ni optimización, está pensada para servir como punto de partida para la creación de una canalización de producción propia.While the pipeline presented here will not be the most performant or otherwise optimized, it is intended to serve as a starting point towards building a production pipeline of your own.

Nota

Este artículo se ha actualizado recientemente para usar el término registros de Azure Monitor en lugar de Log Analytics.This article was recently updated to use the term Azure Monitor logs instead of Log Analytics. Los datos de registro siguen almacenándose en un área de trabajo de Log Analytics y siguen recopilándose y analizándose por el mismo servicio de Log Analytics.Log data is still stored in a Log Analytics workspace and is still collected and analyzed by the same Log Analytics service. Estamos actualizando la terminología para reflejar mejor el rol de los registros de Azure Monitor.We are updating the terminology to better reflect the role of logs in Azure Monitor. Consulte Azure Monitor terminology changes (Cambios en la terminología de Azure Monitor) para obtener más información.See Azure Monitor terminology changes for details.

Problema de ejemploExample problem

El resto de este artículo, examinaremos los datos de la vista de página de Application Insights.For the remainder of this article, we will examine page view data in Application Insights. En nuestro escenario hipotético, queremos correlacionar información geográfica recopilada de manera predeterminada por el SDK de Application Insights con datos personalizados que contienen la población de todos los países o regiones del mundo, con el fin de identificar dónde debemos dedicar el máximo importe en dólares en marketing.In our hypothetical scenario, we want to correlate geographical information collected by default by the Application Insights SDK to custom data containing the population of every country/region in the world, with the goal of identifying where we should be spending the most marketing dollars.

Se usa un origen de datos público como UN World Population Prospects (Perspectivas de población mundial de la ONU) para este propósito.We use a public data source such as the UN World Population Prospects for this purpose. Los datos tendrán el siguiente esquema simple:The data will have the following simple schema:

Esquema de ejemplo simple

En nuestro ejemplo, se supone que se cargará un archivo nuevo con los datos del último año tan pronto como esté disponible.In our example, we assume that we will upload a new file with the latest year’s data as soon as it becomes available.

Diseño generalGeneral design

Se usa una lógica de tipo ETL clásica para diseñar nuestra canalización.We are using a classic ETL-type logic to design our pipeline. La arquitectura tendrá el aspecto siguiente:The architecture will look as follows:

Arquitectura de la canalización de recopilación de datos

En este artículo no se explica cómo crear datos ni cómo cargarlos en una cuenta de Azure Blob Storage.This article will not cover how to create data or upload it to an Azure Blob Storage account. En su lugar, se recoge el flujo en cuanto se carga un archivo nuevo en el blob.Rather, we pick the flow up as soon as a new file is uploaded to the blob. Desde aquí:From here:

  1. Un proceso detectará que se han cargado nuevos datos.A process will detect that new data has been uploaded. En nuestro ejemplo se utiliza una aplicación lógica de Azure que dispone de un desencadenador para detectar la carga de nuevos datos en un blob.Our example uses an Azure Logic App, which has available a trigger to detect new data being uploaded to a blob.

  2. Un procesador lee estos nuevos datos y los convierte en JSON, el formato requerido por Azure Monitor. En este ejemplo, usamos una función de Azure como una manera ligera, eficaz y rentable de ejecutar el código de procesamiento.A processor reads this new data and converts it to JSON, the format required by Azure Monitor In this example, we use an Azure Function as a lightweight, cost-efficient way of executing our processing code. La misma aplicación lógica que se utiliza para detectar nuevos datos inicia la función.The function is kicked off by the same Logic App that we used to detect a the new data.

  3. Por último, una vez que el objeto JSON está disponible, se envía a Azure Monitor.Finally, once the JSON object is available, it is sent to Azure Monitor. La misma aplicación lógica envía los datos a Azure Monitor mediante la actividad integrada de Log Analytics Data Collector.The same Logic App sends the data to Azure Monitor using the built in Log Analytics Data Collector activity.

Aunque la configuración detallada del almacenamiento de blobs, la aplicación lógica o la función de Azure no se describen en este artículo, hay disponibles instrucciones detalladas en las páginas de los productos específicos.While the detailed setup of the blob storage, Logic App, or Azure Function is not outlined in this article, detailed instructions are available on the specific products’ pages.

Para supervisar esta canalización, se usan Application Insights para supervisar la función de Azure (más información aquí) y Azure Monitor para supervisar la aplicación lógica (más información aquí).To monitor this pipeline, we use Application Insights to monitor our Azure Function details here, and Azure Monitor to monitor our Logic App details here.

Configuración de la canalizaciónSetting up the pipeline

Para establecer la canalización, primero debe asegurarse de que tiene el contenedor de blobs creado y configurado.To set the pipeline, first make sure you have your blob container created and configured. Del mismo modo, asegúrese de que se ha creado el área de trabajo de Log Analytics donde le gustaría enviar los datos.Likewise, make sure that the Log Analytics workspace where you’d like to send the data to is created.

Ingesta de datos JSONIngesting JSON data

La ingesta de datos JSON es trivial con Logic Apps y, puesto que no se necesita llevar a cabo ninguna transformación, podemos encapsular toda la canalización en una única aplicación lógica.Ingesting JSON data is trivial with Logic Apps, and since no transformation needs to take place, we can encase the entire pipeline in a single Logic App. Una vez que se han configurado el contenedor de blobs y el área de trabajo de Log Analytics, cree una nueva aplicación lógica y configúrela como sigue:Once both the blob container and the Log Analytics workspace have been configured, create a new Logic App and configure it as follows:

Ejemplo de flujo de trabajo de Logic Apps

Guarde la aplicación lógica y pruébela.Save your Logic App and proceed to test it.

Ingesta de XML, CSV u otros formatos de datosIngesting XML, CSV, or other formats of data

Hoy en día, Logic Apps no tiene funcionalidades integradas para transformar fácilmente XML, CSV u otros tipos al formato JSON.Logic Apps today does not have built-in capabilities to easily transform XML, CSV, or other types into JSON format. Por lo tanto, es necesario usar otros medios para realizar esta transformación.Therefore, we need to use another means to complete this transformation. En este artículo, usamos las funcionalidades de proceso sin servidor de Azure Functions como una manera muy ligera y rentable de hacerlo.For this article, we use the serverless compute capabilities of Azure Functions as a very lightweight and cost-friendly way of doing so.

En este ejemplo se analiza un archivo CSV, pero se puede procesar cualquier otro tipo de archivo de forma similar.In this example, we parse a CSV file, but any other file type can be similarly processed. Basta con modificar el apartado de deserialización de la función de Azure para reflejar la lógica adecuada para el tipo de datos específico.Simply modify the deserializing portion of the Azure Function to reflect the correct logic for your specific data type.

  1. Cree una nueva función de Azure, con el runtime v1 de la función y la opción basado en consumo cuando se le solicite.Create a new Azure Function, using the Function runtime v1 and consumption-based when prompted. Seleccione la plantilla Desencadenador HTTP para C# como punto de partida, que configura los enlaces tal y como se necesita.Select the HTTP trigger template targeted at C# as a starting point that configures your bindings as we require.

  2. En la pestaña Ver archivos del panel derecho, cree un nuevo archivo llamado project.json y pegue el siguiente código de los paquetes de NuGet que estamos usando:From the View Files tab on the right pane, create a new file called project.json and paste the following code from NuGet packages that we are using:

    Proyecto de ejemplo de Azure Functions

    {
      "frameworks": {
        "net46":{
          "dependencies": {
            "CsvHelper": "7.1.1",
            "Newtonsoft.Json": "11.0.2"
          }  
        }  
       }  
     }  
    
  3. Cambie a run.csx en el panel derecho y reemplace el código predeterminado por el siguiente.Switch to run.csx from the right pane, and replace the default code with the following.

    Nota

    Para el proyecto, debe reemplazar el modelo de registro (la clase "PopulationRecord") por su propio esquema de datos.For your project, you have to replace the record model (the “PopulationRecord” class) with your own data schema.

    using System.Net;
    using Newtonsoft.Json;
    using CsvHelper;
    
    class PopulationRecord
    {
        public String Location { get; set; }
        public int Time { get; set; }
        public long Population { get; set; }
    }
    
    public static async Task<HttpResponseMessage> Run(HttpRequestMessage req, TraceWriter log)
    {
        string filePath = await req.Content.ReadAsStringAsync(); //get the CSV URI being passed from Logic App
        string response = "";
    
        //get a stream from blob
        WebClient wc = new WebClient();
        Stream s = wc.OpenRead(filePath);         
    
        //read the stream
        using (var sr = new StreamReader(s))
        {
            var csvReader = new CsvReader(sr);
    
            var records = csvReader.GetRecords<PopulationRecord>(); //deserialize the CSV stream as an IEnumerable
    
            response = JsonConvert.SerializeObject(records); //serialize the IEnumerable back into JSON
        }    
    
        return response == null
            ? req.CreateResponse(HttpStatusCode.BadRequest, "There was an issue getting data")
            : req.CreateResponse(HttpStatusCode.OK, response);
     }  
    
  4. Guarde la función.Save your function.

  5. Pruebe la función para asegurarse de que el código funciona correctamente.Test the function to make sure the code is working correctly. Cambie a la pestaña Probar en el panel derecho y configure la prueba como sigue.Switch to the Test tab in the right pane, configuring the test as follows. Especifique un vínculo a un blob con datos de ejemplo en cuadro de texto Cuerpo de la solicitud.Place a link to a blob with sample data into the Request body textbox. Después de hacer clic en Ejecutar, debería ver la salida JSON en el cuadro Salida:After clicking Run, you should see JSON output in the Output box:

    Código de prueba de Function App

Ahora debemos volver atrás y modificar la aplicación lógica que comenzamos a crear anteriormente para incluir los datos ingeridos y convertidos a formato JSON.Now we need to go back and modify the Logic App we started building earlier to include the data ingested and converted to JSON format. Mediante el Diseñador de vistas, realice la configuración como se indica a continuación y guarde la aplicación lógica:Using View Designer, configure as follows and then save your Logic App:

Ejemplo completo del flujo de trabajo de Logic Apps

Prueba de la canalizaciónTesting the pipeline

Ahora puede cargar un archivo nuevo en el blob que configuró anteriormente y será supervisado por la aplicación lógica.Now you can upload a new file to the blob configured earlier and have it monitored by your Logic App. En unos instantes, verá el inicio de una nueva instancia de la aplicación lógica, la llamada a la función de Azure y, después, el envío correcto de los datos a Azure Monitor.Soon, you should see a new instance of the Logic App kick off, call out to your Azure Function, and then successfully send the data to Azure Monitor.

Nota

Puede tardar hasta 30 minutos para que los datos aparezcan en Azure Monitor la primera vez que envía un nuevo tipo de datos.It can take up to 30 minutes for the data to appear in Azure Monitor the first time you send a new data type.

Correlación con otros datos de Log Analytics y Application InsightsCorrelating with other data in Log Analytics and Application Insights

Para completar nuestro objetivo de poner en correlación los datos de la vista de página de Application Insights con los datos de población que se han ingerido del origen de datos personalizado, ejecute la siguiente consulta desde la ventana de Application Insights Analytics o en el área de trabajo de Log Analytics:To complete our goal of correlating Application Insights page view data with the population data we ingested from our custom data source, run the following query from either your Application Insights Analytics window or Log Analytics workspace:

app("fabrikamprod").pageViews
| summarize numUsers = count() by client_CountryOrRegion
| join kind=leftouter (
   workspace("customdatademo").Population_CL
) on $left.client_CountryOrRegion == $right.Location_s
| project client_CountryOrRegion, numUsers, Population_d

La salida ahora debe mostrar los dos orígenes de datos unidos.The output should show the two data sources now joined.

Correlación de datos separados en un ejemplo de resultado de búsqueda

Mejoras sugeridas para una canalización de producciónSuggested improvements for a production pipeline

En este artículo se presenta un prototipo funcional, cuya lógica subyacente se puede aplicar a una verdadera solución con calidad de producción.This article presented a working prototype, the logic behind which can be applied towards a true production-quality solution. Para esta solución con calidad de producción, se recomiendan las siguientes mejoras:For such a production-quality solution, the following improvements are recommended:

  • Agregue control de errores y lógica de reintento en la aplicación lógica y en la función.Add error handling and retry logic in your Logic App and Function.
  • Agregue lógica para asegurarse de que no se supera el límite de 30 MB en una sola llamada a Log Analytics Ingestion API.Add logic to ensure that the 30MB/single Log Analytics Ingestion API call limit is not exceeded. Divida los datos en segmentos más pequeños si es necesario.Split the data into smaller segments if needed.
  • Configure una directiva de limpieza en el almacenamiento de blobs.Set up a clean-up policy on your blob storage. Una vez que se han enviado correctamente al área de trabajo de Log Analytics, a menos que quiera mantener los datos sin procesar disponibles para fines de archivo, no hay ninguna razón para seguir almacenándolos.Once successfully sent to the Log Analytics workspace, unless you’d like to keep the raw data available for archival purposes, there is no reason to continue storing it.
  • Compruebe que la supervisión está habilitada en la canalización completa y agregue puntos de seguimiento y alertas según corresponda.Verify monitoring is enabled across the full pipeline, adding trace points and alerts as appropriate.
  • Aproveche las ventajas del control de código fuente para administrar el código de la función y la aplicación lógica.Leverage source control to manage the code for your function and Logic App.
  • Asegúrese de seguir una directiva de administración de cambios adecuada, de manera que si cambia el esquema, la función y la aplicación lógica se modifican en consecuencia.Ensure that a proper change management policy is followed, such that if the schema changes, the function and Logic Apps are modified accordingly.
  • Si va a cargar varios tipos de datos diferentes, divídalos en carpetas individuales dentro del contenedor de blobs y cree la lógica en función del tipo de datos.If you are uploading multiple different data types, segregate them into individual folders within your blob container, and create logic to fan the logic out based on the data type.

Pasos siguientesNext steps

Obtenga más información sobre Data Collector API para escribir datos en el área de trabajo de Log Analytics desde cualquier cliente de API REST.Learn more about the Data Collector API to write data to Log Analytics workspace from any REST API client.