Share via


STREAMINGTABEL MAKEN

Van toepassing op:vinkje als ja aan Databricks SQL vinkje als ja aan Databricks Runtime 13.3 LTS en hoger

Belangrijk

Deze functie is beschikbaar als openbare preview.

Hiermee maakt u een streamingtabel, een Delta-tabel met extra ondersteuning voor streaming of incrementele gegevensverwerking.

Streamingtabellen worden alleen ondersteund in Delta Live Tables en in Databricks SQL met Unity Catalog. Als u deze opdracht uitvoert op ondersteunde Databricks Runtime-berekening, wordt de syntaxis alleen geparseerd. Zie Een Delta Live Tables-pijplijn implementeren met SQL.

Syntaxis

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( [ column_identifier column_type [ NOT NULL ]
      [ COMMENT column_comment ] [ column_constraint ]
    ] [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

table_clauses
  { PARTITIONED BY (col [, ...]) |
    COMMENT table_comment |
    TBLPROPERTIES clause |
    SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ] } [...]

Parameters

  • VERNIEUWEN

    Indien opgegeven, vernieuwt u de tabel met de meest recente gegevens die beschikbaar zijn vanuit de bronnen die in de query zijn gedefinieerd. Alleen nieuwe gegevens die binnenkomen voordat de query wordt gestart, worden verwerkt. Nieuwe gegevens die worden toegevoegd aan de bronnen tijdens de uitvoering van de opdracht, worden genegeerd tot de volgende vernieuwing.

  • ALS DEZE NIET BESTAAT

    Als deze is opgegeven en er al een tabel met dezelfde naam bestaat, wordt de instructie genegeerd.

    IF NOT EXISTS kan niet samen met REFRESH, wat betekent CREATE OR REFRESH TABLE IF NOT EXISTS dat dit niet is toegestaan.

  • Table_name

    De naam van de tabel die moet worden gemaakt. De naam mag geen tijdelijke specificatie bevatten. Als de naam niet is gekwalificeerd, wordt de tabel gemaakt in het huidige schema.

  • table_specification

    Deze optionele component definieert de lijst met kolommen, hun typen, eigenschappen, beschrijvingen en kolombeperkingen.

    Als u geen kolommen in het tabelschema definieert, moet u opgeven AS query.

    • column_identifier

      Een unieke naam voor de kolom.

      • column_type

        Hiermee geeft u het gegevenstype van de kolom.

      • NIET NULL

        Als de kolom is opgegeven, worden geen waarden geaccepteerd NULL .

      • OPMERKING column_comment

        Een letterlijke tekenreeks om de kolom te beschrijven.

      • column_constraint

        Belangrijk

        Deze functie is beschikbaar als openbare preview.

        Hiermee voegt u een primaire sleutel of refererende sleutelbeperking toe aan de kolom in een streamingtabel. Beperkingen worden niet ondersteund voor tabellen in de hive_metastore catalogus.

      • CONSTRAINT expectation_name EXPECT (expectation_expr) [ BIJ SCHENDING { FAIL UPDATE | DROP ROW } ]

        Voegt verwachtingen voor gegevenskwaliteit toe aan de tabel. Deze verwachtingen voor gegevenskwaliteit kunnen in de loop van de tijd worden bijgehouden en worden geopend via het gebeurtenislogboek van de streamingtabel. Een FAIL UPDATE verwachting zorgt ervoor dat de verwerking mislukt bij het maken van de tabel en het vernieuwen van de tabel. Een DROP ROW verwachting zorgt ervoor dat de hele rij wordt verwijderd als niet aan de verwachting wordt voldaan.

        expectation_expr kan bestaan uit letterlijke waarden, kolom-id's in de tabel en deterministische, ingebouwde SQL-functies of -operators, met uitzondering van:

        Mag ook expr geen subquery bevatten.

      • table_constraint

        Belangrijk

        Deze functie is beschikbaar als openbare preview.

        Voegt een informatieve primaire sleutel of informatieve refererende sleutelbeperkingen toe aan een streamingtabel. Sleutelbeperkingen worden niet ondersteund voor tabellen in de hive_metastore catalogus.

  • table_clauses

    Geef desgewenst partitionering, opmerkingen, door de gebruiker gedefinieerde eigenschappen en een vernieuwingsschema voor de nieuwe tabel op. Elke subcomponent mag slechts eenmaal worden opgegeven.

    • GEPARTITIONEERD DOOR

      Een optionele lijst met kolommen van de tabel om de tabel te partitioneren op.

    • OPMERKING table_comment

      Een STRING letterlijke om de tabel te beschrijven.

    • TBLPROPERTIES

      U kunt desgewenst een of meer door de gebruiker gedefinieerde eigenschappen instellen.

    • SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ]

      Indien opgegeven, plant u de streamingtabel of de gerealiseerde weergave om de gegevens te vernieuwen met het opgegeven kwarts cron-schema . Alleen time_zone_values worden geaccepteerd. AT TIME ZONE LOCAL wordt niet ondersteund. Als AT TIME ZONE deze afwezig is, wordt de sessietijdzone gebruikt. Als AT TIME ZONE deze afwezig is en de sessietijdzone niet is ingesteld, wordt er een fout gegenereerd. SCHEDULE is semantisch gelijk aan SCHEDULE REFRESH.

      U kunt de SCHEDULE syntaxis niet gebruiken in een pijplijndefinitie van Delta Live Tables.

      De SCHEDULE component is niet toegestaan in een CREATE OR REFRESH opdracht. De planning kan worden opgegeven als onderdeel van de CREATE opdracht. Gebruik ALTER STREAMING TABLE om het schema van een streamingtabel te wijzigen na het maken.

  • AS-query

    Met deze component wordt de tabel gevuld met behulp van de gegevens uit query. Deze query moet een streamingquery zijn. Dit kan worden bereikt door het STREAM trefwoord toe te voegen aan elke relatie die u incrementeel wilt verwerken. Wanneer u een query en een table_specification samen opgeeft, moet het opgegeven table_specification tabelschema alle kolommen bevatten die worden geretourneerd door de query, anders krijgt u een foutmelding. Kolommen die zijn opgegeven in table_specification maar niet worden geretourneerd door query retourwaarden null wanneer er query's worden uitgevoerd.

    Deze component is vereist voor streamingtabellen die zijn gemaakt in Databricks SQL, maar niet vereist in Delta Live Tables. Als deze component niet is opgegeven in Delta Live Tables, moet u verwijzen naar deze tabel in een APPLY CHANGES opdracht in uw DLT-pijplijn. Zie Gegevens vastleggen wijzigen met SQL in Delta Live Tables.

Verschillen tussen streamingtabellen en andere tabellen

Streamingtabellen zijn stateful tabellen, ontworpen om elke rij slechts eenmaal te verwerken wanneer u een groeiende gegevensset verwerkt. Omdat de meeste gegevenssets in de loop van de tijd continu groeien, zijn streamingtabellen geschikt voor de meeste opnameworkloads. Streamingtabellen zijn optimaal voor pijplijnen waarvoor nieuwe gegevens en lage latentie nodig zijn. Streamingtabellen kunnen ook handig zijn voor grootschalige transformaties, omdat resultaten incrementeel kunnen worden berekend wanneer nieuwe gegevens binnenkomen, zodat de resultaten up-to-date blijven zonder dat alle brongegevens volledig hoeven te worden gecomputeerd met elke update. Streamingtabellen zijn ontworpen voor gegevensbronnen die alleen worden toegevoegd.

Streamingtabellen accepteren aanvullende opdrachten, zoals REFRESH, waarmee de meest recente gegevens worden verwerkt die beschikbaar zijn in de bronnen die in de query zijn opgegeven. Wijzigingen in de opgegeven query worden alleen doorgevoerd in nieuwe gegevens door een REFRESH, niet eerder verwerkte gegevens aan te roepen. Als u ook de wijzigingen op bestaande gegevens wilt toepassen, moet u uitvoeren REFRESH TABLE <table_name> FULL om een FULL REFRESH. Alle gegevens die beschikbaar zijn in de bron, worden opnieuw verwerkt met de meest recente definitie. Het is niet raadzaam om volledige vernieuwingen aan te roepen voor bronnen die de volledige geschiedenis van de gegevens niet behouden of korte bewaarperioden hebben, zoals Kafka, omdat de volledige vernieuwing de bestaande gegevens afkapt. Mogelijk kunt u oude gegevens niet herstellen als de gegevens niet meer beschikbaar zijn in de bron.

Beperkingen

  • Alleen eigenaren van tabellen kunnen streamingtabellen vernieuwen om de meest recente gegevens op te halen.

  • ALTER TABLE opdrachten zijn niet toegestaan voor streamingtabellen. De definitie en eigenschappen van de tabel moeten worden gewijzigd via de ALTER STREAMING TABLE instructie.

  • Query's voor tijdreizen worden niet ondersteund.

  • Het tabelschema ontwikkelen via DML-opdrachten zoals INSERT INTOen MERGE wordt niet ondersteund.

  • De volgende opdrachten worden niet ondersteund voor streamingtabellen:

    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • Delta Sharing wordt niet ondersteund.

  • Het wijzigen van de naam van de tabel of het wijzigen van de eigenaar wordt niet ondersteund.

  • Tabelbeperkingen zoals PRIMARY KEY en FOREIGN KEY worden niet ondersteund.

  • Gegenereerde kolommen, identiteitskolommen en standaardkolommen worden niet ondersteund.

Voorbeelden

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE CRON '0 0 * * * ? *'
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');