VYTVOŘENÍ TABULKY STREAMOVÁNÍ

Platí pro:zaškrtnutí označeného ano Databricks SQL zaškrtnutí označeného ano Databricks Runtime 13.3 LTS a vyšší

Důležité

Tato funkce je ve verzi Public Preview.

Vytvoří streamovací tabulku, tabulku Delta s dodatečnou podporou streamování nebo přírůstkového zpracování dat.

Streamované tabulky jsou podporovány pouze v rozdílových živých tabulkách a v Databricks SQL s katalogem Unity. Spuštěním tohoto příkazu na podporovaných výpočetních prostředcích Databricks Runtime se analyzuje pouze syntaxe. Viz Implementace kanálu Delta Live Tables s SQL.

Syntaxe

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( [ column_identifier column_type [ NOT NULL ]
      [ COMMENT column_comment ] [ column_constraint ]
    ] [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

table_clauses
  { PARTITIONED BY (col [, ...]) |
    COMMENT table_comment |
    TBLPROPERTIES clause |
    SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ] } [...]

Parametry

  • REFRESH

    Pokud je zadáno, aktualizuje tabulku nejnovějšími daty dostupnými ze zdrojů definovaných v dotazu. Pouze nová data, která přicházejí před zahájením dotazu, budou zpracována. Nová data, která se přidají do zdrojů během provádění příkazu, se ignorují až do další aktualizace.

  • POKUD NEEXISTUJE

    Pokud je zadána tabulka se stejným názvem již existuje, příkaz bude ignorován.

    IF NOT EXISTS nelze použít společně s REFRESH, což znamená, že CREATE OR REFRESH TABLE IF NOT EXISTS není povoleno.

  • Table_name

    Název tabulky, kterou chcete vytvořit. Název nesmí obsahovat dočasnou specifikaci. Pokud název není kvalifikovaný, vytvoří se tabulka v aktuálním schématu.

  • table_specification

    Tato volitelná klauzule definuje seznam sloupců, jejich typů, vlastností, popisů a omezení sloupců.

    Pokud nedefinujete sloupce ve schématu tabulky, je nutné zadat AS query.

    • column_identifier

      Jedinečný název sloupce

      • column_type

        Určuje datový typ sloupce.

      • NOT NULL

        Pokud je zadaný sloupec nepřijme NULL hodnoty.

      • COLUMN_COMMENT KOMENTÁŘE

        Řetězcový literál, který popisuje sloupec.

      • column_constraint

        Důležité

        Tato funkce je ve verzi Public Preview.

        Přidá omezení primárního klíče nebo cizího klíče do sloupce v tabulce streamování. Omezení nejsou podporována pro tabulky v hive_metastore katalogu.

      • CONSTRAINT expectation_name EXPECT (expectation_expr) [ PŘI PORUŠENÍ { FAIL UPDATE | DROP ROW } ]

        Přidá do tabulky očekávání kvality dat. Tato očekávání kvality dat je možné sledovat v průběhu času a přistupovat k němu prostřednictvím protokolu událostí streamované tabulky. Očekávání FAIL UPDATE způsobí selhání zpracování při vytváření tabulky i při aktualizaci tabulky. Očekávání DROP ROW způsobí, že se celý řádek zahodí, pokud se očekávání nesplní.

        expectation_expr mohou se skládat z literálů, identifikátorů sloupců v tabulce a deterministické předdefinované funkce nebo operátory SQL s výjimkou:

        Nesmí obsahovat ani expr poddotaz.

      • table_constraint

        Důležité

        Tato funkce je ve verzi Public Preview.

        Přidá do tabulky streamování omezení informačního primárního klíče nebo informačního cizího klíče. Pro tabulky v hive_metastore katalogu nejsou podporována klíčová omezení.

  • table_clauses

    Volitelně můžete zadat dělení, komentáře, uživatelem definované vlastnosti a plán aktualizace nové tabulky. Každou dílčí klauzuli lze zadat pouze jednou.

    • DĚLENÉ PODLE

      Volitelný seznam sloupců tabulky, podle kterých chcete tabulku rozdělit.

    • TABLE_COMMENT KOMENTÁŘE

      Literál STRING , který popisuje tabulku.

    • TBLPROPERTIES

      Volitelně nastaví jednu nebo více uživatelem definovaných vlastností.

    • SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ]

      Pokud je k dispozici, naplánuje streamovací tabulku nebo materializované zobrazení, aby aktualizovala data s daným plánem quartz cron . Akceptují se pouze time_zone_values . AT TIME ZONE LOCAL není podporováno. Pokud AT TIME ZONE chybí, použije se časové pásmo relace. Pokud AT TIME ZONE chybí a časové pásmo relace není nastavené, vyvolá se chyba. SCHEDULE je sémanticky ekvivalentní SCHEDULE REFRESH.

      Syntaxi nelze použít SCHEDULE v definici kanálu Delta Live Tables.

      Klauzule SCHEDULE není povolena CREATE OR REFRESH v příkazu. Plán lze zadat jako součást CREATE příkazu. Pomocí příkazu ALTER STREAMING TABLE můžete po vytvoření změnit plán tabulky streamování.

  • Dotaz AS

    Tato klauzule naplní tabulku pomocí dat z query. Tento dotaz musí být streamovaným dotazem. Toho lze dosáhnout přidáním klíčového STREAM slova do libovolného vztahu, který chcete zpracovat přírůstkově. Když zadáte a query a společně table_specification , schéma tabulky zadané v table_specification musí obsahovat všechny sloupce vrácené adresou query, jinak se zobrazí chyba. Všechny sloupce zadané v table_specification vrácené hodnotě, které nejsou vráceny querynull při dotazech.

    Tato klauzule je vyžadována pro streamované tabulky vytvořené v Databricks SQL, ale nevyžaduje se v Delta Live Tables. Pokud tato klauzule není k dispozici v rozdílových živých tabulkách, musíte na tuto tabulku odkazovat v APPLY CHANGES příkazu v kanálu DLT. Viz Změna zachytávání dat pomocí SQL v rozdílových živých tabulkách.

Rozdíly mezi streamovanými tabulkami a jinými tabulkami

Streamované tabulky jsou stavové tabulky navržené tak, aby zpracovávaly každý řádek pouze jednou při zpracování rostoucí datové sady. Vzhledem k tomu, že většina datových sad v průběhu času roste, jsou streamované tabulky vhodné pro většinu úloh příjmu dat. Tabulky streamování jsou optimální pro kanály, které vyžadují aktuálnost dat a nízkou latenci. Streamované tabulky můžou být také užitečné pro masivní transformace škálování, protože výsledky se dají postupně vypočítat při příchodu nových dat, přičemž výsledky budou aktuální, aniž by bylo nutné plně překompilovat všechna zdrojová data s každou aktualizací. Streamované tabulky jsou navržené pro zdroje dat, které jsou jen pro připojení.

Streamované tabulky přijímají další příkazy, například REFRESH, které zpracovávají nejnovější data dostupná ve zdrojích poskytovaných v dotazu. Změny zadaného dotazu se projeví jenom na nových datech voláním REFRESHdříve nezpracovaných dat. Pokud chcete změny použít i u existujících dat, musíte REFRESH TABLE <table_name> FULL provést provedení příkazu FULL REFRESH. Úplné aktualizace znovu zpracovávají všechna data dostupná ve zdroji s nejnovější definicí. Nedoporučuje se volat úplné aktualizace zdrojů, které nezachovají celou historii dat nebo mají krátké doby uchovávání, například Kafka, protože úplná aktualizace zkracuje stávající data. Pokud už data nejsou ve zdroji dostupná, možná nebudete moct obnovit stará data.

Omezení

  • Nejnovější data můžou získat jenom vlastníci tabulek, kteří můžou aktualizovat streamované tabulky.

  • ALTER TABLE příkazy jsou u streamovaných tabulek zakázány. Definice a vlastnosti tabulky by měly být změněny příkazem ALTER STREAMING TABLE .

  • Dotazy na časovou cestu se nepodporují.

  • Vývoj schématu tabulky pomocí příkazů DML, jako je INSERT INTO, a MERGE není podporován.

  • U streamovaných tabulek se nepodporují následující příkazy:

    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • Rozdílové sdílení se nepodporuje.

  • Přejmenování tabulky nebo změna vlastníka se nepodporuje.

  • Omezení tabulek, jako PRIMARY KEY jsou a FOREIGN KEY nejsou podporována.

  • Vygenerované sloupce, sloupce identit a výchozí sloupce se nepodporují.

Příklady

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE CRON '0 0 * * * ? *'
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');