Adatok streamelése bemenetként a Stream Analyticsbe

A Stream Analytics első osztályú integrációval rendelkezik az Azure-adatfolyamokkal háromféle erőforrás bemeneteként:

Ezek a bemeneti erőforrások ugyanabban az Azure-előfizetésben élhetnek, mint a Stream Analytics-feladat vagy egy másik előfizetés.

Tömörítés

A Stream Analytics támogatja az összes bemeneti forrás tömörítését. A támogatott tömörítési típusok a következők: Nincs, Gzip és Deflate. A tömörítés támogatása nem érhető el referenciaadatokhoz. Ha a bemeneti adatok tömörített Avro-adatok, a Stream Analytics transzparensen kezeli azokat. Nem kell megadnia a tömörítés típusát az Avro szerializálásával.

Bemenetek létrehozása, szerkesztése vagy tesztelése

Az Azure Portal, a Visual Studio és a Visual Studio Code használatával meglévő bemeneteket vehet fel és tekinthet meg vagy szerkeszthet a streamelési feladatban. Emellett tesztelheti a bemeneti kapcsolatokat és tesztelheti a mintaadatokból származó lekérdezéseket az Azure Portalról, a Visual Studióból és a Visual Studio Code-ból. Lekérdezés írásakor a FROM záradékban listázhatja a bemenetet. Az elérhető bemenetek listáját a portál Lekérdezés lapján szerezheti be. Ha több bemenetet szeretne használni, JOIN vagy több SELECT lekérdezést szeretne írni.

Feljegyzés

Határozottan javasoljuk, hogy a legjobb helyi fejlesztési élmény érdekében használja a Stream Analytics-eszközöket a Visual Studio Code-hoz. A Visual Studio 2019-hez készült Stream Analytics-eszközökben (2.6.3000.0-s verzió) ismert funkcióbeli hiányosságok vannak, és a jövőben nem fog javulni.

Adatok streamelése az Event Hubsból

Az Azure Event Hubs egy nagymértékben skálázható közzétételi-feliratkozási eseménybetöltés. Az eseményközpont másodpercenként több millió eseményt gyűjthet össze, hogy feldolgozhassa és elemezhesse a csatlakoztatott eszközök és alkalmazások által előállított nagy mennyiségű adatot. Az Event Hubs és a Stream Analytics együttesen kínálhat végpontok közötti megoldást a valós idejű elemzésekhez. Az Event Hubs lehetővé teszi az események valós idejű továbbítását az Azure-ba, a Stream Analytics-feladatok pedig valós időben feldolgozhatják ezeket az eseményeket. Küldhet például webes kattintásokat, érzékelőolvasásokat vagy online naplóeseményeket az Event Hubsnak. Ezután Létrehozhat Stream Analytics-feladatokat az Event Hubs használatával a bemeneti adatokhoz valós idejű szűréshez, összesítéshez és korrelációhoz.

EventEnqueuedUtcTime az esemény eseményközpontba való érkezésének időbélyege, és az Event Hubsból a Stream Analyticsbe érkező események alapértelmezett időbélyege. Ha streamként szeretné feldolgozni az adatokat az esemény hasznos adatainak időbélyegével, a TIMESTAMP BY kulcsszót kell használnia.

Event Hubs Fogyasztói csoportok

Minden eseményközpont bemenetét úgy kell konfigurálnia, hogy saját fogyasztói csoportjuk legyen. Ha egy feladat önálló illesztéseket tartalmaz, vagy több bemenettel rendelkezik, előfordulhat, hogy egynél több olvasó olvas be néhány bemenetet. Ez a helyzet egyetlen fogyasztói csoport olvasóinak számát befolyásolja. Annak érdekében, hogy ne lépje túl az Event Hubs felhasználói csoportonkénti öt olvasóra vonatkozó korlátját partíciónként, ajánlott egy fogyasztói csoportot kijelölni minden Stream Analytics-feladathoz. A Standard szintű eseményközpontok esetében 20 fogyasztói csoportra van korlátozva. További információ: Azure Stream Analytics-bemenetek hibaelhárítása.

Bemenet létrehozása az Event Hubsból

Az alábbi táblázat az Azure Portal Új bemeneti oldalának minden tulajdonságát ismerteti az adatbevitel eseményközpontból való streameléséhez:

Tulajdonság Leírás
Bemeneti alias Egy rövid név, amelyet a feladat lekérdezésében használ a bemenetre való hivatkozáshoz.
Előfizetés Válassza ki azt az Azure-előfizetést, amelyben az Event Hub-erőforrás létezik.
Event Hub-névtér Az Event Hubs-névtér az eseményközpontok tárolója. Eseményközpont létrehozásakor a névteret is létre kell hoznia.
Event Hub neve A bemenetként használni kívánt eseményközpont neve.
Event Hub fogyasztói csoport (ajánlott) Javasoljuk, hogy minden Stream Analytics-feladathoz külön fogyasztói csoportot használjon. Ez a sztring azonosítja azt a fogyasztói csoportot, amely az eseményközpontból származó adatok betöltésére szolgál. Ha nincs megadva fogyasztói csoport, a Stream Analytics-feladat a $Default fogyasztói csoportot használja.
Hitelesítési mód Adja meg az eseményközponthoz való csatlakozáshoz használni kívánt hitelesítés típusát. Az eseményközponttal való hitelesítéshez használhat egy kapcsolati sztring vagy egy felügyelt identitást. A felügyelt identitás beállításhoz létrehozhat egy rendszer által hozzárendelt felügyelt identitást a Stream Analytics-feladathoz, vagy egy felhasználó által hozzárendelt felügyelt identitást az eseményközponttal való hitelesítéshez. Felügyelt identitás használatakor a felügyelt identitásnak az Azure Event Hubs-adatátvevő vagy az Azure Event Hubs adattulajdonosi szerepköreinek tagjának kell lennie.
Event Hub-szabályzat neve A közös hozzáférési szabályzat, amely hozzáférést biztosít az Event Hubshoz. Minden megosztott hozzáférési szabályzat rendelkezik névvel, beállított engedélyekkel és hozzáférési kulcsokkal. Ez a beállítás automatikusan fel lesz töltve, kivéve, ha manuálisan szeretné megadni az Event Hubs beállításait.
Partíciókulcs Ez egy nem kötelező mező, amely csak akkor érhető el, ha a feladat az 1.2-es vagy újabb kompatibilitási szint használatára van konfigurálva. Ha a bemenetet egy tulajdonság particionálta, itt adhatja hozzá a tulajdonság nevét. A lekérdezés teljesítményének javítására szolgál, ha tartalmaz egy vagy GROUP BY több PARTITION BY záradékot ezen a tulajdonságon. Ha ez a feladat az 1.2-es vagy újabb kompatibilitási szintet használja, ez a mező alapértelmezés szerint a PartitionId.
Eseményszerializációs formátum A bejövő adatfolyam szerializálási formátuma (JSON, CSV, Avro, Parquet vagy Egyéb (Protobuf, XML, védett...)). Győződjön meg arról, hogy a JSON formátum megfelel a specifikációnak, és nem tartalmaz 0-t a tizedesjegyekhez.
Kódolás Jelenleg az UTF-8 az egyetlen támogatott kódolási formátum.
Eseménytömörítés típusa A bejövő adatfolyam olvasásához használt tömörítési típus, például Nincs (alapértelmezett), Gzip vagy Deflate.
Sémaregisztrációs adatbázis (előzetes verzió) Kiválaszthatja a sémaregisztrációs adatbázist az eseményközponttól kapott eseményadatok sémáival.

Amikor az adatok egy Event Hubs-streambemenetből származnak, a Stream Analytics-lekérdezésben a következő metaadatmezőkhöz férhet hozzá:

Tulajdonság Leírás
EventProcessedUtcTime A Stream Analytics által az esemény feldolgozásakor megadott dátum és idő.
EventEnqueuedUtcTime Az a dátum és idő, amikor az Event Hubs megkapja az eseményeket.
PartitionId A bemeneti adapter nulla alapú partícióazonosítója.

Ha például ezeket a mezőket használja, az alábbi példához hasonlóan írhat lekérdezést:

SELECT
    EventProcessedUtcTime,
    EventEnqueuedUtcTime,
    PartitionId
FROM Input

Feljegyzés

Ha az Event Hubsot az IoT Hub-útvonalak végpontjaként használja, a GetMetadataPropertyValue függvény használatával hozzáférhet az IoT Hub metaadataihoz.

Adatok streamelése az IoT Hubról

Az Azure IoT Hub egy nagymértékben skálázható, IoT-forgatókönyvekhez optimalizált közzétételi-feliratkozási eseménybetöltés.

A Stream Analytics egy IoT Hubjáról érkező események alapértelmezett időbélyege az az időbélyeg, amelyet az esemény az IoT Hubba érkezett, azaz EventEnqueuedUtcTime. Ha streamként szeretné feldolgozni az adatokat az esemény hasznos adatainak időbélyegével, a TIMESTAMP BY kulcsszót kell használnia.

Iot Hub fogyasztói csoportok

Az egyes Stream Analytics IoT Hub-bemeneteket úgy kell konfigurálni, hogy saját fogyasztói csoportjuk legyen. Ha egy feladat önillesztéseket tartalmaz, vagy ha több bemenettel rendelkezik, előfordulhat, hogy egynél több olvasó olvas be néhány bemenetet az alsóbb rétegben. Ez a helyzet egyetlen fogyasztói csoport olvasóinak számát befolyásolja. Annak érdekében, hogy ne lépje túl az Azure IoT Hub felhasználói csoportonkénti öt olvasóra vonatkozó korlátját partíciónként, ajánlott egy fogyasztói csoportot kijelölni minden Stream Analytics-feladathoz.

IoT Hub konfigurálása adatfolyam-bemenetként

Az alábbi táblázat az Azure Portal Új bemeneti lapján található összes tulajdonságot ismerteti, amikor streambemenetként konfigurál egy IoT Hubot.

Tulajdonság Leírás
Bemeneti alias Egy rövid név, amelyet a feladat lekérdezésében használ a bemenetre való hivatkozáshoz.
Előfizetés Válassza ki azt az előfizetést, amelyben az IoT Hub-erőforrás létezik.
IoT hub A bemenetként használni kívánt IoT Hub neve.
Fogyasztói csoport Javasoljuk, hogy minden Stream Analytics-feladathoz használjon másik fogyasztói csoportot. A fogyasztói csoport az IoT Hubból származó adatok betöltésére szolgál. A Stream Analytics a $Default fogyasztói csoportot használja, hacsak másként nem adja meg.
Megosztott hozzáférési szabályzat neve A megosztott hozzáférési szabályzat, amely hozzáférést biztosít az IoT Hubhoz. Minden megosztott hozzáférési szabályzat rendelkezik névvel, beállított engedélyekkel és hozzáférési kulcsokkal.
Közös hozzáférésű hozzáférési szabályzatkulcs Az IoT Hubhoz való hozzáférés engedélyezéséhez használt közös hozzáférési kulcs. Ez a beállítás automatikusan fel lesz töltve, hacsak nem választja ki az Iot Hub-beállítások manuális megadására vonatkozó beállítást.
Végpont Az IoT Hub végpontja.
Partíciókulcs Ez egy nem kötelező mező, amely csak akkor érhető el, ha a feladat az 1.2-es vagy újabb kompatibilitási szint használatára van konfigurálva. Ha a bemenetet egy tulajdonság particionálta, itt adhatja hozzá a tulajdonság nevét. A lekérdezés teljesítményének javítására szolgál, ha tartalmaz egy PARTITION BY vagy GROUP BY záradékot ezen a tulajdonságon. Ha ez a feladat 1.2-es vagy újabb kompatibilitási szintet használ, ez a mező alapértelmezés szerint "PartitionId".
Eseményszerializációs formátum A bejövő adatfolyam szerializálási formátuma (JSON, CSV, Avro, Parquet vagy Egyéb (Protobuf, XML, védett...)). Győződjön meg arról, hogy a JSON formátum megfelel a specifikációnak, és nem tartalmaz 0-t a tizedesjegyekhez.
Kódolás Jelenleg az UTF-8 az egyetlen támogatott kódolási formátum.
Eseménytömörítés típusa A bejövő adatfolyam olvasásához használt tömörítési típus, például Nincs (alapértelmezett), Gzip vagy Deflate.

Ha streamadatokat használ egy IoT Hubról, a Stream Analytics-lekérdezésben a következő metaadatmezőkhöz férhet hozzá:

Tulajdonság Leírás
EventProcessedUtcTime Az esemény feldolgozásának dátuma és időpontja.
EventEnqueuedUtcTime Az a dátum és idő, amikor az IoT Hub megkapja az eseményt.
PartitionId A bemeneti adapter nulla alapú partícióazonosítója.
IoTHub.MessageId Az IoT Hub kétirányú kommunikációjának korrelációjára használt azonosító.
IoTHub.CorrelationId Az IoT Hub üzenetválaszaiban és visszajelzéseiben használt azonosító.
IoTHub. Csatlakozás ionDeviceId Az üzenet küldéséhez használt hitelesítési azonosító. Ezt az értéket az IoT Hub szolgáltatáshoz kötött üzenetekre pecsételi.
IoTHub. Csatlakozás ionDeviceGenerationId Az üzenet küldéséhez használt hitelesített eszköz generációazonosítója. Ezt az értéket az IoT Hub lepecsételi a szolgáltatáshoz kötött üzenetekre.
IoTHub.EnqueuedTime Az az idő, amikor az IoT Hub megkapja az üzenetet.

Adatok streamelése a Blob Storage-ból vagy a Data Lake Storage Gen2-ből

A felhőben nagy mennyiségű strukturálatlan adatot tartalmazó forgatókönyvek esetén az Azure Blob Storage vagy az Azure Data Lake Storage Gen2 költséghatékony és méretezhető megoldást kínál. A Blob Storage-ban vagy az Azure Data Lake Storage Gen2-ben tárolt adatok inaktív adatoknak minősülnek. Ezeket az adatokat azonban adatfolyamként is feldolgozhatja a Stream Analytics.

A naplófeldolgozás gyakran használt forgatókönyv az ilyen bemenetek Stream Analytics-lel való használatához. Ebben a forgatókönyvben a telemetriai adatfájlokat egy rendszer rögzíti, és elemezni és feldolgozni kell, hogy hasznos adatokat nyerjen ki.

Egy Blob Storage- vagy Azure Data Lake Storage Gen2-esemény alapértelmezett időbélyege a Stream Analyticsben az az időbélyeg, amelyet legutóbb módosítottak, azaz BlobLastModifiedUtcTime. Ha egy blobot 13:00-kor tölt fel egy tárfiókba, és az Azure Stream Analytics-feladat a Most 13:01 időpontban lehetőséggel indul el, a program nem veszi fel, mivel a módosított idő a feladatfuttatási időszakon kívül esik.

Ha egy blobot 13:00-kor tölt fel egy tárfiók tárolójába, és az Azure Stream Analytics-feladat az egyéni időpontot használja 13:00-kor vagy korábban, a blob fel lesz véve, mivel a módosított idő a feladat futási időszakába esik.

Ha egy Azure Stream Analytics-feladatot 13:00-kor kezd el használni, és 13:01-kor feltölt egy blobot a tárfiók tárolójába, az Azure Stream Analytics felveszi a blobot. Az egyes blobokhoz rendelt időbélyeg csak a .BlobLastModifiedTime A blobhoz tartozó mappának nincs köze a hozzárendelt időbélyeghez. Ha például van egy blob 2019/10-01/00/b1.txt az egyikkel BlobLastModifiedTime2019-11-11, akkor a blobhoz rendelt időbélyeg a következő 2019-11-11.

Ha streamként szeretné feldolgozni az adatokat az esemény hasznos adatainak időbélyegével, a TIMESTAMP BY kulcsszót kell használnia. Egy Stream Analytics-feladat másodpercenként lekéri az adatokat az Azure Blob Storage-ból vagy az Azure Data Lake Storage Gen2 bemenetéből, ha a blobfájl elérhető. Ha a blobfájl nem érhető el, exponenciális visszalépés történik 90 másodperces maximális késleltetéssel.

Feljegyzés

A Stream Analytics nem támogatja a tartalom hozzáadását egy meglévő blobfájlhoz. A Stream Analytics csak egyszer tekinti meg az egyes fájlokat, és a feladat olvasása után a fájlban bekövetkező módosítások nem lesznek feldolgozva. Ajánlott eljárás, ha egy blobfájl összes adatát egyszerre tölti fel, majd újabb eseményeket ad hozzá egy másik, új blobfájlhoz.

Azokban a helyzetekben, amikor számos blobot adnak hozzá folyamatosan, és a Stream Analytics a hozzáadásukkor dolgozza fel a blobokat, előfordulhat, hogy egyes blobok kihagyhatók ritka esetekben a BlobLastModifiedTimeblobok részletessége miatt. Ezt az esetet úgy háríthatja el, hogy legalább két másodperc távolságra feltölti a blobokat. Ha ez a lehetőség nem megvalósítható, az Event Hubs használatával nagy mennyiségű eseményt streamelhet.

Blob Storage konfigurálása streambemenetként

Az alábbi táblázat az Azure Portal Új bemeneti lapján található összes tulajdonságot ismerteti, amikor streambemenetként konfigurálja a Blob Storage-t.

Tulajdonság Leírás
Bemeneti alias Egy rövid név, amelyet a feladat lekérdezésében használ a bemenetre való hivatkozáshoz.
Előfizetés Válassza ki azt az előfizetést, amelyben a tárolási erőforrás létezik.
Storage-fiók Annak a tárfióknak a neve, amelyben a blobfájlok találhatók.
Tárfiók kulcsa A tárfiókhoz társított titkos kulcs. Ez a beállítás automatikusan fel lesz töltve, hacsak nem választja ki a beállítást a beállítások manuális megadásához.
Tároló A tárolók logikai csoportosítást biztosítanak a blobokhoz. Választhatja a Meglévő tároló használata vagy az Új létrehozása lehetőséget egy új tároló létrehozásához.
Hitelesítési mód Adja meg a tárfiókhoz való csatlakozáshoz használni kívánt hitelesítés típusát. A tárfiókkal való hitelesítéshez használhat egy kapcsolati sztring vagy egy felügyelt identitást. A felügyelt identitás beállításhoz létrehozhat egy rendszer által hozzárendelt felügyelt identitást a Stream Analytics-feladathoz, vagy egy felhasználó által hozzárendelt felügyelt identitást a tárfiókkal való hitelesítéshez. Felügyelt identitás használatakor a felügyelt identitásnak egy megfelelő szerepkör tagjának kell lennie a tárfiókban.
Útvonalminta (nem kötelező) A blobok a megadott tárolóban való megkereséséhez használt fájl elérési útja. Ha blobokat szeretne olvasni a tároló gyökeréből, ne állítson be elérési utat. Az elérési úton a következő három változó egy vagy több példányát adhatja meg: {date}, {time}vagy {partition}

1. példa: cluster1/logs/{date}/{time}/{partition}

2. példa: cluster1/logs/{date}

A * karakter nem engedélyezett érték az elérési út előtagjában. Csak érvényes Azure-blobkarakterek engedélyezettek. Ne adjon meg tárolóneveket vagy fájlneveket.
Dátumformátum (nem kötelező) Ha az elérési úton a dátumváltozót használja, az a dátumformátum, amelyben a fájlok rendszerezésre kerülnek. Példa: YYYY/MM/DD

Ha a blobbemenet elérési {date} útja vagy {time} elérési útja, a mappák növekvő időrendben jelennek meg.
Időformátum (nem kötelező) Ha az elérési úton az időváltozót használja, a fájlok rendszerezésének időformátuma. Jelenleg az egyetlen támogatott érték órákra vonatkozik HH .
Partíciókulcs Ez egy nem kötelező mező, amely csak akkor érhető el, ha a feladat az 1.2-es vagy újabb kompatibilitási szint használatára van konfigurálva. Ha a bemenetet egy tulajdonság particionálta, itt adhatja hozzá a tulajdonság nevét. A lekérdezés teljesítményének javítására szolgál, ha tartalmaz egy PARTITION BY vagy GROUP BY záradékot ezen a tulajdonságon. Ha ez a feladat 1.2-es vagy újabb kompatibilitási szintet használ, ez a mező alapértelmezés szerint "PartitionId".
Bemeneti partíciók száma Ez a mező csak akkor jelenik meg, ha {partition} szerepel az elérésiút-mintában. A tulajdonság értéke =1 egész szám >. Ahol a(z) {partition} megjelenik a pathPatternben, a rendszer a -1 mező értéke és a 0 közötti számot fogja használni.
Eseményszerializációs formátum A bejövő adatfolyam szerializálási formátuma (JSON, CSV, Avro, Parquet vagy Egyéb (Protobuf, XML, védett...)). Győződjön meg arról, hogy a JSON formátum megfelel a specifikációnak, és nem tartalmaz 0-t a tizedesjegyekhez.
Kódolás CSV és JSON esetén jelenleg az UTF-8 az egyetlen támogatott kódolási formátum.
Tömörítés A bejövő adatfolyam olvasásához használt tömörítési típus, például Nincs (alapértelmezett), Gzip vagy Deflate.

Ha az adatok egy Blob Storage-forrásból származnak, a Stream Analytics-lekérdezésben a következő metaadatmezőkhöz férhet hozzá:

Tulajdonság Leírás
BlobName Annak a bemeneti blobnak a neve, amelyből az esemény származik.
EventProcessedUtcTime A Stream Analytics által az esemény feldolgozásakor megadott dátum és idő.
BlobLastModifiedUtcTime A blob utolsó módosításának dátuma és időpontja.
PartitionId A bemeneti adapter nulla alapú partícióazonosítója.

Ha például ezeket a mezőket használja, az alábbi példához hasonlóan írhat lekérdezést:

SELECT
    BlobName,
    EventProcessedUtcTime,
    BlobLastModifiedUtcTime
FROM Input

Adatok streamelése az Apache Kafkából

Az Azure Stream Analytics lehetővé teszi, hogy közvetlenül apache Kafka-fürtökhöz csatlakozzon az adatok betöltéséhez. A megoldás alacsony kódszámú, és teljes egészében a Microsoft Azure Stream Analytics csapata felügyeli, lehetővé téve, hogy megfeleljen az üzleti megfelelőségi szabványoknak. A Kafka bemenet visszamenőlegesen kompatibilis, és a legújabb ügyfélkiadással rendelkező összes verziót támogatja a 0.10-es verziótól kezdve. A felhasználók a konfigurációktól függően csatlakozhatnak a virtuális hálózaton belüli Kafka-fürtökhöz és a nyilvános végponttal rendelkező Kafka-fürtökhöz. A konfiguráció a meglévő Kafka konfigurációs konvenciókra támaszkodik. A támogatott tömörítési típusok: None, Gzip, Snappy, LZ4 és Zstd.

További információkért tekintse meg a Kafkából az Azure Stream Analyticsbe (előzetes verzió) irányuló streamadatokat.

Következő lépések