Skip to content

Commit e4be437

Browse files
committed
[FOLD] working but sloppy type erasure
1 parent abf3261 commit e4be437

File tree

4 files changed

+201
-52
lines changed

4 files changed

+201
-52
lines changed

example/server/main.cpp

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,19 +85,29 @@ int server_main( int argc, char* argv[] )
8585
http_proto::install_serializer_service(srv.services(), cfg);
8686
}
8787

88-
ports<executor_type> vp(srv.get_executor());
89-
9088
// Create the routes
9189
// VFALCO this is a hacky WIP
9290
router_type rr0;
9391
rr0.get<https_redirect_responder>("/*splat");
9492
router_type rr;
9593
rr.get<file_responder>("/*splat", doc_root);
9694

95+
auto& vp = emplace_part<ports<executor_type>>(srv,
96+
srv.get_executor(),
97+
1,
98+
num_workers,
99+
of_type<worker_ssl<executor_type>>,
100+
srv,
101+
srv.get_executor(),
102+
ssl_ctx,
103+
rr);
104+
vp.emplace(asio::ip::tcp::endpoint(addr, 443), reuse_addr);
105+
vp.emplace(asio::ip::tcp::endpoint(addr, 5050), reuse_addr);
106+
97107
//
98108
// Add the listening ports and workers
99109
//
100-
110+
#if 0
101111
{
102112
// plain (no https) port that does https redirect
103113
//
@@ -128,6 +138,7 @@ int server_main( int argc, char* argv[] )
128138
ssl_ctx,
129139
rr);
130140
}
141+
#endif
131142

132143
srv.run();
133144
}

example/server/worker_ssl.hpp

Lines changed: 26 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -60,24 +60,19 @@ class worker_ssl : public http_responder<
6060
using stream_type = ssl_stream<socket_type>;
6161

6262
template<
63-
class Executor1,
6463
class Executor0,
6564
class = typename std::enable_if<std::is_constructible<Executor, Executor0>::value>::type
6665
>
6766
worker_ssl(
67+
std::size_t index,
68+
any_lambda<void(std::size_t)> do_idle,
6869
asio_server& srv,
69-
asio::basic_socket_acceptor<
70-
Protocol, Executor1>& acc,
7170
Executor0 const& ex,
7271
asio::ssl::context& ssl_ctx,
7372
router_type& rr)
7473
: http_responder<worker_ssl>(srv, rr)
75-
, do_accept_(
76-
[&acc](worker_ssl& self)
77-
{
78-
acc.async_accept( self.stream_.next_layer(), self.ep_,
79-
call_mf(&worker_ssl::on_accept, &self));
80-
})
74+
, index_(index)
75+
, do_idle_(std::move(do_idle))
8176
, ssl_ctx_(ssl_ctx)
8277
, stream_(ex, ssl_ctx)
8378
{
@@ -89,28 +84,21 @@ class worker_ssl : public http_responder<
8984
return stream_.stream();
9085
}
9186

92-
/** Run the worker
93-
94-
The worker will continue to run until
95-
@ref stop is called or the worker is destroyed.
96-
*/
97-
void run()
98-
{
99-
do_accept();
100-
}
101-
102-
/** Stop the worker
103-
104-
Any outstanding I/O is canceled.
87+
/** Cancel all outstanding I/O
10588
*/
106-
void stop()
89+
void cancel()
10790
{
10891
system::error_code ec;
10992
stream_.next_layer().cancel(ec);
11093
}
11194

112-
void do_accept()
95+
template<class Executor1>
96+
void do_accept(
97+
asio::basic_socket_acceptor<Protocol, Executor1>& acc,
98+
any_lambda<void()> do_accepted)
11399
{
100+
do_accepted_ = do_accepted;
101+
114102
// Clean up any previous connection.
115103
{
116104
system::error_code ec;
@@ -124,7 +112,8 @@ class worker_ssl : public http_responder<
124112
stream_ = stream_type(std::move(stream_.next_layer()), ssl_ctx_);
125113
}
126114

127-
do_accept_(*this);
115+
acc.async_accept(stream_.next_layer(), ep_,
116+
call_mf(&worker_ssl::on_accept, this));
128117
}
129118

130119
/** Called when the connected session is ended
@@ -146,12 +135,14 @@ class worker_ssl : public http_responder<
146135
}
147136

148137
LOG_DBG(this->sect_, this->id(), " ", s, ": ", ec.message());
149-
return do_accept();
138+
return do_idle();
150139
}
151140

152141
private:
153142
void on_accept(system::error_code const& ec)
154143
{
144+
do_accepted_();
145+
155146
if(ec.failed())
156147
return do_failed("worker_ssl::on_accept", ec);
157148

@@ -186,12 +177,19 @@ class worker_ssl : public http_responder<
186177
stream_.next_layer().shutdown(asio::socket_base::shutdown_both, ec);
187178
// error ignored
188179

189-
return do_accept();
180+
return do_idle();
181+
}
182+
183+
void do_idle()
184+
{
185+
do_idle_(index_);
190186
}
191187

192188
private:
193189
// order of destruction matters here
194-
any_lambda<void(worker_ssl&)> do_accept_;
190+
std::size_t index_;
191+
any_lambda<void(std::size_t)> do_idle_;
192+
any_lambda<void()> do_accepted_;
195193
asio::ssl::context& ssl_ctx_;
196194
stream_type stream_;
197195
typename Protocol::endpoint ep_;

include/boost/http_io/server/ports.hpp

Lines changed: 160 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@
1010
#ifndef BOOST_HTTP_IO_SERVER_PORTS_HPP
1111
#define BOOST_HTTP_IO_SERVER_PORTS_HPP
1212

13+
#include "logger.hpp"
1314
#include <boost/http_io/detail/config.hpp>
15+
#include <boost/http_io/server/call_mf.hpp>
16+
#include <boost/http_io/server/fixed_array.hpp>
17+
#include <boost/http_io/server/server.hpp>
1418
#include <boost/asio/basic_socket_acceptor.hpp>
1519
#include <boost/asio/basic_stream_socket.hpp>
1620
#include <boost/asio/dispatch.hpp>
@@ -27,6 +31,30 @@ class tcp;
2731
namespace boost {
2832
namespace http_io {
2933

34+
template<class T>
35+
struct of_type_t
36+
{
37+
using type = T;
38+
};
39+
40+
template<class T>
41+
constexpr of_type_t<T> of_type;
42+
43+
class ports_base
44+
{
45+
public:
46+
struct entry
47+
{
48+
std::size_t need; // number of accepts we need
49+
};
50+
51+
struct api
52+
{
53+
std::size_t index;
54+
};
55+
56+
};
57+
3058
/** A TCP/IP listening port
3159
3260
This implements a listening port as a server service. An array of workers
@@ -40,48 +68,160 @@ namespace http_io {
4068
*/
4169
template< class Executor, class Protocol = asio::ip::tcp >
4270
class ports
71+
: public ports_base
72+
, public server::part
4373
{
4474
public:
45-
static constexpr std::size_t max_accept = 128;
46-
4775
using executor_type = Executor;
48-
using endpoint_type = typename Protocol::endpoint;
49-
using acceptor_type =
50-
asio::basic_socket_acceptor<Protocol, Executor>;
76+
using protocol_type = Protocol;
77+
using endpoint = typename Protocol::endpoint;
78+
using acceptor = asio::basic_socket_acceptor<Protocol, Executor>;
5179

52-
struct entry
80+
struct entry_impl : entry
5381
{
5482
template<class... Args>
55-
explicit entry(Args&&... args)
83+
explicit entry_impl(Args&&... args)
5684
: sock(std::forward<Args>(args)...)
5785
{
5886
}
5987

60-
acceptor_type sock;
61-
std::size_t avail; // free workers available
62-
std::size_t accept; // number currently accepting
63-
88+
acceptor sock;
6489
};
6590

66-
template<class Executor1
67-
,typename = std::enable_if<std::is_constructible<Executor, Executor1>::value>
91+
/** Constructor
92+
93+
@param concurrency The number of threads calling io_context::run
94+
*/
95+
template<
96+
class Executor1,
97+
class Worker,
98+
class... Args,
99+
class = typename std::enable_if<std::is_constructible<Executor, Executor1>::value>::type
68100
>
69-
explicit
70-
ports(Executor1 const& ex)
101+
ports(
102+
server&,
103+
Executor1 const& ex,
104+
unsigned concurrency,
105+
std::size_t num_workers,
106+
of_type_t<Worker>,
107+
Args&&... args)
71108
: ex_(ex)
109+
, concurrency_(concurrency)
110+
{
111+
fixed_array<Worker> vw(num_workers);
112+
while(! vw.is_full())
113+
vw.emplace_back(
114+
vw.size() + 1,
115+
[this](std::size_t i)
116+
{
117+
asio::dispatch(ex_,
118+
[this, i]
119+
{
120+
do_idle(i);
121+
});
122+
},
123+
std::forward<Args>(args)...);
124+
vw_ = std::move(vw);
125+
126+
// tracks linked list of idle workers
127+
vi_.resize(num_workers);
128+
for(std::size_t i = 0; i < vi_.size() - 1; ++i)
129+
vi_[i] = i + 2;
130+
vi_.back() = 0;
131+
132+
do_accept_ = &ports::do_worker_accept<Worker>;
133+
do_stop_ = &ports::do_worker_stop<Worker>;
134+
}
135+
136+
template<class Worker>
137+
void do_worker_accept(std::size_t i, entry_impl& e)
72138
{
139+
vw_.to_span<Worker>()[i - 1].do_accept(e.sock,
140+
[this, &e]
141+
{
142+
asio::dispatch(ex_,
143+
[this, &e]
144+
{
145+
++e.need;
146+
LOG_TRC(sect_, "need=", e.need);
147+
});
148+
});
73149
}
74150

75-
template<class... Args>
76-
entry& emplace(Args&&... args)
151+
template<class Worker>
152+
void do_worker_stop()
77153
{
78-
v_.emplace_back(std::forward<Args>(args)...);
79-
return v_.back();
154+
for(auto& w : vw_.to_span<Worker>())
155+
w.cancel();
156+
for(auto& e : ve_)
157+
{
158+
system::error_code ec;
159+
e.sock.cancel(ec);
160+
}
161+
}
162+
163+
void do_idle(std::size_t index)
164+
{
165+
vi_[index - 1] = idle_;
166+
idle_ = index;
167+
do_accept();
168+
}
169+
170+
/** Add another port
171+
*/
172+
void emplace(
173+
endpoint const& ep,
174+
bool reuse_addr = true)
175+
{
176+
ve_.emplace_back(ex_, ep, reuse_addr);
177+
ve_.back().need = concurrency_;
80178
}
81179

82180
private:
181+
void run() override
182+
{
183+
asio::dispatch(ex_, call_mf(&ports::do_accept, this));
184+
}
185+
186+
void stop() override
187+
{
188+
asio::dispatch(ex_, call_mf(&ports::do_stop, this));
189+
}
190+
191+
void do_accept()
192+
{
193+
if(idle_ == 0) // no idle workers
194+
return;
195+
for(auto& e : ve_)
196+
{
197+
while(e.need > 0)
198+
{
199+
--e.need;
200+
LOG_TRC(sect_, "need=", e.need);
201+
auto const i = idle_;
202+
idle_ = vi_[idle_ - 1];
203+
(this->*do_accept_)(i, e);
204+
if(idle_ == 0) // no idle workers
205+
return;
206+
}
207+
}
208+
}
209+
210+
void do_stop()
211+
{
212+
(this->*do_stop_)();
213+
}
214+
215+
section sect_;
83216
Executor ex_;
84-
std::vector<entry> v_;
217+
unsigned concurrency_;
218+
std::vector<entry_impl> ve_;
219+
any_fixed_array vw_;
220+
std::vector<std::size_t> vi_;
221+
std::size_t idle_ = 1;
222+
223+
void(ports::*do_accept_)(std::size_t, entry_impl&);
224+
void(ports::*do_stop_)();
85225
};
86226

87227
} // http_io

include/boost/http_io/server/workers.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
#define BOOST_HTTP_IO_SERVER_WORKERS_HPP
1212

1313
#include <boost/http_io/detail/config.hpp>
14+
#include <boost/http_io/server/fixed_array.hpp>
1415
#include <boost/http_io/server/server.hpp>
15-
#include "fixed_array.hpp"
1616

1717
namespace boost {
1818
namespace http_io {

0 commit comments

Comments
 (0)