1
0
mirror of https://github.com/pavel-odintsov/fastnetmon synced 2024-11-23 13:22:36 +01:00

Unified InfluxDB host traffic export logic with templates

This commit is contained in:
Pavel Odintsov 2023-07-10 19:10:27 +01:00
parent e8c97d3314
commit f5b7cd8e31
4 changed files with 91 additions and 61 deletions

@ -1898,7 +1898,7 @@ uint64_t get_current_unix_time_in_nanoseconds() {
} }
// Joins data to format a=b,d=f // Joins data to format a=b,d=f
std::string join_by_comma_and_equal(std::map<std::string, std::string>& data) { std::string join_by_comma_and_equal(const std::map<std::string, std::string>& data) {
std::stringstream buffer; std::stringstream buffer;
for (auto itr = data.begin(); itr != data.end(); ++itr) { for (auto itr = data.begin(); itr != data.end(); ++itr) {

@ -115,7 +115,7 @@ bool write_data_to_influxdb(std::string database,
std::string influx_password, std::string influx_password,
std::string query); std::string query);
std::string join_by_comma_and_equal(std::map<std::string, std::string>& data); std::string join_by_comma_and_equal(const std::map<std::string, std::string>& data);
bool parse_meminfo_into_map(std::map<std::string, uint64_t>& parsed_meminfo); bool parse_meminfo_into_map(std::map<std::string, uint64_t>& parsed_meminfo);
bool read_uint64_from_string(const std::string& line, uint64_t& value); bool read_uint64_from_string(const std::string& line, uint64_t& value);
bool read_integer_from_file(const std::string& file_path, int& value); bool read_integer_from_file(const std::string& file_path, int& value);

@ -136,17 +136,63 @@ bool push_total_traffic_counters_to_influxdb(std::string influx_database,
return true; return true;
} }
// Simple helper function to add additional metrics easily
void add_counter_to_influxdb(std::map<std::string, uint64_t>& plain_total_counters_map,
const traffic_counter_element_t& counter,
const std::string& counter_name) {
plain_total_counters_map[counter_name + "_packets_incoming"] = counter.in_packets;
plain_total_counters_map[counter_name + "_bits_incoming"] = counter.in_bytes * 8;
plain_total_counters_map[counter_name + "_packets_outgoing"] = counter.out_packets;
plain_total_counters_map[counter_name + "_bits_outgoing"] = counter.out_bytes * 8;
}
// Fills special structure which we use to export metrics into InfluxDB
void fill_per_protocol_countres_for_influxdb(const subnet_counter_t& current_speed_element,
std::map<std::string, uint64_t>& plain_total_counters_map) {
add_counter_to_influxdb(plain_total_counters_map, current_speed_element.dropped, "dropped");
add_counter_to_influxdb(plain_total_counters_map, current_speed_element.fragmented, "fragmented");
add_counter_to_influxdb(plain_total_counters_map, current_speed_element.tcp, "tcp");
add_counter_to_influxdb(plain_total_counters_map, current_speed_element.tcp_syn, "tcp_syn");
add_counter_to_influxdb(plain_total_counters_map, current_speed_element.udp, "udp");
add_counter_to_influxdb(plain_total_counters_map, current_speed_element.icmp, "icmp");
}
// Fills counters for standard fixed counters
void fill_fixed_counters_for_influxdb(const subnet_counter_t& counter,
std::map<std::string, uint64_t>& plain_total_counters_map,
bool populate_flow) {
fill_main_counters_for_influxdb(counter, plain_total_counters_map, populate_flow);
bool influxdb_per_protocol_counters = false;
if (influxdb_per_protocol_counters) {
fill_per_protocol_countres_for_influxdb(counter, plain_total_counters_map);
}
return;
}
// Push host traffic to InfluxDB // Push host traffic to InfluxDB
bool push_hosts_ipv6_traffic_counters_to_influxdb(std::string influx_database, template <typename T, typename C>
std::string influx_host, // Apply limitation on type of keys because we use special string conversion function inside and we must not instantiate it for other unknown types
std::string influx_port, requires(std::is_same_v<T, subnet_ipv6_cidr_mask_t> || std::is_same_v<T, uint32_t>) &&
bool enable_auth, (std::is_same_v<C, subnet_counter_t>)bool push_hosts_traffic_counters_to_influxdb(abstract_subnet_counters_t<T, C>& host_counters,
std::string influx_user, const std::string& influx_database,
std::string influx_password) { const std::string& influx_host,
std::vector<std::pair<subnet_ipv6_cidr_mask_t, subnet_counter_t>> speed_elements; const std::string& influx_port,
bool enable_auth,
const std::string& influx_user,
const std::string& influx_password,
const std::string& measurement,
const std::string& tag_name) {
std::vector<std::pair<T, C>> speed_elements;
// TODO: preallocate memory here for this array to avoid memory allocations under the lock // TODO: preallocate memory here for this array to avoid memory allocations under the lock
ipv6_host_counters.get_all_non_zero_average_speed_elements_as_pairs(speed_elements); host_counters.get_all_non_zero_average_speed_elements_as_pairs(speed_elements);
// Structure for InfluxDB // Structure for InfluxDB
std::vector<std::pair<std::string, std::map<std::string, uint64_t>>> hosts_vector; std::vector<std::pair<std::string, std::map<std::string, uint64_t>>> hosts_vector;
@ -154,9 +200,21 @@ bool push_hosts_ipv6_traffic_counters_to_influxdb(std::string influx_database,
for (const auto& speed_element : speed_elements) { for (const auto& speed_element : speed_elements) {
std::map<std::string, uint64_t> plain_total_counters_map; std::map<std::string, uint64_t> plain_total_counters_map;
std::string client_ip_as_string = print_ipv6_address(speed_element.first.subnet_address); std::string client_ip_as_string;
fill_main_counters_for_influxdb(&speed_element.second, plain_total_counters_map, true); if constexpr (std::is_same_v<T, subnet_ipv6_cidr_mask_t>) {
// We use pretty strange encoding here which encodes IPv6 address as subnet but
// then we just discard CIDR mask because it does not matter
client_ip_as_string = print_ipv6_address(speed_element.first.subnet_address);
} else if constexpr (std::is_same_v<T, uint32_t>) {
// We use this encoding when we use
client_ip_as_string = convert_ip_as_uint_to_string(speed_element.first);
} else {
logger << log4cpp::Priority::ERROR << "No match for push_hosts_traffic_counters_to_influxdb";
return false;
}
fill_fixed_counters_for_influxdb(speed_element.second, plain_total_counters_map, true);
hosts_vector.push_back(std::make_pair(client_ip_as_string, plain_total_counters_map)); hosts_vector.push_back(std::make_pair(client_ip_as_string, plain_total_counters_map));
} }
@ -165,12 +223,14 @@ bool push_hosts_ipv6_traffic_counters_to_influxdb(std::string influx_database,
if (hosts_vector.size() > 0) { if (hosts_vector.size() > 0) {
influxdb_writes_total++; influxdb_writes_total++;
std::string error_text;
bool result = write_batch_of_data_to_influxdb(influx_database, influx_host, influx_port, enable_auth, influx_user, bool result = write_batch_of_data_to_influxdb(influx_database, influx_host, influx_port, enable_auth, influx_user,
influx_password, "hosts_ipv6_traffic", "host", hosts_vector); influx_password, measurement, tag_name, hosts_vector);
if (!result) { if (!result) {
influxdb_writes_failed++; influxdb_writes_failed++;
logger << log4cpp::Priority::DEBUG << "InfluxDB batch operation failed for hosts_traffic"; logger << log4cpp::Priority::DEBUG << "InfluxDB batch operation failed for hosts_traffic with error ";
return false; return false;
} }
} }
@ -179,7 +239,6 @@ bool push_hosts_ipv6_traffic_counters_to_influxdb(std::string influx_database,
} }
// This thread pushes data to InfluxDB // This thread pushes data to InfluxDB
void influxdb_push_thread() { void influxdb_push_thread() {
// Sleep for a half second for shift against calculation thread // Sleep for a half second for shift against calculation thread
@ -231,8 +290,9 @@ void influxdb_push_thread() {
influxdb_auth, influxdb_user, influxdb_password); influxdb_auth, influxdb_user, influxdb_password);
// Push per host IPv6 counters to InfluxDB // Push per host IPv6 counters to InfluxDB
push_hosts_ipv6_traffic_counters_to_influxdb(influxdb_database, current_influxdb_ip_address, std::to_string(influxdb_port), push_hosts_traffic_counters_to_influxdb(ipv6_host_counters, influxdb_database,
influxdb_auth, influxdb_user, influxdb_password); current_influxdb_ip_address, std::to_string(influxdb_port), influxdb_auth,
influxdb_user, influxdb_password, "hosts_ipv6_traffic", "host");
// Push total IPv6 counters // Push total IPv6 counters
push_total_traffic_counters_to_influxdb(influxdb_database, current_influxdb_ip_address, std::to_string(influxdb_port), push_total_traffic_counters_to_influxdb(influxdb_database, current_influxdb_ip_address, std::to_string(influxdb_port),
@ -284,7 +344,7 @@ bool push_hosts_ipv4_traffic_counters_to_influxdb(std::string influx_database,
continue; continue;
} }
fill_main_counters_for_influxdb(current_speed_element, plain_total_counters_map, true); fill_main_counters_for_influxdb(*current_speed_element, plain_total_counters_map, true);
// Key: client_ip_as_string // Key: client_ip_as_string
hosts_vector.push_back(std::make_pair(client_ip_as_string, plain_total_counters_map)); hosts_vector.push_back(std::make_pair(client_ip_as_string, plain_total_counters_map));
@ -355,7 +415,7 @@ bool push_network_traffic_counters_to_influxdb(std::string influx_database,
const subnet_counter_t* speed = &itr.second; const subnet_counter_t* speed = &itr.second;
std::string subnet_as_string = convert_subnet_to_string(itr.first); std::string subnet_as_string = convert_subnet_to_string(itr.first);
fill_main_counters_for_influxdb(speed, plain_total_counters_map, false); fill_main_counters_for_influxdb(*speed, plain_total_counters_map, false);
influxdb_writes_total++; influxdb_writes_total++;
@ -397,50 +457,20 @@ bool write_line_of_data_to_influxdb(std::string influx_database,
} }
// Fills special structure which we use to export metrics into InfluxDB // Fills special structure which we use to export metrics into InfluxDB
void fill_per_protocol_countres_for_influxdb(const subnet_counter_t* current_speed_element, void fill_main_counters_for_influxdb(const subnet_counter_t& current_speed_element,
std::map<std::string, uint64_t>& plain_total_counters_map) {
plain_total_counters_map["fragmented_packets_incoming"] = current_speed_element->fragmented.in_packets;
plain_total_counters_map["tcp_packets_incoming"] = current_speed_element->tcp.in_packets;
plain_total_counters_map["tcp_syn_packets_incoming"] = current_speed_element->tcp_syn.in_packets;
plain_total_counters_map["udp_packets_incoming"] = current_speed_element->udp.in_packets;
plain_total_counters_map["icmp_packets_incoming"] = current_speed_element->icmp.in_packets;
plain_total_counters_map["fragmented_bits_incoming"] = current_speed_element->fragmented.in_bytes * 8;
plain_total_counters_map["tcp_bits_incoming"] = current_speed_element->tcp.in_bytes * 8;
plain_total_counters_map["tcp_syn_bits_incoming"] = current_speed_element->tcp_syn.in_bytes * 8;
plain_total_counters_map["udp_bits_incoming"] = current_speed_element->udp.in_bytes * 8;
plain_total_counters_map["icmp_bits_incoming"] = current_speed_element->icmp.in_bytes * 8;
// Outgoing
plain_total_counters_map["fragmented_packets_outgoing"] = current_speed_element->fragmented.out_packets;
plain_total_counters_map["tcp_packets_outgoing"] = current_speed_element->tcp.out_packets;
plain_total_counters_map["tcp_syn_packets_outgoing"] = current_speed_element->tcp_syn.out_packets;
plain_total_counters_map["udp_packets_outgoing"] = current_speed_element->udp.out_packets;
plain_total_counters_map["icmp_packets_outgoing"] = current_speed_element->icmp.out_packets;
plain_total_counters_map["fragmented_bits_outgoing"] = current_speed_element->fragmented.out_bytes * 8;
plain_total_counters_map["tcp_bits_outgoing"] = current_speed_element->tcp.out_bytes * 8;
plain_total_counters_map["tcp_syn_bits_outgoing"] = current_speed_element->tcp_syn.out_bytes * 8;
plain_total_counters_map["udp_bits_outgoing"] = current_speed_element->udp.out_bytes * 8;
plain_total_counters_map["icmp_bits_outgoing"] = current_speed_element->icmp.out_bytes * 8;
}
// Fills special structure which we use to export metrics into InfluxDB
void fill_main_counters_for_influxdb(const subnet_counter_t* current_speed_element,
std::map<std::string, uint64_t>& plain_total_counters_map, std::map<std::string, uint64_t>& plain_total_counters_map,
bool populate_flow) { bool populate_flow) {
// Prepare incoming traffic data // Prepare incoming traffic data
plain_total_counters_map["packets_incoming"] = current_speed_element->total.in_packets; plain_total_counters_map["packets_incoming"] = current_speed_element.total.in_packets;
plain_total_counters_map["bits_incoming"] = current_speed_element->total.in_bytes * 8; plain_total_counters_map["bits_incoming"] = current_speed_element.total.in_bytes * 8;
// Outdoing traffic // Outdoing traffic
plain_total_counters_map["packets_outgoing"] = current_speed_element->total.out_packets; plain_total_counters_map["packets_outgoing"] = current_speed_element.total.out_packets;
plain_total_counters_map["bits_outgoing"] = current_speed_element->total.out_bytes * 8; plain_total_counters_map["bits_outgoing"] = current_speed_element.total.out_bytes * 8;
if (populate_flow) { if (populate_flow) {
plain_total_counters_map["flows_incoming"] = current_speed_element->in_flows; plain_total_counters_map["flows_incoming"] = current_speed_element.in_flows;
plain_total_counters_map["flows_outgoing"] = current_speed_element->out_flows; plain_total_counters_map["flows_outgoing"] = current_speed_element.out_flows;
} }
} }
@ -448,8 +478,8 @@ void fill_main_counters_for_influxdb(const subnet_counter_t* current_speed_eleme
// Prepare string to insert data into InfluxDB // Prepare string to insert data into InfluxDB
std::string craft_line_for_influxdb_line_protocol(uint64_t unix_timestamp_nanoseconds, std::string craft_line_for_influxdb_line_protocol(uint64_t unix_timestamp_nanoseconds,
std::string measurement, std::string measurement,
std::map<std::string, std::string>& tags, const std::map<std::string, std::string>& tags,
std::map<std::string, uint64_t>& plain_total_counters_map) { const std::map<std::string, uint64_t>& plain_total_counters_map) {
std::stringstream buffer; std::stringstream buffer;
buffer << measurement << ","; buffer << measurement << ",";

@ -80,12 +80,12 @@ bool write_line_of_data_to_influxdb(std::string influx_database,
void fill_per_protocol_countres_for_influxdb(const subnet_counter_t* current_speed_element, void fill_per_protocol_countres_for_influxdb(const subnet_counter_t* current_speed_element,
std::map<std::string, uint64_t>& plain_total_counters_map); std::map<std::string, uint64_t>& plain_total_counters_map);
void fill_main_counters_for_influxdb(const subnet_counter_t* current_speed_element, void fill_main_counters_for_influxdb(const subnet_counter_t& current_speed_element,
std::map<std::string, uint64_t>& plain_total_counters_map, std::map<std::string, uint64_t>& plain_total_counters_map,
bool populate_flow); bool populate_flow);
// Prepare string to insert data into InfluxDB // Prepare string to insert data into InfluxDB
std::string craft_line_for_influxdb_line_protocol(uint64_t unix_timestamp_nanoseconds, std::string craft_line_for_influxdb_line_protocol(uint64_t unix_timestamp_nanoseconds,
std::string measurement, std::string measurement,
std::map<std::string, std::string>& tags, const std::map<std::string, std::string>& tags,
std::map<std::string, uint64_t>& plain_total_counters_map); const std::map<std::string, uint64_t>& plain_total_counters_map);