Share via


使用 Azure Data Explorer Go SDK 擷取資料

Azure 資料總管是一項快速又可高度調整的資料探索服務,可用於處理記錄和遙測資料。 它提供Go SDK 用戶端程式庫,以與 Azure Data Explorer 服務互動。 您可以使用Go SDK來擷取、控制及查詢 Azure Data Explorer叢集中的資料。

在本文中,您會先在測試叢集中建立資料表和資料對應。 然後,您會使用 Go SDK 將擷取排入叢集,並驗證結果。

必要條件

安裝 Go SDK

當您執行 [使用Go 模組的範例應用程式] 時,會自動安裝 Azure Data Explorer Go SDK。 如果您為另一個應用程式安裝 Go SDK,請建立 Go 模組並使用) 擷取 Azure Data Explorer 套件 (go get ,例如:

go mod init foo.com/bar
go get github.com/Azure/azure-kusto-go/kusto

套件相依性將會新增至 go.mod 檔案。 在 Go 應用程式中使用它。

檢閱程式碼

檢閱程式碼 區段是選擇性的。 如果您有興趣瞭解程式碼的運作方式,您可以檢閱下列程式碼片段。 或是,您可以直接跳到執行應用程式

Authenticate

此程式在執行任何作業之前,必須先向 Azure Data Explorer服務進行驗證。

auth := kusto.Authorization{Config: auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID)}
client, err := kusto.New(kustoEndpoint, auth)

kusto 的實例。授權是使用服務主體認證所建立。 然後,它會用來建立 kusto。用戶端 ,其中包含也接受叢集端點的 New 函式。

建立資料表

create table 命令是由 Kusto 語句表示。 Mgmt 函式可用來執行管理命令。 它用來執行 命令來建立資料表。

func createTable(kc *kusto.Client, kustoDB string) {
	_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createTableCommand))
	if err != nil {
		log.Fatal("failed to create table", err)
	}
	log.Printf("Table %s created in DB %s\n", kustoTable, kustoDB)
}

提示

Kusto 語句預設為常數,以取得更佳的安全性。 NewStmt 接受字串常數。 UnsafeStmtAPI 允許使用非常數語句區段,但不建議使用。

Kusto create table 命令如下所示:

.create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)

建立對應

擷取期間會使用資料對應,將傳入的資料對應至 Azure Data Explorer 資料表中的資料行。 如需詳細資訊,請參閱 資料對應。 使用具有資料庫名稱和適當命令的函式, Mgmt 以與資料表相同的方式建立對應。 範例的 GitHub 存放庫中提供完整的命令。

func createMapping(kc *kusto.Client, kustoDB string) {
	_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createMappingCommand))
	if err != nil {
		log.Fatal("failed to create mapping - ", err)
	}
	log.Printf("Mapping %s created\n", kustoMappingRefName)
}

內嵌資料

擷取會使用來自現有Azure Blob 儲存體容器的檔案排入佇列。

func ingestFile(kc *kusto.Client, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable string) {
	kIngest, err := ingest.New(kc, kustoDB, kustoTable)
	if err != nil {
		log.Fatal("failed to create ingestion client", err)
	}
	blobStorePath := fmt.Sprintf(blobStorePathFormat, blobStoreAccountName, blobStoreContainer, blobStoreFileName, blobStoreToken)
	err = kIngest.FromFile(context.Background(), blobStorePath, ingest.FileFormat(ingest.CSV), ingest.IngestionMappingRef(kustoMappingRefName, ingest.CSV))

	if err != nil {
		log.Fatal("failed to ingest file", err)
	}
	log.Println("Ingested file from -", blobStorePath)
}

用戶端是使用擷取來建立 的。新增FromFile函式可用來參考Azure Blob 儲存體 URI。 對應參考名稱和資料類型會以 FileOption的形式傳遞。

執行應用程式

  1. 從 GitHub 複製範例程式碼:

    git clone https://github.com/Azure-Samples/Azure-Data-Explorer-Go-SDK-example-to-ingest-data.git
    cd Azure-Data-Explorer-Go-SDK-example-to-ingest-data
    
  2. 執行範例程式碼,如 下列程式碼片段 main.go 所示:

    func main {
        ...
        dropTable(kc, kustoDB)
        createTable(kc, kustoDB)
        createMapping(kc, kustoDB)
        ingestFile(kc, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable)
        ...
    }
    

    提示

    若要嘗試不同的作業組合,您可以在 中 main.go 取消批註/批註個別函式。

    當您執行範例程式碼時,會執行下列動作:

    1. 卸載資料表StormEvents 如果資料表存在) ,則會將其卸載 (。
    2. 資料表建立StormEvents 建立資料表。
    3. 對應建立StormEvents_CSV_Mapping 建立對應。
    4. 檔案擷取:Azure Blob 儲存體) 中的 CSV 檔案 (已排入佇列以進行擷取。
  3. 若要建立服務主體以進行驗證,請使用 Azure CLI 搭配 az ad sp create-for-rbac 命令。 使用叢集端點和資料庫名稱,以程式將使用的環境變數形式設定服務主體資訊:

    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export AZURE_SP_TENANT_ID="<replace with tenant>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  4. 執行程式:

    go run main.go
    

    您將會收到類似的輸出:

    Connected to Azure Data Explorer
    Using database - testkustodb
    Failed to drop StormEvents table. Maybe it does not exist?
    Table StormEvents created in DB testkustodb
    Mapping StormEvents_CSV_Mapping created
    Ingested file from - https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv
    

驗證和疑難排解

等候 5 到 10 分鐘,讓佇列擷取排程擷取程式,並將資料載入 Azure Data Explorer。

  1. 登入 https://dataexplorer.azure.com,並連線至您的叢集。 然後執行下列命令,以取得資料表中的 StormEvents 記錄計數。

    StormEvents | count
    
  2. 在資料庫中執行下列命令,以查看最後四個小時是否有任何擷取失敗。 先取代資料庫名稱,再執行。

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  3. 執行下列命令,以檢視最後四個小時內的所有擷取作業狀態。 先取代資料庫名稱,再執行。

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

清除資源

如果您打算遵循我們的其他文章,請保留您所建立的資源。 如果沒有,請在資料庫中執行下列命令來卸載 StormEvents 資料表。

.drop table StormEvents

後續步驟