My boss asked me to write a program that polls a set of devices using multi-threaded, asynchronous socket access. My application is the TCP client and each electrical box (the equipment) is a TCP server. Please don't laugh; I know it would be much simpler if my program were the server and the boxes were the clients, but it has already been built this way. The problem is this: when all electrical boxes (servers) are running normally everything works, but if one box is shut down, my data acquisition as a whole is affected. I want to catch the error when socket communication fails and then skip that thread.
My setup is also fairly complicated. Program A uses multi-threaded asynchronous sockets to access the electrical boxes, does some preliminary processing on the raw data, and publishes it to RabbitMQ. Program B then takes the corresponding data from RabbitMQ and performs the second stage of analysis.
What I observe now is that after a particular electrical box goes offline, the other threads still keep publishing data to RabbitMQ, but program B stops reading it. I tried removing the faulty box's IP address from program A and running it again, and data acquisition was normal, so what is affecting my data acquisition is program A, not program B. For now I just want program A, when an error occurs (for example socket error 10060, connection timed out), to skip or close that one thread without affecting the rest of the data acquisition. Once that is done, I will look at continuously monitoring the broken box so that its data can be read again once it recovers.
One more thing: the UTEK equipment (the version I have) does not support heartbeat packets.
If an expert can solve my problem, I will be very grateful and will reward you generously. I only need a real, workable solution; if all you want to say is "go talk to a professional developer", please don't reply. My ability is limited and I may not understand every answer. Thank you!
The code is posted below.
Program A starts by iterating over the device list and launching one task per device:
// Program A start-up: launch one task per electrical box. Each task connects a
// MySocket to the box and starts a 100 ms timer whose Elapsed handler polls it.
LimitedConcurrencyLevelTaskScheduler LCTS = new LimitedConcurrencyLevelTaskScheduler(16);
List<Task> tasks = new List<Task>();
// Create a TaskFactory and pass it our custom scheduler.
TaskFactory factory = new TaskFactory(LCTS);
CancellationTokenSource CTS = new CancellationTokenSource();
Object lockObj = new Object();
List<string> ip_list = new List<string>
{
    "10.176.149.11",
    "10.176.149.12",
    "10.176.149.13",
    "10.176.149.14",
    "10.176.149.15",
    "10.176.149.16",
    "10.176.149.17",
    "10.176.149.18",
    "10.176.149.19",
    "10.176.149.20",
    "10.176.149.21",
    "10.176.149.22",
    "10.176.149.23",
    "10.176.149.24",
    "10.176.149.25",
    "10.176.149.26",
};
try
{
    foreach (var item in ip_list)
    {
        // Copy the loop variable so each lambda captures its own address
        // (compilers older than C# 5 share one foreach variable across closures).
        string ip = item;

        // NOTE(review): passing TaskScheduler.Default explicitly means these tasks do
        // not run on LCTS; kept as in the original call - confirm which is intended.
        Task t = factory.StartNew(() =>
        {
            try
            {
                MySocket ms1 = new MySocket();
                ms1.connect(ip, 10001);

                System.Timers.Timer timer = new System.Timers.Timer(100);
                timer.Elapsed += new System.Timers.ElapsedEventHandler(ms1.myTimer_Elapsed);
                timer.Enabled = true;

                // The socket is deliberately NOT closed here: the timer handler that
                // was just enabled sends on it. Closing it at this point forced a
                // reconnect on the very first tick.
            }
            catch (Exception ex)
            {
                // A box that cannot be set up is reported and skipped, so that it
                // does not fault the task and abort start-up for every other box.
                Console.WriteLine("skip " + ip + ": " + ex);
            }
        }, CTS.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        tasks.Add(t);
    }
    Task.WaitAll(tasks.ToArray());
}
catch (Exception ex)
{
    Console.WriteLine(ex);
    throw;
}
finally
{
    // Dispose on the failure path as well as on success.
    CTS.Dispose();
}
Program A, the MySocket class:
// NOTE(review): both events are static, so a single pair is shared by every MySocket
// instance (i.e. by every device). They stay static here because the static socket
// callbacks reference them; connect() below no longer waits on connectDone.
private static readonly ManualResetEvent connectDone = new ManualResetEvent(false);
private static readonly ManualResetEvent sendDone = new ManualResetEvent(false);

/// <summary>
/// Creates a new TCP socket for this instance and tries to connect it to the given
/// endpoint, waiting at most five seconds for the attempt to finish.
/// </summary>
/// <param name="ip">IPv4 address of the device, in dotted notation.</param>
/// <param name="port">TCP port of the device.</param>
/// <remarks>
/// The address and port are remembered in <c>_ip</c> / <c>_port</c>. The method does
/// not throw when the device fails to answer in time: the socket is closed instead
/// and <c>client.Connected</c> stays false, which callers can test.
/// </remarks>
public void connect(string ip, int port)
{
    Console.WriteLine("connect" + ip);
    _ip = ip;
    _port = port;

    this.client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    this.client.SendTimeout = 1000;
    // NoDelay is a TCP-level option; the original set it with SocketOptionLevel.Socket,
    // which does not address the TCP option. The property applies it at the right level.
    this.client.NoDelay = true;

    IPEndPoint serverEndPoint = new IPEndPoint(IPAddress.Parse(ip), port);
    IAsyncResult pending = this.client.BeginConnect(serverEndPoint, new AsyncCallback(ConnectCallback), this.client);

    // Wait on the handle of THIS connect attempt instead of the shared static event,
    // so that another device's connection state cannot release or stall this wait.
    bool finished = pending.AsyncWaitHandle.WaitOne(5 * 1000);
    if (!finished || !this.client.Connected)
    {
        // Timed out or failed: release the socket rather than leaving the attempt
        // pending. NOTE(review): ConnectCallback will then observe a closed socket;
        // presumably it already tolerates failed connects - confirm.
        this.client.Close();
    }
}
// 0 = idle, 1 = a poll is currently running on this instance.
private int _pollInProgress;

/// <summary>
/// Timer callback: sends one 16-byte request frame to the device, reconnecting first
/// when the socket is missing or not connected.
/// </summary>
/// <param name="sender">The timer that raised the event (unused).</param>
/// <param name="e">Event data (unused).</param>
/// <remarks>
/// Exceptions are logged and swallowed so that a failing device does not escape into
/// the timer. A tick that arrives while the previous one is still running on this
/// instance is dropped, and a tick on which the device cannot be reached is skipped.
/// </remarks>
public void myTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
{
    // The handler can block for several seconds while the timer interval may be far
    // shorter; without this guard, ticks would pile up and use the socket concurrently.
    if (Interlocked.CompareExchange(ref _pollInProgress, 1, 0) != 0)
    {
        return;
    }

    try
    {
        byte[] sbuf = BuildRequestFrame();

        if (this.client == null || this.client.Connected == false)
        {
            this.connect(this._ip, this._port);
        }

        if (this.client == null || !this.client.Connected)
        {
            // Device still unreachable: skip this tick; a later tick tries again.
            logger.Info("Socket request skipped, not connected: " + this._ip);
            return;
        }

        IAsyncResult result = this.client.BeginSend(sbuf, 0, sbuf.Length, 0, new AsyncCallback(SendCallback), this.client);

        // Wait for THIS send to finish rather than on the static sendDone event,
        // which is shared with (and reset by) every other device's handler.
        result.AsyncWaitHandle.WaitOne(5 * 1000);
    }
    catch (Exception f)
    {
        logger.Info("Socket request");
        logger.Info(f.Message);
        logger.Info(f.InnerException);
    }
    finally
    {
        Interlocked.Exchange(ref _pollInProgress, 0);
    }
}

// Builds the fixed 16-byte request; the last byte is the low 8 bits of the sum of the first eight bytes.
private static byte[] BuildRequestFrame()
{
    byte[] frame = new byte[16];
    frame[1] = 0x5a;
    frame[2] = 0x41;
    frame[4] = 0x07;
    // All other payload bytes stay 0x00, as in the original element-by-element setup.

    int sum = 0;
    for (int i = 0; i < 8; i++)
    {
        sum += frame[i];
    }
    frame[15] = (byte)sum;
    return frame;
}
Private static void SendCallback IAsyncResult (ar)
{
Try
{
The Socket client1=(Socket) ar. AsyncState;
Client1. SetSocketOption (SocketOptionLevel. Socket, SocketOptionName SendTimeout, 10000);
Int bytesSent=client1. EndSend (ar);
SendDone. Set ();
String recStr="";
Byte [] recBytes=new byte [16].
Int bytes=client1. The Receive (recBytes, recBytes. Length, 0);
for (int i=0; i
RecStr +=the Convert. ToString (recBytes [I], 16) + "";
}
RabbitMQSend (client1. RemoteEndPoint. ToString (). The Split (" : ") [0], recStr);
}
The catch (SocketException ex)
{
nullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnull