Datos de flujo como entrada en Stream Analytics

Stream Analytics presenta una integración de primera clase con flujos de datos de Azure como entradas de tres tipos de recursos:

Estos recursos de entrada pueden proceder de la misma suscripción de Azure que el trabajo de Stream Analytics o de otra suscripción.

Compresión

Stream Analytics admite la compresión en todos los orígenes de entrada del flujo de datos. Los tipos de compresión admitidos son: Compresión GZip, Deflate y Ninguna. La compatibilidad con la compresión no está disponible para los datos de referencia. Si se comprimen los datos de entrada de Avro, se controlan de forma transparente. No es necesario especificar el tipo de compresión con la serialización de Avro.

Creación, edición o prueba de entradas

Puede usar Azure Portal, Visual Studio, y Visual Studio Code para agregar, ver o editar entradas existentes en su trabajo de streaming. También puede probar conexiones de entrada y probar consultas a partir de datos de ejemplo desde Azure Portal, Visual Studio y Visual Studio Code. Al escribir una consulta, se muestra una lista de las entradas en la cláusula FROM. Puede obtener la lista de las entradas disponibles en la página Consulta del portal. Si desea usar varias entradas, puede utilizar JOIN o escribir varias consultas SELECT.

Datos de flujo de Event Hubs

Azure Event Hubs ofrece servicios de ingesta de eventos de publicación y suscripción muy escalables. Un centro de eventos puede recopilar millones de eventos por segundo, por lo que se pueden procesar y analizar las grandes cantidades de datos que generan las aplicaciones y los dispositivos conectados. Juntos, Event Hubs y Stream Analytics proporcionan una solución de un extremo a otro para realizar análisis en tiempo real. Event Hubs le permite suministrar eventos a Azure en tiempo real y los trabajos de Stream Analytics pueden procesarlos también en tiempo real. Por ejemplo, se pueden enviar clics de web, lecturas de sensor o eventos de registro en línea a Event Hubs. Luego se pueden crear trabajos de Stream Analytics que usen Event Hubs como los flujos de datos de entrada para su filtrado, agregación y correlación en tiempo real.

EventEnqueuedUtcTime es la marca de tiempo de la llegada de un evento a un centro de eventos y es la marca de tiempo predeterminada de los eventos procedentes de Event Hubs hacia Stream Analytics. Para procesar los datos como un flujo con una marca de tiempo en la carga del evento, se debe usar la palabra clave TIMESTAMP BY.

Grupo de consumidores de Event Hubs

Se debe configurar cada entrada de centro de eventos de Stream Analytics para que tenga su propio grupo de consumidores. Cuando un trabajo contiene una autocombinación o tiene varias entradas, es posible que algunas entradas sean leídas por más de un lector de un nivel inferior. Esta situación afecta al número de lectores de un solo grupo de consumidores. El procedimiento recomendado para evitar superar el límite de cinco lectores por grupo de consumidores por cada partición de Event Hubs consiste en designar un grupo de consumidores para cada trabajo de Stream Analytics. También hay un límite de 20 grupos de consumidores para un centro de eventos de nivel estándar. Para obtener más información, consulte Troubleshoot Azure Stream Analytics inputs (Solución de problemas de las entradas de Azure Stream Analytics).

Creación de una entrada desde Event Hubs

En la siguiente tabla se explica cada propiedad de la página Nueva entrada de Azure Portal para transmitir la entrada de datos desde un centro de eventos:

Propiedad Descripción
Alias de entrada Nombre descriptivo que se usará en la consulta del trabajo para hacer referencia a esta entrada.
Suscripción Elija la suscripción en el que existe el recurso del centro de eventos.
Espacio de nombres del centro de eventos El espacio de nombres del centro de eventos es un contenedor para un conjunto de entidades de mensajería. Al crear un centro de eventos, también se crea el espacio de nombres.
Nombre del centro de eventos Nombre del centro de eventos que se usa como entrada.
Nombre de la directiva del centro de eventos Directiva de acceso compartido que proporciona acceso al centro de eventos. Cada directiva de acceso compartido tiene un nombre, los permisos establecidos y las claves de acceso. Esta opción se rellena automáticamente, a menos que elija proporcionar la configuración del centro de eventos manualmente.
Grupo de consumidores del centro de eventos (recomendado) Se recomienda encarecidamente usar un grupo de consumidores distinto para cada trabajo de Stream Analytics. Esta cadena identifica el grupo de consumidores que se usa para la ingesta de datos desde el centro de eventos. Si no se especifica ningún grupo de consumidores, el trabajo de Stream Analytics usa el grupo de consumidores $Default.
Clave de partición Este campo es opcional y solo está disponible si el trabajo está configurado para usar el nivel de compatibilidad 1.2 o superior. Si la entrada está particionada por una propiedad, puede agregar aquí el nombre de esta propiedad. Se usa para mejorar el rendimiento de la consulta si incluye una cláusula PARTITION BY o GROUP BY en esta propiedad. Si este trabajo usa el nivel de compatibilidad 1.2 o superior, el valor predeterminado de este campo es "PartitionId".
Formato de serialización de eventos El formato de serialización (JSON, CSV, Avro u otro [Protobuf, XML, propietario...]) del flujo de datos de entrada. Asegúrese de que el formato JSON responde a la especificación y no incluye un 0 inicial para números decimales.
Encoding Por el momento, UTF-8 es el único formato de codificación compatible.
Tipo de compresión de eventos El tipo de compresión utilizado para leer el flujo de datos entrante, como None (valor predeterminado), GZip o Deflate.

Cuando los datos proceden de una entrada de flujo de un centro de eventos, puede acceder a los siguientes campos de metadatos en la consulta de Stream Analytics:

Propiedad Descripción
EventProcessedUtcTime Fecha y hora en que se procesó el evento por Stream Analytics.
EventEnqueuedUtcTime Fecha y la hora en que el Centro de eventos recibió el evento.
PartitionId Identificador de partición de base cero para el adaptador de entrada.

Por ejemplo, si usa estos campos, puede escribir una consulta similar al ejemplo siguiente:

SELECT
    EventProcessedUtcTime,
    EventEnqueuedUtcTime,
    PartitionId
FROM Input

Nota

Si se usa Event Hubs como punto de conexión para las rutas de IoT Hub, puede acceder a los metadatos de IoT Hub mediante la función GetMetadataPropertyValue.

Datos de flujo de IoT Hub

Azure IoT Hub es un agente de ingesta de eventos de suscripción y publicación muy escalable optimizado para escenarios de IoT.

La marca de tiempo predeterminada de los eventos procedentes de IoT Hub en Stream Analytics es la marca de tiempo correspondiente al momento en que el evento llega a IoT Hub, que es EventEnqueuedUtcTime. Para procesar los datos como un flujo con una marca de tiempo en la carga del evento, se debe usar la palabra clave TIMESTAMP BY.

Grupo de consumidores de IoT Hub

Se debe configurar cada entrada de IoT Hub de Stream Analytics para que tenga su propio grupo de consumidores. Cuando un trabajo contiene una autocombinación o tiene varias entradas, es posible que alguna entrada pueda leerla más de un lector de bajada. Esta situación afecta al número de lectores de un solo grupo de consumidores. El procedimiento recomendado para evitar superar el límite de cinco lectores por grupo de consumidores por cada partición de Azure IoT Hub consiste en designar un grupo de consumidores para cada trabajo de Stream Analytics.

Configuración de una instancia de IoT Hub como entrada de flujo de datos

En la siguiente tabla se explica cada propiedad de la página Nueva entrada de Azure Portal cuando se configura IoT Hub como entrada de flujo.

Propiedad Descripción
Alias de entrada Nombre descriptivo que se usará en la consulta del trabajo para hacer referencia a esta entrada.
Suscripción Elija la suscripción en la que se encuentra el recurso de IoT Hub.
IoT Hub Nombre de la instancia de IoT Hub que se usa como entrada.
Punto de conexión Punto de conexión de IoT Hub.
Nombre de directiva de acceso compartido Directiva de acceso compartido que proporciona acceso a IoT Hub. Cada directiva de acceso compartido tiene un nombre, los permisos establecidos y las claves de acceso.
Clave de directiva de acceso compartido Clave de acceso compartido que se usa para autorizar el acceso a IoT Hub. Esta opción se rellena automáticamente, a menos que elija proporcionar la configuración de IoT Hub manualmente.
Grupo de consumidores Se recomienda encarecidamente usar un grupo de consumidores distinto para cada trabajo de Stream Analytics. El grupo de consumidores que se usa para ingerir datos desde Azure IoT Hub. Stream Analytics usa el grupo de consumidores $Default, a menos que se especifique lo contrario.
Clave de partición Este campo es opcional y solo está disponible si el trabajo está configurado para usar el nivel de compatibilidad 1.2 o superior. Si la entrada está particionada por una propiedad, puede agregar aquí el nombre de esta propiedad. Se usa para mejorar el rendimiento de la consulta si incluye una cláusula PARTITION BY o GROUP BY en esta propiedad. Si este trabajo usa el nivel de compatibilidad 1.2 o superior, el valor predeterminado de este campo es "PartitionId".
Formato de serialización de eventos El formato de serialización (JSON, CSV, Avro u otro [Protobuf, XML, propietario...]) del flujo de datos de entrada. Asegúrese de que el formato JSON responde a la especificación y no incluye un 0 inicial para números decimales.
Encoding Por el momento, UTF-8 es el único formato de codificación compatible.
Tipo de compresión de eventos El tipo de compresión utilizado para leer el flujo de datos entrante, como None (valor predeterminado), GZip o Deflate.

Cuando usa datos de flujo de IoT Hub, puede acceder a los siguientes campos de metadatos en la consulta de Stream Analytics:

Propiedad Descripción
EventProcessedUtcTime Fecha y la hora en que se produjo el evento.
EventEnqueuedUtcTime La fecha y la hora en que el evento se recibió por Azure IoT Hub.
PartitionId Identificador de partición de base cero para el adaptador de entrada.
IoTHub.MessageId Identificador que sirve para correlacionar la comunicación bidireccional en IoT Hub.
IoTHub.CorrelationId Identificador que se usa en las respuestas a mensajes y en los comentarios en IoT Hub.
IoTHub.ConnectionDeviceId Identificador de autenticación que se usa para enviar este mensaje. Este valor lo marca IoT Hub en los mensajes servicebound.
IoTHub.ConnectionDeviceGenerationId Identificador de generación del dispositivo autenticado que se ha usado para enviar este mensaje. Este valor lo marca IoT Hub en los mensajes servicebound.
IoTHub.EnqueuedTime Hora en la que IoT Hub recibió el mensaje.

Transmisión de datos de Blob Storage o Data Lake Storage Gen2

En escenarios con grandes cantidades de datos no estructurados para almacenar en la nube, Azure Blob Storage o Azure Data Lake Storage Gen2 (ADLS Gen2) ofrecen una solución rentable y escalable. Los datos de Blob Storage o ADLS Gen2 se consideran normalmente datos en reposo; sin embargo, Stream Analytics puede procesarlos como un flujo de datos.

Un escenario que se usa normalmente para utilizar tales entradas con Stream Analytics es el procesamiento de registros. En este escenario, se han capturado archivos de datos de telemetría de un sistema y es preciso analizarlos y procesarlos para extraer datos significativos.

La marca de tiempo predeterminada de un evento de Blob Storage o ADLS Gen2 en Stream Analytics es la marca de tiempo en que se modificó por última vez, que es BlobLastModifiedUtcTime. Si se carga un blob en una cuenta de almacenamiento a las 13:00 y el trabajo de Azure Stream Analytics se inicia con la opción Now (Ahora) a las 13:01, el blob no se seleccionará, ya que su hora de modificación está fuera del período de ejecución del trabajo.

Si se carga un blob en un contenedor de cuenta de almacenamiento a las 13:00 y el trabajo Azure Stream Analytics se inicia con la opción Hora personalizada a las 13:00 o antes, el blob se recogerá, ya que su hora de modificación está dentro del período de ejecución del trabajo.

Si se inicia un trabajo de Azure Stream Analytics mediante la opción Ahora a las 13:00 y se carga un blob en el contenedor de la cuenta de almacenamiento a las 13:01, Azure Stream Analytics recogerá dicho blob. La marca de tiempo asignada a cada blob se basa únicamente en BlobLastModifiedTime. La carpeta en la que se encuentra el BLOB no guarda ninguna relación con la marca de tiempo asignada. Por ejemplo, si hay un blob 2019/10-01/00/b1.txt con un objeto BlobLastModifiedTime de 2019-11-11, la marca de tiempo asignada a este blob es 2019-11-11.

Para procesar los datos como un flujo con una marca de tiempo en la carga del evento, se debe usar la palabra clave TIMESTAMP BY. Un trabajo de Stream Analytics extrae datos de entrada de Azure Blob Storage o ADLS Gen2 cada segundo si el archivo de blob está disponible. Si el archivo de blob no está disponible, hay un retroceso exponencial con un retraso de tiempo máximo de 90 segundos.

Las entradas con formato CSV requieren una fila de encabezado para definir los campos del conjunto de datos, y todos los campos de fila de encabezado deben ser únicos.

Nota

Stream Analytics no permite agregar contenido a un archivo de blob existente. Stream Analytics solo verá cada archivo una vez y los cambios que se produzcan en él después de que el trabajo lea los datos no se procesan. Se recomienda cargar todos los datos de un archivo de blob a la vez y, a continuación, agregar los eventos más recientes a un archivo de blob diferente y nuevo.

En escenarios en los que se agregan continuamente muchos blobs que Stream Analytics procesa a medida que se agregan, es posible que se omitan algunos blobs en raras ocasiones debido a la granularidad del objeto BlobLastModifiedTime. Puede mitigar esto cargando los blobs con al menos dos segundos de diferencia. Si esta opción no es factible, puede usar Event Hubs para transmitir grandes volúmenes de eventos.

Configuración de Blob Storage como entrada de flujo

En la siguiente tabla se explica cada propiedad de la página Nueva entrada de Azure Portal cuando se configura Blob Storage como entrada de flujo.

Propiedad Descripción
Alias de entrada Nombre descriptivo que se usará en la consulta del trabajo para hacer referencia a esta entrada.
Suscripción Elija la suscripción en la que se encuentra el recurso de almacenamiento.
Cuenta de almacenamiento Nombre de la cuenta de almacenamiento donde se encuentran los archivos de blob.
Clave de cuenta de almacenamiento La clave secreta asociada con la cuenta de almacenamiento. Esta opción se rellena automáticamente, a menos que elija proporcionar la configuración de forma manual.
Contenedor Los contenedores proporcionan una agrupación lógica de los blobs. Puede elegir el contenedor Usar existente o Crear nuevo para crear un contenedor.
Patrón de ruta de acceso (opcional) Ruta de acceso de archivo que sirve para ubicar los blobs dentro del contenedor especificado. Si desea leer los blobs de la raíz del contenedor, no establezca un patrón de ruta de acceso. Dentro de la ruta, puede especificar una o más instancias de las tres variables siguientes: {date}, {time} o {partition}.

Ejemplo 1: cluster1/logs/{date}/{time}/{partition}

Ejemplo 2: cluster1/logs/{date}

El carácter * no es un valor permitido para el prefijo de ruta de acceso. Solo se permiten caracteres de Blob de Azure. No incluya nombres de contenedor ni nombres de archivo.
Formato de fecha (opcional) Si usa la variable de fecha en la ruta, formato de fecha por el que se organizan los archivos. Ejemplo: YYYY/MM/DD

Cuando la entrada de blob incluye {date} o {time} en su ruta de acceso, las carpetas se examinan en orden temporal ascendente.
Formato de hora (opcional) Si usa la variable de hora en la ruta, formato de hora por el que se organizan los archivos. Actualmente, el único valor admitido es HH para las horas.
Clave de partición Este campo es opcional y solo está disponible si el trabajo está configurado para usar el nivel de compatibilidad 1.2 o superior. Si la entrada está particionada por una propiedad, puede agregar aquí el nombre de esta propiedad. Se usa para mejorar el rendimiento de la consulta si incluye una cláusula PARTITION BY o GROUP BY en esta propiedad. Si este trabajo usa el nivel de compatibilidad 1.2 o superior, el valor predeterminado de este campo es "PartitionId".
Count of input partitions (Recuento de particiones de entrada) Este campo solo está presente cuando {partition} existe en el patrón de ruta de acceso. El valor de esta propiedad es un entero > = 1. Siempre que {partition} aparezca en pathPattern, se usará un número entre 0 y el valor de este campo -1.
Formato de serialización de eventos El formato de serialización (JSON, CSV, Avro u otro [Protobuf, XML, propietario...]) del flujo de datos de entrada. Asegúrese de que el formato JSON responde a la especificación y no incluye un 0 inicial para números decimales.
Encoding Por el momento, UTF-8 es el único formato de codificación compatible para CSV y JSON.
Compresión El tipo de compresión utilizado para leer el flujo de datos entrante, como None (valor predeterminado), GZip o Deflate.

Cuando los datos proceden de un origen de Blob Storage, puede acceder a los siguientes campos de metadatos en la consulta de Stream Analytics:

Propiedad Descripción
BlobName Nombre del blob de entrada de donde procede el evento.
EventProcessedUtcTime Fecha y hora en que se procesó el evento por Stream Analytics.
BlobLastModifiedUtcTime Fecha y la hora en que se modificó por última vez el blob.
PartitionId Identificador de partición de base cero para el adaptador de entrada.

Por ejemplo, si usa estos campos, puede escribir una consulta similar al ejemplo siguiente:

SELECT
    BlobName,
    EventProcessedUtcTime,
    BlobLastModifiedUtcTime
FROM Input

Pasos siguientes