Partilhar via


Criar um trabalho de streaming de dados no Azure SQL Edge

Importante

O Azure SQL Edge não suporta mais a plataforma ARM64.

Este artigo explica como criar um trabalho de streaming T-SQL no Azure SQL Edge. Você cria os objetos de entrada e saída de fluxo externo e, em seguida, define a consulta de trabalho de streaming como parte da criação de trabalho de streaming.

Configurar os objetos de entrada e saída de fluxo externo

O streaming T-SQL usa a funcionalidade de fonte de dados externa do SQL Server para definir as fontes de dados associadas às entradas e saídas de fluxo externo do trabalho de streaming. Use os seguintes comandos T-SQL para criar um objeto de entrada ou saída de fluxo externo:

Além disso, se o Azure SQL Edge, o SQL Server ou o Banco de Dados SQL do Azure for usado como um fluxo de saída, você precisará da CREATE DATABASE SCOPED CREDENTIAL (Transact-SQL). Este comando T-SQL define as credenciais para acessar o banco de dados.

Fontes de dados de fluxo de entrada e saída suportadas

Atualmente, o SQL Edge do Azure dá suporte apenas às seguintes fontes de dados como entradas e saídas de fluxo.

Tipo de origem de dados Entrada Saída Descrição
Hub do Azure IoT Edge Y Y Fonte de dados para ler e gravar dados de streaming em um hub do Azure IoT Edge. Para obter mais informações, consulte IoT Edge Hub.
Base de Dados SQL N Y Conexão de fonte de dados para gravar dados de streaming no Banco de dados SQL. O banco de dados pode ser um banco de dados local no Azure SQL Edge ou um banco de dados remoto no SQL Server ou no Banco de Dados SQL do Azure.
Kafka Y N Fonte de dados para ler dados de streaming de um tópico de Kafka.

Exemplo: Criar um objeto de entrada/saída de fluxo externo para o hub do Azure IoT Edge

O exemplo a seguir cria um objeto de fluxo externo para o hub do Azure IoT Edge. Para criar uma fonte de dados de entrada/saída de fluxo externo para o hub do Azure IoT Edge, primeiro você precisa criar um formato de arquivo externo para o layout dos dados que estão sendo lidos ou gravados também.

  1. Crie um formato de arquivo externo do tipo JSON.

    CREATE EXTERNAL FILE format InputFileFormat
    WITH (FORMAT_TYPE = JSON);
    GO
    
  2. Crie uma fonte de dados externa para o hub do Azure IoT Edge. O script T-SQL a seguir cria uma conexão de fonte de dados com um hub IoT Edge que é executado no mesmo host do Docker que o Azure SQL Edge.

    CREATE EXTERNAL DATA SOURCE EdgeHubInput
    WITH (LOCATION = 'edgehub://');
    GO
    
  3. Crie o objeto de fluxo externo para o hub do Azure IoT Edge. O script T-SQL a seguir cria um objeto de fluxo para o hub IoT Edge. No caso de um objeto de fluxo de hub IoT Edge, o parâmetro LOCATION é o nome do tópico ou canal do hub IoT Edge que está sendo lido ou gravado.

    CREATE EXTERNAL STREAM MyTempSensors
    WITH (
         DATA_SOURCE = EdgeHubInput,
         FILE_FORMAT = InputFileFormat,
         LOCATION = N'TemperatureSensors',
         INPUT_OPTIONS = N'',
         OUTPUT_OPTIONS = N''
    );
    GO
    

Exemplo: Criar um objeto de fluxo externo para o Banco de Dados SQL do Azure

O exemplo a seguir cria um objeto de fluxo externo para o banco de dados local no Azure SQL Edge.

  1. Crie uma chave mestra no banco de dados. Isso é necessário para criptografar o segredo da credencial.

    CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<<Strong_Password_For_Master_Key_Encryption>>';
    
  2. Crie uma credencial com escopo de banco de dados para acessar a origem do SQL Server. O exemplo a seguir cria uma credencial para a fonte de dados externa, com IDENTITY = username e SECRET = password.

    CREATE DATABASE SCOPED CREDENTIAL SQLCredential
    WITH IDENTITY = '<SQL_Login>', SECRET = '<SQL_Login_PASSWORD>';
    GO
    
  3. Crie uma fonte de dados externa com CREATE EXTERNAL DATA SOURCE. O exemplo a seguir:

    • Cria uma fonte de dados externa chamada LocalSQLOutput.
    • Identifica a fonte de dados externa (LOCATION = '<vendor>://<server>[:<port>]'). No exemplo, ele aponta para uma instância local do Azure SQL Edge.
    • Usa a credencial criada anteriormente.
    CREATE EXTERNAL DATA SOURCE LocalSQLOutput
    WITH (
         LOCATION = 'sqlserver://tcp:.,1433',
         CREDENTIAL = SQLCredential
    );
    GO
    
  4. Crie o objeto de fluxo externo. O exemplo a seguir cria um objeto de fluxo externo apontando para um dbo de tabela . TemperatureMeasurements, na base de dados MySQLDatabase.

    CREATE EXTERNAL STREAM TemperatureMeasurements
    WITH
    (
        DATA_SOURCE = LocalSQLOutput,
        LOCATION = N'MySQLDatabase.dbo.TemperatureMeasurements',
        INPUT_OPTIONS = N'',
        OUTPUT_OPTIONS = N''
    );
    

Exemplo: Criar um objeto de fluxo externo para Kafka

O exemplo a seguir cria um objeto de fluxo externo para o banco de dados local no Azure SQL Edge. Este exemplo pressupõe que o servidor kafka esteja configurado para acesso anônimo.

  1. Crie uma fonte de dados externa com CREATE EXTERNAL DATA SOURCE. O exemplo a seguir:

    CREATE EXTERNAL DATA SOURCE [KafkaInput]
    WITH (LOCATION = N'kafka://<kafka_bootstrap_server_name_ip>:<port_number>');
    GO
    
  2. Crie um formato de arquivo externo para a entrada Kafka. O exemplo a seguir criou um formato de arquivo JSON com GZipped Compression.

    CREATE EXTERNAL FILE FORMAT JsonGzipped
    WITH (
         FORMAT_TYPE = JSON,
         DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
    );
    GO
    
  3. Crie o objeto de fluxo externo. O exemplo a seguir cria um objeto de fluxo externo apontando para o tópico TemperatureMeasurementKafka.

    CREATE EXTERNAL STREAM TemperatureMeasurement
    WITH
    (
        DATA_SOURCE = KafkaInput,
        FILE_FORMAT = JsonGzipped,
        LOCATION = 'TemperatureMeasurement',
        INPUT_OPTIONS = 'PARTITIONS: 10'
    );
    GO
    

Criar o trabalho de streaming e as consultas de streaming

Use o procedimento armazenado do sistema para definir as consultas de streaming e criar o sys.sp_create_streaming_job trabalho de streaming. O sp_create_streaming_job procedimento armazenado usa os seguintes parâmetros:

O exemplo a seguir cria um trabalho de streaming simples com uma consulta de streaming. Essa consulta lê as entradas do hub IoT Edge e grava dbo.TemperatureMeasurements no banco de dados.

EXEC sys.sp_create_streaming_job @name = N'StreamingJob1',
    @statement = N'Select * INTO TemperatureMeasurements from MyEdgeHubInput'

O exemplo a seguir cria um trabalho de streaming mais complexo com várias consultas diferentes. Essas consultas incluem uma que usa a função interna AnomalyDetection_ChangePoint para identificar anomalias nos dados de temperatura.

EXEC sys.sp_create_streaming_job @name = N'StreamingJob2',
    @statement = N'
        SELECT *
        INTO TemperatureMeasurements1
        FROM MyEdgeHubInput1

        SELECT *
        INTO TemperatureMeasurements2
        FROM MyEdgeHubInput2

        SELECT *
        INTO TemperatureMeasurements3
        FROM MyEdgeHubInput3

        SELECT timestamp AS [Time],
            [Temperature] AS [Temperature],
            GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' Score '') AS ChangePointScore,
            GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' IsAnomaly '') AS IsChangePointAnomaly
        INTO TemperatureAnomalies
        FROM MyEdgeHubInput2;
';
GO

Iniciar, parar, descartar e monitorar trabalhos de streaming

Para iniciar um trabalho de streaming no Azure SQL Edge, execute o sys.sp_start_streaming_job procedimento armazenado. O procedimento armazenado requer o nome do trabalho de streaming para iniciar, como entrada.

EXEC sys.sp_start_streaming_job @name = N'StreamingJob1';
GO

Para interromper um trabalho de streaming, execute o sys.sp_stop_streaming_job procedimento armazenado. O procedimento armazenado requer que o nome do trabalho de streaming seja interrompido, como entrada.

EXEC sys.sp_stop_streaming_job @name = N'StreamingJob1';
GO

Para descartar (ou excluir) um trabalho de streaming, execute o sys.sp_drop_streaming_job procedimento armazenado. O procedimento armazenado requer que o nome do trabalho de streaming seja descartado, como entrada.

EXEC sys.sp_drop_streaming_job @name = N'StreamingJob1';
GO

Para obter o status atual de um trabalho de streaming, execute o sys.sp_get_streaming_job procedimento armazenado. O procedimento armazenado requer que o nome do trabalho de streaming seja descartado, como entrada. Ele gera o nome e o status atual do trabalho de streaming.

EXEC sys.sp_get_streaming_job @name = N'StreamingJob1'
WITH RESULT SETS (
        (
            name NVARCHAR(256),
            status NVARCHAR(256),
            error NVARCHAR(256)
        )
    );
GO

O trabalho de streaming pode ter qualquer um dos seguintes status:

Status Descrição
Criado O trabalho de streaming foi criado, mas ainda não foi iniciado.
A iniciar O trabalho de streaming está em fase inicial.
Períodos O trabalho de streaming está em execução, mas não há entrada para processar.
Em processamento O trabalho de streaming está em execução e está processando entradas. Esse estado indica um estado íntegro para o trabalho de streaming.
Degradado O trabalho de streaming está em execução, mas houve alguns erros não fatais durante o processamento de entrada. O trabalho de entrada continua a ser executado, mas descartará as entradas que encontrarem erros.
Parado O trabalho de streaming foi interrompido.
Com Falha O trabalho de streaming falhou. Isso geralmente é uma indicação de um erro fatal durante o processamento.

Nota

Como o trabalho de streaming é executado de forma assíncrona, o trabalho pode encontrar erros no tempo de execução. Para solucionar problemas de uma falha de trabalho de streaming, use o procedimento armazenado ou revise o sys.sp_get_streaming_job log do Docker do contêiner SQL Edge do Azure, que pode fornecer os detalhes do erro do trabalho de streaming.

Próximos passos