Condividi tramite


Usare Hub eventi di Azure come origine dati Delta Live Tables

Questo articolo illustra come usare le tabelle Live Delta per elaborare i messaggi da Hub eventi di Azure. Non è possibile usare il connettore Structured Streaming Event Hubs perché questa libreria non è disponibile come parte di Databricks Runtime e Le tabelle Live Delta non consentono di usare librerie JVM di terze parti.

In che modo le tabelle live Delta possono connettersi a Hub eventi di Azure?

Hub eventi di Azure fornisce un endpoint compatibile con Apache Kafka che è possibile usare con Connettore Kafka structured Streaming, disponibile in Databricks Runtime, per elaborare i messaggi da Hub eventi di Azure. Per altre informazioni sulla compatibilità Hub eventi di Azure e Apache Kafka, vedere Usare Hub eventi di Azure dalle applicazioni Apache Kafka.

I passaggi seguenti descrivono la connessione di una pipeline di tabelle live Delta a un'istanza di Hub eventi esistente e l'utilizzo di eventi da un argomento. Per completare questi passaggi, sono necessari i valori di connessione di Hub eventi seguenti:

  • Nome dello spazio dei nomi di Hub eventi.
  • Nome dell'istanza di Hub eventi nello spazio dei nomi di Hub eventi.
  • Nome e chiave dei criteri di accesso condiviso per Hub eventi. Per impostazione predefinita, viene creato un RootManageSharedAccessKey criterio per ogni spazio dei nomi di Hub eventi. Questo criterio dispone di manageautorizzazioni e sendlisten . Se la pipeline legge solo da Hub eventi, Databricks consiglia di creare un nuovo criterio solo con autorizzazioni di ascolto.

Per altre informazioni sul stringa di connessione di Hub eventi, vedere Ottenere un stringa di connessione di Hub eventi.

Nota

  • Hub eventi di Azure offre opzioni OAuth 2.0 e firma di accesso condiviso per autorizzare l'accesso alle risorse protette. Queste istruzioni usano l'autenticazione basata su firma di accesso condiviso.
  • Se si ottiene l'stringa di connessione di Hub eventi dal portale di Azure, potrebbe non contenere il EntityPath valore . Il EntityPath valore è obbligatorio solo quando si usa il connettore dell'hub eventi structured streaming. L'uso di Structured Streaming Kafka Connessione or richiede solo il nome dell'argomento.

Archiviare la chiave dei criteri in un segreto di Azure Databricks

Poiché la chiave dei criteri è informazioni riservate, Databricks consiglia di non impostare come hardcoding il valore nel codice della pipeline. Usare invece i segreti di Azure Databricks per archiviare e gestire l'accesso alla chiave.

Nell'esempio seguente viene usata l'interfaccia della riga di comando di Databricks per creare un ambito segreto e archiviare la chiave nell'ambito del segreto. Nel codice della pipeline usare la dbutils.secrets.get() funzione con scope-name e shared-policy-name per recuperare il valore della chiave.

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

Per altre informazioni sui segreti di Azure Databricks, vedere Gestione dei segreti.

Creare un notebook e aggiungere il codice della pipeline per utilizzare gli eventi

L'esempio seguente legge gli eventi IoT da un argomento, ma è possibile adattare l'esempio per i requisiti dell'applicazione. Come procedura consigliata, Databricks consiglia di usare le impostazioni della pipeline delle tabelle dinamiche Delta per configurare le variabili dell'applicazione. Il codice della pipeline usa quindi la spark.conf.get() funzione per recuperare i valori. Per altre informazioni sull'uso delle impostazioni della pipeline per parametrizzare la pipeline, vedere Parametrizzare le pipeline.

import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

Creare la pipeline

Creare una nuova pipeline con le impostazioni seguenti, sostituendo i valori segnaposto con i valori appropriati per l'ambiente in uso.

{
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "num_workers": 4
    }
  ],
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "<path-to-notebook>"
      }
    }
  ],
  "name": "dlt_eventhub_ingestion_using_kafka",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  },
  "target": "<target-database-name>"
}

Sostituzione

  • <container-name> con il nome di un contenitore dell'account di archiviazione di Azure.
  • <storage-account-name> con il nome di un account di archiviazione DILS Gen2.
  • <eh-namespace> con il nome dello spazio dei nomi di Hub eventi.
  • <eh-policy-name> con la chiave dell'ambito segreto per la chiave dei criteri di Hub eventi.
  • <eventhub> con il nome dell'istanza di Hub eventi.
  • <secret-scope-name> con il nome dell'ambito del segreto di Azure Databricks che contiene la chiave dei criteri di Hub eventi.

Come procedura consigliata, questa pipeline non usa il percorso di archiviazione DBFS predefinito, ma usa invece un account di archiviazione di Azure Data Lake Archiviazione Gen2 (ADLS Gen2). Per altre informazioni sulla configurazione dell'autenticazione per un account di archiviazione DILS Gen2, vedere Accedere in modo sicuro alle credenziali di archiviazione con segreti in una pipeline.