From 4a052e9a8c349219250611aaea00d2b5c6969795 Mon Sep 17 00:00:00 2001 From: Pavel Odintsov Date: Wed, 12 Jul 2023 12:41:32 +0100 Subject: [PATCH] Reworked FastNetMon to use high efficient hash based counters to store per host traffic --- src/api.hpp | 89 ++--- src/fastnetmon.cpp | 65 +--- src/fastnetmon_logic.cpp | 692 +++---------------------------------- src/metrics/graphite.cpp | 103 +----- src/metrics/graphite.hpp | 1 - src/metrics/influxdb.cpp | 80 +---- src/notify_about_attack.sh | 7 - 7 files changed, 75 insertions(+), 962 deletions(-) diff --git a/src/api.hpp b/src/api.hpp index 21dd837..46f9612 100644 --- a/src/api.hpp +++ b/src/api.hpp @@ -4,35 +4,22 @@ Status FastnetmonApiServiceImpl::GetBanlist(::grpc::ServerContext* context, ::grpc::ServerWriter<::fastmitigation::BanListReply>* writer) { extern blackhole_ban_list_t ban_list_ipv6; extern blackhole_ban_list_t ban_list_ipv4; - extern bool hash_counters; logger << log4cpp::Priority::INFO << "API we asked for banlist"; // IPv4 - if (hash_counters) { - std::map ban_list_ipv4_copy; + std::map ban_list_ipv4_copy; - // Get whole ban list content atomically - ban_list_ipv4.get_whole_banlist(ban_list_ipv4_copy); + // Get whole ban list content atomically + ban_list_ipv4.get_whole_banlist(ban_list_ipv4_copy); - for (auto itr : ban_list_ipv4_copy) { - BanListReply reply; - - reply.set_ip_address(convert_ip_as_uint_to_string(itr.first) + "/32"); - - writer->Write(reply); - } - } else { - - for (auto itr = ban_list.begin(); itr != ban_list.end(); ++itr) { - std::string client_ip_as_string = convert_ip_as_uint_to_string(itr->first); - - BanListReply reply; - reply.set_ip_address(client_ip_as_string + "/32"); - writer->Write(reply); - } + for (auto itr : ban_list_ipv4_copy) { + BanListReply reply; + reply.set_ip_address(convert_ip_as_uint_to_string(itr.first) + "/32"); + + writer->Write(reply); } // IPv6 @@ -56,7 +43,6 @@ Status FastnetmonApiServiceImpl::ExecuteBan(ServerContext* context, fastmitigation::ExecuteBanReply* reply) { extern blackhole_ban_list_t ban_list_ipv6; extern blackhole_ban_list_t ban_list_ipv4; - extern bool hash_counters; logger << log4cpp::Priority::INFO << "API we asked for ban for IP: " << request->ip_address(); @@ -110,21 +96,7 @@ Status FastnetmonApiServiceImpl::ExecuteBan(ServerContext* context, return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "This IP does not belong to our subnets"); } - if (hash_counters) { - ban_list_ipv4.add_to_blackhole(client_ip, current_attack); - } else { - - { - std::lock_guard lock_guard(ban_list_mutex); - ban_list[client_ip] = current_attack; - } - - { - std::lock_guard lock_guard(ban_list_details_mutex); - ban_list_details[client_ip] = std::vector(); - } - - } + ban_list_ipv4.add_to_blackhole(client_ip, current_attack); } else { bool parsed_ipv6 = read_ipv6_host_from_string(request->ip_address(), ipv6_address.subnet_address); @@ -155,7 +127,6 @@ Status FastnetmonApiServiceImpl::ExecuteUnBan(ServerContext* context, fastmitigation::ExecuteBanReply* reply) { extern blackhole_ban_list_t ban_list_ipv6; extern blackhole_ban_list_t ban_list_ipv4; - extern bool hash_counters; logger << log4cpp::Priority::INFO << "API: We asked for unban for IP: " << request->ip_address(); @@ -189,38 +160,20 @@ Status FastnetmonApiServiceImpl::ExecuteUnBan(ServerContext* context, return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Can't parse IPv4 address"); } - if (hash_counters) { - bool is_blackholed_ipv4 = ban_list_ipv4.is_blackholed(client_ip); + bool is_blackholed_ipv4 = ban_list_ipv4.is_blackholed(client_ip); - if (!is_blackholed_ipv4) { - logger << log4cpp::Priority::ERROR << "API: Could not find IPv4 address in ban list"; - return Status::CANCELLED; - } - - bool get_details = ban_list_ipv4.get_blackhole_details(client_ip, current_attack); - - if (!get_details) { - return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Could not get IPv4 blackhole details"); - } - - ban_list_ipv4.remove_from_blackhole(client_ip); - } else { - if (ban_list.count(client_ip) == 0) { - logger << log4cpp::Priority::ERROR << "API: Could not find IP in ban list"; - return Status::CANCELLED; - } - - current_attack = ban_list[client_ip]; - - logger << log4cpp::Priority::INFO << "API: call unban handlers"; - - logger << log4cpp::Priority::INFO << "API: remove IP from ban list"; - - { - std::lock_guard lock_guard(ban_list_mutex); - ban_list.erase(client_ip); - } + if (!is_blackholed_ipv4) { + logger << log4cpp::Priority::ERROR << "API: Could not find IPv4 address in ban list"; + return Status::CANCELLED; } + + bool get_details = ban_list_ipv4.get_blackhole_details(client_ip, current_attack); + + if (!get_details) { + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Could not get IPv4 blackhole details"); + } + + ban_list_ipv4.remove_from_blackhole(client_ip); } else { bool parsed_ipv6 = read_ipv6_host_from_string(request->ip_address(), ipv6_address.subnet_address); diff --git a/src/fastnetmon.cpp b/src/fastnetmon.cpp index 4b80d94..ab14cff 100644 --- a/src/fastnetmon.cpp +++ b/src/fastnetmon.cpp @@ -139,8 +139,6 @@ cppkafka::Producer* kafka_traffic_export_producer = nullptr; // Traffic export to Kafka bool kafka_traffic_export = false; -bool hash_counters = false; - std::string kafka_traffic_export_topic = "fastnetmon"; kafka_traffic_export_format_t kafka_traffic_export_format = kafka_traffic_export_format_t::JSON; std::vector kafka_traffic_export_brokers; @@ -373,8 +371,6 @@ uint64_t our_ipv6_packets = 0; uint64_t incoming_total_flows_speed = 0; uint64_t outgoing_total_flows_speed = 0; -map_of_vector_counters_t SubnetVectorMap; - // Network counters for IPv6 abstract_subnet_counters_t ipv6_subnet_counters; @@ -396,20 +392,8 @@ int64_t netflow_ipfix_all_protocols_total_flows_speed = 0; std::string sflow_raw_packet_headers_total_speed_desc = "Number of sFlow headers per second"; int64_t sflow_raw_packet_headers_total_speed = 0; -/* End of our data structs */ -std::mutex ban_list_details_mutex; -std::mutex ban_list_mutex; std::mutex flow_counter_mutex; -// map for flows -std::map FlowCounter; - -// Struct for string speed per IP -map_of_vector_counters_t SubnetVectorMapSpeed; - -// Struct for storing average speed per IP for specified interval -map_of_vector_counters_t SubnetVectorMapSpeedAverage; - #ifdef GEOIP map_for_counters GeoIpCounter; #endif @@ -420,10 +404,6 @@ blackhole_ban_list_t ban_list_ipv6; // Banned IPv4 hosts blackhole_ban_list_t ban_list_ipv4; -// In ddos info we store attack power and direction -std::map ban_list; -std::map> ban_list_details; - host_group_map_t host_groups; // Here we store assignment from subnet to certain host group for fast lookup @@ -1132,43 +1112,18 @@ void subnet_vectors_allocator(prefix_t* prefix, void* data) { uint32_t result_ip_as_big_endian = fast_hton(ip_as_little_endian); // logger << log4cpp::Priority::INFO << "Allocate: " << convert_ip_as_uint_to_string(result_ip_as_big_endian); - - if (hash_counters) { - - // We use big endian values as keys - try { - ipv4_host_counters.average_speed_map[result_ip_as_big_endian] = zero_map_element; - ipv4_host_counters.counter_map[result_ip_as_big_endian] = zero_map_element; - } catch (std::bad_alloc& ba) { - logger << log4cpp::Priority::ERROR << "Can't allocate memory for hash counters"; - exit(1); - } + // We use big endian values as keys + try { + ipv4_host_counters.average_speed_map[result_ip_as_big_endian] = zero_map_element; + ipv4_host_counters.counter_map[result_ip_as_big_endian] = zero_map_element; + } catch (std::bad_alloc& ba) { + logger << log4cpp::Priority::ERROR << "Can't allocate memory for hash counters"; + exit(1); } } - if (hash_counters) { - logger << log4cpp::Priority::INFO << "Successfully allocated " << ipv4_host_counters.average_speed_map.size() << " counters"; - } - - // Initialize our counters with fill constructor - try { - SubnetVectorMap[current_subnet] = vector_of_counters(network_size_in_ips, zero_map_element); - SubnetVectorMapSpeed[current_subnet] = vector_of_counters(network_size_in_ips, zero_map_element); - SubnetVectorMapSpeedAverage[current_subnet] = vector_of_counters(network_size_in_ips, zero_map_element); - } catch (std::bad_alloc& ba) { - logger << log4cpp::Priority::ERROR << "Can't allocate memory for counters"; - exit(1); - } -} - -void zeroify_all_counters() { - subnet_counter_t zero_map_element{}; - - for (map_of_vector_counters_t::iterator itr = SubnetVectorMap.begin(); itr != SubnetVectorMap.end(); ++itr) { - // logger<< log4cpp::Priority::INFO<<"Zeroify "<first; - std::fill(itr->second.begin(), itr->second.end(), zero_map_element); - } + logger << log4cpp::Priority::INFO << "Successfully allocated " << ipv4_host_counters.average_speed_map.size() << " counters"; } bool load_our_networks_list() { @@ -1337,10 +1292,6 @@ bool load_our_networks_list() { /* Preallocate data structures */ patricia_process(lookup_tree_ipv4, subnet_vectors_allocator); - logger << log4cpp::Priority::INFO << "We start total zerofication of counters"; - zeroify_all_counters(); - logger << log4cpp::Priority::INFO << "We finished zerofication"; - logger << log4cpp::Priority::INFO << "We loaded " << networks_list_ipv4_as_string.size() << " IPv4 subnets to our in-memory list of networks"; diff --git a/src/fastnetmon_logic.cpp b/src/fastnetmon_logic.cpp index c2b45e6..eee8f39 100644 --- a/src/fastnetmon_logic.cpp +++ b/src/fastnetmon_logic.cpp @@ -79,11 +79,9 @@ extern bool DEBUG_DUMP_ALL_PACKETS; extern bool DEBUG_DUMP_OTHER_PACKETS; extern uint64_t total_ipv4_packets; extern uint64_t total_ipv6_packets; -extern map_of_vector_counters_t SubnetVectorMapSpeed; extern double average_calculation_amount; extern bool print_configuration_params_on_the_screen; extern uint64_t our_ipv6_packets; -extern map_of_vector_counters_t SubnetVectorMap; extern uint64_t unknown_ip_version_packets; extern uint64_t total_simple_packets_processed; extern unsigned int maximum_time_since_bucket_start_to_remove; @@ -116,8 +114,6 @@ extern bool exabgp_announce_whole_subnet; extern subnet_to_host_group_map_t subnet_to_host_groups; extern bool collect_attack_pcap_dumps; -extern std::mutex ban_list_details_mutex; -extern std::mutex ban_list_mutex; extern std::mutex flow_counter_mutex; #ifdef REDIS @@ -143,11 +139,9 @@ extern std::string mongodb_database_name; extern unsigned int number_of_packets_for_pcap_attack_dump; extern patricia_tree_t *lookup_tree_ipv4, *whitelist_tree_ipv4; extern patricia_tree_t *lookup_tree_ipv6, *whitelist_tree_ipv6; -extern std::map> ban_list_details; extern ban_settings_t global_ban_settings; extern bool exabgp_enabled; extern bool gobgp_enabled; -extern map_of_vector_counters_t SubnetVectorMapSpeedAverage; extern int global_ban_time; extern bool notify_script_enabled; extern std::map ban_list; @@ -847,8 +841,6 @@ void execute_unban_operation_ipv6() { /* Thread for cleaning up ban list */ void cleanup_ban_list() { - extern bool hash_counters; - // If we use very small ban time we should call ban_cleanup thread more often if (unban_iteration_sleep_time > global_ban_time) { unban_iteration_sleep_time = int(global_ban_time / 2); @@ -865,103 +857,7 @@ void cleanup_ban_list() { time_t current_time; time(¤t_time); - - - if (hash_counters) { - - execute_unban_operation_ipv4(); - - } else { - - std::vector ban_list_items_for_erase; - - for (std::map::iterator itr = ban_list.begin(); itr != ban_list.end(); ++itr) { - uint32_t client_ip = itr->first; - - // This IP should be banned permanentely and we skip any processing - if (!itr->second.unban_enabled) { - continue; - } - - double time_difference = difftime(current_time, itr->second.ban_timestamp); - int ban_time = itr->second.ban_time; - - // Yes, we reached end of ban time for this customer - bool we_could_unban_this_ip = time_difference > ban_time; - - // We haven't reached time for unban yet - if (!we_could_unban_this_ip) { - continue; - } - - // Check about ongoing attack - if (unban_only_if_attack_finished) { - std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip); - - uint32_t subnet_in_host_byte_order = ntohl(itr->second.customer_network.subnet_address); - int64_t shift_in_vector = (int64_t)ntohl(client_ip) - (int64_t)subnet_in_host_byte_order; - - // Try to find average speed element - map_of_vector_counters_t::iterator itr_average_speed = - SubnetVectorMapSpeedAverage.find(itr->second.customer_network); - - if (itr_average_speed == SubnetVectorMapSpeedAverage.end()) { - logger << log4cpp::Priority::ERROR << "Can't find vector address in subnet map for unban function"; - continue; - } - - if (shift_in_vector < 0 or shift_in_vector >= itr_average_speed->second.size()) { - logger << log4cpp::Priority::ERROR << "We tried to access to element with index " << shift_in_vector - << " which located outside allocated vector with size " << itr_average_speed->second.size(); - - continue; - } - - subnet_counter_t& average_speed_element = itr_average_speed->second[shift_in_vector]; - - // We get ban settings from host subnet - std::string host_group_name; - ban_settings_t current_ban_settings = - get_ban_settings_for_this_subnet(itr->second.customer_network, host_group_name); - - attack_detection_threshold_type_t attack_detection_source; - attack_detection_direction_type_t attack_detection_direction; - - if (we_should_ban_this_entity(average_speed_element, current_ban_settings, attack_detection_source, - attack_detection_direction)) { - logger << log4cpp::Priority::ERROR << "Attack to IP " << client_ip_as_string - << " still going! We should not unblock this host"; - - // Well, we still saw attack, skip to next iteration - continue; - } - } - - // Add this IP to remove list - // We will remove keys really after this loop - ban_list_items_for_erase.push_back(itr->first); - - // Call all hooks for unban - subnet_ipv6_cidr_mask_t zero_ipv6_address; - - // It's empty for unban - std::string flow_attack_details; - - // These are empty too - boost::circular_buffer simple_packets_buffer; - boost::circular_buffer raw_packets_buffer; - - call_blackhole_actions_per_host(attack_action_t::unban, itr->first, zero_ipv6_address, false, - itr->second, attack_detection_source_t::Automatic, flow_attack_details, simple_packets_buffer, raw_packets_buffer); - } - - // Remove all unbanned hosts from the ban list - for (std::vector::iterator itr = ban_list_items_for_erase.begin(); itr != ban_list_items_for_erase.end(); ++itr) { - std::lock_guard lock_guard(ban_list_mutex); - ban_list.erase(*itr); - } - - } + execute_unban_operation_ipv4(); // Unban IPv6 bans execute_unban_operation_ipv6(); @@ -971,39 +867,20 @@ void cleanup_ban_list() { // This code is a source of race conditions of worst kind, we had to rework it ASAP std::string print_ddos_attack_details() { extern blackhole_ban_list_t ban_list_ipv4; - extern bool hash_counters; std::stringstream output_buffer; - if (hash_counters) { - std::map ban_list_ipv4_copy; + std::map ban_list_ipv4_copy; - // Get whole ban list content atomically - ban_list_ipv4.get_whole_banlist(ban_list_ipv4_copy); + // Get whole ban list content atomically + ban_list_ipv4.get_whole_banlist(ban_list_ipv4_copy); - for (auto itr : ban_list_ipv4_copy) { - uint32_t client_ip = itr.first; + for (auto itr : ban_list_ipv4_copy) { + uint32_t client_ip = itr.first; - std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip); - - output_buffer << client_ip_as_string << " at " << print_time_t_in_fastnetmon_format(itr.second.ban_timestamp) << std::endl; - } - } else { - - for (std::map::iterator ii = ban_list.begin(); ii != ban_list.end(); ++ii) { - uint32_t client_ip = (*ii).first; - - std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip); - std::string max_pps_as_string = convert_int_to_string(((*ii).second).max_attack_power); - std::string attack_direction = get_direction_name(((*ii).second).attack_direction); - - output_buffer << client_ip_as_string << "/" << max_pps_as_string << " pps " << attack_direction << " at " - << print_time_t_in_fastnetmon_format(ii->second.ban_timestamp) << std::endl; - - // This logic is evil side effect of this function - send_attack_details(client_ip, (*ii).second); - } + std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip); + output_buffer << client_ip_as_string << " at " << print_time_t_in_fastnetmon_format(itr.second.ban_timestamp) << std::endl; } return output_buffer.str(); @@ -1149,49 +1026,6 @@ std::string get_attack_description_in_json_for_web_hooks(uint32_t client_ip, return json_as_text; } -std::string generate_simple_packets_dump(std::vector& ban_list_details) { - std::stringstream attack_details; - - std::map protocol_counter; - for (std::vector::iterator iii = ban_list_details.begin(); iii != ban_list_details.end(); ++iii) { - attack_details << print_simple_packet(*iii); - - protocol_counter[iii->protocol]++; - } - - return attack_details.str(); -} - -void send_attack_details(uint32_t client_ip, attack_details_t current_attack_details) { - std::string pps_as_string = convert_int_to_string(current_attack_details.attack_power); - std::string attack_direction = get_direction_name(current_attack_details.attack_direction); - std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip); - - // In this case we do not collect any traffic samples - if (ban_details_records_count == 0) { - return; - } - - // Very strange code but it work in 95% cases - if (ban_list_details.count(client_ip) > 0 && ban_list_details[client_ip].size() >= ban_details_records_count) { - std::stringstream attack_details; - - attack_details << get_attack_description(client_ip, current_attack_details) << "\n\n"; - attack_details << generate_simple_packets_dump(ban_list_details[client_ip]); - - logger << log4cpp::Priority::INFO << "Attack with direction: " << attack_direction - << " IP: " << client_ip_as_string << " Power: " << pps_as_string << " traffic samples collected"; - - call_attack_details_handlers(client_ip, current_attack_details, attack_details.str()); - - // TODO: here we have definitely RACE CONDITION!!! FIX IT - - // Remove key and prevent collection new data about this attack - std::lock_guard lock_guard(ban_list_details_mutex); - ban_list_details.erase(client_ip); - } -} - void call_attack_details_handlers(uint32_t client_ip, attack_details_t& current_attack, std::string attack_fingerprint) { std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip); std::string attack_direction = get_direction_name(current_attack.attack_direction); @@ -1400,131 +1234,6 @@ redisContext* redis_init_connection() { #endif - -void execute_ip_ban(uint32_t client_ip, subnet_counter_t average_speed_element, std::string flow_attack_details, subnet_cidr_mask_t customer_subnet) { - attack_details_t current_attack; - uint64_t pps = 0; - - uint64_t in_pps = average_speed_element.total.in_packets; - uint64_t out_pps = average_speed_element.total.out_packets; - uint64_t in_bps = average_speed_element.total.in_bytes; - uint64_t out_bps = average_speed_element.total.out_bytes; - uint64_t in_flows = average_speed_element.in_flows; - uint64_t out_flows = average_speed_element.out_flows; - - direction_t data_direction; - - if (!global_ban_settings.enable_ban) { - logger << log4cpp::Priority::INFO << "We do not ban: " << convert_ip_as_uint_to_string(client_ip) - << " because ban disabled completely"; - return; - } - - // Detect attack direction with simple heuristic - if (abs(int((int)in_pps - (int)out_pps)) < 1000) { - // If difference between pps speed is so small we should do additional investigation using - // bandwidth speed - if (in_bps > out_bps) { - data_direction = INCOMING; - pps = in_pps; - } else { - data_direction = OUTGOING; - pps = out_pps; - } - } else { - if (in_pps > out_pps) { - data_direction = INCOMING; - pps = in_pps; - } else { - data_direction = OUTGOING; - pps = out_pps; - } - } - - current_attack.attack_protocol = detect_attack_protocol(average_speed_element, data_direction); - - { - std::lock_guard lock_guard(ban_list_mutex); - - // Host is blocked already - if (ban_list.count(client_ip) > 0) { - // update attack power - if (pps > ban_list[client_ip].max_attack_power) { - ban_list[client_ip].max_attack_power = pps; - } - - return; - } - } - - prefix_t prefix_for_check_adreess; - prefix_for_check_adreess.add.sin.s_addr = client_ip; - prefix_for_check_adreess.family = AF_INET; - prefix_for_check_adreess.bitlen = 32; - - bool in_white_list = (patricia_search_best2(whitelist_tree_ipv4, &prefix_for_check_adreess, 1) != NULL); - - if (in_white_list) { - return; - } - - std::string data_direction_as_string = get_direction_name(data_direction); - - logger << log4cpp::Priority::INFO << "We run execute_ip_ban code with following params " - << " in_pps: " << in_pps << " out_pps: " << out_pps << " in_bps: " << in_bps << " out_bps: " << out_bps - << " and we decide it's " << data_direction_as_string << " attack"; - - std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip); - std::string pps_as_string = convert_int_to_string(pps); - - // Store information about subnet - current_attack.customer_network = customer_subnet; - - // Store ban time - time(¤t_attack.ban_timestamp); - // set ban time in seconds - current_attack.ban_time = global_ban_time; - current_attack.unban_enabled = unban_enabled; - - // Pass main information about attack - current_attack.attack_direction = data_direction; - current_attack.attack_power = pps; - current_attack.max_attack_power = pps; - - // Copy traffic metrics - current_attack.traffic_counters = average_speed_element; - - if (collect_attack_pcap_dumps) { - bool buffer_allocation_result = current_attack.pcap_attack_dump.allocate_buffer(number_of_packets_for_pcap_attack_dump); - - if (!buffer_allocation_result) { - logger << log4cpp::Priority::ERROR << "Can't allocate buffer for attack, switch off this option completely "; - collect_attack_pcap_dumps = false; - } - } - - { - std::lock_guard lock_guard(ban_list_mutex); - ban_list[client_ip] = current_attack; - } - - { - std::lock_guard lock_guard(ban_list_details_mutex); - ban_list_details[client_ip] = std::vector(); - } - - logger << log4cpp::Priority::INFO << "Attack with direction: " << data_direction_as_string - << " IP: " << client_ip_as_string << " Power: " << pps_as_string; - - subnet_ipv6_cidr_mask_t zero_ipv6_address; - - boost::circular_buffer empty_simple_packets_buffer; - boost::circular_buffer empty_raw_packets_buffer; - - call_blackhole_actions_per_host(attack_action_t::ban, client_ip, zero_ipv6_address, false, ban_list[client_ip], - attack_detection_source_t::Automatic, flow_attack_details, empty_simple_packets_buffer, empty_raw_packets_buffer); -} - void call_blackhole_actions_per_host(attack_action_t attack_action, uint32_t client_ip, const subnet_ipv6_cidr_mask_t& client_ipv6, @@ -1820,8 +1529,6 @@ void traffic_draw_ipv6_program() { } void traffic_draw_ipv4_program() { - extern bool hash_counters; - std::stringstream output_buffer; // logger< ipv4_host_counters; extern map_of_vector_counters_for_flow_t SubnetVectorMapFlow; - extern bool hash_counters; - std::chrono::steady_clock::time_point start_time = std::chrono::steady_clock::now(); // Calculate duration of our sleep duration as it may be altered by OS behaviour (i.e. process scheduler) @@ -2287,129 +1974,30 @@ void recalculate_speed() { ipv4_network_counters.recalculate_speed(speed_calc_period, (double)average_calculation_amount, nullptr); - if (hash_counters) { - uint64_t flow_exists_for_ip = 0; - uint64_t flow_does_not_exist_for_ip = 0; + uint64_t flow_exists_for_ip = 0; + uint64_t flow_does_not_exist_for_ip = 0; - ipv4_host_counters.recalculate_speed(speed_calc_period, (double)average_calculation_amount, speed_calculation_callback_local_ipv4, [&outgoing_total_flows, &incoming_total_flows, &flow_exists_for_ip, - &flow_does_not_exist_for_ip](const uint32_t& ip, subnet_counter_t& new_speed_element, double speed_calc_period) { - if (enable_connection_tracking) { - bool res = increment_flow_counters(new_speed_element, fast_ntoh(ip), speed_calc_period); + ipv4_host_counters.recalculate_speed(speed_calc_period, (double)average_calculation_amount, speed_calculation_callback_local_ipv4, [&outgoing_total_flows, &incoming_total_flows, &flow_exists_for_ip, + &flow_does_not_exist_for_ip](const uint32_t& ip, subnet_counter_t& new_speed_element, double speed_calc_period) { + if (enable_connection_tracking) { + bool res = increment_flow_counters(new_speed_element, fast_ntoh(ip), speed_calc_period); - if (res) { - // Increment global counter - outgoing_total_flows += new_speed_element.out_flows; - incoming_total_flows += new_speed_element.in_flows; + if (res) { + // Increment global counter + outgoing_total_flows += new_speed_element.out_flows; + incoming_total_flows += new_speed_element.in_flows; - flow_exists_for_ip++; + flow_exists_for_ip++; - // logger << log4cpp::Priority::DEBUG << convert_ipv4_subnet_to_string(subnet) - // << "in flows: " << new_speed_element.in_flows << " out flows: " << - // new_speed_element.out_flows; - } else { - // We did not find record - flow_does_not_exist_for_ip++; - } - } - }); - - } else { - - for (map_of_vector_counters_t::iterator itr = SubnetVectorMap.begin(); itr != SubnetVectorMap.end(); ++itr) { - for (vector_of_counters::iterator vector_itr = itr->second.begin(); vector_itr != itr->second.end(); ++vector_itr) { - int current_index = vector_itr - itr->second.begin(); - - // New element - subnet_counter_t new_speed_element; - - // convert to host order for math operations - uint32_t subnet_ip = ntohl(itr->first.subnet_address); - uint32_t client_ip_in_host_bytes_order = subnet_ip + current_index; - - // covnert to our standard network byte order - uint32_t client_ip = htonl(client_ip_in_host_bytes_order); - - // Calculate speed for IP or whole subnet - build_speed_counters_from_packet_counters(new_speed_element, *vector_itr, speed_calc_period); - - // It uses host byte order for key - conntrack_main_struct_t& flow_counter = SubnetVectorMapFlow[client_ip_in_host_bytes_order]; - - if (enable_connection_tracking) { - // todo: optimize this operations! - // it's really bad and SLOW CODE - uint64_t total_out_flows = - (uint64_t)flow_counter.out_tcp.size() + (uint64_t)flow_counter.out_udp.size() + - (uint64_t)flow_counter.out_icmp.size() + (uint64_t)flow_counter.out_other.size(); - - uint64_t total_in_flows = - (uint64_t)flow_counter.in_tcp.size() + (uint64_t)flow_counter.in_udp.size() + - (uint64_t)flow_counter.in_icmp.size() + (uint64_t)flow_counter.in_other.size(); - - new_speed_element.out_flows = uint64_t((double)total_out_flows / speed_calc_period); - new_speed_element.in_flows = uint64_t((double)total_in_flows / speed_calc_period); - - // Increment global counter - outgoing_total_flows += new_speed_element.out_flows; - incoming_total_flows += new_speed_element.in_flows; - } else { - new_speed_element.out_flows = 0; - new_speed_element.in_flows = 0; - } - - /* Moving average recalculation */ - // http://en.wikipedia.org/wiki/Moving_average#Application_to_measuring_computer_performance - // double speed_calc_period = 1; - double exp_power = -speed_calc_period / average_calculation_amount; - double exp_value = exp(exp_power); - - subnet_counter_t& current_average_speed_element = SubnetVectorMapSpeedAverage[itr->first][current_index]; - - // Calculate average speed from per-second speed - build_average_speed_counters_from_speed_counters(current_average_speed_element, new_speed_element, exp_value); - - if (enable_connection_tracking) { - current_average_speed_element.out_flows = - uint64_t(new_speed_element.out_flows + exp_value * ((double)current_average_speed_element.out_flows - - (double)new_speed_element.out_flows)); - - current_average_speed_element.in_flows = - uint64_t(new_speed_element.in_flows + exp_value * ((double)current_average_speed_element.in_flows - - (double)new_speed_element.in_flows)); - } - - /* Moving average recalculation end */ - - - std::string host_group_name; - ban_settings_t current_ban_settings = get_ban_settings_for_this_subnet(itr->first, host_group_name); - - attack_detection_threshold_type_t attack_detection_source; - attack_detection_direction_type_t attack_detection_direction; - - - if (we_should_ban_this_entity(current_average_speed_element, current_ban_settings, attack_detection_source, - attack_detection_direction)) { - logger << log4cpp::Priority::DEBUG << "We have found host group for this host as: " << host_group_name; - - std::string flow_attack_details = ""; - - if (enable_connection_tracking) { - flow_attack_details = print_flow_tracking_for_ip(flow_counter, convert_ip_as_uint_to_string(client_ip)); - } - - // TODO: we should pass type of ddos ban source (pps, flowd, bandwidth)! - execute_ip_ban(client_ip, current_average_speed_element, flow_attack_details, itr->first); - } - - - SubnetVectorMapSpeed[itr->first][current_index] = new_speed_element; - - *vector_itr = zero_map_element; + // logger << log4cpp::Priority::DEBUG << convert_ipv4_subnet_to_string(subnet) + // << "in flows: " << new_speed_element.in_flows << " out flows: " << + // new_speed_element.out_flows; + } else { + // We did not find record + flow_does_not_exist_for_ip++; } } - - } + }); // Calculate IPv6 per network traffic ipv6_subnet_counters.recalculate_speed(speed_calc_period, (double)average_calculation_amount, nullptr); @@ -2663,108 +2251,6 @@ std::string draw_table_ipv6(attack_detection_direction_type_t sort_direction, at return output_buffer.str(); } -std::string draw_table_ipv4(const attack_detection_direction_type_t& data_direction, - const attack_detection_threshold_type_t& sorter_type) { - std::vector vector_for_sort; - - std::stringstream output_buffer; - - // Preallocate memory for sort vector - // We use total networks size for this vector - vector_for_sort.reserve(total_number_of_hosts_in_our_networks); - - map_of_vector_counters_t* current_speed_map = &SubnetVectorMapSpeedAverage; - - subnet_counter_t zero_map_element{}; - - unsigned int count_of_zero_speed_packets = 0; - for (map_of_vector_counters_t::iterator itr = current_speed_map->begin(); itr != current_speed_map->end(); ++itr) { - for (vector_of_counters::iterator vector_itr = itr->second.begin(); vector_itr != itr->second.end(); ++vector_itr) { - int current_index = vector_itr - itr->second.begin(); - - // convert to host order for math operations - uint32_t subnet_ip = ntohl(itr->first.subnet_address); - uint32_t client_ip_in_host_bytes_order = subnet_ip + current_index; - - // covnert to our standard network byte order - uint32_t client_ip = htonl(client_ip_in_host_bytes_order); - - // Do not add zero speed packets to sort list - if (memcmp((void*)&zero_map_element, &*vector_itr, sizeof(subnet_counter_t)) != 0) { - vector_for_sort.push_back(std::make_pair(client_ip, *vector_itr)); - } else { - count_of_zero_speed_packets++; - } - } - } - - // Sort only first X elements in this vector - unsigned int shift_for_sort = max_ips_in_list; - - if (data_direction == attack_detection_direction_type_t::incoming or data_direction == attack_detection_direction_type_t::outgoing) { - // Because in another case we will got segmentation fault - unsigned int vector_size = vector_for_sort.size(); - - if (vector_size < shift_for_sort) { - shift_for_sort = vector_size; - } - - std::partial_sort(vector_for_sort.begin(), vector_for_sort.begin() + shift_for_sort, vector_for_sort.end(), - TrafficComparatorClass(data_direction, sorter_type)); - } else { - logger << log4cpp::Priority::ERROR << "Unexpected bahaviour on sort function"; - return "Internal error"; - } - - unsigned int element_number = 0; - - // In this loop we print only top X talkers in our subnet to screen buffer - for (std::vector::iterator ii = vector_for_sort.begin(); ii != vector_for_sort.end(); ++ii) { - // Print first max_ips_in_list elements in list, we will show top X "huge" channel loaders - if (element_number >= max_ips_in_list) { - break; - } - - uint32_t client_ip = (*ii).first; - std::string client_ip_as_string = convert_ip_as_uint_to_string((*ii).first); - - uint64_t pps = 0; - uint64_t bps = 0; - uint64_t flows = 0; - - // Here we could have average or instantaneous speed - subnet_counter_t* current_speed_element = &ii->second; - - // Create polymorphic pps, byte and flow counters - if (data_direction == attack_detection_direction_type_t::incoming) { - pps = current_speed_element->total.in_packets; - bps = current_speed_element->total.in_bytes; - flows = current_speed_element->in_flows; - } else if (data_direction == attack_detection_direction_type_t::outgoing) { - pps = current_speed_element->total.out_packets; - bps = current_speed_element->total.out_bytes; - flows = current_speed_element->out_flows; - } - - uint64_t mbps = convert_speed_to_mbps(bps); - - std::string is_banned = ban_list.count(client_ip) > 0 ? " *banned* " : ""; - - // We use setw for alignment - output_buffer << client_ip_as_string << "\t\t"; - - output_buffer << std::setw(6) << pps << " pps "; - output_buffer << std::setw(6) << mbps << " mbps "; - output_buffer << std::setw(6) << flows << " flows "; - - output_buffer << is_banned << std::endl; - - element_number++; - } - - return output_buffer.str(); -} - void print_screen_contents_into_file(std::string screen_data_stats_param, std::string file_path) { std::ofstream screen_data_file; screen_data_file.open(file_path.c_str(), std::ios::trunc); @@ -2959,7 +2445,6 @@ void collect_traffic_to_buckets_ipv4(const simple_packet_t& current_packet, pack void process_packet(simple_packet_t& current_packet) { extern bool kafka_traffic_export; extern abstract_subnet_counters_t ipv4_host_counters; - extern bool hash_counters; extern packet_buckets_storage_t packet_buckets_ipv4_storage; extern map_of_vector_counters_for_flow_t SubnetVectorMapFlow; @@ -3067,120 +2552,31 @@ void process_packet(simple_packet_t& current_packet) { __sync_fetch_and_add(&total_counters_ipv4.total_counters[current_packet.packet_direction].bytes, sampled_number_of_bytes); #endif - // Try to find map key for this subnet - map_of_vector_counters_t::iterator itr; + // Add traffic to buckets when we have them + collect_traffic_to_buckets_ipv4(current_packet, packet_buckets_ipv4_storage); - if (current_packet.packet_direction == OUTGOING or current_packet.packet_direction == INCOMING) { - // Find element in map of vectors - itr = SubnetVectorMap.find(current_subnet); - - if (itr == SubnetVectorMap.end()) { - logger << log4cpp::Priority::ERROR << "Can't find vector address in subnet map"; - return; - } - } - - if (hash_counters) { - // Add traffic to buckets when we have them - collect_traffic_to_buckets_ipv4(current_packet, packet_buckets_ipv4_storage); - - // Increment counters for all local hosts using new counters - if (current_packet.packet_direction == OUTGOING) { - ipv4_host_counters.increment_outgoing_counters_for_key(current_packet.src_ip, current_packet, sampled_number_of_packets, sampled_number_of_bytes); - } else if (current_packet.packet_direction == INCOMING) { - ipv4_host_counters.increment_incoming_counters_for_key(current_packet.dst_ip, current_packet, sampled_number_of_packets, sampled_number_of_bytes); - } else { - // No reasons to keep locks for other or internal - } + // Increment counters for all local hosts using new counters + if (current_packet.packet_direction == OUTGOING) { + ipv4_host_counters.increment_outgoing_counters_for_key(current_packet.src_ip, current_packet, sampled_number_of_packets, sampled_number_of_bytes); + } else if (current_packet.packet_direction == INCOMING) { + ipv4_host_counters.increment_incoming_counters_for_key(current_packet.dst_ip, current_packet, sampled_number_of_packets, sampled_number_of_bytes); + } else { + // No reasons to keep locks for other or internal } // Increment main and per protocol packet counters if (current_packet.packet_direction == OUTGOING) { - int64_t shift_in_vector = (int64_t)ntohl(current_packet.src_ip) - (int64_t)subnet_in_host_byte_order; - - if (shift_in_vector < 0 or shift_in_vector >= itr->second.size()) { - logger << log4cpp::Priority::ERROR << "We tried to access to element with index " << shift_in_vector - << " which located outside allocated vector with size " << itr->second.size(); - - logger << log4cpp::Priority::ERROR - << "We expect issues with this packet in OUTGOING direction: " << print_simple_packet(current_packet); - - return; - } - - subnet_counter_t& current_element = itr->second[shift_in_vector]; - - increment_outgoing_counters(current_element, current_packet, sampled_number_of_packets, sampled_number_of_bytes); - if (enable_connection_tracking) { increment_outgoing_flow_counters(fast_ntoh(current_packet.src_ip), current_packet, sampled_number_of_packets, sampled_number_of_bytes); } } else if (current_packet.packet_direction == INCOMING) { - int64_t shift_in_vector = (int64_t)ntohl(current_packet.dst_ip) - (int64_t)subnet_in_host_byte_order; - - if (shift_in_vector < 0 or shift_in_vector >= itr->second.size()) { - logger << log4cpp::Priority::ERROR << "We tried to access to element with index " << shift_in_vector - << " which located outside allocated vector with size " << itr->second.size(); - - logger << log4cpp::Priority::ERROR - << "We expect issues with this packet in INCOMING direction: " << print_simple_packet(current_packet); - - return; - } - - subnet_counter_t& current_element = itr->second[shift_in_vector]; - - increment_incoming_counters(current_element, current_packet, sampled_number_of_packets, sampled_number_of_bytes); - if (enable_connection_tracking) { increment_incoming_flow_counters(fast_ntoh(current_packet.dst_ip), current_packet, sampled_number_of_packets, sampled_number_of_bytes); } } else if (current_packet.packet_direction == INTERNAL) { } - - // Execute ban related processing - if (current_packet.packet_direction == OUTGOING) { - // Collect data when ban client - if (ban_details_records_count != 0 && !ban_list_details.empty() && ban_list_details.count(current_packet.src_ip) > 0 && - ban_list_details[current_packet.src_ip].size() < ban_details_records_count) { - - std::lock_guard lock_guard(ban_list_details_mutex); - - if (collect_attack_pcap_dumps) { - // this code SHOULD NOT be called without mutex! - if (current_packet.captured_payload_length > 0 && current_packet.payload_pointer != NULL) { - ban_list[current_packet.src_ip].pcap_attack_dump.write_packet(current_packet.payload_pointer, - current_packet.captured_payload_length, - current_packet.payload_full_length); - } - } - - ban_list_details[current_packet.src_ip].push_back(current_packet); - } - } - - - if (current_packet.packet_direction == INCOMING) { - // Collect attack details - if (ban_details_records_count != 0 && !ban_list_details.empty() && ban_list_details.count(current_packet.dst_ip) > 0 && - ban_list_details[current_packet.dst_ip].size() < ban_details_records_count) { - - std::lock_guard lock_guard(ban_list_details_mutex); - - if (collect_attack_pcap_dumps) { - // this code SHOULD NOT be called without mutex! - if (current_packet.captured_payload_length > 0 && current_packet.payload_pointer != NULL) { - ban_list[current_packet.dst_ip].pcap_attack_dump.write_packet(current_packet.payload_pointer, - current_packet.captured_payload_length, - current_packet.payload_full_length); - } - } - - ban_list_details[current_packet.dst_ip].push_back(current_packet); - } - } } void system_counters_speed_thread_handler() { @@ -3544,21 +2940,15 @@ void process_filled_buckets_ipv6() { // This functions will check for packet buckets availible for processing void check_traffic_buckets() { - extern bool hash_counters; extern packet_buckets_storage_t packet_buckets_ipv4_storage; while (true) { - if (hash_counters) { - remove_orphaned_buckets(packet_buckets_ipv4_storage, "ipv4"); - } + remove_orphaned_buckets(packet_buckets_ipv4_storage, "ipv4"); // Process buckets which haven't filled by packets remove_orphaned_buckets(packet_buckets_ipv6_storage, "ipv6"); - - if (hash_counters) { - process_filled_buckets_ipv4(); - } + process_filled_buckets_ipv4(); process_filled_buckets_ipv6(); diff --git a/src/metrics/graphite.cpp b/src/metrics/graphite.cpp index 9c8bee3..b35e2ab 100644 --- a/src/metrics/graphite.cpp +++ b/src/metrics/graphite.cpp @@ -11,8 +11,6 @@ #include "../abstract_subnet_counters.hpp" extern log4cpp::Category& logger; -extern map_of_vector_counters_t SubnetVectorMapSpeed; -extern map_of_vector_counters_t SubnetVectorMapSpeedAverage; extern uint64_t incoming_total_flows_speed; extern uint64_t outgoing_total_flows_speed; extern abstract_subnet_counters_t ipv4_network_counters; @@ -27,98 +25,6 @@ extern unsigned int graphite_push_period; // Push host traffic to Graphite bool push_hosts_traffic_counters_to_graphite() { - std::vector processed_directions = { INCOMING, OUTGOING }; - - graphite_data_t graphite_data; - - map_of_vector_counters_t* current_speed_map = &SubnetVectorMapSpeedAverage; - - // Iterate over all networks - for (map_of_vector_counters_t::iterator itr = current_speed_map->begin(); itr != current_speed_map->end(); ++itr) { - - // Iterate over all hosts in network - for (vector_of_counters_t::iterator vector_itr = itr->second.begin(); vector_itr != itr->second.end(); ++vector_itr) { - int current_index = vector_itr - itr->second.begin(); - - // convert to host order for math operations - uint32_t subnet_ip = ntohl(itr->first.subnet_address); - uint32_t client_ip_in_host_bytes_order = subnet_ip + current_index; - - // convert to our standard network byte order - uint32_t client_ip = htonl(client_ip_in_host_bytes_order); - - std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip); - - std::string ip_as_string_with_dash_delimiters = client_ip_as_string; - // Replace dots by dashes - std::replace(ip_as_string_with_dash_delimiters.begin(), ip_as_string_with_dash_delimiters.end(), '.', '_'); - - // Here we could have average or instantaneous speed - subnet_counter_t* current_speed_element = &*vector_itr; - - for (auto data_direction : processed_directions) { - std::string direction_as_string; - - if (data_direction == INCOMING) { - direction_as_string = "incoming"; - } else if (data_direction == OUTGOING) { - direction_as_string = "outgoing"; - } - - std::string graphite_current_prefix = - graphite_prefix + ".hosts." + ip_as_string_with_dash_delimiters + "." + direction_as_string; - - graphite_current_prefix = graphite_current_prefix + ".average"; - - if (data_direction == INCOMING) { - // Prepare incoming traffic data - - // We do not store zero data to Graphite - if (current_speed_element->total.in_packets != 0) { - graphite_data[graphite_current_prefix + ".pps"] = current_speed_element->total.in_packets; - } - - if (current_speed_element->total.in_bytes != 0) { - graphite_data[graphite_current_prefix + ".bps"] = current_speed_element->total.in_bytes * 8; - } - - if (current_speed_element->in_flows != 0) { - graphite_data[graphite_current_prefix + ".flows"] = current_speed_element->in_flows; - } - - } else if (data_direction == OUTGOING) { - // Prepare outgoing traffic data - - // We do not store zero data to Graphite - if (current_speed_element->total.out_packets != 0) { - graphite_data[graphite_current_prefix + ".pps"] = current_speed_element->total.out_packets; - } - - if (current_speed_element->total.out_bytes != 0) { - graphite_data[graphite_current_prefix + ".bps"] = current_speed_element->total.out_bytes * 8; - } - - if (current_speed_element->out_flows != 0) { - graphite_data[graphite_current_prefix + ".flows"] = current_speed_element->out_flows; - } - } - } - } - - bool graphite_put_result = store_data_to_graphite(graphite_port, graphite_host, graphite_data); - - if (!graphite_put_result) { - logger << log4cpp::Priority::ERROR << "Can't store host load data to Graphite server " << graphite_host - << " port: " << graphite_port; - return false; - } - } - - return true; -} - -// Push host traffic to Graphite -bool push_hosts_traffic_counters_to_graphite_hash() { extern abstract_subnet_counters_t ipv4_host_counters; std::vector processed_directions = { INCOMING, OUTGOING }; @@ -283,8 +189,7 @@ bool push_network_traffic_counters_to_graphite() { // This thread pushes speed counters to graphite void graphite_push_thread() { - extern bool hash_counters; - + // Sleep for a half second for shift against calculatiuon thread boost::this_thread::sleep(boost::posix_time::milliseconds(500)); @@ -300,11 +205,7 @@ void graphite_push_thread() { push_network_traffic_counters_to_graphite(); // Push per host counters to graphite - if (hash_counters) { - push_hosts_traffic_counters_to_graphite_hash(); - } else { - push_hosts_traffic_counters_to_graphite(); - } + push_hosts_traffic_counters_to_graphite(); std::chrono::duration diff = std::chrono::steady_clock::now() - start_time; diff --git a/src/metrics/graphite.hpp b/src/metrics/graphite.hpp index c44af82..dbaf826 100644 --- a/src/metrics/graphite.hpp +++ b/src/metrics/graphite.hpp @@ -4,4 +4,3 @@ void graphite_push_thread(); bool push_total_traffic_counters_to_graphite(); bool push_network_traffic_counters_to_graphite(); bool push_hosts_traffic_counters_to_graphite(); -bool push_hosts_traffic_counters_to_graphite_hash(); diff --git a/src/metrics/influxdb.cpp b/src/metrics/influxdb.cpp index 4490aa9..9601c05 100644 --- a/src/metrics/influxdb.cpp +++ b/src/metrics/influxdb.cpp @@ -11,8 +11,6 @@ #include extern struct timeval graphite_thread_execution_time; -extern map_of_vector_counters_t SubnetVectorMapSpeed; -extern map_of_vector_counters_t SubnetVectorMapSpeedAverage; extern uint64_t incoming_total_flows_speed; extern uint64_t outgoing_total_flows_speed; extern abstract_subnet_counters_t ipv4_network_counters; @@ -288,7 +286,6 @@ push_network_traffic_counters_to_influxdb(abstract_subnet_counters_t& netw // This thread pushes data to InfluxDB void influxdb_push_thread() { - extern bool hash_counters; extern abstract_subnet_counters_t ipv4_host_counters; // Sleep for a half second for shift against calculation thread @@ -333,14 +330,9 @@ void influxdb_push_thread() { influxdb_auth, influxdb_user, influxdb_password, "networks_traffic", "network"); // Push per host counters to InfluxDB - if (hash_counters) { - push_hosts_traffic_counters_to_influxdb(ipv4_host_counters, influxdb_database, - current_influxdb_ip_address, std::to_string(influxdb_port), influxdb_auth, - influxdb_user, influxdb_password, "hosts_traffic", "host"); - } else { - push_hosts_ipv4_traffic_counters_to_influxdb(influxdb_database, current_influxdb_ip_address, std::to_string(influxdb_port), - influxdb_auth, influxdb_user, influxdb_password); - } + push_hosts_traffic_counters_to_influxdb(ipv4_host_counters, influxdb_database, + current_influxdb_ip_address, std::to_string(influxdb_port), influxdb_auth, + influxdb_user, influxdb_password, "hosts_traffic", "host"); push_system_counters_to_influxdb(influxdb_database, current_influxdb_ip_address, std::to_string(influxdb_port), influxdb_auth, influxdb_user, influxdb_password); @@ -357,72 +349,6 @@ void influxdb_push_thread() { } } -// Push host traffic to InfluxDB -bool push_hosts_ipv4_traffic_counters_to_influxdb(std::string influx_database, - std::string influx_host, - std::string influx_port, - bool enable_auth, - std::string influx_user, - std::string influx_password) { - /* https://docs.influxdata.com/influxdb/v1.7/concepts/glossary/: - A collection of points in line protocol format, separated by newlines (0x0A). A batch of points may be submitted to - the database using a single HTTP request to the write endpoint. This makes writes via the HTTP API much more - performant by drastically reducing the HTTP overhead. InfluxData recommends batch sizes of 5,000-10,000 points, - although different use cases may be better served by significantly smaller or larger batches. - */ - - map_of_vector_counters_t* current_speed_map = &SubnetVectorMapSpeedAverage; - - // Iterate over all networks - for (map_of_vector_counters_t::iterator itr = current_speed_map->begin(); itr != current_speed_map->end(); ++itr) { - std::vector>> hosts_vector; - - // Iterate over all hosts in network - for (vector_of_counters_t::iterator vector_itr = itr->second.begin(); vector_itr != itr->second.end(); ++vector_itr) { - std::map plain_total_counters_map; - - int current_index = vector_itr - itr->second.begin(); - - // Convert to host order for math operations - uint32_t subnet_ip = ntohl(itr->first.subnet_address); - uint32_t client_ip_in_host_bytes_order = subnet_ip + current_index; - - // Convert to our standard network byte order - uint32_t client_ip = htonl(client_ip_in_host_bytes_order); - - std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip); - - // Here we could have average or instantaneous speed - subnet_counter_t* current_speed_element = &*vector_itr; - - // Skip elements with zero speed - if (current_speed_element->is_zero()) { - continue; - } - - fill_main_counters_for_influxdb(*current_speed_element, plain_total_counters_map, true); - - // Key: client_ip_as_string - hosts_vector.push_back(std::make_pair(client_ip_as_string, plain_total_counters_map)); - } - - if (hosts_vector.size() > 0) { - std::string error_text; - - bool result = write_batch_of_data_to_influxdb(influx_database, influx_host, influx_port, enable_auth, influx_user, - influx_password, "hosts_traffic", "host", hosts_vector, error_text); - - if (!result) { - influxdb_writes_failed++; - logger << log4cpp::Priority::DEBUG << "InfluxDB batch operation failed for hosts_traffic"; - return false; - } - } - } - - return true; -} - // Write batch of data for particular InfluxDB database bool write_batch_of_data_to_influxdb(std::string influx_database, std::string influx_host, diff --git a/src/notify_about_attack.sh b/src/notify_about_attack.sh index 44d2c46..57acc5b 100755 --- a/src/notify_about_attack.sh +++ b/src/notify_about_attack.sh @@ -35,10 +35,3 @@ if [ "$4" = "ban" ]; then # You can add ban code here exit 0 fi - -# Advanced edition does not use this action and passes all details in ban action -if [ "$4" = "attack_details" ]; then - cat | mail -s "FastNetMon Guard: IP $1 blocked because $2 attack with power $3 pps" $email_notify; - - exit 0 -fi