Ingerir dados com o SDK do Azure Data Explorer Go

O Azure Data Explorer é um serviço de exploração de dados rápido e altamente dimensionável para dados telemétricos e de registo. Fornece uma biblioteca de cliente Go SDK para interagir com o serviço Data Explorer do Azure. Pode utilizar o SDK Go para ingerir, controlar e consultar dados em clusters do Azure Data Explorer.

Neste artigo, vai criar primeiro uma tabela e mapeamento de dados num cluster de teste. Em seguida, coloca em fila uma ingestão para o cluster com o SDK Go e valida os resultados.

Pré-requisitos

Instalar o SDK Go

O SDK do Azure Data Explorer Go será instalado automaticamente quando executar a [aplicação de exemplo que utiliza módulos Go). Se instalou o SDK Go para outra aplicação, crie um módulo Go e obtenha o pacote de Data Explorer do Azure (com go get), por exemplo:

go mod init foo.com/bar
go get github.com/Azure/azure-kusto-go/kusto

A dependência do pacote será adicionada ao go.mod ficheiro. Utilize-o na sua aplicação Go.

Rever o código

Esta secção Rever o código é opcional. Se estiver interessado em saber como funciona o código, pode rever os seguintes fragmentos de código. Caso contrário, pode avançar para Executar a aplicação.

Autenticar

O programa tem de se autenticar no serviço Data Explorer do Azure antes de executar quaisquer operações.

auth := kusto.Authorization{Config: auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID)}
client, err := kusto.New(kustoEndpoint, auth)

Uma instância do kusto. A autorização é criada com as credenciais do principal de serviço. Em seguida, é utilizado para criar um kusto. Cliente com a função Novo que também aceita o ponto final do cluster.

Criar tabela

O comando criar tabela é representado por uma instrução Kusto. A função Mgmt é utilizada para executar comandos de gestão. É utilizado para executar o comando para criar uma tabela.

func createTable(kc *kusto.Client, kustoDB string) {
	_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createTableCommand))
	if err != nil {
		log.Fatal("failed to create table", err)
	}
	log.Printf("Table %s created in DB %s\n", kustoTable, kustoDB)
}

Dica

Uma instrução Kusto é constante, por predefinição, para uma melhor segurança. NewStmt aceita constantes de cadeia. A UnsafeStmt API permite a utilização de segmentos de instrução não constantes, mas não é recomendada.

O comando Criar tabela do Kusto é o seguinte:

.create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)

Criar mapeamento

Os mapeamentos de dados são utilizados durante a ingestão para mapear dados recebidos para colunas dentro de tabelas Data Explorer do Azure. Para obter mais informações, veja Mapeamento de dados. O mapeamento é criado da mesma forma que uma tabela, utilizando a Mgmt função com o nome da base de dados e o comando adequado. O comando completo está disponível no repositório do GitHub para o exemplo.

func createMapping(kc *kusto.Client, kustoDB string) {
	_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createMappingCommand))
	if err != nil {
		log.Fatal("failed to create mapping - ", err)
	}
	log.Printf("Mapping %s created\n", kustoMappingRefName)
}

Ingerir dados

Uma ingestão é colocada em fila com um ficheiro de um contentor de Armazenamento de Blobs do Azure existente.

func ingestFile(kc *kusto.Client, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable string) {
	kIngest, err := ingest.New(kc, kustoDB, kustoTable)
	if err != nil {
		log.Fatal("failed to create ingestion client", err)
	}
	blobStorePath := fmt.Sprintf(blobStorePathFormat, blobStoreAccountName, blobStoreContainer, blobStoreFileName, blobStoreToken)
	err = kIngest.FromFile(context.Background(), blobStorePath, ingest.FileFormat(ingest.CSV), ingest.IngestionMappingRef(kustoMappingRefName, ingest.CSV))

	if err != nil {
		log.Fatal("failed to ingest file", err)
	}
	log.Println("Ingested file from -", blobStorePath)
}

O cliente ingestão é criado com a ingestão. Novo. A função FromFile é utilizada para fazer referência ao URI do Armazenamento de Blobs do Azure. O nome de referência de mapeamento e o tipo de dados são transmitidos na forma de FileOption.

Executar a aplicação

  1. Clone o código de exemplo do GitHub:

    git clone https://github.com/Azure-Samples/Azure-Data-Explorer-Go-SDK-example-to-ingest-data.git
    cd Azure-Data-Explorer-Go-SDK-example-to-ingest-data
    
  2. Execute o código de exemplo, conforme visto neste fragmento a partir de main.go:

    func main {
        ...
        dropTable(kc, kustoDB)
        createTable(kc, kustoDB)
        createMapping(kc, kustoDB)
        ingestFile(kc, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable)
        ...
    }
    

    Dica

    Para experimentar diferentes combinações de operações, pode anular o comentário/comentar as respetivas funções no main.go.

    Quando executa o código de exemplo, são executadas as seguintes ações:

    1. Tabela pendente: StormEvents a tabela é removida (se existir).
    2. Criação de tabela: StormEvents a tabela é criada.
    3. Criação de mapeamento: StormEvents_CSV_Mapping o mapeamento é criado.
    4. Ingestão de ficheiros: um ficheiro CSV (no Armazenamento de Blobs do Azure) está em fila para ingestão.
  3. Para criar um principal de serviço para autenticação, utilize a CLI do Azure com o comando az ad sp create-for-rbac . Defina as informações do principal de serviço com o ponto final do cluster e o nome da base de dados na forma de variáveis de ambiente que serão utilizadas pelo programa:

    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export AZURE_SP_TENANT_ID="<replace with tenant>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  4. Execute o programa:

    go run main.go
    

    Obterá uma saída semelhante:

    Connected to Azure Data Explorer
    Using database - testkustodb
    Failed to drop StormEvents table. Maybe it does not exist?
    Table StormEvents created in DB testkustodb
    Mapping StormEvents_CSV_Mapping created
    Ingested file from - https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv
    

Validar e resolver problemas

Aguarde 5 a 10 minutos para que a ingestão em fila agende o processo de ingestão e carregue os dados para o Azure Data Explorer.

  1. Inicie sessão no https://dataexplorer.azure.com e ligue ao cluster. Em seguida, execute o seguinte comando para obter a contagem de registos na StormEvents tabela.

    StormEvents | count
    
  2. Execute o seguinte comando na base de dados para ver se ocorreram quaisquer falhas de ingestão nas últimas quatro horas. Substitua o nome da base de dados antes de executar.

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  3. Execute o seguinte comando para ver o estado de todas as operações de ingestão nas últimas quatro horas. Substitua o nome da base de dados antes de executar.

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Limpar os recursos

Se pretender seguir os nossos outros artigos, mantenha os recursos que criou. Caso contrário, execute o seguinte comando na base de dados para remover a StormEvents tabela.

.drop table StormEvents

Passo seguinte