Självstudie: Köra Azure Functions från Azure Stream Analytics-jobb

I den här självstudien skapar du ett Azure Stream Analytics-jobb som läser händelser från Azure Event Hubs, kör en fråga på händelsedata och sedan anropar en Azure-funktion som skriver till en Azure Cache for Redis-instans.

Skärmbild som visar relationen mellan Azure-tjänster i lösningen.

Kommentar

  • Du kan köra Azure Functions från Azure Stream Analytics genom att konfigurera Functions som en av mottagare (utdata) till Stream Analytics-jobbet. Functions är en händelsedriven upplevelse med beräkning på begäran där du kan implementera kod som utlöses av händelser i Azure eller tjänster från tredje part. Azure Functions kapacitet att reagera på utlösare gör det till ett naturligt utdatamål för Stream Analytics-jobb.
  • Stream Analytics anropar Functions med HTTP-utlösare. Med utdataadaptern i Functions kan användarna ansluta Functions till Stream Analytics, så att händelserna kan utlösas baserat på Stream Analytics-frågor.
  • Anslut ion till Azure Functions i ett virtuellt nätverk (VNet) från ett Stream Analytics-jobb som körs i ett kluster med flera klientorganisationer stöds inte.

I den här självstudien lär du dig att:

  • Skapa en Azure Event Hubs-instans
  • Skapa en Azure Cache for Redis-instans
  • Skapa en Azure-funktion
  • Skapa ett Stream Analytics-jobb
  • Konfigurera händelsehubben som indata och funktion som utdata
  • Köra Stream Analytics-jobbet
  • Kontrollera resultatet i Azure Cache for Redis

Om du inte har en Azure-prenumeration kan du skapa ett kostnadsfritt konto innan du börjar.

Förutsättningar

Kontrollera att du har slutfört följande steg innan du börjar:

  • Om du inte har en Azure-prenumeration skapar du ett kostnadsfritt konto.
  • Ladda ned händelsegeneratorappen för telefonsamtal, TelcoGenerator.zip från Microsoft Download Center eller hämta källkoden från GitHub.

Logga in på Azure

Logga in på Azure-portalen.

Skapa en händelsehubb

Du måste skicka exempeldata till en händelsehubb innan Stream Analytics kan analysera dataströmmen för bedrägliga anrop. I den här självstudien skickar du data till Azure med hjälp av Azure Event Hubs.

Använd följande steg för att skapa en händelsehubb och skicka samtalsdata till händelsehubben:

  1. Logga in på Azure-portalen.

  2. Välj Alla tjänster på den vänstra menyn, välj Sakernas Internet, musen över Event Hubs och välj sedan + (Lägg till).

    Skärmbild som visar sidan för att skapa Event Hubs.

  3. Följ dessa steg på sidan Skapa namnområde :

    1. Välj en Azure-prenumeration där du vill skapa händelsehubben.

    2. För Resursgrupp väljer du Skapa ny och anger ett namn för resursgruppen. Event Hubs-namnområdet skapas i den här resursgruppen.

    3. Som Namnområdesnamn anger du ett unikt namn för Event Hubs-namnområdet.

    4. För Plats väljer du den region där du vill skapa namnområdet.

    5. Som Prisnivå väljer du Standard.

    6. Välj Granska + skapa längst ned på sidan.

      Skärmbild som visar sidan Skapa namnområde.

    7. På sidan Granska + skapa i guiden skapa namnområde väljer du Skapa längst ned på sidan när du har granskat alla inställningar.

  4. När namnområdet har distribuerats väljer du Gå till resurs för att navigera till sidan Event Hubs-namnområde .

  5. På sidan Event Hubs-namnområde väljer du +Händelsehubb i kommandofältet.

    Skärmbild som visar knappen Lägg till händelsehubb på sidan Event Hubs-namnområde.

  6. På sidan Skapa händelsehubb anger du ett namn för händelsehubben. Ange partitionsantalet till 2. Använd standardalternativen i de återstående inställningarna och välj Granska + skapa.

    Skärmbild som visar sidan Skapa händelsehubb.

  7. På sidan Granska + skapa väljer du Skapa längst ned på sidan. Vänta sedan tills distributionen är klar.

Bevilja åtkomst till händelsehubben och få en anslutningssträng

Innan ett program kan skicka data till Azure Event Hubs måste händelsehubben ha en princip som tillåter åtkomst. Åtkomstprincipen producerar en anslutningssträng som inkluderar auktoriseringsinformation.

  1. På sidan Event Hubs-namnområde väljer du Principer för delad åtkomst på den vänstra menyn.

  2. Välj RootManageSharedAccessKey i listan med principer.

  3. Välj sedan kopieringsknappen bredvid Anslut ionssträng – primärnyckel.

  4. Klistra in anslutningssträngen i en textredigerare. Du behöver den här anslutningssträngen i nästa avsnitt.

    Anslutningssträngen ser ut så här:

    Endpoint=sb://<Your event hub namespace>.servicebus.windows.net/;SharedAccessKeyName=<Your shared access policy name>;SharedAccessKey=<generated key>

    Observera att anslutningssträng innehåller flera nyckel/värde-par avgränsade med semikolon: Slutpunkt, SharedAccessKeyName och SharedAccessKey.

Starta händelsegeneratorprogrammet

Innan du startar appen TelcoGenerator ska du konfigurera den så att den skickar data till de Azure Event Hubs du skapade tidigare.

  1. Extrahera innehållet i filen TelcoGenerator.zip.

  2. TelcoGenerator\TelcoGenerator\telcodatagen.exe.config Öppna filen i valfri textredigerare Det finns fler än en .config fil, så se till att du öppnar rätt fil.

  3. Uppdatera elementet <appSettings> i config-filen med följande information:

    • Ange värdet för nyckeln EventHubName till värdet för EntityPath i slutet av anslutningssträng.
    • Ange värdet för Microsoft.ServiceBus.AnslutionString-nyckel till anslutningssträng till namnområdet. Om du använder en anslutningssträng till en händelsehubb, inte ett namnområde, tar du bort EntityPath värdet (;EntityPath=myeventhub) i slutet. Glöm inte att ta bort semikolonet som föregår EntityPath-värdet.
  4. Spara filen.

  5. Öppna sedan ett kommandofönster och växla till den mapp där du packade upp programmet TelcoGenerator. Ange sedan följande kommando:

    .\telcodatagen.exe 1000 0.2 2
    

    Kommandot stöder följande parametrar:

    • Antal samtalsdataposter per timme.
    • Procentandel för sannolikhet för bedrägeri, vilket är hur ofta appen ska simulera ett bedrägligt samtal. Värdet 0,2 innebär att cirka 20 % av samtalsposterna ser bedrägliga ut.
    • Längd i timmar, vilket är det antal timmar som appen ska köras. Du kan också stoppa appen när som helst genom att avsluta processen (Ctrl+C) på kommandoraden.

    Efter några sekunder börjar appen visa telefonsamtalsposter på skärmen och skickar dem till en händelsehubb. Telefonsamtalsdata innehåller följande fält:

    Post Definition
    CallrecTime Tidsstämpeln för samtalets starttid.
    SwitchNum Telefonväxeln används för att ansluta samtalet. I det här exemplet är växlarna strängar som representerar ursprungslandet/ursprungsregionen (USA, Kina, Storbritannien, Tyskland eller Australien).
    CallingNum Uppringarens telefonnummer.
    CallingIMSI International Mobile Subscriber Identity (IMSI). Det är en unik identifierare för uppringaren.
    CalledNum Telefonnumret till mottagaren.
    CalledIMSI International Mobile Subscriber Identity (IMSI). Det är en unik identifierare för mottagaren.

Skapa ett Stream Analytics-jobb

Nu nr du har en ström av anropshändelser kan du skapa ett Stream Analytics-jobb som läser data från händelsehubben.

  1. Du skapar du ett Stream Analytics-jobb genom att gå till Azure-portalen.
  2. Välj Skapa en resurs och sök efter Stream Analytics-jobb. Välj panelen Stream Analytics-jobb och välj Skapa.
  3. Följ dessa steg på sidan Nytt Stream Analytics-jobb :
    1. För Prenumeration väljer du den prenumeration som innehåller Event Hubs-namnområdet.

    2. För Resursgrupp väljer du den resursgrupp som du skapade tidigare.

    3. I avsnittet Instansinformation anger du ett unikt namn för Stream Analytics-jobbet.

    4. För Region väljer du den region där du vill skapa Stream Analytics-jobbet. Vi rekommenderar att du placerar jobbet och händelsehubben i samma region för bästa prestanda och så att du inte betalar för att överföra data mellan regioner.

    5. För Värdmiljö< väljer du Moln om det inte redan har valts. Stream Analytics-jobb kan distribueras till molnet eller edge. Med Molnet kan du distribuera till Azure Cloud, och Med Edge kan du distribuera till en IoT Edge-enhet.

    6. För Enheter för direktuppspelning väljer du 1. Strömningsenheter representerar de bearbetningsresurser som krävs för att köra ett jobb. Standardvärdet är inställt på 1. Mer information om skalning av strömningsenheter finns i artikeln om att förstå och justera strömningsenheter.

    7. Välj Granska + skapa längst ned på sidan.

      Skärmbild som visar sidan Skapa Azure Stream Analytics-jobb.

  4. På sidan Granska + skapa granskar du inställningarna och väljer sedan Skapa för att skapa Stream Analytics-jobbet.
  5. När jobbet har distribuerats väljer du Gå till resurs för att navigera till Stream Analytics-jobbsidan .

Konfigurera jobbindata

Nästa steg är att definiera en indatakälla som jobbet använder för att läsa data med hjälp av den händelsehubb som du skapade i föregående avsnitt.

  1. På sidan Stream Analytics-jobb går du till avsnittet Jobbtopologi på den vänstra menyn och väljer Indata.

  2. På sidan Indata väljer du + Lägg till indata och Händelsehubb.

    Skärmbild som visar indatasidan för ett Stream Analytics-jobb.

  3. Följ dessa steg på sidan Händelsehubb :

    1. För Indataalias anger du CallStream. Indataalias är ett eget namn för att identifiera dina indata. Indataaliaset får enbart innehålla alfanumeriska tecken, bindestreck och understreck och måste vara mellan 3-63 tecken långt.

    2. För Prenumeration väljer du den Azure-prenumeration där du skapade händelsehubben. Händelsehubben kan finnas i samma eller en annan prenumeration som Stream Analytics-jobbet.

    3. För Event Hubs-namnområde väljer du det Event Hubs-namnområde som du skapade i föregående avsnitt. Alla namnområden som är tillgängliga i din aktuella prenumeration visas i listrutan.

    4. Som Händelsehubbnamn väljer du den händelsehubb som du skapade i föregående avsnitt. Alla händelsehubbar som är tillgängliga i det valda namnområdet visas i listrutan.

    5. För händelsehubbens konsumentgrupp väljer du alternativet Skapa ny så att en ny konsumentgrupp skapas på händelsehubben. Vi rekommenderar att du använder en distinkt konsumentgrupp för varje Stream Analytics-jobb. Om ingen konsumentgrupp anges använder $Default Stream Analytics-jobbet konsumentgruppen. När ett jobb innehåller en självkoppling eller har flera indata kan vissa indata senare läsas av mer än en läsare. Den här situationen påverkar antalet läsare i en enskild konsumentgrupp.

    6. För Autentiseringsläge väljer du Anslut ionssträng. Det är enklare att testa självstudien med det här alternativet.

    7. Som Namn på händelsehubbprincip väljer du Använd befintlig och sedan den princip som du skapade tidigare.

    8. Välj Spara längst ned på sidan.

      Skärmbild som visar konfigurationssidan för Event Hubs för indata.

Skapa en Azure Cache for Redis-instans

  1. Skapa en cache i Azure Cache for Redis med anvisningarna i Skapa en cache.

  2. När du har skapat cachen under Inställningar väljer du Åtkomstnycklar. Skriv ner den primära anslutningssträngen.

    Skärmbild som visar valet av menyalternativet Åtkomstnyckel.

Skapa en funktion i Azure Functions som kan skriva data till Azure Cache for Redis

  1. Läs avsnittet Skapa en funktionsapp i dokumentationen till Functions. Det här exemplet byggdes på:

    • Azure Functions-körningsversion 4
    • .NET 6.0
    • StackExchange.Redis 2.2.8
  2. Skapa en httptrigger-standardfunktionsapp i Visual Studio Code genom att följa den här självstudien. Följande information används: språk: C#, runtime: .NET 6 (under funktion v4), mall: HTTP trigger.

  3. Installera Redis-klientbiblioteket genom att köra följande kommando i en terminal i projektmappen:

    dotnet add package StackExchange.Redis --version 2.2.88
    
  4. Lägg till objekten RedisConnectionString och RedisDatabaseIndex i Values avsnittet i , local.settings.jsonoch fyll i anslutningssträng på målservern:

    {
        "IsEncrypted": false,
        "Values": {
            "AzureWebJobsStorage": "",
            "FUNCTIONS_WORKER_RUNTIME": "dotnet",
            "RedisConnectionString": "Your Redis Connection String",
            "RedisDatabaseIndex":"0"
        }
    }
    

    Redis Database Index är talet från 0 till 15 som identifierar databasen på instansen.

  5. Ersätt hela funktionen (.cs fil i projektet) med följande kodfragment. Uppdatera namnområdet, klassnamnet och funktionsnamnet efter ditt eget:

    using System;
    using System.IO;
    using System.Threading.Tasks;
    using Microsoft.AspNetCore.Mvc;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Azure.WebJobs.Extensions.Http;
    using Microsoft.AspNetCore.Http;
    using Microsoft.Extensions.Logging;
    using Newtonsoft.Json;
    
    using StackExchange.Redis;
    
    namespace Company.Function
    {
        public static class HttpTrigger1{
            [FunctionName("HttpTrigger1")]
            public static async Task<IActionResult> Run(
                [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
                ILogger log)
            {
                // Extract the body from the request
                string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
                if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check
    
                dynamic data = JsonConvert.DeserializeObject(requestBody);
    
                // Reject if too large, as per the doc
                if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge
    
                string RedisConnectionString = Environment.GetEnvironmentVariable("RedisConnectionString");
                int RedisDatabaseIndex = int.Parse(Environment.GetEnvironmentVariable("RedisDatabaseIndex"));
    
                using (var connection = ConnectionMultiplexer.Connect(RedisConnectionString))
                {
                    // Connection refers to a property that returns a ConnectionMultiplexer
                    IDatabase db = connection.GetDatabase(RedisDatabaseIndex);
    
                    // Parse items and send to binding
                    for (var i = 0; i < data.Count; i++)
                    {
                        string key = data[i].Time + " - " + data[i].CallingNum1;
    
                        db.StringSet(key, data[i].ToString());
                        log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}");
    
                        // Simple get of data types from the cache
                        string value = db.StringGet(key);
                        log.LogInformation($"Database got: {key} => {value}");
    
                    }
                }
                return new OkResult(); // 200
            }
        }
    }
    

    När Stream Analytics tar emot undantaget "HTTP-begärandeentiteten är för stor" från funktionen minskar storleken på de batchar som den skickar till funktioner. Följande kod ser till att Stream Analytics inte skickar överdimensionerade batchar. Se till att de värden för maximalt antal batchar och maximal batchstorlek som används i funktionen stämmer överens med de värden som angetts i Stream Analytics-portalen.

  6. Funktionen kan nu publiceras till Azure.

  7. Öppna funktionen på Azure-portalen och ange programinställningar för RedisConnectionString och RedisDatabaseIndex.

Uppdatera Stream Analytics-jobbet med funktionen som utdata

  1. Öppna ditt Stream Analytics-jobb på Azure-portalen.

  2. Bläddra till din funktion och välj Översikt>Utdata>Lägg till. Om du vill lägga till en ny utdatakanal väljer du Azure-funktion för kanalmottagaralternativet. Utdataadaptern för Functions har följande egenskaper:

    Egenskapsnamn Beskrivning
    Utdataalias Ett användarvänligt namn som du använder i jobbets fråga för att referera till utdata.
    Importalternativ Du kan använda funktionen från den aktuella prenumerationen eller ange inställningarna manuellt om funktionen finns i en annan prenumeration.
    Funktionsapp Namnet på din Functions-app.
    Funktion Namnet på funktionen i din Functions-app (namnet på din run.csx-funktion).
    Max batchstorlek Anger den maximala storleken för varje utdatabatch som skickas till din funktion i byte. Som standard är värdet är inställt på 262 144 byte (256 kB).
    Max batchantal Anger det maximala antalet händelser i varje batch som skickas till funktionen. Standardvärdet är 100. Den här egenskapen är valfri.
    Nyckel Gör att du kan använda en funktion från en annan prenumeration. Ange nyckelvärdet för att få åtkomst till din funktion. Den här egenskapen är valfri.
  3. Ange ett namn för utdataaliaset. I den här självstudien heter den saop1, men du kan använda valfritt namn. Fyll i övrig information.

  4. Öppna Stream Analytics-jobbet och uppdatera frågan till följande.

    Viktigt!

    Följande exempelskript förutsätter att du använde CallStream som indatanamn och saop1 för utdatanamnet. Om du har använt olika namn ska du inte glömma att uppdatera frågan.

     SELECT
             System.Timestamp as Time, CS1.CallingIMSI, CS1.CallingNum as CallingNum1,
             CS2.CallingNum as CallingNum2, CS1.SwitchNum as Switch1, CS2.SwitchNum as Switch2
         INTO saop1
         FROM CallStream CS1 TIMESTAMP BY CallRecTime
            JOIN CallStream CS2 TIMESTAMP BY CallRecTime
             ON CS1.CallingIMSI = CS2.CallingIMSI AND DATEDIFF(ss, CS1, CS2) BETWEEN 1 AND 5
         WHERE CS1.SwitchNum != CS2.SwitchNum
    
  5. Starta telcodatagen.exe-programmet genom att köra följande kommando på kommandoraden. Kommandot använder formatet telcodatagen.exe [#NumCDRsPerHour] [SIM Card Fraud Probability] [#DurationHours].

    telcodatagen.exe 1000 0.2 2
    
  6. Starta Stream Analytics-jobbet.

  7. På sidan Övervaka för din Azure-funktion ser du att funktionen anropas.

    Skärmbild som visar sidan Övervaka för Azure Functions med funktionsanrop.

  8. På sidan Azure Cache for Redis väljer du Ditt cacheminne, väljer Mått på den vänstra menyn, lägger till cacheskrivningsmått och anger varaktigheten till den senaste timmen. Du ser diagrammet som liknar följande bild.

    Skärmbild som visar sidan Mått för Azure Cache for Redis.

Kontrollera resultatet i Azure Cache for Redis

Hämta nyckeln från Azure Functions-loggar

Hämta först nyckeln för en post som infogats i Azure Cache for Redis. I koden beräknas nyckeln i Azure-funktionen enligt följande kodfragment:

string key = data[i].Time + " - " + data[i].CallingNum1;

db.StringSet(key, data[i].ToString());
log.LogInformation($"Object put in database. Key is {key} and value is {data[i].ToString()}");
  1. Bläddra till Azure-portalen och leta reda på din Azure Functions-app.

  2. Välj Funktioner på den vänstra menyn.

  3. Välj HTTPTrigger1 i listan över funktioner.

  4. Välj Övervaka på den vänstra menyn.

  5. Växla till fliken Loggar .

  6. Anteckna en nyckel från det informationsmeddelande som visas i följande skärmbild. Du använder den här nyckeln för att hitta värdet i Azure Cache for Redis.

    Skärmbild som visar sidan Övervaka loggar för Azure-funktionen.

Använd nyckeln för att hitta posten i Azure Cache for Redis

  1. Bläddra till Azure-portalen och leta rätt på din Azure Cache for Redis. Välj Konsol.

  2. Verifiera att dina data finns i Azure Cache for Redis genom att använda Azure Cache for Redis-kommandon. (Kommandot tar formatet Get {key}.) Använd nyckeln som du kopierade från Monitor-loggarna för Azure-funktionen (i föregående avsnitt).

    Hämta "KEY-FROM-THE-PREVIOUS-SECTION"

    Det här kommandot bör skriva ut värdet för den angivna nyckeln:

    Skärmbild som visar Redis Cache-konsolen som visar utdata från kommandot Hämta.

Felhantering och återförsök

Om ett fel inträffar när händelser skickas till Azure Functions försöker Stream Analytics utföra de flesta åtgärder igen. Alla http-undantag görs på nytt tills det lyckas förutom http-fel 413 (entiteten är för stor). Ett för stort entitetsfel behandlas som ett datafel som utsätts för återförsöks- eller släppprincipen.

Kommentar

Tidsgränsen för HTTP-begäranden från Stream Analytics till Azure Functions är inställd på 100 sekunder. Om det tar mer än 100 sekunder för Din Azure Functions-app att bearbeta en batch, felar Stream Analytics och fyller i batchen igen.

Om du försöker igen för timeouter kan det resultera i duplicerade händelser som skrivits till utdatamottagaren. När Stream Analytics försöker igen för en misslyckad batch försöker den igen för alla händelser i batchen. Tänk dig till exempel en batch med 20 händelser som skickas till Azure Functions från Stream Analytics. Anta att Azure Functions tar 100 sekunder att bearbeta de första 10 händelserna i batchen. Efter 100 sekunder pausar Stream Analytics begäran eftersom den inte har fått något positivt svar från Azure Functions och en annan begäran skickas för samma batch. De första 10 händelserna i batchen bearbetas igen av Azure Functions, vilket orsakar en dubblett.

Kända problem

När du försöker återställa värdet för Max batchstorlek Max batchantal till tomt (standard) i Azure-portalen ändras värdet tillbaka till det tidigare angivna värdet när du spara. Ange standardvärdena för de här fälten manuellt i det här fallet.

Användningen av HTTP-routning på dina Azure Functions stöds för närvarande inte av Stream Analytics.

Stöd för att ansluta till Azure Functions som finns i ett virtuellt nätverk är inte aktiverat.

Rensa resurser

Ta bort resursgruppen, strömningsjobbet och alla relaterade resurser när de inte längre behövs. Om du tar bort jobbet undviker du att bli fakturerad för de strömmande enheter som används av jobbet. Om du planerar att använda jobbet i framtiden kan du stoppa det och sedan starta det igen när du behöver det. Om du inte kommer att fortsätta att använda det här jobbet tar du bort alla resurser som skapats i den här snabbstarten med hjälp av följande steg:

  1. Klicka på Resursgrupper på den vänstra menyn i Azure-portalen och välj sedan namnet på den resurs du skapade.
  2. På sidan med resursgrupper klickar du på Ta bort, skriver in namnet på resursen som ska tas bort i textrutan och väljer sedan Ta bort.

Nästa steg

I den här självstudien har du skapat ett enkelt Stream Analytics-jobb som kör en Azure-funktion. Om du vill veta mer om Stream Analytics-jobb kan du fortsätta till nästa självstudie: