I've built this "throttled" task runner that collects some data in a HashMap while a scheduled task (every minute) sends that data "away" and clears the HashMap. In my tests I've noticed that the scheduleAtFixedRate task can stop executing and never clear
the HashMap again. I'm assuming this is because the HashMap modifications I'm doing are not thread-safe and the task is crashing inside run()
without recovery (scheduleAtFixedRate suppresses all further executions once a run throws). I'm modifying the HashMap from both threads. Can somebody point me in the right direction on how to make my modifications of the HashMap safe?
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class StatisticEventsDispatcher {

    public HashMap<String, AbstractStatisticsEvent> mappedCachedEvents = new HashMap<>();

    final Duration timeout = Duration.ofMinutes(1);
    final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);

    public StatisticEventsDispatcher(EventBus eventBus) {
        executor.scheduleAtFixedRate(
                new Runnable() {
                    @Override
                    public void run() {
                        mappedCachedEvents.values().forEach(eventBus::post);
                        mappedCachedEvents.clear();
                        // i think this is not thread safe
                    }
                },
                timeout.toMillis(),
                timeout.toMillis(),
                TimeUnit.MILLISECONDS);
    }

    public void applyChanges(String type, Map<String, Long> changes) {
        AbstractStatisticsEvent event;
        if (mappedCachedEvents.containsKey(type)) {
            event = mappedCachedEvents.get(type);
        } else {
            event = new AbstractStatisticsEvent(type);
            mappedCachedEvents.put(type, event);
            // i think this is not thread safe
        }
        event.apply(changes);
    }
}
CodePudding user response:
Your code is definitely not thread-safe. But from your description, there's little reason for it to be. Do you really need multiple threads in this scheduler?
Just updating and copying a Map is very fast. A single thread will generally be faster than multiple threads competing for access to the same map, even if you have hundreds of thousands of consumers/producers.
So my suggestion would be to keep a single-threaded executor that "owns" the Map (i.e. do all updates/reads from inside the scheduler's thread).
If "sending" the contents of the Map away requires IO, make sure to copy the Map into an immutable one with Map.copyOf(mutableMap), then give this copy to another thread for processing. Meanwhile, the "active" map can continue being updated/read. You can even write a "drain" operation that moves all the map's contents into another one efficiently, to avoid the duplicate Maps existing simultaneously.
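For illustration, here is a minimal sketch of that single-writer design. It reuses the EventBus and AbstractStatisticsEvent types from the question; the separate single-thread "poster" executor and keeping the one-minute period are assumptions on my part, not part of the original code.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class StatisticEventsDispatcher {

    // Only ever touched from the "owner" thread below, so a plain HashMap is fine.
    private final Map<String, AbstractStatisticsEvent> mappedCachedEvents = new HashMap<>();

    // The single thread that owns the map: all reads and writes happen on it.
    private final ScheduledExecutorService owner = Executors.newSingleThreadScheduledExecutor();

    // Separate thread for the (potentially slow) posting IO.
    private final ExecutorService poster = Executors.newSingleThreadExecutor();

    public StatisticEventsDispatcher(EventBus eventBus) {
        owner.scheduleAtFixedRate(() -> {
            if (mappedCachedEvents.isEmpty()) {
                return;
            }
            // Snapshot into an immutable copy (Java 10+) and clear; the copy
            // is handed to another thread, so the active map can keep being
            // updated while posting is in progress.
            Map<String, AbstractStatisticsEvent> snapshot = Map.copyOf(mappedCachedEvents);
            mappedCachedEvents.clear();
            poster.execute(() -> snapshot.values().forEach(eventBus::post));
        }, 1, 1, TimeUnit.MINUTES);
    }

    public void applyChanges(String type, Map<String, Long> changes) {
        // Route the update onto the owning thread instead of mutating the
        // map from the caller's thread.
        owner.execute(() ->
                mappedCachedEvents.computeIfAbsent(type, AbstractStatisticsEvent::new)
                        .apply(changes));
    }
}

Note that applyChanges becomes fire-and-forget here; if callers need back-pressure or a result, they would have to submit a task and wait on the returned Future instead.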
CodePudding user response:
I suggest the following implementation:
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class StatisticEventsDispatcher {

    private ConcurrentMap<String, AbstractStatisticsEvent> mappedCachedEvents = new ConcurrentHashMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    final Duration timeout = Duration.ofMinutes(1);
    final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);

    public StatisticEventsDispatcher(EventBus eventBus) {
        executor.scheduleAtFixedRate(
                new Runnable() {
                    @Override
                    public void run() {
                        Map<String, AbstractStatisticsEvent> tmp;
                        // Swap in a fresh map under the write lock; no
                        // applyChanges call can be in flight meanwhile.
                        lock.writeLock().lock();
                        try {
                            tmp = mappedCachedEvents;
                            mappedCachedEvents = new ConcurrentHashMap<>();
                        } finally {
                            lock.writeLock().unlock();
                        }
                        // Post outside the lock so slow consumers don't block updates.
                        tmp.values().forEach(eventBus::post);
                    }
                },
                timeout.toMillis(),
                timeout.toMillis(),
                TimeUnit.MILLISECONDS);
    }

    public void applyChanges(String type, Map<String, Long> changes) {
        // Hold the read lock for the whole update: many writers can run
        // concurrently (the ConcurrentHashMap handles that), but the
        // scheduled swap cannot post this map while an apply is still
        // in progress.
        lock.readLock().lock();
        try {
            mappedCachedEvents.computeIfAbsent(type, AbstractStatisticsEvent::new)
                    .apply(changes);
        } finally {
            lock.readLock().unlock();
        }
    }
}
Note that I made mappedCachedEvents private, because it must not be accessed without taking the lock.
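Hypothetical usage, assuming an EventBus instance is available and with made-up metric names:

StatisticEventsDispatcher dispatcher = new StatisticEventsDispatcher(eventBus);
// Safe to call from any number of threads.
dispatcher.applyChanges("page_views", Map.of("home", 1L, "search", 3L));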
CodePudding user response:
Yes, if two threads are modifying one HashMap without any concurrency control, it will not behave as you intend.
While there are ConcurrentMap implementations, it appears that you want to "post" a snapshot of the collected events atomically. That is, it appears that you don't want new events to be added or events to be updated while you are iterating over the collection, even if the iteration itself could be done in a thread-safe manner.
It would be relatively simple to synchronize access to the shared map during iteration, but if iteration takes a while, updates from the main thread would be blocked during that time, which seems undesirable.
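For contrast, a sketch of that fully synchronized variant (using the same lock and map fields declared in the snippet below) would be:

executor.scheduleAtFixedRate(() -> {
    // The lock is held for the entire post-and-clear, so applyChanges
    // stalls on the lock whenever posting is slow.
    synchronized (lock) {
        mappedCachedEvents.values().forEach(eventBus::post);
        mappedCachedEvents.clear();
    }
}, timeout.toMillis(), timeout.toMillis(), TimeUnit.MILLISECONDS);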
Alternatively, the scheduled task could lock briefly to exchange the map with a new one:
private final Object lock = new Object();
private HashMap<String, AbstractStatisticsEvent> mappedCachedEvents = new HashMap<>();
...
    executor.scheduleAtFixedRate(() -> {
        Map<String, AbstractStatisticsEvent> toPost;
        // Briefly lock to swap the full map for an empty one.
        synchronized (lock) {
            toPost = mappedCachedEvents;
            mappedCachedEvents = new HashMap<>();
        }
        // Post outside the lock; updates can proceed concurrently.
        toPost.values().forEach(eventBus::post);
    },
    timeout.toMillis(), timeout.toMillis(), TimeUnit.MILLISECONDS);
...
public void applyChanges(String type, Map<String, Long> changes) {
    synchronized (lock) {
        AbstractStatisticsEvent event =
                mappedCachedEvents.computeIfAbsent(type, AbstractStatisticsEvent::new);
        event.apply(changes);
    }
}
}