Thread synchronization for multiple threads accessing a message stream

multithreading

My project has a single message stream (communicating with an external device) which is accessed via multiple client threads. The workflow is:

  • There is a thread (the "messaging thread") which reads the message stream and is able to dispatch received messages to clients. This works by waiting on a thread-safe queue which the message stream exposes.
  • Messages may be sent on the stream from any thread (the stream is thread-safe and handles concurrent send/receive)
  • A client thread sends a request across the message stream, and provides a callback function.
  • The client thread waits for responses. There may be multiple responses. The messaging thread knows how to match a received message to a client thread, based on the request sent by the client thread. The messaging thread invokes the right callback function for each received message.
  • After sending the request, the client thread must block until all responses have been processed. This is normally indicated by the callback function returning a particular status, but it must also be possible to unblock a client thread and end the interaction by the messaging server receiving a particular message which it will recognize, or the application shutting down.
  • From time to time the messaging service may be unavailable (the messaging thread will know this); so the client threads must be able to wait for a certain time to see if the messaging service has become available.

Those are my requirements but I can't get my head around how to code the logic for the client threads "entering" and waiting; and how the client threads should communicate with the messaging thread. Which synchronization primitives to use, and so on.

I'm coding in C++ with the Poco libraries, and can use all the usual primitives (mutex, event, semaphore, condition variable etc.) as well as higher level constructs like a notification queue, notification centre, and event dispatcher.

Best Answer

There are different ways to do it, but if you are inclined to stick with POCO, you may want to look at the macchina.io (OSP portion) WebEvent implementation - it is essentially a pub/sub messaging framework. There's more there than what you need but it's relatively simple and architecturally you should be able to quickly tailor it to your needs. I have used it in production for many years and it works well; it will also be ported in an OSP-independent form to Poco for one of the next releases.

Client can be either (1) a web socket endpoint or (2) an in-process observer which can send (i.e. post events) data and/or subscribe (i.e. receive notifications) to one or more subjects (topics). You'll probably need many in-process observers and one remote endpoint.

The framework runs in two threads handling:

  • Main queue - responsible for dispatching subscribe/unsubscribe request events from clients.

  • Worker queue - responsible for dispatching the data events (messages).

Each queue is dealt with in its own thread and there is a dotted-notation naming scheme for subject names, see here for details. Note that documentation only mentions WebSockets but naming works exactly the same for in-process observers and you may want or need a different naming scheme.

Related Topic