Juin 2016

Volume 31, numéro 6

Cet article a fait l'objet d'une traduction automatique.

Cadre réactif - Faire évoluer un lien client-serveur asynchrone grâce à une approche réactive

Par Peter Vogel | Juin 2016

Le traitement asynchrone est devenu plus courant dans le développement d’applications, Microsoft .NET Framework a acquis une grande variété d’outils qui prennent en charge les modèles de design asynchrone spécifique. Création d’une application bien conçue asynchrone souvent revient à reconnaître le modèle de conception, votre application est mise en œuvre et de prélèvement puis l’ensemble correct de composants .NET.

Dans certains cas, la correspondance requiert l’intégration de plusieurs composants .NET. L’article de Stephen Cleary, « modèles pour les Applications MVVM asynchrones : Commandes » (bit.ly/233Kocr), indique comment prend totalement en charge le modèle Model-View-ViewModel (MVVM) de manière asynchrone. Dans d’autres cas, prise en charge nécessite qu’un seul composant du .NET Framework. J’ai abordé l’implémentation du modèle de fournisseur/consommateur à l’aide de BlockingCollection dans mon VisualStudioMagazine.com colonnes pratique .NET, « Create Simple et fiable des applications asynchrones avec BlockingCollection » (bit.ly/1TuOpE6) et « Créer sophistiquées asynchrone Applications avec BlockingCollection » (bit.ly/1SpYyD4).

Un autre exemple est l’implémentation du modèle de design observateur pour surveiller une opération longue de façon asynchrone. Dans ce scénario, une méthode asynchrone qui renvoie un objet de tâche unique ne fonctionne pas, car le client renvoie un flux de résultats fréquemment. Pour ces scénarios, vous pouvez tirer parti au moins deux outils du .NET Framework : les Extensions réactives (Rx) ObservableCollection. Pour les solutions simples, ObservableCollection (ainsi que l’opération asynchrone de mots clés et await) est nécessaire. Toutefois, pour les plus « intéressantes » et, en particulier, les problèmes pilotée par événements, Rx vous offre mieux contrôler le processus.

Définition du modèle

Tandis que le modèle observer est fréquemment utilisé dans les modèles de conception de l’interface utilisateur, y compris Model-View-Controller (MVC), Model-View-Presenter (MVP) et MVVM — interfaces utilisateur doivent être considérées comme un scénario à partir d’un ensemble plus important de scénarios où le modèle observer s’applique. La définition du modèle d’observateur (citation de Wikipédia) est : « Un objet, appelé le sujet, [] gère une liste de ses objets dépendants, appelée observateurs et les informe automatiquement de toute modification d’état, généralement en appelant une de leurs méthodes. »

Le modèle observer est vraiment, sur l’obtention de résultats à partir de processus longs au client dès que ces résultats sont disponibles. Sans une version de modèle Observateur, les clients doivent attendre jusqu'à ce que le dernier résultat est disponible et puis ont tous les résultats envoyé dans un seul forfaitaire. Dans un monde de plus en plus asynchrone, vous souhaitez que les observateurs pour traiter les résultats en parallèle avec le client que les résultats sont disponibles. Pour mettre en évidence que vous parlez plusieurs interfaces utilisateur lors de l’exploitation le modèle observer, je vais utiliser « client » et « serveur » au lieu de « observateur » et « objet », dans le reste de cet article.

Problèmes et opportunités

Il existe au moins trois problèmes et deux opportunités avec le modèle observer. Le premier problème est le problème de l’écouteur ont expiré : De nombreuses implémentations, le modèle observer que le serveur contenant une référence à tous ses clients. Par conséquent, les clients peuvent conservées en mémoire par le serveur jusqu'à ce que le serveur s’arrête. Évidemment, ce n’est pas une solution optimale pour un processus à long terme dans un système dynamique où les clients se connecteront et déconnecter fréquemment.

Le problème de l’écouteur écoulée, cependant, est juste un symptôme du problème de la deuxième, la plus grande : De nombreuses implémentations du modèle observation requièrent le serveur et le client pour être étroitement couplés, nécessitant le serveur et le client d’assister à tout moment. Au minimum, le client doit être en mesure de déterminer si le serveur est présent et choisissez de ne pas attacher ; en outre, le serveur doit être en mesure de fonctionner même si aucun client accepte les résultats.

Le troisième problème est lié aux performances : Combien de temps faut-il pour le serveur notifier tous les clients ? Performances dans le modèle observer sont directement affectée par le nombre de clients pour être notifié. Par conséquent, il existe une opportunité pour améliorer les performances dans le modèle observer en laissant le client préemptive filtrer les résultats renvoyés à partir du serveur. Il traite également les scénarios où le serveur génère plus de résultats (ou un grand nombre de résultats) que le client est intéressé par : Le client peut indiquer qu’il doit uniquement être informé dans les cas spécifiques. La deuxième occasion de performances existe autour de reconnaissance lorsque le serveur n’a aucun résultat ou a fini de produire des résultats. Les clients peuvent ignorer lors de l’acquisition des ressources nécessaires pour traiter les événements de serveur jusqu'à ce que le client est garantie que quelque chose à traiter et les clients peuvent libérer ces ressources dès qu’ils savent qu’ils avez traités le dernier résultat.

À partir de l’Observateur de publication/abonnement

En tenant compte de ces considérations conduit à partir des implémentations simples du modèle observateur pour le modèle de publication/abonnement connexes. Publication/abonnement implémente le modèle observer de façon souple qui permet aux serveurs et clients exécutent même si l’autre n’est pas disponible actuellement. Publication/abonnement implémente généralement filtrage côté client en laissant le client à s’abonner à des rubriques/canaux (« m’avertir à propos des bons de commande ») ou à des attributs associés à différents types de contenu (« m’avertir sur toutes les demandes urgentes »).

Un problème reste toutefois. Toutes les implémentations du modèle d’observateur ont tendance à étroitement deux clients et serveurs un format de message spécifique. Modification du format d’un message dans la plupart des implémentations de publication/abonnement peut être difficile, car tous les clients doivent être mis à jour pour utiliser le nouveau format.

De nombreuses manières, cela est similaire à la description d’un curseur côté serveur dans une base de données. Pour réduire les coûts de transmission, le serveur de base de données ne retourne pas des résultats que chaque ligne est récupérée. Toutefois, pour les ensembles de lignes volumineux, la base de données aussi ne retourne pas toutes les lignes dans un seul lot à la fin. Au lieu de cela, le serveur de base de données généralement retourne des sous-ensembles d’un curseur conservés sur le serveur, souvent en tant que ces sous-ensembles deviennent disponibles. Une base de données, le client et le serveur ne doivent être présents simultanément : Le serveur de base de données peut exécuter lorsque aucun client présentes ; un client peut vérifier si le serveur est accessible et, dans le cas contraire, décider quel (le cas échéant), il peut faire. Le processus de filtrage (SQL) est également très flexible. Toutefois, si le moteur de base de données change le format qu’il utilise pour retourner les lignes, puis tous les clients doivent, au minimum, être recompilées.

Un Cache d’objets de traitement

Comme mon étude de cas permettant d’examiner une implémentation du modèle observer simple, j’utilise en tant que mon serveur une classe qui recherche dans un cache en mémoire des factures. Ce serveur peut, à la fin de son traitement, retourner une collection de toutes les factures. Cependant, je préfère que le client à traiter les factures individuellement et parallèlement au processus de recherche du serveur. Cela signifie que je préfère une version du processus, qui retourne chaque facture, car elle est trouvée et permet au client de traiter chaque facture en parallèle avec la recherche de la prochaine facture.

Une implémentation simple du serveur peut ressembler à ceci :

private List<Invoice> foundInvoices = new List<Invoice>();
public List<Invoice> FindInvoices(decimal Amount)
{
  foundInvoices.Clear();
  Invoice inv;
    // ...search logic to add invoices to the collection
     foundInvoices.Add(inv);
    // ...repeat until all invoices found
    return foundInvoices;
}

Solutions plus sophistiquées peuvent utiliser yield return pour retourner chaque facture qu’il s’avère plutôt que de l’assemblage de la liste. Malgré tout, un client qui appelle la méthode FindInvoices pouvez effectuer certaines activités critiques avant et après le traitement. Par exemple, une fois que le premier élément est trouvé, le client souhaite peut-être activer une liste de MatchingInvoices contenir les factures au niveau du client ou acquérir / d’initialiser toutes les ressources requises pour traiter une facture. Lorsque des factures supplémentaires sont ajoutés, le client devra traiter les factures et, lorsque le serveur signale que la facture finale est récupérée, libérer toutes les ressources qui ne sont plus requis étant donné que les factures « plus » à traiter.

Au cours d’une récupération de la base de données, par exemple, une lecture bloque jusqu'à ce que la première ligne est retournée. Une fois la première ligne est renvoyée, le client initialise les ressources nécessaires au traitement d’une ligne. La lecture retourne également false lorsque la dernière ligne est extraite, laissant le client à libérer ces ressources, car il n’y a plus aucune ligne à traiter.

Création de Solutions simples avec ObservableCollection

Le choix le plus évident pour implémenter le modèle observer dans .NET Framework est ObservableCollection. ObservableCollection informera le client (par un événement) chaque fois qu’elle a été modifiée.

Réécriture de mon exemple de serveur pour utiliser la classe ObservableCollection requiert uniquement deux modifications. Tout d’abord, la collection contenant les résultats doit être définies sous la forme ObservableCollection et rendue publique. Deuxièmement, il n’est plus nécessaire pour la méthode retourner un résultat : Le serveur doit uniquement ajouter des factures à la collection.

La nouvelle implémentation du serveur peut ressembler à ceci :

public List<Invoice> FindInvoices(decimal Amount)
{
  public ObservableCollection<Invoice> foundInvoices =
    new ObservableCollection<Invoice>();
  public void FindInvoices(decimal Amount)
  {
    foundInvoices.Clear();
    Invoice inv;
    // ...search logic to set inv
     foundInvoices.Add(inv);
    // ...repeat until all invoices are added to the collection   
  }

Un client qui utilise cette version du serveur ne doit associer un gestionnaire d’événements pour l’événement CollectionChanged de collection de foundInvoices de la InvoiceManagement. Dans le code suivant, j’ai eu la classe implémentent l’interface IDisposable pour prendre en charge la déconnexion de l’événement :

public class SearchInvoices: IDisposable
{
  InvoiceManagement invMgmt = new InvoiceManagement();
  public void SearchInvoices()
  {
    invMgmt.foundInvoices.CollectionChanged += InvoicesFound;
  }
  public void Dispose()
  {
    invMgmt.foundInvoices.CollectionChanged -= InvoicesChanged;
  }

Dans le client, l’événement CollectionChanged est passé à un objet NotifyCollectionChangedEventArgs comme second paramètre. Que la propriété Action de l’objet spécifie à la fois quelle modification a été effectuée sur la collection (les actions sont : la collection a été effacée, nouveaux éléments ont été ajoutés à la collection, les éléments existants ont été déplacés, remplacé/supprimés) et des informations sur les éléments modifiés (une collection de tous les éléments ajoutés, une collection d’éléments présents dans la collection avant les nouveaux éléments ajoutés, la position de l’élément qui a été déplacé, supprimé/remplacée).

Code simple dans le client en mode asynchrone traitant chaque facture lorsqu’il est ajouté à la collection dans le serveur se présentera comme le code dans Figure 1.

Figure 1 traitement asynchrone des factures à l’aide de ObservableCollection

private async void InvoicesFound(object sender,
  NotifyCollectionChangedEventArgs e)
{
  switch (e.Action)
  {
    case NotifyCollectionChangedAction.Reset:
      {
        // ...initial item processing
        return;
      }
    case NotifyCollectionChangedAction.Add:
      {
        foreach (Invoice inv in e.NewItems)
        {
          await HandleInvoiceAsync(inv);
        }
        return;
      }
  }
}

While simple, ce code peut être insuffisant pour vos besoins, en particulier si vous gérer un processus à long terme ou dans un environnement dynamique. À partir d’un point de vue Conception asynchrone, par exemple, le code peut capturer l’objet de tâche retourné par la HandleInvoiceAsync afin que le client peut gérer les tâches asynchrones. Vous devez également vous assurer que l’événement CollectionChanged est déclenché sur le thread d’interface utilisateur, même si FindInvoices s’exécute sur un thread d’arrière-plan.

Rétablissement de la valeur de la propriété Action utilisable comme signal indiquant le premier élément doit être récupéré en raison d’où la méthode Clear est appelée dans la classe de serveur (juste avant la recherche de la première facture). Toutefois, bien sûr, aucune facture ne peut apparaître dans la recherche, donc à l’aide de l’Action de réinitialisation peut entraîner les ressources client allocation qui ne sont jamais utilisées. Pour gérer les « premier élément « traitement réellement, vous devrez ajouter un indicateur pour le traitement d’ajouter une Action à exécuter uniquement lorsque le premier élément a été trouvé.

En outre, le serveur a un nombre limité d’options pour indiquer que la dernière facture est trouvée, afin que le client peut cesser d’attendre « celle qui suit. » Le serveur a probablement, impossible, effacer la collection après avoir trouvé le dernier élément, mais qui seraient forcés simplement plus complexes de traitement dans le traitement de l’Action de réinitialisation (ai j’ai été traitement des factures ? Si Oui, j’ai traité la dernière facture ; Si non, puis je suis au processus de la première facture).

Alors que, pour les problèmes simples, ObservableCollection fera l’affaire, toute implémentation relativement sophistiquée basée sur ObservableCollection (et toutes les applications que les valeurs de l’efficacité) va nécessitent du code complexe, en particulier dans le client.

Les Solutions Rx

Si vous souhaitez asynchrone traitement puis Rx (disponible via NuGet) peut fournir une meilleure solution pour implémenter le modèle observer par emprunt à partir du modèle de publication et d’abonnement. Cette solution fournit également un modèle filtrage basé sur LINQ, mieux signalisation de conditions de premier/dernier élément et meilleure gestion des erreurs.

Rx peut également gérer des implémentations Observateur plus intéressantes qu’avec un ObservableCollection. Dans mon étude de cas, après le retour de la liste initiale des factures, mon serveur peut continuer à rechercher les factures qui sont ajoutés au cache après la fin de la recherche d’origine (et qui correspondent aux critères de recherche, bien sûr). Une réunion de facture les critères s’affiche, le client souhaite être averti de l’événement, afin que la nouvelle facture peut être ajoutée à la liste. Rx prend en charge ces types d’extensions basées sur les événements pour le modèle d’observateur mieux que ObservableCollection.

Il existe deux interfaces principales dans Rx pour prendre en charge le modèle observer. Le premier est IObservable < T >, implémentée par le serveur et en spécifiant une méthode unique : S’abonner. Le serveur de l’implémentation de la méthode Subscribe est passé une référence à un objet à partir d’un client. Pour gérer le problème de l’écouteur interrompue, la méthode Subscribe retourne une référence au client pour un objet qui implémente l’interface IDisposable. Le client peut utiliser cet objet pour vous déconnecter du serveur. Lorsque le client ne se déconnecte, le serveur est prévu pour supprimer le client à partir de sa liste interne.

Le second est l’interface IObserver < T >, qui doit être implémentée par le client. Cette interface nécessite que le client d’implémenter trois méthodes pour le serveur et : OnNext, OnCompleted et OnError. La méthode critique ici est OnNext, qui est utilisé pour transmettre un message au client par le serveur (dans mon étude de cas ce message serait nouveaux objets facture qui seront renvoyés comme chacun d'entre eux s’affiche). Le serveur peut utiliser la méthode du client OnCompleted pour signaler qu’il n’existe aucune donnée. La troisième méthode, OnError, offre un moyen pour le serveur signaler au client qu’une exception s’est produite.

Vous êtes invité à implémenter l’interface IObserver vous-même, bien entendu (il s’agit de la partie du .NET Framework). Ainsi que ObservableCollection, qui peuvent suffire si vous créez une solution synchrone (j’ai écrit une colonne à ce sujet, « Écriture plus propre Code avec Extensions réactives » [bit.ly/10nfQtm]).

Toutefois, le Rx inclut plusieurs packages qui fournissent des implémentations asynchrones de ces interfaces, y compris les implémentations de JavaScript et des services RESTful. La classe Rx sujet fournit une implémentation de IObservable qui simplifie l’implémentation d’une version asynchrone de publication/abonnement, le modèle observer.

Création d’une Solution asynchrone

Création d’un serveur pour travailler avec un objet sujet requiert très peu de modifications au code côté serveur synchrone d’origine. Remplacer l’ancien ObservableCollection avec un objet sujet qui transmet chaque facture telle qu’elle apparaît à tous les clients à l’écoute. Je déclare l’objet cible comme étant public afin que les clients puissent y accéder :

public class InvoiceManagement
{
  public IObservable<Invoice> foundInvoice =
    new Subject<Invoice>();

Dans le corps de la méthode, au lieu d’ajouter une facture à une collection, j’utilise OnNext (méthode) du sujet pour passer chaque facture au client dès qu’il est trouvé :

public void FindInvoices(decimal Amount)
{
  inv = GetInvoicesForAmount(Amount) // Poll for invoices
  foundInvoice.OnNext(inv);
  // ...repeat...
}

Dans mon client, je déclare tout d’abord une instance de la classe de serveur. Ensuite, dans une méthode marquée comme asynchrone, j’appelle méthode Subscribe du sujet pour indiquer que je souhaite commencer la récupération des messages :

public class InvoiceManagementTests
{
  InvoiceManagement invMgmt = new InvoiceManagement();
  public async void ProcessInvoices()
  {
    invMgmt.foundInvoice.Subscribe<Invoice>();

Pour filtrer les résultats aux factures que je veux juste, je peux appliquer une instruction LINQ à l’objet cible. Cet exemple montre comment filtre les factures pour ceux qui est en attente (à utiliser les extensions Rx LINQ vous devrez ajouter une à l’aide de l’instruction pour l’espace de noms System.Reactive.Linq) :

invMgmt.foundInvoice.Where(i => i.BackOrder == "BackOrder").Subscribe();

Une fois que j’ai commencé à écouter le sujet, je peux spécifier quel traitement que je veux faire lorsque je reçois une facture. Je peux, par exemple, utiliser FirstAsync simplement la première facture renvoyé par le service. Dans cet exemple, j’utilise l’instruction await avec l’appel de FirstAsync afin que je peux revenir contrôle dans le corps principal de l’application lors du traitement de la facture. Ce code de mise en attente pour récupérer cette première facture, puis passe au tout code que j’utilise pour initialiser le processus de traitement des factures et, enfin, traite la facture :

Invoice inv;
inv = await invMgmt.foundInvoice.FirstAsync();
// ...setup code invoices...
HandleInvoiceAsync(inv);

Un dernier avertissement cependant : FirstAsync bloque si le serveur n’a pas encore produite tous les résultats. Si vous voulez éviter le blocage, vous pouvez utiliser FirstOrDefaultAsync, qui retourne la valeur null si le serveur n’a pas produit les résultats. S’il n’y a aucun résultat, le client peut décider, voire rien à faire.

Le cas le plus classique est que le client souhaite traiter toutes les factures retournées (après le filtrage) et effectuer de manière asynchrone. Dans ce cas, au lieu d’utiliser une combinaison de s’abonner et OnNext, vous pouvez simplement utiliser la méthode ForEachAsync. Vous pouvez passer d’une méthode ou une expression lambda qui traite les résultats entrants. Si vous passez d’une méthode (qui ne peut pas être asynchrone), comme faire ici, que méthode recevront la facture qui a déclenché ForEachAsync :

invMgmt.foundInvoice.ForEachAsync(HandleInvoice);

La méthode ForEachAsync peut également être transmise à un jeton d’annulation pour permettre le signal de client qu’il se déconnecte. Une bonne pratique consisterait à transmettre le jeton lors de l’appel des Rx * méthodes asynchrones pour prendre en charge permettant au client de terminer le traitement sans avoir à attendre que tous les objets à traiter.

Le ForEachAsync ne générera pas de résultat déjà traitée par une méthode First (ou FirstOrDefaultAsync) afin de pouvoir utiliser FirstOrDefaultAsync avec ForEachAsync pour vérifier si le serveur a rien à traiter avant de traiter les objets suivants. Toutefois, IsEmpty (méthode) du sujet effectue la vérification même plus simplement. Si le client doit allouer toutes les ressources requises pour le traitement des résultats, la fonction IsEmpty permet au client afin de vérifier s’il existe rien à faire avant d’allouer les ressources (alternative consisterait à allouer ces ressources sur le premier élément traité dans la boucle). À l’aide de la fonction IsEmpty avec un client qui vérifie pour voir s’il existe des résultats avant l’allocation des ressources (et du traitement de départ) tout en prenant également en charge l’annulation donnerait code ressemblant à Figure 2.

Figure 2 Code d’annulation de prise en charge et différer de traitement tant que résultats sont prêts

CancellationTokenSource cancelSource = new CancellationTokenSource();
CancellationToken cancel;
cancel = cancelSource.Token;
if (!await invMgmt.foundInvoice.IsEmpty())
{
  // ...setup code for processing invoices...
  try
  {
    invMgmt.foundInvoice.ForEachAsync(HandleInvoice, cancel);
  }
  catch (Exception ex)
  {
    if (ex.GetType() != typeof(CancellationToken))
    {
      // ...report message
    }
   }
   // ...clean up code when all invoices are processed or client disconnects
}

Synthèse

Si vous avez besoin est une implémentation simple du modèle Observateur, puis ObservableCollection peut faire il que vous suffit de traiter un flux de résultats. Pour un meilleur contrôle et d’une application basée sur des événements, la classe d’objet et les extensions qui accompagnent Rx permettra de votre application fonctionne en mode asynchrone, en prenant en charge une implémentation du modèle de publication/abonnement puissante (et je n’ai pas examiné la riche bibliothèque d’opérateurs qui accompagnent Rx). Si vous travaillez avec Rx, il est judicieux de télécharger le Guide de conception Rx (bit.ly/1VOPxGS), qui présente les meilleures pratiques concernant l’utilisation et de production de flux observables.

Rx fournit également une prise en charge pour la conversion du type de message passé entre le client et le serveur à l’aide de l’interface ISubject < TSource, TResult >. L’interface ISubject < TSource, TResult > spécifie deux types de données : un type de données « in » et « out » d’un type de données. La classe d’objet qui implémente cette interface, vous pouvez effectuer les opérations nécessaires pour convertir le résultat retourné par le serveur (datatype « in ») dans le résultat requis par le client (datatype « out »). En outre, le paramètre est covariant (qu’il accepte le type de données spécifié ou tout type de données hérite) et le paramètre de sortie est contravariant (il accepte le type de données spécifié ou tout ce qui en dérive), ce qui vous donne davantage de flexibilité.

Nous vivons dans un monde de plus en plus asynchrone et, dans ce monde, le modèle observer va devenir plus important, c’est un outil utile pour n’importe quelle interface entre les processus dans lequel le processus serveur retourne plus d’un résultat unique. Heureusement, vous disposez de plusieurs options pour implémenter le modèle observer dans .NET Framework, y compris les Rx ObservableCollection.


Peter Vogelest un architecte et PH & V Information Services. PH & V fournit à pile complète conseils de conception UX via la conception de modélisation et de base de données d’objet.

Remercie les experts techniques Microsoft suivants pour avoir relu cet article : Stephen Cleary, James McCaffrey et Dave Sexton
Stephen Cleary a collaboré avec le multithreading et asynchrone depuis 16 ans de programmation et utilise la prise en charge asynchrone dans le Microsoft .NET Framework depuis la première version community technology preview. Il est l’auteur de « D’accès concurrentiel dans c# livre de recettes » (o ' Reilly Media, 2014). Sa page d’accueil, y compris son blog est à stephencleary.com.