From d7f8ff61e8e4e5a2f56769927904648fa51a6ca4 Mon Sep 17 00:00:00 2001 From: Pavel Odintsov Date: Tue, 22 Dec 2020 23:00:00 +0000 Subject: [PATCH] Extracted Graphite logic into separate module --- src/CMakeLists.txt | 5 + src/fastnetmon.cpp | 11 ++ src/fastnetmon_logic.cpp | 133 ---------------------- src/fastnetmon_types.h | 1 + src/metrics/graphite.cpp | 236 +++++++++++++++++++++++++++++++++++++++ src/metrics/graphite.hpp | 7 ++ 6 files changed, 260 insertions(+), 133 deletions(-) create mode 100644 src/metrics/graphite.cpp create mode 100644 src/metrics/graphite.hpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 5f8c9bb..5b48dbc 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -200,6 +200,11 @@ add_library(simple_packet_capnp STATIC simple_packet_capnp/simple_packet.capnp.c # Our LPM library add_library(patricia STATIC libpatricia/patricia.c) +# Graphite metrics +add_library(graphite_metrics STATIC metrics/graphite.cpp) +target_link_libraries(fastnetmon graphite_metrics) + + add_library(fastnetmon_pcap_format STATIC fastnetmon_pcap_format.cpp) # Our tools library diff --git a/src/fastnetmon.cpp b/src/fastnetmon.cpp index 02c7e4f..f6fc861 100644 --- a/src/fastnetmon.cpp +++ b/src/fastnetmon.cpp @@ -114,6 +114,8 @@ #include "ban_list.hpp" +#include "metrics/graphite.hpp" + #ifdef FASTNETMON_API using fastmitigation::BanListReply; using fastmitigation::BanListRequest; @@ -439,6 +441,10 @@ std::string exabgp_next_hop = ""; bool graphite_enabled = false; std::string graphite_host = "127.0.0.1"; unsigned short int graphite_port = 2003; +unsigned int graphite_push_period = 1; + +// Time consumed by pushing data to Graphite +struct timeval graphite_thread_execution_time; // Default graphite namespace std::string graphite_prefix = "fastnetmon"; @@ -1640,6 +1646,11 @@ int main(int argc, char** argv) { // Run screen draw thread for IPv6 service_thread_group.add_thread(new boost::thread(screen_draw_ipv6_thread)); + // Graphite export thread + if (graphite_enabled) { + service_thread_group.add_thread(new boost::thread(graphite_push_thread)); + } + // start thread for recalculating speed in realtime service_thread_group.add_thread(new boost::thread(recalculate_speed_thread_handler)); diff --git a/src/fastnetmon_logic.cpp b/src/fastnetmon_logic.cpp index c4c64e8..a68c11d 100644 --- a/src/fastnetmon_logic.cpp +++ b/src/fastnetmon_logic.cpp @@ -446,8 +446,6 @@ std::string print_subnet_ipv4_load() { std::sort(vector_for_sort.begin(), vector_for_sort.end(), TrafficComparatorClass(INCOMING, sorter)); - graphite_data_t graphite_data; - for (std::vector::iterator itr = vector_for_sort.begin(); itr != vector_for_sort.end(); ++itr) { map_element_t* speed = &itr->second; @@ -455,42 +453,12 @@ std::string print_subnet_ipv4_load() { buffer << std::setw(18) << std::left << subnet_as_string; - if (graphite_enabled) { - std::string subnet_as_string_as_dash_delimiters = subnet_as_string; - - // Replace dots by dashes - std::replace(subnet_as_string_as_dash_delimiters.begin(), - subnet_as_string_as_dash_delimiters.end(), '.', '_'); - - // Replace / by dashes too - std::replace(subnet_as_string_as_dash_delimiters.begin(), - subnet_as_string_as_dash_delimiters.end(), '/', '_'); - - graphite_data[graphite_prefix + ".networks." + subnet_as_string_as_dash_delimiters + ".incoming.pps"] = - speed->in_packets; - graphite_data[graphite_prefix + ".networks." + subnet_as_string_as_dash_delimiters + ".outgoing.pps"] = - speed->out_packets; - - graphite_data[graphite_prefix + ".networks." + subnet_as_string_as_dash_delimiters + ".incoming.bps"] = - speed->in_bytes * 8; - graphite_data[graphite_prefix + ".networks." + subnet_as_string_as_dash_delimiters + ".outgoing.bps"] = - speed->out_bytes * 8; - } - buffer << " " << "pps in: " << std::setw(8) << speed->in_packets << " out: " << std::setw(8) << speed->out_packets << " mbps in: " << std::setw(5) << convert_speed_to_mbps(speed->in_bytes) << " out: " << std::setw(5) << convert_speed_to_mbps(speed->out_bytes) << "\n"; } - if (graphite_enabled) { - bool graphite_put_result = store_data_to_graphite(graphite_port, graphite_host, graphite_data); - - if (!graphite_put_result) { - logger << log4cpp::Priority::ERROR << "Can't store network load data to Graphite"; - } - } - return buffer.str(); } @@ -2093,33 +2061,6 @@ std::string print_channel_speed(std::string traffic_type, direction_t packet_dir } else if (packet_direction == OUTGOING) { stream << " " << std::setw(6) << outgoing_total_flows_speed << " flows"; } - - if (graphite_enabled) { - graphite_data_t graphite_data; - - std::string direction_as_string; - - if (packet_direction == INCOMING) { - direction_as_string = "incoming"; - - graphite_data[graphite_prefix + ".total." + direction_as_string + ".flows"] = - incoming_total_flows_speed; - } else if (packet_direction == OUTGOING) { - direction_as_string = "outgoing"; - - graphite_data[graphite_prefix + ".total." + direction_as_string + ".flows"] = - outgoing_total_flows_speed; - } - - graphite_data[graphite_prefix + ".total." + direction_as_string + ".pps"] = speed_in_pps; - graphite_data[graphite_prefix + ".total." + direction_as_string + ".bps"] = speed_in_bps * 8; - - bool graphite_put_result = store_data_to_graphite(graphite_port, graphite_host, graphite_data); - - if (!graphite_put_result) { - logger << log4cpp::Priority::ERROR << "Can't store data to Graphite"; - } - } } return stream.str(); @@ -3011,80 +2952,6 @@ std::string draw_table_ipv4(direction_t data_direction, bool do_redis_update, so element_number++; } - graphite_data_t graphite_data; - - // TODO: add graphite operations time to the config file - if (graphite_enabled) { - for (std::vector::iterator ii = vector_for_sort.begin(); - ii != vector_for_sort.end(); ++ii) { - uint32_t client_ip = (*ii).first; - std::string client_ip_as_string = convert_ip_as_uint_to_string((*ii).first); - - uint64_t pps = 0; - uint64_t bps = 0; - uint64_t flows = 0; - - // Here we could have average or instantaneous speed - map_element_t* current_speed_element = &ii->second; - - // Create polymorphic pps, byte and flow counters - if (data_direction == INCOMING) { - pps = current_speed_element->in_packets; - bps = current_speed_element->in_bytes; - flows = current_speed_element->in_flows; - } else if (data_direction == OUTGOING) { - pps = current_speed_element->out_packets; - bps = current_speed_element->out_bytes; - flows = current_speed_element->out_flows; - } - - std::string direction_as_string; - - if (data_direction == INCOMING) { - direction_as_string = "incoming"; - } else if (data_direction == OUTGOING) { - direction_as_string = "outgoing"; - } - - std::string ip_as_string_with_dash_delimiters = client_ip_as_string; - // Replace dots by dashes - std::replace(ip_as_string_with_dash_delimiters.begin(), - ip_as_string_with_dash_delimiters.end(), '.', '_'); - - std::string graphite_current_prefix = - graphite_prefix + ".hosts." + ip_as_string_with_dash_delimiters + "." + direction_as_string; - - if (print_average_traffic_counts) { - graphite_current_prefix = graphite_current_prefix + ".average"; - } - - // We do not store zero data to Graphite - if (pps != 0) { - graphite_data[graphite_current_prefix + ".pps"] = pps; - } - - if (bps != 0) { - graphite_data[graphite_current_prefix + ".bps"] = bps * 8; - } - - if (flows != 0) { - graphite_data[graphite_current_prefix + ".flows"] = flows; - } - } - } - - // TODO: we should switch to piclke format instead text - // TODO: we should check packet size for Graphite - // logger << log4cpp::Priority::INFO << "We will write " << graphite_data.size() << " records to Graphite"; - - if (graphite_enabled) { - bool graphite_put_result = store_data_to_graphite(graphite_port, graphite_host, graphite_data); - - if (!graphite_put_result) { - logger << log4cpp::Priority::ERROR << "Can't store data to Graphite"; - } - } - return output_buffer.str(); } diff --git a/src/fastnetmon_types.h b/src/fastnetmon_types.h index 88108e6..14cca3a 100644 --- a/src/fastnetmon_types.h +++ b/src/fastnetmon_types.h @@ -19,6 +19,7 @@ #include "fastnetmon_networks.hpp" +typedef std::vector vector_of_counters_t; typedef std::map configuration_map_t; typedef std::map graphite_data_t; diff --git a/src/metrics/graphite.cpp b/src/metrics/graphite.cpp new file mode 100644 index 0000000..dee1d26 --- /dev/null +++ b/src/metrics/graphite.cpp @@ -0,0 +1,236 @@ +#include "graphite.hpp" + + +#include "../fastnetmon_types.h" +#include "../fast_library.h" + +#include + +#include "../all_logcpp_libraries.h" + +extern log4cpp::Category& logger; +extern bool print_average_traffic_counts; +extern struct timeval graphite_thread_execution_time; +extern total_counter_element_t total_speed_average_counters[4]; +extern map_of_vector_counters_t SubnetVectorMapSpeed; +extern map_of_vector_counters_t SubnetVectorMapSpeedAverage; +extern uint64_t incoming_total_flows_speed; +extern uint64_t outgoing_total_flows_speed; +extern map_for_subnet_counters_t PerSubnetAverageSpeedMap; + +extern bool graphite_enabled; +extern std::string graphite_host; +extern unsigned short int graphite_port; +extern std::string graphite_prefix; +extern unsigned int graphite_push_period; + +// Push host traffic to Graphite +bool push_hosts_traffic_counters_to_graphite() { + std::vector processed_directions = { INCOMING, OUTGOING }; + + graphite_data_t graphite_data; + + map_of_vector_counters_t* current_speed_map = nullptr; + + if (print_average_traffic_counts) { + current_speed_map = &SubnetVectorMapSpeedAverage; + } else { + current_speed_map = &SubnetVectorMapSpeed; + } + + // Iterate over all networks + for (map_of_vector_counters_t::iterator itr = current_speed_map->begin(); itr != current_speed_map->end(); ++itr) { + + // Iterate over all hosts in network + for (vector_of_counters_t::iterator vector_itr = itr->second.begin(); vector_itr != itr->second.end(); ++vector_itr) { + int current_index = vector_itr - itr->second.begin(); + + // convert to host order for math operations + uint32_t subnet_ip = ntohl(itr->first.subnet_address); + uint32_t client_ip_in_host_bytes_order = subnet_ip + current_index; + + // covnert to our standard network byte order + uint32_t client_ip = htonl(client_ip_in_host_bytes_order); + + std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip); + + std::string ip_as_string_with_dash_delimiters = client_ip_as_string; + // Replace dots by dashes + std::replace(ip_as_string_with_dash_delimiters.begin(), ip_as_string_with_dash_delimiters.end(), '.', '_'); + + // Here we could have average or instantaneous speed + map_element_t* current_speed_element = &*vector_itr; + + for (auto data_direction : processed_directions) { + std::string direction_as_string; + + if (data_direction == INCOMING) { + direction_as_string = "incoming"; + } else if (data_direction == OUTGOING) { + direction_as_string = "outgoing"; + } + + std::string graphite_current_prefix = graphite_prefix + ".hosts." + + ip_as_string_with_dash_delimiters + "." + direction_as_string; + + if (print_average_traffic_counts) { + graphite_current_prefix = graphite_current_prefix + ".average"; + } + + if (data_direction == INCOMING) { + // Prepare incoming traffic data + + // We do not store zero data to Graphite + if (current_speed_element->in_packets != 0) { + graphite_data[graphite_current_prefix + ".pps"] = current_speed_element->in_packets; + } + + if (current_speed_element->in_bytes != 0) { + graphite_data[graphite_current_prefix + ".bps"] = current_speed_element->in_bytes * 8; + } + + if (current_speed_element->in_flows != 0) { + graphite_data[graphite_current_prefix + ".flows"] = current_speed_element->in_flows; + } + + } else if (data_direction == OUTGOING) { + // Prepare outgoing traffic data + + // We do not store zero data to Graphite + if (current_speed_element->out_packets != 0) { + graphite_data[graphite_current_prefix + ".pps"] = current_speed_element->out_packets; + } + + if (current_speed_element->out_bytes != 0) { + graphite_data[graphite_current_prefix + ".bps"] = current_speed_element->out_bytes * 8; + } + + if (current_speed_element->out_flows != 0) { + graphite_data[graphite_current_prefix + ".flows"] = current_speed_element->out_flows; + } + } + } + } + + bool graphite_put_result = store_data_to_graphite(graphite_port, graphite_host, graphite_data); + + if (!graphite_put_result) { + logger << log4cpp::Priority::ERROR << "Can't store host load data to Graphite server " + << graphite_host << " port: " << graphite_port; + return false; + } + } + + return true; +} + +// Push total counters to graphite +bool push_total_traffic_counters_to_graphite() { + std::vector directions = { INCOMING, OUTGOING, INTERNAL, OTHER }; + + for (auto packet_direction : directions) { + uint64_t speed_in_pps = total_speed_average_counters[packet_direction].packets; + uint64_t speed_in_bps = total_speed_average_counters[packet_direction].bytes; + + graphite_data_t graphite_data; + + std::string direction_as_string = get_direction_name(packet_direction); + + // We have flow information only for incoming and outgoing directions + if (packet_direction == INCOMING or packet_direction == OUTGOING) { + uint64_t flow_counter_for_this_direction = 0; + + if (packet_direction == INCOMING) { + flow_counter_for_this_direction = incoming_total_flows_speed; + } else { + flow_counter_for_this_direction = outgoing_total_flows_speed; + } + + graphite_data[graphite_prefix + ".total." + direction_as_string + ".flows"] = + flow_counter_for_this_direction; + } + + graphite_data[graphite_prefix + ".total." + direction_as_string + ".pps"] = speed_in_pps; + graphite_data[graphite_prefix + ".total." + direction_as_string + ".bps"] = + speed_in_bps * 8; + + bool graphite_put_result = store_data_to_graphite(graphite_port, graphite_host, graphite_data); + + if (!graphite_put_result) { + logger << log4cpp::Priority::ERROR << "Can't store total load data to Graphite server " + << graphite_host << " port: " << graphite_port; + ; + return false; + } + } + + return true; +} + +// Push per subnet traffic counters to graphite +bool push_network_traffic_counters_to_graphite() { + graphite_data_t graphite_data; + + for (map_for_subnet_counters_t::iterator itr = PerSubnetAverageSpeedMap.begin(); itr != PerSubnetAverageSpeedMap.end(); ++itr) { + map_element_t* speed = &itr->second; + std::string subnet_as_string_as_dash_delimiters = convert_subnet_to_string(itr->first); + ; + + // Replace dots by dashes + std::replace(subnet_as_string_as_dash_delimiters.begin(), subnet_as_string_as_dash_delimiters.end(), '.', '_'); + + // Replace / by dashes too + std::replace(subnet_as_string_as_dash_delimiters.begin(), subnet_as_string_as_dash_delimiters.end(), '/', '_'); + + std::string current_prefix = graphite_prefix + ".networks." + subnet_as_string_as_dash_delimiters + "."; + + graphite_data[current_prefix + "incoming.pps"] = speed->in_packets; + graphite_data[current_prefix + "outgoing.pps"] = speed->out_packets; + graphite_data[current_prefix + "incoming.bps"] = speed->in_bytes * 8; + graphite_data[current_prefix + "outgoing.bps"] = speed->out_bytes * 8; + } + + + bool graphite_put_result = store_data_to_graphite(graphite_port, graphite_host, graphite_data); + + if (!graphite_put_result) { + logger << log4cpp::Priority::ERROR << "Can't store network load data to Graphite server " + << graphite_host << " port: " << graphite_port; + return false; + } + + return true; +} + + + +// This thread pushes speed counters to graphite +void graphite_push_thread() { + // Sleep for a half second for shift against calculatiuon thread + boost::this_thread::sleep(boost::posix_time::milliseconds(500)); + + while (true) { + boost::this_thread::sleep(boost::posix_time::seconds(graphite_push_period)); + + struct timeval start_calc_time; + gettimeofday(&start_calc_time, NULL); + + // First of all push total counters to Graphite + push_total_traffic_counters_to_graphite(); + + // Push per subnet counters to graphite + push_network_traffic_counters_to_graphite(); + + // Push per host counters to graphite + push_hosts_traffic_counters_to_graphite(); + + struct timeval end_calc_time; + gettimeofday(&end_calc_time, NULL); + + timeval_subtract(&graphite_thread_execution_time, &end_calc_time, &start_calc_time); + + logger << log4cpp::Priority::DEBUG << "Graphite data pushed in: " << graphite_thread_execution_time.tv_sec + << " sec " << graphite_thread_execution_time.tv_usec << " microseconds\n"; + } +} + diff --git a/src/metrics/graphite.hpp b/src/metrics/graphite.hpp new file mode 100644 index 0000000..f8e9ce4 --- /dev/null +++ b/src/metrics/graphite.hpp @@ -0,0 +1,7 @@ +#pragma once + +void graphite_push_thread(); +bool push_total_traffic_counters_to_graphite(); +bool push_network_traffic_counters_to_graphite(); +bool push_hosts_traffic_counters_to_graphite(); +