Share via


Implementación de almacenamiento personalizado en un bot

SE APLICA A: SDK v4

Las interacciones de un bot se dividen en tres áreas: el intercambio de actividades con Azure AI Bot Service, la carga y el almacenamiento de estados de bot y diálogo con un almacenamiento de memoria y la integración con servicios back-end.

Diagrama de interacción que describe la relación entre Azure AI Bot Service, un bot, un almacén de memoria y otros servicios.

En este artículo se explora cómo ampliar la semántica entre el Azure AI Bot Service y el estado y el almacenamiento de la memoria del bot.

Nota:

Los SDK de JavaScript, C# y Python de Bot Framework seguirán siendo compatibles, pero el SDK de Java se va a retirar con la compatibilidad final a largo plazo que finaliza en noviembre de 2023.

Los bots existentes creados con el SDK de Java seguirán funcionando.

Para la creación de nuevos bots, considera el uso de Power Virtual Agents y lee sobre cómo elegir la solución de bot de chat adecuada.

Para obtener más información, consulta El futuro de la creación de bots.

Requisitos previos

Este artículo se centra en la versión de C# del ejemplo.

Fondo

El SDK de Bot Framework incluye una implementación predeterminada del estado del bot y el almacenamiento de memoria. Esta implementación se ajusta a las necesidades de las aplicaciones en las que se usan las piezas junto con algunas líneas de código de inicialización, como se muestra en muchos de los ejemplos.

El SDK es un marco y no una aplicación con comportamiento fijo. Es decir, la implementación de muchos de los mecanismos del marco es una predeterminada, pero no es la única posible. El marco no dicta la relación entre el intercambio de actividades con Azure AI Bot Service y la carga y el almacenamiento de cualquier estado del bot.

En este artículo se describe una manera de modificar la semántica de la implementación predeterminada de estado y almacenamiento cuando no funciona como le gustaría en la aplicación. El ejemplo de escalabilidad horizontal proporciona una implementación alternativa de estado y almacenamiento que tiene una semántica diferente a la predeterminada. Esta solución alternativa se adapta igual de bien al marco. En función de su escenario, esta solución alternativa puede ser más adecuada para la aplicación que está desarrollando.

Comportamiento del adaptador y del proveedor de almacenamiento predeterminados

Con la implementación predeterminada, al recibir una actividad, el bot carga el estado correspondiente a la conversación. A continuación, ejecuta la lógica del diálogo con ese estado y la actividad entrante. Durante el proceso de ejecución del diálogo, se crearán una o varias actividades salientes y se enviarán inmediatamente. Una vez que el procesamiento del diálogo está completo, el bot guarda el estado actualizado y sobrescribe el anterior estado.

Diagrama de secuencia que muestra el comportamiento predeterminado de un bot y su almacén de memoria.

Sin embargo, algunas cosas pueden salir mal con este comportamiento.

  • Si se produce un error en la operación de guardado por algún motivo, el estado pierde la sincronización con lo que ve el usuario en el canal. El usuario ve respuestas del bot y cree que el estado ha avanzado, pero no es así. Este error puede ser peor que si se actualiza correctamente el estado, pero el usuario no recibe los mensajes de respuesta.

    Estos errores de estado pueden afectar al diseño de la conversación. Por ejemplo, el diálogo podría requerir intercambios de confirmación adicionales y redundantes con el usuario.

  • Si la implementación se realiza horizontalmente en varios nodos, el estado puede sobrescribirse accidentalmente. Este error puede resultar confuso porque probablemente el diálogo haya enviado actividades al canal con mensajes de confirmación.

    Imagínese un bot para pedir pizzas, en el que se solicita al usuario que seleccione los ingredientes y el usuario envía dos mensajes rápidos: uno para añadir champiñones y otro para añadir queso. En un escenario de escalado horizontal, es posible que haya varias instancias del bot activas y los dos mensajes del usuario se puedan controlar mediante dos instancias diferentes en máquinas independientes. Este tipo de conflicto se conoce como condición de carrera, según la cual una máquina podría sobrescribir el estado que ha escrito otra. Sin embargo, dado que ya se han enviado las respuestas, el usuario recibe la confirmación de que se han añadido los champiñones y el queso al pedido. Lamentablemente, cuando llegue la pizza, solo llevará champiñones o queso, pero no ambos.

Bloqueo optimista

En el ejemplo de escalabilidad horizontal se presentan algunos bloqueos con respecto al estado. El ejemplo implementa el bloqueo optimista, que permite que cada instancia se ejecute como si fuera la única en ejecución y, a continuación, compruebe si hay infracciones de simultaneidad. Este bloqueo puede parecer complicado, pero existen soluciones conocidas, y puede usar tecnologías de almacenamiento en la nube y los puntos de extensión adecuados en el Bot Framework.

En el ejemplo se utiliza un mecanismo estándar de HTTP basado en el encabezado de etiqueta de entidad (ETag). Entender este mecanismo resulta crucial para comprender el código que sigue. En el siguiente diagrama se ilustra la secuencia.

Diagrama de secuencia que muestra una condición de carrera, con el segundo error de actualización.

El diagrama muestra dos clientes que están llevando a cabo una actualización en algún recurso.

  1. Cuando un cliente emite una solicitud GET y se devuelve un recurso desde el servidor, este incluye un encabezado ETag.

    El encabezado ETag es un valor opaco que representa el estado del recurso. Si se cambia un recurso, el servidor actualiza su ETag del recurso.

  2. Cuando el cliente desea conservar un cambio de estado, emite una solicitud POST al servidor, con el valor ETag en un encabezado de condición previa If-Match.

  3. Si el valor de ETag de la solicitud no coincide con el del servidor, se produce un error en la comprobación de condición previa con una respuesta 412 (Error en la condición previa).

    Este error indica que el valor actual del servidor ya no coincide con el valor original en el que estaba funcionando el cliente.

  4. Si el cliente recibe una respuesta errónea en la condición previa, normalmente obtiene un valor nuevo para el recurso, aplica la actualización que quería e intenta volver a publicar la actualización del recurso.

    Esta segunda solicitud POST se realiza correctamente si ningún otro cliente ha actualizado el recurso. Si no es así, el cliente puede intentarlo de nuevo.

Este proceso se denomina optimista porque el cliente, una vez obtenido un recurso, pasa a realizar su procesamiento. El recurso en sí no está bloqueado, ya que otros clientes pueden acceder a él sin ninguna restricción. Cualquier contención entre los clientes sobre cuál debe ser el estado del recurso no se determina hasta que se ha realizado el procesamiento. En un sistema distribuido, esta estrategia suele ser mejor que el enfoque opuesto, el pesimista.

El mecanismo de bloqueo optimista, tal como se describe, supone que la lógica del programa se puede reintentar de forma segura. Lo ideal es que estas solicitudes de servicio sean idempotentes. En informática, una operación idempotente es aquella que no tiene ningún efecto adicional si se la llama varias veces con los mismos parámetros de entrada. Los servicios REST de HTTP puros que implementan las solicitudes GET, PUT y DELETE suelen ser idempotentes. Si una solicitud de servicio no produce efectos adicionales, las solicitudes se pueden volver a ejecutar de forma segura como parte de una estrategia de reintento.

En el ejemplo de escalabilidad horizontal y el resto de este artículo, se supone que los servicios back-end que usa el bot son todos servicios REST de HTTP idempotentes.

Almacenamiento en búfer de actividades salientes

El envío de una actividad no es una operación idempotente. La actividad suele ser un mensaje que retransmite información al usuario y repetir el mismo mensaje dos o más veces podría resultar confuso o engañoso.

El bloqueo optimista implica que la lógica del bot posiblemente tenga que volver a ejecutarse varias veces. Para evitar enviar cualquier actividad determinada varias veces, espere a que la operación de actualización de estado se realice correctamente antes de enviar actividades al usuario. La lógica del bot debe tener un aspecto similar al siguiente diagrama.

Diagrama de secuencia con mensajes que se envían después de guardar el estado del cuadro de diálogo.

Una vez generado un bucle de reintento en la ejecución del diálogo, obtiene el siguiente comportamiento cuando se produce un error en la condición previa de la operación de guardado.

Diagrama de secuencia con mensajes que se envían después de que un intento de reintento se realice correctamente.

Si aplicamos este mecanismo, el bot de pizza del ejemplo anterior nunca debería enviar por error una confirmación positiva de que se ha añadido un ingrediente de pizza a un pedido. Incluso con el bot implementado en varias máquinas, el esquema de bloqueo optimista serializa eficazmente las actualizaciones de estado. En el bot de pizza, la confirmación después de añadir un elemento ahora incluso puede reflejar el estado completo correctamente. Por ejemplo, si el usuario escribe rápidamente "queso" y luego "champiñón" y estos mensajes se controlan mediante dos instancias diferentes del bot, la última instancia que se completará puede incluir "una pizza con queso y champiñones" como parte de su respuesta.

Esta nueva solución de almacenamiento personalizado hace tres cosas que la implementación predeterminada en el SDK no hace:

  1. Usa ETags para detectar la contención.
  2. Reintenta el procesamiento cuando se detecta un error de ETag.
  3. Espera a enviar las actividades salientes hasta que se haya guardado correctamente el estado.

El resto de este artículo describe la implementación de estas tres partes.

Implementación de la compatibilidad con ETag

En primer lugar, defina una interfaz para nuestro nuevo almacén que incluya la compatibilidad con ETag. La interfaz ayuda a usar los mecanismos de inserción de dependencias en ASP.NET. A partir de la interfaz, puede implementar versiones independientes para pruebas unitarias y para producción. Por ejemplo, la versión de la prueba unitaria podría ejecutarse en la memoria sin necesidad de una conexión de red.

La interfaz consta de los métodos de carga y guardado. Ambos métodos usarán un parámetro de clave para identificar el estado para cargar desde el almacenamiento o guardar en el mismo.

  • Load devolverá el valor de estado y el ETag asociado.
  • Save incluirá parámetros para el valor de estado y el ETag asociado y devolverá un valor booleano que indica si la operación se realizó correctamente. El valor devuelto no servirá como un indicador de error general, sino como un indicador específico del error en la condición previa. La comprobación del código de retorno formará parte de la lógica del bucle de reintento.

Para que la implementación de almacenamiento se pueda aplicar ampliamente, evite incluir requisitos de serialización en ella. Sin embargo, muchos servicios de almacenamiento modernos admiten JSON como tipo de contenido. En C#, puede usar el tipo JObject para representar un objeto JSON. En JavaScript o TypeScript, JSON es un objeto nativo normal.

Esta es una definición de la interfaz personalizada.

IStore.cs

public interface IStore
{
    Task<(JObject content, string etag)> LoadAsync(string key);

    Task<bool> SaveAsync(string key, JObject content, string etag);
}

Esta es una implementación de Azure Blob Storage.

BlobStore.cs

public class BlobStore : IStore
{
    private readonly CloudBlobContainer _container;

    public BlobStore(string accountName, string accountKey, string containerName)
    {
        if (string.IsNullOrWhiteSpace(accountName))
        {
            throw new ArgumentException(nameof(accountName));
        }

        if (string.IsNullOrWhiteSpace(accountKey))
        {
            throw new ArgumentException(nameof(accountKey));
        }

        if (string.IsNullOrWhiteSpace(containerName))
        {
            throw new ArgumentException(nameof(containerName));
        }

        var storageCredentials = new StorageCredentials(accountName, accountKey);
        var cloudStorageAccount = new CloudStorageAccount(storageCredentials, useHttps: true);
        var client = cloudStorageAccount.CreateCloudBlobClient();
        _container = client.GetContainerReference(containerName);
    }

    public async Task<(JObject content, string etag)> LoadAsync(string key)
    {
        if (string.IsNullOrWhiteSpace(key))
        {
            throw new ArgumentException(nameof(key));
        }

        var blob = _container.GetBlockBlobReference(key);
        try
        {
            var content = await blob.DownloadTextAsync();
            var obj = JObject.Parse(content);
            var etag = blob.Properties.ETag;
            return (obj, etag);
        }
        catch (StorageException e)
            when (e.RequestInformation.HttpStatusCode == (int)HttpStatusCode.NotFound)
        {
            return (new JObject(), null);
        }
    }

    public async Task<bool> SaveAsync(string key, JObject obj, string etag)
    {
        if (string.IsNullOrWhiteSpace(key))
        {
            throw new ArgumentException(nameof(key));
        }

        if (obj == null)
        {
            throw new ArgumentNullException(nameof(obj));
        }

        var blob = _container.GetBlockBlobReference(key);
        blob.Properties.ContentType = "application/json";
        var content = obj.ToString();
        if (etag != null)
        {
            try
            {
                await blob.UploadTextAsync(content, Encoding.UTF8, new AccessCondition { IfMatchETag = etag }, new BlobRequestOptions(), new OperationContext());
            }
            catch (StorageException e)
                when (e.RequestInformation.HttpStatusCode == (int)HttpStatusCode.PreconditionFailed)
            {
                return false;
            }
        }
        else
        {
            await blob.UploadTextAsync(content);
        }

        return true;
    }
}

Azure Blob Storage realiza gran parte del trabajo. Cada método comprueba si hay una excepción específica para satisfacer las expectativas del código de llamada.

  • El método LoadAsync, en respuesta a una excepción de almacenamiento con un código de estado no encontrado, devuelve un valor NULL.
  • El método SaveAsync, en respuesta a una excepción de almacenamiento con un código de error en la condición previa, devuelve false.

Implementación de un bucle de reintento

El diseño del bucle de reintento implementa el comportamiento que se muestra en los diagramas de secuencia.

  1. Al recibir una actividad, se crea una clave para el estado de la conversación.

    La relación entre una actividad y el estado de la conversación es la misma para el almacenamiento personalizado que para la implementación predeterminada. Por lo tanto, puede construir la clave de la misma manera que la implementación de estado predeterminada.

  2. Intente cargar el estado de la conversación.

  3. Ejecute los diálogos del bot y capture las actividades salientes que se van a enviar.

  4. Intente guardar el estado de la conversación.

    • Si se ejecuta correctamente, envíe las actividades salientes y salga.

    • Si se produce un error, repita este proceso desde el paso de carga del estado de la conversación.

      La nueva carga del estado de la conversación obtiene un estado de la conversación y un ETag nuevo y actual. El diálogo se vuelve a ejecutar y el paso para guardar el estado puede realizarse correctamente.

Esta es una implementación para el controlador de actividad de mensajes.

ScaleoutBot.cs

protected override async Task OnMessageActivityAsync(ITurnContext<IMessageActivity> turnContext, CancellationToken cancellationToken)
{
    // Create the storage key for this conversation.
    var key = $"{turnContext.Activity.ChannelId}/conversations/{turnContext.Activity.Conversation?.Id}";

    // The execution sits in a loop because there might be a retry if the save operation fails.
    while (true)
    {
        // Load any existing state associated with this key
        var (oldState, etag) = await _store.LoadAsync(key);

        // Run the dialog system with the old state and inbound activity, the result is a new state and outbound activities.
        var (activities, newState) = await DialogHost.RunAsync(_dialog, turnContext.Activity, oldState, cancellationToken);

        // Save the updated state associated with this key.
        var success = await _store.SaveAsync(key, newState, etag);

        // Following a successful save, send any outbound Activities, otherwise retry everything.
        if (success)
        {
            if (activities.Any())
            {
                // This is an actual send on the TurnContext we were given and so will actual do a send this time.
                await turnContext.SendActivitiesAsync(activities, cancellationToken);
            }

            break;
        }
    }
}

Nota:

El ejemplo implementa la ejecución del diálogo como una llamada de función. Un enfoque más sofisticado podría ser definir una interfaz y usar la inserción de dependencias. Sin embargo, en este ejemplo, la función estática hace hincapié en la naturaleza funcional de este enfoque de bloqueo optimista. En general, cuando se implementan las partes cruciales del código de una manera funcional, se mejoran las posibilidades de que trabaje correctamente en redes.

Implementación de un búfer de actividad saliente

El siguiente requisito es almacenar en búfer las actividades salientes hasta que se produzca una operación de guardado correcta, lo que requiere una implementación de adaptador personalizado. El método SendActivitiesAsync personalizado no debe enviar las actividades al uso, sino añadirlas a una lista. El código del diálogo no necesitará ninguna modificación.

  • En este escenario concreto, no se admiten las operaciones de actividad de actualización y actividad de eliminación y los métodos asociados generarán excepciones no implementadas.
  • Algunos canales usan el valor devuelto en la operación de actividades de envío para permitir que un bot modifique o elimine un mensaje enviado anteriormente, por ejemplo, para deshabilitar los botones de las tarjetas mostradas en el canal. Estos intercambios de mensajes pueden resultar complicados, especialmente cuando se requiere el estado, y están fuera del ámbito de este artículo.
  • El diálogo crea y usa este adaptador personalizado, por lo que puede almacenar en búfer actividades.
  • El controlador de turnos del bot usará un AdapterWithErrorHandler más estándar para enviar las actividades al usuario.

Esta es una implementación del adaptador personalizado.

DialogHostAdapter.cs

public class DialogHostAdapter : BotAdapter
{
    private List<Activity> _response = new List<Activity>();

    public IEnumerable<Activity> Activities => _response;

    public override Task<ResourceResponse[]> SendActivitiesAsync(ITurnContext turnContext, Activity[] activities, CancellationToken cancellationToken)
    {
        foreach (var activity in activities)
        {
            _response.Add(activity);
        }

        return Task.FromResult(new ResourceResponse[0]);
    }

    #region Not Implemented
    public override Task DeleteActivityAsync(ITurnContext turnContext, ConversationReference reference, CancellationToken cancellationToken)
    {
        throw new NotImplementedException();
    }

    public override Task<ResourceResponse> UpdateActivityAsync(ITurnContext turnContext, Activity activity, CancellationToken cancellationToken)
    {
        throw new NotImplementedException();
    }
    #endregion
}

Uso del almacenamiento personalizado en un bot

El último paso es usar estas clases y estos métodos personalizados con las clases y los métodos del marco existentes.

  • El bucle de reintento principal se convierte en parte del método ActivityHandler.OnMessageActivityAsync del bot e incluye el almacenamiento personalizado a través de la inserción de dependencias.
  • El código de hospedaje del diálogo se añade a la clase DialogHost que expone un método RunAsync estático. El host de diálogo:
    • Toma la actividad entrante y el estado anterior y, a continuación, devuelve las actividades resultantes y el nuevo estado.
    • Crea el adaptador personalizado y ejecuta el diálogo de la misma manera que lo hace el SDK.
    • Crea un descriptor de acceso de propiedades de estado personalizado, una corrección de compatibilidad que envía el estado del diálogo al sistema de diálogos. El descriptor de acceso usa la semántica de referencia para enviar un identificador del descriptor de acceso al sistema de diálogos.

Sugerencia

La serialización JSON se añade insertada al código de hospedaje para mantenerlo fuera de la capa de almacenamiento conectable, de modo que las distintas implementaciones puedan serializarse de forma diferente.

Esta es una implementación del host de diálogo.

DialogHost.cs

public static class DialogHost
{
    // The serializer to use. Moving the serialization to this layer will make the storage layer more pluggable.
    private static readonly JsonSerializer StateJsonSerializer = new JsonSerializer() { TypeNameHandling = TypeNameHandling.All };

    /// <summary>
    /// A function to run a dialog while buffering the outbound Activities.
    /// </summary>
    /// <param name="dialog">THe dialog to run.</param>
    /// <param name="activity">The inbound Activity to run it with.</param>
    /// <param name="oldState">Th eexisting or old state.</param>
    /// <returns>An array of Activities 'sent' from the dialog as it executed. And the updated or new state.</returns>
    public static async Task<(Activity[], JObject)> RunAsync(Dialog dialog, IMessageActivity activity, JObject oldState, CancellationToken cancellationToken)
    {
        // A custom adapter and corresponding TurnContext that buffers any messages sent.
        var adapter = new DialogHostAdapter();
        var turnContext = new TurnContext(adapter, (Activity)activity);

        // Run the dialog using this TurnContext with the existing state.
        var newState = await RunTurnAsync(dialog, turnContext, oldState, cancellationToken);

        // The result is a set of activities to send and a replacement state.
        return (adapter.Activities.ToArray(), newState);
    }

    /// <summary>
    /// Execute the turn of the bot. The functionality here closely resembles that which is found in the
    /// IBot.OnTurnAsync method in an implementation that is using the regular BotFrameworkAdapter.
    /// Also here in this example the focus is explicitly on Dialogs but the pattern could be adapted
    /// to other conversation modeling abstractions.
    /// </summary>
    /// <param name="dialog">The dialog to be run.</param>
    /// <param name="turnContext">The ITurnContext instance to use. Note this is not the one passed into the IBot OnTurnAsync.</param>
    /// <param name="state">The existing or old state of the dialog.</param>
    /// <returns>The updated or new state of the dialog.</returns>
    private static async Task<JObject> RunTurnAsync(Dialog dialog, ITurnContext turnContext, JObject state, CancellationToken cancellationToken)
    {
        // If we have some state, deserialize it. (This mimics the shape produced by BotState.cs.)
        var dialogStateProperty = state?[nameof(DialogState)];
        var dialogState = dialogStateProperty?.ToObject<DialogState>(StateJsonSerializer);

        // A custom accessor is used to pass a handle on the state to the dialog system.
        var accessor = new RefAccessor<DialogState>(dialogState);

        // Run the dialog.
        await dialog.RunAsync(turnContext, accessor, cancellationToken);

        // Serialize the result (available as Value on the accessor), and put its value back into a new JObject.
        return new JObject { { nameof(DialogState), JObject.FromObject(accessor.Value, StateJsonSerializer) } };
    }
}

Y, por último, esta es una implementación del descriptor de acceso de propiedades de estado personalizado.

RefAccessor.cs

public class RefAccessor<T> : IStatePropertyAccessor<T>
    where T : class
{
    public RefAccessor(T value)
    {
        Value = value;
    }

    public T Value { get; private set; }

    public string Name => nameof(T);

    public Task<T> GetAsync(ITurnContext turnContext, Func<T> defaultValueFactory = null, CancellationToken cancellationToken = default(CancellationToken))
    {
        if (Value == null)
        {
            if (defaultValueFactory == null)
            {
                throw new KeyNotFoundException();
            }

            Value = defaultValueFactory();
        }

        return Task.FromResult(Value);
    }

    #region Not Implemented
    public Task DeleteAsync(ITurnContext turnContext, CancellationToken cancellationToken = default(CancellationToken))
    {
        throw new NotImplementedException();
    }

    public Task SetAsync(ITurnContext turnContext, T value, CancellationToken cancellationToken = default(CancellationToken))
    {
        throw new NotImplementedException();
    }
    #endregion
}

Información adicional

El ejemplo de escalabilidad horizontal está disponible en el repositorio de ejemplos de Bot Framework en GitHub para C#, Python y Java.