COM-464 Deep Sleep with Rouser #3
Conversation
Busy wait behaviour can now be configured in the task options. Constructors now use rvalues.
| */
| * @brief push Push data to queue.
| * @param data Data to be pushed.
| * @param is_strong false if we wish to allow spurious failures
| */
| * @brief pop Pop data from queue.
| * @param data Place to store popped data.
| * @param is_strong false if we wish to allow spurious failures
| };
| inline Rouser::Rouser(std::chrono::microseconds rouse_period)
| : m_running_flag(true)

Reviewer: Should this be true here? Or false, and set to true in start()?

Author: Fixed. Rouser and Workers are now robust to repeated start/stop cycles.
| template <typename Task, template<typename> class Queue>
| inline void Rouser::start(std::vector<std::unique_ptr<Worker<Task, Queue>>>* workers, SlottedBag<Queue>* idle_workers, std::atomic<size_t>* num_busy_waiters)
| {
| m_thread = std::thread(&Rouser::threadFunc<Task, Queue>, this, workers, idle_workers, num_busy_waiters);

Reviewer: On repeated start/stop/start/stop calls, the m_running_flag state will never be reset back to true. If we only allow one transition from true to false, then consider throwing here if the value of m_running_flag is not true. Otherwise, throw if it is not false and set it to true before starting the thread.

Author: 👍 Will add the throw. I think it's safe to assume we will not need to be able to restart thread pools. The ability to restart also begs the question of what to do with the tasks left in the worker queues; currently they're all just dropped.
| /**
| * @brief tryEmptyAny Try to empty any slot in the bag.
| * @return a pair containing true upon success along with the id of the emptied slot, false otherwise with an id of UINT_MAX.

Reviewer: UINT_MAX? Or ((size_t) -1)? Is there a constant for that? std::numeric_limits<size_t>::max()?
| */
| * @brief push Push data to queue.
| * @param data Data to be pushed.
| * @param is_strong false if we wish to allow spurious failures

Reviewer: Comment is outdated; remove the is_strong parameter definition.

| */
| * @brief pop Pop data from queue.
| * @param data Place to store popped data.
| * @param is_strong false if we wish to allow spurious failures
| }
| }
| template <typename T>

Reviewer: Why did you remove the move constructor and move assignment operator? If you got confused by my comment in the previous review, it was about the copy constructor/assignment only.
| {
| Cell* cell;
| size_t pos = m_enqueue_pos.load(std::memory_order_relaxed);
| size_t pos = m_enqueue_pos.load(std::memory_order_acquire);
| {
| Cell* cell;
| size_t pos = m_dequeue_pos.load(std::memory_order_relaxed);
| size_t pos = m_dequeue_pos.load(std::memory_order_acquire);
| /**
| * @brief Move ctor implementation.
| */
| ThreadPoolImpl(ThreadPoolImpl&& rhs) noexcept = delete;

Reviewer: Move semantics are useful with this type of object. It would be better if you restored the move constructor and assignment operator.
| {
| template <typename Task, template<typename> class Queue>
| class ThreadPoolImpl;

Reviewer: Just a stylistic comment. It's common practice to use the Impl suffix when you implement the "pimpl" idiom. For template implementations of classes you can use GenericThreadPool, BasicThreadPool, ThreadPoolTemplate, etc.

Author: This was the original naming scheme in the upstream. Will alter the naming.

Author: Updated to use ThreadPool and GenericThreadPool.
| m_next_worker = rhs.m_next_worker.load();
| }
| return *this;
| m_rouser.stop();

Reviewer: There might be an ordering issue here that needs to be double-checked. It seems that stops should happen in the inverse order of the starts.
| return getWorker().tryPost(std::forward<Handler>(handler));
| // If there aren't busy waiters, let's see if we have any idling threads.
| // These incur higher overhead to wake up than the busy waiters.
| if (m_num_busy_waiters.load(std::memory_order_acquire) == 0)

Reviewer: Checks of this type give you no guarantee that the condition still holds when execution reaches the next line. This is the same kind of issue you had in the queue class in the previous version. If you want decision points like this, you need to treat them only as a way to decrease the statistical likelihood of hitting a slow path. Model what happens in two scenarios and make sure the logic is correct in both:
- m_num_busy_waiters loads 0, but another thread modifies it after the code enters the body of the if statement.
- m_num_busy_waiters loads a value other than 0, execution jumps over the if statement, and another thread then changes it to 0.
If in both cases the behavior is what you expect, then you are good.

Author: This part of the posting algorithm should be OK given the two points you mentioned. The busy-waiter count check does not need to be strict; it just lowers the likelihood of two situations:
- Posting the task to the queue of an active worker when non-active workers are available in the pool.
- Hitting the slow-ish path of checking for idle threads (which requires a queue pop operation as opposed to just an atomic read).
| // We have to ensure that at least one thread is active after our submission.
| // Threads could have transitioned into idling under our feet. We need to account for this.
| if (m_num_busy_waiters.load(std::memory_order_acquire) == 0)

Reviewer: The same issue is present here. Please carefully model the cases where another thread changes the state of the variable.

Author: Same as above; this is a soft check that lowers the likelihood of task dropping and improves the response time of posted tasks. The Rouser thread enforces the strict constraint on dropped tasks.

Author: I realize the comments above this check are misleading; will update.
| */
| * @brief MPMCBoundedQueue destructor.
| */
| virtual ~MPMCBoundedQueue() = default;

Reviewer: Is this class ever derived from? Are there any other virtual methods? It may be worth marking the class as final and removing the virtual dtor.
| /**
| * @brief SlottedBag destructor.
| */
| virtual ~SlottedBag() = default;

Reviewer: Make the class final and remove the virtual methods?
| if (result.first)
| {
| auto success = m_workers[result.second]->tryPost(std::forward<Handler>(handler));
| m_workers[result.second]->wake();

Reviewer: If success is false, is it even worth waking the worker?

Author: tryEmptyAny removes the worker from the idle worker bag, so once we fail to post we need to ensure that the worker will be re-added to the bag. The wake function spins the worker back up so that it does this. Realistically this condition has very low probability: the only time posting fails is when the queue is full, and if the worker is idle, its queue will end up empty (barring certain races).

Author: I think we can optimize this a little, though. Re-adding the worker to the bag could safely occur here as opposed to on the worker thread (via wake()), since the caller retains ownership of the worker's idling synchronization at this point in the execution.
| // No idle threads. Our threads are either active or busy waiting.
| // Either way, submit the work item in a round robin fashion.
| if (!getWorker().tryPost(std::forward<Handler>(handler)))

Reviewer: Should we retry the next worker if this fails? Should we retry as many times as there are threads in the pool? If so, will we need another getWorker() function variant that guarantees retrieving the next thread in the round robin?

Author: I'd like to include post retrying in another pull request. I have a ticket for it: COM-466. It would (hopefully 😉) be the last feature that we'd need to add to this thread pool for it to be consumable.
| inline void Rouser::stop()
| {
| if (m_running_flag.exchange(false, std::memory_order_acq_rel))

Reviewer: You probably need a symmetrical exception here. In the start function you throw an exception when it is called on an already-started object; the opposite should be done here.
| {
| static thread_local size_t tss_id = std::numeric_limits<size_t>::max();
| return &tss_id;
| static thread_local std::atomic<size_t> tss_id(std::numeric_limits<size_t>::max());

Reviewer: Why does a thread_local need to be atomic?

Author: There is a race between when this gets assigned (at the start of threadFunc) and the first post that occurs (which can query tss_id).

Author: My above comment is wrong; will update :)
| }
| // If post is unsuccessful, we need to re-add the worker to the idle worker bag.
| m_idle_workers.fill(result.second);

Reviewer: Is there a race here? Should we not let the thread add itself back once it is awake?

Author: This should be safe; the previous version had the thread add itself back in. This just short-circuits the condition variable overhead (see #3 (comment)).
| inline void Rouser::stop()
| {
| if (!m_started_flag.load(std::memory_order_acquire))

Reviewer: Actually, I think calling stop() shouldn't throw. In general there is an intractable race in this scenario, so I would prefer to always allow stop() to succeed if already stopped (since there is no cost or allocation of resources).
| * in worker queues. The second is that it increases the likelihood of at least one worker busy-waiting at any
| * point in time, which speeds up task processing response time.
| */
| class Rouser final

Reviewer: You need to add a destructor to this class and stop the thread inside it if it's not stopped yet.
Reviewer: Looks good!
| template <typename Task, template<typename> class Queue>
| void threadFunc(std::vector<std::unique_ptr<Worker<Task, Queue>>>& workers, SlottedBag<Queue>& idle_workers, std::atomic<size_t>& num_busy_waiters);
| std::atomic<bool> m_running_flag;

Reviewer: Just a general recommendation when designing this type of flag-controlled logic: you could alternatively have a single atomic variable representing the state of the object. With one state variable it's easier to control transitions between states with exchange and compare-exchange operations, and easier to reason about the state. For this class you can model the following states:
- not started
- start requested
- running
- stop requested
- stopped
This is not a necessary change now, but if you have time later you can experiment with it.

Author: My reasoning for keeping the states separate was that the m_running_flag check in the threadFunc loop would be faster with a boolean comparison versus an enum. I'm not sure the performance gain is worth the hit to readability, though.

Reviewer: I concur with Hayk. Modeling this with an enum rather than a bunch of boolean flags would make your life much easier; the difference in performance should be so small as to be negligible, AFAIK.
A change set performed in response to the feedback on #2. This pull request mainly improves the robustness of the changes.

Problems addressed:
- std::memory_order_relaxed usage, in response to Hayk's comments.
- SlottedBag::tryEmptyAny.

On the performance side, I don't have exact numbers, but based on preliminary results this updated PR is slightly faster than #2 due to the removal of spin-locking. Overall it is still a major performance improvement over the old pplx scheduler.