The complete code for this question is available here: https://github.com/NACHC-CAD/thread-tool
The code shown below seems to run to completion but never escapes the while loop shown here.
while (this.active.size() > 0) {
// System.out.println("here");
}
If I uncomment the // System.out.println("here");
it does run to completion. It also runs to completion if I add a sleep-for-one-second.
The larger problem is that if I try to use this code in a real application the code runs for awhile and then seems to deadlock (i.e. the code just stops running).
What do I need to do to fix this?
package org.nachc.tools.threadtool;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.nachc.tools.threadtool.runnableiter.ThreadToolUser;
import org.nachc.tools.threadtool.worker.ThreadToolWorker;
import org.nachc.tools.threadtool.worker.ThreadToolWorkerRunnable;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ThreadRunner {
private int numberOfThreadsPerWorker;
private int numberOfRunnablesPerWorker;
private int numberOfWorkers;
private ThreadToolUser runnableIter;
private List<ThreadToolWorker> active = new ArrayList<ThreadToolWorker>();
private Object lock = new Object();
private ThreadPoolExecutor executor;
public ThreadRunner(int numberOfThreadsPerWorker, int numberOfRunnablesPerWorker, int numberOfWorkers, ThreadToolUser runnableIter) {
this.numberOfThreadsPerWorker = numberOfThreadsPerWorker;
this.numberOfRunnablesPerWorker = numberOfRunnablesPerWorker;
this.numberOfWorkers = numberOfWorkers;
this.runnableIter = runnableIter;
this.executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numberOfWorkers);
}
public void exec() {
synchronized (lock) {
addWorkers();
}
while (this.active.size() > 0) {
// System.out.println("here");
}
log.info("SHUTTING DOWN----------------");
executor.shutdown();
try {
executor.awaitTermination(1000, TimeUnit.HOURS);
} catch (Exception exp) {
throw (new RuntimeException(exp));
}
}
private void addWorkers() {
log.info("start addWorkers");
while (runnableIter.hasNext() && active.size() < numberOfWorkers) {
ThreadToolWorker worker = getNextWorker();
if (worker == null) {
break;
} else {
this.active.add(worker);
}
}
log.info("done addWorkers");
}
private ThreadToolWorker getNextWorker() {
synchronized (lock) {
log.info("start next worker");
if (runnableIter.hasNext() == false) {
return null;
}
List<Runnable> runnableList = new ArrayList<Runnable>();
while (runnableList.size() < numberOfRunnablesPerWorker && runnableIter.hasNext()) {
runnableList.add(runnableIter.getNext());
}
ThreadToolWorker worker = new ThreadToolWorker(runnableList, numberOfThreadsPerWorker, this);
ThreadToolWorkerRunnable runnable = new ThreadToolWorkerRunnable(worker);
this.executor.execute(runnable);
log.info("done next worker");
return worker;
}
}
public void done(ThreadToolWorker worker) {
synchronized (lock) {
log.info("start done");
this.active.remove(worker);
if (active.size() > 0) {
addWorkers();
}
log.info("done done");
}
}
public void logActive() {
synchronized (lock) {
log.info("------------");
log.info("active: " active.size());
log.info("waiting: " runnableIter.waiting());
log.info("------------");
}
}
}
CodePudding user response:
Your loop accesses this.active.size()
without any synchronization. Java's memory visibility rules do not guarantee when (if ever) one thread that looks at shared variables will see changes made by other threads if the thread that's looking does not use some form of synchronization with the other threads.
If you want to poll the state of the active
list, then consider doing something like this:
while (true) {
synchronized(lock) {
if (this.active.size() <= 0) break;
}
try {
Thread.sleep(1000);
}
catch (InterruptedException ex) {
ex.printStackTrace();
}
}
The sleep()
call gives other threads a chance to run. Without the sleep, the thread that's running this loop will use 100% of a CPU, which may be detrimental to the performance of your application.
But there are better options than polling. You could do this to wait until the active
list becomes empty.
synchronized(lock) {
while (this.active.size() > 0) {
lock.wait();
}
}
The wait()
call temporarily releases the lock
, then it waits for another thread to notify()
the lock, and finally, it re-locks the lock before it returns:
public void done(ThreadToolWorker worker) {
synchronized (lock) {
log.info("start done");
this.active.remove(worker);
if (active.size() > 0) {
addWorkers();
}
else {
lock.notifyAll();
}
log.info("done done");
}
}