ReactiveX Middlewares

Published on Friday, June 12, 2020

While implementing sharpbrick/powered-up, an interesting problem came up: how to handle the incoming messages of the Bluetooth Low Energy based communication protocol. On top of the Bluetooth abstraction there is a simple callback that receives roughly 50 different message types, which need to be dispatched to several locations, some of which are even created dynamically over time. I needed an infrastructure! I needed a middleware 😀.

Possible Solutions

Within the tighter .NET ecosystem (a.k.a. everything that Microsoft and close friends throw at us) there are the following libraries for push-based messaging:

  • System.Reactive: The .NET implementation (and godfather) of the ReactiveX project (e.g. JavaScript: rxjs). The library is push-based, makes threading/buffering optional, allows optional subscriptions, supports LINQ and allows building dynamic pipelines, but is heap-based.
  • ASP.NET Core middleware stack: Unfortunately, in its current state it is bound to HTTP. Standalone implementations like violetgrass/middleware (disclaimer: another side project) make the basic concept available to other server/dispatcher applications.
  • System.Threading.Channels: A part of the ASP.NET Core stack, focused on channeling data between separate producer and consumer threads. The library is push- and pull-based, buffering is included, it requires active consumers and it does not support LINQ.

Middlewares

Ignoring System.Threading.Channels for now, both System.Reactive and ASP.NET Core style compiled middlewares are basically ways to build a chain of functions between the source of a message and its ultimate target.

A compiled middleware is built upfront and is optimized for efficiency:

public class QueueMessageContext : Context
{
    public Message Message { get; set; }
}

var stack = new MiddlewareBuilder<QueueMessageContext>()
    // middleware step
    .Use(async (context, next) => {
        // do something with the context.Message
        await next(context); 
    })
    // middleware step
    .Use(async (context, next) => {
        // do something with the context.Message
        await next(context); 
    })
    // final handling of message
    .Use(async (context, next) => {
        // do something with the context.Message
    })
    .Build();

// invoke
var x = new QueueMessageContext() { Message = msg, };

await stack(x); // the whole stack is essentially a stacked function delegate. no magic anymore. no overhead despite millions of requests.
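
To make the "stacked function delegate" idea concrete, here is a minimal sketch of how such a builder could fold its steps into a single delegate. The names SketchDelegate and SketchBuilder are hypothetical and only serve the illustration; this is not the actual implementation of ASP.NET Core or violetgrass/middleware.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical types for illustration only (not the API of any real library).
public delegate Task SketchDelegate<TContext>(TContext context);

public class SketchBuilder<TContext>
{
    private readonly List<Func<TContext, SketchDelegate<TContext>, Task>> _steps =
        new List<Func<TContext, SketchDelegate<TContext>, Task>>();

    // Register a step that receives the context and the next delegate in the chain.
    public SketchBuilder<TContext> Use(Func<TContext, SketchDelegate<TContext>, Task> step)
    {
        _steps.Add(step);
        return this;
    }

    // Fold the steps from last to first so that each step closes over its successor.
    // After Build() only nested delegates remain; the list is not consulted per message.
    public SketchDelegate<TContext> Build()
    {
        SketchDelegate<TContext> pipeline = _ => Task.CompletedTask; // terminal no-op
        for (var i = _steps.Count - 1; i >= 0; i--)
        {
            var step = _steps[i];
            var next = pipeline; // the chain built so far becomes this step's "next"
            pipeline = context => step(context, next);
        }
        return pipeline;
    }
}
```

Because the chain is fixed once Build() returns, adding another step later means building a new delegate, which is exactly the limitation the dynamic approach below avoids.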

Using System.Reactive, middlewares can be constructed dynamically:

var source = new Subject<Message>();

var firstPart = source
    .Select(msg => { /* do something */ return msg; }); // middleware step

// somewhere else (e.g. on demand)
var d = firstPart
    .Select(msg => { /* do something */ return msg; }) // middleware step
    .Subscribe(msg => { /* do something */ }); // final handling of message

// invoke
source.OnNext(msg); // subscriptions are checked, data is boxed, heap is allocated, ...

While a compiled middleware has the benefit of performance, a reactive stream can be dynamically composed, filtered and combined (as known from LINQ). It also allows multiple subscriptions, which an ASP.NET Core middleware does not. However, there is also a disadvantage: it does not support async/await in its handlers and will block the producer while the subscribers run (unless explicitly configured with .ObserveOn). It does not naturally fit the common async/await programming pattern in C#.
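
The blocking behaviour and the effect of .ObserveOn can be observed with a small sketch. It assumes the System.Reactive NuGet package is referenced; the class name SubjectThreadingSketch and its return shape are hypothetical and exist only for this illustration.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical demo class; requires the System.Reactive package.
public static class SubjectThreadingSketch
{
    // Returns the thread ids of the producer, the plain observer and the ObserveOn observer.
    public static (int Producer, int Direct, int Offloaded) Run()
    {
        var source = new Subject<string>();
        var direct = -1;
        var offloaded = -1;
        using var done = new ManualResetEventSlim();

        // First subscription: invoked inline by OnNext, on the producer's thread.
        using var first = source.Subscribe(_ => direct = Environment.CurrentManagedThreadId);

        // Second subscription on the same source: ObserveOn hands each message to a scheduler.
        using var second = source
            .ObserveOn(TaskPoolScheduler.Default)
            .Subscribe(_ =>
            {
                offloaded = Environment.CurrentManagedThreadId;
                done.Set();
            });

        source.OnNext("message");           // returns only after the first observer has run
        done.Wait(TimeSpan.FromSeconds(5)); // wait for the offloaded observer to finish

        return (Environment.CurrentManagedThreadId, direct, offloaded);
    }
}
```

In this sketch the Direct thread id equals the Producer thread id, because the plain subscription runs inside OnNext. The Offloaded id is whatever thread the scheduler picked.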

Within sharpbrick/powered-up there is a critical requirement to dynamically compose pipelines that route the incoming upstream messages to the models representing PoweredUP hubs and devices.

System.Reactive is therefore the right choice to use within the library. The library behaves similarly to a user interface (PoweredUP devices <-> UI controls) with external influences. In the world of UI programming, the reactive programming model is used by all currently popular frameworks (React (Native), Angular, ...).

For its consumers, however, the SharpBrick.PoweredUp library exposes an async/await based interface for invocations. Internally, the library converts the subscriptions to the reactive stream into an awaiter using .GetAwaiter(), as seen below in a request/response matching algorithm:

public static async Task<TResultMessage> SendMessageReceiveResultAsync<TResultMessage>(this IPoweredUpProtocol self, PoweredUpMessage requestMessage, Func<TResultMessage, bool> filter = default)
{
    // UpstreamMessages is an IObservable
    var awaitable = self.UpstreamMessages
        .OfType<TResultMessage>()
        .Where(responseMessage => filter == null || filter(responseMessage))
        .FirstAsync() // only the first message observed is forwarded
        .GetAwaiter(); // make sure the subscription is present at the moment the message is sent.

    await self.SendMessageAsync(requestMessage);

    var response = await awaitable; // subscription is discarded

    return response;
}
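
The reason for calling .GetAwaiter() before sending can be shown in isolation. The following sketch assumes the System.Reactive package; a Subject<int> stands in for UpstreamMessages and plain OnNext calls stand in for the device's response, so AwaiterSketch and RequestAsync are hypothetical names and not part of SharpBrick.PoweredUp.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

// Hypothetical demo class; requires the System.Reactive package.
public static class AwaiterSketch
{
    public static async Task<int> RequestAsync()
    {
        var upstream = new Subject<int>(); // stand-in for UpstreamMessages

        // GetAwaiter subscribes right away and remembers the result until it is awaited.
        var awaitable = upstream
            .Where(value => value > 10)
            .FirstAsync()
            .GetAwaiter();

        // Stand-in for the request: the "responses" arrive before anything is awaited.
        upstream.OnNext(5);  // filtered out by Where
        upstream.OnNext(42); // first match, completes the awaitable
        upstream.OnNext(99); // ignored, FirstAsync has already completed

        return await awaitable; // yields 42
    }
}
```

Had the subscription only been created by awaiting after the messages were pushed, the Subject would already have dropped them, since it does not replay earlier values.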

Conclusion

When building an application or a library with many variable exit points for an incoming data stream, be it UI controls or other model endpoints, using System.Reactive is the right choice.