diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 5b48dbc0..ce8d14b4 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -204,6 +204,9 @@ add_library(patricia STATIC libpatricia/patricia.c) add_library(graphite_metrics STATIC metrics/graphite.cpp) target_link_libraries(fastnetmon graphite_metrics) +# InfluxDB metrics +add_library(influxdb_metrics STATIC metrics/influxdb.cpp) +target_link_libraries(fastnetmon influxdb_metrics) add_library(fastnetmon_pcap_format STATIC fastnetmon_pcap_format.cpp) diff --git a/src/fast_library.cpp b/src/fast_library.cpp index dfcdca04..cc8a173d 100644 --- a/src/fast_library.cpp +++ b/src/fast_library.cpp @@ -1883,3 +1883,21 @@ uint64_t get_current_unix_time_in_nanoseconds() { return unix_timestamp_nanoseconds; } +// Joins data to format a=b,d=f +std::string join_by_comma_and_equal(std::map& data) { + std::stringstream buffer; + + for (auto itr = data.begin(); itr != data.end(); ++itr) { + buffer << itr->first << "=" << itr->second; + + // it's last element + if (std::distance(itr, data.end()) == 1) { + // Do not print comma + } else { + buffer << ","; + } + } + + return buffer.str(); +} + diff --git a/src/fast_library.h b/src/fast_library.h index c9aab1d5..b947a8d2 100644 --- a/src/fast_library.h +++ b/src/fast_library.h @@ -123,3 +123,14 @@ bool set_boost_process_name(boost::thread* thread, std::string process_name); bool convert_string_to_positive_integer_safe(std::string line, int& value); bool read_ipv6_host_from_string(std::string ipv6_host_as_string, in6_addr& result); bool validate_ipv6_or_ipv4_host(const std::string host); +uint64_t get_current_unix_time_in_nanoseconds(); + +bool write_data_to_influxdb(std::string database, + std::string host, + std::string port, + bool enable_auth, + std::string influx_user, + std::string influx_password, + std::string query); + +std::string join_by_comma_and_equal(std::map& data); diff --git a/src/fastnetmon.conf b/src/fastnetmon.conf index 93a1c4ee..ebfa0080 100644 --- a/src/fastnetmon.conf +++ b/src/fastnetmon.conf @@ -261,6 +261,20 @@ graphite_port = 2003 # Default namespace for Graphite data graphite_prefix = fastnetmon +# Before using InfluxDB you need to create database using influx tool: +# create database fastnetmon + +# InfluxDB +influxdb = off +influxdb_host = 127.0.0.1 +influxdb_port = 8086 +influxdb_database = fastnetmon + +# InfluxDB auth +influxdb_auth = off +influxdb_user = fastnetmon +influxdb_password = secure + # Add local IP addresses and aliases to monitoring list # Works only for Linux monitor_local_ip_addresses = on diff --git a/src/fastnetmon.cpp b/src/fastnetmon.cpp index f6fc861e..7fe7ffd4 100644 --- a/src/fastnetmon.cpp +++ b/src/fastnetmon.cpp @@ -115,6 +115,7 @@ #include "ban_list.hpp" #include "metrics/graphite.hpp" +#include "metrics/influxdb.hpp" #ifdef FASTNETMON_API using fastmitigation::BanListReply; @@ -449,6 +450,23 @@ struct timeval graphite_thread_execution_time; // Default graphite namespace std::string graphite_prefix = "fastnetmon"; + +// Total number of InfluxDB writes +uint64_t influxdb_writes_total = 0; + +// Total number of failed InfluxDB writes +uint64_t influxdb_writes_failed = 0; + +// InfluxDB +bool influxdb_enabled = false; +std::string influxdb_database = "fastnetmon"; +std::string influxdb_host = "127.0.0.1"; +unsigned short int influxdb_port = 8086; +bool influxdb_auth = false; +std::string influxdb_user = ""; +std::string influxdb_password = ""; +unsigned int influxdb_push_period = 1; + bool process_incoming_traffic = true; bool process_outgoing_traffic = true; @@ -825,8 +843,33 @@ bool load_configuration_file() { graphite_port = convert_string_to_integer(configuration_map["graphite_port"]); } - if (configuration_map.count("graphite_number_of_ips") != 0) { - logger << log4cpp::Priority::ERROR << "Sorry, you have used deprecated function graphite_number_of_ips"; + // InfluxDB + if (configuration_map.count("influxdb") != 0) { + influxdb_enabled = configuration_map["influxdb"] == "on" ? true : false; + } + + if (configuration_map.count("influxdb_port") != 0) { + influxdb_port = convert_string_to_integer(configuration_map["influxdb_port"]); + } + + if (configuration_map.count("influxdb_host") != 0) { + influxdb_host = configuration_map["influxdb_host"]; + } + + if (configuration_map.count("influxdb_database") != 0) { + influxdb_database = configuration_map["influxdb_database"]; + } + + if (configuration_map.count("influxdb_auth") != 0) { + influxdb_auth = configuration_map["influxdb_auth"] == "on" ? true : false; + } + + if (configuration_map.count("influxdb_user") != 0) { + influxdb_user = configuration_map["influxdb_user"]; + } + + if (configuration_map.count("influxdb_password") != 0) { + influxdb_password = configuration_map["influxdb_password"]; } if (configuration_map.count("process_incoming_traffic") != 0) { @@ -1651,6 +1694,11 @@ int main(int argc, char** argv) { service_thread_group.add_thread(new boost::thread(graphite_push_thread)); } + // InfluxDB export thread + if (influxdb_enabled) { + service_thread_group.add_thread(new boost::thread(influxdb_push_thread)); + } + // start thread for recalculating speed in realtime service_thread_group.add_thread(new boost::thread(recalculate_speed_thread_handler)); diff --git a/src/fastnetmon_logic.cpp b/src/fastnetmon_logic.cpp index a68c11d0..8594f2d9 100644 --- a/src/fastnetmon_logic.cpp +++ b/src/fastnetmon_logic.cpp @@ -50,6 +50,9 @@ #include "ban_list.hpp" + +extern uint64_t influxdb_writes_total; +extern uint64_t influxdb_writes_failed; extern packet_buckets_storage_t packet_buckets_ipv6_storage; extern bool print_average_traffic_counts; extern std::string cli_stats_file_path; @@ -3631,3 +3634,23 @@ bool should_remove_orphaned_bucket(const std::pair& system_counters) { + system_counters.push_back(system_counter_t("total_simple_packets_processed", total_simple_packets_processed)); + system_counters.push_back(system_counter_t("total_ipv4_packets", total_ipv4_packets)); + system_counters.push_back(system_counter_t("total_ipv6_packets", total_ipv6_packets)); + + system_counters.push_back(system_counter_t("non_ip_packets", non_ip_packets)); + + system_counters.push_back(system_counter_t("total_unparsed_packets_speed", total_unparsed_packets_speed)); + system_counters.push_back(system_counter_t("total_unparsed_packets", total_unparsed_packets)); + + system_counters.push_back(system_counter_t("speed_recalculation_time_seconds", speed_calculation_time.tv_sec)); + system_counters.push_back(system_counter_t("speed_recalculation_time_microseconds", speed_calculation_time.tv_usec)); + + system_counters.push_back(system_counter_t("total_number_of_hosts", total_number_of_hosts_in_our_networks)); + + system_counters.push_back(system_counter_t("influxdb_writes_total", influxdb_writes_total)); + system_counters.push_back(system_counter_t("influxdb_writes_failed", influxdb_writes_failed)); + + return true; +} diff --git a/src/fastnetmon_types.h b/src/fastnetmon_types.h index 14cca3aa..a457dbcc 100644 --- a/src/fastnetmon_types.h +++ b/src/fastnetmon_types.h @@ -59,6 +59,18 @@ enum class attack_detection_threshold_type_t { }; +// Here we store different counters +class system_counter_t { + public: + system_counter_t(std::string counter_name, uint64_t counter_value) { + this->counter_name = counter_name; + this->counter_value = counter_value; + } + std::string counter_name; + uint64_t counter_value = 0; +}; + + /* Class for custom comparison fields by different fields */ template class TrafficComparatorClass { private: diff --git a/src/metrics/influxdb.cpp b/src/metrics/influxdb.cpp new file mode 100644 index 00000000..72008a7c --- /dev/null +++ b/src/metrics/influxdb.cpp @@ -0,0 +1,478 @@ +#include "influxdb.hpp" + +#include "../fastnetmon_types.h" +#include "../fast_library.h" +#include "../abstract_subnet_counters.hpp" + +#include "../all_logcpp_libraries.h" + +#include + +extern bool print_average_traffic_counts; +extern struct timeval graphite_thread_execution_time; +extern total_counter_element_t total_speed_average_counters[4]; +extern map_of_vector_counters_t SubnetVectorMapSpeed; +extern map_of_vector_counters_t SubnetVectorMapSpeedAverage; +extern uint64_t incoming_total_flows_speed; +extern uint64_t outgoing_total_flows_speed; +extern map_for_subnet_counters_t PerSubnetAverageSpeedMap; +extern uint64_t influxdb_writes_total; +extern uint64_t influxdb_writes_failed; +extern total_counter_element_t total_speed_average_counters_ipv6[4]; +extern abstract_subnet_counters_t ipv6_host_counters; +extern abstract_subnet_counters_t ipv4_host_counters; +extern abstract_subnet_counters_t ipv4_remote_host_counters; +extern std::vector hostgroup_list_total_calculation; +extern std::mutex hostgroup_list_total_calculation_mutex; +extern abstract_subnet_counters_t per_hostgroup_total_counters; +extern log4cpp::Category& logger; + +extern std::string influxdb_database; +extern std::string influxdb_host; +extern unsigned short int influxdb_port; +extern bool influxdb_auth; +extern std::string influxdb_user; +extern std::string influxdb_password; +extern unsigned int influxdb_push_period; + +// I do this delcaration here to avoid circuclar dependencies between fastnetmon_logic and this file +bool get_statistics(std::vector& system_counters); + +// Push system counters to InfluxDB +bool push_system_counters_to_influxdb(std::string influx_database, + std::string influx_host, + std::string influx_port, + bool enable_auth, + std::string influx_user, + std::string influx_password) { + std::vector system_counters; + + bool result = get_statistics(system_counters); + + if (!result) { + logger << log4cpp::Priority::ERROR << "Can't collect system counters"; + return false; + } + + std::map plain_total_counters_map; + + for (auto counter : system_counters) { + plain_total_counters_map[counter.counter_name] = counter.counter_value; + } + + influxdb_writes_total++; + + std::map tags = { { "metric", "metric_value" } }; + + bool influx_result = write_line_of_data_to_influxdb(influx_database, influx_host, influx_port, enable_auth, influx_user, + influx_password, "system_counters", tags, plain_total_counters_map); + + if (!influx_result) { + influxdb_writes_failed++; + logger << log4cpp::Priority::DEBUG << "InfluxDB write operation failed for system counters"; + return false; + } + + return true; +} + + + +// Push total traffic counters to InfluxDB +bool push_total_traffic_counters_to_influxdb(std::string influx_database, + std::string influx_host, + std::string influx_port, + bool enable_auth, + std::string influx_user, + std::string influx_password, + std::string measurement_name, + total_counter_element_t total_speed_average_counters_param[4], + bool ipv6) { + std::vector directions = { INCOMING, OUTGOING, INTERNAL, OTHER }; + + for (auto packet_direction : directions) { + std::map plain_total_counters_map; + + uint64_t speed_in_pps = total_speed_average_counters_param[packet_direction].packets; + uint64_t speed_in_bits_per_second = total_speed_average_counters_param[packet_direction].bytes * 8; + + // We do not have this counter for IPv6 + if (!ipv6) { + // We have flow information only for incoming and outgoing directions + if (packet_direction == INCOMING or packet_direction == OUTGOING) { + uint64_t flow_counter_for_this_direction = 0; + + if (packet_direction == INCOMING) { + flow_counter_for_this_direction = incoming_total_flows_speed; + } else { + flow_counter_for_this_direction = outgoing_total_flows_speed; + } + + plain_total_counters_map["flows"] = flow_counter_for_this_direction; + } + } + + plain_total_counters_map["packets"] = speed_in_pps; + plain_total_counters_map["bits"] = speed_in_bits_per_second; + + std::string direction_as_string = get_direction_name(packet_direction); + + influxdb_writes_total++; + + std::map tags = { { "direction", direction_as_string } }; + + bool result = write_line_of_data_to_influxdb(influx_database, influx_host, influx_port, enable_auth, influx_user, + influx_password, measurement_name, tags, plain_total_counters_map); + + if (!result) { + influxdb_writes_failed++; + logger << log4cpp::Priority::DEBUG << "InfluxDB write operation failed for total_traffic"; + return false; + } + } + + + return true; +} + +// This thread pushes data to InfluxDB +void influxdb_push_thread() { + // Sleep for a half second for shift against calculation thread + boost::this_thread::sleep(boost::posix_time::milliseconds(500)); + + bool do_dns_resolution = false; + + // If address does not look like IPv4 or IPv6 then we will use DNS resolution for it + if (!validate_ipv6_or_ipv4_host(influxdb_host)) { + logger << log4cpp::Priority::INFO << "You set InfluxDB server address as hostname and we will use DNS to resolve it"; + do_dns_resolution = true; + } + + while (true) { + boost::this_thread::sleep(boost::posix_time::seconds(influxdb_push_period)); + + std::string current_influxdb_ip_address = ""; + + if (do_dns_resolution) { + std::string ip_address = dns_lookup(influxdb_host); + + if (ip_address.empty()) { + logger << log4cpp::Priority::ERROR << "Cannot resolve " << influxdb_host << " to address"; + continue; + } + + logger << log4cpp::Priority::DEBUG << "Resolved " << influxdb_host << " to " << ip_address; + + current_influxdb_ip_address = ip_address; + } else { + // We do not need DNS resolution here, use address as is + current_influxdb_ip_address = influxdb_host; + } + + // First of all push total counters to InfluxDB + push_total_traffic_counters_to_influxdb(influxdb_database, current_influxdb_ip_address, std::to_string(influxdb_port), influxdb_auth, influxdb_user, + influxdb_password, "total_traffic", total_speed_average_counters, false); + + // Push per subnet counters to InfluxDB + push_network_traffic_counters_to_influxdb(influxdb_database, current_influxdb_ip_address, std::to_string(influxdb_port), + influxdb_auth, influxdb_user, influxdb_password); + + // Push per host counters to InfluxDB + push_hosts_traffic_counters_to_influxdb(influxdb_database, current_influxdb_ip_address, std::to_string(influxdb_port), influxdb_auth, + influxdb_user, influxdb_password); + + push_system_counters_to_influxdb(influxdb_database, current_influxdb_ip_address, std::to_string(influxdb_port), influxdb_auth, + influxdb_user, influxdb_password); + + // Push per host IPv6 counters to InfluxDB + push_hosts_ipv6_traffic_counters_to_influxdb(influxdb_database, current_influxdb_ip_address, std::to_string(influxdb_port), + influxdb_auth, influxdb_user, influxdb_password); + + // Push total IPv6 counters + push_total_traffic_counters_to_influxdb(influxdb_database, current_influxdb_ip_address, std::to_string(influxdb_port), + influxdb_auth, influxdb_user, influxdb_password, "total_traffic_ipv6", + total_speed_average_counters_ipv6, true); + } +} + + +// Push host traffic to InfluxDB +bool push_hosts_ipv6_traffic_counters_to_influxdb(std::string influx_database, + std::string influx_host, + std::string influx_port, + bool enable_auth, + std::string influx_user, + std::string influx_password) { + std::vector> speed_elements; + + // TODO: preallocate memory here for this array to avoid memory allocations under the lock + ipv6_host_counters.get_all_non_zero_average_speed_elements_as_pairs(speed_elements); + + // Structure for InfluxDB + std::vector>> hosts_vector; + + for (const auto& speed_element : speed_elements) { + std::map plain_total_counters_map; + + std::string client_ip_as_string = print_ipv6_address(speed_element.first.subnet_address); + + fill_main_counters_for_influxdb(&speed_element.second, plain_total_counters_map, true); + + hosts_vector.push_back(std::make_pair(client_ip_as_string, plain_total_counters_map)); + } + + // TODO: For big networks it will cause HUGE batches, it will make sense to split them in 5-10k batches + if (hosts_vector.size() > 0) { + influxdb_writes_total++; + + bool result = write_batch_of_data_to_influxdb(influx_database, influx_host, influx_port, enable_auth, influx_user, + influx_password, "hosts_ipv6_traffic", "host", hosts_vector); + + if (!result) { + influxdb_writes_failed++; + logger << log4cpp::Priority::DEBUG << "InfluxDB batch operation failed for hosts_traffic"; + return false; + } + } + + return true; +} + + +// Push host traffic to InfluxDB +bool push_hosts_traffic_counters_to_influxdb(std::string influx_database, + std::string influx_host, + std::string influx_port, + bool enable_auth, + std::string influx_user, + std::string influx_password) { + /* https://docs.influxdata.com/influxdb/v1.7/concepts/glossary/: + A collection of points in line protocol format, separated by newlines (0x0A). A batch of points may be submitted to + the database using a single HTTP request to the write endpoint. This makes writes via the HTTP API much more + performant by drastically reducing the HTTP overhead. InfluxData recommends batch sizes of 5,000-10,000 points, + although different use cases may be better served by significantly smaller or larger batches. + */ + + map_of_vector_counters_t* current_speed_map = nullptr; + + if (print_average_traffic_counts) { + current_speed_map = &SubnetVectorMapSpeedAverage; + } else { + current_speed_map = &SubnetVectorMapSpeed; + } + + // Iterate over all networks + for (map_of_vector_counters_t::iterator itr = current_speed_map->begin(); itr != current_speed_map->end(); ++itr) { + std::vector>> hosts_vector; + + // Iterate over all hosts in network + for (vector_of_counters_t::iterator vector_itr = itr->second.begin(); vector_itr != itr->second.end(); ++vector_itr) { + std::map plain_total_counters_map; + + int current_index = vector_itr - itr->second.begin(); + + // Convert to host order for math operations + uint32_t subnet_ip = ntohl(itr->first.subnet_address); + uint32_t client_ip_in_host_bytes_order = subnet_ip + current_index; + + // Convert to our standard network byte order + uint32_t client_ip = htonl(client_ip_in_host_bytes_order); + + std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip); + + // Here we could have average or instantaneous speed + map_element_t* current_speed_element = &*vector_itr; + + // Skip elements with zero speed + if (current_speed_element->is_zero()) { + continue; + } + + fill_main_counters_for_influxdb(current_speed_element, plain_total_counters_map, true); + + // Key: client_ip_as_string + hosts_vector.push_back(std::make_pair(client_ip_as_string, plain_total_counters_map)); + } + + if (hosts_vector.size() > 0) { + bool result = write_batch_of_data_to_influxdb(influx_database, influx_host, influx_port, enable_auth, influx_user, + influx_password, "hosts_traffic", "host", hosts_vector); + + if (!result) { + influxdb_writes_failed++; + logger << log4cpp::Priority::DEBUG << "InfluxDB batch operation failed for hosts_traffic"; + return false; + } + } + } + + return true; +} + +// Write batch of data for particular InfluxDB database +bool write_batch_of_data_to_influxdb(std::string influx_database, + std::string influx_host, + std::string influx_port, + bool enable_auth, + std::string influx_user, + std::string influx_password, + std::string measurement, + std::string tag_name, + std::vector>>& hosts_vector) { + // Nothing to write + if (hosts_vector.size() == 0) { + return true; + } + + std::stringstream buffer; + uint64_t unix_timestamp_nanoseconds = get_current_unix_time_in_nanoseconds(); + + // Prepare batch for insert + for (auto& host_traffic : hosts_vector) { + std::map tags = { { tag_name, host_traffic.first } }; + + std::string line_protocol_format = + craft_line_for_influxdb_line_protocol(unix_timestamp_nanoseconds, measurement, tags, host_traffic.second); + + buffer << line_protocol_format << "\n"; + } + + // logger << log4cpp::Priority::INFO << "Raw data to InfluxDB: " << buffer.str(); + return write_data_to_influxdb(influx_database, influx_host, influx_port, enable_auth, influx_user, influx_password, + buffer.str()); +} + +// Push per subnet traffic counters to influxDB +bool push_network_traffic_counters_to_influxdb(std::string influx_database, + std::string influx_host, + std::string influx_port, + bool enable_auth, + std::string influx_user, + std::string influx_password) { + for (map_for_subnet_counters_t::iterator itr = PerSubnetAverageSpeedMap.begin(); itr != PerSubnetAverageSpeedMap.end(); ++itr) { + std::map plain_total_counters_map; + + map_element_t* speed = &itr->second; + std::string subnet_as_string = convert_subnet_to_string(itr->first); + + fill_main_counters_for_influxdb(speed, plain_total_counters_map, false); + + influxdb_writes_total++; + + std::map tags = { { "network", subnet_as_string } }; + + bool result = write_line_of_data_to_influxdb(influx_database, influx_host, influx_port, enable_auth, influx_user, + influx_password, "networks_traffic", tags, plain_total_counters_map); + + if (!result) { + influxdb_writes_failed++; + logger << log4cpp::Priority::DEBUG << "InfluxDB write operation failed for networks_traffic"; + return false; + } + } + + + return true; +} + + +// Set block of data into InfluxDB +bool write_line_of_data_to_influxdb(std::string influx_database, + std::string influx_host, + std::string influx_port, + bool enable_auth, + std::string influx_user, + std::string influx_password, + std::string measurement, + std::map& tags, + std::map& plain_total_counters_map) { + uint64_t unix_timestamp_nanoseconds = get_current_unix_time_in_nanoseconds(); + + auto influxdb_line = + craft_line_for_influxdb_line_protocol(unix_timestamp_nanoseconds, measurement, tags, plain_total_counters_map); + + // logger << log4cpp::Priority::INFO << "Raw data to InfluxDB: " << buffer.str(); + + return write_data_to_influxdb(influx_database, influx_host, influx_port, enable_auth, influx_user, influx_password, influxdb_line); +} + +// Fills special structure which we use to export metrics into InfluxDB +void fill_per_protocol_countres_for_influxdb(const map_element_t* current_speed_element, + std::map& plain_total_counters_map) { + plain_total_counters_map["fragmented_packets_incoming"] = current_speed_element->fragmented_in_packets; + plain_total_counters_map["tcp_packets_incoming"] = current_speed_element->tcp_in_packets; + plain_total_counters_map["tcp_syn_packets_incoming"] = current_speed_element->tcp_syn_in_packets; + plain_total_counters_map["udp_packets_incoming"] = current_speed_element->udp_in_packets; + plain_total_counters_map["icmp_packets_incoming"] = current_speed_element->icmp_in_packets; + + plain_total_counters_map["fragmented_bits_incoming"] = current_speed_element->fragmented_in_bytes * 8; + plain_total_counters_map["tcp_bits_incoming"] = current_speed_element->tcp_in_bytes * 8; + plain_total_counters_map["tcp_syn_bits_incoming"] = current_speed_element->tcp_syn_in_bytes * 8; + plain_total_counters_map["udp_bits_incoming"] = current_speed_element->udp_in_bytes * 8; + plain_total_counters_map["icmp_bits_incoming"] = current_speed_element->icmp_in_bytes * 8; + + + // Outgoing + plain_total_counters_map["fragmented_packets_outgoing"] = current_speed_element->fragmented_out_packets; + plain_total_counters_map["tcp_packets_outgoing"] = current_speed_element->tcp_out_packets; + plain_total_counters_map["tcp_syn_packets_outgoing"] = current_speed_element->tcp_syn_out_packets; + plain_total_counters_map["udp_packets_outgoing"] = current_speed_element->udp_out_packets; + plain_total_counters_map["icmp_packets_outgoing"] = current_speed_element->icmp_out_packets; + + plain_total_counters_map["fragmented_bits_outgoing"] = current_speed_element->fragmented_out_bytes * 8; + plain_total_counters_map["tcp_bits_outgoing"] = current_speed_element->tcp_out_bytes * 8; + plain_total_counters_map["tcp_syn_bits_outgoing"] = current_speed_element->tcp_syn_out_bytes * 8; + plain_total_counters_map["udp_bits_outgoing"] = current_speed_element->udp_out_bytes * 8; + plain_total_counters_map["icmp_bits_outgoing"] = current_speed_element->icmp_out_bytes * 8; +} + +// Fills special structure which we use to export metrics into InfluxDB +void fill_main_counters_for_influxdb(const map_element_t* current_speed_element, + std::map& plain_total_counters_map, + bool populate_flow) { + // Prepare incoming traffic data + plain_total_counters_map["packets_incoming"] = current_speed_element->in_packets; + plain_total_counters_map["bits_incoming"] = current_speed_element->in_bytes * 8; + + // Outdoing traffic + plain_total_counters_map["packets_outgoing"] = current_speed_element->out_packets; + plain_total_counters_map["bits_outgoing"] = current_speed_element->out_bytes * 8; + + if (populate_flow) { + plain_total_counters_map["flows_incoming"] = current_speed_element->in_flows; + plain_total_counters_map["flows_outgoing"] = current_speed_element->out_flows; + } +} + + +// Prepare string to insert data into InfluxDB +std::string craft_line_for_influxdb_line_protocol(uint64_t unix_timestamp_nanoseconds, + std::string measurement, + std::map& tags, + std::map& plain_total_counters_map) { + std::stringstream buffer; + buffer << measurement << ","; + + // tag set section + buffer << join_by_comma_and_equal(tags); + + buffer << " "; + + // field set section + for (auto itr = plain_total_counters_map.begin(); itr != plain_total_counters_map.end(); ++itr) { + buffer << itr->first << "=" << std::to_string(itr->second); + + // it's last element + if (std::distance(itr, plain_total_counters_map.end()) == 1) { + // Do not print comma + } else { + buffer << ","; + } + } + + buffer << " " << std::to_string(unix_timestamp_nanoseconds); + + return buffer.str(); +} + + diff --git a/src/metrics/influxdb.hpp b/src/metrics/influxdb.hpp new file mode 100644 index 00000000..39fa69f6 --- /dev/null +++ b/src/metrics/influxdb.hpp @@ -0,0 +1,92 @@ +#pragma once + +#include +#include + +#include "../fastnetmon_types.h" + +bool push_system_counters_to_influxdb(std::string influx_database, + std::string influx_host, + std::string influx_port, + bool enable_auth, + std::string influx_user, + std::string influx_password); + + bool push_total_traffic_counters_to_influxdb(std::string influx_database, + std::string influx_host, + std::string influx_port, + bool enable_auth, + std::string influx_user, + std::string influx_password, + std::string measurement_name, + total_counter_element_t total_speed_average_counters_param[4], + bool ipv6); + + void send_grafana_alert(std::string title, std::string text, std::vector& tags) ; + + void influxdb_push_thread(); + + bool push_hosts_ipv6_traffic_counters_to_influxdb(std::string influx_database, + std::string influx_host, + std::string influx_port, + bool enable_auth, + std::string influx_user, + std::string influx_password); + + bool push_hosts_traffic_counters_to_influxdb(std::string influx_database, + std::string influx_host, + std::string influx_port, + bool enable_auth, + std::string influx_user, + std::string influx_password) ; + + bool push_hostgroup_traffic_counters_to_influxdb(std::string influx_database, + std::string influx_host, + std::string influx_port, + bool enable_auth, + std::string influx_user, + std::string influx_password); + + bool write_batch_of_data_to_influxdb(std::string influx_database, + std::string influx_host, + std::string influx_port, + bool enable_auth, + std::string influx_user, + std::string influx_password, + std::string measurement, + std::string tag_name, + std::vector>>& hosts_vector); + + + bool push_network_traffic_counters_to_influxdb(std::string influx_database, + std::string influx_host, + std::string influx_port, + bool enable_auth, + std::string influx_user, + std::string influx_password); + + +// Set block of data into InfluxDB +bool write_line_of_data_to_influxdb(std::string influx_database, + std::string influx_host, + std::string influx_port, + bool enable_auth, + std::string influx_user, + std::string influx_password, + std::string measurement, + std::map& tags, + std::map& plain_total_counters_map); + +void fill_per_protocol_countres_for_influxdb(const map_element_t* current_speed_element, std::map& plain_total_counters_map); + +void fill_main_counters_for_influxdb(const map_element_t* current_speed_element, + std::map& plain_total_counters_map, + bool populate_flow); + +// Prepare string to insert data into InfluxDB +std::string craft_line_for_influxdb_line_protocol(uint64_t unix_timestamp_nanoseconds, + std::string measurement, + std::map& tags, + std::map& plain_total_counters_map); + +