Sdílet prostřednictvím


Definování vlastního monitorování kanálů Delta Live Tables s využitím háků událostí

Důležité

Podpora pro háky událostí je ve verzi Public Preview.

Pomocí háků událostí můžete přidat vlastní funkce zpětného volání Pythonu, které se spustí, když se události zachovají do protokolu událostí kanálu Delta Live Tables. Pomocí hooků událostí můžete implementovat vlastní řešení monitorování a upozorňování. Pomocí háků událostí můžete například odesílat e-maily nebo zapisovat do protokolu, když dojde k určitým událostem nebo k integraci s řešeními třetích stran pro monitorování událostí kanálu.

Definujete háček události pomocí funkce Pythonu, která přijímá jeden argument, kde je argument slovníkem představujícím událost. Pak zahrnete háky událostí jako součást zdrojového kódu kanálu. Všechny události háky definované v kanálu se pokusí zpracovat všechny události vygenerované během každé aktualizace kanálu. Pokud se kanál skládá z několika artefaktů zdrojového kódu, například z několika poznámkových bloků, použijí se všechny definované háky událostí na celý kanál. I když jsou volání událostí zahrnutá do zdrojového kódu pro váš kanál, nejsou součástí grafu kanálu.

Můžete použít háky událostí s kanály, které publikují do metastoru Hive nebo katalogu Unity.

Poznámka:

  • Python je jediný jazyk podporovaný pro definování háků událostí.
  • Háky událostí se aktivují pouze pro události, ve kterých je STABLEmaturity_level .
  • Háky událostí se spouští asynchronně z aktualizací kanálu, ale synchronně s jinými háky událostí. To znamená, že najednou běží jenom jeden háček událostí a ostatní háky událostí čekají na spuštění, dokud se nespustí aktuálně spuštěný háček událostí. Pokud se háček události spustí na neomezenou dobu, zablokuje všechny ostatní háky událostí.
  • Rozdílové živé tabulky se pokusí spustit každou událost háku na každou událost vygenerované během aktualizace kanálu. Aby se zajistilo, že opožděné háky událostí mají čas zpracovat všechny události ve frontě, Delta Live Tables čeká před ukončením výpočetního výkonu kanálu nekonfigurovatelné pevné období. Není však zaručeno, že se všechny háky aktivují u všech událostí před ukončením výpočetních prostředků.

Monitorování zpracování háku událostí

hook_progress Pomocí typu události v protokolu událostí Delta Live Tables můžete monitorovat stav volání událostí aktualizace. Aby se zabránilo cyklovým závislostem, triggery událostí se neaktivují pro hook_progress události.

Definování háku události

K definování háku události použijte on_event_hook dekorátor:

@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
  # Python code defining the event hook

Popisuje max_allowable_consecutive_failures maximální počet po sobě jdoucích po sobě jdoucích selhání háku události předtím, než bude zakázán. Selhání háku události je definováno tak, jak kdykoli háček události vyvolá výjimku. Pokud je volání událostí zakázané, nezpracuje nové události, dokud se kanál nerestartuje.

max_allowable_consecutive_failures musí být celé číslo větší nebo rovno 0 nebo None. Hodnota None (přiřazená ve výchozím nastavení) znamená, že neexistuje žádné omezení počtu po sobě jdoucích selhání povolených pro háku události a háček událostí není nikdy zakázán.

Selhání háku událostí a zakázání háků událostí je možné monitorovat v protokolu událostí jako hook_progress události.

Funkce háku události musí být funkce Pythonu, která přijímá přesně jeden parametr, slovníkovou reprezentaci události, která aktivovala tento háček události. Jakákoli návratová hodnota funkce háku události se ignoruje.

Příklad: Výběr konkrétních událostí pro zpracování

Následující příklad ukazuje háku události, která vybere konkrétní události pro zpracování. Konkrétně tento příklad čeká na přijetí událostí kanálu STOPPING a výstupem zprávy do protokolů stdoutovladače .

@on_event_hook
def my_event_hook(event):
  if (
    event['event_type'] == 'update_progress' and
    event['details']['update_progress']['state'] == 'STOPPING'
  ):
    print('Received notification that update is stopping: ', event)

Příklad: Odeslání všech událostí do kanálu Slack

Následující příklad implementuje háček události, který odesílá všechny události přijaté do kanálu Slack pomocí rozhraní Slack API.

Tento příklad používá tajný kód Databricks k bezpečnému uložení tokenu potřebného k ověření v rozhraní Slack API.

from dlt import on_event_hook
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")

@on_event_hook
def write_events_to_slack(event):
  res = requests.post(
    url='https://slack.com/api/chat.postMessage',
    headers={
      'Content-Type': 'application/json',
      'Authorization': 'Bearer ' + API_TOKEN,
    },
    json={
      'channel': '<channel-id>',
      'text': 'Received event:\n' + event,
    }
  )

Příklad: Konfigurace háku události pro zakázání po čtyřech po sobě jdoucích selháních

Následující příklad ukazuje, jak nakonfigurovat háček událostí, který je zakázán, pokud selže po sobě po sobě čtyřikrát.

from dlt import on_event_hook
import random

def run_failing_operation():
   raise Exception('Operation has failed')

# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
  run_failing_operation()

Příklad: Kanál Delta Live Tables s hákem události

Následující příklad ukazuje přidání háku události do zdrojového kódu kanálu. Toto je jednoduchý, ale úplný příklad použití hooků událostí s kanálem.

from dlt import table, on_event_hook, read
import requests
import json
import time

API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
 'Content-Type': 'application/json',
 'Authorization': 'Bearer ' + API_TOKEN
}

# Create a single dataset.
@table
def test_dataset():
 return spark.range(5)

# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
  res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
    'channel': DEV_CHANNEL,
    'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
  })