question

EmonHaque-1485 avatar image
0 Votes"
EmonHaque-1485 asked Will-9174 answered

Does ConcurrentQueue fail on Enqueue or TryDequeue

I've been experimenting on UDP for a few days and in one of the UDP projects, Client, I've a ConcurrentQueue where client stores byte[] in a function receiveOrder

 void receiveOrder()
 {
     var buffer = new byte[Constants.ORDER_SIZE];
     while (!receiveSource.IsCancellationRequested)
     {
         tradeClient.Receive(buffer);
         recCount++;
         orderQueue.Enqueue(buffer.ToArray());
     }
 }

and dequeues byte[] in another function processOrder:

 void processOrder()
 {
     while (!receiveSource.IsCancellationRequested)
     {
         if (orderQueue.Count > 0)
         {
             byte[] buffer;
             while (!orderQueue.TryDequeue(out buffer)) { }
             Orders.Add(Encoding.UTF8.GetString(buffer));
             count++;
         }
     }
 }

I start both of these functions with these:

 receiveSource = new CancellationTokenSource();
 Task.Run(receiveOrder, receiveSource.Token);
 Task.Run(processOrder, receiveSource.Token);

I've used 2 PCs in my experiment one with IP 1**.**.***.101 and the other 1**.**.***.105. Server and a Client were on 1**.**.***.101 and on 1**.**.***.105 I'd only the client. Both Client and Server are .Net5 application and here's what it does and here's the project.

I've no problem with Client on 1**.**.***.105 BUT, it looks like, the Client on 1**.**.***.101 fails either in enqueuing or dequeuing.


EDIT

Do I really need ConcurrentQueue where only one thread is assigned for enqueuing and only one other thread is dequeuing or a normal Queue will do the job perfectly in such scenario?



Notes for reference

Over the past couple of days what I've learnt is: these blocking Receive or continuous processing, as in processOrder, are long task and I shouldn't use Task in these cases because Task with long running option under the hood creates a new Thread. There're two options, excluding sequential Socket.Select, 1) ReceiveAsync with a Callback (eg. SocketAsyncEventArgs) or Thread.

Synchronous (blocking Receive) operation with Thread is the most efficient BUT it takes about a MB of RAM for each Thread. ReceiveAsync uses ThreadPool (Limited Number of Thread and as a result it lets us have more client/connection) for the Callback and keeps polling in the background to see which socket has data and fire the OnCompleted event so it's less efficient.

windows-wpf
· 6
5 |1600 characters needed characters left characters exceeded

Up to 10 attachments (including images) can be used with a maximum of 3.0 MiB each and 30.0 MiB total.

58494-concurrentqueue.jpg



There is no issue in my internal test. The video in YouTube is unavailable "This video is private"

1 Vote 1 ·
concurrentqueue.jpg (49.5 KiB)

@ESCfomm-4204, the video is public again.

It happens sometime when I start submitting orders simultaneously in both PCs

0 Votes 0 ·

@EmonHaque-1485 I can't see the video for what you want, could you show it again for me to get more details? And the IP belongs to private info, please update it in the form of 1**.**.***.105.

0 Votes 0 ·

I thought, someone has seen already and made it private this morning. Now it's public again.

0 Votes 0 ·

@EmonHaque-1485 I tested your project, but I can't show the two clients, could you tell me the way to show it?

0 Votes 0 ·
Show more comments

1 Answer

Will-9174 avatar image
1 Vote"
Will-9174 answered

UDP protocol could be missing data packet. If you send mass data with UDP, client was able to handle the data and there is no sequencing of data in UDP and no retransmission of lost packets. If you require to guaranteed the data completion , I suggest you can use TCP protocol.

ConcurrentQueue is thread-safe collection. It provided the multiple threads operation. So I thinks this issue is not related with ConcurrentQueue.

I checked your project, to improve the data efficient, we could set the the connections number using socket.Listen(100) . Also you can try TCP protocol to check data transfer issue, as the sample below.

  class Program
     {
         static Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
         private static byte[] result = new byte[1024];
    
         static void Main(string[] args)
         {
             SocketServie();
         }
    
    
         public static void SocketServie()
         {
             Console.WriteLine("Server start");
             string host = ConfigurationManager.AppSettings["HostIP"];
    
             int port = Convert.ToInt32(ConfigurationManager.AppSettings["Port"]);
             socket.Bind(new IPEndPoint(IPAddress.Parse(host), port));
             socket.Listen(100);
             Thread myThread = new Thread(ListenClientConnect);
             myThread.Start();
             Console.ReadLine();
         }
    
           
         private static void ListenClientConnect()
         {
             while (true)
             {
                 Socket clientSocket = socket.Accept();
                 clientSocket.Send(Encoding.UTF8.GetBytes("from server"));
                 Thread receiveThread = new Thread(ReceiveMessage);
                 receiveThread.Start(clientSocket);
             }
         }
    
           
         private static void ReceiveMessage(object clientSocket)
         {
             Socket myClientSocket = (Socket)clientSocket;
             while (true)
             {
                 try
                 {
                      
                     int receiveNumber = myClientSocket.Receive(result);
                     if (receiveNumber == 0)
                         return;
                     Console.WriteLine("received client {0} message:{1}", myClientSocket.RemoteEndPoint.ToString(), Encoding.UTF8.GetString(result, 0, receiveNumber));
                      
                     string sendStr = "received cliented send msg ";
                     byte[] bs = Encoding.UTF8.GetBytes(sendStr);
                     myClientSocket.Send(bs, bs.Length, 0);  
                      
                 }
                 catch (Exception ex)
                 {
                     Console.WriteLine(ex.Message);
                     myClientSocket.Close();
                     myClientSocket.Shutdown(SocketShutdown.Both);
                     break;
                 }
             }
         }
     }



5 |1600 characters needed characters left characters exceeded

Up to 10 attachments (including images) can be used with a maximum of 3.0 MiB each and 30.0 MiB total.