@@ -38,7 +38,7 @@ namespace phlex::experimental {
3838 declared_fold (algorithm_name name,
3939 std::vector<std::string> predicates,
4040 specified_labels input_products);
41- virtual ~declared_fold ();
41+ ~declared_fold () override ;
4242
4343 virtual tbb::flow::sender<message>& sender () = 0;
4444 virtual tbb::flow::sender<message>& to_output () = 0;
@@ -55,10 +55,10 @@ namespace phlex::experimental {
5555 class fold_node : public declared_fold , private count_stores {
5656 using all_parameter_types = typename AlgorithmBits::input_parameter_types;
5757 using input_parameter_types = skip_first_type<all_parameter_types>; // Skip fold object
58- static constexpr auto N = std::tuple_size_v<input_parameter_types>;
59- using R = std::decay_t <std::tuple_element_t <0 , all_parameter_types>>;
58+ static constexpr auto number_inputs = std::tuple_size_v<input_parameter_types>;
59+ using result_type = std::decay_t <std::tuple_element_t <0 , all_parameter_types>>;
6060
61- static constexpr std::size_t M = 1 ; // hard-coded for now
61+ static constexpr std::size_t number_output_products = 1 ; // hard-coded for now
6262 using function_t = typename AlgorithmBits::bound_type;
6363
6464 public:
@@ -75,10 +75,11 @@ namespace phlex::experimental {
7575 initializer_{std::move (initializer)},
7676 output_{to_qualified_names (full_name (), std::move (output))},
7777 partition_{std::move (partition)},
78- join_{make_join_or_none (g, std::make_index_sequence<N >{})},
78+ join_{make_join_or_none (g, std::make_index_sequence<number_inputs >{})},
7979 fold_{g,
8080 concurrency,
81- [this , ft = alg.release_algorithm ()](messages_t <N> const & messages, auto & outputs) {
81+ [this , ft = alg.release_algorithm ()](messages_t <number_inputs> const & messages,
82+ auto & outputs) {
8283 // N.B. The assumption is that a fold will *never* need to cache
8384 // the product store it creates. Any flush messages *do not* need
8485 // to be propagated to downstream nodes.
@@ -104,13 +105,13 @@ namespace phlex::experimental {
104105 if (store->is_flush ()) {
105106 counter_for (id_hash_for_counter).set_flush_value (store, original_message_id);
106107 } else {
107- call (ft, messages, std::make_index_sequence<N >{});
108+ call (ft, messages, std::make_index_sequence<number_inputs >{});
108109 counter_for (id_hash_for_counter).increment (store->id ()->level_hash ());
109110 }
110111
111112 if (auto counter = done_with (id_hash_for_counter)) {
112113 auto parent = fold_store->make_continuation (this ->full_name ());
113- commit_ (*parent);
114+ commit (*parent);
114115 ++product_count_;
115116 // FIXME: This msg.eom value may be wrong!
116117 get<0 >(outputs).try_put ({parent, msg.eom , counter->original_message_id ()});
@@ -123,17 +124,22 @@ namespace phlex::experimental {
123124 private:
124125 tbb::flow::receiver<message>& port_for (specified_label const & product_label) override
125126 {
126- return receiver_for<N >(join_, input (), product_label);
127+ return receiver_for<number_inputs >(join_, input (), product_label);
127128 }
128129
129- std::vector<tbb::flow::receiver<message>*> ports () override { return input_ports<N>(join_); }
130+ std::vector<tbb::flow::receiver<message>*> ports () override
131+ {
132+ return input_ports<number_inputs>(join_);
133+ }
130134
131135 tbb::flow::sender<message>& sender () override { return output_port<0ull >(fold_); }
132136 tbb::flow::sender<message>& to_output () override { return sender (); }
133137 qualified_names const & output () const override { return output_; }
134138
135139 template <std::size_t ... Is>
136- void call (function_t const & ft, messages_t <N> const & messages, std::index_sequence<Is...>)
140+ void call (function_t const & ft,
141+ messages_t <number_inputs> const & messages,
142+ std::index_sequence<Is...>)
137143 {
138144 auto const & parent_id = *most_derived (messages).store ->id ()->parent (partition_);
139145 // FIXME: Not the safest approach!
@@ -142,8 +148,7 @@ namespace phlex::experimental {
142148 it =
143149 results_
144150 .insert ({parent_id,
145- initialized_object (std::move (initializer_),
146- std::make_index_sequence<std::tuple_size_v<InitTuple>>{})})
151+ initialized_object (std::make_index_sequence<std::tuple_size_v<InitTuple>>{})})
147152 .first ;
148153 }
149154 ++calls_;
@@ -154,13 +159,12 @@ namespace phlex::experimental {
154159 std::size_t product_count () const final { return product_count_.load (); }
155160
156161 template <size_t ... Is>
157- auto initialized_object (InitTuple&& tuple, std::index_sequence<Is...>) const
162+ auto initialized_object (std::index_sequence<Is...>) const
158163 {
159- return std::unique_ptr<R>{
160- new R{std::forward<std::tuple_element_t <Is, InitTuple>>(std::get<Is>(tuple))...}};
164+ return std::unique_ptr<result_type>{new result_type{std::get<Is>(initializer_)...}};
161165 }
162166
163- void commit_ (product_store& store)
167+ void commit (product_store& store)
164168 {
165169 auto & result = results_.at (*store.id ());
166170 if constexpr (requires { send (*result); }) {
@@ -177,9 +181,9 @@ namespace phlex::experimental {
177181 input_retriever_types<input_parameter_types> input_{input_arguments<input_parameter_types>()};
178182 qualified_names output_;
179183 std::string partition_;
180- join_or_none_t <N > join_;
181- tbb::flow::multifunction_node<messages_t <N >, messages_t <1 >> fold_;
182- tbb::concurrent_unordered_map<level_id, std::unique_ptr<R >> results_;
184+ join_or_none_t <number_inputs > join_;
185+ tbb::flow::multifunction_node<messages_t <number_inputs >, messages_t <1 >> fold_;
186+ tbb::concurrent_unordered_map<level_id, std::unique_ptr<result_type >> results_;
183187 std::atomic<std::size_t > calls_;
184188 std::atomic<std::size_t > product_count_;
185189 };
0 commit comments