I have to make three producers and one consumer thread. Producers threads is reading chars from file and using buffer of one char. I made Store like above, now i have to synchronize threads to consumer write whole word and give back control to other producer - patern like this
Producer1->Word1
Producer2->word1
Producer3->word1
Producer1->Word2
Producer2->word2
Producer3->word2
Producer1->Word3
Producer2->word3
Producer3->word3
Store (edited):
public class Store {
public static final char CONSUMER_FREE = '\0';
private volatile char consumer = CONSUMER_FREE;
private final ThreadPoolExecutor executor;
public Store(ThreadPoolExecutor executor) {
this.executor = executor;
}
public synchronized void produce(char c) {
while (isConsumerBussy()) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
consumer = c;
notify(); // single Consumer
}
public synchronized char consume() throws StoreProducersRip {
while (isConsumerFree()) {
try {
wait(250);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
return consumer;
} finally {
freeConsumer();
notifyAll();
}
}
private void freeConsumer() {
consumer = CONSUMER_FREE;
}
private boolean isConsumerBussy() {
return consumer != CONSUMER_FREE;
}
private boolean isConsumerFree() throws StoreProducersRip {
if (executor.getActiveCount() == 0) throw new StoreProducersRip();
return consumer == CONSUMER_FREE;
}
}
CodePudding user response:
You can use locks for this purpose. Below code demonstrates 3 producer synchronous and 1 consumer async to producers but it'd also be sync to multiple consumers here. Also you should add some logic to consume same amount that produced. If you want to get benefits of multithreading, you should buffer your producer input and create some pipeline before consuming.
public class Executor {
public static void main(String[] args) {
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
Store store = new Store(executor);
Runnable word1 = new Runnable() {
@Override
public void run() {
store.producerLock.lock();
store.produce('1');
store.produce('2');
store.produce('3');
store.producerLock.unlock();
}
};
Runnable consume = new Runnable() {
@Override
public void run() {
store.consumerLock.lock();
System.out.print(store.consume());
System.out.print(store.consume());
System.out.print(store.consume());
System.out.println();
store.consumerLock.unlock();
}
};
Future p1 = executor.submit(word1);
Future p2 = executor.submit(word1);
Future p3 = executor.submit(word1);
executor.submit(consume);
executor.submit(consume);
executor.submit(consume);
executor.shutdown();
}
}