From e2931827a6ee579084c9467c123bf3e8f3b38e92 Mon Sep 17 00:00:00 2001 From: Pavel Odintsov Date: Sat, 13 Jul 2024 15:31:39 +0300 Subject: [PATCH] Migrated to per protocol total traffic counters --- src/fastnetmon.cpp | 14 ++++++- src/fastnetmon_logic.cpp | 80 ++++++++++++++++++++-------------------- src/fastnetmon_types.hpp | 48 ++++++++++++++++++++++-- 3 files changed, 96 insertions(+), 46 deletions(-) diff --git a/src/fastnetmon.cpp b/src/fastnetmon.cpp index 481da9c..abe6c40 100644 --- a/src/fastnetmon.cpp +++ b/src/fastnetmon.cpp @@ -257,6 +257,9 @@ ban_settings_t global_ban_settings; // We use these flow spec rules as custom whitelist std::vector static_flowspec_based_whitelist; +std::string graphite_thread_execution_time_desc = "Time consumed by pushing data to Graphite"; +struct timeval graphite_thread_execution_time; + void init_global_ban_settings() { // ban Configuration params global_ban_settings.enable_ban_for_pps = false; @@ -1986,15 +1989,22 @@ int main(int argc, char** argv) { service_thread_group.add_thread(new boost::thread(screen_draw_ipv6_thread)); // Graphite export thread - if (graphite_enabled) { + if (fastnetmon_global_configuration.graphite) { service_thread_group.add_thread(new boost::thread(graphite_push_thread)); } // InfluxDB export thread - if (influxdb_enabled) { + if (fastnetmon_global_configuration.influxdb) { service_thread_group.add_thread(new boost::thread(influxdb_push_thread)); } +#ifdef ENABLE_CLICKHOUSE_SUPPORT + // Clickhouse metrics export therad + if (fastnetmon_global_configuration.clickhouse_metrics) { + service_thread_group.add_thread(new boost::thread(clickhouse_push_thread)); + } +#endif + // start thread for recalculating speed in realtime service_thread_group.add_thread(new boost::thread(recalculate_speed_thread_handler)); diff --git a/src/fastnetmon_logic.cpp b/src/fastnetmon_logic.cpp index d4025a9..fa2c5b0 100644 --- a/src/fastnetmon_logic.cpp +++ b/src/fastnetmon_logic.cpp @@ -1392,8 +1392,8 @@ void store_data_in_mongo(std::string key_name, std::string attack_details_json) // pretty print channel speed in pps and MBit std::string print_channel_speed(std::string traffic_type, direction_t packet_direction) { - uint64_t speed_in_pps = total_counters_ipv4.total_speed_average_counters[packet_direction].packets; - uint64_t speed_in_bps = total_counters_ipv4.total_speed_average_counters[packet_direction].bytes; + uint64_t speed_in_pps = total_counters_ipv4.total_speed_average_counters[packet_direction].total.packets; + uint64_t speed_in_bps = total_counters_ipv4.total_speed_average_counters[packet_direction].total.bytes; unsigned int number_of_tabs = 1; // We need this for correct alignment of blocks @@ -1957,49 +1957,49 @@ void recalculate_speed() { // Calculate IPv4 total traffic speed for (unsigned int index = 0; index < 4; index++) { - total_counters_ipv4.total_speed_counters[index].bytes = - uint64_t((double)total_counters_ipv4.total_counters[index].bytes / (double)speed_calc_period); + total_counters_ipv4.total_speed_counters[index].total.bytes = + uint64_t((double)total_counters_ipv4.total_counters[index].total.bytes / (double)speed_calc_period); - total_counters_ipv4.total_speed_counters[index].packets = - uint64_t((double)total_counters_ipv4.total_counters[index].packets / (double)speed_calc_period); + total_counters_ipv4.total_speed_counters[index].total.packets = + uint64_t((double)total_counters_ipv4.total_counters[index].total.packets / (double)speed_calc_period); double exp_power = -speed_calc_period / average_calculation_amount; double exp_value = exp(exp_power); - total_counters_ipv4.total_speed_average_counters[index].bytes = - uint64_t(total_counters_ipv4.total_speed_counters[index].bytes + - exp_value * ((double)total_counters_ipv4.total_speed_average_counters[index].bytes - - (double)total_counters_ipv4.total_speed_counters[index].bytes)); + total_counters_ipv4.total_speed_average_counters[index].total.bytes = + uint64_t(total_counters_ipv4.total_speed_counters[index].total.bytes + + exp_value * ((double)total_counters_ipv4.total_speed_average_counters[index].total.bytes - + (double)total_counters_ipv4.total_speed_counters[index].total.bytes)); - total_counters_ipv4.total_speed_average_counters[index].packets = - uint64_t(total_counters_ipv4.total_speed_counters[index].packets + - exp_value * ((double)total_counters_ipv4.total_speed_average_counters[index].packets - - (double)total_counters_ipv4.total_speed_counters[index].packets)); + total_counters_ipv4.total_speed_average_counters[index].total.packets = + uint64_t(total_counters_ipv4.total_speed_counters[index].total.packets + + exp_value * ((double)total_counters_ipv4.total_speed_average_counters[index].total.packets - + (double)total_counters_ipv4.total_speed_counters[index].total.packets)); // nullify data counters after speed calculation - total_counters_ipv4.total_counters[index].bytes = 0; - total_counters_ipv4.total_counters[index].packets = 0; + total_counters_ipv4.total_counters[index].total.bytes = 0; + total_counters_ipv4.total_counters[index].total.packets = 0; } // Do same for IPv6 for (unsigned int index = 0; index < 4; index++) { - total_counters_ipv6.total_speed_counters[index].bytes = - uint64_t((double)total_counters_ipv6.total_counters[index].bytes / (double)speed_calc_period); - total_counters_ipv6.total_speed_counters[index].packets = - uint64_t((double)total_counters_ipv6.total_counters[index].packets / (double)speed_calc_period); + total_counters_ipv6.total_speed_counters[index].total.bytes = + uint64_t((double)total_counters_ipv6.total_counters[index].total.bytes / (double)speed_calc_period); + total_counters_ipv6.total_speed_counters[index].total.packets = + uint64_t((double)total_counters_ipv6.total_counters[index].total.packets / (double)speed_calc_period); double exp_power = -speed_calc_period / average_calculation_amount; double exp_value = exp(exp_power); - total_counters_ipv6.total_speed_average_counters[index].bytes = - uint64_t(total_counters_ipv6.total_speed_counters[index].bytes + - exp_value * ((double)total_counters_ipv6.total_speed_average_counters[index].bytes - - (double)total_counters_ipv6.total_speed_counters[index].bytes)); + total_counters_ipv6.total_speed_average_counters[index].total.bytes = + uint64_t(total_counters_ipv6.total_speed_counters[index].total.bytes + + exp_value * ((double)total_counters_ipv6.total_speed_average_counters[index].total.bytes - + (double)total_counters_ipv6.total_speed_counters[index].total.bytes)); - total_counters_ipv6.total_speed_average_counters[index].packets = - uint64_t(total_counters_ipv6.total_speed_counters[index].packets + - exp_value * ((double)total_counters_ipv6.total_speed_average_counters[index].packets - - (double)total_counters_ipv6.total_speed_counters[index].packets)); + total_counters_ipv6.total_speed_average_counters[index].total.packets = + uint64_t(total_counters_ipv6.total_speed_counters[index].total.packets + + exp_value * ((double)total_counters_ipv6.total_speed_average_counters[index].total.packets - + (double)total_counters_ipv6.total_speed_counters[index].total.packets)); // nullify data counters after speed calculation total_counters_ipv6.total_counters[index].zeroify(); @@ -2328,8 +2328,8 @@ void process_ipv6_packet(simple_packet_t& current_packet) { __atomic_add_fetch(&total_ipv6_packets, 1, __ATOMIC_RELAXED); #else - __sync_fetch_and_add(&total_counters_ipv6.total_counters[current_packet.packet_direction].packets, sampled_number_of_packets); - __sync_fetch_and_add(&total_counters_ipv6.total_counters[current_packet.packet_direction].bytes, sampled_number_of_bytes); + __sync_fetch_and_add(&total_counters_ipv6.total_counters[current_packet.packet_direction].total.packets, sampled_number_of_packets); + __sync_fetch_and_add(&total_counters_ipv6.total_counters[current_packet.packet_direction].total.bytes, sampled_number_of_bytes); __sync_fetch_and_add(&total_ipv6_packets, 1); #endif @@ -2506,8 +2506,8 @@ void process_packet(simple_packet_t& current_packet) { __atomic_add_fetch(&total_counters_ipv4.total_counters[current_packet.packet_direction].bytes, sampled_number_of_bytes, __ATOMIC_RELAXED); #else - __sync_fetch_and_add(&total_counters_ipv4.total_counters[current_packet.packet_direction].packets, sampled_number_of_packets); - __sync_fetch_and_add(&total_counters_ipv4.total_counters[current_packet.packet_direction].bytes, sampled_number_of_bytes); + __sync_fetch_and_add(&total_counters_ipv4.total_counters[current_packet.packet_direction].total.packets, sampled_number_of_packets); + __sync_fetch_and_add(&total_counters_ipv4.total_counters[current_packet.packet_direction].total.bytes, sampled_number_of_bytes); #endif // Add traffic to buckets when we have them @@ -2663,8 +2663,8 @@ void increment_outgoing_flow_counters(uint32_t client_ip, // pretty print channel speed in pps and MBit std::string print_channel_speed_ipv6(std::string traffic_type, direction_t packet_direction) { - uint64_t speed_in_pps = total_counters_ipv6.total_speed_average_counters[packet_direction].packets; - uint64_t speed_in_bps = total_counters_ipv6.total_speed_average_counters[packet_direction].bytes; + uint64_t speed_in_pps = total_counters_ipv6.total_speed_average_counters[packet_direction].total.packets; + uint64_t speed_in_bps = total_counters_ipv6.total_speed_average_counters[packet_direction].total.bytes; unsigned int number_of_tabs = 3; @@ -3066,11 +3066,11 @@ void send_usage_data_to_reporting_server() { try { nlohmann::json stats; - uint64_t incoming_ipv4 = total_counters_ipv4.total_speed_average_counters[INCOMING].bytes; - uint64_t outgoing_ipv4 = total_counters_ipv4.total_speed_average_counters[OUTGOING].bytes; + uint64_t incoming_ipv4 = total_counters_ipv4.total_speed_average_counters[INCOMING].total.bytes; + uint64_t outgoing_ipv4 = total_counters_ipv4.total_speed_average_counters[OUTGOING].total.bytes; - uint64_t incoming_ipv6 = total_counters_ipv6.total_speed_average_counters[INCOMING].bytes; - uint64_t outgoing_ipv6 = total_counters_ipv6.total_speed_average_counters[OUTGOING].bytes; + uint64_t incoming_ipv6 = total_counters_ipv6.total_speed_average_counters[INCOMING].total.bytes; + uint64_t outgoing_ipv6 = total_counters_ipv6.total_speed_average_counters[OUTGOING].total.bytes; stats["incoming_traffic_speed"] = incoming_ipv4 + incoming_ipv6; stats["outgoing_traffic_speed"] = outgoing_ipv4 + outgoing_ipv6; @@ -3234,7 +3234,7 @@ void add_total_traffic_to_prometheus(const total_speed_counters_t& total_counter output << "# HELP Total traffic in packets\n"; output << "# TYPE " << packet_metric_name << " gauge\n"; output << packet_metric_name << "{traffic_direction=\"" << direction_as_string << "\",protocol_version=\"" - << protocol_version << "\"} " << total_counters.total_speed_average_counters[packet_direction].packets << "\n"; + << protocol_version << "\"} " << total_counters.total_speed_average_counters[packet_direction].total.packets << "\n"; // Bytes std::string bits_metric_name = "fastnetmon_total_traffic_bits"; @@ -3242,7 +3242,7 @@ void add_total_traffic_to_prometheus(const total_speed_counters_t& total_counter output << "# HELP Total traffic in bits\n"; output << "# TYPE " << bits_metric_name << " gauge\n"; output << bits_metric_name << "{traffic_direction=\"" << direction_as_string << "\",protocol_version=\"" << protocol_version - << "\"} " << total_counters.total_speed_average_counters[packet_direction].bytes * 8 << "\n"; + << "\"} " << total_counters.total_speed_average_counters[packet_direction].total.bytes * 8 << "\n"; // Flows if (protocol_version == "ipv4" && enable_connection_tracking && diff --git a/src/fastnetmon_types.hpp b/src/fastnetmon_types.hpp index 65c0177..dd5c5db 100644 --- a/src/fastnetmon_types.hpp +++ b/src/fastnetmon_types.hpp @@ -264,19 +264,59 @@ enum amplification_attack_type_t { AMPLIFICATION_ATTACK_CHARGEN = 6, }; -class total_counter_element_t { +class single_counter_element_t { public: - uint64_t bytes; - uint64_t packets; - uint64_t flows; + uint64_t bytes = 0; + uint64_t packets = 0; + uint64_t flows = 0; void zeroify() { bytes = 0; packets = 0; flows = 0; } + + template void serialize(Archive& ar, [[maybe_unused]] const unsigned int version) { + ar& BOOST_SERIALIZATION_NVP(bytes); + ar& BOOST_SERIALIZATION_NVP(packets); + ar& BOOST_SERIALIZATION_NVP(flows); + } }; + +class total_counter_element_t { + public: + single_counter_element_t total{}; + single_counter_element_t tcp; + single_counter_element_t udp; + single_counter_element_t icmp; + single_counter_element_t fragmented; + single_counter_element_t tcp_syn; + single_counter_element_t dropped; + + + void zeroify() { + total.zeroify(); + tcp.zeroify(); + udp.zeroify(); + icmp.zeroify(); + fragmented.zeroify(); + tcp_syn.zeroify(); + dropped.zeroify(); + } + + template void serialize(Archive& ar, [[maybe_unused]] const unsigned int version) { + ar& BOOST_SERIALIZATION_NVP(total); + ar& BOOST_SERIALIZATION_NVP(tcp); + ar& BOOST_SERIALIZATION_NVP(udp); + ar& BOOST_SERIALIZATION_NVP(icmp); + ar& BOOST_SERIALIZATION_NVP(fragmented); + ar& BOOST_SERIALIZATION_NVP(tcp_syn); + ar& BOOST_SERIALIZATION_NVP(dropped); + } +}; + + // Set of structures for calculating total traffic counters class total_speed_counters_t { public: