Home > other > Question about the load-balancing rule for ZMQ ROUTER-to-N-workers
Question about the load-balancing rule for ZMQ ROUTER-to-N-workers

Time:01-26

- This is about the rule by which a ROUTER balances load across N workers in the advanced request-reply patterns.
- The rtreq.c example on page 96 says that the broker distributes tasks to the workers according to a least-recently-used rule. I first changed every worker's work time to a fixed 400 milliseconds, and printed the worker IDs in the order in which they fetched tasks. Below is the first part of the output. In it, the first ten tasks are taken by the ten workers in random order. From the eleventh task on, according to the rule, the task should go to worker 0, but it actually goes to worker 6. On what basis is the task assigned?
0 8 7 3 4 5 6 9 2 1 2 0 5 3 4 7 8 9 6 1 0 7 8 2 3 4 5 9 0 8 4 5 6 7 1 2 3 9 8 1 0 3 4 6 7 2
- My guess: once a worker has finished its work, it sends a ready signal to the broker; at that point the worker joins the queue and waits for the broker to assign it a task. However, depending on the state of the network, worker 0's ready signal is not necessarily the first one to reach the queue.
 
Package NLB;

//the ROUTER to N worker


import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

Public class rtreq {
Private static final ints NBR_WORKERS=10;
Private static final long start=System. CurrentTimeMillis ();
Private static int ID=0;

Public static void main (String [] args) throws the Exception {

The Context Context=ZMQ. The Context (1);
The Socket broker=context. The Socket (ZMQ. The ROUTER).

Broker. Bind (" TCP://* : 5671 ");

For (int worker_nbr=0; Worker_nbr & lt; NBR_WORKERS; + + worker_nbr) {
New Thread (() - & gt; Worker_task ()). The start ();
}

Long end_time=clock () + 5000;
Int workers_fired=0;
While (true) {
String identity=new String (broker. Recv (0));
System. The out. Print (identity + "");
Broker. SendMore (identity);
The broker. The recv ();
The broker. The recv ();
Broker. SendMore (" ");

If (clock () & lt; End_time) {
Broker. Send (" Work harder ");
} else {
Broker. Send (" Fired!" );
If (+ + workers_fired==NBR_WORKERS) {
break;
}
}
}

The broker. The close ();
The context. The close ();
}

Private static void worker_task () {
The Context Context=ZMQ. The Context (1);
The Socket worker=context. The Socket (ZMQ. The REQ);
String id=(ID++) + "";
Worker. SetIdentity (id) getBytes ());
The worker. The connect (" TCP://localhost: 5671 ");

Int total=0;
While (true) {
//prepare worke
Boss worker. Send (" Hi ");

//get workload util recv Fired
Byte [] workload=worker. Recv (0);
If (new String (workload.) equals (" Fired!" )) {
System. The out. Println (id + "Completed:" + total + "tasks");
break;
}
+ + total;

//do something
Try {
//Thread. Sleep (new Random () nextInt (500) + 1);
Thread.sleep (300);
} the catch (Exception e) {
System. The out. Println (" error ");
}
}
The worker. The close ();
The context. The close ();
}

Private static long clock () {
Return System. CurrentTimeMillis () - start;
}
}

  • Related