Partilhar via


Ingerir dados com o Apache Flink no Azure Data Explorer

O Apache Flink é um motor de processamento distribuído e de arquitetura para cálculos com estado através de fluxos de dados não vinculados e vinculados.

O conector Flink é um projeto open source que pode ser executado em qualquer cluster de Flink. Implementa o sink de dados para mover dados de um cluster Flink. Ao utilizar o conector para o Apache Flink, pode criar aplicações rápidas e dimensionáveis direcionadas para cenários baseados em dados, por exemplo, machine learning (ML), Extract-Transform-Load (ETL) e Log Analytics.

Neste artigo, irá aprender a utilizar o conector Flink para enviar dados do Flink para a sua tabela. Pode criar uma tabela e mapeamento de dados, direcionar o Flink para enviar dados para a tabela e, em seguida, validar os resultados.

Pré-requisitos

Para projetos Flink que utilizam o Maven para gerir dependências, integre o Sink do Flink Connector Core para o Azure Data Explorer ao adicioná-lo como uma dependência:

<dependency>
    <groupId>com.microsoft.azure.kusto</groupId>
    <artifactId>flink-connector-kusto</artifactId>
    <version>1.0.0</version>
</dependency>

Para projetos que não utilizam o Maven para gerir dependências, clone o repositório do Conector do Azure Data Explorer para Apache Flink e crie-o localmente. Esta abordagem permite-lhe adicionar manualmente o conector ao repositório maven local com o comando mvn clean install -DskipTests.

Pode autenticar a partir do Flink para utilizar uma aplicação Microsoft Entra ID ou uma identidade gerida.

Este principal de serviço será a identidade utilizada pelo conector para escrever dados na sua tabela no Kusto. Posteriormente, irá conceder permissões para que este principal de serviço aceda aos recursos do Kusto.

  1. Inicie sessão na sua subscrição do Azure através da CLI do Azure. Em seguida, autentique-se no browser.

    az login
    
  2. Escolha a subscrição para alojar o principal. Este passo é necessário quando tem várias subscrições.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Crie o principal de serviço. Neste exemplo, o principal de serviço chama-se my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. A partir dos dados JSON devolvidos, copie o appId, passworde tenant para utilização futura.

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

Criou a sua aplicação Microsoft Entra e principal de serviço.

  1. Conceda permissões de utilizador da aplicação na base de dados:

    // Grant database user permissions
    .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
    
  2. Conceda à aplicação permissões de ingestor ou de administrador na tabela. As permissões necessárias dependem do método de escrita de dados escolhido. As permissões de ingestor são suficientes para SinkV2, enquanto WriteAndSink requer permissões de administrador.

    // Grant table ingestor permissions (SinkV2)
    .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>')
    
    // Grant table admin permissions (WriteAheadSink)
    .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
    

Para obter mais informações sobre autorização, veja Controlo de acesso baseado em funções do Kusto.

Para escrever dados a partir do Flink:

  1. Importe as opções necessárias:

    import com.microsoft.azure.flink.config.KustoConnectionOptions;
    import com.microsoft.azure.flink.config.KustoWriteOptions;
    
  2. Utilize a sua aplicação ou identidade gerida para Autenticar.

    Para autenticação de aplicações:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setAppId("<Application ID>")
    .setAppKey("<Application key>")
    .setTenantId("<Tenant ID>")
    .setClusterUrl("<Cluster URI>").build();
    

    Para autenticação de identidade gerida:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setManagedIdentityAppId("<Object ID>")
    .setClusterUrl("<Cluster URI>").build();
    
  1. Configure os parâmetros de sink, como a base de dados e a tabela:

    KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
        .withDatabase("<Database name>").withTable("<Table name>").build();
    

    Pode adicionar mais opções, conforme descrito na tabela seguinte:

    Opção Descrição Valor Predefinido
    IngestionMappingRef Faz referência a um mapeamento de ingestão existente.
    FlushImmediately Elimina imediatamente os dados e pode causar problemas de desempenho. Este método não é recomendado.
    BatchIntervalMs Controla a frequência com que os dados são descarregados. 30 segundos
    BatchSize Define o tamanho do lote para registos de memória intermédia antes da descarga. 1000 registos
    ClientBatchSizeLimit Especifica o tamanho em MB de dados agregados antes da ingestão. 300 MB
    PollForIngestionStatus Se for verdade, o conector procura o estado de ingestão após a eliminação dos dados. false
    DeliveryGuarantee Determina a semântica de garantia de entrega. Para alcançar exatamente uma vez semântica, utilize WriteAheadSink. AT_LEAST_ONCE
  2. Escreva dados de transmissão em fluxo com um dos seguintes métodos:

    • SinkV2: esta é uma opção sem estado que limpa os dados no ponto de verificação, garantindo, pelo menos, uma consistência. Recomendamos esta opção para ingestão de dados de grande volume.
    • WriteAheadSink: este método emite dados para um KustoSink. Está integrado no sistema de pontos de verificação do Flink e oferece garantias exatamente uma vez. Os dados são armazenados num AbstractStateBackend e consolidados apenas após a conclusão de um ponto de verificação.

    O exemplo seguinte utiliza SinkV2. Para utilizar WriteAheadSink, utilize o buildWriteAheadSink método em vez de build:

    KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
        .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
        , 2 /*Parallelism to use*/);
    

O código completo deve ter um aspeto semelhante ao seguinte:

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;

KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();

KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
    .withDatabase("<Database name>").withTable("<Table name>").build();

KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
    .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
    , 2 /*Parallelism to use*/);

Verificar se os dados são ingeridos

Assim que a ligação estiver configurada, os dados são enviados para a sua tabela. Pode verificar se os dados são ingeridos ao executar uma consulta KQL.

  1. Execute a seguinte consulta para verificar se os dados são ingeridos na tabela:

    <MyTable>
    | count
    
  2. Execute a seguinte consulta para ver os dados:

    <MyTable>
    | take 100