Adatfolyam-szkript (DFS)

A következőkre vonatkozik: Azure Data Factory Azure Synapse Analytics

Tipp.

Próbálja ki a Data Factoryt a Microsoft Fabricben, amely egy teljes körű elemzési megoldás a nagyvállalatok számára. A Microsoft Fabric az adattovábbítástól az adatelemzésig, a valós idejű elemzésig, az üzleti intelligenciáig és a jelentéskészítésig mindent lefed. Ismerje meg, hogyan indíthat új próbaverziót ingyenesen!

Az adatfolyamok az Azure Data Factoryben és az Azure Synapse Pipelinesban is elérhetők. Ez a cikk az adatfolyamok leképezésére vonatkozik. Ha még nem használta az átalakításokat, tekintse meg az adatok leképezési adatfolyam használatával történő átalakításáról szóló bevezető cikket.

Az adatfolyam-szkript (DFS) a kódolási nyelvhez hasonló mögöttes metaadatok, amelyek a leképezési adatfolyamban található átalakítások végrehajtására szolgálnak. Minden átalakítást olyan tulajdonságok sorozata jelöl, amelyek biztosítják a feladat megfelelő futtatásához szükséges információkat. A szkript látható és szerkeszthető az ADF-ből a böngésző felhasználói felületének felső menüszalagján található "szkript" gombra kattintva.

Script button

Egy forrásátalakítás például arra utasítja a szolgáltatást, allowSchemaDrift: true, hogy a forrásadatkészlet összes oszlopát belefoglalja az adatfolyamba, még akkor is, ha azok nem szerepelnek a sémavetítésben.

Használati esetek

Az elosztott fájlrendszert a felhasználói felület automatikusan hozza létre. A szkript megtekintéséhez és testreszabásához kattintson a Szkript gombra. Szkripteket az ADF felhasználói felületén kívül is létrehozhat, majd továbbíthatja azt a PowerShell-parancsmagba. Összetett adatfolyamok hibakeresése esetén egyszerűbb lehet a szkript mögötti kód beolvasása a folyamatok felhasználói felületi gráfképének vizsgálata helyett.

Íme néhány példa használati esetekre:

  • Programozott módon számos, meglehetősen hasonló adatfolyamot hoz létre, azaz "kibélyegezi" az adatfolyamokat.
  • Összetett kifejezések, amelyek nehezen kezelhetők a felhasználói felületen, vagy amelyek érvényesítési problémákat eredményeznek.
  • Hibakeresés és a végrehajtás során visszaadott különböző hibák jobb megértése.

Amikor powershell-lel vagy API-val használható adatfolyam-szkriptet hoz létre, a formázott szöveget egyetlen sorba kell összecsuknia. A tabulátorokat és az új vonalakat feloldó karakterekként is megtarthatja. A szöveget azonban úgy kell formázni, hogy elférjen egy JSON-tulajdonságban. A szkriptszerkesztő felhasználói felületén alul található egy gomb, amely egyetlen sorként formázza a szkriptet.

Copy button

Átalakítások hozzáadása

Az átalakítások hozzáadásához három alapvető lépés szükséges: az alapvető átalakítási adatok hozzáadása, a bemeneti adatfolyam átirányítása, majd a kimeneti adatfolyam átirányítása. Ez egy példában a legkönnyebben látható. Tegyük fel, hogy egy egyszerű forrással kezdjük, amely az adatfolyamot az alábbihoz hasonlóan nyeli el:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Ha úgy döntünk, hogy származtatott átalakítást adunk hozzá, először létre kell hoznunk az alapvető átalakítási szöveget, amely egy egyszerű kifejezéssel rendelkezik egy új nagybetűs oszlop upperCaseTitlehozzáadásához:

derive(upperCaseTitle = upper(title)) ~> deriveTransformationName

Ezután a meglévő elosztott fájlrendszert vesszük fel, és hozzáadjuk az átalakítást:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Most pedig átirányítjuk a bejövő streamet úgy, source1hogy azonosítjuk, hogy melyik átalakítás után érkezik az új átalakítás (ebben az esetben) és másolja a stream nevét az új átalakításra:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Végül azonosítjuk azt az átalakítást, amelyet az új átalakítás után el szeretnénk végezni, és lecseréljük a bemeneti adatfolyamot (ebben az esetben sink1) az új átalakítás kimeneti streamjének nevére:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
deriveTransformationName sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Az elosztott fájlrendszer alapjai

Az elosztott fájlrendszer összekapcsolt átalakítások sorozatából áll, beleértve a forrásokat, a fogadókat és másokat, amelyek új oszlopokat adhatnak hozzá, adatokat szűrhetnek, adatokat illeszthetnek össze és még sok mást. A szkript általában egy vagy több forrással kezdődik, amelyet számos átalakítás követ, és egy vagy több fogadóval végződik.

A források mindegyikének ugyanaz az alapszerkezete:

source(
  source properties
) ~> source_name

Egy három oszlopból (movieId, cím, műfajok) álló egyszerű forrás például a következő:

source(output(
        movieId as string,
        title as string,
        genres as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> source1

A forrásokon kívül minden átalakításnak ugyanaz az alapszerkezete:

name_of_incoming_stream transformation_type(
  properties
) ~> new_stream_name

Például egy egyszerű származtatott átalakítás, amely egy oszlopot (címet) vesz fel, és felülírja nagybetűs verzióval, a következő:

source1 derive(
  title = upper(title)
) ~> derive1

Egy séma nélküli fogadó pedig a következő:

derive1 sink(allowSchemaDrift: true,
    validateSchema: false) ~> sink1

Szkriptrészletek

A szkriptrészletek Adatfolyam szkriptek megosztható kódjai, amelyeket az adatfolyamok közötti megosztáshoz használhat. Ez az alábbi videó bemutatja, hogyan használhat szkriptrészleteket, és hogyan használhatja Adatfolyam szkriptet a szkript egyes részeinek másolásához és beillesztéséhez az adatfolyam-grafikonok mögött:

Összesített összesítő statisztikák

Adjon hozzá egy "SummaryStats" nevű összesítési átalakítást az adatfolyamhoz, majd illessze be az alábbi kódba a szkript összesítő függvényéhez a meglévő SummaryStats helyére. Ez általános mintát biztosít az adatprofil-összefoglaló statisztikákhoz.

aggregate(each(match(true()), $$+'_NotNull' = countIf(!isNull($$)), $$ + '_Null' = countIf(isNull($$))),
		each(match(type=='double'||type=='integer'||type=='short'||type=='decimal'), $$+'_stddev' = round(stddev($$),2), $$ + '_min' = min ($$), $$ + '_max' = max($$), $$ + '_average' = round(avg($$),2), $$ + '_variance' = round(variance($$),2)),
		each(match(type=='string'), $$+'_maxLength' = max(length($$)))) ~> SummaryStats

Az alábbi mintával megszámolhatja az egyedi sorok számát és a különböző sorok számát az adatokban. Az alábbi példa egy ValueDistAgg nevű összesített átalakítással beilleszthető egy adatfolyamba. Ez a példa egy "title" nevű oszlopot használ. Mindenképpen cserélje le a "title" kifejezést az adatok azon sztringoszlopára, amelyet az értékek számának lekéréséhez szeretne használni.

aggregate(groupBy(title),
	countunique = count()) ~> ValueDistAgg
ValueDistAgg aggregate(numofunique = countIf(countunique==1),
		numofdistinct = countDistinct(title)) ~> UniqDist

Az összes oszlop belefoglalása összesítésbe

Ez egy általános összesítési minta, amely bemutatja, hogyan tarthatja meg a kimeneti metaadatok fennmaradó oszlopait az aggregátumok létrehozásakor. Ebben az esetben a függvény használatával first() választjuk ki az első értéket minden olyan oszlopban, amelynek a neve nem "film". Ennek használatához hozzon létre egy DistinctRows nevű összesítő átalakítást, majd illessze be a szkriptbe a meglévő DistinctRows összesítő szkript fölé.

aggregate(groupBy(movie),
	each(match(name!='movie'), $$ = first($$))) ~> DistinctRows

Sorkivonat ujjlenyomatának létrehozása

Ezzel a kóddal hozhat létre egy új, három oszlopból álló kivonatot létrehozósha1, származtatott oszlopot DWhash az adatfolyam-szkriptben.

derive(DWhash = sha1(Name,ProductNumber,Color)) ~> DWHash

Az alábbi szkripttel is létrehozhat egy sorkivonatot a streamben található összes oszlop használatával anélkül, hogy az egyes oszlopokat el kellene neveznie:

derive(DWhash = sha1(columns())) ~> DWHash

String_agg egyenértékű

Ez a kód a T-SQL string_agg() függvényhez hasonlóan fog működni, és egy tömbbe összesíti a sztringértékeket. Ezután a tömböt egy sztringbe helyezheti az SQL-célhelyekkel való használathoz.

source1 aggregate(groupBy(year),
	string_agg = collect(title)) ~> Aggregate1
Aggregate1 derive(string_agg = toString(string_agg)) ~> StringAgg

Frissítések, upserts, inserts, deletes száma

Az Alter Row átalakítás használatakor érdemes lehet megszámolni az Alter Row-szabályzatokból eredő frissítések, upsertsek, beszúrások és törlések számát. Adjon hozzá egy összesítési átalakítást az alter sor után, és illessze be ezt a Adatfolyam szkriptet a darabszám összesítési definíciójába.

aggregate(updates = countIf(isUpdate(), 1),
		inserts = countIf(isInsert(), 1),
		upserts = countIf(isUpsert(), 1),
		deletes = countIf(isDelete(),1)) ~> RowCount

Különálló sor az összes oszlop használatával

Ez a kódrészlet új összesítési átalakítást ad hozzá az adatfolyamhoz, amely az összes bejövő oszlopot átveszi, létrehoz egy kivonatot, amelyet a csoportosításhoz használnak az ismétlődések kiküszöböléséhez, majd kimenetként adja meg az egyes duplikált elemek első előfordulását. Nem kell explicit módon elneveznie az oszlopokat, azok automatikusan létrejönnek a bejövő adatfolyamból.

aggregate(groupBy(mycols = sha2(256,columns())),
    each(match(true()), $$ = first($$))) ~> DistinctRows

NULL-ek keresése az összes oszlopban

Ez egy kódrészlet, amelyet beilleszthet az adatfolyamba, hogy általánosan ellenőrizze az összes oszlop null értékeit. Ez a technika a sémaeltolódást használja az összes sor összes oszlopának megtekintéséhez, és feltételes felosztással választja el a NULL-ekkel rendelkező sorokat a NULL-ekkel rendelkező soroktól.

split(contains(array(toString(columns())),isNull(#item)),
	disjoint: false) ~> LookForNULLs@(hasNULLs, noNULLs)

AutoMap-sémaeltolódás kijelöléssel

Ha egy meglévő adatbázissémát ismeretlen vagy dinamikus bejövő oszlopok készletéből kell betöltenie, a Fogadó transzformációban le kell képeznie a jobb oldali oszlopokat. Erre csak akkor van szükség, ha egy meglévő táblát tölt be. Adja hozzá ezt a kódrészletet a Fogadó elé, és hozzon létre egy Választót, amely automatikusan megfelelteti az oszlopokat. Hagyja automatikus leképezésre a fogadóleképezést.

select(mapColumn(
		each(match(true()))
	),
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> automap

Oszlopadattípusok megőrzése

Adja hozzá ezt a szkriptet egy származtatott oszlopdefinícióhoz, hogy az adatfolyam oszlopneveit és adattípusait egy állandó tárolóba tárolja egy fogadó használatával.

derive(each(match(type=='string'), $$ = 'string'),
	each(match(type=='integer'), $$ = 'integer'),
	each(match(type=='short'), $$ = 'short'),
	each(match(type=='complex'), $$ = 'complex'),
	each(match(type=='array'), $$ = 'array'),
	each(match(type=='float'), $$ = 'float'),
	each(match(type=='date'), $$ = 'date'),
	each(match(type=='timestamp'), $$ = 'timestamp'),
	each(match(type=='boolean'), $$ = 'boolean'),
	each(match(type=='long'), $$ = 'long'),
	each(match(type=='double'), $$ = 'double')) ~> DerivedColumn1

Kitöltés lefelé

Az alábbi módon valósíthatja meg az adathalmazokkal kapcsolatos gyakori "Kitöltés lefelé" problémát, ha a NULL értékeket a sorozat előző nem NULL értékének értékére szeretné cserélni. Vegye figyelembe, hogy ez a művelet negatív hatással lehet a teljesítményre, mivel a teljes adatkészleten létre kell hoznia egy szintetikus ablakot egy "dummy" kategóriaértékkel. Emellett egy érték alapján kell rendeznie a megfelelő adatütemezést az előző nem NULL érték megkereséséhez. Ez az alábbi kódrészlet a szintetikus kategóriát "dummy" néven hozza létre, és helyettesítő kulccsal rendezi. Eltávolíthatja a helyettesítő kulcsot, és használhatja a saját adatspecifikus rendezési kulcsát. Ez a kódrészlet feltételezi, hogy már hozzáadott egy forrásátalakítást source1

source1 derive(dummy = 1) ~> DerivedColumn
DerivedColumn keyGenerate(output(sk as long),
	startAt: 1L) ~> SurrogateKey
SurrogateKey window(over(dummy),
	asc(sk, true),
	Rating2 = coalesce(Rating, last(Rating, true()))) ~> Window1

Mozgóátlag

A mozgóátlag nagyon könnyen implementálható az adatfolyamokban Windows-átalakítással. Az alábbi példa a Microsoft részvényárfolyamainak 15 napos mozgóátlagát hozza létre.

window(over(stocksymbol),
	asc(Date, true),
	startRowOffset: -7L,
	endRowOffset: 7L,
	FifteenDayMovingAvg = round(avg(Close),2)) ~> Window1

Az összes oszlopérték eltérő száma

Ezzel a szkripttel azonosíthatja a kulcsoszlopokat, és egyetlen szkriptrészlettel megtekintheti a stream összes oszlopának számosságát. Adja hozzá ezt a szkriptet összesített átalakításként az adatfolyamhoz, és automatikusan különböző oszlopszámokat biztosít.

aggregate(each(match(true()), $$ = countDistinct($$))) ~> KeyPattern

Előző vagy következő sorértékek összehasonlítása

Ez a mintarészlet bemutatja, hogyan használható az Ablak átalakítás az aktuális sorkörnyezet oszlopértékeinek összehasonlítására az aktuális sor előtti és utáni sorok oszlopértékeivel. Ebben a példában egy származtatott oszlopot használunk egy hamis érték létrehozásához, amely lehetővé teszi egy ablakpartíciót a teljes adatkészleten. A helyettesítő kulcs átalakítással minden sorhoz egyedi kulcsértéket rendelhet hozzá. Ha ezt a mintát alkalmazza az adatátalakításokra, eltávolíthatja a helyettesítő kulcsot, ha ön egy olyan oszlop, amelyet rendezni szeretne, és eltávolíthatja a származtatott oszlopot, ha oszlopokkal szeretné particionálni az adatokat.

source1 keyGenerate(output(sk as long),
	startAt: 1L) ~> SurrogateKey1
SurrogateKey1 derive(dummy = 1) ~> DerivedColumn1
DerivedColumn1 window(over(dummy),
	asc(sk, true),
	prevAndCurr = lag(title,1)+'-'+last(title),
		nextAndCurr = lead(title,1)+'-'+last(title)) ~> leadAndLag

Hány oszlop található az adataimban?

size(array(columns()))

Az adatfolyamok áttekintési cikkével ismerkedhet meg a Adatfolyam