Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions src/handlers/flow/FlowStreamHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,24 @@ static std::string ip_summarization(const std::string &val, SummaryData *summary
return val;
}
if (summary->type == IpSummary::ByASN && HandlerModulePlugin::asn->enabled()) {
std::string asn;
if (ipv4.isValid()) {
sockaddr_in sa4{};
if (lib::utils::ipv4_to_sockaddr(ipv4, &sa4)) {
return HandlerModulePlugin::asn->getASNString(&sa4);
asn = HandlerModulePlugin::asn->getASNString(&sa4);
}
} else if (ipv6.isValid()) {
sockaddr_in6 sa6{};
if (lib::utils::ipv6_to_sockaddr(ipv6, &sa6)) {
return HandlerModulePlugin::asn->getASNString(&sa6);
asn = HandlerModulePlugin::asn->getASNString(&sa6);
}
}
if (!summary->asn_exclude_summary.empty() && std::any_of(summary->asn_exclude_summary.begin(), summary->asn_exclude_summary.end(), [&asn](const auto &prefix) {
return asn.size() >= prefix.size() && 0 == asn.compare(0, prefix.size(), prefix);
})) {
return val;
}
return asn;
} else if (summary->type == IpSummary::BySubnet) {
if (ipv4.isValid()) {
if (auto [match, subnet] = match_subnet(summary->ipv4_summary, ipv4.toInt()); match) {
Expand Down Expand Up @@ -135,6 +142,11 @@ void FlowStreamHandler::start()
if (config_exists("exclude_ips_from_summarization")) {
parse_host_specs(config_get<StringList>("exclude_ips_from_summarization"), summary_data.ipv4_exclude_summary, summary_data.ipv6_exclude_summary);
}
if (config_exists("exclude_asns_from_summarization")) {
for (const auto &asn : config_get<StringList>("exclude_ips_from_summarization")) {
summary_data.asn_exclude_summary.push_back(asn + "/");
}
}
_metrics->set_summary_data(std::move(summary_data));
} else if (config_exists("subnets_for_summarization")) {
summary_data.type = IpSummary::BySubnet;
Expand Down Expand Up @@ -929,16 +941,20 @@ void FlowMetricsBucket::to_json(json &j) const
}
if (group_enabled(group::FlowMetrics::TopIPs)) {
if (summary) {
top_dir.second.topSrcIP.to_json(j["devices"][deviceId]["interfaces"][interfaceId], [summary](const std::string &val) {
return ip_summarization(val, summary);
});
top_dir.second.topSrcIP.to_json(
j["devices"][deviceId]["interfaces"][interfaceId], [summary](const std::string &val) {
return ip_summarization(val, summary);
},
Metric::Aggregate::SUMMARY);
} else {
top_dir.second.topSrcIP.to_json(j["devices"][deviceId]["interfaces"][interfaceId]);
}
if (summary) {
top_dir.second.topDstIP.to_json(j["devices"][deviceId]["interfaces"][interfaceId], [summary](const std::string &val) {
return ip_summarization(val, summary);
});
top_dir.second.topDstIP.to_json(
j["devices"][deviceId]["interfaces"][interfaceId], [summary](const std::string &val) {
return ip_summarization(val, summary);
},
Metric::Aggregate::SUMMARY);
} else {
top_dir.second.topDstIP.to_json(j["devices"][deviceId]["interfaces"][interfaceId]);
}
Expand Down
5 changes: 5 additions & 0 deletions src/handlers/flow/FlowStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ struct SummaryData {
IpSummary type{IpSummary::None};
lib::utils::IPv4subnetList ipv4_exclude_summary;
lib::utils::IPv6subnetList ipv6_exclude_summary;
std::vector<std::string> asn_exclude_summary;
lib::utils::IPv4subnetList ipv4_summary;
lib::utils::IPv6subnetList ipv6_summary;
};
Expand Down Expand Up @@ -325,6 +326,9 @@ class FlowMetricsManager final : public visor::AbstractMetricsManager<FlowMetric
if (!_enrich_data.empty()) {
live_bucket()->set_enrich_data(&_enrich_data);
}
if (_summary_data.type != IpSummary::None) {
live_bucket()->set_summary_data(&_summary_data);
}
}
};

Expand Down Expand Up @@ -370,6 +374,7 @@ class FlowStreamHandler final : public visor::StreamMetricsHandler<FlowMetricsMa
"asn_notfound",
"summarize_ips_by_asn",
"subnets_for_summarization",
"exclude_asns_from_summarization",
"exclude_ips_from_summarization",
"sample_rate_scaling",
"recorded_stream"};
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/flow/tests/test_flows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ TEST_CASE("Parse sflow stream without sampling", "[sflow][flow]")
stream.config_set("flow_type", "sflow");
stream.config_set("pcap_file", "tests/fixtures/ecmp.pcap");
visor::network::IpPort::set_csv_iana_ports("tests/fixtures/pktvisor-port-service-names.csv");

visor::Config c;
auto stream_proxy = stream.add_event_proxy(c);
c.config_set<uint64_t>("num_periods", 1);
Expand Down Expand Up @@ -392,5 +392,5 @@ TEST_CASE("Flow invalid config", "[flow][filter][config]")
c.config_set<uint64_t>("num_periods", 1);
FlowStreamHandler flow_handler{"flow-test", stream_proxy, &c};
flow_handler.config_set<bool>("invalid_config", true);
REQUIRE_THROWS_WITH(flow_handler.start(), "invalid_config is an invalid/unsupported config or filter. The valid configs/filters are: device_map, enrichment, only_device_interfaces, only_ips, only_ports, only_directions, geoloc_notfound, asn_notfound, summarize_ips_by_asn, subnets_for_summarization, exclude_ips_from_summarization, sample_rate_scaling, recorded_stream, deep_sample_rate, num_periods, topn_count, topn_percentile_threshold");
REQUIRE_THROWS_WITH(flow_handler.start(), "invalid_config is an invalid/unsupported config or filter. The valid configs/filters are: device_map, enrichment, only_device_interfaces, only_ips, only_ports, only_directions, geoloc_notfound, asn_notfound, summarize_ips_by_asn, subnets_for_summarization, exclude_asns_from_summarization, exclude_ips_from_summarization, sample_rate_scaling, recorded_stream, deep_sample_rate, num_periods, topn_count, topn_percentile_threshold");
}