diff --git a/src/fast_library.cpp b/src/fast_library.cpp index 2a3f4e1..fe50c73 100644 --- a/src/fast_library.cpp +++ b/src/fast_library.cpp @@ -1790,13 +1790,14 @@ bool execute_web_request(std::string address, } // Write data to influxdb -bool write_data_to_influxdb(std::string database, - std::string host, - std::string port, +bool write_data_to_influxdb(const std::string& database, + const std::string& host, + const std::string& port, bool enable_auth, - std::string influx_user, - std::string influx_password, - std::string query) { + const std::string& influx_user, + const std::string& influx_password, + const std::string& query, + std::string& error_text) { uint32_t response_code = 0; std::string address = host + ":" + port; @@ -1818,7 +1819,6 @@ bool write_data_to_influxdb(std::string database, std::string response_body; std::map headers; - std::string error_text; bool result = execute_web_request(influxdb_query_string, "post", query, response_code, response_body, headers, error_text); if (!result) { @@ -1826,6 +1826,7 @@ bool write_data_to_influxdb(std::string database, } if (response_code != 204) { + error_text = "Unexpected response code: " + std::to_string(response_code); return false; } diff --git a/src/fast_library.hpp b/src/fast_library.hpp index 2dd0fbc..edd0dc5 100644 --- a/src/fast_library.hpp +++ b/src/fast_library.hpp @@ -108,13 +108,14 @@ bool read_ipv6_host_from_string(std::string ipv6_host_as_string, in6_addr& resul bool validate_ipv6_or_ipv4_host(const std::string host); uint64_t get_current_unix_time_in_nanoseconds(); -bool write_data_to_influxdb(std::string database, - std::string host, - std::string port, +bool write_data_to_influxdb(const std::string& database, + const std::string& host, + const std::string& port, bool enable_auth, - std::string influx_user, - std::string influx_password, - std::string query); + const std::string& influx_user, + const std::string& influx_password, + const std::string& query, + std::string& error_text); std::string join_by_comma_and_equal(const std::map& data); bool parse_meminfo_into_map(std::map& parsed_meminfo); diff --git a/src/metrics/influxdb.cpp b/src/metrics/influxdb.cpp index 55c9a5d..f14273b 100644 --- a/src/metrics/influxdb.cpp +++ b/src/metrics/influxdb.cpp @@ -13,32 +13,73 @@ #include extern struct timeval graphite_thread_execution_time; -extern uint64_t incoming_total_flows_speed; -extern uint64_t outgoing_total_flows_speed; -extern abstract_subnet_counters_t ipv4_network_counters; extern uint64_t influxdb_writes_total; extern uint64_t influxdb_writes_failed; -extern abstract_subnet_counters_t ipv6_host_counters; -extern abstract_subnet_counters_t ipv6_subnet_counters; -extern std::vector hostgroup_list_total_calculation; -extern std::mutex hostgroup_list_total_calculation_mutex; -extern abstract_subnet_counters_t per_hostgroup_total_counters; extern log4cpp::Category& logger; -extern total_speed_counters_t total_counters_ipv4; -extern total_speed_counters_t total_counters_ipv6; extern fastnetmon_configuration_t fastnetmon_global_configuration; // I do this declaration here to avoid circular dependencies between fastnetmon_logic and this file bool get_statistics(std::vector& system_counters); -// Push system counters to InfluxDB -bool push_system_counters_to_influxdb(std::string influx_database, - std::string influx_host, - std::string influx_port, +bool write_batch_of_data_to_influxdb(const std::string& influx_database, + const std::string& influx_host, + const std::string& influx_port, + bool enable_auth, + const std::string& influx_user, + const std::string& influx_password, + const std::string& measurement, + const std::string& tag_name, + const std::vector>>& hosts_vector, + std::string& error_text); + +// Set block of data into InfluxDB +bool write_line_of_data_to_influxdb(const std::string& influx_database, + const std::string& influx_host, + const std::string& influx_port, + bool enable_auth, + const std::string& influx_user, + const std::string& influx_password, + const std::string& measurement, + const std::map& tags, + const std::map& plain_total_counters_map, + std::string& error_text); + +// Prepare string to insert data into InfluxDB +std::string craft_line_for_influxdb_line_protocol(uint64_t unix_timestamp_nanoseconds, + const std::string& measurement, + const std::map& tags, + const std::map& plain_total_counters_map); + +void fill_fixed_counters_for_influxdb(const subnet_counter_t& counter, + std::map& plain_total_counters_map, + bool populate_flow); + +bool push_system_counters_to_influxdb(const std::string& influx_database, + const std::string& influx_host, + const std::string& influx_port, bool enable_auth, - std::string influx_user, - std::string influx_password) { + const std::string& influx_user, + const std::string& influx_password); + +bool push_total_traffic_counters_to_influxdb(const std::string& influx_database, + const std::string& influx_host, + const std::string& influx_port, + bool enable_auth, + const std::string& influx_user, + const std::string& influx_password, + const std::string& measurement_name, + total_counter_element_t total_speed_average_counters_param[4], + bool ipv6); + + +// Push system counters to InfluxDB +bool push_system_counters_to_influxdb(const std::string& influx_database, + const std::string& influx_host, + const std::string& influx_port, + bool enable_auth, + const std::string& influx_user, + const std::string& influx_password) { std::vector system_counters; bool result = get_statistics(system_counters); @@ -58,12 +99,15 @@ bool push_system_counters_to_influxdb(std::string influx_database, std::map tags = { { "metric", "metric_value" } }; - bool influx_result = write_line_of_data_to_influxdb(influx_database, influx_host, influx_port, enable_auth, influx_user, - influx_password, "system_counters", tags, plain_total_counters_map); + std::string error_text; + + bool influx_result = + write_line_of_data_to_influxdb(influx_database, influx_host, influx_port, enable_auth, influx_user, + influx_password, "system_counters", tags, plain_total_counters_map, error_text); if (!influx_result) { influxdb_writes_failed++; - logger << log4cpp::Priority::DEBUG << "InfluxDB write operation failed for system counters"; + logger << log4cpp::Priority::DEBUG << "InfluxDB write operation failed for system counters with error " << error_text; return false; } @@ -72,17 +116,20 @@ bool push_system_counters_to_influxdb(std::string influx_database, // Push total traffic counters to InfluxDB -bool push_total_traffic_counters_to_influxdb(std::string influx_database, - std::string influx_host, - std::string influx_port, +bool push_total_traffic_counters_to_influxdb(const std::string& influx_database, + const std::string& influx_host, + const std::string& influx_port, bool enable_auth, - std::string influx_user, - std::string influx_password, - std::string measurement_name, + const std::string& influx_user, + const std::string& influx_password, + const std::string& measurement_name, total_counter_element_t total_speed_average_counters_param[4], bool ipv6) { std::vector directions = { INCOMING, OUTGOING, INTERNAL, OTHER }; + extern uint64_t incoming_total_flows_speed; + extern uint64_t outgoing_total_flows_speed; + for (auto packet_direction : directions) { std::map plain_total_counters_map; @@ -284,6 +331,11 @@ push_network_traffic_counters_to_influxdb(abstract_subnet_counters_t& netw // This thread pushes data to InfluxDB void influxdb_push_thread() { extern abstract_subnet_counters_t ipv4_host_counters; + extern abstract_subnet_counters_t ipv4_network_counters; + extern abstract_subnet_counters_t ipv6_host_counters; + extern abstract_subnet_counters_t ipv6_subnet_counters; + extern total_speed_counters_t total_counters_ipv4; + extern total_speed_counters_t total_counters_ipv6; // Sleep for a half second for shift against calculation thread boost::this_thread::sleep(boost::posix_time::milliseconds(500)); @@ -394,7 +446,8 @@ bool write_line_of_data_to_influxdb(std::string influx_database, std::string influx_password, std::string measurement, std::map& tags, - std::map& plain_total_counters_map) { + std::map& plain_total_counters_map, + std::string& error_text) { uint64_t unix_timestamp_nanoseconds = get_current_unix_time_in_nanoseconds(); auto influxdb_line = @@ -402,7 +455,7 @@ bool write_line_of_data_to_influxdb(std::string influx_database, // logger << log4cpp::Priority::INFO << "Raw data to InfluxDB: " << buffer.str(); - return write_data_to_influxdb(influx_database, influx_host, influx_port, enable_auth, influx_user, influx_password, influxdb_line); + return write_data_to_influxdb(influx_database, influx_host, influx_port, enable_auth, influx_user, influx_password, influxdb_line, error_text); } // Fills special structure which we use to export metrics into InfluxDB