From 26e2f719c84ad2e07efe690c701199b4a84c19b6 Mon Sep 17 00:00:00 2001 From: Shannon Weyrick Date: Mon, 18 Oct 2021 16:20:28 -0400 Subject: [PATCH 01/12] add mock handler --- src/handlers/mock/CMakeLists.txt | 20 +++++ src/handlers/mock/MockHandler.conf | 4 + src/handlers/mock/MockHandlerModulePlugin.cpp | 32 +++++++ src/handlers/mock/MockHandlerModulePlugin.h | 26 ++++++ src/handlers/mock/MockStreamHandler.cpp | 67 +++++++++++++++ src/handlers/mock/MockStreamHandler.h | 83 +++++++++++++++++++ src/handlers/mock/README.md | 3 + 7 files changed, 235 insertions(+) create mode 100644 src/handlers/mock/CMakeLists.txt create mode 100644 src/handlers/mock/MockHandler.conf create mode 100644 src/handlers/mock/MockHandlerModulePlugin.cpp create mode 100644 src/handlers/mock/MockHandlerModulePlugin.h create mode 100644 src/handlers/mock/MockStreamHandler.cpp create mode 100644 src/handlers/mock/MockStreamHandler.h create mode 100644 src/handlers/mock/README.md diff --git a/src/handlers/mock/CMakeLists.txt b/src/handlers/mock/CMakeLists.txt new file mode 100644 index 000000000..eb332e0b1 --- /dev/null +++ b/src/handlers/mock/CMakeLists.txt @@ -0,0 +1,20 @@ +message(STATUS "Handler Module: Mock (dynamic)") + +set_directory_properties(PROPERTIES CORRADE_USE_PEDANTIC_FLAGS ON) + +corrade_add_plugin(VisorHandlerMock + ${CMAKE_CURRENT_BINARY_DIR} + MockHandler.conf + MockHandlerModulePlugin.cpp + MockStreamHandler.cpp) +add_library(Visor::Handler::Mock ALIAS VisorHandlerMock) + +target_include_directories(VisorHandlerMock + INTERFACE + $ + ) + +target_link_libraries(VisorHandlerMock + PUBLIC + Visor::Input::Mock + ) diff --git a/src/handlers/mock/MockHandler.conf b/src/handlers/mock/MockHandler.conf new file mode 100644 index 000000000..bee902fd0 --- /dev/null +++ b/src/handlers/mock/MockHandler.conf @@ -0,0 +1,4 @@ +# Aliases +provides=mock +[data] +desc=A mock stream handler diff --git a/src/handlers/mock/MockHandlerModulePlugin.cpp b/src/handlers/mock/MockHandlerModulePlugin.cpp new file mode 100644 index 000000000..e0988ae82 --- /dev/null +++ b/src/handlers/mock/MockHandlerModulePlugin.cpp @@ -0,0 +1,32 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +#include "MockHandlerModulePlugin.h" +#include "CoreRegistry.h" +#include "HandlerManager.h" +#include "InputStreamManager.h" +#include "MockStreamHandler.h" +#include "MockInputStream.h" +#include +#include + +CORRADE_PLUGIN_REGISTER(VisorHandlerMock, visor::handler::mock::MockHandlerModulePlugin, + "visor.module.handler/1.0") + +namespace visor::handler::mock { + +using namespace visor::input::mock; +using json = nlohmann::json; + +void MockHandlerModulePlugin::setup_routes(HttpServer *svr) +{ +} +std::unique_ptr MockHandlerModulePlugin::instantiate(const std::string &name, InputStream *input_stream, const Configurable *config) +{ + // TODO using config as both window config and module config + auto handler_module = std::make_unique(name, input_stream, config); + return handler_module; +} + +} \ No newline at end of file diff --git a/src/handlers/mock/MockHandlerModulePlugin.h b/src/handlers/mock/MockHandlerModulePlugin.h new file mode 100644 index 000000000..a6cf1aca7 --- /dev/null +++ b/src/handlers/mock/MockHandlerModulePlugin.h @@ -0,0 +1,26 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +#pragma once + + +#include "HandlerModulePlugin.h" + +namespace visor::handler::mock { + +class MockHandlerModulePlugin : public HandlerModulePlugin +{ + +protected: + void setup_routes(HttpServer *svr) override; + +public: + explicit MockHandlerModulePlugin(Corrade::PluginManager::AbstractManager &manager, const std::string &plugin) + : visor::HandlerModulePlugin{manager, plugin} + { + } + std::unique_ptr instantiate(const std::string &name, InputStream *input_stream, const Configurable *config) override; +}; +} + diff --git a/src/handlers/mock/MockStreamHandler.cpp b/src/handlers/mock/MockStreamHandler.cpp new file mode 100644 index 000000000..643598010 --- /dev/null +++ b/src/handlers/mock/MockStreamHandler.cpp @@ -0,0 +1,67 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +#include "MockStreamHandler.h" + +namespace visor::handler::mock { + +MockStreamHandler::MockStreamHandler(const std::string &name, InputStream *stream, const Configurable *window_config) + : visor::StreamMetricsHandler(name, window_config) +{ + assert(stream); + // figure out which input stream we have + _mock_stream = dynamic_cast(stream); + if (!_mock_stream) { + throw StreamHandlerException(fmt::format("MockStreamHandler: unsupported input stream {}", stream->name())); + } +} + +void MockStreamHandler::start() +{ + if (_running) { + return; + } + + if (config_exists("recorded_stream")) { + _metrics->set_recorded_stream(); + } + + _running = true; +} + +void MockStreamHandler::stop() +{ + if (!_running) { + return; + } + + _running = false; +} + +void MockMetricsBucket::specialized_merge(const AbstractMetricsBucket &o) +{ + // static because caller guarantees only our own bucket type + const auto &other = static_cast(o); + + std::shared_lock r_lock(other._mutex); + std::unique_lock w_lock(_mutex); + + _counters.mock_counter += other._counters.mock_counter; +} + +void MockMetricsBucket::to_prometheus(std::stringstream &out, Metric::LabelMap add_labels) const +{ + std::shared_lock r_lock(_mutex); + + _counters.mock_counter.to_prometheus(out, add_labels); +} + +void MockMetricsBucket::to_json(json &j) const +{ + std::shared_lock r_lock(_mutex); + + _counters.mock_counter.to_json(j); +} + +} \ No newline at end of file diff --git a/src/handlers/mock/MockStreamHandler.h b/src/handlers/mock/MockStreamHandler.h new file mode 100644 index 000000000..54083a713 --- /dev/null +++ b/src/handlers/mock/MockStreamHandler.h @@ -0,0 +1,83 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +#pragma once + +#include "AbstractMetricsManager.h" +#include "MockInputStream.h" +#include "StreamHandler.h" +#include +#include +#include + +namespace visor::handler::mock { + +using namespace visor::input::mock; + +class MockMetricsBucket final : public visor::AbstractMetricsBucket +{ + +protected: + mutable std::shared_mutex _mutex; + + // total numPackets is tracked in base class num_events + struct counters { + + Counter mock_counter; + + counters() + : mock_counter("mock", {"counter"}, "Count of random ints from mock input source") + { + } + }; + counters _counters; + +public: + MockMetricsBucket() + { + } + + // get a copy of the counters + counters counters() const + { + std::shared_lock lock(_mutex); + return _counters; + } + + // visor::AbstractMetricsBucket + void specialized_merge(const AbstractMetricsBucket &other) override; + void to_json(json &j) const override; + void to_prometheus(std::stringstream &out, Metric::LabelMap add_labels = {}) const override; + +}; + +class MockMetricsManager final : public visor::AbstractMetricsManager +{ +public: + MockMetricsManager(const Configurable *window_config) + : visor::AbstractMetricsManager(window_config) + { + } + +}; + +class MockStreamHandler final : public visor::StreamMetricsHandler +{ + + MockInputStream *_mock_stream; + +public: + MockStreamHandler(const std::string &name, InputStream *stream, const Configurable *window_config); + ~MockStreamHandler() = default; + + // visor::AbstractModule + std::string schema_key() const override + { + return "mock"; + } + void start() override; + void stop() override; +}; + +} diff --git a/src/handlers/mock/README.md b/src/handlers/mock/README.md new file mode 100644 index 000000000..11d615a22 --- /dev/null +++ b/src/handlers/mock/README.md @@ -0,0 +1,3 @@ +# Mock Metrics Stream Handler + +This directory contains a mock stream handler From 3a7656b961372c657b709e847b3f93d52a48335f Mon Sep 17 00:00:00 2001 From: Shannon Weyrick Date: Mon, 18 Oct 2021 16:20:36 -0400 Subject: [PATCH 02/12] add mock handler --- src/handlers/CMakeLists.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/src/handlers/CMakeLists.txt b/src/handlers/CMakeLists.txt index 3a5e3cf35..c569f794a 100644 --- a/src/handlers/CMakeLists.txt +++ b/src/handlers/CMakeLists.txt @@ -2,5 +2,6 @@ add_subdirectory(net) add_subdirectory(dns) add_subdirectory(pcap) +add_subdirectory(mock) set(VISOR_STATIC_PLUGINS ${VISOR_STATIC_PLUGINS} PARENT_SCOPE) From 059bab3023831acae1a93ddefed89d459360b948 Mon Sep 17 00:00:00 2001 From: Shannon Weyrick Date: Mon, 18 Oct 2021 16:35:42 -0400 Subject: [PATCH 03/12] mock metric --- src/handlers/mock/MockStreamHandler.cpp | 18 +++++++++++++++--- src/handlers/mock/MockStreamHandler.h | 6 ++++++ 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/src/handlers/mock/MockStreamHandler.cpp b/src/handlers/mock/MockStreamHandler.cpp index 643598010..735afbd57 100644 --- a/src/handlers/mock/MockStreamHandler.cpp +++ b/src/handlers/mock/MockStreamHandler.cpp @@ -17,15 +17,18 @@ MockStreamHandler::MockStreamHandler(const std::string &name, InputStream *strea } } +void MockStreamHandler::process_random_int(uint64_t i) +{ + _metrics->process_random_int(i); +} + void MockStreamHandler::start() { if (_running) { return; } - if (config_exists("recorded_stream")) { - _metrics->set_recorded_stream(); - } + _random_int_connection = _mock_stream->random_int_signal.connect(&MockStreamHandler::process_random_int, this); _running = true; } @@ -63,5 +66,14 @@ void MockMetricsBucket::to_json(json &j) const _counters.mock_counter.to_json(j); } +void MockMetricsBucket::process_random_int(uint64_t i) +{ + std::unique_lock w_lock(_mutex); + _counters.mock_counter += i; +} +void MockMetricsManager::process_random_int(uint64_t i) +{ + live_bucket()->process_random_int(i); +} } \ No newline at end of file diff --git a/src/handlers/mock/MockStreamHandler.h b/src/handlers/mock/MockStreamHandler.h index 54083a713..18687788c 100644 --- a/src/handlers/mock/MockStreamHandler.h +++ b/src/handlers/mock/MockStreamHandler.h @@ -50,6 +50,7 @@ class MockMetricsBucket final : public visor::AbstractMetricsBucket void to_json(json &j) const override; void to_prometheus(std::stringstream &out, Metric::LabelMap add_labels = {}) const override; + void process_random_int(uint64_t i); }; class MockMetricsManager final : public visor::AbstractMetricsManager @@ -60,6 +61,7 @@ class MockMetricsManager final : public visor::AbstractMetricsManager @@ -67,6 +69,10 @@ class MockStreamHandler final : public visor::StreamMetricsHandler Date: Tue, 19 Oct 2021 16:43:18 -0400 Subject: [PATCH 04/12] pull CoreRegistry out of CoreServer --- cmd/pktvisord/main.cpp | 16 ++++++++++++++- src/CoreRegistry.cpp | 32 ++++++++++++++++------------- src/CoreRegistry.h | 8 ++++++-- src/CoreServer.cpp | 46 +++++++++++++++++++++--------------------- src/CoreServer.h | 8 ++++---- 5 files changed, 66 insertions(+), 44 deletions(-) diff --git a/cmd/pktvisord/main.cpp b/cmd/pktvisord/main.cpp index a4da27b50..bd234c92f 100644 --- a/cmd/pktvisord/main.cpp +++ b/cmd/pktvisord/main.cpp @@ -67,6 +67,8 @@ static const char USAGE[] = Configuration: --config FILE Use specified YAML configuration to configure options, Taps, and Collection Policies Please see https://pktvisor.dev for more information + Modules: + --modules-list List available modules (builtin and dynamic) Logging Options: --log-file FILE Log to the given output file name --syslog Log to syslog @@ -226,6 +228,18 @@ int main(int argc, char *argv[]) logger->set_level(spdlog::level::debug); } + // modules + CoreRegistry registry; + if (args["--modules-list"].asBool()) { + for (auto &p : registry.input_plugin_registry()->pluginList()) { + logger->info("input: {}", p); + } + for (auto &p : registry.handler_plugin_registry()->pluginList()) { + logger->info("handler: {}", p); + } + exit(EXIT_SUCCESS); + } + logger->info("{} starting up", VISOR_VERSION); // if we are demonized, change to root directory now that (potentially) logs are open @@ -254,7 +268,7 @@ int main(int argc, char *argv[]) std::unique_ptr svr; try { - svr = std::make_unique(logger, http_config, prom_config); + svr = std::make_unique(®istry, logger, http_config, prom_config); } catch (const std::exception &e) { logger->error(e.what()); logger->info("exit with failure"); diff --git a/src/CoreRegistry.cpp b/src/CoreRegistry.cpp index 91d484de4..ab963318a 100644 --- a/src/CoreRegistry.cpp +++ b/src/CoreRegistry.cpp @@ -12,8 +12,7 @@ namespace visor { -CoreRegistry::CoreRegistry(HttpServer *svr) - : _svr(svr) +CoreRegistry::CoreRegistry() { _logger = spdlog::get("visor"); @@ -21,13 +20,25 @@ CoreRegistry::CoreRegistry(HttpServer *svr) _logger = spdlog::stderr_color_mt("visor"); } + // inputs + _input_manager = std::make_unique(); + + // handlers + _handler_manager = std::make_unique(); + + // taps + _tap_manager = std::make_unique(this); + + // policies policies + _policy_manager = std::make_unique(this); +} + +void CoreRegistry::start(HttpServer *svr) +{ if (!svr) { _logger->warn("initializing modules with no HttpServer"); } - // inputs - _input_manager = std::make_unique(); - // initialize input plugins { auto alias_list = _input_registry.aliasList(); @@ -38,14 +49,11 @@ CoreRegistry::CoreRegistry(HttpServer *svr) for (auto &s : by_alias) { InputPluginPtr mod = _input_registry.instantiate(s); _logger->info("Load input stream plugin: {} {}", s, mod->pluginInterface()); - mod->init_plugin(this, _svr); + mod->init_plugin(this, svr); _input_plugins.insert({s, std::move(mod)}); } } - // handlers - _handler_manager = std::make_unique(); - // initialize handler plugins { auto alias_list = _handler_registry.aliasList(); @@ -56,15 +64,11 @@ CoreRegistry::CoreRegistry(HttpServer *svr) for (auto &s : by_alias) { HandlerPluginPtr mod = _handler_registry.instantiate(s); _logger->info("Load stream handler plugin: {} {}", s, mod->pluginInterface()); - mod->init_plugin(this, _svr); + mod->init_plugin(this, svr); _handler_plugins.insert({s, std::move(mod)}); } } - // taps - _tap_manager = std::make_unique(this); - // policies policies - _policy_manager = std::make_unique(this); } void CoreRegistry::stop() diff --git a/src/CoreRegistry.h b/src/CoreRegistry.h index 03f230f16..9dc026714 100644 --- a/src/CoreRegistry.h +++ b/src/CoreRegistry.h @@ -47,12 +47,12 @@ class CoreRegistry std::unique_ptr _policy_manager; std::shared_ptr _logger; - HttpServer *_svr; public: - CoreRegistry(HttpServer *svr); + CoreRegistry(); ~CoreRegistry(); + void start(HttpServer *svr); void stop(); // yaml based configuration @@ -84,6 +84,10 @@ class CoreRegistry { return _policy_manager.get(); } + [[nodiscard]] const HandlerPluginRegistry *handler_plugin_registry() const + { + return &_handler_registry; + } [[nodiscard]] const InputPluginRegistry *input_plugin_registry() const { return &_input_registry; diff --git a/src/CoreServer.cpp b/src/CoreServer.cpp index 88343406b..e74b23c4b 100644 --- a/src/CoreServer.cpp +++ b/src/CoreServer.cpp @@ -16,9 +16,9 @@ namespace visor { -visor::CoreServer::CoreServer(std::shared_ptr logger, const HttpConfig &http_config, const PrometheusConfig &prom_config) +visor::CoreServer::CoreServer(CoreRegistry *registry, std::shared_ptr logger, const HttpConfig &http_config, const PrometheusConfig &prom_config) : _svr(http_config) - , _registry(&_svr) + , _registry(registry) , _logger(logger) , _start_time(std::chrono::system_clock::now()) { @@ -49,7 +49,7 @@ void CoreServer::start(const std::string &host, int port) void CoreServer::stop() { _svr.stop(); - _registry.stop(); + _registry->stop(); } CoreServer::~CoreServer() @@ -100,14 +100,14 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) _svr.Get(R"(/api/v1/metrics/bucket/(\d+))", [&](const httplib::Request &req, httplib::Response &res) { json j; bool bc_period{false}; - if (!_registry.policy_manager()->module_exists("default")) { + if (!_registry->policy_manager()->module_exists("default")) { res.status = 404; j["error"] = "no \"default\" policy exists"; res.set_content(j.dump(), "text/json"); return; } try { - auto [policy, lock] = _registry.policy_manager()->module_get_locked("default"); + auto [policy, lock] = _registry->policy_manager()->module_get_locked("default"); uint64_t period(std::stol(req.matches[1])); for (auto &mod : policy->modules()) { auto hmod = dynamic_cast(mod); @@ -132,14 +132,14 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) // 3.0.x compatible: reference "default" policy _svr.Get(R"(/api/v1/metrics/window/(\d+))", [&](const httplib::Request &req, httplib::Response &res) { json j; - if (!_registry.policy_manager()->module_exists("default")) { + if (!_registry->policy_manager()->module_exists("default")) { res.status = 404; j["error"] = "no \"default\" policy exists"; res.set_content(j.dump(), "text/json"); return; } try { - auto [policy, lock] = _registry.policy_manager()->module_get_locked("default"); + auto [policy, lock] = _registry->policy_manager()->module_get_locked("default"); uint64_t period(std::stol(req.matches[1])); for (auto &mod : policy->modules()) { auto hmod = dynamic_cast(mod); @@ -161,13 +161,13 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) if (!prom_config.default_path.empty()) { _logger->info("enabling prometheus metrics for \"default\" policy on: {}", prom_config.default_path); _svr.Get(prom_config.default_path.c_str(), [&]([[maybe_unused]] const httplib::Request &req, httplib::Response &res) { - if (!_registry.policy_manager()->module_exists("default")) { + if (!_registry->policy_manager()->module_exists("default")) { res.status = 404; return; } try { std::stringstream output; - auto [policy, lock] = _registry.policy_manager()->module_get_locked("default"); + auto [policy, lock] = _registry->policy_manager()->module_get_locked("default"); for (auto &mod : policy->modules()) { auto hmod = dynamic_cast(mod); if (hmod) { @@ -187,7 +187,7 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) _svr.Get(R"(/api/v1/taps)", [&]([[maybe_unused]] const httplib::Request &req, httplib::Response &res) { json j; try { - auto [tap_modules, hm_lock] = _registry.tap_manager()->module_get_all_locked(); + auto [tap_modules, hm_lock] = _registry->tap_manager()->module_get_all_locked(); for (auto &[name, mod] : tap_modules) { auto tmod = dynamic_cast(mod.get()); if (tmod) { @@ -205,7 +205,7 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) _svr.Get(R"(/api/v1/policies)", [&]([[maybe_unused]] const httplib::Request &req, httplib::Response &res) { json j; try { - auto [policy_modules, hm_lock] = _registry.policy_manager()->module_get_all_locked(); + auto [policy_modules, hm_lock] = _registry->policy_manager()->module_get_all_locked(); for (auto &[name, mod] : policy_modules) { auto tmod = dynamic_cast(mod.get()); if (tmod) { @@ -235,7 +235,7 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) return; } try { - auto policies = _registry.policy_manager()->load_from_str(req.body); + auto policies = _registry->policy_manager()->load_from_str(req.body); for (auto &mod : policies) { mod->info_json(j[mod->name()]); } @@ -249,14 +249,14 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) _svr.Get(fmt::format("/api/v1/policies/({})", AbstractModule::MODULE_ID_REGEX).c_str(), [&](const httplib::Request &req, httplib::Response &res) { json j; auto name = req.matches[1]; - if (!_registry.policy_manager()->module_exists(name)) { + if (!_registry->policy_manager()->module_exists(name)) { res.status = 404; j["error"] = "policy does not exists"; res.set_content(j.dump(), "text/json"); return; } try { - auto [policy, lock] = _registry.policy_manager()->module_get_locked(name); + auto [policy, lock] = _registry->policy_manager()->module_get_locked(name); policy->info_json(j); res.set_content(j.dump(), "text/json"); } catch (const std::exception &e) { @@ -268,18 +268,18 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) _svr.Delete(fmt::format("/api/v1/policies/({})", AbstractModule::MODULE_ID_REGEX).c_str(), [&](const httplib::Request &req, httplib::Response &res) { json j; auto name = req.matches[1]; - if (!_registry.policy_manager()->module_exists(name)) { + if (!_registry->policy_manager()->module_exists(name)) { res.status = 404; j["error"] = "policy does not exists"; res.set_content(j.dump(), "text/json"); return; } try { - auto [policy, lock] = _registry.policy_manager()->module_get_locked(name); + auto [policy, lock] = _registry->policy_manager()->module_get_locked(name); policy->stop(); lock.unlock(); // TODO chance of race here - _registry.policy_manager()->module_remove(name); + _registry->policy_manager()->module_remove(name); res.set_content(j.dump(), "text/json"); } catch (const std::exception &e) { res.status = 500; @@ -290,14 +290,14 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) _svr.Get(fmt::format("/api/v1/policies/({})/metrics/bucket/(\\d+)", AbstractModule::MODULE_ID_REGEX).c_str(), [&](const httplib::Request &req, httplib::Response &res) { json j; auto name = req.matches[1]; - if (!_registry.policy_manager()->module_exists(name)) { + if (!_registry->policy_manager()->module_exists(name)) { res.status = 404; j["error"] = "policy does not exist"; res.set_content(j.dump(), "text/json"); return; } try { - auto [policy, lock] = _registry.policy_manager()->module_get_locked(name); + auto [policy, lock] = _registry->policy_manager()->module_get_locked(name); uint64_t period(std::stol(req.matches[2])); for (auto &mod : policy->modules()) { auto hmod = dynamic_cast(mod); @@ -317,14 +317,14 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) _svr.Get(fmt::format("/api/v1/policies/({})/metrics/window/(\\d+)", AbstractModule::MODULE_ID_REGEX).c_str(), [&](const httplib::Request &req, httplib::Response &res) { json j; auto name = req.matches[1]; - if (!_registry.policy_manager()->module_exists(name)) { + if (!_registry->policy_manager()->module_exists(name)) { res.status = 404; j["error"] = "policy does not exist"; res.set_content(j.dump(), "text/json"); return; } try { - auto [policy, lock] = _registry.policy_manager()->module_get_locked(name); + auto [policy, lock] = _registry->policy_manager()->module_get_locked(name); uint64_t period(std::stol(req.matches[2])); for (auto &mod : policy->modules()) { auto hmod = dynamic_cast(mod); @@ -343,14 +343,14 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) }); _svr.Get(fmt::format("/api/v1/policies/({})/metrics/prometheus", AbstractModule::MODULE_ID_REGEX).c_str(), [&](const httplib::Request &req, httplib::Response &res) { auto name = req.matches[1]; - if (!_registry.policy_manager()->module_exists(name)) { + if (!_registry->policy_manager()->module_exists(name)) { res.status = 404; res.set_content("policy does not exists", "text/plain"); return; } try { std::stringstream output; - auto [policy, lock] = _registry.policy_manager()->module_get_locked(name); + auto [policy, lock] = _registry->policy_manager()->module_get_locked(name); for (auto &mod : policy->modules()) { auto hmod = dynamic_cast(mod); if (hmod) { diff --git a/src/CoreServer.h b/src/CoreServer.h index e11e3d7d8..3f925a882 100644 --- a/src/CoreServer.h +++ b/src/CoreServer.h @@ -20,7 +20,7 @@ class CoreServer { HttpServer _svr; - CoreRegistry _registry; + CoreRegistry *_registry; std::shared_ptr _logger; std::chrono::system_clock::time_point _start_time; @@ -28,7 +28,7 @@ class CoreServer void _setup_routes(const PrometheusConfig &prom_config); public: - CoreServer(std::shared_ptr logger, const HttpConfig &http_config, const PrometheusConfig &prom_config); + CoreServer(CoreRegistry *registry, std::shared_ptr logger, const HttpConfig &http_config, const PrometheusConfig &prom_config); ~CoreServer(); void start(const std::string &host, int port); @@ -36,12 +36,12 @@ class CoreServer const CoreRegistry *registry() const { - return &_registry; + return _registry; } CoreRegistry *registry() { - return &_registry; + return _registry; } void set_http_logger(httplib::Logger logger) From b2537292ff96768def8cfbb185febc873c4bc58b Mon Sep 17 00:00:00 2001 From: Shannon Weyrick Date: Wed, 20 Oct 2021 11:11:00 -0400 Subject: [PATCH 05/12] rework help, add initial load option --- cmd/pktvisord/main.cpp | 70 +++++++++++++++++++++++------------------- src/CoreRegistry.h | 10 +++++- 2 files changed, 47 insertions(+), 33 deletions(-) diff --git a/cmd/pktvisord/main.cpp b/cmd/pktvisord/main.cpp index bd234c92f..c2ad7b3ec 100644 --- a/cmd/pktvisord/main.cpp +++ b/cmd/pktvisord/main.cpp @@ -32,9 +32,9 @@ static const char USAGE[] = pktvisord summarizes data streams and exposes a REST API control plane for configuration and metrics. - pktvisord operation is configured via Taps and Collection Policies. The former set up the available - input streams while the latter instantiate Taps and Stream Handlers to analyze and summarize - the stream data. + pktvisord operation is configured via Taps and Collection Policies. Taps abstract the process of "tapping into" + input streams with templated configuration while Policies use Taps to instantiate and configure Input and Stream + Handlers to analyze and summarize stream data, which is then made available for collection via REST API. Taps and Collection Policies may be created by passing the appropriate YAML configuration file to --config, and/or by enabling the admin REST API with --admin-api and using the appropriate endpoints. @@ -47,42 +47,44 @@ static const char USAGE[] = For more documentation, see https://pktvisor.dev Base Options: - -d Daemonize; fork and continue running in the background [default: false] - -h --help Show this screen - -v Verbose log output - --no-track Don't send lightweight, anonymous usage metrics - --version Show version + -d Daemonize; fork and continue running in the background [default: false] + -h --help Show this screen + -v Verbose log output + --no-track Don't send lightweight, anonymous usage metrics + --version Show version Web Server Options: - -l HOST Run web server on the given host or IP [default: localhost] - -p PORT Run web server on the given port [default: 10853] - --tls Enable TLS on the web server - --tls-cert FILE Use given TLS cert. Required if --tls is enabled. - --tls-key FILE Use given TLS private key. Required if --tls is enabled. - --admin-api Enable admin REST API giving complete control plane functionality [default: false] - When not specified, the exposed API is read-only access to module status and metrics. - When specified, write access is enabled for all modules. + -l HOST Run web server on the given host or IP [default: localhost] + -p PORT Run web server on the given port [default: 10853] + --tls Enable TLS on the web server + --tls-cert FILE Use given TLS cert. Required if --tls is enabled. + --tls-key FILE Use given TLS private key. Required if --tls is enabled. + --admin-api Enable admin REST API giving complete control plane functionality [default: false] + When not specified, the exposed API is read-only access to module status and metrics. + When specified, write access is enabled for all modules. Geo Options: - --geo-city FILE GeoLite2 City database to use for IP to Geo mapping - --geo-asn FILE GeoLite2 ASN database to use for IP to ASN mapping + --geo-city FILE GeoLite2 City database to use for IP to Geo mapping + --geo-asn FILE GeoLite2 ASN database to use for IP to ASN mapping Configuration: - --config FILE Use specified YAML configuration to configure options, Taps, and Collection Policies - Please see https://pktvisor.dev for more information + --config FILE Use specified YAML configuration to configure options, Taps, and Collection Policies + Please see https://pktvisor.dev for more information Modules: - --modules-list List available modules (builtin and dynamic) + --module-list List all modules which have been loaded (builtin and dynamic) + --module-load FILE Load the specified dynamic module Logging Options: - --log-file FILE Log to the given output file name - --syslog Log to syslog + --log-file FILE Log to the given output file name + --syslog Log to syslog Prometheus Options: - --prometheus Ignored, Prometheus output always enabled (left for backwards compatibility) - --prom-instance ID Optionally set the 'instance' label to given ID + --prometheus Ignored, Prometheus output always enabled (left for backwards compatibility) + --prom-instance ID Optionally set the 'instance' label to given ID Handler Module Defaults: - --max-deep-sample N Never deep sample more than N% of streams (an int between 0 and 100) [default: 100] - --periods P Hold this many 60 second time periods of history in memory [default: 5] + --max-deep-sample N Never deep sample more than N% of streams (an int between 0 and 100) [default: 100] + --periods P Hold this many 60 second time periods of history in memory [default: 5] pcap Input Module Options: (applicable to default policy when IFACE is specified only) - -b BPF Filter packets using the given tcpdump compatible filter expression. Example: "port 53" - -H HOSTSPEC Specify subnets (comma separated) to consider HOST, in CIDR form. In live capture this /may/ be detected automatically - from capture device but /must/ be specified for pcaps. Example: "10.0.1.0/24,10.0.2.1/32,2001:db8::/64" - Specifying this for live capture will append to any automatic detection. + -b BPF Filter packets using the given tcpdump compatible filter expression. Example: "port 53" + -H HOSTSPEC Specify subnets (comma separated) to consider HOST, in CIDR form. In live capture this + /may/ be detected automatically from capture device but /must/ be specified for pcaps. + Example: "10.0.1.0/24,10.0.2.1/32,2001:db8::/64" + Specifying this for live capture will append to any automatic detection. )"; namespace { @@ -230,7 +232,7 @@ int main(int argc, char *argv[]) // modules CoreRegistry registry; - if (args["--modules-list"].asBool()) { + if (args["--module-list"].asBool()) { for (auto &p : registry.input_plugin_registry()->pluginList()) { logger->info("input: {}", p); } @@ -239,6 +241,10 @@ int main(int argc, char *argv[]) } exit(EXIT_SUCCESS); } + if (args["--module-load"]) { + auto result = registry.input_plugin_registry()->load(args["--module-load"].asString()); + exit(EXIT_SUCCESS); + } logger->info("{} starting up", VISOR_VERSION); diff --git a/src/CoreRegistry.h b/src/CoreRegistry.h index 9dc026714..97f929b24 100644 --- a/src/CoreRegistry.h +++ b/src/CoreRegistry.h @@ -38,7 +38,7 @@ class CoreRegistry HandlerPluginMap _handler_plugins; // these hold instances of active visor::AbstractModule derived modules (the main event processors) which are created from the plugins above - // any number can exist per plugin type can exist at a time, each with their own life cycle + // any number can exist per plugin type at a time, each with their own life cycle std::unique_ptr _input_manager; std::unique_ptr _handler_manager; @@ -92,6 +92,14 @@ class CoreRegistry { return &_input_registry; } + [[nodiscard]] HandlerPluginRegistry *handler_plugin_registry() + { + return &_handler_registry; + } + [[nodiscard]] InputPluginRegistry *input_plugin_registry() + { + return &_input_registry; + } [[nodiscard]] InputPluginMap &input_plugins() { From a266c6a00221330c4b6139debe4cebb67ea62641 Mon Sep 17 00:00:00 2001 From: Shannon Weyrick Date: Fri, 3 Dec 2021 11:56:25 -0800 Subject: [PATCH 06/12] continue module load refactor --- cmd/pktvisor-pcap/main.cpp | 18 +++++++-------- cmd/pktvisord/main.cpp | 11 +++++---- src/CoreServer.cpp | 46 ++++++++++++++++++------------------- src/tests/test_policies.cpp | 20 ++++++++-------- src/tests/test_taps.cpp | 6 ++--- 5 files changed, 52 insertions(+), 49 deletions(-) diff --git a/cmd/pktvisor-pcap/main.cpp b/cmd/pktvisor-pcap/main.cpp index 4fb476445..e6a95307e 100644 --- a/cmd/pktvisor-pcap/main.cpp +++ b/cmd/pktvisor-pcap/main.cpp @@ -82,7 +82,7 @@ int main(int argc, char *argv[]) logger->set_level(spdlog::level::debug); } - CoreRegistry mgrs(nullptr); + CoreRegistry registry; std::signal(SIGINT, signal_handler); std::signal(SIGTERM, signal_handler); @@ -125,8 +125,8 @@ int main(int argc, char *argv[]) input_stream->info_json(j["info"]); logger->info("{}", j.dump(4)); - mgrs.input_manager()->module_add(std::move(input_stream)); - auto [input_stream_, stream_mgr_lock] = mgrs.input_manager()->module_get_locked("pcap"); + registry.input_manager()->module_add(std::move(input_stream)); + auto [input_stream_, stream_mgr_lock] = registry.input_manager()->module_get_locked("pcap"); stream_mgr_lock.unlock(); auto pcap_stream = dynamic_cast(input_stream_); @@ -140,8 +140,8 @@ int main(int argc, char *argv[]) auto handler_module = std::make_unique("net", pcap_stream, &window_config); handler_module->config_set("recorded_stream", true); handler_module->start(); - mgrs.handler_manager()->module_add(std::move(handler_module)); - auto [handler, handler_mgr_lock] = mgrs.handler_manager()->module_get_locked("net"); + registry.handler_manager()->module_add(std::move(handler_module)); + auto [handler, handler_mgr_lock] = registry.handler_manager()->module_get_locked("net"); handler_mgr_lock.unlock(); net_handler = dynamic_cast(handler); } @@ -150,8 +150,8 @@ int main(int argc, char *argv[]) auto handler_module = std::make_unique("dns", pcap_stream, &window_config); handler_module->config_set("recorded_stream", true); handler_module->start(); - mgrs.handler_manager()->module_add(std::move(handler_module)); - auto [handler, handler_mgr_lock] = mgrs.handler_manager()->module_get_locked("dns"); + registry.handler_manager()->module_add(std::move(handler_module)); + auto [handler, handler_mgr_lock] = registry.handler_manager()->module_get_locked("dns"); handler_mgr_lock.unlock(); dns_handler = dynamic_cast(handler); } @@ -160,8 +160,8 @@ int main(int argc, char *argv[]) auto handler_module = std::make_unique("dhcp", pcap_stream, &window_config); handler_module->config_set("recorded_stream", true); handler_module->start(); - mgrs.handler_manager()->module_add(std::move(handler_module)); - auto [handler, handler_mgr_lock] = mgrs.handler_manager()->module_get_locked("dhcp"); + registry.handler_manager()->module_add(std::move(handler_module)); + auto [handler, handler_mgr_lock] = registry.handler_manager()->module_get_locked("dhcp"); handler_mgr_lock.unlock(); dhcp_handler = dynamic_cast(handler); } diff --git a/cmd/pktvisord/main.cpp b/cmd/pktvisord/main.cpp index b47cf55bb..ac59a34fb 100644 --- a/cmd/pktvisord/main.cpp +++ b/cmd/pktvisord/main.cpp @@ -234,6 +234,13 @@ int main(int argc, char *argv[]) // modules CoreRegistry registry; + if (args["--module-load"]) { + auto result = registry.input_plugin_registry()->load(args["--module-load"].asString()); + if (result != Corrade::PluginManager::LoadState::Loaded) { + logger->error("failed to load plugin: {}", result); + exit(EXIT_FAILURE); + } + } if (args["--module-list"].asBool()) { for (auto &p : registry.input_plugin_registry()->pluginList()) { logger->info("input: {}", p); @@ -243,10 +250,6 @@ int main(int argc, char *argv[]) } exit(EXIT_SUCCESS); } - if (args["--module-load"]) { - auto result = registry.input_plugin_registry()->load(args["--module-load"].asString()); - exit(EXIT_SUCCESS); - } logger->info("{} starting up", VISOR_VERSION); diff --git a/src/CoreServer.cpp b/src/CoreServer.cpp index 975494986..b74787bae 100644 --- a/src/CoreServer.cpp +++ b/src/CoreServer.cpp @@ -16,9 +16,9 @@ namespace visor { -visor::CoreServer::CoreServer(std::shared_ptr logger, const HttpConfig &http_config, const PrometheusConfig &prom_config) +visor::CoreServer::CoreServer(CoreRegistry *r, std::shared_ptr logger, const HttpConfig &http_config, const PrometheusConfig &prom_config) : _svr(http_config) - , _registry(&_svr) + , _registry(r) , _logger(logger) , _start_time(std::chrono::system_clock::now()) { @@ -49,7 +49,7 @@ void CoreServer::start(const std::string &host, int port) void CoreServer::stop() { _svr.stop(); - _registry.stop(); + _registry->stop(); } CoreServer::~CoreServer() @@ -100,14 +100,14 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) _svr.Get(R"(/api/v1/metrics/bucket/(\d+))", [&](const httplib::Request &req, httplib::Response &res) { json j; bool bc_period{false}; - if (!_registry.policy_manager()->module_exists("default")) { + if (!_registry->policy_manager()->module_exists("default")) { res.status = 404; j["error"] = "no \"default\" policy exists"; res.set_content(j.dump(), "text/json"); return; } try { - auto [policy, lock] = _registry.policy_manager()->module_get_locked("default"); + auto [policy, lock] = _registry->policy_manager()->module_get_locked("default"); uint64_t period(std::stol(req.matches[1])); for (auto &mod : policy->modules()) { auto hmod = dynamic_cast(mod); @@ -132,14 +132,14 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) // 3.0.x compatible: reference "default" policy _svr.Get(R"(/api/v1/metrics/window/(\d+))", [&](const httplib::Request &req, httplib::Response &res) { json j; - if (!_registry.policy_manager()->module_exists("default")) { + if (!_registry->policy_manager()->module_exists("default")) { res.status = 404; j["error"] = "no \"default\" policy exists"; res.set_content(j.dump(), "text/json"); return; } try { - auto [policy, lock] = _registry.policy_manager()->module_get_locked("default"); + auto [policy, lock] = _registry->policy_manager()->module_get_locked("default"); uint64_t period(std::stol(req.matches[1])); for (auto &mod : policy->modules()) { auto hmod = dynamic_cast(mod); @@ -161,13 +161,13 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) if (!prom_config.default_path.empty()) { _logger->info("enabling prometheus metrics for \"default\" policy on: {}", prom_config.default_path); _svr.Get(prom_config.default_path.c_str(), [&]([[maybe_unused]] const httplib::Request &req, httplib::Response &res) { - if (!_registry.policy_manager()->module_exists("default")) { + if (!_registry->policy_manager()->module_exists("default")) { res.status = 404; return; } try { std::stringstream output; - auto [policy, lock] = _registry.policy_manager()->module_get_locked("default"); + auto [policy, lock] = _registry->policy_manager()->module_get_locked("default"); for (auto &mod : policy->modules()) { auto hmod = dynamic_cast(mod); if (hmod) { @@ -187,7 +187,7 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) _svr.Get(R"(/api/v1/taps)", [&]([[maybe_unused]] const httplib::Request &req, httplib::Response &res) { json j; try { - auto [tap_modules, hm_lock] = _registry.tap_manager()->module_get_all_locked(); + auto [tap_modules, hm_lock] = _registry->tap_manager()->module_get_all_locked(); for (auto &[name, mod] : tap_modules) { auto tmod = dynamic_cast(mod.get()); if (tmod) { @@ -205,7 +205,7 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) _svr.Get(R"(/api/v1/policies)", [&]([[maybe_unused]] const httplib::Request &req, httplib::Response &res) { json j = json::object(); try { - auto [policy_modules, hm_lock] = _registry.policy_manager()->module_get_all_locked(); + auto [policy_modules, hm_lock] = _registry->policy_manager()->module_get_all_locked(); for (auto &[name, mod] : policy_modules) { auto tmod = dynamic_cast(mod.get()); if (tmod) { @@ -235,7 +235,7 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) return; } try { - auto policies = _registry.policy_manager()->load_from_str(req.body); + auto policies = _registry->policy_manager()->load_from_str(req.body); for (auto &mod : policies) { mod->info_json(j[mod->name()]); } @@ -249,14 +249,14 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) _svr.Get(fmt::format("/api/v1/policies/({})", AbstractModule::MODULE_ID_REGEX).c_str(), [&](const httplib::Request &req, httplib::Response &res) { json j = json::object(); auto name = req.matches[1]; - if (!_registry.policy_manager()->module_exists(name)) { + if (!_registry->policy_manager()->module_exists(name)) { res.status = 404; j["error"] = "policy does not exists"; res.set_content(j.dump(), "text/json"); return; } try { - auto [policy, lock] = _registry.policy_manager()->module_get_locked(name); + auto [policy, lock] = _registry->policy_manager()->module_get_locked(name); policy->info_json(j[name]); res.set_content(j.dump(), "text/json"); } catch (const std::exception &e) { @@ -268,18 +268,18 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) _svr.Delete(fmt::format("/api/v1/policies/({})", AbstractModule::MODULE_ID_REGEX).c_str(), [&](const httplib::Request &req, httplib::Response &res) { json j = json::object(); auto name = req.matches[1]; - if (!_registry.policy_manager()->module_exists(name)) { + if (!_registry->policy_manager()->module_exists(name)) { res.status = 404; j["error"] = "policy does not exists"; res.set_content(j.dump(), "text/json"); return; } try { - auto [policy, lock] = _registry.policy_manager()->module_get_locked(name); + auto [policy, lock] = _registry->policy_manager()->module_get_locked(name); policy->stop(); lock.unlock(); // TODO chance of race here - _registry.policy_manager()->module_remove(name); + _registry->policy_manager()->module_remove(name); res.set_content(j.dump(), "text/json"); } catch (const std::exception &e) { res.status = 500; @@ -293,8 +293,8 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) std::vector plist; if (name == "__all") { // special route to get all policy metrics in one call, for scraping performance reasons - plist = _registry.policy_manager()->module_get_keys(); - } else if (!_registry.policy_manager()->module_exists(name)) { + plist = _registry->policy_manager()->module_get_keys(); + } else if (!_registry->policy_manager()->module_exists(name)) { res.status = 404; j["error"] = "policy does not exist"; res.set_content(j.dump(), "text/json"); @@ -305,7 +305,7 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) try { for (const auto &p_mname : plist) { spdlog::stopwatch psw; - auto [policy, lock] = _registry.policy_manager()->module_get_locked(p_mname); + auto [policy, lock] = _registry->policy_manager()->module_get_locked(p_mname); uint64_t period(std::stol(req.matches[3])); for (auto &mod : policy->modules()) { auto hmod = dynamic_cast(mod); @@ -344,8 +344,8 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) auto name = req.matches[1]; if (name == "__all") { // special route to get all policy metrics in one call, for scraping performance reasons - plist = _registry.policy_manager()->module_get_keys(); - } else if (!_registry.policy_manager()->module_exists(name)) { + plist = _registry->policy_manager()->module_get_keys(); + } else if (!_registry->policy_manager()->module_exists(name)) { res.status = 404; res.set_content("policy does not exists", "text/plain"); return; @@ -356,7 +356,7 @@ void CoreServer::_setup_routes(const PrometheusConfig &prom_config) std::stringstream output; for (const auto &p_mname : plist) { try { - auto [policy, lock] = _registry.policy_manager()->module_get_locked(p_mname); + auto [policy, lock] = _registry->policy_manager()->module_get_locked(p_mname); for (auto &mod : policy->modules()) { auto hmod = dynamic_cast(mod); if (hmod) { diff --git a/src/tests/test_policies.cpp b/src/tests/test_policies.cpp index 9c656194a..b465b1c78 100644 --- a/src/tests/test_policies.cpp +++ b/src/tests/test_policies.cpp @@ -163,7 +163,7 @@ TEST_CASE("Policies", "[policies]") SECTION("Good Config happy path") { - CoreRegistry registry(nullptr); + CoreRegistry registry; YAML::Node config_file = YAML::Load(policies_config); CHECK(config_file["visor"]["policies"]); @@ -193,7 +193,7 @@ TEST_CASE("Policies", "[policies]") SECTION("Duplicate") { - CoreRegistry registry(nullptr); + CoreRegistry registry; YAML::Node config_file = YAML::Load(policies_config); REQUIRE_NOTHROW(registry.tap_manager()->load(config_file["visor"]["taps"], true)); @@ -207,7 +207,7 @@ TEST_CASE("Policies", "[policies]") SECTION("Bad Config") { - CoreRegistry registry(nullptr); + CoreRegistry registry; YAML::Node config_file = YAML::Load(policies_config_bad1); REQUIRE_THROWS_WITH(registry.policy_manager()->load(config_file["visor"]["policies"]), "expecting policy configuration map"); @@ -215,7 +215,7 @@ TEST_CASE("Policies", "[policies]") SECTION("Bad Config: invalid tap") { - CoreRegistry registry(nullptr); + CoreRegistry registry; YAML::Node config_file = YAML::Load(policies_config_bad2); REQUIRE_THROWS_WITH(registry.policy_manager()->load(config_file["visor"]["policies"]), "tap 'nonexist' does not exist"); @@ -223,7 +223,7 @@ TEST_CASE("Policies", "[policies]") SECTION("Bad Config: invalid tap config") { - CoreRegistry registry(nullptr); + CoreRegistry registry; YAML::Node config_file = YAML::Load(policies_config_bad3); REQUIRE_NOTHROW(registry.tap_manager()->load(config_file["visor"]["taps"], true)); @@ -232,7 +232,7 @@ TEST_CASE("Policies", "[policies]") SECTION("Bad Config: exception on input start") { - CoreRegistry registry(nullptr); + CoreRegistry registry; YAML::Node config_file = YAML::Load(policies_config_bad4); REQUIRE_NOTHROW(registry.tap_manager()->load(config_file["visor"]["taps"], true)); @@ -241,7 +241,7 @@ TEST_CASE("Policies", "[policies]") SECTION("Bad Config: mis-matched input_type on tap") { - CoreRegistry registry(nullptr); + CoreRegistry registry; YAML::Node config_file = YAML::Load(policies_config_bad5); REQUIRE_NOTHROW(registry.tap_manager()->load(config_file["visor"]["taps"], true)); @@ -250,7 +250,7 @@ TEST_CASE("Policies", "[policies]") SECTION("Bad Config: bad policy kind") { - CoreRegistry registry(nullptr); + CoreRegistry registry; YAML::Node config_file = YAML::Load(policies_config_bad6); REQUIRE_NOTHROW(registry.tap_manager()->load(config_file["visor"]["taps"], true)); @@ -259,7 +259,7 @@ TEST_CASE("Policies", "[policies]") SECTION("Roll Back") { - CoreRegistry registry(nullptr); + CoreRegistry registry; YAML::Node config_file = YAML::Load(policies_config); CHECK(config_file["visor"]["policies"]); @@ -280,7 +280,7 @@ TEST_CASE("Policies", "[policies]") } SECTION("Good Config, test stop()") { - CoreRegistry registry(nullptr); + CoreRegistry registry; YAML::Node config_file = YAML::Load(policies_config); CHECK(config_file["visor"]["policies"]); diff --git a/src/tests/test_taps.cpp b/src/tests/test_taps.cpp index 66b825bf1..5f6ab314f 100644 --- a/src/tests/test_taps.cpp +++ b/src/tests/test_taps.cpp @@ -44,7 +44,7 @@ TEST_CASE("Taps", "[taps]") SECTION("Good Config") { - CoreRegistry registry(nullptr); + CoreRegistry registry; YAML::Node config_file = YAML::Load(tap_config); CHECK(config_file["visor"]["taps"]); @@ -60,7 +60,7 @@ TEST_CASE("Taps", "[taps]") SECTION("Duplicate") { - CoreRegistry registry(nullptr); + CoreRegistry registry; YAML::Node config_file = YAML::Load(tap_config); CHECK_NOTHROW(registry.tap_manager()->load(config_file["visor"]["taps"], true)); @@ -69,7 +69,7 @@ TEST_CASE("Taps", "[taps]") SECTION("Bad Config") { - CoreRegistry registry(nullptr); + CoreRegistry registry; YAML::Node config_file = YAML::Load(tap_config_bad); CHECK(config_file["visor"]["taps"]); From f73f2a2a8957dd33c81ed449a1b0e659cd1b30e1 Mon Sep 17 00:00:00 2001 From: Shannon Weyrick Date: Fri, 3 Dec 2021 12:08:36 -0800 Subject: [PATCH 07/12] module load and path --- cmd/pktvisord/main.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cmd/pktvisord/main.cpp b/cmd/pktvisord/main.cpp index ac59a34fb..9fe141852 100644 --- a/cmd/pktvisord/main.cpp +++ b/cmd/pktvisord/main.cpp @@ -70,6 +70,7 @@ static const char USAGE[] = Modules: --module-list List all modules which have been loaded (builtin and dynamic) --module-load FILE Load the specified dynamic module + --module-dir DIR Set module search path Logging Options: --log-file FILE Log to the given output file name --syslog Log to syslog @@ -234,6 +235,10 @@ int main(int argc, char *argv[]) // modules CoreRegistry registry; + if (args["--module-dir"]) { + registry.input_plugin_registry()->setPluginDirectory(args["--module-dir"].asString()); + registry.handler_plugin_registry()->setPluginDirectory(args["--module-dir"].asString()); + } if (args["--module-load"]) { auto result = registry.input_plugin_registry()->load(args["--module-load"].asString()); if (result != Corrade::PluginManager::LoadState::Loaded) { From 9e2bc01cb3c5a57f46cb4c51ae92d9eb1ea48288 Mon Sep 17 00:00:00 2001 From: Shannon Weyrick Date: Fri, 3 Dec 2021 13:15:14 -0800 Subject: [PATCH 08/12] working loader. add plugin type info to conf file. --- cmd/pktvisord/main.cpp | 25 ++++++++++++++++--- src/handlers/mock/CMakeLists.txt | 3 ++- ...MockHandler.conf => VisorHandlerMock.conf} | 1 + src/inputs/mock/CMakeLists.txt | 2 +- .../mock/{Mock.conf => VisorInputMock.conf} | 0 5 files changed, 26 insertions(+), 5 deletions(-) rename src/handlers/mock/{MockHandler.conf => VisorHandlerMock.conf} (81%) rename src/inputs/mock/{Mock.conf => VisorInputMock.conf} (100%) diff --git a/cmd/pktvisord/main.cpp b/cmd/pktvisord/main.cpp index 9fe141852..f21d92fcb 100644 --- a/cmd/pktvisord/main.cpp +++ b/cmd/pktvisord/main.cpp @@ -12,6 +12,7 @@ #include "handlers/static_plugins.h" #include "inputs/static_plugins.h" #include "visor_config.h" +#include #include #include #include @@ -240,11 +241,29 @@ int main(int argc, char *argv[]) registry.handler_plugin_registry()->setPluginDirectory(args["--module-dir"].asString()); } if (args["--module-load"]) { - auto result = registry.input_plugin_registry()->load(args["--module-load"].asString()); - if (result != Corrade::PluginManager::LoadState::Loaded) { - logger->error("failed to load plugin: {}", result); + auto meta = registry.input_plugin_registry()->metadata(args["--module-load"].asString()); + if (!meta) { + logger->error("failed to load plugin: {}", args["--module-load"].asString()); exit(EXIT_FAILURE); } + if (!meta->data().hasValue("type") || (meta->data().value("type") != "handler" && meta->data().value("type") != "input")) { + logger->error("plugin configuration metadata did not specify a valid plugin type", args["--module-load"].asString()); + exit(EXIT_FAILURE); + } + if (meta->data().value("type") == "input") { + auto result = registry.input_plugin_registry()->load(args["--module-load"].asString()); + if (result != Corrade::PluginManager::LoadState::Loaded) { + logger->error("failed to load input plugin: {}", result); + exit(EXIT_FAILURE); + } + } + else if (meta->data().value("type") == "handler") { + auto result = registry.handler_plugin_registry()->load(args["--module-load"].asString()); + if (result != Corrade::PluginManager::LoadState::Loaded) { + logger->error("failed to load input handler plugin: {}", result); + exit(EXIT_FAILURE); + } + } } if (args["--module-list"].asBool()) { for (auto &p : registry.input_plugin_registry()->pluginList()) { diff --git a/src/handlers/mock/CMakeLists.txt b/src/handlers/mock/CMakeLists.txt index eb332e0b1..1fe8b9946 100644 --- a/src/handlers/mock/CMakeLists.txt +++ b/src/handlers/mock/CMakeLists.txt @@ -4,7 +4,8 @@ set_directory_properties(PROPERTIES CORRADE_USE_PEDANTIC_FLAGS ON) corrade_add_plugin(VisorHandlerMock ${CMAKE_CURRENT_BINARY_DIR} - MockHandler.conf + "" + VisorHandlerMock.conf MockHandlerModulePlugin.cpp MockStreamHandler.cpp) add_library(Visor::Handler::Mock ALIAS VisorHandlerMock) diff --git a/src/handlers/mock/MockHandler.conf b/src/handlers/mock/VisorHandlerMock.conf similarity index 81% rename from src/handlers/mock/MockHandler.conf rename to src/handlers/mock/VisorHandlerMock.conf index bee902fd0..708c43da4 100644 --- a/src/handlers/mock/MockHandler.conf +++ b/src/handlers/mock/VisorHandlerMock.conf @@ -2,3 +2,4 @@ provides=mock [data] desc=A mock stream handler +type=handler diff --git a/src/inputs/mock/CMakeLists.txt b/src/inputs/mock/CMakeLists.txt index 37fae14fc..85110cc28 100644 --- a/src/inputs/mock/CMakeLists.txt +++ b/src/inputs/mock/CMakeLists.txt @@ -3,7 +3,7 @@ message(STATUS "Input Module: Mock") set_directory_properties(PROPERTIES CORRADE_USE_PEDANTIC_FLAGS ON) corrade_add_static_plugin(VisorInputMock ${CMAKE_CURRENT_BINARY_DIR} - Mock.conf + VisorInputMock.conf MockInputModulePlugin.cpp MockInputStream.cpp ) diff --git a/src/inputs/mock/Mock.conf b/src/inputs/mock/VisorInputMock.conf similarity index 100% rename from src/inputs/mock/Mock.conf rename to src/inputs/mock/VisorInputMock.conf From 78e6b61631a54224e9079ff24b1f84a3cd475db5 Mon Sep 17 00:00:00 2001 From: Shannon Weyrick Date: Fri, 3 Dec 2021 13:49:55 -0800 Subject: [PATCH 09/12] run registry start, fix tests --- src/CoreServer.cpp | 2 ++ src/tests/test_policies.cpp | 8 ++++++++ src/tests/test_taps.cpp | 3 +++ 3 files changed, 13 insertions(+) diff --git a/src/CoreServer.cpp b/src/CoreServer.cpp index b74787bae..b4acf4cba 100644 --- a/src/CoreServer.cpp +++ b/src/CoreServer.cpp @@ -28,6 +28,8 @@ visor::CoreServer::CoreServer(CoreRegistry *r, std::shared_ptr l _logger = spdlog::stderr_color_mt("visor"); } + _registry->start(&_svr); + _setup_routes(prom_config); if (!prom_config.instance_label.empty()) { diff --git a/src/tests/test_policies.cpp b/src/tests/test_policies.cpp index b465b1c78..a559fdb1d 100644 --- a/src/tests/test_policies.cpp +++ b/src/tests/test_policies.cpp @@ -164,6 +164,7 @@ TEST_CASE("Policies", "[policies]") SECTION("Good Config happy path") { CoreRegistry registry; + registry.start(nullptr); YAML::Node config_file = YAML::Load(policies_config); CHECK(config_file["visor"]["policies"]); @@ -194,6 +195,7 @@ TEST_CASE("Policies", "[policies]") SECTION("Duplicate") { CoreRegistry registry; + registry.start(nullptr); YAML::Node config_file = YAML::Load(policies_config); REQUIRE_NOTHROW(registry.tap_manager()->load(config_file["visor"]["taps"], true)); @@ -224,6 +226,7 @@ TEST_CASE("Policies", "[policies]") SECTION("Bad Config: invalid tap config") { CoreRegistry registry; + registry.start(nullptr); YAML::Node config_file = YAML::Load(policies_config_bad3); REQUIRE_NOTHROW(registry.tap_manager()->load(config_file["visor"]["taps"], true)); @@ -233,6 +236,7 @@ TEST_CASE("Policies", "[policies]") SECTION("Bad Config: exception on input start") { CoreRegistry registry; + registry.start(nullptr); YAML::Node config_file = YAML::Load(policies_config_bad4); REQUIRE_NOTHROW(registry.tap_manager()->load(config_file["visor"]["taps"], true)); @@ -242,6 +246,7 @@ TEST_CASE("Policies", "[policies]") SECTION("Bad Config: mis-matched input_type on tap") { CoreRegistry registry; + registry.start(nullptr); YAML::Node config_file = YAML::Load(policies_config_bad5); REQUIRE_NOTHROW(registry.tap_manager()->load(config_file["visor"]["taps"], true)); @@ -251,6 +256,7 @@ TEST_CASE("Policies", "[policies]") SECTION("Bad Config: bad policy kind") { CoreRegistry registry; + registry.start(nullptr); YAML::Node config_file = YAML::Load(policies_config_bad6); REQUIRE_NOTHROW(registry.tap_manager()->load(config_file["visor"]["taps"], true)); @@ -260,6 +266,7 @@ TEST_CASE("Policies", "[policies]") SECTION("Roll Back") { CoreRegistry registry; + registry.start(nullptr); YAML::Node config_file = YAML::Load(policies_config); CHECK(config_file["visor"]["policies"]); @@ -281,6 +288,7 @@ TEST_CASE("Policies", "[policies]") SECTION("Good Config, test stop()") { CoreRegistry registry; + registry.start(nullptr); YAML::Node config_file = YAML::Load(policies_config); CHECK(config_file["visor"]["policies"]); diff --git a/src/tests/test_taps.cpp b/src/tests/test_taps.cpp index 5f6ab314f..977e579aa 100644 --- a/src/tests/test_taps.cpp +++ b/src/tests/test_taps.cpp @@ -45,6 +45,7 @@ TEST_CASE("Taps", "[taps]") SECTION("Good Config") { CoreRegistry registry; + registry.start(nullptr); YAML::Node config_file = YAML::Load(tap_config); CHECK(config_file["visor"]["taps"]); @@ -61,6 +62,7 @@ TEST_CASE("Taps", "[taps]") SECTION("Duplicate") { CoreRegistry registry; + registry.start(nullptr); YAML::Node config_file = YAML::Load(tap_config); CHECK_NOTHROW(registry.tap_manager()->load(config_file["visor"]["taps"], true)); @@ -70,6 +72,7 @@ TEST_CASE("Taps", "[taps]") SECTION("Bad Config") { CoreRegistry registry; + registry.start(nullptr); YAML::Node config_file = YAML::Load(tap_config_bad); CHECK(config_file["visor"]["taps"]); From f946bdbaf999a6229f712c797d6c204aa95b6980 Mon Sep 17 00:00:00 2001 From: Shannon Weyrick Date: Fri, 3 Dec 2021 14:09:54 -0800 Subject: [PATCH 10/12] clarity and debug on mock loads,run --- src/Taps.cpp | 2 +- src/handlers/mock/MockStreamHandler.cpp | 12 ++++++++++++ src/handlers/mock/MockStreamHandler.h | 4 +++- src/inputs/mock/MockInputStream.cpp | 5 +++-- 4 files changed, 19 insertions(+), 4 deletions(-) diff --git a/src/Taps.cpp b/src/Taps.cpp index ee8b5d8e3..9e7b84498 100644 --- a/src/Taps.cpp +++ b/src/Taps.cpp @@ -54,7 +54,7 @@ void TapManager::load(const YAML::Node &tap_yaml, bool strict) // will throw if it already exists. nothing else to clean up module_add(std::move(tap_module)); - spdlog::get("visor")->info("tap [{}]: loaded", tap_name); + spdlog::get("visor")->info("tap [{}]: loaded, type {}", tap_name, input_type); } } diff --git a/src/handlers/mock/MockStreamHandler.cpp b/src/handlers/mock/MockStreamHandler.cpp index 735afbd57..d0d860c53 100644 --- a/src/handlers/mock/MockStreamHandler.cpp +++ b/src/handlers/mock/MockStreamHandler.cpp @@ -10,15 +10,23 @@ MockStreamHandler::MockStreamHandler(const std::string &name, InputStream *strea : visor::StreamMetricsHandler(name, window_config) { assert(stream); + _logger = spdlog::get("visor"); + assert(_logger); // figure out which input stream we have _mock_stream = dynamic_cast(stream); if (!_mock_stream) { throw StreamHandlerException(fmt::format("MockStreamHandler: unsupported input stream {}", stream->name())); } + _logger->info("mock handler created"); +} +MockStreamHandler::~MockStreamHandler() +{ + _logger->info("mock handler destroyed"); } void MockStreamHandler::process_random_int(uint64_t i) { + _logger->info("mock handler received random int signal: {}", i); _metrics->process_random_int(i); } @@ -28,6 +36,8 @@ void MockStreamHandler::start() return; } + _logger->info("mock handler start()"); + _random_int_connection = _mock_stream->random_int_signal.connect(&MockStreamHandler::process_random_int, this); _running = true; @@ -39,6 +49,8 @@ void MockStreamHandler::stop() return; } + _logger->info("mock handler stop()"); + _running = false; } diff --git a/src/handlers/mock/MockStreamHandler.h b/src/handlers/mock/MockStreamHandler.h index 18687788c..f45987f72 100644 --- a/src/handlers/mock/MockStreamHandler.h +++ b/src/handlers/mock/MockStreamHandler.h @@ -7,6 +7,7 @@ #include "AbstractMetricsManager.h" #include "MockInputStream.h" #include "StreamHandler.h" +#include #include #include #include @@ -68,6 +69,7 @@ class MockStreamHandler final : public visor::StreamMetricsHandler _logger; sigslot::connection _random_int_connection; @@ -75,7 +77,7 @@ class MockStreamHandler final : public visor::StreamMetricsHandlerinfo("mock input sends random int signal"); - random_int_signal(std::rand()); + auto i = std::rand(); + _logger->info("mock input sends random int signal: {}", i); + random_int_signal(i); }); _running = true; From f786e6c488ce49a46b6dbbd2c6c0a64b65195415 Mon Sep 17 00:00:00 2001 From: Shannon Weyrick Date: Thu, 9 Dec 2021 15:27:14 -0500 Subject: [PATCH 11/12] use a separate logger --- src/handlers/mock/MockStreamHandler.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/handlers/mock/MockStreamHandler.cpp b/src/handlers/mock/MockStreamHandler.cpp index d0d860c53..4cd1064c2 100644 --- a/src/handlers/mock/MockStreamHandler.cpp +++ b/src/handlers/mock/MockStreamHandler.cpp @@ -3,6 +3,7 @@ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ #include "MockStreamHandler.h" +#include namespace visor::handler::mock { @@ -10,7 +11,10 @@ MockStreamHandler::MockStreamHandler(const std::string &name, InputStream *strea : visor::StreamMetricsHandler(name, window_config) { assert(stream); - _logger = spdlog::get("visor"); + _logger = spdlog::get("dyn-mock-handler"); + if (!_logger) { + _logger = spdlog::stderr_color_mt("dyn-mock-handler"); + } assert(_logger); // figure out which input stream we have _mock_stream = dynamic_cast(stream); From a82d848b1d7826c924670b9838fed04a6d25025a Mon Sep 17 00:00:00 2001 From: Shannon Weyrick Date: Thu, 9 Dec 2021 15:33:19 -0500 Subject: [PATCH 12/12] add quick integration test for dynamic module load --- integration_tests/CMakeLists.txt | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/integration_tests/CMakeLists.txt b/integration_tests/CMakeLists.txt index 1430559e9..9e3b5c977 100644 --- a/integration_tests/CMakeLists.txt +++ b/integration_tests/CMakeLists.txt @@ -2,20 +2,30 @@ set(FIXTURE_DIR ${CMAKE_SOURCE_DIR}/src/tests/fixtures) set(HOST_VAR "-H 127.0.0.1/32") set(PCAP_BINARY ${CMAKE_BINARY_DIR}/bin/pktvisor-pcap) +set(PKTD_BINARY ${CMAKE_BINARY_DIR}/bin/pktvisord) +set(VISOR_DYNMODULE_DIR ${CMAKE_BINARY_DIR}/lib/) set(INT_SH ${CMAKE_SOURCE_DIR}/integration_tests/integration.sh) set(WORKING_DIR ${CMAKE_SOURCE_DIR}/integration_tests) -macro(visor_int_test name) +macro(visor_pcap_int_test name) add_test(NAME ${name} WORKING_DIRECTORY ${WORKING_DIR} COMMAND ${INT_SH} ${PCAP_BINARY} -- ${HOST_VAR} --geo-city ${FIXTURE_DIR}/GeoIP2-City-Test.mmdb --geo-asn ${FIXTURE_DIR}/GeoIP2-ISP-Test.mmdb ${FIXTURE_DIR}/${name}.pcap) endmacro() -visor_int_test(dns_ipv4_udp) -visor_int_test(dns_ipv4_tcp) -visor_int_test(dns_ipv6_udp) -visor_int_test(dns_ipv6_tcp) -visor_int_test(dhcp-flow) +macro(visor_dyn_mod_int_test name) + add_test(NAME ${name} + WORKING_DIRECTORY ${WORKING_DIR} + COMMAND ${PKTD_BINARY} --module-dir ${VISOR_DYNMODULE_DIR} --module-load ${name} --module-list) +endmacro() + +visor_pcap_int_test(dns_ipv4_udp) +visor_pcap_int_test(dns_ipv4_tcp) +visor_pcap_int_test(dns_ipv6_udp) +visor_pcap_int_test(dns_ipv6_tcp) +visor_pcap_int_test(dhcp-flow) + +visor_dyn_mod_int_test(VisorHandlerMock) # this allows local, non-public integration tests (for example, on private pcap data) #add_test(NAME external-tests