Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions RFCs/2021-04-16-76-collection-policies.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ visor:
handlers:
# default configuration for the stream handlers
config:
periods: 5
max_deep_sample: 50
periods: 2 #default is 5
max_deep_sample: 50 #default is 100
topn_count: 5 #default is 10
modules:
# the keys at this level are unique identifiers
default_net:
Expand Down Expand Up @@ -74,8 +75,8 @@ visor:
qname_suffix: .mydomain.com
metric_groups:
disable:
- top_qtypes
- top_udp_ports
- top_qname
- dns_transaction
chaning_handlers:
kind: collection
description: "base chaning NET to DNS policy"
Expand Down
8 changes: 8 additions & 0 deletions src/AbstractMetricsManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ class AbstractMetricsBucket

virtual void to_json(json &j) const = 0;
virtual void to_prometheus(std::stringstream &out, Metric::LabelMap add_labels = {}) const = 0;
virtual void update_topn_metrics(size_t topn_count) = 0;
};

template <typename MetricsBucketClass>
Expand All @@ -238,6 +239,7 @@ class AbstractMetricsManager
*/
jsf32 _rng;
uint32_t _deep_sample_rate{100};
size_t _topn_count{10};

protected:
std::atomic_bool _deep_sampling_now; // atomic so we can reference without mutex
Expand Down Expand Up @@ -274,6 +276,7 @@ class AbstractMetricsManager
_metric_buckets.emplace_front(std::make_unique<MetricsBucketClass>());
_metric_buckets[0]->configure_groups(_groups);
_metric_buckets[0]->set_start_tstamp(stamp);
_metric_buckets[0]->update_topn_metrics(_topn_count);
if (_recorded_stream) {
_metric_buckets[0]->set_recorded_stream();
}
Expand Down Expand Up @@ -364,7 +367,12 @@ class AbstractMetricsManager
_next_shift_tstamp = _last_shift_tstamp;
_next_shift_tstamp.tv_sec += AbstractMetricsManager::PERIOD_SEC;

if (window_config->config_exists("topn_count")) {
_topn_count = window_config->config_get<uint64_t>("topn_count");
}

_metric_buckets.emplace_front(std::make_unique<MetricsBucketClass>());
_metric_buckets[0]->update_topn_metrics(_topn_count);
}

virtual ~AbstractMetricsManager() = default;
Expand Down
10 changes: 10 additions & 0 deletions src/Metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,16 @@ class TopN final : public Metric
_fi.merge(other._fi);
}

void set_topn_count(const size_t top_count)
{
_top_count = top_count;
}

size_t topn_count() const
{
return _top_count;
}

/**
* to_json which takes a formater to format the "name"
* @param j json object
Expand Down
3 changes: 3 additions & 0 deletions src/handlers/dhcp/DhcpStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ class DhcpMetricsBucket final : public visor::AbstractMetricsBucket
void specialized_merge(const AbstractMetricsBucket &other) override;
void to_json(json &j) const override;
void to_prometheus(std::stringstream &out, Metric::LabelMap add_labels = {}) const override;
void update_topn_metrics(size_t) override
{
}

void process_filtered();
void process_dhcp_layer(bool deep, pcpp::DhcpLayer *payload, pcpp::ProtocolType l3, pcpp::ProtocolType l4, uint16_t src_port, uint16_t dst_port);
Expand Down
13 changes: 13 additions & 0 deletions src/handlers/dns/DnsStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,19 @@ class DnsMetricsBucket final : public visor::AbstractMetricsBucket
void specialized_merge(const AbstractMetricsBucket &other) override;
void to_json(json &j) const override;
void to_prometheus(std::stringstream &out, Metric::LabelMap add_labels = {}) const override;
void update_topn_metrics(size_t topn_count) override
{
_dns_topQname2.set_topn_count(topn_count);
_dns_topQname3.set_topn_count(topn_count);
_dns_topNX.set_topn_count(topn_count);
_dns_topREFUSED.set_topn_count(topn_count);
_dns_topSRVFAIL.set_topn_count(topn_count);
_dns_topUDPPort.set_topn_count(topn_count);
_dns_topQType.set_topn_count(topn_count);
_dns_topRCode.set_topn_count(topn_count);
_dns_slowXactIn.set_topn_count(topn_count);
_dns_slowXactOut.set_topn_count(topn_count);
}

void process_filtered();
void process_dns_layer(bool deep, DnsLayer &payload, pcpp::ProtocolType l3, Protocol l4, uint16_t port, size_t suffix_size = 0);
Expand Down
35 changes: 35 additions & 0 deletions src/handlers/dns/tests/test_dns_layer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,40 @@ TEST_CASE("DNS Filters: only_rcode refused", "[pcap][dns]")
REQUIRE(j["wire_packets"]["filtered"] == 23);
}

TEST_CASE("DNS TopN custom size", "[pcap][dns]")
{

PcapInputStream stream{"pcap-test"};
stream.config_set("pcap_file", "tests/fixtures/dns_udp_tcp_random.pcap");
stream.config_set("bpf", "");
stream.config_set("host_spec", "192.168.0.0/24");
stream.parse_host_spec();

visor::Config c;
auto stream_proxy = stream.add_event_proxy(c);
c.config_set<uint64_t>("num_periods", 1);
c.config_set<uint64_t>("topn_count", 3);
DnsStreamHandler dns_handler{"dns-test", stream_proxy, &c};

dns_handler.start();
stream.start();
stream.stop();
dns_handler.stop();

nlohmann::json j;
dns_handler.metrics()->bucket(0)->to_json(j);

CHECK(j["cardinality"]["qname"] == 2036); // flame was run with 1000 randoms x2 (udp+tcp)

CHECK(j["top_qtype"][0]["name"] == "AAAA");
CHECK(j["top_qtype"][0]["estimate"] == 1476);
CHECK(j["top_qtype"][1]["name"] == "CNAME");
CHECK(j["top_qtype"][1]["estimate"] == 825);
CHECK(j["top_qtype"][2]["name"] == "SOA");
CHECK(j["top_qtype"][2]["estimate"] == 794);
CHECK(j["top_qtype"][3] == nullptr);
}

TEST_CASE("DNS Filters: only_qname_suffix", "[pcap][dns]")
{

Expand Down Expand Up @@ -484,3 +518,4 @@ TEST_CASE("DNS groups", "[pcap][dns]")
REQUIRE_THROWS_WITH(dns_handler.start(), "dns_top_wired is an invalid/unsupported metric group. The valid groups are cardinality, counters, dns_transaction, top_qnames");
}
}

20 changes: 19 additions & 1 deletion src/handlers/flow/FlowStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,18 @@ class FlowMetricsBucket final : public visor::AbstractMetricsBucket
, topOutIfIndex("flow", "index", {"top_out_if_index_" + metric}, "Top output interface indexes by " + metric)
{
}

void set_topn_count(size_t topn_count)
{
topSrcIP.set_topn_count(topn_count);
topDstIP.set_topn_count(topn_count);
topSrcPort.set_topn_count(topn_count);
topDstPort.set_topn_count(topn_count);
topSrcIPandPort.set_topn_count(topn_count);
topDstIPandPort.set_topn_count(topn_count);
topInIfIndex.set_topn_count(topn_count);
topOutIfIndex.set_topn_count(topn_count);
}
};

topns _topByBytes;
Expand Down Expand Up @@ -152,6 +164,13 @@ class FlowMetricsBucket final : public visor::AbstractMetricsBucket
void specialized_merge(const AbstractMetricsBucket &other) override;
void to_json(json &j) const override;
void to_prometheus(std::stringstream &out, Metric::LabelMap add_labels = {}) const override;
void update_topn_metrics(size_t topn_count) override
{
_topByBytes.set_topn_count(topn_count);
_topByPackets.set_topn_count(topn_count);
_topGeoLoc.set_topn_count(topn_count);
_topASN.set_topn_count(topn_count);
}

// must be thread safe as it is called from time window maintenance thread
void on_set_read_only() override
Expand Down Expand Up @@ -230,5 +249,4 @@ class FlowStreamHandler final : public visor::StreamMetricsHandler<FlowMetricsMa
void start() override;
void stop() override;
};

}
3 changes: 3 additions & 0 deletions src/handlers/input_resources/InputResourcesStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ class InputResourcesMetricsBucket final : public visor::AbstractMetricsBucket
void specialized_merge(const AbstractMetricsBucket &other) override;
void to_json(json &j) const override;
void to_prometheus(std::stringstream &out, Metric::LabelMap add_labels = {}) const override;
void update_topn_metrics(size_t) override
{
}

void process_resources(double cpu_usage, uint64_t memory_usage);
void process_policies(int16_t policy_count, int16_t handler_count);
Expand Down
2 changes: 2 additions & 0 deletions src/handlers/mock/MockHandlerModulePlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ std::unique_ptr<StreamHandler> MockHandlerModulePlugin::instantiate(const std::s
{
// TODO using config as both window config and module config
auto handler_module = std::make_unique<MockStreamHandler>(name, proxy, config, stream_handler);
handler_module->config_merge(*config);
handler_module->config_merge(*filter);
return handler_module;
}

Expand Down
3 changes: 3 additions & 0 deletions src/handlers/mock/MockStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ class MockMetricsBucket final : public visor::AbstractMetricsBucket
void specialized_merge(const AbstractMetricsBucket &other) override;
void to_json(json &j) const override;
void to_prometheus(std::stringstream &out, Metric::LabelMap add_labels = {}) const override;
void update_topn_metrics(size_t) override
{
}

void process_random_int(uint64_t i);
};
Expand Down
7 changes: 7 additions & 0 deletions src/handlers/net/NetStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ class NetworkMetricsBucket final : public visor::AbstractMetricsBucket
void specialized_merge(const AbstractMetricsBucket &other) override;
void to_json(json &j) const override;
void to_prometheus(std::stringstream &out, Metric::LabelMap add_labels = {}) const override;
void update_topn_metrics(size_t topn_count) override
{
_topGeoLoc.set_topn_count(topn_count);
_topASN.set_topn_count(topn_count);
_topIPv4.set_topn_count(topn_count);
_topIPv6.set_topn_count(topn_count);
}

// must be thread safe as it is called from time window maintenance thread
void on_set_read_only() override
Expand Down
3 changes: 3 additions & 0 deletions src/handlers/pcap/PcapStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ class PcapMetricsBucket final : public visor::AbstractMetricsBucket
void specialized_merge(const AbstractMetricsBucket &other) override;
void to_json(json &j) const override;
void to_prometheus(std::stringstream &out, Metric::LabelMap add_labels = {}) const override;
void update_topn_metrics(size_t) override
{
}

void process_pcap_tcp_reassembly_error(bool deep, pcpp::Packet &payload, PacketDirection dir, pcpp::ProtocolType l3);
void process_pcap_stats(const pcpp::IPcapDevice::PcapStats &stats);
Expand Down
23 changes: 23 additions & 0 deletions src/tests/test_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ class TestMetricsBucket : public AbstractMetricsBucket
{
out << "test_performed" << std::endl;
}
void update_topn_metrics([[maybe_unused]] size_t topn_count)
{
}
};

class TestMetricsManager : public AbstractMetricsManager<TestMetricsBucket>
Expand Down Expand Up @@ -234,6 +237,26 @@ TEST_CASE("TopN metrics", "[metrics][topn]")
std::getline(output, line);
CHECK(line == R"(root_test_metric{instance="test instance",integer="10",policy="default"} 1)");
}

SECTION("TopN get count size")
{
CHECK(top_sting.topn_count() == 10);
CHECK(top_int.topn_count() == 10);
}

SECTION("TopN update count size")
{
top_sting.update("top1");
top_sting.update("top2");
top_sting.update("top1");
CHECK(top_sting.topn_count() == 10);
top_sting.set_topn_count(1);
CHECK(top_sting.topn_count() == 1);
top_sting.to_json(j["top"]);
CHECK(j["top"]["test"]["metric"][0]["estimate"] == 2);
CHECK(j["top"]["test"]["metric"][0]["name"] == "top1");
CHECK(j["top"]["test"]["metric"][1] == nullptr);
}
}

TEST_CASE("Cardinality metrics", "[metrics][cardinality]")
Expand Down