Introduce new data structure for accumulators:

1) Thread safe
2) Without memory allocation
3) EXTREMELY FAST

Old std::map:
26547 root       20   0  127M  7216  2832 S 68.0  0.0  8h11:37 │  └─ /opt/fastnetmon/fastnetmon
26551 root       20   0  127M  7216  2832 R 59.0  0.0  6h44:51 │     ├─ /opt/fastnetmon/fastnetmon
26550 root       20   0  127M  7216  2832 S  0.0  0.0  0:00.00 │     ├─ /opt/fastnetmon/fastnetmon
26549 root       20   0  127M  7216  2832 S  7.0  0.0  1h15:12 │     ├─ /opt/fastnetmon/fastnetmon
26548 root       20   0  127M  7216  2832 S  0.0  0.0 11:33.57 │     └─ /opt/fastnetmon/fastnetmon

New std::map:
29427 root       20   0  153M  5604  3436 S 22.0  0.0  0:09.60 │        └─ ./fastnetmon
29431 root       20   0  153M  5604  3436 R 21.0  0.0  0:09.06 │           ├─ ./fastnetmon
29430 root       20   0  153M  5604  3436 S  0.0  0.0  0:00.00 │           ├─ ./fastnetmon
29429 root       20   0  153M  5604  3436 S  0.0  0.0  0:00.22 │           ├─ ./fastnetmon
29428 root       20   0  153M  5604  3436 S  1.0  0.0  0:00.31 │           └─ ./fastnetmon
This commit is contained in:
Pavel Odintsov 2014-10-23 18:08:58 +04:00
parent 81e6719dc1
commit 1c8928df00
2 changed files with 174 additions and 44 deletions

View File

@ -76,7 +76,8 @@ LIBS += -lboost_regex
# STATIC = -static
# incompatible with static linkage
# LIBS += -ltbb
#DEFINES += -DENABLE_TBB
#LIBS += -ltbb
# removed -std=c++11 for fastnetmon.o
COMPILER = g++

View File

@ -231,13 +231,11 @@ typedef struct {
uint32_t dst_ip;
} conntrack_key;
#ifdef ENABLE_TBB
//apt-get install -y libtbb-dev
#include "tbb/concurrent_unordered_map.h"
typedef tbb::concurrent_unordered_map <uint32_t, map_element> map_for_counters;
#else
typedef std::map <uint32_t, map_element> map_for_counters;
#endif
typedef vector<map_element> vector_of_counters;
typedef std::map <unsigned long int, vector_of_counters> map_of_vector_counters;
map_of_vector_counters SubnetVectorMap;
// data structure for storing data in Vector
typedef pair<uint32_t, map_element> pair_of_map_elements;
@ -281,7 +279,7 @@ pfring* pf_ring_descr = NULL;
#endif
// main map for storing traffic data
map_for_counters DataCounter;
// map_for_counters DataCounter;
// map for flows
map<uint64_t, int> FlowCounter;
@ -341,7 +339,7 @@ void main_packet_process_task();
unsigned int get_cidr_mask_from_network_as_string(string network_cidr_format);
string send_ddos_attack_details();
void execute_ip_ban(uint32_t client_ip, unsigned int in_pps, unsigned int out_pps, unsigned int in_bps, unsigned int out_bps);
direction get_packet_direction(uint32_t src_ip, uint32_t dst_ip);
direction get_packet_direction(uint32_t src_ip, uint32_t dst_ip, unsigned long& subnet);
void recalculate_speed();
std::string print_channel_speed(string traffic_type, direction packet_direction);
void process_packet(simple_packet& current_packet);
@ -698,6 +696,34 @@ void enable_core_dumps() {
}
}
void subnet_vectors_allocator(prefix_t* prefix, void* data) {
// Network byte order
uint32_t subnet_as_integer = prefix->add.sin.s_addr;
u_short bitlen = prefix->bitlen;
int network_size_in_ips = pow(2, 32-bitlen);
//logger<< log4cpp::Priority::INFO<<"Subnet: "<<prefix->add.sin.s_addr<<" network size: "<<network_size_in_ips;
logger<< log4cpp::Priority::INFO<<"I will allocate "<<network_size_in_ips<<" records for subnet "<<subnet_as_integer;
// Initialize map element
SubnetVectorMap[subnet_as_integer] = vector_of_counters(network_size_in_ips);
// Zeroify all vector elements
map_element zero_map_element;
memset(&zero_map_element, 0, sizeof(zero_map_element));
std::fill(SubnetVectorMap[subnet_as_integer].begin(), SubnetVectorMap[subnet_as_integer].end(), zero_map_element);
}
void zeroify_all_counters() {
map_element zero_map_element;
memset(&zero_map_element, 0, sizeof(zero_map_element));
for (map_of_vector_counters::iterator itr = SubnetVectorMap.begin(); itr != SubnetVectorMap.end(); itr++) {
logger<< log4cpp::Priority::INFO<<"Zeroify "<<itr->first;
std::fill(itr->second.begin(), itr->second.end(), zero_map_element);
}
}
bool load_our_networks_list() {
if (file_exists("/etc/networks_whitelist")) {
vector<string> network_list_from_config = read_file_to_vector("/etc/networks_whitelist");
@ -756,12 +782,20 @@ bool load_our_networks_list() {
total_number_of_hosts_in_our_networks += pow(2, 32-cidr_mask);
if (ii->length() > 0 && is_cidr_subnet(*ii)) {
make_and_lookup(lookup_tree, const_cast<char*>(ii->c_str()));
make_and_lookup(lookup_tree, const_cast<char*>(ii->c_str()));
} else {
logger<<log4cpp::Priority::INFO<<"Can't parse line from subnet list: "<<*ii;
}
}
/* Preallocate data structures */
patricia_process (lookup_tree, (void_fn_t)subnet_vectors_allocator);
logger<<log4cpp::Priority::INFO<<"We start total zerofication of counters";
zeroify_all_counters();
logger<<log4cpp::Priority::INFO<<"We finished it";
logger<<log4cpp::Priority::INFO<<"We loaded "<<networks_list_as_string.size()<<" subnets to our in-memory list of networks";
logger<<log4cpp::Priority::INFO<<"Total number of monitored hosts (total size of all networks): "
<<total_number_of_hosts_in_our_networks;
@ -961,7 +995,27 @@ void process_packet(simple_packet& current_packet) {
logger<< log4cpp::Priority::INFO<<"Dump: "<<print_simple_packet(current_packet);
}
direction packet_direction = get_packet_direction(current_packet.src_ip, current_packet.dst_ip);
// Subnet for found IPs
unsigned long subnet = 0;
direction packet_direction = get_packet_direction(current_packet.src_ip, current_packet.dst_ip, subnet);
uint32_t subnet_in_host_byte_order = 0;
// We operate in host bytes order and need to convert subnet
if (subnet != 0) {
subnet_in_host_byte_order = ntohl(subnet);
}
// Try to find map key for this subnet
map_of_vector_counters::iterator itr;
if (packet_direction == OUTGOING or packet_direction == INCOMING) {
itr = SubnetVectorMap.find(subnet);
if (itr == SubnetVectorMap.end()) {
logger<< log4cpp::Priority::INFO<<"Can't find vector address in subnet map";
return;
}
}
total_counters_mutex.lock();
total_counters[packet_direction].packets++;
@ -971,6 +1025,9 @@ void process_packet(simple_packet& current_packet) {
if (packet_direction == INTERNAL) {
} else if (packet_direction == OUTGOING) {
uint32_t shift_in_vector = ntohl(current_packet.src_ip) - subnet_in_host_byte_order;
#define current_element itr->second[shift_in_vector]
// собираем данные для деталей при бане клиента
if (ban_list_details.count(current_packet.src_ip) > 0 &&
ban_list_details[current_packet.src_ip].size() < ban_details_records_count) {
@ -978,28 +1035,37 @@ void process_packet(simple_packet& current_packet) {
ban_list_details[current_packet.src_ip].push_back(current_packet);
}
#ifndef ENABLE_TBB
data_counters_mutex.lock();
#endif
//data_counters_mutex.lock();
if (current_packet.protocol == IPPROTO_TCP) {
DataCounter[ current_packet.src_ip ].tcp_out_packets++;
DataCounter[ current_packet.src_ip ].tcp_out_bytes += current_packet.length;
current_element.tcp_out_packets++;
current_element.tcp_out_bytes += current_packet.length;
//DataCounter[ current_packet.src_ip ].tcp_out_packets++;
//DataCounter[ current_packet.src_ip ].tcp_out_bytes += current_packet.length;
} else if (current_packet.protocol == IPPROTO_UDP) {
DataCounter[ current_packet.src_ip ].udp_out_packets++;
DataCounter[ current_packet.src_ip ].udp_out_bytes += current_packet.length;
current_element.udp_out_packets++;
current_element.udp_out_bytes += current_packet.length;
//DataCounter[ current_packet.src_ip ].udp_out_packets++;
//DataCounter[ current_packet.src_ip ].udp_out_bytes += current_packet.length;
} else {
// TBD
}
DataCounter[ current_packet.src_ip ].out_packets++;
DataCounter[ current_packet.src_ip ].out_bytes += current_packet.length;
current_element.out_packets++;
current_element.out_bytes += current_packet.length;
#ifndef ENABLE_TBB
data_counters_mutex.unlock();
#endif
//DataCounter[ current_packet.src_ip ].out_packets++;
//DataCounter[ current_packet.src_ip ].out_bytes += current_packet.length;
//data_counters_mutex.unlock();
} else if (packet_direction == INCOMING) {
uint32_t shift_in_vector = ntohl(current_packet.dst_ip) - subnet_in_host_byte_order;
#define current_element itr->second[shift_in_vector]
// logger<< log4cpp::Priority::INFO<<"Shift is: "<<shift_in_vector;
// собираемы данные для деталей при бане клиента
if (ban_list_details.count(current_packet.dst_ip) > 0 &&
ban_list_details[current_packet.dst_ip].size() < ban_details_records_count) {
@ -1007,25 +1073,30 @@ void process_packet(simple_packet& current_packet) {
ban_list_details[current_packet.dst_ip].push_back(current_packet);
}
#ifndef ENABLE_TBB
data_counters_mutex.lock();
#endif
DataCounter[ current_packet.dst_ip ].in_packets ++;
//data_counters_mutex.lock();
if (current_packet.protocol == IPPROTO_TCP) {
DataCounter[ current_packet.dst_ip ].tcp_in_packets++;
DataCounter[ current_packet.dst_ip ].tcp_in_bytes += current_packet.length;
current_element.tcp_in_packets++;
current_element.tcp_in_bytes += current_packet.length;
//DataCounter[ current_packet.dst_ip ].tcp_in_packets++;
//DataCounter[ current_packet.dst_ip ].tcp_in_bytes += current_packet.length;
} else if (current_packet.protocol == IPPROTO_UDP) {
DataCounter[ current_packet.dst_ip ].udp_in_packets++;
DataCounter[ current_packet.dst_ip ].udp_in_bytes += current_packet.length;
current_element.udp_in_packets++;
current_element.udp_in_bytes += current_packet.length;
//DataCounter[ current_packet.dst_ip ].udp_in_packets++;
//DataCounter[ current_packet.dst_ip ].udp_in_bytes += current_packet.length;
} else {
// TBD
}
DataCounter[ current_packet.dst_ip ].in_bytes += current_packet.length;
#ifndef ENABLE_TBB
data_counters_mutex.unlock();
#endif
current_element.in_packets ++;
current_element.in_bytes += current_packet.length;
//DataCounter[ current_packet.dst_ip ].in_packets ++;
//DataCounter[ current_packet.dst_ip ].in_bytes += current_packet.length;
//data_counters_mutex.unlock();
} else {
// Other traffic
}
@ -1121,6 +1192,52 @@ void recalculate_speed() {
//logger<< log4cpp::Priority::INFO<<"Difference: "<<time_difference;
map_element zero_map_element;
memset(&zero_map_element, 0, sizeof(zero_map_element));
for (map_of_vector_counters::iterator itr = SubnetVectorMap.begin(); itr != SubnetVectorMap.end(); ++itr) {
for (vector_of_counters::iterator vector_itr = itr->second.begin(); vector_itr != itr->second.end(); ++vector_itr) {
if (vector_itr->in_packets == 0 and vector_itr->out_packets == 0) {
continue;
}
int current_index = vector_itr - itr->second.begin();
// convert to host order for math operations
uint32_t subnet_ip = ntohl(itr->first);
uint32_t client_ip_in_host_bytes_order = subnet_ip + current_index;
// covnert to our standard network byte order
uint32_t client_ip = htonl(client_ip_in_host_bytes_order);
//logger<< log4cpp::Priority::INFO<<convert_ip_as_uint_to_string(client_ip);
unsigned int in_pps = int((double)vector_itr->in_packets / (double)speed_calc_period);
unsigned int out_pps = int((double)vector_itr->out_packets / (double)speed_calc_period);
unsigned int in_bps = int((double)vector_itr->in_bytes / (double)speed_calc_period);
unsigned int out_bps = int((double)vector_itr->out_bytes / (double)speed_calc_period);
// we detect overspeed
if (in_pps > ban_threshold or out_pps > ban_threshold) {
execute_ip_ban(client_ip, in_pps, out_pps, in_bps, out_bps);
}
speed_counters_mutex.lock();
// add speed values to speed struct
SpeedCounter[client_ip].in_bytes = in_bps;
SpeedCounter[client_ip].out_bytes = out_bps;
SpeedCounter[client_ip].in_packets = in_pps;
SpeedCounter[client_ip].out_packets = out_pps;
speed_counters_mutex.unlock();
data_counters_mutex.lock();
*vector_itr = zero_map_element;
data_counters_mutex.unlock();
}
}
/*
// calculate speed for all our IPs
for( map_for_counters::iterator ii = DataCounter.begin(); ii != DataCounter.end(); ++ii) {
uint32_t client_ip = (*ii).first;
@ -1145,9 +1262,8 @@ void recalculate_speed() {
SpeedCounter[client_ip].out_packets = out_pps;
speed_counters_mutex.unlock();
#ifndef ENABLE_TBB
data_counters_mutex.lock();
#endif
DataCounter[client_ip].in_bytes = 0;
DataCounter[client_ip].out_bytes = 0;
DataCounter[client_ip].in_packets = 0;
@ -1162,10 +1278,14 @@ void recalculate_speed() {
DataCounter[client_ip].udp_out_packets = 0;
DataCounter[client_ip].udp_in_bytes = 0;
DataCounter[client_ip].udp_out_bytes = 0;
#ifndef ENABLE_TBB
data_counters_mutex.unlock();
#endif
}
*/
// We use per elemen zerofication now
//data_counters_mutex.lock();
//zeroify_all_counters();
//data_counters_mutex.unlock();
// Clean Flow Counter
flow_counter.lock();
@ -1210,7 +1330,7 @@ void calculation_programm() {
output_buffer<<"FastNetMon v1.0 "<<"IPs ordered by: "<<sort_parameter<<" (use keys 'b'/'p' for change) and use 'q' for quit"<<"\n"
<<"Threshold is: "<<ban_threshold
<<" number of active hosts: "<<DataCounter.size()
//<<" number of active hosts: "<<DataCounter.size()
<<" traffic recaculation time is: "<< calculation_thread_execution_time.tv_sec<<" sec "<<calculation_thread_execution_time.tv_usec<<" microseconds"
<<" number of flows: "<<FlowCounter.size()
<<" from total hosts: "<<total_number_of_hosts_in_our_networks<<endl<<endl;
@ -1744,7 +1864,7 @@ bool cached_patricia_lookup(patricia_tree_t *patricia_tree, prefix_t* prefix, lp
}
/* Get traffic type: check it belongs to our IPs */
direction get_packet_direction(uint32_t src_ip, uint32_t dst_ip) {
direction get_packet_direction(uint32_t src_ip, uint32_t dst_ip, unsigned long& subnet) {
direction packet_direction;
bool our_ip_is_destination = false;
@ -1755,23 +1875,32 @@ direction get_packet_direction(uint32_t src_ip, uint32_t dst_ip) {
prefix_for_check_adreess.family = AF_INET;
prefix_for_check_adreess.bitlen = 32;
patricia_node_t* found_patrica_node = NULL;
unsigned long destination_subnet = 0;
//if (cached_patricia_lookup(lookup_tree, &prefix_for_check_adreess, lpm_cache)) {
if (fast_patricia_lookup(lookup_tree, &prefix_for_check_adreess)) {
if (found_patrica_node = patricia_search_best(lookup_tree, &prefix_for_check_adreess)) {
our_ip_is_destination = true;
destination_subnet = found_patrica_node->prefix->add.sin.s_addr;
}
prefix_for_check_adreess.add.sin.s_addr = src_ip;
unsigned long source_subnet = 0;
//if (cached_patricia_lookup(lookup_tree, &prefix_for_check_adreess, lpm_cache)) {
if (fast_patricia_lookup(lookup_tree, &prefix_for_check_adreess)) {
if (found_patrica_node = patricia_search_best(lookup_tree, &prefix_for_check_adreess)) {
our_ip_is_source = true;
source_subnet = found_patrica_node->prefix->add.sin.s_addr;
}
subnet = 0;
if (our_ip_is_source && our_ip_is_destination) {
packet_direction = INTERNAL;
} else if (our_ip_is_source) {
subnet = source_subnet;
packet_direction = OUTGOING;
} else if (our_ip_is_destination) {
subnet = destination_subnet;
packet_direction = INCOMING;
} else {
packet_direction = OTHER;