After doing some (more-or-less) "low-level" async socket programming years ago (in an Event-based Asynchronous Pattern (EAP) fashion), recently moving "up" to a TcpListener (Asynchronous Programming Model (APM)), and then trying to move to async/await (Task-based Asynchronous Pattern (TAP)), I've pretty much had it with constantly having to bother with all this "low-level plumbing". So I figured: why not give RX (Reactive Extensions) a go, since it might fit my problem domain more snugly.
A lot of the code I write has to do with many clients connecting over TCP to my application, which then start a two-way (async) communication. The client or the server may at any point decide a message needs to be sent and do so, so this is not your classic request/response setup but more of a real-time, two-way "line" open to both parties to send whatever they want, whenever they want. (If anyone has a decent name to describe this I'd be glad to hear it!)
The "protocol" differs per application (and isn't really relevant to my question). I do, however, have an initial question:
- Given that only one "server" is running, but it has to keep track of many (usually thousands of) connections (i.e. clients), each having (for lack of a better description) its own "state machine" to keep track of its internal state etc., which approach would you prefer? EAP/TAP/APM? Would RX even be considered an option? If not, why?
So, I need to work asynchronously since a) it's not a request/response protocol, so I cannot have a thread per client sitting in a blocking "waiting for message" or "sending message" call (however, if the send blocks for that client only I could live with it) and b) I need to handle many concurrent connections. I see no way of doing this (reliably) using blocking calls.
Most of my applications are VoIP related, be it SIP messages from SIP clients or PBX (related) messaging from applications like FreeSwitch / OpenSIPS etc., but you can, in its simplest form, try to imagine a "chat" server trying to handle many "chat" clients. Most protocols are text based (ASCII).
So, after having implemented many different permutations of the aforementioned techniques, I would like to simplify my work by creating an object that I can simply instantiate, tell on which IPEndPoint to listen, and have it tell me whenever something of interest is going on (which I usually use events for, so some EAP is usually mixed with the other two techniques). The class should not bother trying to "understand" the protocol; it should merely handle incoming/outgoing strings. And thus, with my eye on RX and hoping it would (in the end) simplify the work, I created a new "fiddle" from scratch:
```csharp
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Reactive.Linq;
using System.Text;

class Program
{
    static void Main(string[] args)
    {
        var f = new FiddleServer(new IPEndPoint(IPAddress.Any, 8084));
        f.Start();
        Console.ReadKey();
        f.Stop();
        Console.ReadKey();
    }
}

public class FiddleServer
{
    private TcpListener _listener;
    private ConcurrentDictionary<ulong, FiddleClient> _clients;
    private ulong _currentid = 0;

    public IPEndPoint LocalEP { get; private set; }

    public FiddleServer(IPEndPoint localEP)
    {
        this.LocalEP = localEP;
        _clients = new ConcurrentDictionary<ulong, FiddleClient>();
    }

    public void Start()
    {
        _listener = new TcpListener(this.LocalEP);
        _listener.Start();
        Observable.While(() => true, Observable.FromAsync(_listener.AcceptTcpClientAsync)).Subscribe(
            //OnNext
            tcpclient =>
            {
                //Create new FSClient with unique ID
                var fsclient = new FiddleClient(_currentid++, tcpclient);
                //Keep track of clients
                _clients.TryAdd(fsclient.ClientId, fsclient);
                //Initialize connection
                fsclient.Send("connect\n\n");
                Console.WriteLine("Client {0} accepted", fsclient.ClientId);
            },
            //OnError
            ex =>
            {
            },
            //OnComplete
            () =>
            {
                Console.WriteLine("Client connection initialized");
                //Accept new connections
                _listener.AcceptTcpClientAsync();
            }
        );
        Console.WriteLine("Started");
    }

    public void Stop()
    {
        _listener.Stop();
        Console.WriteLine("Stopped");
    }

    public void Send(ulong clientid, string rawmessage)
    {
        FiddleClient fsclient;
        if (_clients.TryGetValue(clientid, out fsclient))
        {
            fsclient.Send(rawmessage);
        }
    }
}

public class FiddleClient
{
    private TcpClient _tcpclient;

    public ulong ClientId { get; private set; }

    public FiddleClient(ulong id, TcpClient tcpclient)
    {
        this.ClientId = id;
        _tcpclient = tcpclient;
    }

    public void Send(string rawmessage)
    {
        Console.WriteLine("Sending {0}", rawmessage);
        var data = Encoding.ASCII.GetBytes(rawmessage);
        _tcpclient.GetStream().WriteAsync(data, 0, data.Length); //Write vs WriteAsync?
    }
}
```
I am aware that, in this "fiddle", there is a little implementation-specific detail; in this case I'm working with FreeSwitch ESL, so the "connect\n\n" in the fiddle should, when refactoring to a more generic approach, be removed.
I am also aware that I need to refactor the anonymous methods to private instance methods on the Server class; I'm just not sure what convention ("OnSomething", for example) to use for their method names?
This is my basis/starting-point/foundation (which needs some "tweaking"). I have some questions about this:
- See above question "1"
- Am I on the right track? Or are my "design" decisions unsound?
- Concurrency-wise: would this cope with thousands of clients (parsing/handling the actual messages aside)?
- On exceptions: I'm not sure how to get exceptions raised within clients "up" to the server ("RX-wise"); what would be a good way?
- I can now get any connected client from my server class (using its ClientId), assuming I expose the clients in one way or another, and call methods on them directly. I can also call methods via the Server class (for example, the Send(clientId, rawmessage) method, where the latter approach would be a "convenience" method for quickly getting a message to the other side).
- I am not quite sure where (and how) to go from here:
  - a) I need to handle incoming messages; how would I set this up? I can get the stream of course, but where would I handle retrieving the received bytes? I think I need some kind of "ObservableStream"-something I can subscribe to? Would I put this in the FiddleClient or the FiddleServer?
  - b) Assuming I want to avoid using events until these FiddleClient/FiddleServer classes are implemented more specifically to tailor their application-specific protocol handling etc. using more specific FooClient/FooServer classes: how would I go from getting the data in the underlying 'Fiddle'-classes to their more specific counterparts?
Articles/links I already read / skimmed / used for reference:
- Consume a Socket using Reactive Extension
- Creating and Subscribing to Simple Observable Sequences
- How to add or associate context to each ObservableTcpListener Connection/Session
- Asynchronous Programming with the Reactive Framework and the Task Parallel Library — Part 1, 2 and 3.
- Using Reactive Extensions (Rx) for socket programming practical?
- A .NET Rx Driven Web Server and .NET Rx Driven Web Server, Take 2
- Some Rx tutorials
Best Answer
After reading your statement about wanting an object that you can simply instantiate, tell on which IPEndPoint to listen, and have it tell you whenever something of interest is going on, I immediately thought "actors". Actors are very similar to objects, except that they have only a single input through which you pass them messages (instead of calling the object's methods directly) and they operate asynchronously. In a very simplified example, you would create the actor and send it a message containing the IPEndPoint and the address of the actor to send the result to. It goes off and does its work in the background. You only hear back from it when "something of interest" happens. You can instantiate as many actors as you need to handle the load.
I am not familiar with any actor libraries in .NET, although I know there are some. I am familiar with the TPL Dataflow library (there will be a section covering it in my book http://DataflowBook.com), and it should be easy to implement a simple actor model with that library.
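To make the idea a bit more concrete, here is a minimal sketch of what such an actor could look like on top of TPL Dataflow's ActionBlock. It is only an illustration under assumptions, not a finished design: the names ClientMessage, ClientActor and ActorDemo are hypothetical, the "ping"/"pong" exchange stands in for whatever your real protocol does, and no sockets are involved. On .NET Framework the System.Threading.Tasks.Dataflow types come from a NuGet package of the same name.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical message type: the received text plus the mailbox to reply to.
public sealed class ClientMessage
{
    public ClientMessage(string text, ITargetBlock<string> replyTo)
    {
        Text = text;
        ReplyTo = replyTo;
    }

    public string Text { get; }
    public ITargetBlock<string> ReplyTo { get; }
}

// Hypothetical actor: one instance per connected client.
public sealed class ClientActor
{
    private readonly ActionBlock<ClientMessage> _mailbox;
    private int _received; // private state, only touched inside Handle

    public ClientActor(ulong clientId)
    {
        ClientId = clientId;
        // By default an ActionBlock handles one message at a time, in order,
        // so the per-client state needs no locking.
        _mailbox = new ActionBlock<ClientMessage>(msg => Handle(msg));
    }

    public ulong ClientId { get; }
    public Task Completion => _mailbox.Completion;

    // The single input of the actor: callers post messages instead of calling methods.
    public bool Tell(ClientMessage message) => _mailbox.Post(message);

    // Stop accepting messages; Completion finishes once queued messages are handled.
    public void Stop() => _mailbox.Complete();

    private void Handle(ClientMessage msg)
    {
        _received++;
        // Only report back when "something of interest" happens.
        if (msg.Text == "ping")
            msg.ReplyTo.Post($"client {ClientId}: pong (message #{_received})");
    }
}

public static class ActorDemo
{
    public static List<string> Run()
    {
        var replies = new List<string>();
        // The "server side" mailbox that collects whatever the actors report.
        var inbox = new ActionBlock<string>(text => replies.Add(text));

        var actor = new ClientActor(1);
        actor.Tell(new ClientMessage("hello", inbox)); // counted, but no reply
        actor.Tell(new ClientMessage("ping", inbox));  // triggers a reply

        actor.Stop();
        actor.Completion.Wait();
        inbox.Complete();
        inbox.Completion.Wait();
        return replies;
    }

    public static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line); // prints "client 1: pong (message #2)"
    }
}
```

In a real server each ClientActor would presumably own its TcpClient and be fed by a read loop, and exceptions thrown inside the handler would fault the block's Completion task, which is one possible place for the server to observe per-client failures.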