Share via


Gegevens laden met Delta Live Tables

U kunt gegevens laden uit elke gegevensbron die wordt ondersteund door Apache Spark in Azure Databricks met behulp van Delta Live Tables. U kunt gegevenssets (tabellen en weergaven) in Delta Live Tables definiƫren voor elke query die een Spark DataFrame retourneert, inclusief streaming DataFrames en Pandas voor Spark DataFrames. Voor taken voor gegevensopname raadt Databricks het gebruik van streamingtabellen aan voor de meeste use cases. Streamingtabellen zijn geschikt voor het opnemen van gegevens uit cloudobjectopslag met behulp van automatische laadprogramma's of van berichtenbussen zoals Kafka. In de onderstaande voorbeelden ziet u enkele veelvoorkomende patronen.

Belangrijk

Niet alle gegevensbronnen hebben SQL-ondersteuning. U kunt SQL- en Python-notebooks combineren in een Delta Live Tables-pijplijn om SQL te gebruiken voor alle bewerkingen die verder gaan dan opname.

Zie Python-afhankelijkheden beheren voor Delta Live Tables-pijplijnen voor meer informatie over het werken met bibliotheken die niet standaard in Delta Live Tables zijn verpakt.

Bestanden laden vanuit cloudobjectopslag

Databricks raadt aan om automatisch laden met Delta Live Tables te gebruiken voor de meeste gegevensopnametaken uit de opslag van cloudobjecten. Automatische laadprogramma's en Delta Live Tables zijn ontworpen om steeds groeiende gegevens incrementeel en idempotent te laden wanneer ze binnenkomen in de cloudopslag. In de volgende voorbeelden wordt Auto Loader gebruikt om gegevenssets te maken op basis van CSV- en JSON-bestanden:

Notitie

Als u bestanden wilt laden met automatisch laden in een pijplijn met Unity Catalog, moet u gebruikmaken van externe locaties. Raadpleeg Unity Catalog gebruiken met uw Delta Live Tables-pijplijnen voor meer informatie over het gebruik van Unity Catalog met Delta Live Tables.

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

Zie Wat is de sql-syntaxis van het automatisch laden? en de SQL-syntaxis van het automatisch laden.

Waarschuwing

Als u Automatisch laden gebruikt met bestandsmeldingen en een volledige vernieuwing uitvoert voor uw pijplijn of streamingtabel, moet u uw resources handmatig opschonen. U kunt CloudFilesResourceManager in een notebook gebruiken om opschoning uit te voeren.

Gegevens laden uit een berichtenbus

U kunt Pijplijnen voor Delta Live Tables configureren om gegevens op te nemen uit berichtenbussen met streamingtabellen. Databricks raadt aan streamingtabellen te combineren met continue uitvoering en verbeterde automatische schaalaanpassing om de meest efficiƫnte opname te bieden voor het laden van lage latentie van berichtenbussen. Zie Het clustergebruik van Delta Live Tables-pijplijnen optimaliseren met verbeterde automatische schaalaanpassing.

Met de volgende code wordt bijvoorbeeld een streamingtabel geconfigureerd voor het opnemen van gegevens uit Kafka:

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

U kunt downstreambewerkingen schrijven in pure SQL om streamingtransformaties uit te voeren op deze gegevens, zoals in het volgende voorbeeld:

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.kafka_raw)
WHERE ...

Zie Azure Event Hubs gebruiken als een Delta Live Tables-gegevensbron voor een voorbeeld van het werken met Event Hubs.

Zie Streaminggegevensbronnen configureren.

Gegevens laden uit externe systemen

Delta Live Tables ondersteunt het laden van gegevens uit elke gegevensbron die wordt ondersteund door Azure Databricks. Zie Verbinding maken naar gegevensbronnen. U kunt ook externe gegevens laden met Lakehouse Federation voor ondersteunde gegevensbronnen. Omdat Voor Lakehouse Federation Databricks Runtime 13.3 LTS of hoger is vereist, moet u Lakehouse Federation gebruiken om uw pijplijn te configureren voor het gebruik van het preview-kanaal.

Sommige gegevensbronnen hebben geen equivalente ondersteuning in SQL. Als u Lakehouse Federation niet kunt gebruiken met een van deze gegevensbronnen, kunt u een zelfstandig Python-notebook gebruiken om gegevens op te nemen uit de bron. Dit notebook kan vervolgens worden toegevoegd als een bronbibliotheek met SQL-notebooks om een Delta Live Tables-pijplijn te bouwen. In het volgende voorbeeld wordt een gerealiseerde weergave declareren voor toegang tot de huidige status van gegevens in een externe PostgreSQL-tabel:

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Kleine of statische gegevenssets laden vanuit cloudobjectopslag

U kunt kleine of statische gegevenssets laden met behulp van apache Spark-belastingsyntaxis. Delta Live Tables ondersteunt alle bestandsindelingen die worden ondersteund door Apache Spark in Azure Databricks. Zie Opties voor gegevensindeling voor een volledige lijst.

In de volgende voorbeelden ziet u hoe u JSON laadt om Delta Live Tables-tabellen te maken:

Python

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH LIVE TABLE clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

Notitie

De SELECT * FROM format.`path`; SQL-constructie is gebruikelijk voor alle SQL-omgevingen in Azure Databricks. Het is het aanbevolen patroon voor directe bestandstoegang met BEHULP van SQL met Delta Live Tables.

Veilige toegang tot opslagreferenties met geheimen in een pijplijn

U kunt Azure Databricks-geheimen gebruiken om referenties op te slaan, zoals toegangssleutels of wachtwoorden. Als u het geheim in uw pijplijn wilt configureren, gebruikt u een Spark-eigenschap in de clusterconfiguratie voor pijplijninstellingen. Zie Uw rekeninstellingen configureren.

In het volgende voorbeeld wordt een geheim gebruikt voor het opslaan van een toegangssleutel die is vereist voor het lezen van invoergegevens uit een Azure Data Lake Storage Gen2-opslagaccount (ADLS Gen2) met behulp van Automatisch laden. U kunt dezelfde methode gebruiken om elk geheim te configureren dat is vereist voor uw pijplijn, bijvoorbeeld AWS-sleutels voor toegang tot S3 of het wachtwoord voor een Apache Hive-metastore.

Zie Verbinding maken naar Azure Data Lake Storage Gen2 en Blob Storage voor meer informatie over het werken met Azure Data Lake Storage Gen2.

Notitie

U moet het spark.hadoop. voorvoegsel toevoegen aan de spark_conf configuratiesleutel waarmee de geheime waarde wordt ingesteld.

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5,
          "mode": "ENHANCED"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

Replace

  • <storage-account-name> met de naam van het ADLS Gen2-opslagaccount.
  • <scope-name> met de naam van het geheime bereik van Azure Databricks.
  • <secret-name> met de naam van de sleutel die de toegangssleutel van het Azure-opslagaccount bevat.
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Replace

  • <container-name> met de naam van de Container van het Azure-opslagaccount waarin de invoergegevens worden opgeslagen.
  • <storage-account-name> met de naam van het ADLS Gen2-opslagaccount.
  • <path-to-input-dataset> met het pad naar de invoergegevensset.

Gegevens laden vanuit Azure Event Hubs

Azure Event Hubs is een service voor gegevensstreaming die een compatibele Apache Kafka-interface biedt. U kunt de Structured Streaming Kafka-connector, die is opgenomen in de Delta Live Tables-runtime, gebruiken om berichten van Azure Event Hubs te laden. Zie Azure Event Hubs gebruiken als een delta livetabelgegevensbron voor meer informatie over het laden en verwerken van berichten van Azure Event Hubs.