Implementace vlastního úložiště pro robota

PLATÍ PRO: SDK v4

Interakce robota spadají do tří oblastí: výměna aktivit se službou Azure AI Bot Service, načítání a ukládání stavu robota a dialogového okna s úložištěm paměti a integrace s back-endovými službami.

Interaction diagram outlining relationship between the Azure AI Bot Service, a bot, a memory store, and other services.

Tento článek popisuje, jak rozšířit sémantiku mezi službou Azure AI Bot Service a stavem paměti a úložištěm robota.

Poznámka:

Sady SDK služby Bot Framework JavaScript, C# a Python budou nadále podporovány, ale sada Java SDK se vyřazuje s konečnou dlouhodobou podporou končící v listopadu 2023. V tomto úložišti budou provedeny pouze kritické opravy zabezpečení a chyb.

Stávající roboti sestavení pomocí sady Java SDK budou i nadále fungovat.

Pro nové vytváření robotů zvažte použití Power Virtual Agents a přečtěte si o výběru správného řešení chatovacího robota.

Další informace najdete v tématu Budoucnost vytváření robotů.

Předpoklady

Tento článek se zaměřuje na verzi C# ukázky.

Pozadí

Sada SDK služby Bot Framework zahrnuje výchozí implementaci stavu robota a úložiště paměti. Tato implementace odpovídá potřebám aplikací, ve kterých se jednotlivé části používají společně s několika řádky inicializačního kódu, jak je znázorněno v mnoha ukázkách.

Sada SDK je architektura, nikoli aplikace s pevným chováním. Jinými slovy, implementace mnoha mechanismů v rámci je výchozí implementace, a ne jedinou možnou implementací. Architektura nediktuje vztah mezi výměnou aktivit se službou Azure AI Bot Service a načítáním a ukládáním stavu robota.

Tento článek popisuje jeden ze způsobů, jak upravit sémantiku výchozího stavu a implementace úložiště, když pro vaši aplikaci úplně nefunguje. Ukázka horizontálního navýšení kapacity poskytuje alternativní implementaci stavu a úložiště, která má jinou sémantiku než výchozí. Toto alternativní řešení je v architektuře stejně dobře. V závislosti na vašem scénáři může být toto alternativní řešení vhodnější pro aplikaci, kterou vyvíjíte.

Chování výchozího adaptéru a poskytovatele úložiště

Při výchozí implementaci při přijetí aktivity robot načte stav odpovídající konverzaci. Potom spustí logiku dialogového okna s tímto stavem a příchozí aktivitou. Při spuštění dialogového okna se vytvoří a okamžitě odešle jedna nebo více odchozích aktivit. Po dokončení zpracování dialogového okna robot uloží aktualizovaný stav a přepíše starý stav.

Sequence diagram showing the default behavior of a bot and its memory store.

S tímto chováním se ale může pokazit několik věcí.

  • Pokud operace uložení z nějakého důvodu selže, stav se implicitně vyklouzl ze synchronizace s tím, co uživatel uvidí v kanálu. Uživatel viděl odpovědi z robota a věří, že se stav přesunul dopředu, ale ne. Tato chyba může být horší než v případě, že aktualizace stavu proběhla úspěšně, ale uživatel neobdržel zprávy s odpovědí.

    Takové chyby stavu můžou mít vliv na návrh konverzace. Dialogové okno může například vyžadovat dodatečné, jinak redundantní výměny potvrzení s uživatelem.

  • Pokud se implementace nasadí škálovat na více uzlů, může se stát omylem přepsat. Tato chyba může být matoucí, protože dialogové okno pravděpodobně odeslalo aktivity do kanálu s potvrzovacími zprávami.

    Představte si robota objednávky pizzy, kde robot požádá uživatele o zastavení voleb a uživatel pošle dvě rychlé zprávy: jeden přidá houby a jeden pro přidání sýra. Ve scénáři s horizontálním navýšením kapacity může být aktivních více instancí robota a dvě uživatelské zprávy mohou být zpracovány dvěma samostatnými instancemi na samostatných počítačích. Takový konflikt se označuje jako stav časování, kdy jeden počítač může přepsat stav napsaný jiným počítačem. Vzhledem k tomu, že odpovědi byly již odeslány, uživatel obdržel potvrzení, že do své objednávky byly přidány houbové i sýrové. Bohužel, když pizza přijde, obsahuje pouze houbu nebo sýr, ale ne obojí.

Optimistické uzamčení

Ukázka horizontálního navýšení kapacity představuje několik uzamčení stavu. Ukázka implementuje optimistické uzamčení, které umožňuje spuštění každé instance, jako by to byla jediná spuštěná, a pak zkontrolujte případné porušení souběžnosti. Toto uzamykání může znít složitě, ale existují známá řešení a můžete používat technologie cloudového úložiště a správné body rozšíření v bot Frameworku.

Ukázka používá standardní mechanismus HTTP založený na hlavičce značky entity (ETag). Pochopení tohoto mechanismu je zásadní pro pochopení následujícího kódu. Následující diagram znázorňuje posloupnost.

Sequence diagram showing a race condition, with the second update failing.

Diagram obsahuje dva klienty, kteří provádějí aktualizaci některého prostředku.

  1. Když klient vydá požadavek GET a prostředek se vrátí ze serveru, server obsahuje hlavičku značky ETag.

    Hlavička ETag je neprůhlená hodnota, která představuje stav prostředku. Pokud dojde ke změně prostředku, server aktualizuje jeho značku ETag prostředku.

  2. Když chce klient zachovat změnu stavu, vydá požadavek POST na server s hodnotou ETag v If-Match předběžné hlavičce.

  3. Pokud hodnota ETag požadavku neodpovídá hodnotě serveru, předběžná kontrola selže s 412 odpovědí (Předběžná podmínka selhala).

    Tato chyba značí, že aktuální hodnota na serveru už neodpovídá původní hodnotě, na které klient fungoval.

  4. Pokud klient obdrží předběžnou podmínku neúspěšné odpovědi, klient obvykle získá novou hodnotu prostředku, použije požadovanou aktualizaci a pokusí se znovu publikovat aktualizaci prostředku.

    Tento druhý požadavek POST je úspěšný, pokud prostředek neaktualizoval žádný jiný klient. V opačném případě může klient akci opakovat.

Tento proces se nazývá optimistický , protože jakmile má prostředek, pokračuje ve zpracování – samotný prostředek není uzamčený, protože k němu mají přístup i ostatní klienti bez omezení. Jakékoli kolize mezi klienty o tom, jaký stav prostředku by se měl určit, dokud se zpracování nedokončí. V distribuovaném systému je tato strategie často optimalnější než opačný pesimistický přístup.

Mechanismus optimistického uzamčení, jak je popsáno, předpokládá, že logika programu se dá bezpečně opakovat. Ideální je situace, kdy jsou tyto žádosti o služby idempotentní. V počítačových vědách je idempotentní operace, která nemá žádný dodatečný účinek, pokud se volá více než jednou se stejnými vstupními parametry. Čistě služby HTTP REST, které implementují požadavky GET, PUT a DELETE, jsou často idempotentní. Pokud požadavek na službu nevygeneruje další efekty, je možné žádosti bezpečně znovu spustit jako součást strategie opakování.

Ukázka horizontálního navýšení kapacity a zbývající část tohoto článku předpokládají, že back-endové služby, které robot používá, jsou všechny idempotentní služby HTTP REST.

Ukládání odchozích aktivit do vyrovnávací paměti

Odeslání aktivity není idempotentní operace. Aktivita je často zpráva, která předává informace uživateli a opakování stejné zprávy dvakrát nebo vícekrát může být matoucí nebo zavádějící.

Optimistické uzamčení znamená, že logika robota může být potřeba spustit několikrát znovu. Pokud se chcete vyhnout vícenásobnému odesílání jakékoli dané aktivity, počkejte, než se operace aktualizace stavu úspěšně odešle uživateli. Logika robota by měla vypadat přibližně jako v následujícím diagramu.

Sequence diagram with messages being sent after dialog state is saved.

Jakmile do provádění dialogového okna sestavíte smyčku opakování, budete mít následující chování, když dojde k předběžné chybě operace uložení.

Sequence diagram with messages being sent after a retry attempt succeeds.

S tímto mechanismem by robot pizzy z předchozího příkladu nikdy neměl posílat chybné kladné potvrzení o tom, že se pizza přestane přidávat do objednávky. I když je robot nasazený na více počítačích, schéma optimistického zamykání efektivně serializuje aktualizace stavu. V robotovi pizzy může potvrzení o přidání položky nyní dokonce přesně odrážet celý stav. Pokud například uživatel rychle zadá "sýr" a "houba" a tyto zprávy se zpracovávají dvěma různými instancemi robota, poslední instance, která se dokončí, může jako součást odpovědi obsahovat "pizzu se sýrem a houbou".

Toto nové vlastní řešení úložiště dělá tři věci, které výchozí implementace v sadě SDK nedělá:

  1. K detekci kolizí používá značky ETag.
  2. Při zjištění selhání značky ETag opakuje zpracování.
  3. Čeká na odesílání odchozích aktivit, dokud se úspěšně neuloží.

Zbývající část tohoto článku popisuje implementaci těchto tří částí.

Implementace podpory značky ETag

Nejprve definujte rozhraní pro naše nové úložiště, které zahrnuje podporu značky ETag. Rozhraní pomáhá používat mechanismy injektáže závislostí v ASP.NET. Počínaje rozhraním můžete implementovat samostatné verze pro testy jednotek a pro produkční prostředí. Například verze testu jednotek může běžet v paměti a nevyžaduje síťové připojení.

Rozhraní se skládá z metod načítání a ukládání . Obě metody budou používat parametr klíče k identifikaci stavu, ze které se má načíst, nebo uložit do úložiště.

  • Načtení vrátí hodnotu stavu a přidruženou značku ETag.
  • Uložení bude mít parametry pro hodnotu stavu a přidruženou značku ETag a vrátí logickou hodnotu, která označuje, jestli operace proběhla úspěšně. Vrácená hodnota nebude sloužit jako obecný indikátor chyby, ale jako konkrétní indikátor selhání předběžné podmínky. Kontrola návratového kódu bude součástí logiky smyčky opakování.

Pokud chcete, aby implementace úložiště byla široce použitelná, vyhněte se požadavkům serializace na ni. Mnoho moderních služeb úložiště ale podporuje JSON jako typ obsahu. V jazyce C# můžete použít JObject typ k reprezentaci objektu JSON. V JavaScriptu nebo TypeScriptu je JSON běžným nativním objektem.

Tady je definice vlastního rozhraní.

IStore.cs

public interface IStore
{
    Task<(JObject content, string etag)> LoadAsync(string key);

    Task<bool> SaveAsync(string key, JObject content, string etag);
}

Tady je implementace služby Azure Blob Storage.

BlobStore.cs

public class BlobStore : IStore
{
    private readonly CloudBlobContainer _container;

    public BlobStore(string accountName, string accountKey, string containerName)
    {
        if (string.IsNullOrWhiteSpace(accountName))
        {
            throw new ArgumentException(nameof(accountName));
        }

        if (string.IsNullOrWhiteSpace(accountKey))
        {
            throw new ArgumentException(nameof(accountKey));
        }

        if (string.IsNullOrWhiteSpace(containerName))
        {
            throw new ArgumentException(nameof(containerName));
        }

        var storageCredentials = new StorageCredentials(accountName, accountKey);
        var cloudStorageAccount = new CloudStorageAccount(storageCredentials, useHttps: true);
        var client = cloudStorageAccount.CreateCloudBlobClient();
        _container = client.GetContainerReference(containerName);
    }

    public async Task<(JObject content, string etag)> LoadAsync(string key)
    {
        if (string.IsNullOrWhiteSpace(key))
        {
            throw new ArgumentException(nameof(key));
        }

        var blob = _container.GetBlockBlobReference(key);
        try
        {
            var content = await blob.DownloadTextAsync();
            var obj = JObject.Parse(content);
            var etag = blob.Properties.ETag;
            return (obj, etag);
        }
        catch (StorageException e)
            when (e.RequestInformation.HttpStatusCode == (int)HttpStatusCode.NotFound)
        {
            return (new JObject(), null);
        }
    }

    public async Task<bool> SaveAsync(string key, JObject obj, string etag)
    {
        if (string.IsNullOrWhiteSpace(key))
        {
            throw new ArgumentException(nameof(key));
        }

        if (obj == null)
        {
            throw new ArgumentNullException(nameof(obj));
        }

        var blob = _container.GetBlockBlobReference(key);
        blob.Properties.ContentType = "application/json";
        var content = obj.ToString();
        if (etag != null)
        {
            try
            {
                await blob.UploadTextAsync(content, Encoding.UTF8, new AccessCondition { IfMatchETag = etag }, new BlobRequestOptions(), new OperationContext());
            }
            catch (StorageException e)
                when (e.RequestInformation.HttpStatusCode == (int)HttpStatusCode.PreconditionFailed)
            {
                return false;
            }
        }
        else
        {
            await blob.UploadTextAsync(content);
        }

        return true;
    }
}

Azure Blob Storage toho hodně dělá. Každá metoda kontroluje konkrétní výjimku, aby splňovala očekávání volajícího kódu.

  • Metoda LoadAsync v reakci na výjimku úložiště s nenalezeným stavovým kódem vrátí hodnotu null.
  • MetodaSaveAsync, v reakci na výjimku úložiště s předběžným neúspěšným kódem, vrátí false.

Implementace smyčky opakování

Návrh smyčky opakování implementuje chování zobrazené v sekvenčních diagramech.

  1. Při příjmu aktivity vytvořte klíč pro stav konverzace.

    Vztah mezi aktivitou a stavem konverzace je stejný pro vlastní úložiště jako pro výchozí implementaci. Proto můžete klíč vytvořit stejným způsobem jako výchozí implementace stavu.

  2. Pokus o načtení stavu konverzace

  3. Spusťte dialogy robota a zaznamenejte odchozí aktivity, které chcete odeslat.

  4. Pokus o uložení stavu konverzace

    • V případě úspěchu odešlete odchozí aktivity a ukončete je.

    • Při selhání tento proces zopakujte z kroku a načtěte stav konverzace.

      Nové zatížení stavu konverzace získá novou a aktuální značku ETag a stav konverzace. Dialogové okno se znovu spustí a krok stavu uložení má šanci uspět.

Tady je implementace obslužné rutiny aktivity zprávy.

ScaleoutBot.cs

protected override async Task OnMessageActivityAsync(ITurnContext<IMessageActivity> turnContext, CancellationToken cancellationToken)
{
    // Create the storage key for this conversation.
    var key = $"{turnContext.Activity.ChannelId}/conversations/{turnContext.Activity.Conversation?.Id}";

    // The execution sits in a loop because there might be a retry if the save operation fails.
    while (true)
    {
        // Load any existing state associated with this key
        var (oldState, etag) = await _store.LoadAsync(key);

        // Run the dialog system with the old state and inbound activity, the result is a new state and outbound activities.
        var (activities, newState) = await DialogHost.RunAsync(_dialog, turnContext.Activity, oldState, cancellationToken);

        // Save the updated state associated with this key.
        var success = await _store.SaveAsync(key, newState, etag);

        // Following a successful save, send any outbound Activities, otherwise retry everything.
        if (success)
        {
            if (activities.Any())
            {
                // This is an actual send on the TurnContext we were given and so will actual do a send this time.
                await turnContext.SendActivitiesAsync(activities, cancellationToken);
            }

            break;
        }
    }
}

Poznámka:

Ukázka implementuje provádění dialogového okna jako volání funkce. Složitějším přístupem může být definování rozhraní a použití injektáže závislostí. V tomto příkladu ale statická funkce zdůrazňuje funkční povahu tohoto optimistického uzamčení. Obecně platí, že když implementujete klíčové části kódu funkčním způsobem, zlepšíte jeho šanci na úspěšné fungování v sítích.

Implementace vyrovnávací paměti odchozí aktivity

Dalším požadavkem je ukládat odchozí aktivity do vyrovnávací paměti, dokud nedojde k úspěšné operaci uložení, což vyžaduje vlastní implementaci adaptéru. Vlastní SendActivitiesAsync metoda by neměla posílat aktivity do použití, ale přidat aktivity do seznamu. Kód dialogového okna nebude potřebovat úpravy.

  • V tomto konkrétním scénáři nejsou podporované operace aktivity aktualizace a aktivity odstranění a přidružené metody nevyvolají implementované výjimky.
  • Návratovou hodnotu operace odesílání aktivit používají některé kanály, aby robot mohl upravit nebo odstranit dříve odeslanou zprávu, například zakázat tlačítka na kartách zobrazených v kanálu. Tyto výměny zpráv můžou být složité, zejména pokud je vyžadován stav, a jsou mimo rozsah tohoto článku.
  • Dialogové okno vytvoří a používá tento vlastní adaptér, aby mohl ukládat aktivity do vyrovnávací paměti.
  • Obslužná rutina otáčení robota bude používat ke AdapterWithErrorHandler standardnímu odesílání aktivit uživateli.

Tady je implementace vlastního adaptéru.

DialogHostAdapter.cs

public class DialogHostAdapter : BotAdapter
{
    private List<Activity> _response = new List<Activity>();

    public IEnumerable<Activity> Activities => _response;

    public override Task<ResourceResponse[]> SendActivitiesAsync(ITurnContext turnContext, Activity[] activities, CancellationToken cancellationToken)
    {
        foreach (var activity in activities)
        {
            _response.Add(activity);
        }

        return Task.FromResult(new ResourceResponse[0]);
    }

    #region Not Implemented
    public override Task DeleteActivityAsync(ITurnContext turnContext, ConversationReference reference, CancellationToken cancellationToken)
    {
        throw new NotImplementedException();
    }

    public override Task<ResourceResponse> UpdateActivityAsync(ITurnContext turnContext, Activity activity, CancellationToken cancellationToken)
    {
        throw new NotImplementedException();
    }
    #endregion
}

Použití vlastního úložiště v robotovi

Posledním krokem je použití těchto vlastních tříd a metod s existujícími třídami a metodami architektury.

  • Hlavní smyčka opakování se stane součástí metody robota ActivityHandler.OnMessageActivityAsync a zahrnuje vlastní úložiště prostřednictvím injektáže závislostí.
  • Kód hostování dialogového okna se přidá do DialogHost třídy, která zveřejňuje statickou RunAsync metodu. Hostitel dialogového okna:
    • Vezme příchozí aktivitu a starý stav a vrátí výsledné aktivity a nový stav.
    • Vytvoří vlastní adaptér a jinak spustí dialogové okno stejným způsobem jako sada SDK.
    • Vytvoří vlastní přístupové objekty vlastností stavu, které předá stav dialogového okna do systému dialogového okna. Přistupovač používá referenční sémantiku k předání popisovače přístupového úchytu do dialogového systému.

Tip

Serializace JSON se přidá do hostitelského kódu, aby byla mimo připojitelnou vrstvu úložiště, aby se různé implementace mohly serializovat jinak.

Tady je implementace hostitele dialogového okna.

DialogHost.cs

public static class DialogHost
{
    // The serializer to use. Moving the serialization to this layer will make the storage layer more pluggable.
    private static readonly JsonSerializer StateJsonSerializer = new JsonSerializer() { TypeNameHandling = TypeNameHandling.All };

    /// <summary>
    /// A function to run a dialog while buffering the outbound Activities.
    /// </summary>
    /// <param name="dialog">THe dialog to run.</param>
    /// <param name="activity">The inbound Activity to run it with.</param>
    /// <param name="oldState">Th eexisting or old state.</param>
    /// <returns>An array of Activities 'sent' from the dialog as it executed. And the updated or new state.</returns>
    public static async Task<(Activity[], JObject)> RunAsync(Dialog dialog, IMessageActivity activity, JObject oldState, CancellationToken cancellationToken)
    {
        // A custom adapter and corresponding TurnContext that buffers any messages sent.
        var adapter = new DialogHostAdapter();
        var turnContext = new TurnContext(adapter, (Activity)activity);

        // Run the dialog using this TurnContext with the existing state.
        var newState = await RunTurnAsync(dialog, turnContext, oldState, cancellationToken);

        // The result is a set of activities to send and a replacement state.
        return (adapter.Activities.ToArray(), newState);
    }

    /// <summary>
    /// Execute the turn of the bot. The functionality here closely resembles that which is found in the
    /// IBot.OnTurnAsync method in an implementation that is using the regular BotFrameworkAdapter.
    /// Also here in this example the focus is explicitly on Dialogs but the pattern could be adapted
    /// to other conversation modeling abstractions.
    /// </summary>
    /// <param name="dialog">The dialog to be run.</param>
    /// <param name="turnContext">The ITurnContext instance to use. Note this is not the one passed into the IBot OnTurnAsync.</param>
    /// <param name="state">The existing or old state of the dialog.</param>
    /// <returns>The updated or new state of the dialog.</returns>
    private static async Task<JObject> RunTurnAsync(Dialog dialog, ITurnContext turnContext, JObject state, CancellationToken cancellationToken)
    {
        // If we have some state, deserialize it. (This mimics the shape produced by BotState.cs.)
        var dialogStateProperty = state?[nameof(DialogState)];
        var dialogState = dialogStateProperty?.ToObject<DialogState>(StateJsonSerializer);

        // A custom accessor is used to pass a handle on the state to the dialog system.
        var accessor = new RefAccessor<DialogState>(dialogState);

        // Run the dialog.
        await dialog.RunAsync(turnContext, accessor, cancellationToken);

        // Serialize the result (available as Value on the accessor), and put its value back into a new JObject.
        return new JObject { { nameof(DialogState), JObject.FromObject(accessor.Value, StateJsonSerializer) } };
    }
}

A konečně tady je implementace vlastního přístupového objektu vlastností stavu.

RefAccessor.cs

public class RefAccessor<T> : IStatePropertyAccessor<T>
    where T : class
{
    public RefAccessor(T value)
    {
        Value = value;
    }

    public T Value { get; private set; }

    public string Name => nameof(T);

    public Task<T> GetAsync(ITurnContext turnContext, Func<T> defaultValueFactory = null, CancellationToken cancellationToken = default(CancellationToken))
    {
        if (Value == null)
        {
            if (defaultValueFactory == null)
            {
                throw new KeyNotFoundException();
            }

            Value = defaultValueFactory();
        }

        return Task.FromResult(Value);
    }

    #region Not Implemented
    public Task DeleteAsync(ITurnContext turnContext, CancellationToken cancellationToken = default(CancellationToken))
    {
        throw new NotImplementedException();
    }

    public Task SetAsync(ITurnContext turnContext, T value, CancellationToken cancellationToken = default(CancellationToken))
    {
        throw new NotImplementedException();
    }
    #endregion
}

Další informace

Ukázka škálování na více instancí je k dispozici v úložišti ukázek služby Bot Framework na GitHubu v C#, Pythonu a Javě.