Extracted Graphite logic into separate module

This commit is contained in:
Pavel Odintsov 2020-12-22 23:00:00 +00:00
parent 60d20e686c
commit d7f8ff61e8
6 changed files with 260 additions and 133 deletions

@ -200,6 +200,11 @@ add_library(simple_packet_capnp STATIC simple_packet_capnp/simple_packet.capnp.c
# Our LPM library # Our LPM library
add_library(patricia STATIC libpatricia/patricia.c) add_library(patricia STATIC libpatricia/patricia.c)
# Graphite metrics
add_library(graphite_metrics STATIC metrics/graphite.cpp)
target_link_libraries(fastnetmon graphite_metrics)
add_library(fastnetmon_pcap_format STATIC fastnetmon_pcap_format.cpp) add_library(fastnetmon_pcap_format STATIC fastnetmon_pcap_format.cpp)
# Our tools library # Our tools library

@ -114,6 +114,8 @@
#include "ban_list.hpp" #include "ban_list.hpp"
#include "metrics/graphite.hpp"
#ifdef FASTNETMON_API #ifdef FASTNETMON_API
using fastmitigation::BanListReply; using fastmitigation::BanListReply;
using fastmitigation::BanListRequest; using fastmitigation::BanListRequest;
@ -439,6 +441,10 @@ std::string exabgp_next_hop = "";
bool graphite_enabled = false; bool graphite_enabled = false;
std::string graphite_host = "127.0.0.1"; std::string graphite_host = "127.0.0.1";
unsigned short int graphite_port = 2003; unsigned short int graphite_port = 2003;
unsigned int graphite_push_period = 1;
// Time consumed by pushing data to Graphite
struct timeval graphite_thread_execution_time;
// Default graphite namespace // Default graphite namespace
std::string graphite_prefix = "fastnetmon"; std::string graphite_prefix = "fastnetmon";
@ -1640,6 +1646,11 @@ int main(int argc, char** argv) {
// Run screen draw thread for IPv6 // Run screen draw thread for IPv6
service_thread_group.add_thread(new boost::thread(screen_draw_ipv6_thread)); service_thread_group.add_thread(new boost::thread(screen_draw_ipv6_thread));
// Graphite export thread
if (graphite_enabled) {
service_thread_group.add_thread(new boost::thread(graphite_push_thread));
}
// start thread for recalculating speed in realtime // start thread for recalculating speed in realtime
service_thread_group.add_thread(new boost::thread(recalculate_speed_thread_handler)); service_thread_group.add_thread(new boost::thread(recalculate_speed_thread_handler));

@ -446,8 +446,6 @@ std::string print_subnet_ipv4_load() {
std::sort(vector_for_sort.begin(), vector_for_sort.end(), std::sort(vector_for_sort.begin(), vector_for_sort.end(),
TrafficComparatorClass<pair_of_map_for_subnet_counters_elements_t>(INCOMING, sorter)); TrafficComparatorClass<pair_of_map_for_subnet_counters_elements_t>(INCOMING, sorter));
graphite_data_t graphite_data;
for (std::vector<pair_of_map_for_subnet_counters_elements_t>::iterator itr = vector_for_sort.begin(); for (std::vector<pair_of_map_for_subnet_counters_elements_t>::iterator itr = vector_for_sort.begin();
itr != vector_for_sort.end(); ++itr) { itr != vector_for_sort.end(); ++itr) {
map_element_t* speed = &itr->second; map_element_t* speed = &itr->second;
@ -455,42 +453,12 @@ std::string print_subnet_ipv4_load() {
buffer << std::setw(18) << std::left << subnet_as_string; buffer << std::setw(18) << std::left << subnet_as_string;
if (graphite_enabled) {
std::string subnet_as_string_as_dash_delimiters = subnet_as_string;
// Replace dots by dashes
std::replace(subnet_as_string_as_dash_delimiters.begin(),
subnet_as_string_as_dash_delimiters.end(), '.', '_');
// Replace / by dashes too
std::replace(subnet_as_string_as_dash_delimiters.begin(),
subnet_as_string_as_dash_delimiters.end(), '/', '_');
graphite_data[graphite_prefix + ".networks." + subnet_as_string_as_dash_delimiters + ".incoming.pps"] =
speed->in_packets;
graphite_data[graphite_prefix + ".networks." + subnet_as_string_as_dash_delimiters + ".outgoing.pps"] =
speed->out_packets;
graphite_data[graphite_prefix + ".networks." + subnet_as_string_as_dash_delimiters + ".incoming.bps"] =
speed->in_bytes * 8;
graphite_data[graphite_prefix + ".networks." + subnet_as_string_as_dash_delimiters + ".outgoing.bps"] =
speed->out_bytes * 8;
}
buffer << " " buffer << " "
<< "pps in: " << std::setw(8) << speed->in_packets << " out: " << std::setw(8) << "pps in: " << std::setw(8) << speed->in_packets << " out: " << std::setw(8)
<< speed->out_packets << " mbps in: " << std::setw(5) << convert_speed_to_mbps(speed->in_bytes) << speed->out_packets << " mbps in: " << std::setw(5) << convert_speed_to_mbps(speed->in_bytes)
<< " out: " << std::setw(5) << convert_speed_to_mbps(speed->out_bytes) << "\n"; << " out: " << std::setw(5) << convert_speed_to_mbps(speed->out_bytes) << "\n";
} }
if (graphite_enabled) {
bool graphite_put_result = store_data_to_graphite(graphite_port, graphite_host, graphite_data);
if (!graphite_put_result) {
logger << log4cpp::Priority::ERROR << "Can't store network load data to Graphite";
}
}
return buffer.str(); return buffer.str();
} }
@ -2093,33 +2061,6 @@ std::string print_channel_speed(std::string traffic_type, direction_t packet_dir
} else if (packet_direction == OUTGOING) { } else if (packet_direction == OUTGOING) {
stream << " " << std::setw(6) << outgoing_total_flows_speed << " flows"; stream << " " << std::setw(6) << outgoing_total_flows_speed << " flows";
} }
if (graphite_enabled) {
graphite_data_t graphite_data;
std::string direction_as_string;
if (packet_direction == INCOMING) {
direction_as_string = "incoming";
graphite_data[graphite_prefix + ".total." + direction_as_string + ".flows"] =
incoming_total_flows_speed;
} else if (packet_direction == OUTGOING) {
direction_as_string = "outgoing";
graphite_data[graphite_prefix + ".total." + direction_as_string + ".flows"] =
outgoing_total_flows_speed;
}
graphite_data[graphite_prefix + ".total." + direction_as_string + ".pps"] = speed_in_pps;
graphite_data[graphite_prefix + ".total." + direction_as_string + ".bps"] = speed_in_bps * 8;
bool graphite_put_result = store_data_to_graphite(graphite_port, graphite_host, graphite_data);
if (!graphite_put_result) {
logger << log4cpp::Priority::ERROR << "Can't store data to Graphite";
}
}
} }
return stream.str(); return stream.str();
@ -3011,80 +2952,6 @@ std::string draw_table_ipv4(direction_t data_direction, bool do_redis_update, so
element_number++; element_number++;
} }
graphite_data_t graphite_data;
// TODO: add graphite operations time to the config file
if (graphite_enabled) {
for (std::vector<pair_of_map_elements>::iterator ii = vector_for_sort.begin();
ii != vector_for_sort.end(); ++ii) {
uint32_t client_ip = (*ii).first;
std::string client_ip_as_string = convert_ip_as_uint_to_string((*ii).first);
uint64_t pps = 0;
uint64_t bps = 0;
uint64_t flows = 0;
// Here we could have average or instantaneous speed
map_element_t* current_speed_element = &ii->second;
// Create polymorphic pps, byte and flow counters
if (data_direction == INCOMING) {
pps = current_speed_element->in_packets;
bps = current_speed_element->in_bytes;
flows = current_speed_element->in_flows;
} else if (data_direction == OUTGOING) {
pps = current_speed_element->out_packets;
bps = current_speed_element->out_bytes;
flows = current_speed_element->out_flows;
}
std::string direction_as_string;
if (data_direction == INCOMING) {
direction_as_string = "incoming";
} else if (data_direction == OUTGOING) {
direction_as_string = "outgoing";
}
std::string ip_as_string_with_dash_delimiters = client_ip_as_string;
// Replace dots by dashes
std::replace(ip_as_string_with_dash_delimiters.begin(),
ip_as_string_with_dash_delimiters.end(), '.', '_');
std::string graphite_current_prefix =
graphite_prefix + ".hosts." + ip_as_string_with_dash_delimiters + "." + direction_as_string;
if (print_average_traffic_counts) {
graphite_current_prefix = graphite_current_prefix + ".average";
}
// We do not store zero data to Graphite
if (pps != 0) {
graphite_data[graphite_current_prefix + ".pps"] = pps;
}
if (bps != 0) {
graphite_data[graphite_current_prefix + ".bps"] = bps * 8;
}
if (flows != 0) {
graphite_data[graphite_current_prefix + ".flows"] = flows;
}
}
}
// TODO: we should switch to piclke format instead text
// TODO: we should check packet size for Graphite
// logger << log4cpp::Priority::INFO << "We will write " << graphite_data.size() << " records to Graphite";
if (graphite_enabled) {
bool graphite_put_result = store_data_to_graphite(graphite_port, graphite_host, graphite_data);
if (!graphite_put_result) {
logger << log4cpp::Priority::ERROR << "Can't store data to Graphite";
}
}
return output_buffer.str(); return output_buffer.str();
} }

@ -19,6 +19,7 @@
#include "fastnetmon_networks.hpp" #include "fastnetmon_networks.hpp"
typedef std::vector<map_element_t> vector_of_counters_t;
typedef std::map<std::string, std::string> configuration_map_t; typedef std::map<std::string, std::string> configuration_map_t;
typedef std::map<std::string, uint64_t> graphite_data_t; typedef std::map<std::string, uint64_t> graphite_data_t;

236
src/metrics/graphite.cpp Normal file

@ -0,0 +1,236 @@
#include "graphite.hpp"
#include "../fastnetmon_types.h"
#include "../fast_library.h"
#include <vector>
#include "../all_logcpp_libraries.h"
extern log4cpp::Category& logger;
extern bool print_average_traffic_counts;
extern struct timeval graphite_thread_execution_time;
extern total_counter_element_t total_speed_average_counters[4];
extern map_of_vector_counters_t SubnetVectorMapSpeed;
extern map_of_vector_counters_t SubnetVectorMapSpeedAverage;
extern uint64_t incoming_total_flows_speed;
extern uint64_t outgoing_total_flows_speed;
extern map_for_subnet_counters_t PerSubnetAverageSpeedMap;
extern bool graphite_enabled;
extern std::string graphite_host;
extern unsigned short int graphite_port;
extern std::string graphite_prefix;
extern unsigned int graphite_push_period;
// Push host traffic to Graphite
bool push_hosts_traffic_counters_to_graphite() {
std::vector<direction_t> processed_directions = { INCOMING, OUTGOING };
graphite_data_t graphite_data;
map_of_vector_counters_t* current_speed_map = nullptr;
if (print_average_traffic_counts) {
current_speed_map = &SubnetVectorMapSpeedAverage;
} else {
current_speed_map = &SubnetVectorMapSpeed;
}
// Iterate over all networks
for (map_of_vector_counters_t::iterator itr = current_speed_map->begin(); itr != current_speed_map->end(); ++itr) {
// Iterate over all hosts in network
for (vector_of_counters_t::iterator vector_itr = itr->second.begin(); vector_itr != itr->second.end(); ++vector_itr) {
int current_index = vector_itr - itr->second.begin();
// convert to host order for math operations
uint32_t subnet_ip = ntohl(itr->first.subnet_address);
uint32_t client_ip_in_host_bytes_order = subnet_ip + current_index;
// covnert to our standard network byte order
uint32_t client_ip = htonl(client_ip_in_host_bytes_order);
std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip);
std::string ip_as_string_with_dash_delimiters = client_ip_as_string;
// Replace dots by dashes
std::replace(ip_as_string_with_dash_delimiters.begin(), ip_as_string_with_dash_delimiters.end(), '.', '_');
// Here we could have average or instantaneous speed
map_element_t* current_speed_element = &*vector_itr;
for (auto data_direction : processed_directions) {
std::string direction_as_string;
if (data_direction == INCOMING) {
direction_as_string = "incoming";
} else if (data_direction == OUTGOING) {
direction_as_string = "outgoing";
}
std::string graphite_current_prefix = graphite_prefix + ".hosts." +
ip_as_string_with_dash_delimiters + "." + direction_as_string;
if (print_average_traffic_counts) {
graphite_current_prefix = graphite_current_prefix + ".average";
}
if (data_direction == INCOMING) {
// Prepare incoming traffic data
// We do not store zero data to Graphite
if (current_speed_element->in_packets != 0) {
graphite_data[graphite_current_prefix + ".pps"] = current_speed_element->in_packets;
}
if (current_speed_element->in_bytes != 0) {
graphite_data[graphite_current_prefix + ".bps"] = current_speed_element->in_bytes * 8;
}
if (current_speed_element->in_flows != 0) {
graphite_data[graphite_current_prefix + ".flows"] = current_speed_element->in_flows;
}
} else if (data_direction == OUTGOING) {
// Prepare outgoing traffic data
// We do not store zero data to Graphite
if (current_speed_element->out_packets != 0) {
graphite_data[graphite_current_prefix + ".pps"] = current_speed_element->out_packets;
}
if (current_speed_element->out_bytes != 0) {
graphite_data[graphite_current_prefix + ".bps"] = current_speed_element->out_bytes * 8;
}
if (current_speed_element->out_flows != 0) {
graphite_data[graphite_current_prefix + ".flows"] = current_speed_element->out_flows;
}
}
}
}
bool graphite_put_result = store_data_to_graphite(graphite_port, graphite_host, graphite_data);
if (!graphite_put_result) {
logger << log4cpp::Priority::ERROR << "Can't store host load data to Graphite server "
<< graphite_host << " port: " << graphite_port;
return false;
}
}
return true;
}
// Push total counters to graphite
bool push_total_traffic_counters_to_graphite() {
std::vector<direction_t> directions = { INCOMING, OUTGOING, INTERNAL, OTHER };
for (auto packet_direction : directions) {
uint64_t speed_in_pps = total_speed_average_counters[packet_direction].packets;
uint64_t speed_in_bps = total_speed_average_counters[packet_direction].bytes;
graphite_data_t graphite_data;
std::string direction_as_string = get_direction_name(packet_direction);
// We have flow information only for incoming and outgoing directions
if (packet_direction == INCOMING or packet_direction == OUTGOING) {
uint64_t flow_counter_for_this_direction = 0;
if (packet_direction == INCOMING) {
flow_counter_for_this_direction = incoming_total_flows_speed;
} else {
flow_counter_for_this_direction = outgoing_total_flows_speed;
}
graphite_data[graphite_prefix + ".total." + direction_as_string + ".flows"] =
flow_counter_for_this_direction;
}
graphite_data[graphite_prefix + ".total." + direction_as_string + ".pps"] = speed_in_pps;
graphite_data[graphite_prefix + ".total." + direction_as_string + ".bps"] =
speed_in_bps * 8;
bool graphite_put_result = store_data_to_graphite(graphite_port, graphite_host, graphite_data);
if (!graphite_put_result) {
logger << log4cpp::Priority::ERROR << "Can't store total load data to Graphite server "
<< graphite_host << " port: " << graphite_port;
;
return false;
}
}
return true;
}
// Push per subnet traffic counters to graphite
bool push_network_traffic_counters_to_graphite() {
graphite_data_t graphite_data;
for (map_for_subnet_counters_t::iterator itr = PerSubnetAverageSpeedMap.begin(); itr != PerSubnetAverageSpeedMap.end(); ++itr) {
map_element_t* speed = &itr->second;
std::string subnet_as_string_as_dash_delimiters = convert_subnet_to_string(itr->first);
;
// Replace dots by dashes
std::replace(subnet_as_string_as_dash_delimiters.begin(), subnet_as_string_as_dash_delimiters.end(), '.', '_');
// Replace / by dashes too
std::replace(subnet_as_string_as_dash_delimiters.begin(), subnet_as_string_as_dash_delimiters.end(), '/', '_');
std::string current_prefix = graphite_prefix + ".networks." + subnet_as_string_as_dash_delimiters + ".";
graphite_data[current_prefix + "incoming.pps"] = speed->in_packets;
graphite_data[current_prefix + "outgoing.pps"] = speed->out_packets;
graphite_data[current_prefix + "incoming.bps"] = speed->in_bytes * 8;
graphite_data[current_prefix + "outgoing.bps"] = speed->out_bytes * 8;
}
bool graphite_put_result = store_data_to_graphite(graphite_port, graphite_host, graphite_data);
if (!graphite_put_result) {
logger << log4cpp::Priority::ERROR << "Can't store network load data to Graphite server "
<< graphite_host << " port: " << graphite_port;
return false;
}
return true;
}
// This thread pushes speed counters to graphite
void graphite_push_thread() {
// Sleep for a half second for shift against calculatiuon thread
boost::this_thread::sleep(boost::posix_time::milliseconds(500));
while (true) {
boost::this_thread::sleep(boost::posix_time::seconds(graphite_push_period));
struct timeval start_calc_time;
gettimeofday(&start_calc_time, NULL);
// First of all push total counters to Graphite
push_total_traffic_counters_to_graphite();
// Push per subnet counters to graphite
push_network_traffic_counters_to_graphite();
// Push per host counters to graphite
push_hosts_traffic_counters_to_graphite();
struct timeval end_calc_time;
gettimeofday(&end_calc_time, NULL);
timeval_subtract(&graphite_thread_execution_time, &end_calc_time, &start_calc_time);
logger << log4cpp::Priority::DEBUG << "Graphite data pushed in: " << graphite_thread_execution_time.tv_sec
<< " sec " << graphite_thread_execution_time.tv_usec << " microseconds\n";
}
}

7
src/metrics/graphite.hpp Normal file

@ -0,0 +1,7 @@
#pragma once
void graphite_push_thread();
bool push_total_traffic_counters_to_graphite();
bool push_network_traffic_counters_to_graphite();
bool push_hosts_traffic_counters_to_graphite();