handleSub and handleUnsub contain a race condition: nothing synchronizes the check of the futures map with the later insert of the scheduled future into that map. Two concurrent subscriptions for the same slave can therefore both pass the containsKey check before either one has stored its future, and each then schedules its own poll. On certain (slower) hardware this results in multiple scheduled polling threads being created for the same slave. The issue is resolved by synchronizing the maintenance of the futures map on the slave, so that the check, the scheduling, and the insert happen as one atomic step per slave. The fix is pasted below.
private void handleSub(final SlaveNode slave, final Node event) {
    synchronized (slave)
    {
        slave.addToSub(event);
        LOGGER.info(String.format("Added subscription for slave %s, register %s", slave.node.getName(), event.getName()));
        if (futures.containsKey(slave))
        {
            return;
        }
        ScheduledThreadPoolExecutor stpe = slave.getDaemonThreadPool();
        ScheduledFuture<?> future = stpe.scheduleWithFixedDelay(new Runnable()
        {
            @Override
            public void run()
            {
                slave.readPoints();
            }
        }, 0, slave.intervalInMs, TimeUnit.MILLISECONDS);
        futures.put(slave, future);
        LOGGER.info(String.format("Scheduled poll for slave %s at %dms", slave.node.getName(), slave.intervalInMs));
    }
}

private void handleUnsub(SlaveNode slave, Node event) {
    synchronized (slave)
    {
        slave.removeFromSub(event);
        LOGGER.info(String.format("Removed subscription for slave %s, register %s", slave.node.getName(), event.getName()));
        if (slave.noneSubscribed())
        {
            ScheduledFuture<?> future = futures.remove(slave);
            if (future != null)
            {
                future.cancel(false);
                LOGGER.info(String.format("Cancelled poll for slave %s", slave.node.getName()));
            }
        }
    }
}
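
For anyone who wants to convince themselves of the effect, here is a minimal standalone sketch of the same check-then-insert pattern guarded by synchronized. It is not the project's code: PollRegistryDemo, subscribe, runDemo and the pollsScheduled counter are hypothetical names made up for the illustration, a plain Object stands in for the slave, and the futures map is assumed to be a ConcurrentHashMap (the original report does not say what map type is used; a thread-safe map matters because different slaves lock on different monitors).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PollRegistryDemo {
    // Hypothetical stand-in for the futures map; assumed thread-safe.
    private final Map<Object, ScheduledFuture<?>> futures = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    // Counts how many polls were actually scheduled.
    private final AtomicInteger pollsScheduled = new AtomicInteger();

    void subscribe(Object slave) {
        // Check, schedule and insert form one critical section per slave,
        // so a second caller cannot slip in between the check and the put.
        synchronized (slave) {
            if (futures.containsKey(slave)) {
                return;
            }
            ScheduledFuture<?> future = scheduler.scheduleWithFixedDelay(
                    () -> { /* placeholder for the real poll */ },
                    0, 50, TimeUnit.MILLISECONDS);
            futures.put(slave, future);
            pollsScheduled.incrementAndGet();
        }
    }

    // Fires `callers` concurrent subscriptions at one slave and returns
    // the number of polls that ended up being scheduled.
    static int runDemo(int callers) throws InterruptedException {
        PollRegistryDemo registry = new PollRegistryDemo();
        Object slave = new Object();
        ExecutorService pool = Executors.newFixedThreadPool(callers);
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(callers);
        for (int i = 0; i < callers; i++) {
            pool.execute(() -> {
                try {
                    start.await(); // release all callers at once
                    registry.subscribe(slave);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        start.countDown();
        done.await();
        pool.shutdown();
        registry.scheduler.shutdownNow();
        return registry.pollsScheduled.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("polls scheduled: " + runDemo(8)); // prints "polls scheduled: 1"
    }
}
```

With the synchronized block in place, only one of the eight callers gets past the containsKey check. Removing the block reopens the window described above, although whether duplicates actually show up on a given run depends on timing and hardware.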