Strömma data som indata till Stream Analytics

Stream Analytics har förstklassig integrering med Azure-dataströmmar som indata från tre typer av resurser:

Dessa indataresurser kan finnas i samma Azure-prenumeration som ditt Stream Analytics-jobb eller en annan prenumeration.

Komprimering

Stream Analytics stöder komprimering för alla indatakällor. Komprimeringstyper som stöds är: None, Gzip och Deflate. Stöd för komprimering är inte tillgängligt för referensdata. Om indata komprimeras avro-data hanterar Stream Analytics dem transparent. Du behöver inte ange komprimeringstypen med Avro-serialisering.

Skapa, redigera eller testa indata

Du kan använda Azure-portalen, Visual Studio och Visual Studio Code för att lägga till och visa eller redigera befintliga indata i ditt strömningsjobb. Du kan också testa indataanslutningar och testa frågor från exempeldata från Azure-portalen, Visual Studio och Visual Studio Code. När du skriver en fråga listar du indata i FROM-satsen. Du kan hämta listan över tillgängliga indata från sidan Fråga i portalen. Om du vill använda flera indata kan JOIN du skriva flera SELECT frågor.

Kommentar

Vi rekommenderar starkt att du använder Stream Analytics-verktyg för Visual Studio Code för bästa lokala utvecklingsupplevelse. Det finns kända funktionsluckor i Stream Analytics-verktygen för Visual Studio 2019 (version 2.6.3000.0) och det kommer inte att förbättras framöver.

Strömma data med Event Hubs

Azure Event Hubs är en mycket skalbar ingestor för publicerings-prenumerationshändelser. En händelsehubb kan samla in miljontals händelser per sekund så att du kan bearbeta och analysera de enorma mängder data som produceras av dina anslutna enheter och program. Tillsammans kan Event Hubs och Stream Analytics tillhandahålla en lösning från slutpunkt till slutpunkt för realtidsanalys. Med Event Hubs kan du mata in händelser i Azure i realtid, och Stream Analytics-jobb kan bearbeta dessa händelser i realtid. Du kan till exempel skicka webbklick, sensoravläsningar eller onlinelogghändelser till Event Hubs. Du kan sedan skapa Stream Analytics-jobb för att använda Event Hubs för indata för realtidsfiltrering, aggregering och korrelation.

EventEnqueuedUtcTime är tidsstämpeln för en händelses ankomst till en händelsehubb och är standardtidsstämpeln för händelser som kommer från Event Hubs till Stream Analytics. Om du vill bearbeta data som en dataström med hjälp av en tidsstämpel i händelsenyttolasten måste du använda nyckelordet TIMESTAMP BY .

Event Hubs-konsumentgrupper

Du bör konfigurera varje händelsehubbindata så att de har en egen konsumentgrupp. När ett jobb innehåller en självkoppling eller har flera indata kan vissa indata läsas av mer än en läsare nedströms. Den här situationen påverkar antalet läsare i en enskild konsumentgrupp. För att undvika att överskrida gränsen för Event Hubs på fem läsare per konsumentgrupp per partition är det bästa praxis att utse en konsumentgrupp för varje Stream Analytics-jobb. Det finns också en gräns på 20 konsumentgrupper för en händelsehubb på standardnivå. Mer information finns i Felsöka Azure Stream Analytics-indata.

Skapa indata från Event Hubs

I följande tabell förklaras varje egenskap på sidan Ny indata i Azure-portalen för att strömma dataindata från en händelsehubb:

Property beskrivning
Indataalias Ett eget namn som du använder i jobbets fråga för att referera till dessa indata.
Abonnemang Välj den Azure-prenumeration där händelsehubbresursen finns.
Event Hub-namnområde Event Hubs-namnområdet är en container för händelsehubbar. När du skapar en händelsehubb skapar du även namnområdet.
Namn på händelsehubb Namnet på den händelsehubb som ska användas som indata.
Händelsehubbens konsumentgrupp (rekommenderas) Vi rekommenderar att du använder en distinkt konsumentgrupp för varje Stream Analytics-jobb. Den här strängen identifierar konsumentgruppen som ska användas för att mata in data från händelsehubben. Om ingen konsumentgrupp anges använder $Default Stream Analytics-jobbet konsumentgruppen.
Autentiseringsläge Ange vilken typ av autentisering du vill använda för att ansluta till händelsehubben. Du kan använda en anslutningssträng eller en hanterad identitet för att autentisera med händelsehubben. För alternativet hanterad identitet kan du antingen skapa en systemtilldelad hanterad identitet till Stream Analytics-jobbet eller en användartilldelad hanterad identitet för att autentisera med händelsehubben. När du använder en hanterad identitet måste den hanterade identiteten vara medlem i Azure Event Hubs Data Receiver- eller Azure Event Hubs Data Owner-roller.
Namn på händelsehubbprincip Principen för delad åtkomst som ger åtkomst till Event Hubs. Varje princip för delad åtkomst har ett namn, behörigheter som du anger och åtkomstnycklar. Det här alternativet fylls i automatiskt, såvida du inte väljer alternativet att ange Event Hubs-inställningarna manuellt.
Partitionsnyckel Det är ett valfritt fält som endast är tillgängligt om ditt jobb är konfigurerat att använda kompatibilitetsnivå 1.2 eller senare. Om dina indata partitioneras av en egenskap kan du lägga till namnet på den här egenskapen här. Den används för att förbättra frågans prestanda om den innehåller en eller-sats PARTITION BYGROUP BY för den här egenskapen. Om det här jobbet använder kompatibilitetsnivå 1.2 eller senare är det här fältet standard PartitionId.
Händelseserialiseringsformat Serialiseringsformatet (JSON, CSV, Avro, Parquet eller Other (Protobuf, XML, proprietary...)) för inkommande dataström. Se till att JSON-formatet överensstämmer med specifikationen och inkluderar inte inledande 0 för decimaltal.
Kodning UTF-8 är för närvarande det enda kodningsformat som stöds.
Händelsekomprimeringstyp Komprimeringstypen som används för att läsa inkommande dataström, till exempel Ingen (standard), Gzip eller Deflate.
Schemaregister (förhandsversion) Du kan välja schemaregistret med scheman för händelsedata som tas emot från händelsehubben.

När dina data kommer från en Event Hubs-indata har du åtkomst till följande metadatafält i Stream Analytics-frågan:

Property beskrivning
EventProcessedUtcTime Datum och tid när Stream Analytics bearbetar händelsen.
EventEnqueuedUtcTime Datum och tid då Event Hubs tar emot händelserna.
Partitionid Det nollbaserade partitions-ID:t för indatakortet.

Om du till exempel använder de här fälten kan du skriva en fråga som i följande exempel:

SELECT
    EventProcessedUtcTime,
    EventEnqueuedUtcTime,
    PartitionId
FROM Input

Kommentar

När du använder Event Hubs som slutpunkt för IoT Hub Routes kan du komma åt IoT Hub-metadata med hjälp av funktionen GetMetadataPropertyValue.

Strömma data från IoT Hub

Azure IoT Hub är en mycket skalbar publiceringsprenumereringshändelse som är optimerad för IoT-scenarier.

Standardtidsstämpeln för händelser som kommer från en IoT Hub i Stream Analytics är tidsstämpeln som händelsen anlände till IoT Hub, som är EventEnqueuedUtcTime. Om du vill bearbeta data som en dataström med hjälp av en tidsstämpel i händelsenyttolasten måste du använda nyckelordet TIMESTAMP BY .

Konsumentgrupper för Iot Hub

Du bör konfigurera varje Stream Analytics IoT Hub-indata så att de har en egen konsumentgrupp. När ett jobb innehåller en självkoppling eller om det har flera indata kan vissa indata läsas av mer än en läsare nedströms. Den här situationen påverkar antalet läsare i en enskild konsumentgrupp. För att undvika att överskrida gränsen för Azure IoT Hub på fem läsare per konsumentgrupp per partition är det bästa praxis att utse en konsumentgrupp för varje Stream Analytics-jobb.

Konfigurera en IoT Hub som indata för dataström

I följande tabell beskrivs varje egenskap på sidan Ny indata i Azure-portalen när du konfigurerar en IoT Hub som en indataström.

Property beskrivning
Indataalias Ett eget namn som du använder i jobbets fråga för att referera till dessa indata.
Abonnemang Välj den prenumeration där IoT Hub-resursen finns.
IoT Hub Namnet på den IoT Hub som ska användas som indata.
Konsumentgrupp Vi rekommenderar att du använder en annan konsumentgrupp för varje Stream Analytics-jobb. Konsumentgruppen används för att mata in data från IoT Hub. Stream Analytics använder konsumentgruppen $Default om du inte anger något annat.
Namn på princip för delad åtkomst Principen för delad åtkomst som ger åtkomst till IoT Hub. Varje princip för delad åtkomst har ett namn, behörigheter som du anger och åtkomstnycklar.
Principnyckel för delad åtkomst Nyckeln för delad åtkomst som används för att auktorisera åtkomst till IoT Hub. Det här alternativet fylls i automatiskt om du inte väljer alternativet för att ange Iot Hub-inställningarna manuellt.
Slutpunkt Slutpunkten för IoT Hub.
Partitionsnyckel Det är ett valfritt fält som endast är tillgängligt om ditt jobb är konfigurerat att använda kompatibilitetsnivå 1.2 eller senare. Om dina indata partitioneras av en egenskap kan du lägga till namnet på den här egenskapen här. Den används för att förbättra frågans prestanda om den innehåller en PARTITION BY- eller GROUP BY-sats för den här egenskapen. Om det här jobbet använder kompatibilitetsnivå 1.2 eller senare är det här fältet som standard "PartitionId".
Händelseserialiseringsformat Serialiseringsformatet (JSON, CSV, Avro, Parquet eller Other (Protobuf, XML, proprietary...)) för inkommande dataström. Se till att JSON-formatet överensstämmer med specifikationen och inkluderar inte inledande 0 för decimaltal.
Kodning UTF-8 är för närvarande det enda kodningsformat som stöds.
Händelsekomprimeringstyp Komprimeringstypen som används för att läsa inkommande dataström, till exempel Ingen (standard), Gzip eller Deflate.

När du använder dataströmdata från en IoT Hub har du åtkomst till följande metadatafält i Din Stream Analytics-fråga:

Property beskrivning
EventProcessedUtcTime Datum och tid då händelsen bearbetades.
EventEnqueuedUtcTime Datum och tid när IoT Hub tar emot händelsen.
Partitionid Det nollbaserade partitions-ID:t för indatakortet.
IoTHub.MessageId Ett ID som används för att korrelera dubbelriktad kommunikation i IoT Hub.
IoTHub.CorrelationId Ett ID som används i meddelandesvar och feedback i IoT Hub.
IoTHub. Anslut ionDeviceId Det autentiserings-ID som användes för att skicka det här meddelandet. Det här värdet stämplas på tjänstbundna meddelanden av IoT Hub.
IoTHub. Anslut ionDeviceGenerationId Generations-ID för den autentiserade enhet som användes för att skicka det här meddelandet. Det här värdet stämplas på servicebound-meddelanden av IoT Hub.
IoTHub.EnqueuedTime Den tid då IoT Hub tar emot meddelandet.

Strömma data från Blob Storage eller Data Lake Storage Gen2

För scenarier med stora mängder ostrukturerade data som ska lagras i molnet erbjuder Azure Blob Storage eller Azure Data Lake Storage Gen2 en kostnadseffektiv och skalbar lösning. Data i Blob Storage eller Azure Data Lake Storage Gen2 betraktas som vilande data. Dessa data kan dock bearbetas som en dataström av Stream Analytics.

Loggbearbetning är ett vanligt scenario för att använda sådana indata med Stream Analytics. I det här scenariot samlas telemetridatafiler in från ett system och måste parsas och bearbetas för att extrahera meningsfulla data.

Standardtidsstämpeln för en Blob Storage- eller Azure Data Lake Storage Gen2-händelse i Stream Analytics är tidsstämpeln som senast ändrades, vilket är BlobLastModifiedUtcTime. Om en blob laddas upp till ett lagringskonto klockan 13:00 och Azure Stream Analytics-jobbet startas med alternativet Nu kl. 13:01 hämtas den inte eftersom den ändrade tiden faller utanför jobbkörningsperioden.

Om en blob laddas upp till en lagringskontocontainer kl. 13:00 och Azure Stream Analytics-jobbet startas med anpassad tid kl. 13:00 eller tidigare hämtas blobben när den ändrade tiden infaller inom jobbkörningsperioden.

Om ett Azure Stream Analytics-jobb startas med Nu klockan 13:00 och en blob laddas upp till lagringskontocontainern kl. 13:01 hämtar Azure Stream Analytics bloben. Tidsstämpeln som tilldelats varje blob baseras endast på BlobLastModifiedTime. Mappen som bloben finns i har ingen relation till den tilldelade tidsstämpeln. Om det till exempel finns en blob 2019/10-01/00/b1.txt med en BlobLastModifiedTime av 2019-11-11är 2019-11-11tidsstämpeln tilldelad till den här blobben .

Om du vill bearbeta data som en dataström med hjälp av en tidsstämpel i händelsenyttolasten måste du använda nyckelordet TIMESTAMP BY . Ett Stream Analytics-jobb hämtar data från Azure Blob Storage eller Azure Data Lake Storage Gen2-indata varje sekund om blobfilen är tillgänglig. Om blobfilen inte är tillgänglig finns det en exponentiell backoff med en maximal tidsfördröjning på 90 sekunder.

Kommentar

Stream Analytics har inte stöd för att lägga till innehåll i en befintlig blobfil. Stream Analytics visar endast varje fil en gång och alla ändringar som sker i filen när jobbet har läst data bearbetas inte. Bästa praxis är att ladda upp alla data för en blobfil samtidigt och sedan lägga till ytterligare nyare händelser i en annan, ny blobfil.

I scenarier där många blobar läggs till kontinuerligt och Stream Analytics bearbetar blobarna när de läggs till är det möjligt att vissa blobar hoppas över i sällsynta fall på grund av kornigheten i BlobLastModifiedTime. Du kan minimera det här fallet genom att ladda upp blobar med minst två sekunders mellanrum. Om det här alternativet inte är möjligt kan du använda Event Hubs för att strömma stora mängder händelser.

Konfigurera Blob Storage som indata för dataström

I följande tabell förklaras varje egenskap på sidan Ny indata i Azure-portalen när du konfigurerar Blob Storage som en indataström.

Property beskrivning
Indataalias Ett eget namn som du använder i jobbets fråga för att referera till dessa indata.
Abonnemang Välj den prenumeration där lagringsresursen finns.
Lagringskonto Namnet på lagringskontot där blobfilerna finns.
Lagringskontonyckel Den hemliga nyckel som är associerad med lagringskontot. Det här alternativet fylls i automatiskt om du inte väljer alternativet för att ange inställningarna manuellt.
Container Containrar tillhandahåller en logisk gruppering för blobar. Du kan välja antingen Använd befintlig container eller Skapa ny för att skapa en ny container.
Autentiseringsläge Ange vilken typ av autentisering du vill använda för att ansluta till lagringskontot. Du kan använda en anslutningssträng eller en hanterad identitet för att autentisera med lagringskontot. För alternativet hanterad identitet kan du antingen skapa en systemtilldelad hanterad identitet till Stream Analytics-jobbet eller en användartilldelad hanterad identitet för att autentisera med lagringskontot. När du använder en hanterad identitet måste den hanterade identiteten vara medlem i en lämplig roll i lagringskontot.
Sökvägsmönster (valfritt) Filsökvägen som används för att hitta blobarna i den angivna containern. Om du vill läsa blobar från containerns rot anger du inget sökvägsmönster. I sökvägen kan du ange en eller flera instanser av följande tre variabler: {date}, {time}eller {partition}

Exempel 1: cluster1/logs/{date}/{time}/{partition}

Exempel 2: cluster1/logs/{date}

Tecknet * är inte ett tillåtet värde för sökvägsprefixet. Endast giltiga Azure-blobtecken tillåts. Ta inte med containernamn eller filnamn.
Datumformat (valfritt) Om du använder datumvariabeln i sökvägen, datumformatet där filerna är ordnade. Exempel: YYYY/MM/DD

När blobindata har {date} eller {time} i sökvägen granskas mapparna i stigande tidsordning.
Tidsformat (valfritt) Om du använder tidsvariabeln i sökvägen, tidsformatet där filerna är ordnade. För närvarande är HH det enda värdet som stöds i timmar.
Partitionsnyckel Det är ett valfritt fält som endast är tillgängligt om ditt jobb är konfigurerat att använda kompatibilitetsnivå 1.2 eller senare. Om dina indata partitioneras av en egenskap kan du lägga till namnet på den här egenskapen här. Den används för att förbättra frågans prestanda om den innehåller en PARTITION BY- eller GROUP BY-sats för den här egenskapen. Om det här jobbet använder kompatibilitetsnivå 1.2 eller senare är det här fältet som standard "PartitionId".
Antal indatapartitioner Det här fältet finns bara när {partition} finns i sökvägsmönstret. Värdet för den här egenskapen är ett heltal >=1. Oavsett var {partition} visas i pathPattern används ett tal mellan 0 och värdet för fältet -1.
Händelseserialiseringsformat Serialiseringsformatet (JSON, CSV, Avro, Parquet eller Other (Protobuf, XML, proprietary...)) för inkommande dataström. Se till att JSON-formatet överensstämmer med specifikationen och inkluderar inte inledande 0 för decimaltal.
Kodning FÖR CSV och JSON är UTF-8 för närvarande det enda kodningsformat som stöds.
Komprimering Komprimeringstypen som används för att läsa inkommande dataström, till exempel Ingen (standard), Gzip eller Deflate.

När dina data kommer från en Blob Storage-källa har du åtkomst till följande metadatafält i Stream Analytics-frågan:

Property beskrivning
BlobName Namnet på den indatablob som händelsen kom från.
EventProcessedUtcTime Datum och tid när Stream Analytics bearbetar händelsen.
BlobLastModifiedUtcTime Datum och tid då bloben senast ändrades.
Partitionid Det nollbaserade partitions-ID:t för indatakortet.

Om du till exempel använder de här fälten kan du skriva en fråga som i följande exempel:

SELECT
    BlobName,
    EventProcessedUtcTime,
    BlobLastModifiedUtcTime
FROM Input

Strömma data från Apache Kafka

Med Azure Stream Analytics kan du ansluta direkt till Apache Kafka-kluster för att mata in data. Lösningen är låg kod och hanteras helt av Azure Stream Analytics-teamet på Microsoft, vilket gör att den kan uppfylla standarder för affärsefterlevnad. Kafka-indata är bakåtkompatibla och stöder alla versioner med den senaste klientversionen från version 0.10. Användare kan ansluta till Kafka-kluster i ett virtuellt nätverk och Kafka-kluster med en offentlig slutpunkt, beroende på konfigurationerna. Konfigurationen förlitar sig på befintliga Kafka-konfigurationskonventioner. Komprimeringstyper som stöds är None, Gzip, Snappy, LZ4 och Zstd.

Mer information finns i Strömma data från Kafka till Azure Stream Analytics (förhandsversion).

Nästa steg