There are different ways to do it, but if you are inclined to stick with POCO, you may want to look at the macchina.io (OSP portion) WebEvent implementation - it is essentially a pub/sub messaging framework. There's more there than what you need but it's relatively simple and architecturally you should be able to quickly tailor it to your needs. I have used it in production for many years and it works well; it will also be ported in an OSP-independent form to Poco for one of the next releases.
Client can be either (1) a web socket endpoint or (2) an in-process observer which can send (i.e. post events) data and/or subscribe (i.e. receive notifications) to one or more subjects (topics). You'll probably need many in-process observers and one remote endpoint.
The framework runs in two threads handling:
Each queue is dealt with in its own thread and there is a dotted-notation naming scheme for subject names, see here for details. Note that documentation only mentions WebSockets but naming works exactly the same for in-process observers and you may want or need a different naming scheme.
Ok - I am not sure whether the following will be of any help to you, because I made some assumptions in developing a solution which may or may not be true in your case. Maybe my "solution" is too theoretical and only works for artifical examples - I have not done any testing beyond the stuff below.
In addition, I would see the following more a workaround than a real solution but considering the lack of responses I think it might still be better than nothing (I kept watching your question waiting for a solution, but not seeing one getting posted I started playing around with the issue).
But enough said: Let's say we have a simple data service which can be used to retrieve an integer:
public interface IDataService
{
Task<int> LoadMagicInteger();
}
A simple implementation uses asynchronous code:
public sealed class CustomDataService
: IDataService
{
public async Task<int> LoadMagicInteger()
{
Console.WriteLine("LoadMagicInteger - 1");
await Task.Delay(100);
Console.WriteLine("LoadMagicInteger - 2");
var result = 42;
Console.WriteLine("LoadMagicInteger - 3");
await Task.Delay(100);
Console.WriteLine("LoadMagicInteger - 4");
return result;
}
}
Now, a problem arises, if we are using the code "incorrectly" as illustrated by this class. Foo
incorrectly accesses Task.Result
instead of await
ing the result like Bar
does:
public sealed class ClassToTest
{
private readonly IDataService _dataService;
public ClassToTest(IDataService dataService)
{
this._dataService = dataService;
}
public async Task<int> Foo()
{
var result = this._dataService.LoadMagicInteger().Result;
return result;
}
public async Task<int> Bar()
{
var result = await this._dataService.LoadMagicInteger();
return result;
}
}
What we (you) now need is a way to write a test which succeeds when calling Bar
but fails when calling Foo
(at least if I understood the question correctly ;-) ).
I'll let the code speak; here's what I came up with (using Visual Studio tests, but it should work using NUnit, too):
DataServiceMock
utilizes TaskCompletionSource<T>
. This allows us to set the result at a defined point in the test run which leads to the following test. Note that we are using a delegate to pass back the TaskCompletionSource back into the test. You might also put this into the Initialize method of the test and use properties.
TaskCompletionSource<int> tcs = null;
this._dataService.LoadMagicIntegerMock = t => tcs = t;
Task<int> task = null;
TaskTestHelper.AssertDoesNotBlock(() => task = this._instance.Foo());
tcs.TrySetResult(42);
var result = task.Result;
Assert.AreEqual(42, result);
this._end = true;
What's happening here is that we first verify that we can leave the method without blocking (this would not work if someone accessed Task.Result
- in this case we would run into a timeout as the result of the task is not made available until after the method has returned).
Then, we set the result (now the method can execute) and we verify the result (inside a unit test we can access Task.Result as we actually want the blocking to occur).
Complete test class - BarTest
succeeds and FooTest
fails as desired.
[TestClass]
public class UnitTest1
{
private DataServiceMock _dataService;
private ClassToTest _instance;
private bool _end;
[TestInitialize]
public void Initialize()
{
this._dataService = new DataServiceMock();
this._instance = new ClassToTest(this._dataService);
this._end = false;
}
[TestCleanup]
public void Cleanup()
{
Assert.IsTrue(this._end);
}
[TestMethod]
public void FooTest()
{
TaskCompletionSource<int> tcs = null;
this._dataService.LoadMagicIntegerMock = t => tcs = t;
Task<int> task = null;
TaskTestHelper.AssertDoesNotBlock(() => task = this._instance.Foo());
tcs.TrySetResult(42);
var result = task.Result;
Assert.AreEqual(42, result);
this._end = true;
}
[TestMethod]
public void BarTest()
{
TaskCompletionSource<int> tcs = null;
this._dataService.LoadMagicIntegerMock = t => tcs = t;
Task<int> task = null;
TaskTestHelper.AssertDoesNotBlock(() => task = this._instance.Bar());
tcs.TrySetResult(42);
var result = task.Result;
Assert.AreEqual(42, result);
this._end = true;
}
}
And a little helper class to test for deadlocks / timeouts:
public static class TaskTestHelper
{
public static void AssertDoesNotBlock(Action action, int timeout = 1000)
{
var timeoutTask = Task.Delay(timeout);
var task = Task.Factory.StartNew(action);
Task.WaitAny(timeoutTask, task);
Assert.IsTrue(task.IsCompleted);
}
}
Best Answer
I think you're getting a few things confused, here. What you're asking for is already possible using
System.Threading.Tasks
, theasync
andawait
in C# 5 are just going to provide a little nicer syntactic sugar for the same feature.Let's use a Winforms example - drop a button and a textbox on the form and use this code:
Run it and you'll see that (a) it doesn't block the UI thread and (b) you don't get the usual "cross-thread operation not valid" error - unless you remove the
TaskScheduler
argument from the lastContinueWith
, in which case you will.This is bog-standard continuation passing style. The magic happens in the
TaskScheduler
class and specifically the instance retrieved byFromCurrentSynchronizationContext
. Pass this into any continuation and you tell it that the continuation must run on whichever thread called theFromCurrentSynchronizationContext
method - in this case, the UI thread.Awaiters are slightly more sophisticated in the sense that they're aware of which thread they started on and which thread the continuation needs to happen on. So the above code can be written a little more naturally:
These two should look very similar, and in fact they are very similar. The
DelayedAddAsync
method now returns aTask<int>
instead of anint
, and so theawait
is just slapping continuations onto each one of those. The main difference is that it's passing along the synchronization context on each line, so you don't have to do it explicitly like we did in the last example.In theory the differences are a lot more significant. In the second example, every single line in the
button1_Click
method is actually executed in the UI thread, but the task itself (DelayedAddAsync
) runs in the background. In the first example, everything runs in the background, except for the assignment totextBox1.Text
which we've explicitly attached to the UI thread's synchronization context.That's what's really interesting about
await
- the fact that an awaiter is able to jump in and out of the same method without any blocking calls. You callawait
, the current thread goes back to processing messages, and when it's done, the awaiter will pick up exactly where it left off, in the same thread it left off in. But in terms of yourInvoke
/BeginInvoke
contrast in the question, I'm sorry to say that you should have stopped doing that a long time ago.