Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/thread_pool/fixed_function.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ template <typename SIGNATURE, size_t STORAGE_SIZE = 128>
class FixedFunction;

template <typename R, typename... ARGS, size_t STORAGE_SIZE>
class FixedFunction<R(ARGS...), STORAGE_SIZE>
class FixedFunction<R(ARGS...), STORAGE_SIZE> final
{

typedef R (*func_ptr_type)(ARGS...);
Expand Down
60 changes: 34 additions & 26 deletions include/thread_pool/mpmc_bounded_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ namespace tp
* http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
*/
template <typename T>
class MPMCBoundedQueue
class MPMCBoundedQueue final
{
static_assert(
std::is_move_constructible<T>::value, "Should be of movable type");
Expand All @@ -65,6 +65,16 @@ class MPMCBoundedQueue
*/
explicit MPMCBoundedQueue(size_t size);

/**
* @brief Copy ctor implementation.
*/
MPMCBoundedQueue(MPMCBoundedQueue const&) = delete;

/**
* @brief Copy assignment implementation.
*/
MPMCBoundedQueue& operator=(MPMCBoundedQueue const& rhs) = delete;

/**
* @brief Move ctor implementation.
*/
Expand All @@ -76,18 +86,23 @@ class MPMCBoundedQueue
MPMCBoundedQueue& operator=(MPMCBoundedQueue&& rhs) noexcept;

/**
* @brief push Push data to queue.
* @param data Data to be pushed.
* @return true on success.
*/
* @brief MPMCBoundedQueue destructor.
*/
~MPMCBoundedQueue() = default;

/**
* @brief push Push data to queue.
* @param data Data to be pushed.
* @return true on success.
*/
template <typename U>
bool push(U&& data);

/**
* @brief pop Pop data from queue.
* @param data Place to store popped data.
* @return true on success.
*/
* @brief pop Pop data from queue.
* @param data Place to store popped data.
* @return true on success.
*/
bool pop(T& data);

private:
Expand Down Expand Up @@ -115,7 +130,6 @@ class MPMCBoundedQueue
}
};

private:
typedef char Cacheline[64];

Cacheline pad0;
Expand Down Expand Up @@ -151,7 +165,7 @@ inline MPMCBoundedQueue<T>::MPMCBoundedQueue(size_t size)
template <typename T>
inline MPMCBoundedQueue<T>::MPMCBoundedQueue(MPMCBoundedQueue&& rhs) noexcept
{
*this = rhs;
*this = std::move(rhs);
}

template <typename T>
Expand All @@ -160,10 +174,11 @@ inline MPMCBoundedQueue<T>& MPMCBoundedQueue<T>::operator=(MPMCBoundedQueue&& rh
if (this != &rhs)
{
m_buffer = std::move(rhs.m_buffer);
m_buffer_mask = std::move(rhs.m_buffer_mask);
m_buffer_mask = rhs.m_buffer_mask;
m_enqueue_pos = rhs.m_enqueue_pos.load();
m_dequeue_pos = rhs.m_dequeue_pos.load();
}

return *this;
}

Expand All @@ -172,27 +187,24 @@ template <typename U>
inline bool MPMCBoundedQueue<T>::push(U&& data)
{
Cell* cell;
size_t pos = m_enqueue_pos.load(std::memory_order_relaxed);
size_t pos = m_enqueue_pos.load(std::memory_order_acquire);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for changing the memory orders

for(;;)
{
cell = &m_buffer[pos & m_buffer_mask];
size_t seq = cell->sequence.load(std::memory_order_acquire);
intptr_t dif = (intptr_t)seq - (intptr_t)pos;
if(dif == 0)
{
if(m_enqueue_pos.compare_exchange_weak(
pos, pos + 1, std::memory_order_relaxed))
{
if(m_enqueue_pos.compare_exchange_weak(pos, pos + 1, std::memory_order_acq_rel))
break;
}
}
else if(dif < 0)
{
return false;
}
else
{
pos = m_enqueue_pos.load(std::memory_order_relaxed);
pos = m_enqueue_pos.load(std::memory_order_acquire);
}
}

Expand All @@ -207,34 +219,30 @@ template <typename T>
inline bool MPMCBoundedQueue<T>::pop(T& data)
{
Cell* cell;
size_t pos = m_dequeue_pos.load(std::memory_order_relaxed);
size_t pos = m_dequeue_pos.load(std::memory_order_acquire);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same here, thanks for changing

for(;;)
{
cell = &m_buffer[pos & m_buffer_mask];
size_t seq = cell->sequence.load(std::memory_order_acquire);
intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
if(dif == 0)
{
if(m_dequeue_pos.compare_exchange_weak(
pos, pos + 1, std::memory_order_relaxed))
{
if(m_dequeue_pos.compare_exchange_weak(pos, pos + 1, std::memory_order_acq_rel))
break;
}
}
else if(dif < 0)
{
return false;
}
else
{
pos = m_dequeue_pos.load(std::memory_order_relaxed);
pos = m_dequeue_pos.load(std::memory_order_acquire);
}
}

data = std::move(cell->data);

cell->sequence.store(
pos + m_buffer_mask + 1, std::memory_order_release);
cell->sequence.store(pos + m_buffer_mask + 1, std::memory_order_release);

return true;
}
Expand Down
160 changes: 160 additions & 0 deletions include/thread_pool/rouser.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
#pragma once

#include <thread_pool/slotted_bag.hpp>
#include <thread_pool/thread_pool_options.hpp>
#include <thread_pool/worker.hpp>

#include <atomic>
#include <thread>
#include <limits>
#include <mutex>
#include <chrono>
#include <condition_variable>

namespace tp
{

/**
* @brief Rouser is a specialized worker that periodically wakes other workers.
* @details This serves two purposes:
* The first is that it emplaces an upper bound on the processing time of tasks in the thread pool, since
* it is otherwise possible for the thread pool to enter a state where all threads are asleep, and tasks exist
* in worker queues. The second is that it increases the likelihood of at least one worker busy-waiting at any
* point in time, which speeds up task processing response time.
*/
class Rouser final
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to add a destructor to this class and stop the thread inside it if it's not stopped yet.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented!

{
public:
/**
* @brief Worker Constructor.
*/
Rouser(std::chrono::microseconds rouse_period);

/**
* @brief Copy ctor implementation.
*/
Rouser(Rouser const&) = delete;

/**
* @brief Copy assignment implementation.
*/
Rouser& operator=(Rouser const& rhs) = delete;

/**
* @brief Move ctor implementation.
* @note Be very careful when invoking this while the thread pool is
* active, or in an otherwise undefined state.
*/
Rouser(Rouser&& rhs) noexcept;

/**
* @brief Move assignment implementation.
* @note Be very careful when invoking this while the thread pool is
* active, or in an otherwise undefined state.
*/
Rouser& operator=(Rouser&& rhs) noexcept;

/**
* @brief Destructor implementation.
*/
~Rouser();

/**
* @brief start Create the executing thread and start tasks execution.
* @param workers A pointer to the vector containing sibling workers for performing round robin work stealing.
* @param idle_workers A pointer to the slotted bag containing all idle workers.
* @param num_busy_waiters A pointer to the atomic busy waiter counter.
* @note The parameters passed into this function generally relate to the global thread pool state.
*/
template <typename Task, template<typename> class Queue>
void start(std::vector<std::unique_ptr<Worker<Task, Queue>>>& workers, SlottedBag<Queue>& idle_workers, std::atomic<size_t>& num_busy_waiters);

/**
* @brief stop Stop all worker's thread and stealing activity.
* Waits until the executing thread becomes finished.
*/
void stop();

private:

/**
* @brief threadFunc Executing thread function.
* @param workers A pointer to the vector containing sibling workers for performing round robin work stealing.
* @param idle_workers A pointer to the slotted bag containing all idle workers.
* @param num_busy_waiters A pointer to the atomic busy waiter counter.
*/
template <typename Task, template<typename> class Queue>
void threadFunc(std::vector<std::unique_ptr<Worker<Task, Queue>>>& workers, SlottedBag<Queue>& idle_workers, std::atomic<size_t>& num_busy_waiters);

std::atomic<bool> m_running_flag;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a general recommendation with designing this type of logic controlled with flags. You could alternatively have a single atomic variable which represents the states of the object. When you have one variable representing the state it's easier to control transitions between the states with exchange and compare exchange operations and easier to reason about the state.

For this class you can model the following states:
not started
start requested
running
stop requested
stopped

This is not a necessary change now, but if you have time later you can experiment with it.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My reasoning for keeping the states separate was that the m_running_flag check in the threadFunc loop would be faster with boolean comparison versus an enum.

Not sure if the performance gain is worth the hit to readability?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I concur with Hayk. Modeling this with an enum rather than a bunch of boolean flags would make your life much easier. The difference in performance should be so small as to be negligible, AFAIK.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 will change.

std::atomic<bool> m_started_flag;
std::thread m_thread;
std::chrono::microseconds m_rouse_period;
};

/**
 * @brief Constructs a Rouser in the not-started state.
 * @param rouse_period Interval between successive attempts to wake an idle worker.
 */
inline Rouser::Rouser(std::chrono::microseconds rouse_period)
    : m_running_flag(false)
    , m_started_flag(false)
    , m_rouse_period(rouse_period) // trivially copyable; std::move here was a misleading no-op copy
{
}

/**
 * @brief Move constructor: default-initializes all members, then delegates
 * to the move-assignment operator to take over rhs's state.
 */
inline Rouser::Rouser(Rouser&& rhs) noexcept
{
    // Members are first default-constructed, then overwritten by move-assignment.
    *this = std::move(rhs);
}

/**
 * @brief Move assignment: snapshots rhs's flags and takes over its thread.
 * @note Not atomic as a whole — each flag is read individually, so this is
 * only safe when no other thread is concurrently using either object (as the
 * class-level docs warn). NOTE(review): rhs's flags are copied, not cleared,
 * so rhs still reports started/running after the move — confirm intended.
 */
inline Rouser& Rouser::operator=(Rouser&& rhs) noexcept
{
    if (this != &rhs) // guard against self-move
    {
        // Plain assignment to a std::atomic stores the value loaded from rhs.
        m_running_flag = rhs.m_running_flag.load();
        m_started_flag = rhs.m_started_flag.load();
        // std::thread move-assign calls std::terminate if *this still owns a
        // joinable thread — callers must stop() this object first.
        m_thread = std::move(rhs.m_thread);
        m_rouse_period = std::move(rhs.m_rouse_period);
    }

    return *this;
}

/**
 * @brief Destructor: stops (and joins) the rousing thread if it is running.
 */
inline Rouser::~Rouser()
{
    stop(); // no-op when the thread was never started or is already stopped
}

/**
 * @brief Launches the executing thread that periodically rouses idle workers.
 * @param workers Sibling workers that may be woken.
 * @param idle_workers Slotted bag holding the ids of currently idle workers.
 * @param num_busy_waiters Global count of busy-waiting workers.
 * @throws std::runtime_error if start() was already called on this object.
 */
template <typename Task, template<typename> class Queue>
inline void Rouser::start(std::vector<std::unique_ptr<Worker<Task, Queue>>>& workers, SlottedBag<Queue>& idle_workers, std::atomic<size_t>& num_busy_waiters)
{
    auto const already_started = m_started_flag.exchange(true, std::memory_order_acq_rel);
    if (already_started)
        throw std::runtime_error("The Rouser has already been started.");

    // Publish the running state before the thread begins polling it.
    m_running_flag.store(true, std::memory_order_release);
    m_thread = std::thread(&Rouser::threadFunc<Task, Queue>, this, std::ref(workers), std::ref(idle_workers), std::ref(num_busy_waiters));
}

inline void Rouser::stop()
{
if (m_running_flag.exchange(false, std::memory_order_acq_rel))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You probably need a symmetrical exception here. in start function you throw exception when it is called on already started object. The opposite should be done here.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented!

m_thread.join();
}


/**
 * @brief Body of the rousing thread: until stopped, periodically wakes one
 * idle worker whenever no worker is currently busy-waiting.
 * @param workers Sibling workers that may be woken.
 * @param idle_workers Slotted bag holding the ids of currently idle workers.
 * @param num_busy_waiters Global count of busy-waiting workers.
 */
template <typename Task, template<typename> class Queue>
inline void Rouser::threadFunc(std::vector<std::unique_ptr<Worker<Task, Queue>>>& workers, SlottedBag<Queue>& idle_workers, std::atomic<size_t>& num_busy_waiters)
{
    while (m_running_flag.load(std::memory_order_acquire))
    {
        // Only intervene when no worker is busy-waiting for new tasks.
        if (num_busy_waiters.load(std::memory_order_acquire) == 0)
        {
            auto idle_slot = idle_workers.tryEmptyAny();
            if (idle_slot.first)
                workers[idle_slot.second]->wake();
        }

        // Wait out one rouse period before checking again.
        std::this_thread::sleep_for(m_rouse_period);
    }
}

}
Loading