I want to query some data over SNMP from multiple threads within a time limit; if a task runs over time, it is cancelled. I wrote the code below. Is there something wrong with it? I mean that a thread may only do steps 1 and 2 but never step 4 (close the SNMP connection). How can I fix this? Is there a way to still close the SNMP client at step 4 when the task is cancelled?
public static void main(String[] args) throws InterruptedException {
    var pool = new ThreadPoolExecutor(
            100, 100, 0L, TimeUnit.NANOSECONDS, new LinkedBlockingQueue<>(100));
    System.out.println("Executing first batch of tasks...");
    submitTasks(pool);
    System.out.println("Finish first batch of tasks...");
    // call submitTasks(pool) many times
    ...
}
private static void submitTasks(ExecutorService executor) throws InterruptedException {
    var tasks = new ArrayList<Callable<String>>(100);
    for (int i = 0; i < 100; i++) {
        tasks.add(() -> {
            try {
                // 1. create snmp client
                // 2. query data with udp link
                // 3. return result
                return result;
            } catch (Exception ex) {
                log.error(String.valueOf(ex));
                return null;
            } finally {
                // 4. close snmp
                if (snmp != null) {
                    snmp.close();
                }
            }
        });
    }
    List<Future<String>> futureList = executor.invokeAll(tasks, 1, TimeUnit.SECONDS);
    List<String> list = new ArrayList<>();
    for (int i = 0; i < futureList.size(); i++) {
        Future<String> future = futureList.get(i);
        try {
            list.add(future.get());
        } catch (CancellationException e) {
            log.info("timeOut Task:{}", i);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
}
I adapted the code from Alexander Pavlov's example. It seems that if a task is cancelled, the finally block never runs.
public static void main(String[] args) throws InterruptedException {
    var pool = new ThreadPoolExecutor(
            3, 3, 0L, TimeUnit.NANOSECONDS, new LinkedBlockingQueue<>(1));
    try {
        System.out.println("Executing first batch of tasks...");
        submitTasks(pool);
        System.out.println("Executing second batch of tasks...");
    } finally {
        pool.shutdown();
    }
}
private static void submitTasks(ExecutorService executor) throws InterruptedException {
    var tasks = new ArrayList<Callable<String>>(3);
    final var latch = new CountDownLatch(3);
    log.info(String.valueOf(latch.getCount()));
    for (int i = 0; i < 3; i++) {
        tasks.add(() -> {
            try {
                String s = "1";
                log.info("this.name:" + Thread.currentThread().getName());
                for (int a = 0; a < 1000000L; a++) {
                    s = a + "";
                }
                return s;
            } catch (Exception ex) {
                log.error(String.valueOf(ex));
            } finally {
                latch.countDown();
            }
            return null;
        });
    }
    List<Future<String>> futureList = executor.invokeAll(tasks, 1, TimeUnit.NANOSECONDS);
    List<String> list = new ArrayList<>();
    // latch.await();
    for (int i = 0; i < futureList.size(); i++) {
        Future<String> future = futureList.get(i);
        try {
            list.add(future.get());
        } catch (CancellationException e) {
            log.info("timeOut Task:{}", i);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
    log.info(String.valueOf(latch.getCount()));
    log.info("start to await:");
    try {
        latch.await(); // WAIT UNTIL ALL TASKS ARE REALLY DONE
    } catch (InterruptedException ex) {
        log.error(String.valueOf(ex));
    }
    // "end to await:" is never logged
    log.info("end to await:");
    log.info(String.valueOf(latch.getCount()));
}
CodePudding user response:
invokeAll with a timeout just calls Thread.interrupt() for running tasks. It does not wait until a task stops; it only notifies the task that interruption is requested. Moreover, some tasks may ignore the interruption and continue working. So returning from submitTasks does not mean that all underlying tasks have really stopped.
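To make the behaviour described above concrete, here is a minimal sketch (not the asker's SNMP code). It assumes a single-thread pool so that one task is running and one is still queued when the timeout fires; the class name CancelDemo and the sleep that stands in for the blocking query are made up for illustration. The running task is interrupted and its finally block does run, while the queued task is cancelled before it starts and its body is never executed at all.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelDemo {
    /** Returns { allFuturesCancelled, runningTaskCleanedUp, queuedTaskStarted }. */
    static boolean[] run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch firstFinished = new CountDownLatch(1);
        AtomicBoolean cleanedUp = new AtomicBoolean(false);
        AtomicBoolean secondStarted = new AtomicBoolean(false);

        // Hypothetical stand-in for a blocking query: sleep() reacts to interruption.
        Callable<String> slow = () -> {
            try {
                Thread.sleep(5_000);
                return "done";
            } finally {
                cleanedUp.set(true);       // the "close snmp" step would go here
                firstFinished.countDown();
            }
        };
        // This task sits in the queue behind the slow one.
        Callable<String> queued = () -> {
            secondStarted.set(true);
            return "never";
        };

        try {
            List<Future<String>> futures =
                    pool.invokeAll(List.of(slow, queued), 200, TimeUnit.MILLISECONDS);
            boolean allCancelled = futures.stream().allMatch(Future::isCancelled);
            // invokeAll may return before the interrupted task has run its finally block.
            firstFinished.await(2, TimeUnit.SECONDS);
            return new boolean[] { allCancelled, cleanedUp.get(), secondStarted.get() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("all cancelled: " + r[0]);
        System.out.println("running task ran finally: " + r[1]);
        System.out.println("queued task started: " + r[2]);
    }
}
```

For the SNMP case this suggests that a task which was cancelled before it started has no client to close, and a task that was already running still reaches step 4 as long as the blocking call returns or throws when the thread is interrupted.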
If you want to be 100% sure that all tasks have stopped when you exit from submitTasks, use a CountDownLatch to track how many tasks have really finished, and exit only when the count of running tasks reaches zero.
private static void submitTasks(ExecutorService executor) throws InterruptedException {
    var c = 100;
    final var latch = new CountDownLatch(c); // EXPECT 100 TASKS TO BE COMPLETED
    var tasks = new ArrayList<Callable<String>>(c);
    for (int i = 0; i < c; i++) {
        tasks.add(() -> {
            try {
                // 1. create snmp client
                // 2. query data with udp link
                // 3. return result
                return result;
            } catch (Exception ex) {
                log.error(String.valueOf(ex));
                return null;
            } finally {
                // 4. close snmp
                if (snmp != null) {
                    snmp.close();
                }
                latch.countDown(); // 1 MORE TASK IS COMPLETED
            }
        });
    }
    List<Future<String>> futureList = executor.invokeAll(tasks, 1, TimeUnit.SECONDS);
    List<String> list = new ArrayList<>();
    for (int i = 0; i < futureList.size(); i++) {
        Future<String> future = futureList.get(i);
        try {
            list.add(future.get());
        } catch (CancellationException e) {
            log.info("timeOut Task:{}", i);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
    try {
        latch.await(); // WAIT UNTIL ALL TASKS ARE REALLY DONE
    } catch (InterruptedException ex) {
        log.error(String.valueOf(ex));
    }
}
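One caveat with the latch version above: a task that invokeAll cancels before it ever starts never runs its body, so its finally block never calls countDown() and latch.await() can block forever. That matches what the asker saw with the 1 nanosecond timeout. A possible variation, sketched below under assumptions, waits only for the tasks that actually started. The names TrackedBatch, Tracked, runBatch and awaitCleanup are hypothetical helpers, not part of any library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class TrackedBatch {
    /** Wraps a task so the caller can tell whether it started and wait for it to finish. */
    static final class Tracked<T> implements Callable<T> {
        private final Callable<T> body;
        private final AtomicBoolean claimed = new AtomicBoolean(false);
        private final CountDownLatch finished = new CountDownLatch(1);

        Tracked(Callable<T> body) { this.body = body; }

        @Override
        public T call() throws Exception {
            if (!claimed.compareAndSet(false, true)) {
                return null; // the caller already gave up on this task; do no work
            }
            try {
                return body.call(); // body is expected to clean up in its own finally
            } finally {
                finished.countDown();
            }
        }

        void awaitCleanup() throws InterruptedException {
            if (claimed.compareAndSet(false, true)) {
                return; // never started, and the claim ensures it never will
            }
            finished.await(); // started: wait until its body (including finally) is done
        }
    }

    static <T> List<T> runBatch(ExecutorService executor, List<Callable<T>> bodies,
                                long timeout, TimeUnit unit) throws InterruptedException {
        List<Tracked<T>> tasks = new ArrayList<>();
        for (Callable<T> b : bodies) {
            tasks.add(new Tracked<>(b));
        }
        List<Future<T>> futures = executor.invokeAll(tasks, timeout, unit);
        List<T> results = new ArrayList<>();
        for (Future<T> f : futures) {
            try {
                results.add(f.get());
            } catch (CancellationException e) {
                // timed out: no result for this task
            } catch (ExecutionException e) {
                System.err.println("task failed: " + e.getCause());
            }
        }
        for (Tracked<T> t : tasks) {
            t.awaitCleanup(); // blocks only for tasks that really started
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Callable<String>> bodies = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            bodies.add(() -> {
                try {
                    Thread.sleep(2_000); // stands in for the SNMP query
                    return "ok";
                } finally {
                    System.out.println("cleanup on " + Thread.currentThread().getName());
                }
            });
        }
        System.out.println("results: " + runBatch(pool, bodies, 200, TimeUnit.MILLISECONDS));
        pool.shutdown();
    }
}
```

As with the latch, this still blocks for as long as a started task ignores the interrupt, so the query in step 2 needs to give up when the thread is interrupted (or use its own timeout) for runBatch to return promptly.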