Erfassen von Daten mit dem Go SDK für Azure Data Explorer

Azure-Daten-Explorer ist ein schneller und hochgradig skalierbarer Dienst zur Untersuchung von Daten (Protokoll- und Telemetriedaten). Er bietet eine Go SDK-Clientbibliothek für die Interaktion mit dem Azure Data Explorer-Dienst. Mit dem Go SDK können Daten in Azure Data Explorer-Clustern erfasst, gesteuert und abgefragt werden.

In diesem Artikel erstellen Sie zunächst eine Tabelle und eine Datenzuordnung in einem Testcluster. Anschließend reihen Sie die Erfassung für den Cluster mithilfe des Go SDK in die Warteschlange ein und überprüfen die Ergebnisse.

Voraussetzungen

Installieren des Go SDK

Das Go SDK für Azure Data Explorer wird automatisch installiert, wenn Sie die Beispielanwendung ausführen, die Go-Module verwendet. Wenn Sie das Go SDK für eine andere Anwendung installiert haben, erstellen Sie ein Go-Modul, und rufen Sie mithilfe von go get das Azure Data Explorer-Paket ab. Beispiel:

go mod init foo.com/bar
go get github.com/Azure/azure-kusto-go/kusto

Die Paketabhängigkeit wird der Datei go.mod hinzugefügt. Verwenden Sie sie in Ihrer Go-Anwendung.

Überprüfen des Codes

Der Abschnitt Überprüfen des Codes ist optional. Anhand der folgenden Codeausschnitte können Sie sich bei Interesse mit der Funktionsweise des Codes vertraut machen. Andernfalls können Sie mit dem Abschnitt Ausführen der Anwendung fortfahren.

Authenticate

Das Programm muss sich vor dem Ausführen von Vorgängen beim Azure Data Explorer-Dienst authentifizieren.

auth := kusto.Authorization{Config: auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID)}
client, err := kusto.New(kustoEndpoint, auth)

Hierzu wird eine Instanz von kusto.Authorization mit den Dienstprinzipal-Anmeldeinformationen erstellt. Anschließend wird damit unter Verwendung der Funktion Neu, die auch den Clusterendpunkt akzeptiert, ein Kusto-Client (kusto.Client) erstellt.

Tabelle erstellen

Der Befehl für die Tabellenerstellung wird durch eine Kusto-Anweisung dargestellt. Die Funktion Mgmt dient zum Ausführen von Verwaltungsbefehlen. Sie wird verwendet, um den Befehl für die Tabellenerstellung auszuführen.

func createTable(kc *kusto.Client, kustoDB string) {
	_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createTableCommand))
	if err != nil {
		log.Fatal("failed to create table", err)
	}
	log.Printf("Table %s created in DB %s\n", kustoTable, kustoDB)
}

Tipp

Eine Kusto-Anweisung ist aus Sicherheitsgründen standardmäßig konstant. Von NewStmt werden Zeichenfolgenkonstanten akzeptiert. Die API UnsafeStmt ermöglicht die Verwendung nicht konstanter Anweisungssegmente. Dies wird jedoch nicht empfohlen.

Der Kusto-Befehl für die Tabellenerstellung sieht wie folgt aus:

.create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)

Erstellen von Zuordnungen

Datenzuordnungen werden im Rahmen der Erfassung verwendet, um eingehende Daten den Spalten in Azure Data Explorer-Tabellen zuzuordnen. Weitere Informationen finden Sie unter Datenzuordnungen. Die Zuordnung wird genau wie eine Tabelle mithilfe der Funktion Mgmt sowie mit dem Datenbanknamen und dem entsprechenden Befehl erstellt. Der vollständige Befehl steht im GitHub-Repository für das Beispiel zur Verfügung.

func createMapping(kc *kusto.Client, kustoDB string) {
	_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createMappingCommand))
	if err != nil {
		log.Fatal("failed to create mapping - ", err)
	}
	log.Printf("Mapping %s created\n", kustoMappingRefName)
}

Erfassen von Daten

Eine Erfassung wird mithilfe einer Datei aus einem vorhandenen Azure Blob Storage-Container in eine Warteschlange eingereiht.

func ingestFile(kc *kusto.Client, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable string) {
	kIngest, err := ingest.New(kc, kustoDB, kustoTable)
	if err != nil {
		log.Fatal("failed to create ingestion client", err)
	}
	blobStorePath := fmt.Sprintf(blobStorePathFormat, blobStoreAccountName, blobStoreContainer, blobStoreFileName, blobStoreToken)
	err = kIngest.FromFile(context.Background(), blobStorePath, ingest.FileFormat(ingest.CSV), ingest.IngestionMappingRef(kustoMappingRefName, ingest.CSV))

	if err != nil {
		log.Fatal("failed to ingest file", err)
	}
	log.Println("Ingested file from -", blobStorePath)
}

Der Client für die Erfassung wird mithilfe von ingest.New erstellt. Die Funktion FromFile wird verwendet, um auf den Azure Blob Storage-URI zu verweisen. Der Name des Zuordnungsverweises und der Datentyp werden als FileOption übergeben.

Ausführen der Anwendung

  1. Klonen Sie den Beispielcode von GitHub:

    git clone https://github.com/Azure-Samples/Azure-Data-Explorer-Go-SDK-example-to-ingest-data.git
    cd Azure-Data-Explorer-Go-SDK-example-to-ingest-data
    
  2. Führen Sie den Beispielcode aus, wie in diesem Ausschnitt aus main.go zu sehen:

    func main {
        ...
        dropTable(kc, kustoDB)
        createTable(kc, kustoDB)
        createMapping(kc, kustoDB)
        ingestFile(kc, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable)
        ...
    }
    

    Tipp

    Sie können verschiedene Vorgangskombinationen ausprobieren, indem Sie die entsprechenden Funktionen in main.goauskommentieren bzw. kommentieren.

    Wenn Sie den Beispielcode ausführen, werden folgende Aktionen ausgeführt:

    1. Tabellenlöschung: Die Tabelle StormEvents wird gelöscht (sofern vorhanden).
    2. Tabellenerstellung: Die Tabelle StormEvents wird erstellt.
    3. Zuordnungserstellung: Die Zuordnung StormEvents_CSV_Mapping wird erstellt.
    4. Dateierfassung: Eine CSV-Datei (in Azure Blob Storage) wird zur Erfassung in eine Warteschlange eingereiht.
  3. Erstellen Sie über die Azure-Befehlszeilenschnittstelle mithilfe des Befehls az ad sp create-for-rbac einen Dienstprinzipal für die Authentifizierung. Legen Sie die Dienstprinzipalinformationen mit dem Clusterendpunkt und dem Datenbanknamen in Form von Umgebungsvariablen fest, die vom Programm verwendet werden:

    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export AZURE_SP_TENANT_ID="<replace with tenant>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  4. Führen Sie das Programm aus:

    go run main.go
    

    Die Ausgabe sollte in etwa wie folgt aussehen:

    Connected to Azure Data Explorer
    Using database - testkustodb
    Failed to drop StormEvents table. Maybe it does not exist?
    Table StormEvents created in DB testkustodb
    Mapping StormEvents_CSV_Mapping created
    Ingested file from - https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv
    

Überprüfen und Behandeln von Problemen

Warten Sie fünf bis zehn Minuten, bis die in der Warteschlange befindliche Erfassung geplant wurde und die Daten in Azure Data Explorer geladen wurden.

  1. Melden Sie sich bei https://dataexplorer.azure.com an, und stellen Sie eine Verbindung mit Ihrem Cluster her. Führen Sie dann den folgenden Befehl aus, um die Anzahl von Datensätzen in der Tabelle StormEvents zu erhalten.

    StormEvents | count
    
  2. Führen Sie den folgenden Befehl in Ihrer Datenbank aus, um festzustellen, ob in den letzten vier Stunden Erfassungsfehler aufgetreten sind. Ersetzen Sie den Namen der Datenbank vor dem Ausführen.

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  3. Führen Sie den folgenden Befehl aus, um den Status aller Erfassungsvorgänge in den letzten vier Stunden anzuzeigen. Ersetzen Sie den Namen der Datenbank vor dem Ausführen.

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Bereinigen von Ressourcen

Wenn Sie unsere anderen Artikel durcharbeiten möchten, behalten Sie die erstellten Ressourcen bei. Führen Sie andernfalls den folgenden Befehl in Ihrer Datenbank aus, um die Tabelle StormEvents zu löschen:

.drop table StormEvents

Nächster Schritt

Write queries (Schreiben von Abfragen)