Sdílet prostřednictvím


Ingestování dat pomocí Apache Flinku do Azure Data Explorer

Apache Flink je architektura a distribuovaný procesor pro stavové výpočty nad nevázanými a ohraničenými datovými proudy.

Konektor Flink je open source projekt, který se dá spustit na libovolném clusteru Flink. Implementuje datovou jímku pro přesun dat z clusteru Flink. Pomocí konektoru pro Apache Flink můžete vytvářet rychlé a škálovatelné aplikace zaměřené na scénáře řízené daty, jako jsou strojové učení (ML), extrakce, transformace a načítání (ETL) a Log Analytics.

V tomto článku se dozvíte, jak pomocí konektoru Flink odesílat data z Flinku do tabulky. Vytvoříte tabulku a mapování dat, nasměrujete Flink, aby do tabulky odeslal data, a pak ověříte výsledky.

Požadavky

V případě projektů Flink, které ke správě závislostí používají Maven, integrujte základní jímku konektoru Flink pro Azure Data Explorer tak, že ji přidáte jako závislost:

<dependency>
    <groupId>com.microsoft.azure.kusto</groupId>
    <artifactId>flink-connector-kusto</artifactId>
    <version>1.0.0</version>
</dependency>

V případě projektů, které ke správě závislostí nepoužívají Maven, naklonujte úložiště pro azure Data Explorer Connector pro Apache Flink a sestavte ho místně. Tento přístup umožňuje ručně přidat konektor do místního úložiště Maven pomocí příkazu mvn clean install -DskipTests.

Z Flinku můžete provést ověření pomocí Microsoft Entra ID aplikace nebo spravované identity.

Tento instanční objekt bude identita, kterou konektor použije k zápisu dat do tabulky v Kusto. Později tomuto instančnímu objektu udělíte oprávnění pro přístup k prostředkům Kusto.

  1. Přihlaste se ke svému předplatnému Azure přes Azure CLI. Pak se ověřte v prohlížeči.

    az login
    
  2. Zvolte předplatné pro hostování objektu zabezpečení. Tento krok je potřeba, pokud máte více předplatných.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Vytvořte instanční objekt. V tomto příkladu se instanční objekt nazývá my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Z vrácených dat JSON zkopírujte appId, passworda tenant pro budoucí použití.

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

Vytvořili jste aplikaci Microsoft Entra a instanční objekt.

  1. Udělte uživateli aplikace oprávnění k databázi:

    // Grant database user permissions
    .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
    
  2. Udělte aplikaci oprávnění ingestora nebo správce tabulky. Požadovaná oprávnění závisí na zvolené metodě zápisu dat. Oprávnění ingestoru jsou dostatečná pro SinkV2, zatímco WriteAndSink vyžaduje oprávnění správce.

    // Grant table ingestor permissions (SinkV2)
    .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>')
    
    // Grant table admin permissions (WriteAheadSink)
    .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
    

Další informace o autorizaci najdete v tématu Řízení přístupu na základě role v Kusto.

Zápis dat z Flinku:

  1. Import požadovaných možností:

    import com.microsoft.azure.flink.config.KustoConnectionOptions;
    import com.microsoft.azure.flink.config.KustoWriteOptions;
    
  2. K ověření použijte svoji aplikaci nebo spravovanou identitu.

    Ověřování aplikací:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setAppId("<Application ID>")
    .setAppKey("<Application key>")
    .setTenantId("<Tenant ID>")
    .setClusterUrl("<Cluster URI>").build();
    

    Ověřování pomocí spravované identity:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setManagedIdentityAppId("<Object ID>")
    .setClusterUrl("<Cluster URI>").build();
    
  1. Nakonfigurujte parametry jímky, jako je databáze a tabulka:

    KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
        .withDatabase("<Database name>").withTable("<Table name>").build();
    

    Můžete přidat další možnosti, jak je popsáno v následující tabulce:

    Možnost Popis Výchozí hodnota
    IngestionMappingRef Odkazuje na existující mapování příjmu dat.
    Okamžitě vyprázdnit Okamžitě vyprázdní data a může způsobit problémy s výkonem. Tato metoda se nedoporučuje.
    BatchIntervalMs Určuje, jak často se data vyprázdní. 30 sekund
    Velikost dávky Nastaví velikost dávky pro ukládání záznamů do vyrovnávací paměti před vyprazdňováním. 1 000 záznamů
    ClientBatchSizeLimit Určuje velikost agregovaných dat před příjmem dat v MB. 300 MB
    PollForIngestionStatus Pokud je true, konektor se dotazuje na stav příjmu dat po vyprázdnění dat. false (nepravda)
    DeliveryGuarantee Určuje sémantiku záruky doručení. Pokud chcete dosáhnout sémantiky přesně jednou, použijte WriteAheadSink. AT_LEAST_ONCE
  2. Zapište streamovaná data pomocí jedné z následujících metod:

    • SinkV2: Jedná se o bezstavovou možnost, která vyprázdní data na kontrolním bodu a zajistí alespoň jednou konzistenci. Tuto možnost doporučujeme pro příjem velkých objemů dat.
    • WriteAheadSink: Tato metoda generuje data do KustoSink. Je integrovaná se systémem pro vytváření kontrolních bodů Flink a nabízí záruky přesně jednou. Data jsou uložena v AbstractStateBackend a potvrzena až po dokončení kontrolního bodu.

    Následující příklad používá SinkV2. Pokud chcete použít WriteAheadSink, použijte místo metody metodu buildWriteAheadSinkbuild:

    KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
        .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
        , 2 /*Parallelism to use*/);
    

Úplný kód by měl vypadat přibližně takto:

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;

KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();

KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
    .withDatabase("<Database name>").withTable("<Table name>").build();

KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
    .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
    , 2 /*Parallelism to use*/);

Ověření, že se data ingestují

Po nakonfigurování připojení se data odesílají do tabulky. Spuštěním dotazu KQL můžete ověřit, že se data ingestují.

  1. Spuštěním následujícího dotazu ověřte, že se do tabulky ingestují data:

    <MyTable>
    | count
    
  2. Spuštěním následujícího dotazu zobrazte data:

    <MyTable>
    | take 100