-
Notifications
You must be signed in to change notification settings - Fork 4
COM-464 Deep Sleep with Rouser #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c505f23
49fc93e
28890fa
90dcd37
56dac93
9fb102a
e81af46
f8b7ac9
7017c70
dc351e6
1e48afd
d50c57e
6666a22
05877d9
99bfe6c
d33cc97
acd28d3
f84cce8
395b7ba
c51a63f
56590c1
208cd4c
3411673
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -52,7 +52,7 @@ namespace tp | |
| * http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue | ||
| */ | ||
| template <typename T> | ||
| class MPMCBoundedQueue | ||
| class MPMCBoundedQueue final | ||
| { | ||
| static_assert( | ||
| std::is_move_constructible<T>::value, "Should be of movable type"); | ||
|
|
@@ -65,6 +65,16 @@ class MPMCBoundedQueue | |
| */ | ||
| explicit MPMCBoundedQueue(size_t size); | ||
|
|
||
| /** | ||
| * @brief Copy ctor implementation. | ||
| */ | ||
| MPMCBoundedQueue(MPMCBoundedQueue const&) = delete; | ||
|
|
||
| /** | ||
| * @brief Copy assignment implementation. | ||
| */ | ||
| MPMCBoundedQueue& operator=(MPMCBoundedQueue const& rhs) = delete; | ||
|
|
||
| /** | ||
| * @brief Move ctor implementation. | ||
| */ | ||
|
|
@@ -76,18 +86,23 @@ class MPMCBoundedQueue | |
| MPMCBoundedQueue& operator=(MPMCBoundedQueue&& rhs) noexcept; | ||
|
|
||
| /** | ||
| * @brief push Push data to queue. | ||
| * @param data Data to be pushed. | ||
| * @return true on success. | ||
| */ | ||
| * @brief MPMCBoundedQueue destructor. | ||
| */ | ||
| ~MPMCBoundedQueue() = default; | ||
|
|
||
| /** | ||
| * @brief push Push data to queue. | ||
| * @param data Data to be pushed. | ||
| * @return true on success. | ||
| */ | ||
| template <typename U> | ||
| bool push(U&& data); | ||
|
|
||
| /** | ||
| * @brief pop Pop data from queue. | ||
| * @param data Place to store popped data. | ||
| * @return true on sucess. | ||
| */ | ||
| * @brief pop Pop data from queue. | ||
| * @param data Place to store popped data. | ||
| * @return true on sucess. | ||
| */ | ||
| bool pop(T& data); | ||
|
|
||
| private: | ||
|
|
@@ -115,7 +130,6 @@ class MPMCBoundedQueue | |
| } | ||
| }; | ||
|
|
||
| private: | ||
| typedef char Cacheline[64]; | ||
|
|
||
| Cacheline pad0; | ||
|
|
@@ -151,7 +165,7 @@ inline MPMCBoundedQueue<T>::MPMCBoundedQueue(size_t size) | |
| template <typename T> | ||
| inline MPMCBoundedQueue<T>::MPMCBoundedQueue(MPMCBoundedQueue&& rhs) noexcept | ||
| { | ||
| *this = rhs; | ||
| *this = std::move(rhs); | ||
| } | ||
|
|
||
| template <typename T> | ||
|
|
@@ -160,10 +174,11 @@ inline MPMCBoundedQueue<T>& MPMCBoundedQueue<T>::operator=(MPMCBoundedQueue&& rh | |
| if (this != &rhs) | ||
| { | ||
| m_buffer = std::move(rhs.m_buffer); | ||
| m_buffer_mask = std::move(rhs.m_buffer_mask); | ||
| m_buffer_mask = rhs.m_buffer_mask; | ||
| m_enqueue_pos = rhs.m_enqueue_pos.load(); | ||
| m_dequeue_pos = rhs.m_dequeue_pos.load(); | ||
| } | ||
|
|
||
| return *this; | ||
| } | ||
|
|
||
|
|
@@ -172,27 +187,24 @@ template <typename U> | |
| inline bool MPMCBoundedQueue<T>::push(U&& data) | ||
| { | ||
| Cell* cell; | ||
| size_t pos = m_enqueue_pos.load(std::memory_order_relaxed); | ||
| size_t pos = m_enqueue_pos.load(std::memory_order_acquire); | ||
| for(;;) | ||
| { | ||
| cell = &m_buffer[pos & m_buffer_mask]; | ||
| size_t seq = cell->sequence.load(std::memory_order_acquire); | ||
| intptr_t dif = (intptr_t)seq - (intptr_t)pos; | ||
| if(dif == 0) | ||
| { | ||
| if(m_enqueue_pos.compare_exchange_weak( | ||
| pos, pos + 1, std::memory_order_relaxed)) | ||
| { | ||
| if(m_enqueue_pos.compare_exchange_weak(pos, pos + 1, std::memory_order_acq_rel)) | ||
| break; | ||
| } | ||
| } | ||
| else if(dif < 0) | ||
| { | ||
| return false; | ||
| } | ||
| else | ||
| { | ||
| pos = m_enqueue_pos.load(std::memory_order_relaxed); | ||
| pos = m_enqueue_pos.load(std::memory_order_acquire); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -207,34 +219,30 @@ template <typename T> | |
| inline bool MPMCBoundedQueue<T>::pop(T& data) | ||
| { | ||
| Cell* cell; | ||
| size_t pos = m_dequeue_pos.load(std::memory_order_relaxed); | ||
| size_t pos = m_dequeue_pos.load(std::memory_order_acquire); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The same here, thanks for changing |
||
| for(;;) | ||
| { | ||
| cell = &m_buffer[pos & m_buffer_mask]; | ||
| size_t seq = cell->sequence.load(std::memory_order_acquire); | ||
| intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1); | ||
| if(dif == 0) | ||
| { | ||
| if(m_dequeue_pos.compare_exchange_weak( | ||
| pos, pos + 1, std::memory_order_relaxed)) | ||
| { | ||
| if(m_dequeue_pos.compare_exchange_weak(pos, pos + 1, std::memory_order_acq_rel)) | ||
| break; | ||
| } | ||
| } | ||
| else if(dif < 0) | ||
| { | ||
| return false; | ||
| } | ||
| else | ||
| { | ||
| pos = m_dequeue_pos.load(std::memory_order_relaxed); | ||
| pos = m_dequeue_pos.load(std::memory_order_acquire); | ||
| } | ||
| } | ||
|
|
||
| data = std::move(cell->data); | ||
|
|
||
| cell->sequence.store( | ||
| pos + m_buffer_mask + 1, std::memory_order_release); | ||
| cell->sequence.store(pos + m_buffer_mask + 1, std::memory_order_release); | ||
|
|
||
| return true; | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,160 @@ | ||
| #pragma once | ||
|
|
||
| #include <thread_pool/slotted_bag.hpp> | ||
| #include <thread_pool/thread_pool_options.hpp> | ||
| #include <thread_pool/worker.hpp> | ||
|
|
||
| #include <atomic> | ||
| #include <thread> | ||
| #include <limits> | ||
| #include <mutex> | ||
| #include <chrono> | ||
| #include <condition_variable> | ||
|
|
||
| namespace tp | ||
| { | ||
|
|
||
| /** | ||
| * @brief Rouser is a specialized worker that periodically wakes other workers. | ||
| * @detail This serves two purposes: | ||
| * The first is that it emplaces an upper bound on the processing time of tasks in the thread pool, since | ||
| * it is otherwise possible for the thread pool to enter a state where all threads are asleep, and tasks exist | ||
| * in worker queues. The second is that it increases the likelihood of at least one worker busy-waiting at any | ||
| * point in time, which speeds up task processing response time. | ||
| */ | ||
| class Rouser final | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You need to add a destructor to this class and stop the thread it inside if it's not stopped yet.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Implemented! |
||
| { | ||
| public: | ||
| /** | ||
| * @brief Worker Constructor. | ||
| */ | ||
| Rouser(std::chrono::microseconds rouse_period); | ||
|
|
||
| /** | ||
| * @brief Copy ctor implementation. | ||
| */ | ||
| Rouser(Rouser const&) = delete; | ||
|
|
||
| /** | ||
| * @brief Copy assignment implementation. | ||
| */ | ||
| Rouser& operator=(Rouser const& rhs) = delete; | ||
|
|
||
| /** | ||
| * @brief Move ctor implementation. | ||
| * @note Be very careful when invoking this while the thread pool is | ||
| * active, or in an otherwise undefined state. | ||
| */ | ||
| Rouser(Rouser&& rhs) noexcept; | ||
|
|
||
| /** | ||
| * @brief Move assignment implementaion. | ||
| * @note Be very careful when invoking this while the thread pool is | ||
| * active, or in an otherwise undefined state. | ||
| */ | ||
| Rouser& operator=(Rouser&& rhs) noexcept; | ||
|
|
||
| /** | ||
| * @brief Destructor implementation. | ||
| */ | ||
| ~Rouser(); | ||
|
|
||
| /** | ||
| * @brief start Create the executing thread and start tasks execution. | ||
| * @param workers A pointer to the vector containing sibling workers for performing round robin work stealing. | ||
| * @param idle_workers A pointer to the slotted bag containing all idle workers. | ||
| * @param num_busy_waiters A pointer to the atomic busy waiter counter. | ||
| * @note The parameters passed into this function generally relate to the global thread pool state. | ||
| */ | ||
| template <typename Task, template<typename> class Queue> | ||
| void start(std::vector<std::unique_ptr<Worker<Task, Queue>>>& workers, SlottedBag<Queue>& idle_workers, std::atomic<size_t>& num_busy_waiters); | ||
|
|
||
| /** | ||
| * @brief stop Stop all worker's thread and stealing activity. | ||
| * Waits until the executing thread becomes finished. | ||
| */ | ||
| void stop(); | ||
|
|
||
| private: | ||
|
|
||
| /** | ||
| * @brief threadFunc Executing thread function. | ||
| * @param workers A pointer to the vector containing sibling workers for performing round robin work stealing. | ||
| * @param idle_workers A pointer to the slotted bag containing all idle workers. | ||
| * @param num_busy_waiters A pointer to the atomic busy waiter counter. | ||
| */ | ||
| template <typename Task, template<typename> class Queue> | ||
| void threadFunc(std::vector<std::unique_ptr<Worker<Task, Queue>>>& workers, SlottedBag<Queue>& idle_workers, std::atomic<size_t>& num_busy_waiters); | ||
|
|
||
| std::atomic<bool> m_running_flag; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just a general recommendation with designing this type of logic controlled with flags. You could alternatively have a single atomic variable which represents the states of the object. When you have one variable representing the state it's easier to control transitions between the states with exchange and compare exchange operations and easier to reason about the state. For this class you can model the following states: This is not a necessary change now, but if you have time later you can experiment with it.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My reasoning for keeping the states separate was that the Not sure if the performance gain is worth the hit to readability?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I concur with Hayk. Modeling this with an enum rather than a bunch of boolean flags would make your life much easier. The difference in performance should be so small as to be negligible, AFAIK.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 will change. |
||
| std::atomic<bool> m_started_flag; | ||
| std::thread m_thread; | ||
| std::chrono::microseconds m_rouse_period; | ||
| }; | ||
|
|
||
| inline Rouser::Rouser(std::chrono::microseconds rouse_period) | ||
| : m_running_flag(false) | ||
| , m_started_flag(false) | ||
| , m_rouse_period(std::move(rouse_period)) | ||
| { | ||
| } | ||
|
|
||
| inline Rouser::Rouser(Rouser&& rhs) noexcept | ||
| { | ||
| *this = std::move(rhs); | ||
| } | ||
|
|
||
| inline Rouser& Rouser::operator=(Rouser&& rhs) noexcept | ||
| { | ||
| if (this != &rhs) | ||
| { | ||
| m_running_flag = rhs.m_running_flag.load(); | ||
| m_started_flag = rhs.m_started_flag.load(); | ||
| m_thread = std::move(rhs.m_thread); | ||
| m_rouse_period = std::move(rhs.m_rouse_period); | ||
| } | ||
|
|
||
| return *this; | ||
| } | ||
|
|
||
| inline Rouser::~Rouser() | ||
| { | ||
| stop(); | ||
| } | ||
|
|
||
| template <typename Task, template<typename> class Queue> | ||
| inline void Rouser::start(std::vector<std::unique_ptr<Worker<Task, Queue>>>& workers, SlottedBag<Queue>& idle_workers, std::atomic<size_t>& num_busy_waiters) | ||
| { | ||
| if (m_started_flag.exchange(true, std::memory_order_acq_rel)) | ||
| throw std::runtime_error("The Rouser has already been started."); | ||
|
|
||
| m_running_flag.store(true, std::memory_order_release); | ||
| m_thread = std::thread(&Rouser::threadFunc<Task, Queue>, this, std::ref(workers), std::ref(idle_workers), std::ref(num_busy_waiters)); | ||
| } | ||
|
|
||
| inline void Rouser::stop() | ||
| { | ||
| if (m_running_flag.exchange(false, std::memory_order_acq_rel)) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You probably need a symmetrical exception here. in start function you throw exception when it is called on already started object. The opposite should be done here.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Implemented! |
||
| m_thread.join(); | ||
| } | ||
|
|
||
|
|
||
| template <typename Task, template<typename> class Queue> | ||
| inline void Rouser::threadFunc(std::vector<std::unique_ptr<Worker<Task, Queue>>>& workers, SlottedBag<Queue>& idle_workers, std::atomic<size_t>& num_busy_waiters) | ||
| { | ||
| while (m_running_flag.load(std::memory_order_acquire)) | ||
| { | ||
| // Try to wake up a thread if there are no current busy waiters. | ||
| if (num_busy_waiters.load(std::memory_order_acquire) == 0) | ||
| { | ||
| auto result = idle_workers.tryEmptyAny(); | ||
| if (result.first) | ||
| workers[result.second]->wake(); | ||
| } | ||
|
|
||
| // Sleep. | ||
| std::this_thread::sleep_for(m_rouse_period); | ||
| } | ||
| } | ||
|
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for changing the memory orders