Skip to content

Commit 28beaa2

Browse files
authored
Adjust unfold so that it doesn't need a multiplexer (#95)
1 parent e79caf8 commit 28beaa2

File tree

3 files changed

+14
-69
lines changed

3 files changed

+14
-69
lines changed

phlex/core/declared_unfold.hpp

Lines changed: 13 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
#include "phlex/core/fwd.hpp"
77
#include "phlex/core/input_arguments.hpp"
88
#include "phlex/core/message.hpp"
9-
#include "phlex/core/multiplexer.hpp"
109
#include "phlex/core/products_consumer.hpp"
1110
#include "phlex/core/store_counters.hpp"
1211
#include "phlex/model/algorithm_name.hpp"
@@ -61,11 +60,10 @@ namespace phlex::experimental {
6160
specified_labels input_products);
6261
virtual ~declared_unfold();
6362

63+
virtual tbb::flow::sender<message>& sender() = 0;
6464
virtual tbb::flow::sender<message>& to_output() = 0;
6565
virtual qualified_names const& output() const = 0;
66-
virtual void finalize(multiplexer::head_ports_t head_ports) = 0;
6766
virtual std::size_t product_count() const = 0;
68-
virtual multiplexer::head_ports_t const& downstream_ports() const = 0;
6967

7068
protected:
7169
using stores_t = tbb::concurrent_hash_map<level_id::hash_type, product_store_ptr>;
@@ -99,34 +97,33 @@ namespace phlex::experimental {
9997
declared_unfold{std::move(name), std::move(predicates), std::move(product_labels)},
10098
output_{to_qualified_names(full_name(), std::move(output_products))},
10199
new_level_name_{std::move(new_level_name)},
102-
multiplexer_{g},
103100
join_{make_join_or_none(g, std::make_index_sequence<N>{})},
104101
unfold_{
105102
g,
106103
concurrency,
107-
[this, p = std::move(predicate), ufold = std::move(unfold)](
108-
messages_t<N> const& messages) -> tbb::flow::continue_msg {
104+
[this, p = std::move(predicate), ufold = std::move(unfold)](messages_t<N> const& messages,
105+
auto& output) {
109106
auto const& msg = most_derived(messages);
110107
auto const& store = msg.store;
111108
if (store->is_flush()) {
112109
flag_for(store->id()->hash()).flush_received(msg.id);
110+
std::get<0>(output).try_put(msg);
113111
} else if (accessor a; stores_.insert(a, store->id()->hash())) {
114112
std::size_t const original_message_id{msg_counter_};
115113
generator g{msg.store, this->full_name(), new_level_name_};
116114
call(p, ufold, msg.store->id(), g, msg.eom, messages, std::make_index_sequence<N>{});
117-
multiplexer_.try_put({g.flush_store(), msg.eom, ++msg_counter_, original_message_id});
115+
116+
message const flush_msg{g.flush_store(), msg.eom, ++msg_counter_, original_message_id};
117+
std::get<0>(output).try_put(flush_msg);
118118
flag_for(store->id()->hash()).mark_as_processed();
119119
}
120120

121121
if (done_with(store)) {
122122
stores_.erase(store->id()->hash());
123123
}
124-
return {};
125-
}},
126-
to_output_{g}
124+
}}
127125
{
128126
make_edge(join_, unfold_);
129-
make_edge(to_output_, multiplexer_);
130127
}
131128

132129
~unfold_node() { report_cached_stores(stores_); }
@@ -138,19 +135,10 @@ namespace phlex::experimental {
138135
}
139136
std::vector<tbb::flow::receiver<message>*> ports() override { return input_ports<N>(join_); }
140137

141-
tbb::flow::sender<message>& to_output() override { return to_output_; }
138+
tbb::flow::sender<message>& sender() override { return output_port<0>(unfold_); }
139+
tbb::flow::sender<message>& to_output() override { return sender(); }
142140
qualified_names const& output() const override { return output_; }
143141

144-
void finalize(multiplexer::head_ports_t head_ports) override
145-
{
146-
multiplexer_.finalize(std::move(head_ports));
147-
}
148-
149-
multiplexer::head_ports_t const& downstream_ports() const override
150-
{
151-
return multiplexer_.downstream_ports();
152-
}
153-
154142
template <std::size_t... Is>
155143
void call(Predicate const& predicate,
156144
Unfold const& unfold,
@@ -178,7 +166,8 @@ namespace phlex::experimental {
178166
}
179167
++product_count_;
180168
auto child = g.make_child_for(counter++, std::move(new_products));
181-
to_output_.try_put({child, eom->make_child(child->id()), ++msg_counter_});
169+
message const child_msg{child, eom->make_child(child->id()), ++msg_counter_};
170+
output_port<0>(unfold_).try_put(child_msg);
182171
}
183172
}
184173

@@ -188,10 +177,8 @@ namespace phlex::experimental {
188177
input_retriever_types<InputArgs> input_{input_arguments<InputArgs>()};
189178
qualified_names output_;
190179
std::string new_level_name_;
191-
multiplexer multiplexer_;
192180
join_or_none_t<N> join_;
193-
tbb::flow::function_node<messages_t<N>> unfold_;
194-
tbb::flow::broadcast_node<message> to_output_;
181+
tbb::flow::multifunction_node<messages_t<N>, messages_t<1u>> unfold_;
195182
tbb::concurrent_hash_map<level_id::hash_type, product_store_ptr> stores_;
196183
std::atomic<std::size_t> msg_counter_{}; // Is this sufficient? Probably not.
197184
std::atomic<std::size_t> calls_{};

phlex/core/edge_maker.hpp

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
#define PHLEX_CORE_EDGE_MAKER_HPP
33

44
#include "phlex/core/declared_output.hpp"
5-
#include "phlex/core/declared_unfold.hpp"
65
#include "phlex/core/dot/attributes.hpp"
76
#include "phlex/core/dot/data_graph.hpp"
87
#include "phlex/core/dot/function_graph.hpp"
@@ -166,8 +165,6 @@ namespace phlex::experimental {
166165
make_edge(source, multi);
167166

168167
// Create edges to outputs
169-
auto& unfolds = std::get<consumers<declared_unfolds>&>(std::tie(cons...));
170-
171168
for (auto const& [output_name, output_node] : outputs) {
172169
make_edge(source, output_node->port());
173170
if (function_graph_) {
@@ -180,54 +177,15 @@ namespace phlex::experimental {
180177
function_graph_->edge(named_port.node.full(), output_name, {.color = "gray"});
181178
}
182179
}
183-
for (auto const& [unfold_name, unfold] : unfolds.data) {
184-
make_edge(unfold->to_output(), output_node->port());
185-
if (function_graph_) {
186-
function_graph_->edge(unfold_name, output_name, {.color = "gray"});
187-
}
188-
}
189180
}
190181

191182
// Create normal edges
192183
multiplexer::head_ports_t head_ports;
193184
(head_ports.merge(edges(filters, cons)), ...);
194185

195-
// Create head nodes for unfolds
196-
std::set<std::string> remove_ports_for_products;
197-
for (auto const& [name, unfold] : unfolds.data) {
198-
multiplexer::head_ports_t heads;
199-
for (auto const& product_name : unfold->output()) {
200-
// There can be multiple head nodes that require the same product.
201-
remove_ports_for_products.insert(product_name.full());
202-
for (auto const& [node_name, ports] : head_ports) {
203-
for (auto const& port : ports) {
204-
if (to_name(port.product_label) != product_name.name()) {
205-
continue;
206-
}
207-
heads[node_name].push_back(port);
208-
}
209-
}
210-
}
211-
unfold->finalize(std::move(heads));
212-
}
213-
214-
// Remove head nodes claimed by unfolds
215-
for (auto const& key : remove_ports_for_products) {
216-
for (auto& ports : head_ports | std::views::values) {
217-
std::erase_if(ports,
218-
[&key](auto const& port) { return to_name(port.product_label) == key; });
219-
}
220-
}
221-
222186
multi.finalize(std::move(head_ports));
223187

224188
if (function_graph_) {
225-
for (auto const& [name, unfold] : unfolds.data) {
226-
for (auto const& [node_name, ports] : unfold->downstream_ports()) {
227-
function_graph_->edges_for(name, node_name, ports);
228-
}
229-
}
230-
231189
for (auto const& [node_name, ports] : multi.downstream_ports()) {
232190
function_graph_->edges_for("Source", node_name, ports);
233191
}

phlex/core/framework_graph.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ namespace phlex::experimental {
142142
filters_.merge(internal_edges_for_predicates(graph_, nodes_.predicates, nodes_.unfolds));
143143
filters_.merge(internal_edges_for_predicates(graph_, nodes_.predicates, nodes_.transforms));
144144

145-
edge_maker make_edges{dot_file_prefix, nodes_.transforms, nodes_.folds};
145+
edge_maker make_edges{dot_file_prefix, nodes_.transforms, nodes_.folds, nodes_.unfolds};
146146
make_edges(src_,
147147
multiplexer_,
148148
filters_,

0 commit comments

Comments
 (0)