C# – Async network programming using Reactive Extensions

cnettcp

After doing some (more-or-less) "low-level" async socket programming years ago (in an Event-based Asynchronous Pattern (EAP) fashion) and recently moving "up" to a TcpListener (Asynchronous Programming Model (APM)) and then trying to move to async/await (Task-based Asynchronous Pattern (TAP)) I've pretty much had it with keep having to bother with all this 'low level plumbing'. So I was figuring; why not give RX a go (Reactive Extensions) since it might fit more snugly to my problem domain.

A lot of code I write has to to with many clients connecting over Tcp to my application which then start a two-way (async) communication. The client or server may at any point decide a message needs to be sent and do so, so this is not your classic request/response setup but more of a real-time, two-way, "line" open to both parties to send whatever they want, whenever they want. (If anyone has a decent name to describe this I'd be glad to hear it!).

The "protocol" differs per application (and isn't really relevant to my question). I do, however have an initial question:

  1. Given that only one "server" is running, but it has to keep track of many (usually thousands) of connections (e.g. clients) each having (for lack of a better description) their own "state machine" to keep track of their internal states etc, which approach would you prefer? EAP/TAP/APM? Would RX even be considered an option? If not, why?

So, I need to work Async since a) it's not a request/response protocol so I cannot have a thread/client in a "waiting for message"-blocking call or "sending message"-blocking call (however, if the send is blocking for that client only I could live with it) and b) I need to handle many concurrent connections. I see no way of doing this (reliably) using blocking calls.

Most of my applications are VoiP related; be it SIP messages from SIP cients or PBX (related) messaging from applications like FreeSwitch / OpenSIPS etc. but you can, in it's simplest form, try to imagine a "chat"server trying to handle many "chat"clients. Most protocols are text based (ASCII).

So, after having implemented many different permutations of aforementioned techniques I would like to simplify my work by creating an object that I can simply instantiate, tell it on which IPEndpoint to listen and have it tell me whenever something of interest is going on (which I usually use events for, so some EAP is usually mixed with the other two techniques). The class should not bother trying to 'understand' the protocol; it should merely handle incoming/outgoing strings. And thus, having my eye on RX hoping that would (in the end) simplify the work, I created a new "fiddle" from scratch:

using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Reactive.Linq;
using System.Text;

class Program
{
    static void Main(string[] args)
    {
        var f = new FiddleServer(new IPEndPoint(IPAddress.Any, 8084));
        f.Start();
        Console.ReadKey();
        f.Stop();
        Console.ReadKey();
    }
}

public class FiddleServer
{
    private TcpListener _listener;
    private ConcurrentDictionary<ulong, FiddleClient> _clients;
    private ulong _currentid = 0;

    public IPEndPoint LocalEP { get; private set; }

    public FiddleServer(IPEndPoint localEP)
    {
        this.LocalEP = localEP;
        _clients = new ConcurrentDictionary<ulong, FiddleClient>();
    }

    public void Start()
    {
        _listener = new TcpListener(this.LocalEP);
        _listener.Start();
        Observable.While(() => true, Observable.FromAsync(_listener.AcceptTcpClientAsync)).Subscribe(
            //OnNext
            tcpclient =>
            {
                //Create new FSClient with unique ID
                var fsclient = new FiddleClient(_currentid++, tcpclient);
                //Keep track of clients
                _clients.TryAdd(fsclient.ClientId, fsclient);
                //Initialize connection
                fsclient.Send("connect\n\n");

                Console.WriteLine("Client {0} accepted", fsclient.ClientId);
            },
            //OnError
            ex =>
            {

            },
            //OnComplete
            () =>
            {
                Console.WriteLine("Client connection initialized");
                //Accept new connections
                _listener.AcceptTcpClientAsync();
            }
        );
        Console.WriteLine("Started");
    }

    public void Stop()
    {
        _listener.Stop();
        Console.WriteLine("Stopped");
    }

    public void Send(ulong clientid, string rawmessage)
    {
        FiddleClient fsclient;
        if (_clients.TryGetValue(clientid, out fsclient))
        {
            fsclient.Send(rawmessage);
        }
    }
}

public class FiddleClient
{
    private TcpClient _tcpclient;

    public ulong ClientId { get; private set; }

    public FiddleClient(ulong id, TcpClient tcpclient)
    {
        this.ClientId = id;
        _tcpclient = tcpclient;
    }

    public void Send(string rawmessage)
    {
        Console.WriteLine("Sending {0}", rawmessage);
        var data = Encoding.ASCII.GetBytes(rawmessage);
        _tcpclient.GetStream().WriteAsync(data, 0, data.Length);    //Write vs WriteAsync?
    }
}

I am aware that, in this "fiddle", there is a little implementation specific detail; in this case I'm working with FreeSwitch ESL so the "connect\n\n" in the fiddle should, when refactoring to a more generic approach, be removed.

I am also aware that I need to refactor the anonymous methods to private instance methods on the Server class; I'm just not sure what convention (e.g. "OnSomething" for example) to use for their method-names?

This is my basis/starting-point/foundation (which needs some "tweaking"). I have some questions about this:

  1. See above question "1"
  2. Am I on the right track? Or are my "design" decisions unjust?
  3. Concurrency-wise: would this cope with thousands of clients (parsing/handling the actual messages aside)
  4. On exceptions: I'm not sure how to get exceptions raised within clients "up" to the server ("RX-wise"); what would be a good way?
  5. I can now get any connected client from my server class (using it's ClientId), assuming I expose the clients in one way or another, and call methods on them directly. I can also call methods via the Server class (for example, the Send(clientId, rawmessage) method (whereas the latter approach would be a "convenience" method for quickly getting a message to the other side).
  6. I am not quite sure where (and how) to go from here:
    • a) I need to handle incoming messages; how would I set this up? I can get the stream ofcourse, but where would I handle retrieving the received bytes? I think I need some kind of "ObservableStream"-something I can subscribe to? Would I put this in the FiddleClient or FiddleServer?
    • b) Assuming I want to avoid using event until these FiddleClient/FiddleServer classes are implemented more specifically to tailor their application specific protocol handling etc. using more specific FooClient/FooServer classes: how would I go from getting the data in the underlying 'Fiddle'-classes to their more specific counterparts?

Articles/links I already read / skimmed / used for reference:

Best Answer

...I would like to simplify my work by creating an object that I can simply instantiate, tell it on which IPEndpoint to listen and have it tell me whenever something of interest is going on...

After reading that statement I immediately thought "actors". Actors are very similar to objects except they only have a single input where you pass it the message (instead of directly calling the object's methods) and they operate asynchronously. In a very simplified example... you would create the actor and send it a message with the IPEndpoint and the address of the actor to send the result to. It goes off and does it work in the background. You only hear back from it when "something of interest" happens. You can instantiate as many actors as you need to handle the load.

I am not familiar with any actors libraries in .Net although I know there are some. I am familiar with the TPL Dataflow library (there will be a section covering it in my book http://DataflowBook.com) and it should be easy to implement a simple actor model with that library.

Related Topic