Does ConcurrentQueue fail on Enqueue or TryDequeue

Emon Haque 3,176 Reputation points
2020-11-14T14:23:59.21+00:00

I've been experimenting on UDP for a few days and in one of the UDP projects, Client, I've a ConcurrentQueue where client stores byte[] in a function receiveOrder

void receiveOrder()
{
    var buffer = new byte[Constants.ORDER_SIZE];
    while (!receiveSource.IsCancellationRequested)
    {
        tradeClient.Receive(buffer);
        recCount++;
        orderQueue.Enqueue(buffer.ToArray());
    }
}

and dequeues byte[] in another function processOrder:

void processOrder()
{
    while (!receiveSource.IsCancellationRequested)
    {
        if (orderQueue.Count > 0)
        {
            byte[] buffer;
            while (!orderQueue.TryDequeue(out buffer)) { }
            Orders.Add(Encoding.UTF8.GetString(buffer));
            count++;
        }
    }
}

I start both of these functions with these:

receiveSource = new CancellationTokenSource();
Task.Run(receiveOrder, receiveSource.Token);
Task.Run(processOrder, receiveSource.Token);

I've used 2 PCs in my experiment one with IP 1**.**.***.101 and the other 1**.**.***.105. Server and a Client were on 1**.**.***.101 and on 1**.**.***.105 I'd only the client. Both Client and Server are .Net5 application and here's what it does and here's the project.

I've no problem with Client on 1**.**.***.105 BUT, it looks like, the Client on 1**.**.***.101 fails either in enqueuing or dequeuing.


EDIT

Do I really need ConcurrentQueue where only one thread is assigned for enqueuing and only one other thread is dequeuing or a normal Queue will do the job perfectly in such scenario?


Notes for reference

Over the past couple of days what I've learnt is: these blocking Receive or continuous processing, as in processOrder, are long task and I shouldn't use Task in these cases because Task with long running option under the hood creates a new Thread. There're two options, excluding sequential Socket.Select, 1) ReceiveAsync with a Callback (eg. SocketAsyncEventArgs) or Thread.

Synchronous (blocking Receive) operation with Thread is the most efficient BUT it takes about a MB of RAM for each Thread. ReceiveAsync uses ThreadPool (Limited Number of Thread and as a result it lets us have more client/connection) for the Callback and keeps polling in the background to see which socket has data and fire the OnCompleted event so it's less efficient.

Windows Presentation Foundation
Windows Presentation Foundation
A part of the .NET Framework that provides a unified programming model for building line-of-business desktop applications on Windows.
2,663 questions
{count} votes

Accepted answer
  1. Will 81 Reputation points Microsoft Employee
    2021-06-15T02:54:27.443+00:00

    UDP protocol could be missing data packet. If you send mass data with UDP, client was able to handle the data and there is no sequencing of data in UDP and no retransmission of lost packets. If you require to guaranteed the data completion , I suggest you can use TCP protocol.

    ConcurrentQueue is thread-safe collection. It provided the multiple threads operation. So I thinks this issue is not related with ConcurrentQueue.

    I checked your project, to improve the data efficient, we could set the the connections number using socket.Listen(100) . Also you can try TCP protocol to check data transfer issue, as the sample below.

     class Program
        {
            static Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            private static byte[] result = new byte[1024];
    
            static void Main(string[] args)
            {
                SocketServie();
            }
    
    
            public static void SocketServie()
            {
                Console.WriteLine("Server start");
                string host = ConfigurationManager.AppSettings["HostIP"];
    
                int port = Convert.ToInt32(ConfigurationManager.AppSettings["Port"]);
                socket.Bind(new IPEndPoint(IPAddress.Parse(host), port));
                socket.Listen(100);
                Thread myThread = new Thread(ListenClientConnect);
                myThread.Start();
                Console.ReadLine();
            }
    
    
            private static void ListenClientConnect()
            {
                while (true)
                {
                    Socket clientSocket = socket.Accept();
                    clientSocket.Send(Encoding.UTF8.GetBytes("from server"));
                    Thread receiveThread = new Thread(ReceiveMessage);
                    receiveThread.Start(clientSocket);
                }
            }
    
    
            private static void ReceiveMessage(object clientSocket)
            {
                Socket myClientSocket = (Socket)clientSocket;
                while (true)
                {
                    try
                    {
    
                        int receiveNumber = myClientSocket.Receive(result);
                        if (receiveNumber == 0)
                            return;
                        Console.WriteLine("received client {0} message:{1}", myClientSocket.RemoteEndPoint.ToString(), Encoding.UTF8.GetString(result, 0, receiveNumber));
    
                        string sendStr = "received cliented send msg ";
                        byte[] bs = Encoding.UTF8.GetBytes(sendStr);
                        myClientSocket.Send(bs, bs.Length, 0);  
    
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                        myClientSocket.Close();
                        myClientSocket.Shutdown(SocketShutdown.Both);
                        break;
                    }
                }
            }
        }
    
    1 person found this answer helpful.
    0 comments No comments

0 additional answers

Sort by: Most helpful