How to Guarantee 100 Tasks Running in Parallel in C#

c#, multithreading

I was trying to create an integration test for my service in which 100 clients would connect, log in, send a request, and log all responses for a configurable amount of time.

I built a client class using async sockets, and it works fine. I started all the clients using Task and Task.Factory, sent the login, and posted a receive every time I got data until the time expired, at which point I called shutdown on them.

It seems these never really run in parallel. Sometimes I get around 35 running at once, sometimes a few more. I assume the task scheduler runs them when it sees fit rather than all at once.

Now, I understand that I cannot truly have 100 threads running simultaneously, but I want to guarantee that all 100 are started and that the OS is context switching back and forth, attempting to execute them all.

In the end, I want to simulate a large number of clients connected to my service all getting a stream of data.

What construct do I use if Task does not work?

Current Attempt:

using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace IntegrationTests
{
    class Program
    {
        static void Main(string[] args)
        {
            string       server            = ConfigurationManager.AppSettings["server"];
            int          port              = int.Parse(ConfigurationManager.AppSettings["port"]);
            int          numClients        = int.Parse(ConfigurationManager.AppSettings["numberOfClients"]);
            TimeSpan     clientLifetime    = TimeSpan.Parse(ConfigurationManager.AppSettings["clientLifetime"]);
            TimeSpan     timeout           = TimeSpan.Parse(ConfigurationManager.AppSettings["timeout"]);
            TimeSpan     reconnectInterval = TimeSpan.Parse(ConfigurationManager.AppSettings["reconnectInterval"]);
            List<string> clientIds         = ConfigurationManager.GetSection("clientIds") as List<string>;

            try
            {
                // SNIP configure logging

                // Create the specified number of clients, to carry out test operations, each on their own threads
                Task[] tasks = new Task[numClients];
                for(int count = 0; count < numClients; ++count)
                {
                    var index = count;
                    tasks[count] = Task.Factory.StartNew(() =>
                        {
                            try
                            {
                                // Reuse client ids if there are more clients than clientIds.
                                // Keep in mind that tasks are not necessarily started in the order they were created in this loop.
                                // The client started first may not get the first client id if another client started before it,
                                // but all clientIds are still used.
                                string clientId = null;
                                if (numClients < clientIds.Count)
                                {
                                    clientId = clientIds[index];
                                }
                                else
                                {
                                    clientId = clientIds[index % clientIds.Count];
                                }

                                // Create the actual client
                                Client client = new Client(server, port, clientId, timeout, reconnectInterval);
                                client.Startup();

                                // Makes an async request and issues a recv.
                                // Every time we get a response, it will be logged and another recv will be posted.
                                // This will continue until shutdown is called
                                client.MakeRequest(symbol);

                                System.Threading.Thread.Sleep(clientLifetime);

                                client.Shutdown();
                            }
                            catch(Exception e)
                            {
                                // SNIP - Log it
                            }
                        });
                }
                Task.WaitAll(tasks);
            }
            catch (Exception e)
            {
                // SNIP - Log it
            }
        }
    }
}

Best Answer

Tasks and Threads exist for different purposes. Tasks are intended for short-running units of work that need to run in the background. Threads represent an operating system resource for concurrent execution.

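Internally, the default TaskScheduler runs tasks on the thread pool so that it can reuse threads to process more tasks. Threads are expensive to set up and tear down, so creating one per task would defeat the purpose for which Tasks were created. Because each of your tasks blocks its pool thread in Thread.Sleep for the client's whole lifetime, the pool adds threads only gradually beyond its minimum, which is why you see only some of the clients running at once. While you can influence the number of threads available to the scheduler, it is still responsible for dishing out the work to the threads.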
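As a rough illustration of influencing the pool's thread count, the sketch below raises the pool's minimum worker count with ThreadPool.SetMinThreads before starting blocking tasks, then measures how many ran at once. The names PoolWarmup, EnsureMinWorkers, and PeakConcurrency are made up for this example, and the figure of 100 comes from the question. This only makes it likely that all tasks run together; the scheduler still decides, so treat it as an experiment rather than a guarantee.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class PoolWarmup
{
    // Raises the pool's minimum worker count so that up to `workers` blocking
    // tasks can get a thread without waiting for gradual thread injection.
    // Returns false if the pool rejected the request.
    public static bool EnsureMinWorkers(int workers)
    {
        ThreadPool.GetMinThreads(out int minWorker, out int minIo);
        return ThreadPool.SetMinThreads(Math.Max(minWorker, workers), minIo);
    }

    // Starts `count` blocking tasks and returns the highest number of tasks
    // that were observed running at the same time.
    public static int PeakConcurrency(int count, TimeSpan hold)
    {
        int running = 0, peak = 0;
        var tasks = new Task[count];
        for (int i = 0; i < count; i++)
        {
            tasks[i] = Task.Factory.StartNew(() =>
            {
                int now = Interlocked.Increment(ref running);

                // Record a new peak if this task pushed the count higher.
                int seen;
                while (now > (seen = Volatile.Read(ref peak)) &&
                       Interlocked.CompareExchange(ref peak, now, seen) != seen)
                {
                }

                Thread.Sleep(hold); // stand-in for a client's lifetime
                Interlocked.Decrement(ref running);
            });
        }
        Task.WaitAll(tasks);
        return peak;
    }

    public static void Main()
    {
        EnsureMinWorkers(100);
        int peak = PeakConcurrency(100, TimeSpan.FromSeconds(2));
        Console.WriteLine($"Peak concurrent tasks: {peak}");
    }
}
```

Running PeakConcurrency with and without the EnsureMinWorkers call is a quick way to see how much the pool's minimum affects the number of clients that are live at once.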

Guaranteeing X Number of Concurrent Clients

The only way to guarantee this is to use Thread instead of Task. If you were to restructure your code a bit you could handle your simultaneous clients like this:

using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Text;
using System.Threading;

namespace IntegrationTests
{
    class Program
    {
        // Shared configuration, read once in Main and used by every client thread.
        private static string server;
        private static int port;
        private static int numClients;
        private static TimeSpan clientLifetime;
        private static TimeSpan timeout;
        private static TimeSpan reconnectInterval;
        private static List<string> clientIds;
        private static Barrier barrier;

        static void Main(string[] args)
        {
            numClients        = int.Parse(ConfigurationManager.AppSettings["numberOfClients"]);
            server            = ConfigurationManager.AppSettings["server"];
            port              = int.Parse(ConfigurationManager.AppSettings["port"]);
            clientLifetime    = TimeSpan.Parse(ConfigurationManager.AppSettings["clientLifetime"]);
            timeout           = TimeSpan.Parse(ConfigurationManager.AppSettings["timeout"]);
            reconnectInterval = TimeSpan.Parse(ConfigurationManager.AppSettings["reconnectInterval"]);
            clientIds         = ConfigurationManager.GetSection("clientIds") as List<string>;

            // One participant per client thread, plus one for the main thread.
            barrier           = new Barrier(numClients + 1);

            try
            {
                // SNIP configure logging

                // Create the specified number of clients, each on its own dedicated thread
                Thread[] threads = new Thread[numClients];
                for (int count = 0; count < numClients; ++count)
                {
                    threads[count] = new Thread(RunClient);
                    threads[count].Name = $"Client {count}"; // for debugging
                    threads[count].Start(count); // pass the client index as the thread's state argument
                }

                // We lose the convenience of Task.WaitAll,
                // but use a thread barrier to block this thread until all the others are done.
                barrier.SignalAndWait();
            }
            catch (Exception e)
            {
                // SNIP - Log it
            }
        }

        // Matches ParameterizedThreadStart: state is the client index passed to Thread.Start.
        private static void RunClient(object state)
        {
            int index = (int)state;
            try
            {
                // Reuse client ids if there are more clients than clientIds.
                // Keep in mind that threads are not necessarily started in the order they were created in the loop.
                // The client started first may not get the first client id if another client started before it,
                // but all clientIds are still used.
                string clientId = null;
                if (numClients < clientIds.Count)
                {
                    clientId = clientIds[index];
                }
                else
                {
                    clientId = clientIds[index % clientIds.Count];
                }

                // Create the actual client
                Client client = new Client(server, port, clientId, timeout, reconnectInterval);
                client.Startup();

                // Makes an async request and issues a recv.
                // Every time we get a response, it will be logged and another recv will be posted.
                // This will continue until shutdown is called
                client.MakeRequest(symbol);

                System.Threading.Thread.Sleep(clientLifetime);

                client.Shutdown();
            }
            catch(Exception e)
            {
                // SNIP - Log it
            }
            finally
            {
                barrier.SignalAndWait();
            }
        }
    }
}
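The thread-per-client pattern above can be tried without the Client class. The following is a minimal sketch under stated assumptions: DedicatedThreadsDemo and Run are hypothetical names, and Thread.Sleep stands in for the Startup, MakeRequest, and Shutdown calls. It uses the same Barrier arrangement as the answer (one participant per client plus the main thread) and reports the peak number of clients that were live at the same time.

```csharp
using System;
using System.Threading;

public static class DedicatedThreadsDemo
{
    // Runs `count` stand-in clients, each on its own thread, and returns the
    // highest number of clients that were inside their work section at once.
    public static int Run(int count, TimeSpan lifetime)
    {
        int running = 0, peak = 0;

        // One participant per client thread, plus one for the calling thread.
        var barrier = new Barrier(count + 1);

        for (int i = 0; i < count; i++)
        {
            var thread = new Thread(() =>
            {
                try
                {
                    int now = Interlocked.Increment(ref running);

                    // Record a new peak if this client pushed the count higher.
                    int seen;
                    while (now > (seen = Volatile.Read(ref peak)) &&
                           Interlocked.CompareExchange(ref peak, now, seen) != seen)
                    {
                    }

                    Thread.Sleep(lifetime); // stand-in for the real client's work
                    Interlocked.Decrement(ref running);
                }
                finally
                {
                    // Always signal, even on failure, so no participant waits forever.
                    barrier.SignalAndWait();
                }
            });
            thread.Name = $"Client {i}"; // for debugging
            thread.Start();
        }

        // Blocks until every client thread has reached its own SignalAndWait.
        barrier.SignalAndWait();
        return peak;
    }

    public static void Main()
    {
        int peak = Run(100, TimeSpan.FromSeconds(2));
        Console.WriteLine($"Peak concurrent clients: {peak}");
    }
}
```

Because every client owns a thread from the moment Start is called, the peak should normally match the requested client count as long as the lifetime is much longer than the time it takes to start the threads. If the main thread keeps the Thread array, calling Join on each thread is a simpler way to wait than a Barrier.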