執行您的第一個結構化串流工作負載

本文提供在 Azure Databricks 上執行第一個結構化串流查詢所需基本概念的程式碼範例和說明。 您可以針對近乎即時和增量處理工作負載使用結構化串流。

結構化串流是數種技術之一,可支援差異即時資料表中的串流資料表。 Databricks 建議針對所有新的 ETL、擷取和結構化串流工作負載使用 Delta 即時資料表。 請參閱 什麼是 Delta Live Tables?

注意

雖然 Delta Live Tables 提供宣告串流資料表的稍微修改語法,但設定串流讀取和轉換的一般語法適用于 Azure Databricks 上的所有串流使用案例。 Delta Live Tables 也可藉由管理狀態資訊、中繼資料和許多組態來簡化串流。

從資料流程讀取

您可以使用結構化串流,以累加方式從支援的資料來源內嵌資料。 Azure Databricks 結構化串流工作負載中使用的一些最常見資料來源包括下列各項:

  • 雲端物件儲存體中的資料檔
  • 訊息匯流排和佇列
  • Delta Lake

Databricks 建議使用自動載入器從雲端物件儲存體串流擷取。 自動載入器支援結構化串流支援的大部分檔案格式。 請參閱 什麼是自動載入器?

每個資料來源都提供一些選項,以指定如何載入批次的資料。 在讀取器設定期間,您可能需要設定的主要選項屬於下列類別:

  • 指定資料來源或格式的選項(例如檔案類型、分隔符號和架構)。
  • 設定來源系統存取權的選項(例如埠設定和認證)。
  • 指定要在資料流程中啟動位置的選項(例如 Kafka 位移或讀取所有現有的檔案)。
  • 控制每個批次中處理多少資料的選項(例如,每個批次的位移上限、檔案或位元組數)。

使用自動載入器從物件儲存體讀取串流資料

下列範例示範使用自動載入器載入 JSON 資料,以 cloudFiles 表示格式和選項。 選項 schemaLocation 可啟用架構推斷和演進。 將下列程式碼貼到 Databricks 筆記本資料格中,然後執行資料格以建立名為 raw_df 的串流資料框架:

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

如同 Azure Databricks 上的其他讀取作業,設定串流讀取實際上不會載入資料。 您必須在資料流程開始之前觸發資料上的動作。

注意

在串流資料框架上呼叫 display() 會啟動串流作業。 對於大部分結構化串流使用案例,觸發資料流程的動作應該將資料寫入接收。 請參閱 準備結構化串流程式碼以供生產環境 使用。

執行串流轉換

結構化串流支援 Azure Databricks 和 Spark SQL 中可用的大部分轉換。 您甚至可以將 MLflow 模型載入為 UDF,並以轉換的形式進行串流預測。

下列程式碼範例會完成簡單的轉換,以使用 Spark SQL 函式擴充內嵌的 JSON 資料:

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

產生的 transformed_df 包含載入和轉換每個記錄的查詢指示,因為它到達資料來源。

注意

結構化串流會將資料來源視為未系結或無限資料集。 因此,結構化串流工作負載不支援某些轉換,因為它們需要排序無限數量的專案。

大部分匯總和許多聯結都需要使用浮水印、視窗和輸出模式來管理狀態資訊。 請參閱 套用浮水印來控制資料處理閾值

寫入至資料接收

資料接收是串流寫入作業的目標。 Azure Databricks 串流工作負載中使用的常見接收包括下列各項:

  • Delta Lake
  • 訊息匯流排和佇列
  • 索引鍵/值資料庫

與資料來源一樣,大部分的資料接收都會提供數個選項來控制資料寫入目標系統的方式。 在寫入器設定期間,您可能需要設定的主要選項屬於下列類別:

  • 輸出模式(預設為附加)。
  • 檢查點位置(每個寫入器 都需要)。
  • 觸發間隔;請參閱 設定結構化串流觸發程式間隔
  • 指定資料接收或格式的選項(例如檔案類型、分隔符號和架構)。
  • 設定目標系統的存取權的選項(例如埠設定和認證)。

執行增量批次寫入 Delta Lake

下列範例會使用指定的檔案路徑和檢查點寫入 Delta Lake。

重要

請務必為您所設定的每個串流寫入器指定唯一的檢查點位置。 檢查點會為您的串流提供唯一的身分識別,並追蹤與串流查詢相關聯的所有已處理記錄和狀態資訊。

availableNow觸發程式的設定會指示結構化串流處理來源資料集先前未處理的所有記錄,然後關閉,因此您可以安全地執行下列程式碼,而不必擔心資料流程執行:

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

在此範例中,沒有任何新記錄抵達我們的資料來源,因此重複執行此程式碼並不會內嵌新的記錄。

警告

結構化串流執行可以防止自動終止關閉計算資源。 若要避免非預期的成本,請務必終止串流查詢。

準備結構化串流程式碼以供生產環境使用

Databricks 建議針對大部分結構化串流工作負載使用 Delta 即時資料表。 下列建議提供準備生產環境結構化串流工作負載的起點:

  • 從會傳回結果的筆記本中移除不必要的程式碼,例如 displaycount
  • 請勿在互動式叢集上執行結構化串流工作負載;一律會將資料流程排程為作業。
  • 若要協助自動復原串流作業,請設定具有無限重試的作業。
  • 請勿針對具有結構化串流的工作負載使用自動調整。

如需更多建議,請參閱 結構化串流 的生產考慮。

從 Delta Lake 讀取資料、轉換和寫入 Delta Lake

Delta Lake 支援使用結構化串流作為來源和接收。 請參閱 差異資料表串流讀取和寫入

下列範例顯示從 Delta 資料表累加載入所有新記錄的範例語法、將它們與另一個 Delta 資料表的快照集聯結,並將其寫入 Delta 資料表:

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

您必須設定適當的許可權,才能讀取來源資料表,以及寫入目標資料表和指定的檢查點位置。 使用資料來源和接收的相關值,填入以角括弧 ( <> ) 表示的所有參數。

注意

Delta Live Tables 提供完整的宣告式語法,可用來建立 Delta Lake 管線,並自動管理觸發程式和檢查點等屬性。 請參閱 什麼是 Delta Live Tables?

從 Kafka 讀取資料、轉換和寫入 Kafka

Apache Kafka 和其他傳訊匯流排提供大型資料集可用的一些最低延遲。 您可以使用 Azure Databricks 將轉換套用至從 Kafka 擷取的資料,然後將資料寫回 Kafka。

注意

將資料寫入雲端物件儲存體會增加額外的延遲負荷。 如果您想要將資料從 Delta Lake 中的傳訊匯流排儲存,但需要串流工作負載的最低延遲,Databricks 建議設定個別的串流作業以將資料內嵌至 Lakehouse,並針對下游傳訊匯流排接收套用近乎即時的轉換。

下列程式碼範例示範一個簡單的模式,藉由將 Kafka 中的資料與 Delta 資料表中的資料聯結,然後回寫至 Kafka,以擴充來自 Kafka 的資料:

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

您必須設定適當的許可權,才能存取 Kafka 服務。 使用資料來源和接收的相關值,填入以角括弧 ( <> ) 表示的所有參數。 請參閱 使用 Apache Kafka 和 Azure Databricks 進行串流處理。