COM-464 Deep Sleep with Rouser #3

Merged
SeverTopan merged 23 commits into synaptive/master from synaptive/dev/sever/COM-464_DeepSleepWithRouser
Mar 20, 2018

Conversation

@SeverTopan

A change set performed in response to the feedback on #2. This pull request primarily improves the robustness of that implementation.

Problems addressed:

  • Compiler operation reordering: no more usage of std::memory_order_relaxed in response to Hayk's comments.
  • Function signature change for SlottedBag::tryEmptyAny.
  • There is an inherent problem with having a worker go to sleep: how does it ensure there is no task present in its queue? Any attempt to use a task counter necessarily introduces spin-locking. This is solved by allowing threads to go into the idle state while there are still tasks in their queue, and introducing a 'Rouser' that periodically (currently every 10ms) wakes threads up, ensuring that no tasks are dropped. The Rouser thread essentially upper-bounds the delay between when a task is posted and when it is processed.

On the performance side, I don't have exact numbers, but based on preliminary results this updated PR is slightly faster than #2 due to the removal of spin-locking. Overall it is still a major performance improvement over the old pplx scheduler.

*/
* @brief push Push data to queue.
* @param data Data to be pushed.
* @param is_strong false if we wish to allow spurious failures

is_strong is no longer a param

Author

Fixed!

*/
* @brief pop Pop data from queue.
* @param data Place to store popped data.
* @param is_strong false if we wish to allow spurious failures

no longer a param

Author

Fixed!

Comment thread include/thread_pool/rouser.hpp Outdated
};

inline Rouser::Rouser(std::chrono::microseconds rouse_period)
: m_running_flag(true)

Should this be true here? Or false, and set to true in start()?

Author

Fixed, Rouser and Workers are now robust to repeated start/stop cycles.

Comment thread include/thread_pool/rouser.hpp Outdated
template <typename Task, template<typename> class Queue>
inline void Rouser::start(std::vector<std::unique_ptr<Worker<Task, Queue>>>* workers, SlottedBag<Queue>* idle_workers, std::atomic<size_t>* num_busy_waiters)
{
m_thread = std::thread(&Rouser::threadFunc<Task, Queue>, this, workers, idle_workers, num_busy_waiters);

@mmthomas mmthomas Mar 5, 2018


on repeated calls to start/stop/start/stop, the m_running_flag state will never be reset back to true.

If we only allow one transition from true to false, then consider throwing here if the value of m_running_flag is not true.

Otherwise, throw if it is not false, and set it to true before starting the thread.

Author

👍 will add the throw. I think it's safe to assume we will not need to be able to restart thread pools.

The ability to restart also begs the question of what to do with the tasks left in the worker queues. Currently they're all just dropped.

Author

Updated.

Comment thread include/thread_pool/slotted_bag.hpp Outdated

/**
* @brief tryEmptyAny Try to empty any slot in the bag.
* @return a pair containing true upon success along with the id of the emptied slot, false otherwise with an id of UINT_MAX.

UINT_MAX? or ((size_t) -1)? is there a constant for that? numeric_limits<size_t>::max()?

Author

Fixed!

*/
* @brief push Push data to queue.
* @param data Data to be pushed.
* @param is_strong false if we wish to allow spurious failures

Comment is outdated, remove the is_strong parameter definition

Author

Fixed!

*/
* @brief pop Pop data from queue.
* @param data Place to store popped data.
* @param is_strong false if we wish to allow spurious failures

the same here

Author

Fixed!

}
}

template <typename T>

Why did you remove the move constructor and move assignment operator? If my comment in the previous review was confusing, it applied to the copy constructor/assignment only.

Author

Reverted!

{
Cell* cell;
- size_t pos = m_enqueue_pos.load(std::memory_order_relaxed);
+ size_t pos = m_enqueue_pos.load(std::memory_order_acquire);

Thanks for changing the memory orders

{
Cell* cell;
- size_t pos = m_dequeue_pos.load(std::memory_order_relaxed);
+ size_t pos = m_dequeue_pos.load(std::memory_order_acquire);

The same here, thanks for changing

Comment thread include/thread_pool/thread_pool.hpp Outdated
/**
* @brief Move ctor implementation.
*/
ThreadPoolImpl(ThreadPoolImpl&& rhs) noexcept = delete;

Move semantics are useful with this type of object. It would be better if you restored the move constructor and assignment operator.

Author

Reverted!

Comment thread include/thread_pool/thread_pool.hpp Outdated
{

template <typename Task, template<typename> class Queue>
class ThreadPoolImpl;

Just a stylistic comment. It's a common practice to use the Impl suffix when you implement "pimpl" idiom. For template implementations of classes you can use GenericThreadPool, BasicThreadPool, ThreadPoolTemplate, etc.

Author

This was the original naming scheme in the upstream. Will alter the naming.

Author

Updated to use ThreadPool and GenericThreadPool

m_next_worker = rhs.m_next_worker.load();
}
return *this;
m_rouser.stop();

There might be an issue with ordering here; it needs to be double-checked. It seems that stops should happen in the inverse order of starts.

Author

Good catch, fixed!

return getWorker().tryPost(std::forward<Handler>(handler));
// If there aren't busy waiters, let's see if we have any idling threads.
// These incur higher overhead to wake up than the busy waiters.
if (m_num_busy_waiters.load(std::memory_order_acquire) == 0)

This type of check gives you no guarantee that the condition still holds by the time execution reaches the next line. This is the same type of issue you had in the queue class in the previous version. If you want this type of decision point, you must view it purely as decreasing the statistical likelihood of hitting a slow path.

You need to model what happens in two scenarios and make sure the logic is correct:

  1. m_num_busy_waiters loads 0, but another thread modifies the variable before the code enters the body of the if statement
  2. m_num_busy_waiters loads a value other than 0, execution jumps over the if statement, and then another thread changes it to 0

If in both cases behavior is what you expect then you are good.

Author

This part of the posting algorithm should be ok given the two points you mentioned. The busy waiter count check does not need to be strict. It just lowers the likelihood of two situations:

  1. Posting the task to the queue of an active worker when non-active workers are available in the pool.
  2. Hitting the slow-ish path of checking for idle threads (which requires a queue pop operation as opposed to just an atomic read).


// We have to ensure that at least one thread is active after our submission.
// Threads could have transitioned into idling under our feet. We need to account for this.
if (m_num_busy_waiters.load(std::memory_order_acquire) == 0)

The same issue is present here. Please carefully model cases when other thread changes the state of the variable.

Author

@SeverTopan SeverTopan Mar 12, 2018


Same as above; this is a soft check that lowers the likelihood of task dropping and improves the response time of posted tasks. The Rouser thread enforces the strict constraint on dropped tasks.

Author

I realize the comments above this check are misleading, will update.

*/
* @brief MPMCBoundedQueue destructor.
*/
virtual ~MPMCBoundedQueue() = default;

Is this class ever derived from? Are there any other virtual methods? It may be worth marking the class as final and removing the virtual dtor.

Author

Fixed!

Comment thread include/thread_pool/slotted_bag.hpp Outdated
/**
* @brief SlottedBag destructor.
*/
virtual ~SlottedBag() = default;

make class final and remove virtual methods?

Author

Fixed!

Comment thread include/thread_pool/thread_pool.hpp Outdated
if (result.first)
{
auto success = m_workers[result.second]->tryPost(std::forward<Handler>(handler));
m_workers[result.second]->wake();

if success is false, is it even worth waking the worker?

Author

tryEmptyAny removes the worker from the idle worker bag, so once we fail to post, we need to ensure that the worker will be re-added to the bag. The wake function spins the worker back up so that it does this.

Realistically this condition has very low probability: the only time posting fails is when the queue is full, and if the worker is idle, its queue will end up empty (barring certain races).

Author

I think we can optimize this a little, though: re-adding the worker to the queue could safely occur here rather than on the worker thread (via wake()), since the caller retains ownership of the worker's idling synchronization at this point in the execution.

Author

Updated to reflect #3 (comment).


// No idle threads. Our threads are either active or busy waiting
// Either way, submit the work item in a round robin fashion.
if (!getWorker().tryPost(std::forward<Handler>(handler)))

Should we retry the next worker if this fails? Should we retry as many times as there are threads in the pool? If so, will we need another getWorker() variant that guarantees retrieving the next thread in the round robin?

Author

I'd like to include post retrying in another pull request; I have a ticket for it (COM-466). It would (hopefully 😉) be the last feature we'd need to add to this thread pool for it to be consumable.


inline void Rouser::stop()
{
if (m_running_flag.exchange(false, std::memory_order_acq_rel))

You probably need a symmetrical exception here. In the start function you throw an exception when it is called on an already-started object; the opposite should be done here.

Author

Implemented!

Comment thread include/thread_pool/worker.hpp Outdated
{
static thread_local size_t tss_id = std::numeric_limits<size_t>::max();
return &tss_id;
static thread_local std::atomic<size_t> tss_id(std::numeric_limits<size_t>::max());

why does a thread local need to be atomic?

Author

There is a race between when this gets assigned (at the start of threadFunc) and the first post that occurs (which can query for tss_id).

Author

My above comment is wrong, will update :)

Author

Fixed!

}

// If post is unsuccessful, we need to re-add the worker to the idle worker bag.
m_idle_workers.fill(result.second);

is there a race here? should we not let the thread add itself back once it is awake?

Author

This should be safe, the previous version had the thread add itself back in, this just short circuits the condition variable overhead (see #3 (comment)).

Comment thread include/thread_pool/rouser.hpp Outdated

inline void Rouser::stop()
{
if (!m_started_flag.load(std::memory_order_acquire))

Actually, I think calling stop() shouldn't throw... in general there is an intractable race in this scenario, so would prefer to always allow stop() to succeed if already stopped (since there is no cost or allocation of resources)

Author

Reverted!

* in worker queues. The second is that it increases the likelihood of at least one worker busy-waiting at any
* point in time, which speeds up task processing response time.
*/
class Rouser final

You need to add a destructor to this class and stop the thread inside it if it's not stopped yet.

Author

Implemented!

@PoyangLiu PoyangLiu removed their request for review March 19, 2018 15:35
@mmthomas

Looks good!

@SeverTopan SeverTopan merged commit c48dd19 into synaptive/master Mar 20, 2018
@SeverTopan SeverTopan deleted the synaptive/dev/sever/COM-464_DeepSleepWithRouser branch March 20, 2018 14:54
template <typename Task, template<typename> class Queue>
void threadFunc(std::vector<std::unique_ptr<Worker<Task, Queue>>>& workers, SlottedBag<Queue>& idle_workers, std::atomic<size_t>& num_busy_waiters);

std::atomic<bool> m_running_flag;

Just a general recommendation with designing this type of logic controlled with flags. You could alternatively have a single atomic variable which represents the states of the object. When you have one variable representing the state it's easier to control transitions between the states with exchange and compare exchange operations and easier to reason about the state.

For this class you can model the following states:

  • not started
  • start requested
  • running
  • stop requested
  • stopped

This is not a necessary change now, but if you have time later you can experiment with it.

Author

My reasoning for keeping the states separate was that the m_running_flag check in the threadFunc loop would be faster with boolean comparison versus an enum.

Not sure if the performance gain is worth the hit to readability?

Member

I concur with Hayk. Modeling this with an enum rather than a bunch of boolean flags would make your life much easier. The difference in performance should be so small as to be negligible, AFAIK.

Author

👍 will change.
