Flow Control in Axum

One of the problems in asynchronous message passing is preventing a situation where a sender produces messages faster than a receiver can handle them. Consider this piece of Axum code:

foreach(var price in GetNextStockPrice())
{
    report::Price <-- price;
}

If the stock quotes are produced faster than the agent behind the report channel can handle them, an overflow is bound to occur.
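To get a feel for how quickly the backlog builds up, here is a small sketch in Python (not Axum, purely for illustration). It assumes an unbounded mailbox and fixed per-tick rates; the function name and the rates are made up for this example.

```python
from collections import deque


def backlog_after(ticks, produced_per_tick, handled_per_tick):
    """Return how many messages are still queued after `ticks` rounds."""
    mailbox = deque()  # unbounded mailbox: nothing ever pushes back on the sender
    for tick in range(ticks):
        # The sender fires and forgets.
        for n in range(produced_per_tick):
            mailbox.append((tick, n))
        # The receiver handles what it can in the same tick.
        for _ in range(min(handled_per_tick, len(mailbox))):
            mailbox.popleft()
    return len(mailbox)


if __name__ == "__main__":
    print(backlog_after(100, produced_per_tick=3, handled_per_tick=2))
```

As long as the sender is even slightly faster, the backlog grows linearly with time; when the receiver is faster, it stays at zero.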

While this problem sounds like a fundamental flaw in asynchronous message passing due to its “fire and forget” nature, in real life things are not nearly so bad. In many cases you know from your application logic that the incoming rate of messages is such that in all likelihood they can comfortably be handled by the receiver. In a GUI application, when a message is triggered by the user pressing a button, and the receiver can handle at least a thousand such messages per second, it is reasonable to assume that the receiver will always be faster than the sender. (That is, until sometime later someone adds a call to some blocking API in the receiver, invalidating the above assumption.)

Having established that the incoming message rate is a potential problem, can we still get away without solving it? What if we could tolerate a loss of some messages and simply refuse to handle them if we’re too busy?

For example, if the code above were a part of a service that implements a stock dashboard, it might be fine to skip a few updates. If we didn’t get the latest quote now, we’ll get it in a few minutes with the new update. Not a perfect solution for a day trader, but could be OK for a Windows gadget.

One way to implement this would be to put a proxy agent between the sender and the receiver that does the actual work.

agent StockAgentProxy : channel StockTicker
{
    public StockAgentProxy()
    {
        while(true)
        {
            var price = receive(PrimaryChannel::Price);

            if( WorkerAgentIsBusy() )
            {
                // Too busy to handle the message, just drop it
            }
            else
            {
                // Forward the message to the actual worker:
                worker::Price <-- price;
            }
        }
    }
}

The proxy agent runs in parallel with the worker-agent, so that if the worker is busy processing data, the proxy can still consume messages coming from the sender and drop them if necessary.

Both the worker agent and the proxy implement the same channel, so the sender doesn’t even need to know it’s talking to a proxy.
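The same drop-when-busy idea can be sketched outside Axum. The Python below is only an approximation: it models "the worker is busy" as "the worker's bounded inbox is full", which is my assumption and not how `WorkerAgentIsBusy()` is necessarily implemented. The names `forward_or_drop` and `run_proxy` are hypothetical.

```python
import queue


def forward_or_drop(inbox, price):
    """Forward `price` to the worker's inbox; return False if it was dropped."""
    try:
        inbox.put_nowait(price)  # never blocks the proxy
        return True
    except queue.Full:
        return False  # worker is too busy: drop the update


def run_proxy(prices, inbox):
    """Push every price through the proxy and return the number of drops."""
    dropped = 0
    for price in prices:
        if not forward_or_drop(inbox, price):
            dropped += 1
    return dropped


if __name__ == "__main__":
    # An inbox of capacity 2 that nobody drains stands in for a stalled worker.
    stalled_inbox = queue.Queue(maxsize=2)
    print(run_proxy([10.0, 10.5, 11.0, 11.5, 12.0], stalled_inbox))
```

Note that this variant keeps the oldest updates and drops the newest ones. For a stock dashboard you might prefer the opposite policy, which would mean evicting from the head of the inbox instead.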

If asynchrony is the problem, we can effectively turn asynchronous message passing into synchronous message passing by requiring a response to every request sent to the port. In Axum, we call such two-way ports request-reply ports.

While a regular one-way port is defined like this:

input Decimal Price;

the definition of the request-reply port would have the type of the reply follow the name of the port and a colon. Like this:

input Decimal Price : Signal;

For the purposes of this example, I decided to use a value-less type Signal, defined in the Axum runtime. A more refined solution could have some useful data in the payload of the reply – for example, the time of the acceptance of the message, or the congestion level of the server (based on which the sender could dial down the rate of messages) and so on.

Sending to a request-reply port yields a value that can be used to retrieve the acknowledgment of the request – here is how:

foreach(var price in GetNextStockPrice())
{
    var acknowledgment = report::Price <-- price;
    receive(acknowledgment);
}

Now the sender and receiver move at the same speed: the sender will not proceed before getting an acknowledgment that the message was received on the other end of the channel. Completely sacrificing asynchrony is a heavy-handed solution but we can use it as a base to build something more sophisticated.
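For comparison, here is roughly what the lock-step exchange looks like in Python, with a thread standing in for the receiving agent and a `threading.Event` standing in for the `Signal` reply. This is a sketch under those assumptions, not a model of the Axum runtime; all function names are made up.

```python
import queue
import threading


def receiver(inbox, handled):
    """Handle each price, then acknowledge it. A None message stops the loop."""
    while True:
        item = inbox.get()
        if item is None:
            break
        price, ack = item
        handled.append(price)  # the "real work"
        ack.set()              # the reply on the request-reply port


def send_with_ack(prices, inbox):
    """Send each price and wait for its acknowledgment before continuing.

    Returns the largest inbox depth observed right after a send.
    """
    peak = 0
    for price in prices:
        ack = threading.Event()
        inbox.put((price, ack))
        peak = max(peak, inbox.qsize())
        ack.wait()  # block until the receiver has handled this message
    return peak


def run_request_reply(prices):
    inbox, handled = queue.Queue(), []
    worker = threading.Thread(target=receiver, args=(inbox, handled))
    worker.start()
    peak = send_with_ack(prices, inbox)
    inbox.put(None)  # shut the receiver down
    worker.join()
    return handled, peak


if __name__ == "__main__":
    print(run_request_reply([1.0, 2.0, 3.0]))
```

Because the sender waits after every send, the inbox never holds more than one message at a time.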

Instead of requesting an immediate acknowledgment we could keep on sending “optimistically” while keeping track of the pending requests, making sure we don’t have more than a certain number of them in flight. Here is how I coded it up:

var acknowledgments = new Queue<IInteractionSource<Signal>>();

foreach(var price in GetNextStockPrice())
{
    var acknowledgment = report::Price <-- price;
    acknowledgments.Enqueue(acknowledgment);

    if( acknowledgments.Count >= 10 )
    {
        receive(acknowledgments.Dequeue());
    }
}

Here we stash pending requests into a queue and then, having reached 10 pending requests, receive an acknowledgment from the request at the head of the queue. The protocol still requires a receive operation for each send, but the more pending operations we can have in flight, the more likely it is that the receive will complete without blocking.

What we’ve just implemented is a rather simplistic version of the Sliding Window Protocol. The protocol is used in various systems such as TCP or Microsoft SQL Server.
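The windowed sender translates to Python almost line for line. As before, a thread plays the receiving agent and a `threading.Event` plays the reply; the final drain loop is my addition so that the sketch shuts down cleanly, and the names are hypothetical.

```python
import queue
import threading
from collections import deque


def receiver(inbox, handled):
    """Handle each price, then acknowledge it. A None message stops the loop."""
    while True:
        item = inbox.get()
        if item is None:
            break
        price, ack = item
        handled.append(price)
        ack.set()


def send_windowed(prices, inbox, window):
    """Keep at most `window` unacknowledged sends outstanding.

    Returns the largest number of pending acknowledgments seen.
    """
    pending = deque()
    peak = 0
    for price in prices:
        ack = threading.Event()
        inbox.put((price, ack))
        pending.append(ack)
        peak = max(peak, len(pending))
        if len(pending) >= window:
            pending.popleft().wait()  # wait only for the oldest request
    while pending:                    # not in the original: drain the tail
        pending.popleft().wait()
    return peak


def run_windowed(prices, window):
    inbox, handled = queue.Queue(), []
    worker = threading.Thread(target=receiver, args=(inbox, handled))
    worker.start()
    peak = send_windowed(prices, inbox, window)
    inbox.put(None)
    worker.join()
    return handled, peak


if __name__ == "__main__":
    handled, peak = run_windowed([float(i) for i in range(25)], window=4)
    print(len(handled), peak)
```

With a window of 1 this degenerates into the lock-step request-reply exchange; larger windows trade memory on the receiving side for fewer stalls on the sending side.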

Another idea is to do away with acknowledgments and instead rely on the receiver to tell us when to pause the transmission. We need to introduce two new ports, On and Off, for the receiver to communicate with the sender. Here is the code:

foreach(var price in GetNextStockPrice())
{
    Signal offSignal;
    if( tryreceive(report::Off, out offSignal) )
    {
        receive(report::On);
    }
    report::Price <-- price;
}

The tryreceive operator checks the availability of messages on the Off port. Unlike receive, tryreceive merely “peeks” into the port and doesn’t block waiting for a message. If there aren’t any messages on the Off port, the sender proceeds without slowing down. When a message does appear on the Off port, the sender consumes it and waits for a message from the port On to resume the submission.

The sender side is simple, but the receiver requires a bit more work. We again split it into two agents: the worker processing the data and the proxy communicating with the sender and telling it when to stop or resume the transmission. First, the proxy:

public agent StockAgentProxy : channel StockTicker
{
    public StockAgentProxy()
    {
        var worker = Stock.StockAgent.CreateInNewDomain();
        bool senderIsActive = true;
        int itemsQueuedUp = 0;

        while(true)
        {
            var price = receive(PrimaryChannel::Price);
            do
            {
                worker::Price <-- price;
                // Have we reached the max capacity of the receiver?
                if( ++itemsQueuedUp == 10 )
                {
                    if( senderIsActive )
                    {
                        // Tell the sender to stop sending
                        PrimaryChannel::Off <-- Signal.Value;
                        senderIsActive = false;
                    }
                    // Wait for the worker to complete pending work,
                    // counting down to exactly zero
                    while( itemsQueuedUp > 0 )
                    {
                        receive(worker::On);
                        itemsQueuedUp--;
                    }
                }
            }
            while( tryreceive(PrimaryChannel::Price, out price) );

            if( !senderIsActive )
            {
                // Tell sender to resume sending
                PrimaryChannel::On <-- Signal.Value;
                senderIsActive = true;
            }
        }
    }
}

The proxy starts by waiting for a message from the sender, then forwards it to the worker. Having reached the maximum number of queued-up items – 10 in my example – the proxy tells the sender to stop, then waits for the worker to complete all the pending requests. The process repeats for all the pending messages on the port Price, until the port is drained. After that the sender is told to resume the transmission.

The worker agent looks like this:

public agent StockAgent : channel StockTicker
{
    public StockAgent()
    {
        PrimaryChannel::Price ==> HandleUpdatedPrice;
    }

    private void HandleUpdatedPrice(Decimal price)
    {
        // Do some real work here ...
        PrimaryChannel::On <-- Signal.Value; // signal completion
    }
}

The above solution is known in networking as the XON/XOFF protocol. This protocol is in fact so common that XON and XOFF commands have reserved positions in the ASCII table, as characters 17 and 19.
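Here is a compact Python sketch of the same handshake. It simplifies things: the proxy and the worker are collapsed into a single receiver thread, and "capacity reached" is judged by the depth of the data queue instead of by counting completions. `HIGH_WATER`, the 1 ms of simulated work, and all function names are assumptions made for the example.

```python
import queue
import threading
import time

HIGH_WATER = 5  # hypothetical backlog at which the receiver sends Off


def sender(prices, data, off, on):
    """Send prices, pausing whenever an Off signal has arrived."""
    pauses = 0
    for price in prices:
        try:
            off.get_nowait()  # like tryreceive(report::Off): never blocks
        except queue.Empty:
            pass
        else:
            on.get()          # like receive(report::On): wait to resume
            pauses += 1
        data.put(price)
    data.put(None)            # end-of-stream marker
    return pauses


def receiver(data, off, on, handled):
    """Handle prices, signalling Off when swamped and On once drained."""
    sender_active = True
    while True:
        price = data.get()
        if price is None:
            break
        if sender_active and data.qsize() >= HIGH_WATER:
            off.put(True)
            sender_active = False
        time.sleep(0.001)     # simulated work
        handled.append(price)
        if not sender_active and data.empty():
            on.put(True)
            sender_active = True


def run_xon_xoff(prices):
    data, off, on = queue.Queue(), queue.Queue(), queue.Queue()
    handled = []
    worker = threading.Thread(target=receiver, args=(data, off, on, handled))
    worker.start()
    pauses = sender(prices, data, off, on)
    worker.join()
    return handled, pauses


if __name__ == "__main__":
    handled, pauses = run_xon_xoff([float(i) for i in range(50)])
    print(len(handled), pauses)
```

The number of pauses varies from run to run, and a fast sender can easily push the whole batch before the first Off is noticed. What the sketch does guarantee is that every Off is eventually followed by an On, so the sender never waits forever.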

When would you use the Sliding Window and when the XON/XOFF protocol? It all depends on your situation. The Sliding Window requires a round-trip for each message, which is an obvious drawback. However, the sender will always stop after sending a certain number of unacknowledged messages.

The XON/XOFF protocol might not work if the receiver fails to respond in a timely manner – for example, the sender can potentially send thousands of messages before the receiver gets around to telling it to stop. Keep this in mind when using the protocol.

Other things to consider are whether you need to resend undelivered messages, how to ensure proper message order, or whether any of this would in fact be a problem in your situation. Lots of fascinating things to think about, but well beyond the scope of this post.

Artur Laksberg,
Axumite