1
0
mirror of https://github.com/pavel-odintsov/fastnetmon synced 2024-11-23 09:12:14 +01:00

Reworked FastNetMon to use high efficient hash based counters to store per host traffic

This commit is contained in:
Pavel Odintsov 2023-07-12 12:41:32 +01:00
parent 64a78d1d83
commit 4a052e9a8c
7 changed files with 75 additions and 962 deletions

@ -4,35 +4,22 @@ Status FastnetmonApiServiceImpl::GetBanlist(::grpc::ServerContext* context,
::grpc::ServerWriter<::fastmitigation::BanListReply>* writer) {
extern blackhole_ban_list_t<subnet_ipv6_cidr_mask_t> ban_list_ipv6;
extern blackhole_ban_list_t<uint32_t> ban_list_ipv4;
extern bool hash_counters;
logger << log4cpp::Priority::INFO << "API we asked for banlist";
// IPv4
if (hash_counters) {
std::map<uint32_t, banlist_item_t> ban_list_ipv4_copy;
std::map<uint32_t, banlist_item_t> ban_list_ipv4_copy;
// Get whole ban list content atomically
ban_list_ipv4.get_whole_banlist(ban_list_ipv4_copy);
// Get whole ban list content atomically
ban_list_ipv4.get_whole_banlist(ban_list_ipv4_copy);
for (auto itr : ban_list_ipv4_copy) {
BanListReply reply;
reply.set_ip_address(convert_ip_as_uint_to_string(itr.first) + "/32");
writer->Write(reply);
}
} else {
for (auto itr = ban_list.begin(); itr != ban_list.end(); ++itr) {
std::string client_ip_as_string = convert_ip_as_uint_to_string(itr->first);
BanListReply reply;
reply.set_ip_address(client_ip_as_string + "/32");
writer->Write(reply);
}
for (auto itr : ban_list_ipv4_copy) {
BanListReply reply;
reply.set_ip_address(convert_ip_as_uint_to_string(itr.first) + "/32");
writer->Write(reply);
}
// IPv6
@ -56,7 +43,6 @@ Status FastnetmonApiServiceImpl::ExecuteBan(ServerContext* context,
fastmitigation::ExecuteBanReply* reply) {
extern blackhole_ban_list_t<subnet_ipv6_cidr_mask_t> ban_list_ipv6;
extern blackhole_ban_list_t<uint32_t> ban_list_ipv4;
extern bool hash_counters;
logger << log4cpp::Priority::INFO << "API we asked for ban for IP: " << request->ip_address();
@ -110,21 +96,7 @@ Status FastnetmonApiServiceImpl::ExecuteBan(ServerContext* context,
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "This IP does not belong to our subnets");
}
if (hash_counters) {
ban_list_ipv4.add_to_blackhole(client_ip, current_attack);
} else {
{
std::lock_guard<std::mutex> lock_guard(ban_list_mutex);
ban_list[client_ip] = current_attack;
}
{
std::lock_guard<std::mutex> lock_guard(ban_list_details_mutex);
ban_list_details[client_ip] = std::vector<simple_packet_t>();
}
}
ban_list_ipv4.add_to_blackhole(client_ip, current_attack);
} else {
bool parsed_ipv6 = read_ipv6_host_from_string(request->ip_address(), ipv6_address.subnet_address);
@ -155,7 +127,6 @@ Status FastnetmonApiServiceImpl::ExecuteUnBan(ServerContext* context,
fastmitigation::ExecuteBanReply* reply) {
extern blackhole_ban_list_t<subnet_ipv6_cidr_mask_t> ban_list_ipv6;
extern blackhole_ban_list_t<uint32_t> ban_list_ipv4;
extern bool hash_counters;
logger << log4cpp::Priority::INFO << "API: We asked for unban for IP: " << request->ip_address();
@ -189,38 +160,20 @@ Status FastnetmonApiServiceImpl::ExecuteUnBan(ServerContext* context,
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Can't parse IPv4 address");
}
if (hash_counters) {
bool is_blackholed_ipv4 = ban_list_ipv4.is_blackholed(client_ip);
bool is_blackholed_ipv4 = ban_list_ipv4.is_blackholed(client_ip);
if (!is_blackholed_ipv4) {
logger << log4cpp::Priority::ERROR << "API: Could not find IPv4 address in ban list";
return Status::CANCELLED;
}
bool get_details = ban_list_ipv4.get_blackhole_details(client_ip, current_attack);
if (!get_details) {
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Could not get IPv4 blackhole details");
}
ban_list_ipv4.remove_from_blackhole(client_ip);
} else {
if (ban_list.count(client_ip) == 0) {
logger << log4cpp::Priority::ERROR << "API: Could not find IP in ban list";
return Status::CANCELLED;
}
current_attack = ban_list[client_ip];
logger << log4cpp::Priority::INFO << "API: call unban handlers";
logger << log4cpp::Priority::INFO << "API: remove IP from ban list";
{
std::lock_guard<std::mutex> lock_guard(ban_list_mutex);
ban_list.erase(client_ip);
}
if (!is_blackholed_ipv4) {
logger << log4cpp::Priority::ERROR << "API: Could not find IPv4 address in ban list";
return Status::CANCELLED;
}
bool get_details = ban_list_ipv4.get_blackhole_details(client_ip, current_attack);
if (!get_details) {
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Could not get IPv4 blackhole details");
}
ban_list_ipv4.remove_from_blackhole(client_ip);
} else {
bool parsed_ipv6 = read_ipv6_host_from_string(request->ip_address(), ipv6_address.subnet_address);

@ -139,8 +139,6 @@ cppkafka::Producer* kafka_traffic_export_producer = nullptr;
// Traffic export to Kafka
bool kafka_traffic_export = false;
bool hash_counters = false;
std::string kafka_traffic_export_topic = "fastnetmon";
kafka_traffic_export_format_t kafka_traffic_export_format = kafka_traffic_export_format_t::JSON;
std::vector<std::string> kafka_traffic_export_brokers;
@ -373,8 +371,6 @@ uint64_t our_ipv6_packets = 0;
uint64_t incoming_total_flows_speed = 0;
uint64_t outgoing_total_flows_speed = 0;
map_of_vector_counters_t SubnetVectorMap;
// Network counters for IPv6
abstract_subnet_counters_t<subnet_ipv6_cidr_mask_t, subnet_counter_t> ipv6_subnet_counters;
@ -396,20 +392,8 @@ int64_t netflow_ipfix_all_protocols_total_flows_speed = 0;
std::string sflow_raw_packet_headers_total_speed_desc = "Number of sFlow headers per second";
int64_t sflow_raw_packet_headers_total_speed = 0;
/* End of our data structs */
std::mutex ban_list_details_mutex;
std::mutex ban_list_mutex;
std::mutex flow_counter_mutex;
// map for flows
std::map<uint64_t, int> FlowCounter;
// Struct for string speed per IP
map_of_vector_counters_t SubnetVectorMapSpeed;
// Struct for storing average speed per IP for specified interval
map_of_vector_counters_t SubnetVectorMapSpeedAverage;
#ifdef GEOIP
map_for_counters GeoIpCounter;
#endif
@ -420,10 +404,6 @@ blackhole_ban_list_t<subnet_ipv6_cidr_mask_t> ban_list_ipv6;
// Banned IPv4 hosts
blackhole_ban_list_t<uint32_t> ban_list_ipv4;
// In ddos info we store attack power and direction
std::map<uint32_t, banlist_item_t> ban_list;
std::map<uint32_t, std::vector<simple_packet_t>> ban_list_details;
host_group_map_t host_groups;
// Here we store assignment from subnet to certain host group for fast lookup
@ -1132,43 +1112,18 @@ void subnet_vectors_allocator(prefix_t* prefix, void* data) {
uint32_t result_ip_as_big_endian = fast_hton(ip_as_little_endian);
// logger << log4cpp::Priority::INFO << "Allocate: " << convert_ip_as_uint_to_string(result_ip_as_big_endian);
if (hash_counters) {
// We use big endian values as keys
try {
ipv4_host_counters.average_speed_map[result_ip_as_big_endian] = zero_map_element;
ipv4_host_counters.counter_map[result_ip_as_big_endian] = zero_map_element;
} catch (std::bad_alloc& ba) {
logger << log4cpp::Priority::ERROR << "Can't allocate memory for hash counters";
exit(1);
}
// We use big endian values as keys
try {
ipv4_host_counters.average_speed_map[result_ip_as_big_endian] = zero_map_element;
ipv4_host_counters.counter_map[result_ip_as_big_endian] = zero_map_element;
} catch (std::bad_alloc& ba) {
logger << log4cpp::Priority::ERROR << "Can't allocate memory for hash counters";
exit(1);
}
}
if (hash_counters) {
logger << log4cpp::Priority::INFO << "Successfully allocated " << ipv4_host_counters.average_speed_map.size() << " counters";
}
// Initialize our counters with fill constructor
try {
SubnetVectorMap[current_subnet] = vector_of_counters(network_size_in_ips, zero_map_element);
SubnetVectorMapSpeed[current_subnet] = vector_of_counters(network_size_in_ips, zero_map_element);
SubnetVectorMapSpeedAverage[current_subnet] = vector_of_counters(network_size_in_ips, zero_map_element);
} catch (std::bad_alloc& ba) {
logger << log4cpp::Priority::ERROR << "Can't allocate memory for counters";
exit(1);
}
}
void zeroify_all_counters() {
subnet_counter_t zero_map_element{};
for (map_of_vector_counters_t::iterator itr = SubnetVectorMap.begin(); itr != SubnetVectorMap.end(); ++itr) {
// logger<< log4cpp::Priority::INFO<<"Zeroify "<<itr->first;
std::fill(itr->second.begin(), itr->second.end(), zero_map_element);
}
logger << log4cpp::Priority::INFO << "Successfully allocated " << ipv4_host_counters.average_speed_map.size() << " counters";
}
bool load_our_networks_list() {
@ -1337,10 +1292,6 @@ bool load_our_networks_list() {
/* Preallocate data structures */
patricia_process(lookup_tree_ipv4, subnet_vectors_allocator);
logger << log4cpp::Priority::INFO << "We start total zerofication of counters";
zeroify_all_counters();
logger << log4cpp::Priority::INFO << "We finished zerofication";
logger << log4cpp::Priority::INFO << "We loaded " << networks_list_ipv4_as_string.size()
<< " IPv4 subnets to our in-memory list of networks";

@ -79,11 +79,9 @@ extern bool DEBUG_DUMP_ALL_PACKETS;
extern bool DEBUG_DUMP_OTHER_PACKETS;
extern uint64_t total_ipv4_packets;
extern uint64_t total_ipv6_packets;
extern map_of_vector_counters_t SubnetVectorMapSpeed;
extern double average_calculation_amount;
extern bool print_configuration_params_on_the_screen;
extern uint64_t our_ipv6_packets;
extern map_of_vector_counters_t SubnetVectorMap;
extern uint64_t unknown_ip_version_packets;
extern uint64_t total_simple_packets_processed;
extern unsigned int maximum_time_since_bucket_start_to_remove;
@ -116,8 +114,6 @@ extern bool exabgp_announce_whole_subnet;
extern subnet_to_host_group_map_t subnet_to_host_groups;
extern bool collect_attack_pcap_dumps;
extern std::mutex ban_list_details_mutex;
extern std::mutex ban_list_mutex;
extern std::mutex flow_counter_mutex;
#ifdef REDIS
@ -143,11 +139,9 @@ extern std::string mongodb_database_name;
extern unsigned int number_of_packets_for_pcap_attack_dump;
extern patricia_tree_t *lookup_tree_ipv4, *whitelist_tree_ipv4;
extern patricia_tree_t *lookup_tree_ipv6, *whitelist_tree_ipv6;
extern std::map<uint32_t, std::vector<simple_packet_t>> ban_list_details;
extern ban_settings_t global_ban_settings;
extern bool exabgp_enabled;
extern bool gobgp_enabled;
extern map_of_vector_counters_t SubnetVectorMapSpeedAverage;
extern int global_ban_time;
extern bool notify_script_enabled;
extern std::map<uint32_t, banlist_item_t> ban_list;
@ -847,8 +841,6 @@ void execute_unban_operation_ipv6() {
/* Thread for cleaning up ban list */
void cleanup_ban_list() {
extern bool hash_counters;
// If we use very small ban time we should call ban_cleanup thread more often
if (unban_iteration_sleep_time > global_ban_time) {
unban_iteration_sleep_time = int(global_ban_time / 2);
@ -865,103 +857,7 @@ void cleanup_ban_list() {
time_t current_time;
time(&current_time);
if (hash_counters) {
execute_unban_operation_ipv4();
} else {
std::vector<uint32_t> ban_list_items_for_erase;
for (std::map<uint32_t, banlist_item_t>::iterator itr = ban_list.begin(); itr != ban_list.end(); ++itr) {
uint32_t client_ip = itr->first;
// This IP should be banned permanentely and we skip any processing
if (!itr->second.unban_enabled) {
continue;
}
double time_difference = difftime(current_time, itr->second.ban_timestamp);
int ban_time = itr->second.ban_time;
// Yes, we reached end of ban time for this customer
bool we_could_unban_this_ip = time_difference > ban_time;
// We haven't reached time for unban yet
if (!we_could_unban_this_ip) {
continue;
}
// Check about ongoing attack
if (unban_only_if_attack_finished) {
std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip);
uint32_t subnet_in_host_byte_order = ntohl(itr->second.customer_network.subnet_address);
int64_t shift_in_vector = (int64_t)ntohl(client_ip) - (int64_t)subnet_in_host_byte_order;
// Try to find average speed element
map_of_vector_counters_t::iterator itr_average_speed =
SubnetVectorMapSpeedAverage.find(itr->second.customer_network);
if (itr_average_speed == SubnetVectorMapSpeedAverage.end()) {
logger << log4cpp::Priority::ERROR << "Can't find vector address in subnet map for unban function";
continue;
}
if (shift_in_vector < 0 or shift_in_vector >= itr_average_speed->second.size()) {
logger << log4cpp::Priority::ERROR << "We tried to access to element with index " << shift_in_vector
<< " which located outside allocated vector with size " << itr_average_speed->second.size();
continue;
}
subnet_counter_t& average_speed_element = itr_average_speed->second[shift_in_vector];
// We get ban settings from host subnet
std::string host_group_name;
ban_settings_t current_ban_settings =
get_ban_settings_for_this_subnet(itr->second.customer_network, host_group_name);
attack_detection_threshold_type_t attack_detection_source;
attack_detection_direction_type_t attack_detection_direction;
if (we_should_ban_this_entity(average_speed_element, current_ban_settings, attack_detection_source,
attack_detection_direction)) {
logger << log4cpp::Priority::ERROR << "Attack to IP " << client_ip_as_string
<< " still going! We should not unblock this host";
// Well, we still saw attack, skip to next iteration
continue;
}
}
// Add this IP to remove list
// We will remove keys really after this loop
ban_list_items_for_erase.push_back(itr->first);
// Call all hooks for unban
subnet_ipv6_cidr_mask_t zero_ipv6_address;
// It's empty for unban
std::string flow_attack_details;
// These are empty too
boost::circular_buffer<simple_packet_t> simple_packets_buffer;
boost::circular_buffer<fixed_size_packet_storage_t> raw_packets_buffer;
call_blackhole_actions_per_host(attack_action_t::unban, itr->first, zero_ipv6_address, false,
itr->second, attack_detection_source_t::Automatic, flow_attack_details, simple_packets_buffer, raw_packets_buffer);
}
// Remove all unbanned hosts from the ban list
for (std::vector<uint32_t>::iterator itr = ban_list_items_for_erase.begin(); itr != ban_list_items_for_erase.end(); ++itr) {
std::lock_guard<std::mutex> lock_guard(ban_list_mutex);
ban_list.erase(*itr);
}
}
execute_unban_operation_ipv4();
// Unban IPv6 bans
execute_unban_operation_ipv6();
@ -971,39 +867,20 @@ void cleanup_ban_list() {
// This code is a source of race conditions of worst kind, we had to rework it ASAP
std::string print_ddos_attack_details() {
extern blackhole_ban_list_t<uint32_t> ban_list_ipv4;
extern bool hash_counters;
std::stringstream output_buffer;
if (hash_counters) {
std::map<uint32_t, banlist_item_t> ban_list_ipv4_copy;
std::map<uint32_t, banlist_item_t> ban_list_ipv4_copy;
// Get whole ban list content atomically
ban_list_ipv4.get_whole_banlist(ban_list_ipv4_copy);
// Get whole ban list content atomically
ban_list_ipv4.get_whole_banlist(ban_list_ipv4_copy);
for (auto itr : ban_list_ipv4_copy) {
uint32_t client_ip = itr.first;
for (auto itr : ban_list_ipv4_copy) {
uint32_t client_ip = itr.first;
std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip);
output_buffer << client_ip_as_string << " at " << print_time_t_in_fastnetmon_format(itr.second.ban_timestamp) << std::endl;
}
} else {
for (std::map<uint32_t, banlist_item_t>::iterator ii = ban_list.begin(); ii != ban_list.end(); ++ii) {
uint32_t client_ip = (*ii).first;
std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip);
std::string max_pps_as_string = convert_int_to_string(((*ii).second).max_attack_power);
std::string attack_direction = get_direction_name(((*ii).second).attack_direction);
output_buffer << client_ip_as_string << "/" << max_pps_as_string << " pps " << attack_direction << " at "
<< print_time_t_in_fastnetmon_format(ii->second.ban_timestamp) << std::endl;
// This logic is evil side effect of this function
send_attack_details(client_ip, (*ii).second);
}
std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip);
output_buffer << client_ip_as_string << " at " << print_time_t_in_fastnetmon_format(itr.second.ban_timestamp) << std::endl;
}
return output_buffer.str();
@ -1149,49 +1026,6 @@ std::string get_attack_description_in_json_for_web_hooks(uint32_t client_ip,
return json_as_text;
}
std::string generate_simple_packets_dump(std::vector<simple_packet_t>& ban_list_details) {
std::stringstream attack_details;
std::map<unsigned int, unsigned int> protocol_counter;
for (std::vector<simple_packet_t>::iterator iii = ban_list_details.begin(); iii != ban_list_details.end(); ++iii) {
attack_details << print_simple_packet(*iii);
protocol_counter[iii->protocol]++;
}
return attack_details.str();
}
void send_attack_details(uint32_t client_ip, attack_details_t current_attack_details) {
std::string pps_as_string = convert_int_to_string(current_attack_details.attack_power);
std::string attack_direction = get_direction_name(current_attack_details.attack_direction);
std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip);
// In this case we do not collect any traffic samples
if (ban_details_records_count == 0) {
return;
}
// Very strange code but it work in 95% cases
if (ban_list_details.count(client_ip) > 0 && ban_list_details[client_ip].size() >= ban_details_records_count) {
std::stringstream attack_details;
attack_details << get_attack_description(client_ip, current_attack_details) << "\n\n";
attack_details << generate_simple_packets_dump(ban_list_details[client_ip]);
logger << log4cpp::Priority::INFO << "Attack with direction: " << attack_direction
<< " IP: " << client_ip_as_string << " Power: " << pps_as_string << " traffic samples collected";
call_attack_details_handlers(client_ip, current_attack_details, attack_details.str());
// TODO: here we have definitely RACE CONDITION!!! FIX IT
// Remove key and prevent collection new data about this attack
std::lock_guard<std::mutex> lock_guard(ban_list_details_mutex);
ban_list_details.erase(client_ip);
}
}
void call_attack_details_handlers(uint32_t client_ip, attack_details_t& current_attack, std::string attack_fingerprint) {
std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip);
std::string attack_direction = get_direction_name(current_attack.attack_direction);
@ -1400,131 +1234,6 @@ redisContext* redis_init_connection() {
#endif
void execute_ip_ban(uint32_t client_ip, subnet_counter_t average_speed_element, std::string flow_attack_details, subnet_cidr_mask_t customer_subnet) {
attack_details_t current_attack;
uint64_t pps = 0;
uint64_t in_pps = average_speed_element.total.in_packets;
uint64_t out_pps = average_speed_element.total.out_packets;
uint64_t in_bps = average_speed_element.total.in_bytes;
uint64_t out_bps = average_speed_element.total.out_bytes;
uint64_t in_flows = average_speed_element.in_flows;
uint64_t out_flows = average_speed_element.out_flows;
direction_t data_direction;
if (!global_ban_settings.enable_ban) {
logger << log4cpp::Priority::INFO << "We do not ban: " << convert_ip_as_uint_to_string(client_ip)
<< " because ban disabled completely";
return;
}
// Detect attack direction with simple heuristic
if (abs(int((int)in_pps - (int)out_pps)) < 1000) {
// If difference between pps speed is so small we should do additional investigation using
// bandwidth speed
if (in_bps > out_bps) {
data_direction = INCOMING;
pps = in_pps;
} else {
data_direction = OUTGOING;
pps = out_pps;
}
} else {
if (in_pps > out_pps) {
data_direction = INCOMING;
pps = in_pps;
} else {
data_direction = OUTGOING;
pps = out_pps;
}
}
current_attack.attack_protocol = detect_attack_protocol(average_speed_element, data_direction);
{
std::lock_guard<std::mutex> lock_guard(ban_list_mutex);
// Host is blocked already
if (ban_list.count(client_ip) > 0) {
// update attack power
if (pps > ban_list[client_ip].max_attack_power) {
ban_list[client_ip].max_attack_power = pps;
}
return;
}
}
prefix_t prefix_for_check_adreess;
prefix_for_check_adreess.add.sin.s_addr = client_ip;
prefix_for_check_adreess.family = AF_INET;
prefix_for_check_adreess.bitlen = 32;
bool in_white_list = (patricia_search_best2(whitelist_tree_ipv4, &prefix_for_check_adreess, 1) != NULL);
if (in_white_list) {
return;
}
std::string data_direction_as_string = get_direction_name(data_direction);
logger << log4cpp::Priority::INFO << "We run execute_ip_ban code with following params "
<< " in_pps: " << in_pps << " out_pps: " << out_pps << " in_bps: " << in_bps << " out_bps: " << out_bps
<< " and we decide it's " << data_direction_as_string << " attack";
std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip);
std::string pps_as_string = convert_int_to_string(pps);
// Store information about subnet
current_attack.customer_network = customer_subnet;
// Store ban time
time(&current_attack.ban_timestamp);
// set ban time in seconds
current_attack.ban_time = global_ban_time;
current_attack.unban_enabled = unban_enabled;
// Pass main information about attack
current_attack.attack_direction = data_direction;
current_attack.attack_power = pps;
current_attack.max_attack_power = pps;
// Copy traffic metrics
current_attack.traffic_counters = average_speed_element;
if (collect_attack_pcap_dumps) {
bool buffer_allocation_result = current_attack.pcap_attack_dump.allocate_buffer(number_of_packets_for_pcap_attack_dump);
if (!buffer_allocation_result) {
logger << log4cpp::Priority::ERROR << "Can't allocate buffer for attack, switch off this option completely ";
collect_attack_pcap_dumps = false;
}
}
{
std::lock_guard<std::mutex> lock_guard(ban_list_mutex);
ban_list[client_ip] = current_attack;
}
{
std::lock_guard<std::mutex> lock_guard(ban_list_details_mutex);
ban_list_details[client_ip] = std::vector<simple_packet_t>();
}
logger << log4cpp::Priority::INFO << "Attack with direction: " << data_direction_as_string
<< " IP: " << client_ip_as_string << " Power: " << pps_as_string;
subnet_ipv6_cidr_mask_t zero_ipv6_address;
boost::circular_buffer<simple_packet_t> empty_simple_packets_buffer;
boost::circular_buffer<fixed_size_packet_storage_t> empty_raw_packets_buffer;
call_blackhole_actions_per_host(attack_action_t::ban, client_ip, zero_ipv6_address, false, ban_list[client_ip],
attack_detection_source_t::Automatic, flow_attack_details, empty_simple_packets_buffer, empty_raw_packets_buffer);
}
void call_blackhole_actions_per_host(attack_action_t attack_action,
uint32_t client_ip,
const subnet_ipv6_cidr_mask_t& client_ipv6,
@ -1820,8 +1529,6 @@ void traffic_draw_ipv6_program() {
}
void traffic_draw_ipv4_program() {
extern bool hash_counters;
std::stringstream output_buffer;
// logger<<log4cpp::Priority::INFO<<"Draw table call";
@ -1848,11 +1555,7 @@ void traffic_draw_ipv4_program() {
output_buffer << print_channel_speed("Incoming traffic", INCOMING) << std::endl;
if (process_incoming_traffic) {
if (hash_counters) {
output_buffer << draw_table_ipv4_hash(attack_detection_direction_type_t::incoming, sorter_type);
} else {
output_buffer << draw_table_ipv4(attack_detection_direction_type_t::incoming, sorter_type);
}
output_buffer << draw_table_ipv4_hash(attack_detection_direction_type_t::incoming, sorter_type);
output_buffer << std::endl;
}
@ -1860,11 +1563,7 @@ void traffic_draw_ipv4_program() {
output_buffer << print_channel_speed("Outgoing traffic", OUTGOING) << std::endl;
if (process_outgoing_traffic) {
if (hash_counters) {
output_buffer << draw_table_ipv4_hash(attack_detection_direction_type_t::outgoing, sorter_type);
} else {
output_buffer << draw_table_ipv4(attack_detection_direction_type_t::outgoing, sorter_type);
}
output_buffer << draw_table_ipv4_hash(attack_detection_direction_type_t::outgoing, sorter_type);
output_buffer << std::endl;
}
@ -1885,17 +1584,8 @@ void traffic_draw_ipv4_program() {
output_buffer << "Not processed packets: " << total_unparsed_packets_speed << " pps\n";
if (hash_counters) {
output_buffer << std::endl << "Ban list:" << std::endl;
output_buffer << print_ddos_attack_details();
} else {
if (!ban_list.empty()) {
output_buffer << std::endl << "Ban list:" << std::endl;
output_buffer << print_ddos_attack_details();
}
}
output_buffer << std::endl << "Ban list:" << std::endl;
output_buffer << print_ddos_attack_details();
// Print screen contents into file
print_screen_contents_into_file(output_buffer.str(), cli_stats_file_path);
@ -2233,13 +1923,10 @@ bool increment_flow_counters(subnet_counter_t& new_speed_element, uint32_t clien
void recalculate_speed() {
// logger<< log4cpp::Priority::INFO<<"We run recalculate_speed";
double speed_calc_period = recalculate_speed_timeout;
extern bool hash_counters;
extern abstract_subnet_counters_t<uint32_t, subnet_counter_t> ipv4_host_counters;
extern map_of_vector_counters_for_flow_t SubnetVectorMapFlow;
extern bool hash_counters;
std::chrono::steady_clock::time_point start_time = std::chrono::steady_clock::now();
// Calculate duration of our sleep duration as it may be altered by OS behaviour (i.e. process scheduler)
@ -2287,129 +1974,30 @@ void recalculate_speed() {
ipv4_network_counters.recalculate_speed(speed_calc_period, (double)average_calculation_amount, nullptr);
if (hash_counters) {
uint64_t flow_exists_for_ip = 0;
uint64_t flow_does_not_exist_for_ip = 0;
uint64_t flow_exists_for_ip = 0;
uint64_t flow_does_not_exist_for_ip = 0;
ipv4_host_counters.recalculate_speed(speed_calc_period, (double)average_calculation_amount, speed_calculation_callback_local_ipv4, [&outgoing_total_flows, &incoming_total_flows, &flow_exists_for_ip,
&flow_does_not_exist_for_ip](const uint32_t& ip, subnet_counter_t& new_speed_element, double speed_calc_period) {
if (enable_connection_tracking) {
bool res = increment_flow_counters(new_speed_element, fast_ntoh(ip), speed_calc_period);
ipv4_host_counters.recalculate_speed(speed_calc_period, (double)average_calculation_amount, speed_calculation_callback_local_ipv4, [&outgoing_total_flows, &incoming_total_flows, &flow_exists_for_ip,
&flow_does_not_exist_for_ip](const uint32_t& ip, subnet_counter_t& new_speed_element, double speed_calc_period) {
if (enable_connection_tracking) {
bool res = increment_flow_counters(new_speed_element, fast_ntoh(ip), speed_calc_period);
if (res) {
// Increment global counter
outgoing_total_flows += new_speed_element.out_flows;
incoming_total_flows += new_speed_element.in_flows;
if (res) {
// Increment global counter
outgoing_total_flows += new_speed_element.out_flows;
incoming_total_flows += new_speed_element.in_flows;
flow_exists_for_ip++;
flow_exists_for_ip++;
// logger << log4cpp::Priority::DEBUG << convert_ipv4_subnet_to_string(subnet)
// << "in flows: " << new_speed_element.in_flows << " out flows: " <<
// new_speed_element.out_flows;
} else {
// We did not find record
flow_does_not_exist_for_ip++;
}
}
});
} else {
for (map_of_vector_counters_t::iterator itr = SubnetVectorMap.begin(); itr != SubnetVectorMap.end(); ++itr) {
for (vector_of_counters::iterator vector_itr = itr->second.begin(); vector_itr != itr->second.end(); ++vector_itr) {
int current_index = vector_itr - itr->second.begin();
// New element
subnet_counter_t new_speed_element;
// convert to host order for math operations
uint32_t subnet_ip = ntohl(itr->first.subnet_address);
uint32_t client_ip_in_host_bytes_order = subnet_ip + current_index;
// covnert to our standard network byte order
uint32_t client_ip = htonl(client_ip_in_host_bytes_order);
// Calculate speed for IP or whole subnet
build_speed_counters_from_packet_counters(new_speed_element, *vector_itr, speed_calc_period);
// It uses host byte order for key
conntrack_main_struct_t& flow_counter = SubnetVectorMapFlow[client_ip_in_host_bytes_order];
if (enable_connection_tracking) {
// todo: optimize this operations!
// it's really bad and SLOW CODE
uint64_t total_out_flows =
(uint64_t)flow_counter.out_tcp.size() + (uint64_t)flow_counter.out_udp.size() +
(uint64_t)flow_counter.out_icmp.size() + (uint64_t)flow_counter.out_other.size();
uint64_t total_in_flows =
(uint64_t)flow_counter.in_tcp.size() + (uint64_t)flow_counter.in_udp.size() +
(uint64_t)flow_counter.in_icmp.size() + (uint64_t)flow_counter.in_other.size();
new_speed_element.out_flows = uint64_t((double)total_out_flows / speed_calc_period);
new_speed_element.in_flows = uint64_t((double)total_in_flows / speed_calc_period);
// Increment global counter
outgoing_total_flows += new_speed_element.out_flows;
incoming_total_flows += new_speed_element.in_flows;
} else {
new_speed_element.out_flows = 0;
new_speed_element.in_flows = 0;
}
/* Moving average recalculation */
// http://en.wikipedia.org/wiki/Moving_average#Application_to_measuring_computer_performance
// double speed_calc_period = 1;
double exp_power = -speed_calc_period / average_calculation_amount;
double exp_value = exp(exp_power);
subnet_counter_t& current_average_speed_element = SubnetVectorMapSpeedAverage[itr->first][current_index];
// Calculate average speed from per-second speed
build_average_speed_counters_from_speed_counters(current_average_speed_element, new_speed_element, exp_value);
if (enable_connection_tracking) {
current_average_speed_element.out_flows =
uint64_t(new_speed_element.out_flows + exp_value * ((double)current_average_speed_element.out_flows -
(double)new_speed_element.out_flows));
current_average_speed_element.in_flows =
uint64_t(new_speed_element.in_flows + exp_value * ((double)current_average_speed_element.in_flows -
(double)new_speed_element.in_flows));
}
/* Moving average recalculation end */
std::string host_group_name;
ban_settings_t current_ban_settings = get_ban_settings_for_this_subnet(itr->first, host_group_name);
attack_detection_threshold_type_t attack_detection_source;
attack_detection_direction_type_t attack_detection_direction;
if (we_should_ban_this_entity(current_average_speed_element, current_ban_settings, attack_detection_source,
attack_detection_direction)) {
logger << log4cpp::Priority::DEBUG << "We have found host group for this host as: " << host_group_name;
std::string flow_attack_details = "";
if (enable_connection_tracking) {
flow_attack_details = print_flow_tracking_for_ip(flow_counter, convert_ip_as_uint_to_string(client_ip));
}
// TODO: we should pass type of ddos ban source (pps, flowd, bandwidth)!
execute_ip_ban(client_ip, current_average_speed_element, flow_attack_details, itr->first);
}
SubnetVectorMapSpeed[itr->first][current_index] = new_speed_element;
*vector_itr = zero_map_element;
// logger << log4cpp::Priority::DEBUG << convert_ipv4_subnet_to_string(subnet)
// << "in flows: " << new_speed_element.in_flows << " out flows: " <<
// new_speed_element.out_flows;
} else {
// We did not find record
flow_does_not_exist_for_ip++;
}
}
}
});
// Calculate IPv6 per network traffic
ipv6_subnet_counters.recalculate_speed(speed_calc_period, (double)average_calculation_amount, nullptr);
@ -2663,108 +2251,6 @@ std::string draw_table_ipv6(attack_detection_direction_type_t sort_direction, at
return output_buffer.str();
}
std::string draw_table_ipv4(const attack_detection_direction_type_t& data_direction,
const attack_detection_threshold_type_t& sorter_type) {
std::vector<pair_of_map_elements> vector_for_sort;
std::stringstream output_buffer;
// Preallocate memory for sort vector
// We use total networks size for this vector
vector_for_sort.reserve(total_number_of_hosts_in_our_networks);
map_of_vector_counters_t* current_speed_map = &SubnetVectorMapSpeedAverage;
subnet_counter_t zero_map_element{};
unsigned int count_of_zero_speed_packets = 0;
for (map_of_vector_counters_t::iterator itr = current_speed_map->begin(); itr != current_speed_map->end(); ++itr) {
for (vector_of_counters::iterator vector_itr = itr->second.begin(); vector_itr != itr->second.end(); ++vector_itr) {
int current_index = vector_itr - itr->second.begin();
// convert to host order for math operations
uint32_t subnet_ip = ntohl(itr->first.subnet_address);
uint32_t client_ip_in_host_bytes_order = subnet_ip + current_index;
// covnert to our standard network byte order
uint32_t client_ip = htonl(client_ip_in_host_bytes_order);
// Do not add zero speed packets to sort list
if (memcmp((void*)&zero_map_element, &*vector_itr, sizeof(subnet_counter_t)) != 0) {
vector_for_sort.push_back(std::make_pair(client_ip, *vector_itr));
} else {
count_of_zero_speed_packets++;
}
}
}
// Sort only first X elements in this vector
unsigned int shift_for_sort = max_ips_in_list;
if (data_direction == attack_detection_direction_type_t::incoming or data_direction == attack_detection_direction_type_t::outgoing) {
// Because in another case we will got segmentation fault
unsigned int vector_size = vector_for_sort.size();
if (vector_size < shift_for_sort) {
shift_for_sort = vector_size;
}
std::partial_sort(vector_for_sort.begin(), vector_for_sort.begin() + shift_for_sort, vector_for_sort.end(),
TrafficComparatorClass<pair_of_map_elements>(data_direction, sorter_type));
} else {
logger << log4cpp::Priority::ERROR << "Unexpected bahaviour on sort function";
return "Internal error";
}
unsigned int element_number = 0;
// In this loop we print only top X talkers in our subnet to screen buffer
for (std::vector<pair_of_map_elements>::iterator ii = vector_for_sort.begin(); ii != vector_for_sort.end(); ++ii) {
// Print first max_ips_in_list elements in list, we will show top X "huge" channel loaders
if (element_number >= max_ips_in_list) {
break;
}
uint32_t client_ip = (*ii).first;
std::string client_ip_as_string = convert_ip_as_uint_to_string((*ii).first);
uint64_t pps = 0;
uint64_t bps = 0;
uint64_t flows = 0;
// Here we could have average or instantaneous speed
subnet_counter_t* current_speed_element = &ii->second;
// Create polymorphic pps, byte and flow counters
if (data_direction == attack_detection_direction_type_t::incoming) {
pps = current_speed_element->total.in_packets;
bps = current_speed_element->total.in_bytes;
flows = current_speed_element->in_flows;
} else if (data_direction == attack_detection_direction_type_t::outgoing) {
pps = current_speed_element->total.out_packets;
bps = current_speed_element->total.out_bytes;
flows = current_speed_element->out_flows;
}
uint64_t mbps = convert_speed_to_mbps(bps);
std::string is_banned = ban_list.count(client_ip) > 0 ? " *banned* " : "";
// We use setw for alignment
output_buffer << client_ip_as_string << "\t\t";
output_buffer << std::setw(6) << pps << " pps ";
output_buffer << std::setw(6) << mbps << " mbps ";
output_buffer << std::setw(6) << flows << " flows ";
output_buffer << is_banned << std::endl;
element_number++;
}
return output_buffer.str();
}
void print_screen_contents_into_file(std::string screen_data_stats_param, std::string file_path) {
std::ofstream screen_data_file;
screen_data_file.open(file_path.c_str(), std::ios::trunc);
@ -2959,7 +2445,6 @@ void collect_traffic_to_buckets_ipv4(const simple_packet_t& current_packet, pack
void process_packet(simple_packet_t& current_packet) {
extern bool kafka_traffic_export;
extern abstract_subnet_counters_t<uint32_t, subnet_counter_t> ipv4_host_counters;
extern bool hash_counters;
extern packet_buckets_storage_t<uint32_t> packet_buckets_ipv4_storage;
extern map_of_vector_counters_for_flow_t SubnetVectorMapFlow;
@ -3067,120 +2552,31 @@ void process_packet(simple_packet_t& current_packet) {
__sync_fetch_and_add(&total_counters_ipv4.total_counters[current_packet.packet_direction].bytes, sampled_number_of_bytes);
#endif
// Try to find map key for this subnet
map_of_vector_counters_t::iterator itr;
// Add traffic to buckets when we have them
collect_traffic_to_buckets_ipv4(current_packet, packet_buckets_ipv4_storage);
if (current_packet.packet_direction == OUTGOING or current_packet.packet_direction == INCOMING) {
// Find element in map of vectors
itr = SubnetVectorMap.find(current_subnet);
if (itr == SubnetVectorMap.end()) {
logger << log4cpp::Priority::ERROR << "Can't find vector address in subnet map";
return;
}
}
if (hash_counters) {
// Add traffic to buckets when we have them
collect_traffic_to_buckets_ipv4(current_packet, packet_buckets_ipv4_storage);
// Increment counters for all local hosts using new counters
if (current_packet.packet_direction == OUTGOING) {
ipv4_host_counters.increment_outgoing_counters_for_key(current_packet.src_ip, current_packet, sampled_number_of_packets, sampled_number_of_bytes);
} else if (current_packet.packet_direction == INCOMING) {
ipv4_host_counters.increment_incoming_counters_for_key(current_packet.dst_ip, current_packet, sampled_number_of_packets, sampled_number_of_bytes);
} else {
// No reasons to keep locks for other or internal
}
// Increment counters for all local hosts using new counters
if (current_packet.packet_direction == OUTGOING) {
ipv4_host_counters.increment_outgoing_counters_for_key(current_packet.src_ip, current_packet, sampled_number_of_packets, sampled_number_of_bytes);
} else if (current_packet.packet_direction == INCOMING) {
ipv4_host_counters.increment_incoming_counters_for_key(current_packet.dst_ip, current_packet, sampled_number_of_packets, sampled_number_of_bytes);
} else {
// No reasons to keep locks for other or internal
}
// Increment main and per protocol packet counters
if (current_packet.packet_direction == OUTGOING) {
int64_t shift_in_vector = (int64_t)ntohl(current_packet.src_ip) - (int64_t)subnet_in_host_byte_order;
if (shift_in_vector < 0 or shift_in_vector >= itr->second.size()) {
logger << log4cpp::Priority::ERROR << "We tried to access to element with index " << shift_in_vector
<< " which located outside allocated vector with size " << itr->second.size();
logger << log4cpp::Priority::ERROR
<< "We expect issues with this packet in OUTGOING direction: " << print_simple_packet(current_packet);
return;
}
subnet_counter_t& current_element = itr->second[shift_in_vector];
increment_outgoing_counters(current_element, current_packet, sampled_number_of_packets, sampled_number_of_bytes);
if (enable_connection_tracking) {
increment_outgoing_flow_counters(fast_ntoh(current_packet.src_ip), current_packet,
sampled_number_of_packets, sampled_number_of_bytes);
}
} else if (current_packet.packet_direction == INCOMING) {
int64_t shift_in_vector = (int64_t)ntohl(current_packet.dst_ip) - (int64_t)subnet_in_host_byte_order;
if (shift_in_vector < 0 or shift_in_vector >= itr->second.size()) {
logger << log4cpp::Priority::ERROR << "We tried to access to element with index " << shift_in_vector
<< " which located outside allocated vector with size " << itr->second.size();
logger << log4cpp::Priority::ERROR
<< "We expect issues with this packet in INCOMING direction: " << print_simple_packet(current_packet);
return;
}
subnet_counter_t& current_element = itr->second[shift_in_vector];
increment_incoming_counters(current_element, current_packet, sampled_number_of_packets, sampled_number_of_bytes);
if (enable_connection_tracking) {
increment_incoming_flow_counters(fast_ntoh(current_packet.dst_ip), current_packet,
sampled_number_of_packets, sampled_number_of_bytes);
}
} else if (current_packet.packet_direction == INTERNAL) {
}
// Execute ban related processing
if (current_packet.packet_direction == OUTGOING) {
// Collect data when ban client
if (ban_details_records_count != 0 && !ban_list_details.empty() && ban_list_details.count(current_packet.src_ip) > 0 &&
ban_list_details[current_packet.src_ip].size() < ban_details_records_count) {
std::lock_guard<std::mutex> lock_guard(ban_list_details_mutex);
if (collect_attack_pcap_dumps) {
// this code SHOULD NOT be called without mutex!
if (current_packet.captured_payload_length > 0 && current_packet.payload_pointer != NULL) {
ban_list[current_packet.src_ip].pcap_attack_dump.write_packet(current_packet.payload_pointer,
current_packet.captured_payload_length,
current_packet.payload_full_length);
}
}
ban_list_details[current_packet.src_ip].push_back(current_packet);
}
}
if (current_packet.packet_direction == INCOMING) {
// Collect attack details
if (ban_details_records_count != 0 && !ban_list_details.empty() && ban_list_details.count(current_packet.dst_ip) > 0 &&
ban_list_details[current_packet.dst_ip].size() < ban_details_records_count) {
std::lock_guard<std::mutex> lock_guard(ban_list_details_mutex);
if (collect_attack_pcap_dumps) {
// this code SHOULD NOT be called without mutex!
if (current_packet.captured_payload_length > 0 && current_packet.payload_pointer != NULL) {
ban_list[current_packet.dst_ip].pcap_attack_dump.write_packet(current_packet.payload_pointer,
current_packet.captured_payload_length,
current_packet.payload_full_length);
}
}
ban_list_details[current_packet.dst_ip].push_back(current_packet);
}
}
}
void system_counters_speed_thread_handler() {
@ -3544,21 +2940,15 @@ void process_filled_buckets_ipv6() {
// This functions will check for packet buckets availible for processing
void check_traffic_buckets() {
extern bool hash_counters;
extern packet_buckets_storage_t<uint32_t> packet_buckets_ipv4_storage;
while (true) {
if (hash_counters) {
remove_orphaned_buckets(packet_buckets_ipv4_storage, "ipv4");
}
remove_orphaned_buckets(packet_buckets_ipv4_storage, "ipv4");
// Process buckets which haven't filled by packets
remove_orphaned_buckets(packet_buckets_ipv6_storage, "ipv6");
if (hash_counters) {
process_filled_buckets_ipv4();
}
process_filled_buckets_ipv4();
process_filled_buckets_ipv6();

@ -11,8 +11,6 @@
#include "../abstract_subnet_counters.hpp"
extern log4cpp::Category& logger;
extern map_of_vector_counters_t SubnetVectorMapSpeed;
extern map_of_vector_counters_t SubnetVectorMapSpeedAverage;
extern uint64_t incoming_total_flows_speed;
extern uint64_t outgoing_total_flows_speed;
extern abstract_subnet_counters_t<subnet_cidr_mask_t, subnet_counter_t> ipv4_network_counters;
@ -27,98 +25,6 @@ extern unsigned int graphite_push_period;
// Push host traffic to Graphite
bool push_hosts_traffic_counters_to_graphite() {
std::vector<direction_t> processed_directions = { INCOMING, OUTGOING };
graphite_data_t graphite_data;
map_of_vector_counters_t* current_speed_map = &SubnetVectorMapSpeedAverage;
// Iterate over all networks
for (map_of_vector_counters_t::iterator itr = current_speed_map->begin(); itr != current_speed_map->end(); ++itr) {
// Iterate over all hosts in network
for (vector_of_counters_t::iterator vector_itr = itr->second.begin(); vector_itr != itr->second.end(); ++vector_itr) {
int current_index = vector_itr - itr->second.begin();
// convert to host order for math operations
uint32_t subnet_ip = ntohl(itr->first.subnet_address);
uint32_t client_ip_in_host_bytes_order = subnet_ip + current_index;
// convert to our standard network byte order
uint32_t client_ip = htonl(client_ip_in_host_bytes_order);
std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip);
std::string ip_as_string_with_dash_delimiters = client_ip_as_string;
// Replace dots by dashes
std::replace(ip_as_string_with_dash_delimiters.begin(), ip_as_string_with_dash_delimiters.end(), '.', '_');
// Here we could have average or instantaneous speed
subnet_counter_t* current_speed_element = &*vector_itr;
for (auto data_direction : processed_directions) {
std::string direction_as_string;
if (data_direction == INCOMING) {
direction_as_string = "incoming";
} else if (data_direction == OUTGOING) {
direction_as_string = "outgoing";
}
std::string graphite_current_prefix =
graphite_prefix + ".hosts." + ip_as_string_with_dash_delimiters + "." + direction_as_string;
graphite_current_prefix = graphite_current_prefix + ".average";
if (data_direction == INCOMING) {
// Prepare incoming traffic data
// We do not store zero data to Graphite
if (current_speed_element->total.in_packets != 0) {
graphite_data[graphite_current_prefix + ".pps"] = current_speed_element->total.in_packets;
}
if (current_speed_element->total.in_bytes != 0) {
graphite_data[graphite_current_prefix + ".bps"] = current_speed_element->total.in_bytes * 8;
}
if (current_speed_element->in_flows != 0) {
graphite_data[graphite_current_prefix + ".flows"] = current_speed_element->in_flows;
}
} else if (data_direction == OUTGOING) {
// Prepare outgoing traffic data
// We do not store zero data to Graphite
if (current_speed_element->total.out_packets != 0) {
graphite_data[graphite_current_prefix + ".pps"] = current_speed_element->total.out_packets;
}
if (current_speed_element->total.out_bytes != 0) {
graphite_data[graphite_current_prefix + ".bps"] = current_speed_element->total.out_bytes * 8;
}
if (current_speed_element->out_flows != 0) {
graphite_data[graphite_current_prefix + ".flows"] = current_speed_element->out_flows;
}
}
}
}
bool graphite_put_result = store_data_to_graphite(graphite_port, graphite_host, graphite_data);
if (!graphite_put_result) {
logger << log4cpp::Priority::ERROR << "Can't store host load data to Graphite server " << graphite_host
<< " port: " << graphite_port;
return false;
}
}
return true;
}
// Push host traffic to Graphite
bool push_hosts_traffic_counters_to_graphite_hash() {
extern abstract_subnet_counters_t<uint32_t, subnet_counter_t> ipv4_host_counters;
std::vector<direction_t> processed_directions = { INCOMING, OUTGOING };
@ -283,8 +189,7 @@ bool push_network_traffic_counters_to_graphite() {
// This thread pushes speed counters to graphite
void graphite_push_thread() {
extern bool hash_counters;
// Sleep for a half second for shift against calculatiuon thread
boost::this_thread::sleep(boost::posix_time::milliseconds(500));
@ -300,11 +205,7 @@ void graphite_push_thread() {
push_network_traffic_counters_to_graphite();
// Push per host counters to graphite
if (hash_counters) {
push_hosts_traffic_counters_to_graphite_hash();
} else {
push_hosts_traffic_counters_to_graphite();
}
push_hosts_traffic_counters_to_graphite();
std::chrono::duration<double> diff = std::chrono::steady_clock::now() - start_time;

@ -4,4 +4,3 @@ void graphite_push_thread();
bool push_total_traffic_counters_to_graphite();
bool push_network_traffic_counters_to_graphite();
bool push_hosts_traffic_counters_to_graphite();
bool push_hosts_traffic_counters_to_graphite_hash();

@ -11,8 +11,6 @@
#include <vector>
extern struct timeval graphite_thread_execution_time;
extern map_of_vector_counters_t SubnetVectorMapSpeed;
extern map_of_vector_counters_t SubnetVectorMapSpeedAverage;
extern uint64_t incoming_total_flows_speed;
extern uint64_t outgoing_total_flows_speed;
extern abstract_subnet_counters_t<subnet_cidr_mask_t, subnet_counter_t> ipv4_network_counters;
@ -288,7 +286,6 @@ push_network_traffic_counters_to_influxdb(abstract_subnet_counters_t<T, C>& netw
// This thread pushes data to InfluxDB
void influxdb_push_thread() {
extern bool hash_counters;
extern abstract_subnet_counters_t<uint32_t, subnet_counter_t> ipv4_host_counters;
// Sleep for a half second for shift against calculation thread
@ -333,14 +330,9 @@ void influxdb_push_thread() {
influxdb_auth, influxdb_user, influxdb_password, "networks_traffic", "network");
// Push per host counters to InfluxDB
if (hash_counters) {
push_hosts_traffic_counters_to_influxdb(ipv4_host_counters, influxdb_database,
current_influxdb_ip_address, std::to_string(influxdb_port), influxdb_auth,
influxdb_user, influxdb_password, "hosts_traffic", "host");
} else {
push_hosts_ipv4_traffic_counters_to_influxdb(influxdb_database, current_influxdb_ip_address, std::to_string(influxdb_port),
influxdb_auth, influxdb_user, influxdb_password);
}
push_hosts_traffic_counters_to_influxdb(ipv4_host_counters, influxdb_database,
current_influxdb_ip_address, std::to_string(influxdb_port), influxdb_auth,
influxdb_user, influxdb_password, "hosts_traffic", "host");
push_system_counters_to_influxdb(influxdb_database, current_influxdb_ip_address, std::to_string(influxdb_port),
influxdb_auth, influxdb_user, influxdb_password);
@ -357,72 +349,6 @@ void influxdb_push_thread() {
}
}
// Push host traffic to InfluxDB
bool push_hosts_ipv4_traffic_counters_to_influxdb(std::string influx_database,
std::string influx_host,
std::string influx_port,
bool enable_auth,
std::string influx_user,
std::string influx_password) {
/* https://docs.influxdata.com/influxdb/v1.7/concepts/glossary/:
A collection of points in line protocol format, separated by newlines (0x0A). A batch of points may be submitted to
the database using a single HTTP request to the write endpoint. This makes writes via the HTTP API much more
performant by drastically reducing the HTTP overhead. InfluxData recommends batch sizes of 5,000-10,000 points,
although different use cases may be better served by significantly smaller or larger batches.
*/
map_of_vector_counters_t* current_speed_map = &SubnetVectorMapSpeedAverage;
// Iterate over all networks
for (map_of_vector_counters_t::iterator itr = current_speed_map->begin(); itr != current_speed_map->end(); ++itr) {
std::vector<std::pair<std::string, std::map<std::string, uint64_t>>> hosts_vector;
// Iterate over all hosts in network
for (vector_of_counters_t::iterator vector_itr = itr->second.begin(); vector_itr != itr->second.end(); ++vector_itr) {
std::map<std::string, uint64_t> plain_total_counters_map;
int current_index = vector_itr - itr->second.begin();
// Convert to host order for math operations
uint32_t subnet_ip = ntohl(itr->first.subnet_address);
uint32_t client_ip_in_host_bytes_order = subnet_ip + current_index;
// Convert to our standard network byte order
uint32_t client_ip = htonl(client_ip_in_host_bytes_order);
std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip);
// Here we could have average or instantaneous speed
subnet_counter_t* current_speed_element = &*vector_itr;
// Skip elements with zero speed
if (current_speed_element->is_zero()) {
continue;
}
fill_main_counters_for_influxdb(*current_speed_element, plain_total_counters_map, true);
// Key: client_ip_as_string
hosts_vector.push_back(std::make_pair(client_ip_as_string, plain_total_counters_map));
}
if (hosts_vector.size() > 0) {
std::string error_text;
bool result = write_batch_of_data_to_influxdb(influx_database, influx_host, influx_port, enable_auth, influx_user,
influx_password, "hosts_traffic", "host", hosts_vector, error_text);
if (!result) {
influxdb_writes_failed++;
logger << log4cpp::Priority::DEBUG << "InfluxDB batch operation failed for hosts_traffic";
return false;
}
}
}
return true;
}
// Write batch of data for particular InfluxDB database
bool write_batch_of_data_to_influxdb(std::string influx_database,
std::string influx_host,

@ -35,10 +35,3 @@ if [ "$4" = "ban" ]; then
# You can add ban code here
exit 0
fi
# Advanced edition does not use this action and passes all details in ban action
if [ "$4" = "attack_details" ]; then
cat | mail -s "FastNetMon Guard: IP $1 blocked because $2 attack with power $3 pps" $email_notify;
exit 0
fi