Skrypt przepływu danych (DFS)

DOTYCZY: Azure Data Factory Azure Synapse Analytics

Napiwek

Wypróbuj usługę Data Factory w usłudze Microsoft Fabric — rozwiązanie analityczne typu all-in-one dla przedsiębiorstw. Usługa Microsoft Fabric obejmuje wszystko, od przenoszenia danych do nauki o danych, analizy w czasie rzeczywistym, analizy biznesowej i raportowania. Dowiedz się, jak bezpłatnie rozpocząć nową wersję próbną !

Przepływy danych są dostępne zarówno w usłudze Azure Data Factory, jak i w potokach usługi Azure Synapse. Ten artykuł dotyczy przepływów danych mapowania. Jeśli dopiero zaczynasz transformacje, zapoznaj się z artykułem wprowadzającym Przekształcanie danych przy użyciu przepływu danych mapowania.

Skrypt przepływu danych (DFS) to podstawowe metadane, podobne do języka kodowania, które są używane do wykonywania przekształceń zawartych w przepływie danych mapowania. Każda transformacja jest reprezentowana przez serię właściwości, które dostarczają niezbędnych informacji do prawidłowego uruchomienia zadania. Skrypt jest widoczny i edytowalny z usługi ADF, klikając przycisk "skrypt" na górnej wstążce interfejsu użytkownika przeglądarki.

Script button

Na przykład allowSchemaDrift: true, w przekształceniu źródłowym usługa informuje usługę o dołączeniu wszystkich kolumn ze źródłowego zestawu danych do przepływu danych, nawet jeśli nie są one uwzględnione w projekcji schematu.

Przypadki użycia

System plików DFS jest automatycznie generowany przez interfejs użytkownika. Możesz kliknąć przycisk Skrypt, aby wyświetlić i dostosować skrypt. Skrypty można również wygenerować poza interfejsem użytkownika usługi ADF, a następnie przekazać je do polecenia cmdlet programu PowerShell. Podczas debugowania złożonych przepływów danych można łatwiej przeskanować kod skryptu zamiast skanować reprezentację wykresu interfejsu użytkownika przepływów.

Oto kilka przykładowych przypadków użycia:

  • Programowo tworząc wiele przepływów danych, które są dość podobne, tj. przepływy danych "stamping-out".
  • Złożone wyrażenia, które są trudne do zarządzania w interfejsie użytkownika lub są wynikiem problemów z walidacją.
  • Debugowanie i lepsze zrozumienie różnych błędów zwracanych podczas wykonywania.

Podczas kompilowania skryptu przepływu danych do użycia z programem PowerShell lub interfejsem API należy zwinąć sformatowany tekst do pojedynczego wiersza. Karty i nowe linie można przechowywać jako znaki ucieczki. Jednak tekst musi być sformatowany w celu dopasowania do właściwości JSON. W dolnej części interfejsu użytkownika edytora skryptów znajduje się przycisk, który sformatuje skrypt jako pojedynczy wiersz.

Copy button

Jak dodawać przekształcenia

Dodawanie przekształceń wymaga trzech podstawowych kroków: dodanie podstawowych danych przekształcania, przekierowanie strumienia wejściowego, a następnie przekierowanie strumienia wyjściowego. Można to zobaczyć najłatwiej w przykładzie. Załóżmy, że zaczynamy od prostego źródła do ujścia przepływu danych w następujący sposób:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Jeśli zdecydujemy się dodać przekształcenie pochodne, najpierw musimy utworzyć podstawowy tekst przekształcenia, który zawiera proste wyrażenie, aby dodać nową wielką kolumnę o nazwie upperCaseTitle:

derive(upperCaseTitle = upper(title)) ~> deriveTransformationName

Następnie użyjemy istniejącego systemu plików DFS i dodamy przekształcenie:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Teraz przekierowujemy strumień przychodzący, identyfikując transformację, po której ma pojawić się nowa transformacja (w tym przypadku source1) i kopiując nazwę strumienia do nowej transformacji:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Na koniec zidentyfikujemy przekształcenie, które chcemy wykonać po tej nowej transformacji, i zastąp jego strumień wejściowy (w tym przypadku sink1) nazwą strumienia wyjściowego nowej transformacji:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
deriveTransformationName sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Podstawy systemu plików DFS

System plików DFS składa się z serii połączonych przekształceń, w tym źródeł, ujść i różnych innych, które mogą dodawać nowe kolumny, filtrować dane, łączyć dane i wiele innych. Zazwyczaj skrypt rozpoczyna się od co najmniej jednego źródła, po którym następuje wiele przekształceń i kończy się co najmniej jednym ujściem.

Wszystkie źródła mają tę samą podstawową konstrukcję:

source(
  source properties
) ~> source_name

Na przykład proste źródło z trzema kolumnami (movieId, title, gatunek) to:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1

Wszystkie przekształcenia inne niż źródła mają tę samą podstawową konstrukcję:

name_of_incoming_stream transformation_type(
  properties
) ~> new_stream_name

Na przykład prosta transformacja pochodna, która przyjmuje kolumnę (tytuł) i zastępuje ją wielką wersją, będzie następująca:

source1 derive(
  title = upper(title)
) ~> derive1

A ujście bez schematu byłoby:

derive1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Fragmenty kodu skryptu

Fragmenty skryptu to współużytkowalny kod skryptu Przepływ danych, którego można użyć do udostępniania między przepływami danych. W poniższym filmie wideo omówiono sposób używania fragmentów skryptów i używania skryptu Przepływ danych Do kopiowania i wklejania fragmentów skryptu za grafami przepływu danych:

Zagregowane statystyki podsumowania

Dodaj przekształcenie agregacji do przepływu danych o nazwie "SummaryStats", a następnie wklej poniższy kod dla funkcji agregującej w skrypcie, zastępując istniejące PodsumowanieStats. Zapewni to ogólny wzorzec statystyk podsumowania profilu danych.

aggregate(each(match(true()), $$+'_NotNull' = countIf(!isNull($$)), $$ + '_Null' = countIf(isNull($$))),
		each(match(type=='double'||type=='integer'||type=='short'||type=='decimal'), $$+'_stddev' = round(stddev($$),2), $$ + '_min' = min ($$), $$ + '_max' = max($$), $$ + '_average' = round(avg($$),2), $$ + '_variance' = round(variance($$),2)),
		each(match(type=='string'), $$+'_maxLength' = max(length($$)))) ~> SummaryStats

Możesz również użyć poniższego przykładu, aby zliczyć liczbę unikatowych i liczbę odrębnych wierszy w danych. Poniższy przykład można wkleić do przepływu danych za pomocą przekształcenia agregacji o nazwie ValueDistAgg. W tym przykładzie użyto kolumny o nazwie "title". Pamiętaj, aby zastąpić ciąg "title" kolumną ciągu w danych, których chcesz użyć, aby pobrać liczby wartości.

aggregate(groupBy(title),
	countunique = count()) ~> ValueDistAgg
ValueDistAgg aggregate(numofunique = countIf(countunique==1),
		numofdistinct = countDistinct(title)) ~> UniqDist

Uwzględnij wszystkie kolumny w agregacji

Jest to ogólny wzorzec agregacji, który pokazuje, jak zachować pozostałe kolumny w metadanych wyjściowych podczas tworzenia agregacji. W tym przypadku użyjemy first() funkcji , aby wybrać pierwszą wartość w każdej kolumnie, której nazwa nie jest "filmem". Aby to użyć, utwórz przekształcenie agregacji o nazwie DistinctRows, a następnie wklej go w skrycie nad istniejącym skryptem agregacji DistinctRows.

aggregate(groupBy(movie),
	each(match(name!='movie'), $$ = first($$))) ~> DistinctRows

Tworzenie odcisku palca skrótu wiersza

Użyj tego kodu w skrypcie przepływu danych, aby utworzyć nową kolumnę pochodną sha1 o nazwie DWhash , która generuje skrót trzech kolumn.

derive(DWhash = sha1(Name,ProductNumber,Color)) ~> DWHash

Możesz również użyć poniższego skryptu, aby wygenerować skrót wiersza przy użyciu wszystkich kolumn, które znajdują się w strumieniu, bez konieczności nazywania każdej kolumny:

derive(DWhash = sha1(columns())) ~> DWHash

odpowiednik String_agg

Ten kod będzie działać podobnie jak funkcja T-SQL string_agg() i agreguje wartości ciągów do tablicy. Następnie można rzutować tablicę do ciągu, który ma być używany z miejscami docelowymi SQL.

source1 aggregate(groupBy(year),
	string_agg = collect(title)) ~> Aggregate1
Aggregate1 derive(string_agg = toString(string_agg)) ~> StringAgg

Liczba aktualizacji, operacji upsert, wstawiania, usuwania

W przypadku korzystania z przekształcenia Alter Row można policzyć liczbę aktualizacji, operacji upsert, wstawień, usuwa wynik z zasad Alter Row. Dodaj przekształcenie agregacji po alter row i wklej ten Przepływ danych Script do definicji agregującej dla tych liczb.

aggregate(updates = countIf(isUpdate(), 1),
		inserts = countIf(isInsert(), 1),
		upserts = countIf(isUpsert(), 1),
		deletes = countIf(isDelete(),1)) ~> RowCount

Odrębny wiersz przy użyciu wszystkich kolumn

Ten fragment kodu doda nową transformację Agregacja do przepływu danych, co spowoduje przejście do wszystkich kolumn przychodzących, wygenerowanie skrótu używanego do grupowania w celu wyeliminowania duplikatów, a następnie podanie pierwszego wystąpienia każdego duplikatu jako danych wyjściowych. Nie musisz jawnie nazywać kolumn, ale będą one generowane automatycznie na podstawie przychodzącego strumienia danych.

aggregate(groupBy(mycols = sha2(256,columns())),
    each(match(true()), $$ = first($$))) ~> DistinctRows

Sprawdzanie list NUL we wszystkich kolumnach

Jest to fragment kodu, który można wkleić do przepływu danych, aby ogólnie sprawdzić wszystkie kolumny dla wartości NULL. Ta technika wykorzystuje dryf schematu do przeglądania wszystkich kolumn we wszystkich wierszach i używa podziału warunkowego, aby oddzielić wiersze od wierszy bez list NULLs.

split(contains(array(toString(columns())),isNull(#item)),
	disjoint: false) ~> LookForNULLs@(hasNULLs, noNULLs)

Dryf schematu automap z wybraną pozycją

Jeśli musisz załadować istniejący schemat bazy danych z nieznanego lub dynamicznego zestawu kolumn przychodzących, musisz zamapować kolumny po prawej stronie w transformacji ujścia. Jest to konieczne tylko wtedy, gdy ładujesz istniejącą tabelę. Dodaj ten fragment kodu przed ujściem, aby utworzyć pozycję Wybierz, która automatycznie mapuje kolumny. Pozostaw mapowanie ujścia na automapę.

select(mapColumn(
		each(match(true()))
	),
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> automap

Utrwalanie typów danych kolumn

Dodaj ten skrypt wewnątrz definicji kolumny pochodnej, aby przechowywać nazwy kolumn i typy danych z przepływu danych do magazynu trwałego przy użyciu ujścia.

derive(each(match(type=='string'), $$ = 'string'),
	each(match(type=='integer'), $$ = 'integer'),
	each(match(type=='short'), $$ = 'short'),
	each(match(type=='complex'), $$ = 'complex'),
	each(match(type=='array'), $$ = 'array'),
	each(match(type=='float'), $$ = 'float'),
	each(match(type=='date'), $$ = 'date'),
	each(match(type=='timestamp'), $$ = 'timestamp'),
	each(match(type=='boolean'), $$ = 'boolean'),
	each(match(type=='long'), $$ = 'long'),
	each(match(type=='double'), $$ = 'double')) ~> DerivedColumn1

Wypełnij w dół

Poniżej przedstawiono sposób implementowania typowego problemu "Wypełnij w dół" z zestawami danych, gdy chcesz zastąpić wartości NULL wartością z poprzedniej wartości innej niż NULL w sekwencji. Pamiętaj, że ta operacja może mieć negatywne konsekwencje dla wydajności, ponieważ należy utworzyć syntetyczne okno w całym zestawie danych z wartością kategorii "fikcyjne". Ponadto należy sortować według wartości, aby utworzyć właściwą sekwencję danych, aby znaleźć poprzednią wartość inną niż NULL. Poniższy fragment kodu tworzy kategorię syntetyczną jako "fikcyjną" i sortuje według klucza zastępczego. Możesz usunąć klucz zastępczy i użyć własnego klucza sortowania specyficznego dla danych. W tym fragmencie kodu założono, że dodano już transformację źródłową o nazwie source1

source1 derive(dummy = 1) ~> DerivedColumn
DerivedColumn keyGenerate(output(sk as long),
	startAt: 1L) ~> SurrogateKey
SurrogateKey window(over(dummy),
	asc(sk, true),
	Rating2 = coalesce(Rating, last(Rating, true()))) ~> Window1

Średnia ruchoma

Średnia ruchoma może być bardzo łatwo zaimplementowana w przepływach danych przy użyciu transformacji systemu Windows. W poniższym przykładzie przedstawiono tworzenie 15-dniowej średniej ruchomej cen akcji dla firmy Microsoft.

window(over(stocksymbol),
	asc(Date, true),
	startRowOffset: -7L,
	endRowOffset: 7L,
	FifteenDayMovingAvg = round(avg(Close),2)) ~> Window1

Unikatowa liczba wszystkich wartości kolumn

Ten skrypt służy do identyfikowania kolumn kluczy i wyświetlania kardynalności wszystkich kolumn w strumieniu przy użyciu pojedynczego fragmentu skryptu. Dodaj ten skrypt jako przekształcenie zagregowane do przepływu danych i automatycznie udostępni unikatowe liczby wszystkich kolumn.

aggregate(each(match(true()), $$ = countDistinct($$))) ~> KeyPattern

Porównywanie poprzednich lub następnych wartości wierszy

W tym przykładowym fragmencie kodu pokazano, jak transformacja okna może służyć do porównywania wartości kolumn z bieżącego kontekstu wiersza z wartościami kolumn z wierszy przed i po bieżącym wierszu. W tym przykładzie kolumna pochodna służy do generowania fikcyjnej wartości w celu włączenia partycji okna w całym zestawie danych. Przekształcenie klucza zastępczego służy do przypisywania unikatowej wartości klucza dla każdego wiersza. Po zastosowaniu tego wzorca do przekształceń danych możesz usunąć klucz zastępczy, jeśli jesteś kolumną, którą chcesz zamówić, i możesz usunąć kolumnę pochodną, jeśli masz kolumny używane do partycjonowania danych.

source1 keyGenerate(output(sk as long),
	startAt: 1L) ~> SurrogateKey1
SurrogateKey1 derive(dummy = 1) ~> DerivedColumn1
DerivedColumn1 window(over(dummy),
	asc(sk, true),
	prevAndCurr = lag(title,1)+'-'+last(title),
		nextAndCurr = lead(title,1)+'-'+last(title)) ~> leadAndLag

Ile kolumn jest w moich danych?

size(array(columns()))

Zapoznaj się z Przepływ danych, zaczynając od artykułu omówienie przepływów danych