Utiliser la diffusion en continu dans ASP.NET Core SignalR

Par Brennan Conroy

SignalR ASP.NET Core prend en charge la diffusion en continu de client à serveur et de serveur à client. Cela est utile pour les scénarios où des fragments de données arrivent au fil du temps. Lors de la diffusion en continu, chaque fragment est envoyé au client ou au serveur dès qu’il devient disponible, plutôt que d’attendre que toutes les données soient disponibles.

Affichez ou téléchargez l’exemple de code (procédure de téléchargement)

Configurer un hub pour la diffusion en continu

Une méthode hub devient automatiquement une méthode hub de streaming lorsqu’elle retourne IAsyncEnumerable<T>, ChannelReader<T>, Task<IAsyncEnumerable<T>>ou Task<ChannelReader<T>>.

Diffuser en continu de serveur à client

Les méthodes de hub de streaming peuvent retourner IAsyncEnumerable<T> en plus de ChannelReader<T>. Le moyen le plus simple de retourner IAsyncEnumerable<T> consiste à faire de la méthode hub une méthode d’itérateur asynchrone, comme le montre l’exemple suivant. Les méthodes d’itérateur asynchrone hub peuvent accepter un paramètre CancellationToken qui est déclenché lorsque le client se désinscrit du flux. Les méthodes d’itérateur asynchrone évitent les problèmes courants avec les canaux, tels que le fait de ne pas retourner ChannelReader suffisamment tôt ou de quitter la méthode sans terminer le ChannelWriter<T>.

Notes

L’exemple suivant nécessite C# 8.0 ou version ultérieure.

public class AsyncEnumerableHub : Hub
{
    public async IAsyncEnumerable<int> Counter(
        int count,
        int delay,
        [EnumeratorCancellation]
        CancellationToken cancellationToken)
    {
        for (var i = 0; i < count; i++)
        {
            // Check the cancellation token regularly so that the server will stop
            // producing items if the client disconnects.
            cancellationToken.ThrowIfCancellationRequested();

            yield return i;

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
}

L’exemple suivant montre les principes de base de la diffusion de données vers le client à l’aide de Canaux. Chaque fois qu’un objet est écrit dans ChannelWriter<T>, l’objet est immédiatement envoyé au client. À la fin, le ChannelWriter est terminé pour indiquer au client que le flux est fermé.

Notes

Écrivez dans le ChannelWriter<T> sur un thread d’arrière-plan et retournez le ChannelReader dès que possible. Les autres appels de hub sont bloqués jusqu’à ce qu’un ChannelReader soit retourné.

Encapsuler la logique dans une try ... catch instruction. Effectuez le Channel dans un finally bloc. Si vous souhaitez transmettre une erreur, capturez-la à l’intérieur du bloc catch et écrivez-la dans le bloc finally.

public ChannelReader<int> Counter(
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    var channel = Channel.CreateUnbounded<int>();

    // We don't want to await WriteItemsAsync, otherwise we'd end up waiting 
    // for all the items to be written before returning the channel back to
    // the client.
    _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);

    return channel.Reader;
}

private async Task WriteItemsAsync(
    ChannelWriter<int> writer,
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    Exception localException = null;
    try
    {
        for (var i = 0; i < count; i++)
        {
            await writer.WriteAsync(i, cancellationToken);

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
    catch (Exception ex)
    {
        localException = ex;
    }
    finally
    {
        writer.Complete(localException);
    }
}

Les méthodes du hub de streaming de serveur à client peuvent accepter un paramètre CancellationToken qui est déclenché lorsque le client se désinscrit du flux. Utilisez ce jeton pour arrêter l’opération serveur et libérer toutes les ressources si le client se déconnecte avant la fin du flux.

Diffuser en continu de client à serveur

Une méthode hub devient automatiquement une méthode de hub de streaming de client à serveur lorsqu’elle accepte un ou plusieurs objets de type ChannelReader<T> ou IAsyncEnumerable<T>. L’exemple suivant montre les principes de base de la lecture des données de streaming envoyées à partir du client. Chaque fois que le client écrit dans ChannelWriter<T>, les données sont écrites dans le ChannelReader sur le serveur à partir duquel la méthode hub lit.

public async Task UploadStream(ChannelReader<string> stream)
{
    while (await stream.WaitToReadAsync())
    {
        while (stream.TryRead(out var item))
        {
            // do something with the stream item
            Console.WriteLine(item);
        }
    }
}

Une version IAsyncEnumerable<T> de la méthode est la suivante.

Notes

L’exemple suivant nécessite C# 8.0 ou version ultérieure.

public async Task UploadStream(IAsyncEnumerable<string> stream)
{
    await foreach (var item in stream)
    {
        Console.WriteLine(item);
    }
}

Client .NET

Diffuser en continu de serveur à client

Les méthodes StreamAsync et StreamAsChannelAsync sur HubConnection sont utilisées pour appeler des méthodes de streaming de serveur à client. Passez le nom et les arguments de la méthode hub définis dans la méthode hub à StreamAsync ou StreamAsChannelAsync. Le paramètre générique sur StreamAsync<T> et StreamAsChannelAsync<T> spécifie le type d’objets retournés par la méthode de streaming. Un objet de type IAsyncEnumerable<T> ou ChannelReader<T> est retourné à partir de l’appel de flux et représente le flux sur le client.

Exemple StreamAsync qui retourne IAsyncEnumerable<int> :

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

await foreach (var count in stream)
{
    Console.WriteLine($"{count}");
}

Console.WriteLine("Streaming completed");

Exemple correspondant StreamAsChannelAsync qui retourne ChannelReader<int> :

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

Dans le code précédent :

  • La méthode StreamAsChannelAsync sur HubConnection est utilisée pour appeler une méthode de streaming de serveur à client. Passez le nom et les arguments de la méthode hub définis dans la méthode hub à StreamAsChannelAsync.
  • Le paramètre générique sur StreamAsChannelAsync<T> spécifie le type d’objets retournés par la méthode de streaming.
  • Un ChannelReader<T> est retourné à partir de l’appel de flux et représente le flux sur le client.

Diffuser en continu de client à serveur

Il existe deux façons d’appeler une méthode de hub de streaming de client à serveur à partir du client .NET. Vous pouvez passer un IAsyncEnumerable<T> ou un ChannelReader en tant qu’argument à SendAsync, InvokeAsyncou StreamAsChannelAsync, en fonction de la méthode hub appelée.

Chaque fois que des données sont écrites dans l’objet IAsyncEnumerable ou ChannelWriter, la méthode hub sur le serveur reçoit un nouvel élément avec les données du client.

Si vous utilisez un objetIAsyncEnumerable, le flux se termine après la sortie de la méthode des éléments de flux de retour.

Notes

L’exemple suivant nécessite C# 8.0 ou version ultérieure.

async IAsyncEnumerable<string> clientStreamData()
{
    for (var i = 0; i < 5; i++)
    {
        var data = await FetchSomeData();
        yield return data;
    }
    //After the for loop has completed and the local function exits the stream completion will be sent.
}

await connection.SendAsync("UploadStream", clientStreamData());

Ou si vous utilisez un ChannelWriter, vous complétez le canal avec channel.Writer.Complete() :

var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();

Client JavaScript

Diffuser en continu de serveur à client

Les clients JavaScript appellent des méthodes de streaming de serveur à client sur des hubs avec connection.stream. La méthode stream accepte deux arguments :

  • Le nom de la méthode hub. Dans l’exemple suivant, le nom de la méthode hub est Counter.
  • Arguments définis dans la méthode hub. Dans l’exemple suivant, les arguments correspondent au nombre d’éléments de flux à recevoir et au délai entre les éléments de flux.

connection.stream retourne un IStreamResult, qui contient une méthode subscribe. Passez un IStreamSubscriber à subscribe et définissez les rappels next, erroret complete pour recevoir des notifications de l’appel stream.

connection.stream("Counter", 10, 500)
    .subscribe({
        next: (item) => {
            var li = document.createElement("li");
            li.textContent = item;
            document.getElementById("messagesList").appendChild(li);
        },
        complete: () => {
            var li = document.createElement("li");
            li.textContent = "Stream completed";
            document.getElementById("messagesList").appendChild(li);
        },
        error: (err) => {
            var li = document.createElement("li");
            li.textContent = err;
            document.getElementById("messagesList").appendChild(li);
        },
});

Pour mettre fin au flux à partir du client, appelez la méthode disposesur le ISubscription qui est retourné par la méthode subscribe. L’appel de cette méthode entraîne l’annulation du paramètre CancellationToken de la méthode Hub, si vous en avez fourni une.

Diffuser en continu de client à serveur

Les clients JavaScript appellent des méthodes de diffusion en continu de client à serveur sur des hubs en transmettant un Subject en tant qu’argument à send, invokeou stream, en fonction de la méthode hub appelée. Le Subject est une classe qui ressemble à un Subject. Par exemple, dans RxJS, vous pouvez utiliser la classe Subject de cette bibliothèque.

const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
    iteration++;
    subject.next(iteration.toString());
    if (iteration === 10) {
        clearInterval(intervalHandle);
        subject.complete();
    }
}, 500);

L’appel subject.next(item) avec un élément écrit l’élément dans le flux, et la méthode hub reçoit l’élément sur le serveur.

Pour mettre fin au flux, appelez subject.complete().

Client Java

Diffuser en continu de serveur à client

Le SignalR client Java utilise la méthode stream pour appeler des méthodes de streaming. stream accepte au moins trois arguments :

  • Type attendu des éléments de flux.
  • Le nom de la méthode hub.
  • Arguments définis dans la méthode hub.
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
    .subscribe(
        (item) -> {/* Define your onNext handler here. */ },
        (error) -> {/* Define your onError handler here. */},
        () -> {/* Define your onCompleted handler here. */});

La méthode stream sur HubConnection retourne une observable du type d’élément de flux. La méthode du type Observable est l’emplacement subscribe où les gestionnaires et onNext,onError et onCompleted sont définis.

Diffuser en continu de client à serveur

Le SignalR client Java peut appeler des méthodes de diffusion en continu de client à serveur sur des hubs en transmettant un observable en tant qu’argument à send, invokeou stream, en fonction de la méthode hub appelée.

ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();

L’appel stream.onNext(item) avec un élément écrit l’élément dans le flux, et la méthode hub reçoit l’élément sur le serveur.

Pour mettre fin au flux, appelez stream.onComplete().

Ressources supplémentaires