Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions include/unifex/let_done.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,15 +219,22 @@ class _op<Source, Done, Receiver>::type {

public:
template <typename Done2, typename Receiver2>
explicit type(Source&& source, Done2&& done, Receiver2&& dest)
explicit type(
Source&& source, // NOLINT(cppcoreguidelines-rvalue-reference-param-not-moved)
Done2&& done, Receiver2&& dest)
noexcept(std::is_nothrow_move_constructible_v<Receiver> &&
std::is_nothrow_move_constructible_v<Done> &&
is_nothrow_connectable_v<Source, source_receiver>)
: done_((Done2&&)done)
, receiver_((Receiver2&&)dest)
{
// Note: 'Source' is not a forwarding reference since it's not deduced
// in this constructor. It can either be a Sender&& or Sender& for
// some concrete type Sender. Here, we want the forwarding behavior when
// the operation is constructed based on the type of Source, even though
// it's not a idiomatic use for std::forward.
unifex::activate_union_member_with(sourceOp_, [&] {
return unifex::connect((Source&&)source, source_receiver{this});
return unifex::connect(std::forward<Source>(source), source_receiver{this});
});
startedOp_ = 0 + 1;
}
Expand Down
15 changes: 11 additions & 4 deletions include/unifex/let_error.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,14 +257,21 @@ class _op<Source, Func, Receiver>::type final {

public:
template <typename Func2, typename Receiver2>
explicit type(Source&& source, Func2&& func, Receiver2&& dest) noexcept(
explicit type(
Source&& source, // NOLINT(cppcoreguidelines-rvalue-reference-param-not-moved)
Func2&& func, Receiver2&& dest) noexcept(
std::is_nothrow_constructible_v<Receiver, Receiver2&&>&&
std::is_nothrow_constructible_v<Func, Func2&&>&&
is_nothrow_connectable_v<Source, source_receiver>)
: func_((Func2 &&) func)
, receiver_((Receiver2 &&) dest) {
// Note: 'Source' is not a forwarding reference since it's not deduced
// in this constructor. It can either be a Sender&& or Sender& for
// some concrete type Sender. Here, we want the forwarding behavior when
// the operation is constructed based on the type of Source, even though
// it's not a idiomatic use for std::forward.
unifex::activate_union_member_with(sourceOp_, [&] {
return unifex::connect((Source &&) source, source_receiver{this});
return unifex::connect(std::forward<Source>(source), source_receiver{this});
});
}

Expand Down Expand Up @@ -418,8 +425,8 @@ class _sndr<Source, Func>::type final {
is_nothrow_connectable_v<member_t<Sender, Source>, SourceReceiver> &&
std::is_nothrow_constructible_v<Func, member_t<Sender, Func>> &&
std::is_nothrow_constructible_v<remove_cvref_t<Receiver>, Receiver>)
-> operation_type<Source, Func, Receiver> {
return operation_type<Source, Func, Receiver>{
-> operation_type<member_t<Sender, Source>, Func, Receiver> {
return operation_type<member_t<Sender, Source> , Func, Receiver>{
static_cast<Sender&&>(s).source_,
static_cast<Sender&&>(s).func_,
static_cast<Receiver&&>(r)
Expand Down
11 changes: 6 additions & 5 deletions include/unifex/let_value.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <functional>
#include <tuple>
#include <type_traits>
#include <utility>

#include <unifex/detail/prologue.hpp>

Expand Down Expand Up @@ -85,7 +86,7 @@ struct _successor_receiver<Operation, Values...>::type {
void set_error(Error error) && noexcept {
auto& op = op_;
cleanup();
unifex::set_error(std::move(op.receiver_), (Error &&) error);
unifex::set_error(std::move(op.receiver_), std::move(error));
}

private:
Expand Down Expand Up @@ -178,7 +179,7 @@ struct _predecessor_receiver<Operation>::type {
void set_error(Error error) && noexcept {
auto& op = op_;
unifex::deactivate_union_member(op.predOp_);
unifex::set_error(std::move(op.receiver_), (Error &&) error);
unifex::set_error(std::move(op.receiver_), std::move(error));
}

template(typename CPO)
Expand Down Expand Up @@ -227,16 +228,16 @@ struct _op<Predecessor, SuccessorFactory, Receiver>::type {
template <typename Operation, typename... Values>
friend struct _successor_receiver;

template <typename SuccessorFactory2, typename Receiver2>
template <typename Predecessor2, typename SuccessorFactory2, typename Receiver2>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above in let_done and let_error. I think lines 378-379 are hand-rolling the member_t<Self, Predecessor> type alias with this:

decltype((static_cast<Sender&&>(sender).pred_)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lines 378-379 are hand-rolling the member_t<Self, Predecessor> type alias with this:

Yep, I noticed this too, and added a test for "l value connectable"-ness though nothing had to be changed for this test to pass.

explicit type(
Predecessor&& pred,
Predecessor2&& pred,
SuccessorFactory2&& func,
Receiver2&& receiver)
: func_((SuccessorFactory2 &&) func),
receiver_((Receiver2 &&) receiver) {
unifex::activate_union_member_with(predOp_, [&] {
return unifex::connect(
(Predecessor &&) pred, predecessor_receiver<operation>{*this});
std::forward<Predecessor2>(pred), predecessor_receiver<operation>{*this});
});
}

Expand Down
8 changes: 5 additions & 3 deletions include/unifex/let_value_with.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <unifex/execution_policy.hpp>
#include <unifex/type_list.hpp>

#include <utility>

#include <unifex/detail/prologue.hpp>

namespace unifex {
Expand Down Expand Up @@ -100,12 +102,12 @@ template<typename StateFactory, typename SuccessorFactory, typename Receiver>
struct _operation<StateFactory, SuccessorFactory, Receiver>::type {
template <typename StateFactory2, typename SuccessorFactory2, typename Receiver2>
type(StateFactory2&& stateFactory, SuccessorFactory2&& func, Receiver2&& r) :
stateFactory_(static_cast<StateFactory2&&>(stateFactory)),
stateFactory_(std::forward<StateFactory2>(stateFactory)),
func_(static_cast<SuccessorFactory2&&>(func)),
state_(static_cast<StateFactory&&>(stateFactory_)()),
state_(std::move(stateFactory_)()),
innerOp_(
unifex::connect(
static_cast<SuccessorFactory&&>(func_)(state_),
std::move(func_)(state_),
static_cast<Receiver2&&>(r))) {
}

Expand Down
4 changes: 3 additions & 1 deletion include/unifex/let_value_with_stop_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include <unifex/execution_policy.hpp>
#include <unifex/type_list.hpp>

#include <utility>

#include <unifex/detail/prologue.hpp>

namespace unifex {
Expand Down Expand Up @@ -165,7 +167,7 @@ struct _stop_source_operation<SuccessorFactory, Receiver>::type {
nothrow_connectable) {
return unifex::connect(
static_cast<SuccessorFactory&&>(func)(stopSource_),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be a move, too, which might mean it should be taken by rvalue reference.

Suggested change
static_cast<SuccessorFactory&&>(func)(stopSource_),
std::move(func)(stopSource_),

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SuccessorFactory&& also seems like a "saved for later forwarding reference," although it's used differently than in the other algos. Do we need func_ to remain alive for the duration of the operation? In the operation constructor, we currently have (annotated with // comment)

template <typename SuccessorFactory2, typename Receiver2>
  type(SuccessorFactory2&& func, Receiver2&& r) noexcept(
      std::is_nothrow_constructible_v<SuccessorFactory, SuccessorFactory2>&&
          std::is_nothrow_constructible_v<Receiver, Receiver2>&& noexcept(
              connect_inner_op(func, (Receiver2 &&) r))) // borrow func, whether it's an lvalue ref or rvalue erf
    : func_{(SuccessorFactory2 &&) func} // forward func into func_
    , receiverToken_(get_stop_token(r))
    , innerOp_(connect_inner_op(func_, (Receiver2 &&) r)) {} // borrow func_, but don't move so that func_ remains alive

SuccessorFactory& in connect_inner_op seems correct so that it borrows but doesn't possibly move from.

receiver_t{*this, static_cast<Receiver&&>(r)});
receiver_t{*this, std::move(r)});
}

public:
Expand Down
2 changes: 2 additions & 0 deletions include/unifex/let_value_with_stop_token.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include <unifex/sender_concepts.hpp>
#include <unifex/type_list.hpp>

#include <utility>

#include <unifex/detail/prologue.hpp>

namespace unifex {
Expand Down
9 changes: 9 additions & 0 deletions test/let_done_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <unifex/just.hpp>
#include <unifex/just_done.hpp>
#include <unifex/on.hpp>
#include <unifex/repeat_effect_until.hpp>
#include <unifex/then.hpp>
#include <unifex/sequence.hpp>
#include <unifex/stop_when.hpp>
Expand Down Expand Up @@ -104,3 +105,11 @@ TEST(TransformDone, WithValue) {
EXPECT_TRUE(multiple.has_value());
EXPECT_EQ(*multiple, std::tuple(42, 1, 2));
}

TEST(TransformDone, LvalueConnectable) {
int n = 0;
sync_wait(repeat_effect_until(
let_done(just(), [] { return just(); }),
[&n]() mutable noexcept { return n++ == 5; }));
EXPECT_EQ(n, 6);
}
9 changes: 9 additions & 0 deletions test/let_error_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <unifex/let_done.hpp>
#include <unifex/let_error.hpp>
#include <unifex/on.hpp>
#include <unifex/repeat_effect_until.hpp>
#include <unifex/scheduler_concepts.hpp>
#include <unifex/sequence.hpp>
#include <unifex/stop_when.hpp>
Expand Down Expand Up @@ -173,6 +174,14 @@ TEST(TransformError, SequenceFwd) {
EXPECT_EQ(*one, 42);
}

TEST(TransformError, LvalueConnectable) {
int n = 0;
sync_wait(repeat_effect_until(
let_error(just(), [](auto&&) { return just(); }),
[&n]() mutable noexcept { return n++ == 5; }));
EXPECT_EQ(n, 6);
}

#if !UNIFEX_NO_COROUTINES
TEST(TransformError, WithTask) {
auto value = let_error(
Expand Down
13 changes: 12 additions & 1 deletion test/let_value_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <unifex/just.hpp>
#include <unifex/let_value.hpp>

#include <unifex/just.hpp>
#include <unifex/let_done.hpp>
#include <unifex/let_value_with.hpp>
#include <unifex/scheduler_concepts.hpp>
#include <unifex/repeat_effect_until.hpp>
#include <unifex/sync_wait.hpp>
#include <unifex/timed_single_thread_context.hpp>
#include <unifex/then.hpp>
Expand Down Expand Up @@ -281,3 +284,11 @@ TEST(Let, LetValueWithTraitlessPredecessor) {
ASSERT_TRUE(ret);
EXPECT_EQ(*ret, 42);
}

TEST(Let, LvalueConnectable) {
int n = 0;
sync_wait(repeat_effect_until(
let_value(let_done(just(), [] { return just(); }), [] { return just(); }),
[&n]() mutable noexcept { return n++ == 5; }));
EXPECT_EQ(n, 6);
}
51 changes: 51 additions & 0 deletions test/let_value_with_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#include <unifex/let_value_with.hpp>

#include <unifex/just.hpp>
#include <unifex/let_value_with.hpp>
#include <unifex/sync_wait.hpp>
#include <unifex/then.hpp>
#include <unifex/timed_single_thread_context.hpp>
#include <chrono>
#include <iostream>
#include <optional>
#include <variant>

#include <gtest/gtest.h>

using namespace unifex;

namespace {
constexpr auto async = [](auto& context, auto&& func) {
return then(
schedule_after(context.get_scheduler(), std::chrono::milliseconds(10)),
(decltype(func))func);
};
}

TEST(LetValueWith, StatefulFactory) {
// Verifies the let_value_with operation state holds onto the
// Factory object until the operation is complete.
timed_single_thread_context context;
std::optional<int> result = sync_wait(
let_value(just(), [&] {
return let_value_with([x = std::make_unique<int>(10)]() -> int* { return x.get(); }, [&](int*& v) {
return async(context, [&v]() { return *v; });
});
})
);
ASSERT_TRUE(!!result);
EXPECT_EQ(*result, 10);
}

TEST(LetValueWith, CallOperatorRvalueRefOverload) {
struct Factory {
int operator()() && {
return 10;
}
};
std::optional<int> result = sync_wait(let_value_with(Factory{}, [&](int& v) {
return just(v);
}));
ASSERT_TRUE(!!result);
EXPECT_EQ(*result, 10);
}
37 changes: 37 additions & 0 deletions test/static_thread_pool_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@
#include <unifex/static_thread_pool.hpp>

#include <unifex/just.hpp>
#include <unifex/let_done.hpp>
#include <unifex/let_error.hpp>
#include <unifex/on.hpp>
#include <unifex/repeat_effect_until.hpp>
#include <unifex/scheduler_concepts.hpp>
#include <unifex/stop_when.hpp>
#include <unifex/sync_wait.hpp>
#include <unifex/then.hpp>
#include <unifex/when_all.hpp>
Expand Down Expand Up @@ -62,3 +66,36 @@ TEST(StaticThreadPool, Smoke) {

EXPECT_EQ(x, 3);
}

TEST(StaticThreadPool, ScheduleCancelationThreadSafety) {
static_thread_pool tpContext;
auto sch = tpContext.get_scheduler();

unifex::sync_wait(unifex::repeat_effect_until(
unifex::let_done(
unifex::stop_when(
unifex::repeat_effect(unifex::schedule(sch)),
unifex::schedule(sch)),
[] { return unifex::just(); }),
[n=0]() mutable noexcept { return n++ == 1000; }));

unifex::sync_wait(unifex::repeat_effect_until(
unifex::let_done(
unifex::let_error(
unifex::stop_when(
unifex::repeat_effect(unifex::schedule(sch)),
unifex::schedule(sch)),
[](auto&&) { return unifex::just(); }),
[] { return unifex::just(); }),
[n=0]() mutable noexcept { return n++ == 1000; }));

unifex::sync_wait(unifex::repeat_effect_until(
unifex::let_error(
unifex::let_done(
unifex::stop_when(
unifex::repeat_effect(unifex::schedule(sch)),
unifex::schedule(sch)),
[] { return unifex::just(); }),
[](auto&&) { return unifex::just(); }),
[n=0]() mutable noexcept { return n++ == 1000; }));
}