Lekérdezés-párhuzamosítás használata az Azure Stream Analyticsben

Ez a cikk bemutatja, hogyan használhatja ki a párhuzamosítás előnyeit az Azure Stream Analyticsben. Megtudhatja, hogyan méretezheti a Stream Analytics-feladatokat a bemeneti partíciók konfigurálásával és az elemzési lekérdezés definíciójának finomhangolásával.

Előfeltételként érdemes lehet megismerkednie a streamelési egység fogalmával, amely a streamelési egységek értelmezésében és módosításában szerepel.

Mik a Stream Analytics-feladatok részei?

A Stream Analytics-feladatok definíciója legalább egy streambemeneti bemenetet, lekérdezést és kimenetet tartalmaz. A bemenetek azok, amelyekből a feladat beolvassa az adatfolyamot. A lekérdezés az adatbemeneti adatfolyam átalakítására szolgál, és a kimenet az, ahová a feladat elküldi a feladat eredményeit.

Partíciók bemenetekben és kimenetekben

A particionálás lehetővé teszi az adatok részhalmazokra való felosztását egy partíciókulcs alapján. Ha a bemenetet (például az Event Hubsot) egy kulcs particionálta, javasoljuk, hogy adja meg a partíciókulcsot, amikor bemenetet ad hozzá a Stream Analytics-feladathoz. A Stream Analytics-feladatok méretezése kihasználja a bemeneti és kimeneti partíciók előnyeit. A Stream Analytics-feladatok különböző partíciókat használhatnak és írhatnak párhuzamosan, ami növeli az átviteli sebességet.

Bevitelek

Minden Azure Stream Analytics-streambemenet kihasználhatja a particionálás előnyeit: Event Hubs, IoT Hub, Blob Storage, Data Lake Storage Gen2.

Feljegyzés

Az 1.2-es és újabb kompatibilitási szint esetén a partíciókulcsot bemeneti tulajdonságként kell beállítani, és nincs szükség a PARTITION BY kulcsszóra a lekérdezésben. Az 1.1-es és újabb kompatibilitási szint esetén a partíciókulcsot a partíciókulcsot a partíciónkénti kulcsszóval kell definiálni a lekérdezésben.

Kimenetek

A Stream Analytics használatakor kihasználhatja a particionálás előnyeit a kimenetekben:

  • Azure Data Lake Storage
  • Azure Functions
  • Azure-tábla
  • Blob Storage (explicit módon beállíthatja a partíciókulcsot)
  • Azure Cosmos DB (explicit módon kell beállítani a partíciókulcsot)
  • Event Hubs (explicit módon kell beállítani a partíciókulcsot)
  • IoT Hub (explicit módon kell beállítani a partíciókulcsot)
  • Service Bus
  • SQL és Azure Synapse Analytics opcionális particionálással: további információk az Azure SQL Database-beli kimenetről.

A Power BI nem támogatja a particionálást. A bemenetet azonban továbbra is particionálásra használhatja az ebben a szakaszban leírtak szerint.

A partíciókkal kapcsolatos további információkért tekintse meg a következő cikkeket:

Lekérdezés

Ahhoz, hogy egy feladat párhuzamos legyen, a partíciókulcsokat az összes bemenethez, az összes lekérdezési logikai lépéshez és az összes kimenethez kell igazítani. A lekérdezési logikai particionálást az illesztésekhez és aggregációkhoz (GROUP BY) használt kulcsok határozzák meg. Ez az utolsó követelmény figyelmen kívül hagyható, ha a lekérdezési logika nincs kulcsban (kivetítés, szűrők, hivatkozási illesztések...).

  • Ha egy bemenetet és kimenetet particionált WarehouseIda rendszer, és a lekérdezési csoportok ProductId nem WarehouseId, akkor a feladat nem párhuzamos.
  • Ha két csatlakoztatni kívánt bemenetet különböző partíciókulcsok (WarehouseId és ProductId) particionálnak, akkor a feladat nem párhuzamos.
  • Ha két vagy több független adatfolyam található egyetlen feladatban, mindegyiknek saját partíciókulcsa van, akkor a feladat nem párhuzamos.

A feladat csak akkor párhuzamos, ha az összes bemenet, kimenet és lekérdezési lépés ugyanazt a kulcsot használja.

Kínosan párhuzamos feladatok

A kínosan párhuzamos feladat az Azure Stream Analytics legskálázhatóbb forgatókönyve. A bemenet egy partícióját csatlakoztatja a lekérdezés egy példányához a kimenet egyik partíciójára. Ez a párhuzamosság a következő követelményekkel rendelkezik:

  • Ha a lekérdezési logika attól függ, hogy ugyanazt a kulcsot dolgozza fel ugyanaz a lekérdezéspéldány, győződjön meg arról, hogy az események a bemenet ugyanazon partíciójára kerülnek. Az Event Hubs vagy az IoT Hub esetében ez azt jelenti, hogy az eseményadatoknak a PartitionKey értékkészlettel kell rendelkezniük. Másik lehetőségként particionált feladókat is használhat. Blob Storage esetén ez azt jelenti, hogy az események ugyanabba a partíciómappába kerülnek. Ilyen például egy olyan lekérdezéspéldány, amely felhasználóazonosítónként összesíti az adatokat, ahol a bemeneti eseményközpont particionálása a userID használatával történik partíciókulcsként. Ha azonban a lekérdezési logikához nem szükséges ugyanaz a kulcs ugyanazzal a lekérdezéspéldánysal feldolgozni, figyelmen kívül hagyhatja ezt a követelményt. Erre a logikára példa egy egyszerű select-project-filter lekérdezés.

  • A következő lépés a lekérdezés particionálása. Az 1.2-es vagy újabb kompatibilitási szinttel rendelkező feladatok esetében (ajánlott) az egyéni oszlop partíciókulcsként adható meg a bemeneti beállításokban, és a feladat automatikusan párhuzamos lesz. Az 1.0-s vagy 1.1-es kompatibilitási szinttel rendelkező feladatokhoz a PARTITION BY PartitionId azonosítót kell használnia a lekérdezés minden lépésében. Több lépés is engedélyezett, de mindegyiknek ugyanazzal a kulccsal kell particionálást végeznie.

  • A Stream Analyticsben támogatott kimenetek többsége kihasználhatja a particionálás előnyeit. Ha olyan kimeneti típust használ, amely nem támogatja a feladat particionálását, az nem lesz kínosan párhuzamos. Az Event Hubs kimenete esetén győződjön meg arról, hogy a partíciókulcs oszlopa a lekérdezésben használt partíciókulcsra van beállítva. További információkért lásd a kimeneti szakaszt.

  • A bemeneti partíciók számának meg kell egyezik a kimeneti partíciók számával. A Blob Storage-kimenet támogatja a partíciókat, és örökli a felsőbb rétegbeli lekérdezés particionálási sémáját. A Blob Storage partíciókulcsának megadásakor az adatok particionálása bemeneti partíciónként történik, így az eredmény továbbra is teljesen párhuzamos marad. Íme néhány példa a partícióértékekre, amelyek teljes mértékben párhuzamos feladatokat engedélyeznek:

    • Nyolc eseményközpont bemeneti partíciója és nyolc eseményközpont kimeneti partíciója
    • Nyolc eseményközpont bemeneti partíciója és blobtároló kimenete
    • Nyolc eseményközpont bemeneti partíciója és egy egyéni mező által particionált Blob Storage-kimenet tetszőleges számossággal
    • Nyolc blobtároló bemeneti partíciója és blobtároló kimenete
    • Nyolc blobtároló bemeneti partíciója és nyolc eseményközpont kimeneti partíciója

Az alábbi szakaszok néhány olyan példaforgatókönyvet mutatnak be, amelyek kínosan párhuzamosak.

Egyszerű lekérdezés

  • Bemenet: Eseményközpont nyolc partícióval
  • Kimenet: Egy nyolc partícióval rendelkező eseményközpontot ("Partíciókulcs oszlopát" kell használni PartitionId)

Lekérdezés:

    --Using compatibility level 1.2 or above
    SELECT TollBoothId
    FROM Input1
    WHERE TollBoothId > 100
    
    --Using compatibility level 1.0 or 1.1
    SELECT TollBoothId
    FROM Input1 PARTITION BY PartitionId
    WHERE TollBoothId > 100

Ez a lekérdezés egy egyszerű szűrő. Ezért nem kell aggódnunk az eseményközpontba küldött bemenet particionálása miatt. Figyelje meg, hogy az 1.2 előtti kompatibilitási szinttel rendelkező feladatoknak tartalmazniuk kell a PARTITION BY PartitionId záradékot, így teljesíti a korábbi 2. követelményt. A kimenethez konfigurálnunk kell az eseményközpont kimenetét a feladatban úgy, hogy a partíciókulcs a PartitionId értékre legyen állítva. Az utolsó ellenőrzés annak ellenőrzése, hogy a bemeneti partíciók száma megegyezik-e a kimeneti partíciók számával.

Lekérdezés csoportosítási kulccsal

  • Bemenet: Eseményközpont nyolc partícióval
  • Kimenet: Blob Storage

Lekérdezés:

    --Using compatibility level 1.2 or above
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    
    --Using compatibility level 1.0 or 1.1
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Ez a lekérdezés egy csoportosítási kulccsal rendelkezik. Ezért a csoportosított eseményeket ugyanarra az Event Hubs-partícióra kell elküldeni. Mivel ebben a példában a TollBoothID szerint csoportosítunk, meg kell győződnünk arról, hogy TollBoothID ez lesz a partíciókulcs, amikor az eseményeket elküldjük az Event Hubsnak. Ezután az Azure Stream Analyticsben a PARTITION BY PartitionId használatával örökölheti ezt a partíciós sémát, és engedélyezheti a teljes párhuzamosítást. Mivel a kimenet blobtároló, nem kell a 4. követelménynek megfelelően konfigurálnunk egy partíciókulcs-értéket.

Példa olyan forgatókönyvekre, amelyek nem* kínosan párhuzamosak

Az előző szakaszban a cikk néhány kínosan párhuzamos forgatókönyvet tárgyalt. Ebben a szakaszban olyan forgatókönyveket ismerhet meg, amelyek nem felelnek meg az összes követelménynek, hogy kínosan párhuzamosak legyenek.

Nem egyező partíciók száma

  • Bemenet: Eseményközpont nyolc partícióval
  • Kimenet: Eseményközpont 32 partícióval

Ha a bemeneti partíciók száma nem egyezik meg a kimeneti partíciók számával, a topológia nem kínosan párhuzamos a lekérdezéstől függetlenül. Azonban továbbra is kaphatunk némi párhuzamosítási szintet.

Lekérdezés nem particionált kimenettel

  • Bemenet: Eseményközpont nyolc partícióval
  • Kimenet: Power BI

A Power BI-kimenet jelenleg nem támogatja a particionálást. Ezért ez a forgatókönyv nem kínosan párhuzamos.

Többlépéses lekérdezés különböző PARTITION BY értékekkel

  • Bemenet: Eseményközpont nyolc partícióval
  • Kimenet: Eseményközpont nyolc partícióval
  • Kompatibilitási szint: 1.0 vagy 1.1

Lekérdezés:

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId, PartitionId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1 Partition By TollBoothId
    GROUP BY TumblingWindow(minute, 3), TollBoothId

Mint látható, a második lépés a TollBoothId azonosítót használja particionálási kulcsként. Ez a lépés nem ugyanaz, mint az első lépés, ezért szükség van egy shuffle-ra.

Többlépéses lekérdezés különböző PARTITION BY értékekkel

  • Bemenet: Eseményközpont nyolc partícióval ("Partíciókulcs oszlopa" nincs beállítva, alapértelmezés szerint "PartitionId")
  • Kimenet: Nyolc partícióval rendelkező eseményközpont ("A partíciókulcs oszlopát a "TollBoothId" használatára kell beállítani)
  • Kompatibilitási szint – 1,2 vagy újabb

Lekérdezés:

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

Az 1.2-es vagy újabb kompatibilitási szint alapértelmezés szerint lehetővé teszi a párhuzamos lekérdezések végrehajtását. Az előző szakasz lekérdezése például particionálásra kerül, ha a "TollBoothId" oszlop bemeneti partíciókulcsként van beállítva. A PARTITION BY PartitionId záradék nem szükséges.

Feladat maximális streamelési egységeinek kiszámítása

A Stream Analytics-feladatok által használható streamelési egységek teljes száma a feladathoz definiált lekérdezés lépéseinek számától és az egyes lépések partícióinak számától függ.

A lekérdezés lépései

A lekérdezések egy vagy több lépéssel is rendelkezhetnek. Minden lépés egy, a WITH kulcsszó által definiált részquery. A WITH kulcsszón kívül eső lekérdezés (csak egy lekérdezés) is lépésnek számít, például a Standard kiadás LECT utasítást a következő lekérdezésben:

Lekérdezés:

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )
    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute,3), TollBoothId

Ez a lekérdezés két lépésből áll.

Feljegyzés

Ezt a lekérdezést a cikk későbbi részében részletesebben tárgyaljuk.

Lépés particionálása

A lépés particionálásához a következő feltételek szükségesek:

  • A bemeneti forrást particionáltnak kell lennie.
  • A lekérdezés Standard kiadás LECT utasításának particionált bemeneti forrásból kell beolvasnia.
  • A lépésen belüli lekérdezésnek a PARTITION BY kulcsszóval kell rendelkeznie.

A lekérdezés particionálásakor a bemeneti események külön partíciócsoportokban lesznek feldolgozva és összesítve, a kimeneti események pedig mindegyik csoporthoz létre lesznek hozva. Ha kombinált aggregátumot szeretne, létre kell hoznia egy második nem particionált lépést az összesítéshez.

Feladat maximális streamelési egységeinek kiszámítása

A nem particionált lépések együttesen egy Stream Analytics-feladat streamelési egységére (SU V2-ekre) méretezhetők. Emellett egy particionált lépésben minden partícióhoz hozzáadhat egy SU V2-t. Az alábbi táblázatban néhány példát láthat.

Lekérdezés A feladathoz tartozó termékváltozatok maximális száma
  • A lekérdezés egy lépést tartalmaz.
  • A lépés nincs particionálva.
1 SU V2
  • A bemeneti adatfolyam particionálása 16-tal történik.
  • A lekérdezés egy lépést tartalmaz.
  • A lépés particionálása történik.
16 SU V2 (1 * 16 partíció)
  • A lekérdezés két lépést tartalmaz.
  • Egyik lépés sem particionált.
1 SU V2
  • A bemeneti adatfolyam particionálása 3-ra történik.
  • A lekérdezés két lépést tartalmaz. A bemeneti lépés particionálva van, a második lépés pedig nem.
  • A Standard kiadás LECT utasítás a particionált bemenetből olvas be.
4 SU V2s (3 particionált lépésekhez + 1 nem particionált lépések esetén

Példák a skálázásra

Az alábbi lekérdezés kiszámítja az autók számát egy háromperces ablakban, amely egy három útdíjköteles útdíj-állomáson halad át. Ez a lekérdezés egy SU V2-re skálázható.

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

Ha több termékváltozatot szeretne használni a lekérdezéshez, a bemeneti adatfolyamot és a lekérdezést is particionelni kell. Mivel az adatfolyam partíciója 3-ra van állítva, a következő módosított lekérdezés 3 SU V2-re skálázható:

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

A lekérdezés particionálásakor a bemeneti események feldolgozása és összesítése külön partíciócsoportokban történik. A kimeneti események az egyes csoportokhoz is létre lesznek hozva. A particionálás váratlan eredményeket okozhat, ha a GROUP BY mező nem a bemeneti adatfolyam partíciókulcsa. Az előző lekérdezés TollBoothId mezője például nem az Input1 partíciókulcsa. Az eredmény az, hogy a TollBooth #1 adatai több partícióban is eloszthatók.

Az Egyes Bemeneti1 partíciókat a Stream Analytics külön dolgozza fel. Ennek eredményeképpen több rekord is létrejön ugyanahhoz az útdíjköteles autóhoz ugyanabban a bukóablakban. Ha a bemeneti partíciókulcs nem módosítható, ez a probléma megoldható egy nem particionálási lépés hozzáadásával a partíciók értékeinek összesítéséhez, ahogyan az alábbi példában is látható:

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

Ez a lekérdezés 4 SU V2-re skálázható.

Feljegyzés

Ha két streamet csatlakoztat, győződjön meg arról, hogy a streameket particionálta az illesztések létrehozásához használt oszlop partíciókulcsa. Győződjön meg arról is, hogy mindkét streamben ugyanannyi partíció található.

Nagyobb átviteli sebesség elérése nagy méretekben

Egy kínosan párhuzamos feladat szükséges, de nem elegendő a nagyobb átviteli sebesség nagy léptékű fenntartásához. Minden tárolórendszer és annak megfelelő Stream Analytics-kimenete eltérően rendelkezik a lehető legjobb írási átviteli sebesség elérésének módjával. Mint minden nagy léptékű forgatókönyv esetében, itt is vannak olyan kihívások, amelyek a megfelelő konfigurációk használatával oldhatók meg. Ez a szakasz néhány gyakori kimenet konfigurációit ismerteti, és mintákat tartalmaz az 1 K, 5 K és 10 K események másodpercenkénti betöltési sebességének fenntartásához.

Az alábbi megfigyelések egy Stream Analytics-feladatot használnak állapot nélküli (átmenő) lekérdezéssel, amely egy alapszintű JavaScript UDF, amely az Event Hubsba, az Azure SQL-be vagy az Azure Cosmos DB-be ír.

Event Hubs

Betöltési sebesség (események másodpercenként) Folyamatos átviteli egységek Kimeneti erőforrások
1 K 1/3 2 TU
5 K 0 6 TU
10 E 2 10 TU

Az Event Hubs-megoldás lineárisan skálázható a streamelési egységek (SU) és az átviteli sebesség szempontjából, így ez a leghatékonyabb és leghatékonyabb módja az adatok Stream Analyticsből való elemzésének és streamelésének. A feladatok 66 SU V2-esre skálázhatók, ami nagyjából 400 MB/s-os feldolgozást, vagy napi 38 billió esemény feldolgozását jelenti.

Azure SQL

Betöltési sebesség (események másodpercenként) Folyamatos átviteli egységek Kimeneti erőforrások
1 K 2/3 S3
5 K 3 P4
10 E 6 P6

Az Azure SQL támogatja a párhuzamos írást, a particionálás öröklését, de alapértelmezés szerint nincs engedélyezve. Előfordulhat azonban, hogy a particionálás öröklésének engedélyezése egy teljesen párhuzamos lekérdezéssel együtt nem elegendő a magasabb átviteli sebesség eléréséhez. Az SQL írási átviteli sebessége jelentősen függ az adatbázis konfigurációjától és a táblasémától. Az SQL Kimeneti teljesítmény című cikk részletesebben ismerteti azokat a paramétereket, amelyek maximalizálhatják az írási teljesítményt. Amint azt az Azure Stream Analytics Azure SQL Database-hez készült kimenetében említettük, ez a megoldás nem lineárisan skálázható teljesen párhuzamos folyamatként 8 partíción túl, és előfordulhat, hogy újraparticionálásra van szükség az SQL-kimenet előtt (lásd: INTO). A magas I/O-sebesség fenntartásához prémium termékváltozatokra van szükség, valamint a napló biztonsági mentéseinek néhány percenkénti terhelése is.

Azure Cosmos DB

Betöltési sebesség (események másodpercenként) Folyamatos átviteli egységek Kimeneti erőforrások
1 K 2/3 20 K RU
5 K 4 60 K RU
10 E 8 120 K RU

A Stream Analytics Azure Cosmos DB-kimenetét úgy frissítettük, hogy natív integrációt használjon az 1.2-es kompatibilitási szinten. Az 1.2-es kompatibilitási szint jelentősen nagyobb átviteli sebességet tesz lehetővé, és csökkenti az ru-használatot az 1.1-es szinthez képest, amely az új feladatok alapértelmezett kompatibilitási szintje. A megoldás a /deviceId-en particionált Azure Cosmos DB-tárolókat használja, a többi megoldás pedig azonos módon van konfigurálva.

A Scale Azure-mintákban a streamelés az Event Hubsot használja bemenetként, amelyet a tesztelési ügyfelek terhelésének szimulálása táplál. Minden bemeneti esemény egy 1 KB-os JSON-dokumentum, amely egyszerűen lefordítja a konfigurált betöltési sebességet az átviteli sebességre (1 MB/s, 5 MB/s és 10 MB/s). Az események egy olyan IoT-eszközt szimulálnak, amely legfeljebb 1000 eszközhöz küldi a következő JSON-adatokat (rövidített formában):

{
    "eventId": "b81d241f-5187-40b0-ab2a-940faf9757c0",
    "complexData": {
        "moreData0": 51.3068118685458,
        "moreData22": 45.34076957651598
    },
    "value": 49.02278128887753,
    "deviceId": "contoso://device-id-1554",
    "type": "CO2",
    "createdAt": "2019-05-16T17:16:40.000003Z"
}

Feljegyzés

A konfigurációk a megoldásban használt különböző összetevők miatt változhatnak. A pontosabb becslés érdekében szabja testre a mintákat a forgatókönyvnek megfelelően.

Szűk keresztmetszetek azonosítása

Az Azure Stream Analytics-feladat Metrikák paneljén azonosíthatja a folyamat szűk keresztmetszeteit. Tekintse át a bemeneti/kimeneti eseményeket az átviteli sebesség és a "Vízjel késleltetése" vagy a Háttérrendszer eseményei között annak ellenőrzéséhez, hogy a feladat lépést tart-e a bemeneti sebességgel. Event Hubs-metrikák esetén keresse meg a szabályozott kérelmeket , és ennek megfelelően módosítsa a küszöbérték-egységeket. Az Azure Cosmos DB-metrikák esetében tekintse át a partíciókulcs-tartományonkénti maximálisan felhasznált RU/s-t az Átviteli sebesség területen, hogy a partíciókulcs-tartományok egységesen legyenek felhasználva. Az Azure SQL DB-hez figyelje a Log IO-t és a CPU-t.

Segítség kérése

További segítségért próbálja ki a Microsoft Q&A kérdésoldalát az Azure Stream Analyticshez.

Következő lépések