Another stage of big refactoring (#871)
This commit is contained in:
parent
d51ad89ac7
commit
d021179c73
@ -538,6 +538,7 @@ if (MONGOC_INCLUDES_FOLDER AND MONGOC_LIBRARY_PATH AND BSON_INCLUDES_FOLDER AND
|
||||
include_directories("${BSON_INCLUDES_FOLDER}/libbson-1.0")
|
||||
|
||||
target_link_libraries(fastnetmon ${MONGOC_LIBRARY_PATH} ${BSON_LIBRARY_PATH})
|
||||
target_link_libraries(fastnetmon_logic ${MONGOC_LIBRARY_PATH} ${BSON_LIBRARY_PATH})
|
||||
else()
|
||||
message(FATAL_ERROR "We can't find Mongo C library")
|
||||
endif()
|
||||
|
1452
src/fastnetmon.cpp
1452
src/fastnetmon.cpp
@ -112,11 +112,6 @@
|
||||
#include <hiredis/hiredis.h>
|
||||
#endif
|
||||
|
||||
#ifdef MONGO
|
||||
#include <bson.h>
|
||||
#include <mongoc.h>
|
||||
#endif
|
||||
|
||||
// #define IPV6_HASH_COUNTERS
|
||||
|
||||
#ifdef IPV6_HASH_COUNTERS
|
||||
@ -461,9 +456,7 @@ std::string get_attack_description(uint32_t client_ip, attack_details& current_a
|
||||
void send_attack_details(uint32_t client_ip, attack_details current_attack_details);
|
||||
void free_up_all_resources();
|
||||
std::string print_ddos_attack_details();
|
||||
void recalculate_speed();
|
||||
std::string print_channel_speed(std::string traffic_type, direction_t packet_direction);
|
||||
void process_packet(simple_packet_t& current_packet);
|
||||
void traffic_draw_program();
|
||||
void interruption_signal_handler(int signal_number);
|
||||
|
||||
@ -604,252 +597,6 @@ bool geoip_init() {
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef MONGO
|
||||
void store_data_in_mongo(std::string key_name, std::string attack_details_json) {
|
||||
mongoc_client_t* client;
|
||||
mongoc_collection_t* collection;
|
||||
mongoc_cursor_t* cursor;
|
||||
bson_error_t error;
|
||||
bson_oid_t oid;
|
||||
bson_t* doc;
|
||||
|
||||
mongoc_init();
|
||||
|
||||
std::string collection_name = "attacks";
|
||||
std::string connection_string =
|
||||
"mongodb://" + mongodb_host + ":" + convert_int_to_string(mongodb_port) + "/";
|
||||
|
||||
client = mongoc_client_new(connection_string.c_str());
|
||||
|
||||
if (!client) {
|
||||
logger << log4cpp::Priority::ERROR << "Can't connect to MongoDB database";
|
||||
return;
|
||||
}
|
||||
|
||||
bson_error_t bson_from_json_error;
|
||||
bson_t* bson_data = bson_new_from_json((const uint8_t*)attack_details_json.c_str(),
|
||||
attack_details_json.size(), &bson_from_json_error);
|
||||
if (!bson_data) {
|
||||
logger << log4cpp::Priority::ERROR << "Could not convert JSON to BSON";
|
||||
return;
|
||||
}
|
||||
|
||||
// logger << log4cpp::Priority::INFO << bson_as_json(bson_data, NULL);
|
||||
|
||||
collection =
|
||||
mongoc_client_get_collection(client, mongodb_database_name.c_str(), collection_name.c_str());
|
||||
|
||||
doc = bson_new();
|
||||
bson_oid_init(&oid, NULL);
|
||||
BSON_APPEND_OID(doc, "_id", &oid);
|
||||
bson_append_document(doc, key_name.c_str(), key_name.size(), bson_data);
|
||||
|
||||
// logger << log4cpp::Priority::INFO << bson_as_json(doc, NULL);
|
||||
|
||||
if (!mongoc_collection_insert(collection, MONGOC_INSERT_NONE, doc, NULL, &error)) {
|
||||
logger << log4cpp::Priority::ERROR << "Could not store data to MongoDB: " << error.message;
|
||||
}
|
||||
|
||||
// TODO: destroy bson_data too!
|
||||
|
||||
bson_destroy(doc);
|
||||
mongoc_collection_destroy(collection);
|
||||
mongoc_client_destroy(client);
|
||||
}
|
||||
#endif
|
||||
|
||||
std::string draw_table(direction_t data_direction, bool do_redis_update, sort_type sort_item) {
|
||||
std::vector<pair_of_map_elements> vector_for_sort;
|
||||
|
||||
std::stringstream output_buffer;
|
||||
|
||||
// Preallocate memory for sort vector
|
||||
// We use total networks size for this vector
|
||||
vector_for_sort.reserve(total_number_of_hosts_in_our_networks);
|
||||
|
||||
// Switch to Average speed there!!!
|
||||
map_of_vector_counters* current_speed_map = NULL;
|
||||
|
||||
if (print_average_traffic_counts) {
|
||||
current_speed_map = &SubnetVectorMapSpeedAverage;
|
||||
} else {
|
||||
current_speed_map = &SubnetVectorMapSpeed;
|
||||
}
|
||||
|
||||
map_element zero_map_element;
|
||||
memset(&zero_map_element, 0, sizeof(zero_map_element));
|
||||
|
||||
unsigned int count_of_zero_speed_packets = 0;
|
||||
for (map_of_vector_counters::iterator itr = current_speed_map->begin();
|
||||
itr != current_speed_map->end(); ++itr) {
|
||||
for (vector_of_counters::iterator vector_itr = itr->second.begin();
|
||||
vector_itr != itr->second.end(); ++vector_itr) {
|
||||
int current_index = vector_itr - itr->second.begin();
|
||||
|
||||
// convert to host order for math operations
|
||||
uint32_t subnet_ip = ntohl(itr->first.first);
|
||||
uint32_t client_ip_in_host_bytes_order = subnet_ip + current_index;
|
||||
|
||||
// covnert to our standard network byte order
|
||||
uint32_t client_ip = htonl(client_ip_in_host_bytes_order);
|
||||
|
||||
// Do not add zero speed packets to sort list
|
||||
if (memcmp((void*)&zero_map_element, &*vector_itr, sizeof(map_element)) != 0) {
|
||||
vector_for_sort.push_back(std::make_pair(client_ip, *vector_itr));
|
||||
} else {
|
||||
count_of_zero_speed_packets++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sort only first X elements in this vector
|
||||
unsigned int shift_for_sort = max_ips_in_list;
|
||||
|
||||
if (data_direction == INCOMING or data_direction == OUTGOING) {
|
||||
// Because in another case we will got segmentation fault
|
||||
unsigned int vector_size = vector_for_sort.size();
|
||||
|
||||
if (vector_size < shift_for_sort) {
|
||||
shift_for_sort = vector_size;
|
||||
}
|
||||
|
||||
std::partial_sort(vector_for_sort.begin(), vector_for_sort.begin() + shift_for_sort,
|
||||
vector_for_sort.end(),
|
||||
TrafficComparatorClass<pair_of_map_elements>(data_direction, sort_item));
|
||||
} else {
|
||||
logger << log4cpp::Priority::ERROR << "Unexpected bahaviour on sort function";
|
||||
return "Internal error";
|
||||
}
|
||||
|
||||
unsigned int element_number = 0;
|
||||
|
||||
// In this loop we print only top X talkers in our subnet to screen buffer
|
||||
for (std::vector<pair_of_map_elements>::iterator ii = vector_for_sort.begin();
|
||||
ii != vector_for_sort.end(); ++ii) {
|
||||
// Print first max_ips_in_list elements in list, we will show top X "huge" channel loaders
|
||||
if (element_number >= max_ips_in_list) {
|
||||
break;
|
||||
}
|
||||
|
||||
uint32_t client_ip = (*ii).first;
|
||||
std::string client_ip_as_string = convert_ip_as_uint_to_string((*ii).first);
|
||||
|
||||
uint64_t pps = 0;
|
||||
uint64_t bps = 0;
|
||||
uint64_t flows = 0;
|
||||
|
||||
uint64_t pps_average = 0;
|
||||
uint64_t bps_average = 0;
|
||||
uint64_t flows_average = 0;
|
||||
|
||||
// Here we could have average or instantaneous speed
|
||||
map_element* current_speed_element = &ii->second;
|
||||
|
||||
// Create polymorphic pps, byte and flow counters
|
||||
if (data_direction == INCOMING) {
|
||||
pps = current_speed_element->in_packets;
|
||||
bps = current_speed_element->in_bytes;
|
||||
flows = current_speed_element->in_flows;
|
||||
} else if (data_direction == OUTGOING) {
|
||||
pps = current_speed_element->out_packets;
|
||||
bps = current_speed_element->out_bytes;
|
||||
flows = current_speed_element->out_flows;
|
||||
}
|
||||
|
||||
uint64_t mbps = convert_speed_to_mbps(bps);
|
||||
uint64_t mbps_average = convert_speed_to_mbps(bps_average);
|
||||
|
||||
std::string is_banned = ban_list.count(client_ip) > 0 ? " *banned* " : "";
|
||||
|
||||
// We use setw for alignment
|
||||
output_buffer << client_ip_as_string << "\t\t";
|
||||
|
||||
output_buffer << std::setw(6) << pps << " pps ";
|
||||
output_buffer << std::setw(6) << mbps << " mbps ";
|
||||
output_buffer << std::setw(6) << flows << " flows ";
|
||||
|
||||
output_buffer << is_banned << std::endl;
|
||||
|
||||
element_number++;
|
||||
}
|
||||
|
||||
graphite_data_t graphite_data;
|
||||
|
||||
// TODO: add graphite operations time to the config file
|
||||
if (graphite_enabled) {
|
||||
for (std::vector<pair_of_map_elements>::iterator ii = vector_for_sort.begin();
|
||||
ii != vector_for_sort.end(); ++ii) {
|
||||
uint32_t client_ip = (*ii).first;
|
||||
std::string client_ip_as_string = convert_ip_as_uint_to_string((*ii).first);
|
||||
|
||||
uint64_t pps = 0;
|
||||
uint64_t bps = 0;
|
||||
uint64_t flows = 0;
|
||||
|
||||
// Here we could have average or instantaneous speed
|
||||
map_element* current_speed_element = &ii->second;
|
||||
|
||||
// Create polymorphic pps, byte and flow counters
|
||||
if (data_direction == INCOMING) {
|
||||
pps = current_speed_element->in_packets;
|
||||
bps = current_speed_element->in_bytes;
|
||||
flows = current_speed_element->in_flows;
|
||||
} else if (data_direction == OUTGOING) {
|
||||
pps = current_speed_element->out_packets;
|
||||
bps = current_speed_element->out_bytes;
|
||||
flows = current_speed_element->out_flows;
|
||||
}
|
||||
|
||||
std::string direction_as_string;
|
||||
|
||||
if (data_direction == INCOMING) {
|
||||
direction_as_string = "incoming";
|
||||
} else if (data_direction == OUTGOING) {
|
||||
direction_as_string = "outgoing";
|
||||
}
|
||||
|
||||
std::string ip_as_string_with_dash_delimiters = client_ip_as_string;
|
||||
// Replace dots by dashes
|
||||
std::replace(ip_as_string_with_dash_delimiters.begin(),
|
||||
ip_as_string_with_dash_delimiters.end(), '.', '_');
|
||||
|
||||
std::string graphite_current_prefix =
|
||||
graphite_prefix + ".hosts." + ip_as_string_with_dash_delimiters + "." + direction_as_string;
|
||||
|
||||
if (print_average_traffic_counts) {
|
||||
graphite_current_prefix = graphite_current_prefix + ".average";
|
||||
}
|
||||
|
||||
// We do not store zero data to Graphite
|
||||
if (pps != 0) {
|
||||
graphite_data[graphite_current_prefix + ".pps"] = pps;
|
||||
}
|
||||
|
||||
if (bps != 0) {
|
||||
graphite_data[graphite_current_prefix + ".bps"] = bps * 8;
|
||||
}
|
||||
|
||||
if (flows != 0) {
|
||||
graphite_data[graphite_current_prefix + ".flows"] = flows;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: we should switch to piclke format instead text
|
||||
// TODO: we should check packet size for Graphite
|
||||
// logger << log4cpp::Priority::INFO << "We will write " << graphite_data.size() << " records to Graphite";
|
||||
|
||||
if (graphite_enabled) {
|
||||
bool graphite_put_result = store_data_to_graphite(graphite_port, graphite_host, graphite_data);
|
||||
|
||||
if (!graphite_put_result) {
|
||||
logger << log4cpp::Priority::ERROR << "Can't store data to Graphite";
|
||||
}
|
||||
}
|
||||
|
||||
return output_buffer.str();
|
||||
}
|
||||
|
||||
// TODO: move to lirbary
|
||||
// read whole file to vector
|
||||
std::vector<std::string> read_file_to_vector(std::string file_name) {
|
||||
@ -1389,30 +1136,6 @@ void zeroify_all_counters() {
|
||||
}
|
||||
}
|
||||
|
||||
void zeroify_all_flow_counters() {
|
||||
// On creating it initilizes by zeros
|
||||
conntrack_main_struct zero_conntrack_main_struct;
|
||||
|
||||
// Iterate over map
|
||||
for (map_of_vector_counters_for_flow::iterator itr = SubnetVectorMapFlow.begin();
|
||||
itr != SubnetVectorMapFlow.end(); ++itr) {
|
||||
// Iterate over vector
|
||||
for (vector_of_flow_counters::iterator vector_iterator = itr->second.begin();
|
||||
vector_iterator != itr->second.end(); ++vector_iterator) {
|
||||
// TODO: rewrite this monkey code
|
||||
vector_iterator->in_tcp.clear();
|
||||
vector_iterator->in_udp.clear();
|
||||
vector_iterator->in_icmp.clear();
|
||||
vector_iterator->in_other.clear();
|
||||
|
||||
vector_iterator->out_tcp.clear();
|
||||
vector_iterator->out_udp.clear();
|
||||
vector_iterator->out_icmp.clear();
|
||||
vector_iterator->out_other.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool load_our_networks_list() {
|
||||
if (file_exists(fastnetmon_platform_configuration.white_list_path)) {
|
||||
unsigned int network_entries = 0;
|
||||
@ -1638,470 +1361,6 @@ void ipv6_traffic_processor() {
|
||||
|
||||
#endif
|
||||
|
||||
/* Process simple unified packet */
|
||||
void process_packet(simple_packet_t& current_packet) {
|
||||
// Packets dump is very useful for bug hunting
|
||||
if (DEBUG_DUMP_ALL_PACKETS) {
|
||||
logger << log4cpp::Priority::INFO << "Dump: " << print_simple_packet(current_packet);
|
||||
}
|
||||
|
||||
if (current_packet.ip_protocol_version == 6) {
|
||||
#ifdef IPV6_HASH_COUNTERS
|
||||
current_packet.packet_direction =
|
||||
get_packet_direction_ipv6(lookup_tree_ipv6, current_packet.src_ipv6, current_packet.dst_ipv6);
|
||||
|
||||
// TODO: move to bulk operations here!
|
||||
multi_process_queue_for_ipv6_counters.enqueue(current_packet);
|
||||
#else
|
||||
|
||||
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(&total_ipv6_packets, 1, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(&total_ipv6_packets, 1);
|
||||
#endif
|
||||
|
||||
#endif
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// We do not process IPv6 at all on this mement
|
||||
if (current_packet.ip_protocol_version != 4) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Subnet for found IPs
|
||||
unsigned long subnet = 0;
|
||||
unsigned int subnet_cidr_mask = 0;
|
||||
|
||||
// We use these variables to track subnets for internal traffic because we have two of them
|
||||
unsigned long destination_subnet_host = 0;
|
||||
unsigned int destination_subnet_cidr_mask = 0;
|
||||
unsigned long source_subnet_host = 0;
|
||||
unsigned int source_subnet_cidr_mask = 0;
|
||||
|
||||
|
||||
direction_t packet_direction = get_packet_direction(lookup_tree_ipv4, current_packet.src_ip,
|
||||
current_packet.dst_ip, subnet, subnet_cidr_mask,
|
||||
destination_subnet_host, destination_subnet_cidr_mask,
|
||||
source_subnet_host, source_subnet_cidr_mask);
|
||||
|
||||
// It's useful in case when we can't find what packets do not processed correctly
|
||||
if (DEBUG_DUMP_OTHER_PACKETS && packet_direction == OTHER) {
|
||||
logger << log4cpp::Priority::INFO << "Dump other: " << print_simple_packet(current_packet);
|
||||
}
|
||||
|
||||
// Skip processing of specific traffic direction
|
||||
if ((packet_direction == INCOMING && !process_incoming_traffic) or
|
||||
(packet_direction == OUTGOING && !process_outgoing_traffic)) {
|
||||
return;
|
||||
}
|
||||
|
||||
subnet_t current_subnet = std::make_pair(subnet, subnet_cidr_mask);
|
||||
|
||||
// We will use them for INTERNAL traffic type
|
||||
subnet_t source_subnet = std::make_pair(source_subnet_host, source_subnet_cidr_mask);
|
||||
subnet_t destination_subnet = std::make_pair(destination_subnet_host, destination_subnet_cidr_mask);
|
||||
|
||||
// Iterator for subnet counter
|
||||
subnet_counter_t* subnet_counter = NULL;
|
||||
|
||||
if (packet_direction == OUTGOING or packet_direction == INCOMING) {
|
||||
if (enable_subnet_counters) {
|
||||
map_for_subnet_counters::iterator subnet_iterator;
|
||||
|
||||
// Find element in map of subnet counters
|
||||
subnet_iterator = PerSubnetCountersMap.find(current_subnet);
|
||||
|
||||
if (subnet_iterator == PerSubnetCountersMap.end()) {
|
||||
logger << log4cpp::Priority::ERROR << "Can't find counter structure for subnet";
|
||||
return;
|
||||
}
|
||||
|
||||
subnet_counter = &subnet_iterator->second;
|
||||
}
|
||||
}
|
||||
|
||||
map_of_vector_counters_for_flow::iterator itr_flow;
|
||||
|
||||
if (enable_conection_tracking) {
|
||||
if (packet_direction == OUTGOING or packet_direction == INCOMING) {
|
||||
itr_flow = SubnetVectorMapFlow.find(current_subnet);
|
||||
|
||||
if (itr_flow == SubnetVectorMapFlow.end()) {
|
||||
logger << log4cpp::Priority::ERROR << "Can't find vector address in subnet flow map";
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Because we support mirroring, sflow and netflow we should support different cases:
|
||||
- One packet passed for processing (mirror)
|
||||
- Multiple packets ("flows") passed for processing (netflow)
|
||||
- One sampled packed passed for processing (netflow)
|
||||
- Another combinations of this three options
|
||||
*/
|
||||
|
||||
uint64_t sampled_number_of_packets = current_packet.number_of_packets * current_packet.sample_ratio;
|
||||
uint64_t sampled_number_of_bytes = current_packet.length * current_packet.sample_ratio;
|
||||
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(&total_counters[packet_direction].packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(&total_counters[packet_direction].bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(&total_counters[packet_direction].packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(&total_counters[packet_direction].bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
|
||||
// Increment main and per protocol packet counters
|
||||
// Below we will implement different logic according to packet direction
|
||||
// We cannot use if / else if / else in this case because same conditions may trigger twice
|
||||
// For internal traffic type we trigger incoming and outgoing processing paths in same time
|
||||
if (packet_direction == OUTGOING or (process_internal_traffic_as_external && packet_direction == INTERNAL)) {
|
||||
uint32_t subnet_in_host_byte_order = 0;
|
||||
|
||||
// Try to find map key for this subnet
|
||||
map_of_vector_counters::iterator itr;
|
||||
|
||||
if (packet_direction == OUTGOING) {
|
||||
// We operate in host bytes order and need to convert subnet
|
||||
if (subnet != 0) {
|
||||
subnet_in_host_byte_order = ntohl(current_subnet.first);
|
||||
}
|
||||
|
||||
// Find element in map of vectors
|
||||
itr = SubnetVectorMap.find(current_subnet);
|
||||
}
|
||||
|
||||
// In this case we need to use another subnet
|
||||
if (packet_direction == INTERNAL) {
|
||||
subnet_in_host_byte_order = ntohl(source_subnet.first);
|
||||
|
||||
// Lookup another subnet in this case
|
||||
itr = SubnetVectorMap.find(source_subnet);
|
||||
}
|
||||
|
||||
if (itr == SubnetVectorMap.end()) {
|
||||
logger << log4cpp::Priority::ERROR << "Can't find vector address in subnet map";
|
||||
return;
|
||||
}
|
||||
|
||||
int64_t shift_in_vector = (int64_t)ntohl(current_packet.src_ip) - (int64_t)subnet_in_host_byte_order;
|
||||
|
||||
if (shift_in_vector < 0 or shift_in_vector >= itr->second.size()) {
|
||||
logger << log4cpp::Priority::ERROR << "We tried to access to element with index " << shift_in_vector
|
||||
<< " which located outside allocated vector with size " << itr->second.size();
|
||||
|
||||
logger << log4cpp::Priority::ERROR << "We expect issues with this packet in OUTGOING direction: "
|
||||
<< print_simple_packet(current_packet);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
map_element* current_element = &itr->second[shift_in_vector];
|
||||
|
||||
// Main packet/bytes counter
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(¤t_element->out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(¤t_element->out_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(¤t_element->out_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(¤t_element->out_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
|
||||
// Fragmented IP packets
|
||||
if (current_packet.ip_fragmented) {
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(¤t_element->fragmented_out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(¤t_element->fragmented_out_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(¤t_element->fragmented_out_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(¤t_element->fragmented_out_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
}
|
||||
|
||||
// TODO: add another counters
|
||||
if (enable_subnet_counters) {
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(&subnet_counter->out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(&subnet_counter->out_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(&subnet_counter->out_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(&subnet_counter->out_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
}
|
||||
|
||||
conntrack_main_struct* current_element_flow = NULL;
|
||||
if (enable_conection_tracking) {
|
||||
current_element_flow = &itr_flow->second[shift_in_vector];
|
||||
}
|
||||
|
||||
// Collect data when ban client
|
||||
if (!ban_list_details.empty() && ban_list_details.count(current_packet.src_ip) > 0 &&
|
||||
ban_list_details[current_packet.src_ip].size() < ban_details_records_count) {
|
||||
|
||||
ban_list_details_mutex.lock();
|
||||
|
||||
if (collect_attack_pcap_dumps) {
|
||||
// this code SHOULD NOT be called without mutex!
|
||||
if (current_packet.packet_payload_length > 0 && current_packet.packet_payload_pointer != NULL) {
|
||||
ban_list[current_packet.src_ip].pcap_attack_dump.write_packet(current_packet.packet_payload_pointer,
|
||||
current_packet.packet_payload_length);
|
||||
}
|
||||
}
|
||||
|
||||
ban_list_details[current_packet.src_ip].push_back(current_packet);
|
||||
ban_list_details_mutex.unlock();
|
||||
}
|
||||
|
||||
uint64_t connection_tracking_hash = 0;
|
||||
|
||||
if (enable_conection_tracking) {
|
||||
packed_conntrack_hash flow_tracking_structure;
|
||||
flow_tracking_structure.opposite_ip = current_packet.dst_ip;
|
||||
flow_tracking_structure.src_port = current_packet.source_port;
|
||||
flow_tracking_structure.dst_port = current_packet.destination_port;
|
||||
|
||||
// convert this struct to 64 bit integer
|
||||
connection_tracking_hash = convert_conntrack_hash_struct_to_integer(&flow_tracking_structure);
|
||||
}
|
||||
|
||||
if (current_packet.protocol == IPPROTO_TCP) {
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(¤t_element->tcp_out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(¤t_element->tcp_out_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(¤t_element->tcp_out_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(¤t_element->tcp_out_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
|
||||
if (extract_bit_value(current_packet.flags, TCP_SYN_FLAG_SHIFT)) {
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(¤t_element->tcp_syn_out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(¤t_element->tcp_syn_out_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(¤t_element->tcp_syn_out_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(¤t_element->tcp_syn_out_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
}
|
||||
|
||||
if (enable_conection_tracking) {
|
||||
flow_counter.lock();
|
||||
conntrack_key_struct* conntrack_key_struct_ptr =
|
||||
¤t_element_flow->out_tcp[connection_tracking_hash];
|
||||
|
||||
conntrack_key_struct_ptr->packets += sampled_number_of_packets;
|
||||
conntrack_key_struct_ptr->bytes += sampled_number_of_bytes;
|
||||
|
||||
flow_counter.unlock();
|
||||
}
|
||||
} else if (current_packet.protocol == IPPROTO_UDP) {
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(¤t_element->udp_out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(¤t_element->udp_out_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(¤t_element->udp_out_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(¤t_element->udp_out_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
|
||||
if (enable_conection_tracking) {
|
||||
flow_counter.lock();
|
||||
conntrack_key_struct* conntrack_key_struct_ptr =
|
||||
¤t_element_flow->out_udp[connection_tracking_hash];
|
||||
|
||||
conntrack_key_struct_ptr->packets += sampled_number_of_packets;
|
||||
conntrack_key_struct_ptr->bytes += sampled_number_of_bytes;
|
||||
|
||||
flow_counter.unlock();
|
||||
}
|
||||
} else if (current_packet.protocol == IPPROTO_ICMP) {
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(¤t_element->icmp_out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(¤t_element->icmp_out_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(¤t_element->icmp_out_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(¤t_element->icmp_out_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
// no flow tracking for icmp
|
||||
} else {
|
||||
}
|
||||
}
|
||||
|
||||
if (packet_direction == INCOMING or (process_internal_traffic_as_external && packet_direction == INTERNAL)) {
|
||||
uint32_t subnet_in_host_byte_order = 0;
|
||||
|
||||
// Try to find map key for this subnet
|
||||
map_of_vector_counters::iterator itr;
|
||||
|
||||
if (packet_direction == INCOMING) {
|
||||
// We operate in host bytes order and need to convert subnet
|
||||
if (subnet != 0) {
|
||||
subnet_in_host_byte_order = ntohl(current_subnet.first);
|
||||
}
|
||||
|
||||
// Find element in map of vectors
|
||||
itr = SubnetVectorMap.find(current_subnet);
|
||||
}
|
||||
|
||||
// In this case we need to use another subnet
|
||||
if (packet_direction == INTERNAL) {
|
||||
subnet_in_host_byte_order = ntohl(destination_subnet.first);
|
||||
|
||||
// Lookup destination subnet in this case
|
||||
itr = SubnetVectorMap.find(destination_subnet);
|
||||
}
|
||||
|
||||
if (itr == SubnetVectorMap.end()) {
|
||||
logger << log4cpp::Priority::ERROR << "Can't find vector address in subnet map";
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
int64_t shift_in_vector = (int64_t)ntohl(current_packet.dst_ip) - (int64_t)subnet_in_host_byte_order;
|
||||
|
||||
if (shift_in_vector < 0 or shift_in_vector >= itr->second.size()) {
|
||||
logger << log4cpp::Priority::ERROR << "We tried to access to element with index " << shift_in_vector
|
||||
<< " which located outside allocated vector with size " << itr->second.size();
|
||||
|
||||
logger << log4cpp::Priority::ERROR << "We expect issues with this packet in INCOMING direction: "
|
||||
<< print_simple_packet(current_packet);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
map_element* current_element = &itr->second[shift_in_vector];
|
||||
|
||||
// Main packet/bytes counter
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(¤t_element->in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(¤t_element->in_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(¤t_element->in_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(¤t_element->in_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
|
||||
if (enable_subnet_counters) {
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(&subnet_counter->in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(&subnet_counter->in_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(&subnet_counter->in_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(&subnet_counter->in_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
}
|
||||
|
||||
// Count fragmented IP packets
|
||||
if (current_packet.ip_fragmented) {
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(¤t_element->fragmented_in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(¤t_element->fragmented_in_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(¤t_element->fragmented_in_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(¤t_element->fragmented_in_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
}
|
||||
|
||||
conntrack_main_struct* current_element_flow = NULL;
|
||||
|
||||
if (enable_conection_tracking) {
|
||||
current_element_flow = &itr_flow->second[shift_in_vector];
|
||||
}
|
||||
|
||||
uint64_t connection_tracking_hash = 0;
|
||||
if (enable_conection_tracking) {
|
||||
packed_conntrack_hash flow_tracking_structure;
|
||||
flow_tracking_structure.opposite_ip = current_packet.src_ip;
|
||||
flow_tracking_structure.src_port = current_packet.source_port;
|
||||
flow_tracking_structure.dst_port = current_packet.destination_port;
|
||||
|
||||
// convert this struct to 64 bit integer
|
||||
connection_tracking_hash = convert_conntrack_hash_struct_to_integer(&flow_tracking_structure);
|
||||
}
|
||||
|
||||
// Collect attack details
|
||||
if (!ban_list_details.empty() && ban_list_details.count(current_packet.dst_ip) > 0 &&
|
||||
ban_list_details[current_packet.dst_ip].size() < ban_details_records_count) {
|
||||
|
||||
ban_list_details_mutex.lock();
|
||||
|
||||
if (collect_attack_pcap_dumps) {
|
||||
// this code SHOULD NOT be called without mutex!
|
||||
if (current_packet.packet_payload_length > 0 && current_packet.packet_payload_pointer != NULL) {
|
||||
ban_list[current_packet.dst_ip].pcap_attack_dump.write_packet(current_packet.packet_payload_pointer,
|
||||
current_packet.packet_payload_length);
|
||||
}
|
||||
}
|
||||
|
||||
ban_list_details[current_packet.dst_ip].push_back(current_packet);
|
||||
ban_list_details_mutex.unlock();
|
||||
}
|
||||
|
||||
if (current_packet.protocol == IPPROTO_TCP) {
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(¤t_element->tcp_in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(¤t_element->tcp_in_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(¤t_element->tcp_in_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(¤t_element->tcp_in_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
|
||||
if (extract_bit_value(current_packet.flags, TCP_SYN_FLAG_SHIFT)) {
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(¤t_element->tcp_syn_in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(¤t_element->tcp_syn_in_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(¤t_element->tcp_syn_in_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(¤t_element->tcp_syn_in_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
}
|
||||
|
||||
if (enable_conection_tracking) {
|
||||
flow_counter.lock();
|
||||
conntrack_key_struct* conntrack_key_struct_ptr =
|
||||
¤t_element_flow->in_tcp[connection_tracking_hash];
|
||||
|
||||
conntrack_key_struct_ptr->packets += sampled_number_of_packets;
|
||||
conntrack_key_struct_ptr->bytes += sampled_number_of_bytes;
|
||||
|
||||
flow_counter.unlock();
|
||||
}
|
||||
} else if (current_packet.protocol == IPPROTO_UDP) {
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(¤t_element->udp_in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(¤t_element->udp_in_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(¤t_element->udp_in_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(¤t_element->udp_in_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
|
||||
if (enable_conection_tracking) {
|
||||
flow_counter.lock();
|
||||
conntrack_key_struct* conntrack_key_struct_ptr =
|
||||
¤t_element_flow->in_udp[connection_tracking_hash];
|
||||
|
||||
conntrack_key_struct_ptr->packets += sampled_number_of_packets;
|
||||
conntrack_key_struct_ptr->bytes += sampled_number_of_bytes;
|
||||
flow_counter.unlock();
|
||||
}
|
||||
} else if (current_packet.protocol == IPPROTO_ICMP) {
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(¤t_element->icmp_in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(¤t_element->icmp_in_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(¤t_element->icmp_in_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(¤t_element->icmp_in_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
// no flow tracking for icmp
|
||||
} else {
|
||||
// TBD
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if (packet_direction == INTERNAL) {
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef GEOIP
|
||||
unsigned int get_asn_for_ip(uint32_t ip) {
|
||||
@ -2154,419 +1413,6 @@ void recalculate_speed_thread_handler() {
|
||||
}
|
||||
}
|
||||
|
||||
/* Calculate speed for all connnections */
|
||||
void recalculate_speed() {
|
||||
// logger<< log4cpp::Priority::INFO<<"We run recalculate_speed";
|
||||
|
||||
struct timeval start_calc_time;
|
||||
gettimeofday(&start_calc_time, NULL);
|
||||
|
||||
double speed_calc_period = recalculate_speed_timeout;
|
||||
time_t start_time;
|
||||
time(&start_time);
|
||||
|
||||
// If we got 1+ seconds lag we should use new "delta" or skip this step
|
||||
double time_difference = difftime(start_time, last_call_of_traffic_recalculation);
|
||||
|
||||
if (time_difference < 0) {
|
||||
// It may happen when you adjust time
|
||||
logger << log4cpp::Priority::ERROR << "Negative delay for traffic calculation " << time_difference << " Skipped iteration";
|
||||
return;
|
||||
} else if (time_difference < recalculate_speed_timeout) {
|
||||
// It could occur on toolkit start or in some weird cases of Linux scheduler
|
||||
// I really saw cases when sleep executed in zero zeconds:
|
||||
// [WARN] Sleep time expected: 1. Sleep time experienced: 0
|
||||
// But we have handlers for such case and should not bother client about with it
|
||||
// And we are using DEBUG level here
|
||||
logger << log4cpp::Priority::DEBUG
|
||||
<< "We skip one iteration of speed_calc because it runs so early! That's "
|
||||
"really impossible! Please ask support.";
|
||||
logger << log4cpp::Priority::DEBUG << "Sleep time expected: " << recalculate_speed_timeout
|
||||
<< ". Sleep time experienced: " << time_difference;
|
||||
return;
|
||||
} else if (int(time_difference) == int(speed_calc_period)) {
|
||||
// All fine, we run on time
|
||||
} else {
|
||||
// logger << log4cpp::Priority::INFO << "Time from last run of speed_recalc is soooo big, we got ugly lags: " <<
|
||||
// time_difference << " seconds";
|
||||
speed_calc_period = time_difference;
|
||||
}
|
||||
|
||||
map_element zero_map_element;
|
||||
memset(&zero_map_element, 0, sizeof(zero_map_element));
|
||||
|
||||
uint64_t incoming_total_flows = 0;
|
||||
uint64_t outgoing_total_flows = 0;
|
||||
|
||||
if (enable_subnet_counters) {
|
||||
for (map_for_subnet_counters::iterator itr = PerSubnetSpeedMap.begin();
|
||||
itr != PerSubnetSpeedMap.end(); ++itr) {
|
||||
subnet_t current_subnet = itr->first;
|
||||
|
||||
map_for_subnet_counters::iterator iter_subnet = PerSubnetCountersMap.find(current_subnet);
|
||||
|
||||
if (iter_subnet == PerSubnetCountersMap.end()) {
|
||||
logger << log4cpp::Priority::INFO << "Can't find traffic counters for subnet";
|
||||
break;
|
||||
}
|
||||
|
||||
subnet_counter_t* subnet_traffic = &iter_subnet->second;
|
||||
|
||||
subnet_counter_t new_speed_element;
|
||||
|
||||
new_speed_element.in_packets = uint64_t((double)subnet_traffic->in_packets / speed_calc_period);
|
||||
new_speed_element.in_bytes = uint64_t((double)subnet_traffic->in_bytes / speed_calc_period);
|
||||
|
||||
new_speed_element.out_packets = uint64_t((double)subnet_traffic->out_packets / speed_calc_period);
|
||||
new_speed_element.out_bytes = uint64_t((double)subnet_traffic->out_bytes / speed_calc_period);
|
||||
|
||||
/* Moving average recalculation for subnets */
|
||||
/* http://en.wikipedia.org/wiki/Moving_average#Application_to_measuring_computer_performance */
|
||||
double exp_power_subnet = -speed_calc_period / average_calculation_amount_for_subnets;
|
||||
double exp_value_subnet = exp(exp_power_subnet);
|
||||
|
||||
map_element* current_average_speed_element = &PerSubnetAverageSpeedMap[current_subnet];
|
||||
|
||||
current_average_speed_element->in_bytes = uint64_t(
|
||||
new_speed_element.in_bytes + exp_value_subnet * ((double)current_average_speed_element->in_bytes -
|
||||
(double)new_speed_element.in_bytes));
|
||||
|
||||
current_average_speed_element->out_bytes = uint64_t(
|
||||
new_speed_element.out_bytes + exp_value_subnet * ((double)current_average_speed_element->out_bytes -
|
||||
(double)new_speed_element.out_bytes));
|
||||
|
||||
current_average_speed_element->in_packets = uint64_t(
|
||||
new_speed_element.in_packets + exp_value_subnet * ((double)current_average_speed_element->in_packets -
|
||||
(double)new_speed_element.in_packets));
|
||||
|
||||
current_average_speed_element->out_packets =
|
||||
uint64_t(new_speed_element.out_packets +
|
||||
exp_value_subnet * ((double)current_average_speed_element->out_packets -
|
||||
(double)new_speed_element.out_packets));
|
||||
|
||||
// Update speed calculation structure
|
||||
PerSubnetSpeedMap[current_subnet] = new_speed_element;
|
||||
*subnet_traffic = zero_map_element;
|
||||
|
||||
// logger << log4cpp::Priority::INFO<<convert_subnet_to_string(current_subnet)
|
||||
// << "in pps: " << new_speed_element.in_packets << " out pps: " << new_speed_element.out_packets;
|
||||
}
|
||||
}
|
||||
|
||||
for (map_of_vector_counters::iterator itr = SubnetVectorMap.begin(); itr != SubnetVectorMap.end(); ++itr) {
|
||||
for (vector_of_counters::iterator vector_itr = itr->second.begin();
|
||||
vector_itr != itr->second.end(); ++vector_itr) {
|
||||
int current_index = vector_itr - itr->second.begin();
|
||||
|
||||
// New element
|
||||
map_element new_speed_element;
|
||||
|
||||
// convert to host order for math operations
|
||||
uint32_t subnet_ip = ntohl(itr->first.first);
|
||||
uint32_t client_ip_in_host_bytes_order = subnet_ip + current_index;
|
||||
|
||||
// covnert to our standard network byte order
|
||||
uint32_t client_ip = htonl(client_ip_in_host_bytes_order);
|
||||
|
||||
// Calculate speed for IP or whole subnet
|
||||
build_speed_counters_from_packet_counters(new_speed_element, &*vector_itr, speed_calc_period);
|
||||
|
||||
conntrack_main_struct* flow_counter_ptr = &SubnetVectorMapFlow[itr->first][current_index];
|
||||
|
||||
if (enable_conection_tracking) {
|
||||
// todo: optimize this operations!
|
||||
// it's really bad and SLOW CODE
|
||||
uint64_t total_out_flows = (uint64_t)flow_counter_ptr->out_tcp.size() +
|
||||
(uint64_t)flow_counter_ptr->out_udp.size() +
|
||||
(uint64_t)flow_counter_ptr->out_icmp.size() +
|
||||
(uint64_t)flow_counter_ptr->out_other.size();
|
||||
|
||||
uint64_t total_in_flows =
|
||||
(uint64_t)flow_counter_ptr->in_tcp.size() + (uint64_t)flow_counter_ptr->in_udp.size() +
|
||||
(uint64_t)flow_counter_ptr->in_icmp.size() + (uint64_t)flow_counter_ptr->in_other.size();
|
||||
|
||||
new_speed_element.out_flows = uint64_t((double)total_out_flows / speed_calc_period);
|
||||
new_speed_element.in_flows = uint64_t((double)total_in_flows / speed_calc_period);
|
||||
|
||||
// Increment global counter
|
||||
outgoing_total_flows += new_speed_element.out_flows;
|
||||
incoming_total_flows += new_speed_element.in_flows;
|
||||
} else {
|
||||
new_speed_element.out_flows = 0;
|
||||
new_speed_element.in_flows = 0;
|
||||
}
|
||||
|
||||
/* Moving average recalculation */
|
||||
// http://en.wikipedia.org/wiki/Moving_average#Application_to_measuring_computer_performance
|
||||
// double speed_calc_period = 1;
|
||||
double exp_power = -speed_calc_period / average_calculation_amount;
|
||||
double exp_value = exp(exp_power);
|
||||
|
||||
map_element* current_average_speed_element = &SubnetVectorMapSpeedAverage[itr->first][current_index];
|
||||
|
||||
// Calculate average speed from per-second speed
|
||||
build_average_speed_counters_from_speed_counters(current_average_speed_element,
|
||||
new_speed_element, exp_value, exp_power);
|
||||
|
||||
if (enable_conection_tracking) {
|
||||
current_average_speed_element->out_flows = uint64_t(
|
||||
new_speed_element.out_flows + exp_value * ((double)current_average_speed_element->out_flows -
|
||||
(double)new_speed_element.out_flows));
|
||||
|
||||
current_average_speed_element->in_flows = uint64_t(
|
||||
new_speed_element.in_flows + exp_value * ((double)current_average_speed_element->in_flows -
|
||||
(double)new_speed_element.in_flows));
|
||||
}
|
||||
|
||||
/* Moving average recalculation end */
|
||||
std::string host_group_name;
|
||||
ban_settings_t current_ban_settings = get_ban_settings_for_this_subnet(itr->first, host_group_name);
|
||||
|
||||
if (we_should_ban_this_ip(current_average_speed_element, current_ban_settings)) {
|
||||
logger << log4cpp::Priority::DEBUG
|
||||
<< "We have found host group for this host as: " << host_group_name;
|
||||
|
||||
std::string flow_attack_details = "";
|
||||
|
||||
if (enable_conection_tracking) {
|
||||
flow_attack_details =
|
||||
print_flow_tracking_for_ip(*flow_counter_ptr, convert_ip_as_uint_to_string(client_ip));
|
||||
}
|
||||
|
||||
// TODO: we should pass type of ddos ban source (pps, flowd, bandwidth)!
|
||||
execute_ip_ban(client_ip, *current_average_speed_element, flow_attack_details, itr->first);
|
||||
}
|
||||
|
||||
SubnetVectorMapSpeed[itr->first][current_index] = new_speed_element;
|
||||
|
||||
*vector_itr = zero_map_element;
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate global flow speed
|
||||
incoming_total_flows_speed = uint64_t((double)incoming_total_flows / (double)speed_calc_period);
|
||||
outgoing_total_flows_speed = uint64_t((double)outgoing_total_flows / (double)speed_calc_period);
|
||||
|
||||
if (enable_conection_tracking) {
|
||||
// Clean Flow Counter
|
||||
flow_counter.lock();
|
||||
zeroify_all_flow_counters();
|
||||
flow_counter.unlock();
|
||||
}
|
||||
|
||||
total_unparsed_packets_speed = uint64_t((double)total_unparsed_packets / (double)speed_calc_period);
|
||||
total_unparsed_packets = 0;
|
||||
|
||||
for (unsigned int index = 0; index < 4; index++) {
|
||||
total_speed_counters[index].bytes =
|
||||
uint64_t((double)total_counters[index].bytes / (double)speed_calc_period);
|
||||
|
||||
total_speed_counters[index].packets =
|
||||
uint64_t((double)total_counters[index].packets / (double)speed_calc_period);
|
||||
|
||||
double exp_power = -speed_calc_period / average_calculation_amount;
|
||||
double exp_value = exp(exp_power);
|
||||
|
||||
total_speed_average_counters[index].bytes = uint64_t(
|
||||
total_speed_counters[index].bytes + exp_value * ((double)total_speed_average_counters[index].bytes -
|
||||
(double)total_speed_counters[index].bytes));
|
||||
|
||||
total_speed_average_counters[index].packets =
|
||||
uint64_t(total_speed_counters[index].packets +
|
||||
exp_value * ((double)total_speed_average_counters[index].packets -
|
||||
(double)total_speed_counters[index].packets));
|
||||
|
||||
// nullify data counters after speed calculation
|
||||
total_counters[index].bytes = 0;
|
||||
total_counters[index].packets = 0;
|
||||
}
|
||||
|
||||
// Set time of previous startup
|
||||
time(&last_call_of_traffic_recalculation);
|
||||
|
||||
struct timeval finish_calc_time;
|
||||
gettimeofday(&finish_calc_time, NULL);
|
||||
|
||||
timeval_subtract(&speed_calculation_time, &finish_calc_time, &start_calc_time);
|
||||
}
|
||||
|
||||
void print_screen_contents_into_file(std::string screen_data_stats_param) {
|
||||
std::ofstream screen_data_file;
|
||||
screen_data_file.open(cli_stats_file_path.c_str(), std::ios::trunc);
|
||||
|
||||
if (screen_data_file.is_open()) {
|
||||
// Set 660 permissions to file for security reasons
|
||||
chmod(cli_stats_file_path.c_str(), S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
|
||||
|
||||
screen_data_file << screen_data_stats_param;
|
||||
screen_data_file.close();
|
||||
} else {
|
||||
logger << log4cpp::Priority::ERROR << "Can't print program screen into file";
|
||||
}
|
||||
}
|
||||
|
||||
void traffic_draw_program() {
|
||||
std::stringstream output_buffer;
|
||||
|
||||
// logger<<log4cpp::Priority::INFO<<"Draw table call";
|
||||
|
||||
struct timeval start_calc_time;
|
||||
gettimeofday(&start_calc_time, NULL);
|
||||
|
||||
sort_type sorter;
|
||||
if (sort_parameter == "packets") {
|
||||
sorter = PACKETS;
|
||||
} else if (sort_parameter == "bytes") {
|
||||
sorter = BYTES;
|
||||
} else if (sort_parameter == "flows") {
|
||||
sorter = FLOWS;
|
||||
} else {
|
||||
logger << log4cpp::Priority::INFO << "Unexpected sorter type: " << sort_parameter;
|
||||
sorter = PACKETS;
|
||||
}
|
||||
|
||||
output_buffer << "FastNetMon " << fastnetmon_platform_configuration.fastnetmon_version << " Try Advanced edition: https://fastnetmon.com"
|
||||
<< "\n"
|
||||
<< "IPs ordered by: " << sort_parameter << "\n";
|
||||
|
||||
output_buffer << print_channel_speed("Incoming traffic", INCOMING) << std::endl;
|
||||
|
||||
if (process_incoming_traffic) {
|
||||
output_buffer << draw_table(INCOMING, true, sorter);
|
||||
output_buffer << std::endl;
|
||||
}
|
||||
|
||||
output_buffer << print_channel_speed("Outgoing traffic", OUTGOING) << std::endl;
|
||||
|
||||
if (process_outgoing_traffic) {
|
||||
output_buffer << draw_table(OUTGOING, false, sorter);
|
||||
output_buffer << std::endl;
|
||||
}
|
||||
|
||||
output_buffer << print_channel_speed("Internal traffic", INTERNAL) << std::endl;
|
||||
|
||||
output_buffer << std::endl;
|
||||
|
||||
output_buffer << print_channel_speed("Other traffic", OTHER) << std::endl;
|
||||
|
||||
output_buffer << std::endl;
|
||||
|
||||
// Application statistics
|
||||
output_buffer << "Screen updated in:\t\t" << drawing_thread_execution_time.tv_sec << " sec "
|
||||
<< drawing_thread_execution_time.tv_usec << " microseconds\n";
|
||||
|
||||
output_buffer << "Traffic calculated in:\t\t" << speed_calculation_time.tv_sec << " sec "
|
||||
<< speed_calculation_time.tv_usec << " microseconds\n";
|
||||
|
||||
if (speed_calculation_time.tv_sec > 0) {
|
||||
output_buffer
|
||||
<< "ALERT! Toolkit working incorrectly! We should calculate speed in ~1 second\n";
|
||||
}
|
||||
|
||||
#ifdef IPV6_HASH_COUNTERS
|
||||
output_buffer << "Total amount of IPv6 packets: " << total_ipv6_packets << "\n";
|
||||
#endif
|
||||
|
||||
output_buffer << "Total amount of IPv6 packets related to our own network: " << our_ipv6_packets << "\n";
|
||||
output_buffer << "Not processed packets: " << total_unparsed_packets_speed << " pps\n";
|
||||
|
||||
// Print backend stats
|
||||
if (enable_pcap_collection) {
|
||||
output_buffer << get_pcap_stats() << "\n";
|
||||
}
|
||||
|
||||
#ifdef PF_RING
|
||||
if (enable_data_collection_from_mirror) {
|
||||
output_buffer << get_pf_ring_stats();
|
||||
}
|
||||
#endif
|
||||
|
||||
// Print thresholds
|
||||
if (print_configuration_params_on_the_screen) {
|
||||
output_buffer << "\n" << print_ban_thresholds(global_ban_settings);
|
||||
}
|
||||
|
||||
if (!ban_list.empty()) {
|
||||
output_buffer << std::endl << "Ban list:" << std::endl;
|
||||
output_buffer << print_ddos_attack_details();
|
||||
}
|
||||
|
||||
if (enable_subnet_counters) {
|
||||
output_buffer << std::endl << "Subnet load:" << std::endl;
|
||||
output_buffer << print_subnet_load() << "\n";
|
||||
}
|
||||
|
||||
screen_data_stats = output_buffer.str();
|
||||
|
||||
// Print screen contents into file
|
||||
print_screen_contents_into_file(screen_data_stats);
|
||||
|
||||
struct timeval end_calc_time;
|
||||
gettimeofday(&end_calc_time, NULL);
|
||||
|
||||
timeval_subtract(&drawing_thread_execution_time, &end_calc_time, &start_calc_time);
|
||||
}
|
||||
|
||||
// pretty print channel speed in pps and MBit
|
||||
std::string print_channel_speed(std::string traffic_type, direction_t packet_direction) {
|
||||
uint64_t speed_in_pps = total_speed_average_counters[packet_direction].packets;
|
||||
uint64_t speed_in_bps = total_speed_average_counters[packet_direction].bytes;
|
||||
|
||||
unsigned int number_of_tabs = 1;
|
||||
// We need this for correct alignment of blocks
|
||||
if (traffic_type == "Other traffic") {
|
||||
number_of_tabs = 2;
|
||||
}
|
||||
|
||||
std::stringstream stream;
|
||||
stream << traffic_type;
|
||||
|
||||
for (unsigned int i = 0; i < number_of_tabs; i++) {
|
||||
stream << "\t";
|
||||
}
|
||||
|
||||
uint64_t speed_in_mbps = convert_speed_to_mbps(speed_in_bps);
|
||||
|
||||
stream << std::setw(6) << speed_in_pps << " pps " << std::setw(6) << speed_in_mbps << " mbps";
|
||||
|
||||
if (traffic_type == "Incoming traffic" or traffic_type == "Outgoing traffic") {
|
||||
if (packet_direction == INCOMING) {
|
||||
stream << " " << std::setw(6) << incoming_total_flows_speed << " flows";
|
||||
} else if (packet_direction == OUTGOING) {
|
||||
stream << " " << std::setw(6) << outgoing_total_flows_speed << " flows";
|
||||
}
|
||||
|
||||
if (graphite_enabled) {
|
||||
graphite_data_t graphite_data;
|
||||
|
||||
std::string direction_as_string;
|
||||
|
||||
if (packet_direction == INCOMING) {
|
||||
direction_as_string = "incoming";
|
||||
|
||||
graphite_data[graphite_prefix + ".total." + direction_as_string + ".flows"] =
|
||||
incoming_total_flows_speed;
|
||||
} else if (packet_direction == OUTGOING) {
|
||||
direction_as_string = "outgoing";
|
||||
|
||||
graphite_data[graphite_prefix + ".total." + direction_as_string + ".flows"] =
|
||||
outgoing_total_flows_speed;
|
||||
}
|
||||
|
||||
graphite_data[graphite_prefix + ".total." + direction_as_string + ".pps"] = speed_in_pps;
|
||||
graphite_data[graphite_prefix + ".total." + direction_as_string + ".bps"] = speed_in_bps * 8;
|
||||
|
||||
bool graphite_put_result = store_data_to_graphite(graphite_port, graphite_host, graphite_data);
|
||||
|
||||
if (!graphite_put_result) {
|
||||
logger << log4cpp::Priority::ERROR << "Can't store data to Graphite";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return stream.str();
|
||||
}
|
||||
|
||||
bool file_is_appendable(std::string path) {
|
||||
std::ofstream check_appendable_file;
|
||||
|
||||
@ -2994,301 +1840,3 @@ void interruption_signal_handler(int signal_number) {
|
||||
exit(1);
|
||||
}
|
||||
|
||||
unsigned int detect_attack_protocol(map_element& speed_element, direction_t attack_direction) {
|
||||
if (attack_direction == INCOMING) {
|
||||
return get_max_used_protocol(speed_element.tcp_in_packets, speed_element.udp_in_packets,
|
||||
speed_element.icmp_in_packets);
|
||||
} else {
|
||||
// OUTGOING
|
||||
return get_max_used_protocol(speed_element.tcp_out_packets, speed_element.udp_out_packets,
|
||||
speed_element.icmp_out_packets);
|
||||
}
|
||||
}
|
||||
|
||||
#define my_max_on_defines(a, b) (a > b ? a : b)
|
||||
unsigned int get_max_used_protocol(uint64_t tcp, uint64_t udp, uint64_t icmp) {
|
||||
unsigned int max = my_max_on_defines(my_max_on_defines(udp, tcp), icmp);
|
||||
|
||||
if (max == tcp) {
|
||||
return IPPROTO_TCP;
|
||||
} else if (max == udp) {
|
||||
return IPPROTO_UDP;
|
||||
} else if (max == icmp) {
|
||||
return IPPROTO_ICMP;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void execute_ip_ban(uint32_t client_ip, map_element average_speed_element, std::string flow_attack_details, subnet_t customer_subnet) {
|
||||
struct attack_details current_attack;
|
||||
uint64_t pps = 0;
|
||||
|
||||
uint64_t in_pps = average_speed_element.in_packets;
|
||||
uint64_t out_pps = average_speed_element.out_packets;
|
||||
uint64_t in_bps = average_speed_element.in_bytes;
|
||||
uint64_t out_bps = average_speed_element.out_bytes;
|
||||
uint64_t in_flows = average_speed_element.in_flows;
|
||||
uint64_t out_flows = average_speed_element.out_flows;
|
||||
|
||||
direction_t data_direction;
|
||||
|
||||
if (!global_ban_settings.enable_ban) {
|
||||
logger << log4cpp::Priority::INFO << "We do not ban: " << convert_ip_as_uint_to_string(client_ip)
|
||||
<< " because ban disabled completely";
|
||||
return;
|
||||
}
|
||||
|
||||
// Detect attack direction with simple heuristic
|
||||
if (abs(int((int)in_pps - (int)out_pps)) < 1000) {
|
||||
// If difference between pps speed is so small we should do additional investigation using
|
||||
// bandwidth speed
|
||||
if (in_bps > out_bps) {
|
||||
data_direction = INCOMING;
|
||||
pps = in_pps;
|
||||
} else {
|
||||
data_direction = OUTGOING;
|
||||
pps = out_pps;
|
||||
}
|
||||
} else {
|
||||
if (in_pps > out_pps) {
|
||||
data_direction = INCOMING;
|
||||
pps = in_pps;
|
||||
} else {
|
||||
data_direction = OUTGOING;
|
||||
pps = out_pps;
|
||||
}
|
||||
}
|
||||
|
||||
current_attack.attack_protocol = detect_attack_protocol(average_speed_element, data_direction);
|
||||
|
||||
if (ban_list.count(client_ip) > 0) {
|
||||
if (ban_list[client_ip].attack_direction != data_direction) {
|
||||
logger << log4cpp::Priority::INFO << "We expected very strange situation: attack direction for "
|
||||
<< convert_ip_as_uint_to_string(client_ip) << " was changed";
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// update attack power
|
||||
if (pps > ban_list[client_ip].max_attack_power) {
|
||||
ban_list[client_ip].max_attack_power = pps;
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
prefix_t prefix_for_check_adreess;
|
||||
prefix_for_check_adreess.add.sin.s_addr = client_ip;
|
||||
prefix_for_check_adreess.family = AF_INET;
|
||||
prefix_for_check_adreess.bitlen = 32;
|
||||
|
||||
bool in_white_list = (patricia_search_best2(whitelist_tree_ipv4, &prefix_for_check_adreess, 1) != NULL);
|
||||
|
||||
if (in_white_list) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::string data_direction_as_string = get_direction_name(data_direction);
|
||||
|
||||
logger << log4cpp::Priority::INFO << "We run execute_ip_ban code with following params "
|
||||
<< " in_pps: " << in_pps << " out_pps: " << out_pps << " in_bps: " << in_bps
|
||||
<< " out_bps: " << out_bps << " and we decide it's " << data_direction_as_string << " attack";
|
||||
|
||||
std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip);
|
||||
std::string pps_as_string = convert_int_to_string(pps);
|
||||
|
||||
// Store information about subnet
|
||||
current_attack.customer_network = customer_subnet;
|
||||
|
||||
// Store ban time
|
||||
time(¤t_attack.ban_timestamp);
|
||||
// set ban time in seconds
|
||||
current_attack.ban_time = global_ban_time;
|
||||
current_attack.unban_enabled = unban_enabled;
|
||||
|
||||
// Pass main information about attack
|
||||
current_attack.attack_direction = data_direction;
|
||||
current_attack.attack_power = pps;
|
||||
current_attack.max_attack_power = pps;
|
||||
|
||||
current_attack.in_packets = in_pps;
|
||||
current_attack.out_packets = out_pps;
|
||||
|
||||
current_attack.in_bytes = in_bps;
|
||||
current_attack.out_bytes = out_bps;
|
||||
|
||||
// pass flow information
|
||||
current_attack.in_flows = in_flows;
|
||||
current_attack.out_flows = out_flows;
|
||||
|
||||
current_attack.fragmented_in_packets = average_speed_element.fragmented_in_packets;
|
||||
current_attack.tcp_in_packets = average_speed_element.tcp_in_packets;
|
||||
current_attack.tcp_syn_in_packets = average_speed_element.tcp_syn_in_packets;
|
||||
current_attack.udp_in_packets = average_speed_element.udp_in_packets;
|
||||
current_attack.icmp_in_packets = average_speed_element.icmp_in_packets;
|
||||
|
||||
current_attack.fragmented_out_packets = average_speed_element.fragmented_out_packets;
|
||||
current_attack.tcp_out_packets = average_speed_element.tcp_out_packets;
|
||||
current_attack.tcp_syn_out_packets = average_speed_element.tcp_syn_out_packets;
|
||||
current_attack.udp_out_packets = average_speed_element.udp_out_packets;
|
||||
current_attack.icmp_out_packets = average_speed_element.icmp_out_packets;
|
||||
|
||||
current_attack.fragmented_out_bytes = average_speed_element.fragmented_out_bytes;
|
||||
current_attack.tcp_out_bytes = average_speed_element.tcp_out_bytes;
|
||||
current_attack.tcp_syn_out_bytes = average_speed_element.tcp_syn_out_bytes;
|
||||
current_attack.udp_out_bytes = average_speed_element.udp_out_bytes;
|
||||
current_attack.icmp_out_bytes = average_speed_element.icmp_out_bytes;
|
||||
|
||||
current_attack.fragmented_in_bytes = average_speed_element.fragmented_in_bytes;
|
||||
current_attack.tcp_in_bytes = average_speed_element.tcp_in_bytes;
|
||||
current_attack.tcp_syn_in_bytes = average_speed_element.tcp_syn_in_bytes;
|
||||
current_attack.udp_in_bytes = average_speed_element.udp_in_bytes;
|
||||
current_attack.icmp_in_bytes = average_speed_element.icmp_in_bytes;
|
||||
|
||||
current_attack.average_in_packets = average_speed_element.in_packets;
|
||||
current_attack.average_in_bytes = average_speed_element.in_bytes;
|
||||
current_attack.average_in_flows = average_speed_element.in_flows;
|
||||
|
||||
current_attack.average_out_packets = average_speed_element.out_packets;
|
||||
current_attack.average_out_bytes = average_speed_element.out_bytes;
|
||||
current_attack.average_out_flows = average_speed_element.out_flows;
|
||||
|
||||
if (collect_attack_pcap_dumps) {
|
||||
bool buffer_allocation_result =
|
||||
current_attack.pcap_attack_dump.allocate_buffer(number_of_packets_for_pcap_attack_dump);
|
||||
|
||||
if (!buffer_allocation_result) {
|
||||
logger << log4cpp::Priority::ERROR
|
||||
<< "Can't allocate buffer for attack, switch off this option completely ";
|
||||
collect_attack_pcap_dumps = false;
|
||||
}
|
||||
}
|
||||
|
||||
ban_list_mutex.lock();
|
||||
ban_list[client_ip] = current_attack;
|
||||
ban_list_mutex.unlock();
|
||||
|
||||
ban_list_details_mutex.lock();
|
||||
ban_list_details[client_ip] = std::vector<simple_packet_t>();
|
||||
ban_list_details_mutex.unlock();
|
||||
|
||||
logger << log4cpp::Priority::INFO << "Attack with direction: " << data_direction_as_string
|
||||
<< " IP: " << client_ip_as_string << " Power: " << pps_as_string;
|
||||
|
||||
call_ban_handlers(client_ip, ban_list[client_ip], flow_attack_details);
|
||||
}
|
||||
|
||||
void call_ban_handlers(uint32_t client_ip, attack_details& current_attack, std::string flow_attack_details) {
|
||||
std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip);
|
||||
std::string pps_as_string = convert_int_to_string(current_attack.attack_power);
|
||||
std::string data_direction_as_string = get_direction_name(current_attack.attack_direction);
|
||||
|
||||
bool store_attack_details_to_file = true;
|
||||
|
||||
std::string basic_attack_information = get_attack_description(client_ip, current_attack);
|
||||
|
||||
std::string basic_attack_information_in_json = get_attack_description_in_json(client_ip, current_attack);
|
||||
|
||||
std::string full_attack_description = basic_attack_information + flow_attack_details;
|
||||
|
||||
if (store_attack_details_to_file) {
|
||||
print_attack_details_to_file(full_attack_description, client_ip_as_string, current_attack);
|
||||
}
|
||||
|
||||
if (pfring_hardware_filters_enabled) {
|
||||
#ifdef PF_RING
|
||||
logger << log4cpp::Priority::INFO << "We will block traffic to/from this IP with hardware filters";
|
||||
pfring_hardware_filter_action_block(client_ip_as_string);
|
||||
#else
|
||||
logger << log4cpp::Priority::ERROR << "You haven't compiled PF_RING hardware filters support";
|
||||
#endif
|
||||
}
|
||||
|
||||
if (notify_script_enabled) {
|
||||
std::string script_call_params = fastnetmon_platform_configuration.notify_script_path + " " + client_ip_as_string + " " +
|
||||
data_direction_as_string + " " + pps_as_string + " " + "ban";
|
||||
logger << log4cpp::Priority::INFO << "Call script for ban client: " << client_ip_as_string;
|
||||
|
||||
// We should execute external script in separate thread because any lag in this code will be
|
||||
// very distructive
|
||||
|
||||
if (notify_script_pass_details) {
|
||||
// We will pass attack details over stdin
|
||||
boost::thread exec_thread(exec_with_stdin_params, script_call_params, full_attack_description);
|
||||
exec_thread.detach();
|
||||
} else {
|
||||
// Do not pass anything to script
|
||||
boost::thread exec_thread(exec, script_call_params);
|
||||
exec_thread.detach();
|
||||
}
|
||||
|
||||
logger << log4cpp::Priority::INFO << "Script for ban client is finished: " << client_ip_as_string;
|
||||
}
|
||||
|
||||
if (exabgp_enabled) {
|
||||
logger << log4cpp::Priority::INFO << "Call ExaBGP for ban client started: " << client_ip_as_string;
|
||||
|
||||
boost::thread exabgp_thread(exabgp_ban_manage, "ban", client_ip_as_string, current_attack);
|
||||
exabgp_thread.detach();
|
||||
|
||||
logger << log4cpp::Priority::INFO << "Call to ExaBGP for ban client is finished: " << client_ip_as_string;
|
||||
}
|
||||
|
||||
#ifdef ENABLE_GOBGP
|
||||
if (gobgp_enabled) {
|
||||
logger << log4cpp::Priority::INFO << "Call GoBGP for ban client started: " << client_ip_as_string;
|
||||
|
||||
boost::thread gobgp_thread(gobgp_ban_manage, "ban", client_ip_as_string, current_attack);
|
||||
gobgp_thread.detach();
|
||||
|
||||
logger << log4cpp::Priority::INFO << "Call to GoBGP for ban client is finished: " << client_ip_as_string;
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef REDIS
|
||||
if (redis_enabled) {
|
||||
std::string redis_key_name = client_ip_as_string + "_information";
|
||||
|
||||
if (!redis_prefix.empty()) {
|
||||
redis_key_name = redis_prefix + "_" + client_ip_as_string + "_information";
|
||||
}
|
||||
|
||||
logger << log4cpp::Priority::INFO << "Start data save in Redis in key: " << redis_key_name;
|
||||
boost::thread redis_store_thread(store_data_in_redis, redis_key_name, basic_attack_information_in_json);
|
||||
redis_store_thread.detach();
|
||||
logger << log4cpp::Priority::INFO << "Finish data save in Redis in key: " << redis_key_name;
|
||||
|
||||
// If we have flow dump put in redis too
|
||||
if (!flow_attack_details.empty()) {
|
||||
std::string redis_key_name = client_ip_as_string + "_flow_dump";
|
||||
|
||||
if (!redis_prefix.empty()) {
|
||||
redis_key_name = redis_prefix + "_" + client_ip_as_string + "_flow_dump";
|
||||
}
|
||||
|
||||
logger << log4cpp::Priority::INFO << "Start data save in redis in key: " << redis_key_name;
|
||||
boost::thread redis_store_thread(store_data_in_redis, redis_key_name, flow_attack_details);
|
||||
redis_store_thread.detach();
|
||||
logger << log4cpp::Priority::INFO << "Finish data save in redis in key: " << redis_key_name;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef MONGO
|
||||
if (mongodb_enabled) {
|
||||
std::string mongo_key_name = client_ip_as_string + "_information_" +
|
||||
print_time_t_in_fastnetmon_format(current_attack.ban_timestamp);
|
||||
|
||||
// We could not use dot in key names: http://docs.mongodb.org/manual/core/document/#dot-notation
|
||||
std::replace(mongo_key_name.begin(), mongo_key_name.end(), '.', '_');
|
||||
|
||||
logger << log4cpp::Priority::INFO << "Start data save in Mongo in key: " << mongo_key_name;
|
||||
boost::thread mongo_store_thread(store_data_in_mongo, mongo_key_name, basic_attack_information_in_json);
|
||||
mongo_store_thread.detach();
|
||||
logger << log4cpp::Priority::INFO << "Finish data save in Mongo in key: " << mongo_key_name;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
|
@ -39,6 +39,48 @@
|
||||
// Yes, maybe it's not an good idea but with this we can guarantee working code in example plugin
|
||||
#include "example_plugin/example_collector.h"
|
||||
|
||||
#ifdef MONGO
|
||||
#include <bson.h>
|
||||
#include <mongoc.h>
|
||||
#endif
|
||||
|
||||
extern bool print_average_traffic_counts;
|
||||
extern std::string cli_stats_file_path;
|
||||
extern unsigned int total_number_of_hosts_in_our_networks;
|
||||
extern map_for_subnet_counters PerSubnetCountersMap;
|
||||
extern unsigned int recalculate_speed_timeout;
|
||||
extern map_of_vector_counters_for_flow SubnetVectorMapFlow;
|
||||
extern bool DEBUG_DUMP_ALL_PACKETS;
|
||||
extern bool DEBUG_DUMP_OTHER_PACKETS;
|
||||
extern uint64_t total_ipv6_packets;
|
||||
extern bool process_internal_traffic_as_external;
|
||||
extern std::string screen_data_stats;
|
||||
extern map_of_vector_counters SubnetVectorMapSpeed;
|
||||
extern double average_calculation_amount;
|
||||
extern double average_calculation_amount_for_subnets;
|
||||
extern bool print_configuration_params_on_the_screen;
|
||||
extern uint64_t our_ipv6_packets;
|
||||
extern map_of_vector_counters SubnetVectorMap;
|
||||
extern unsigned int max_ips_in_list;
|
||||
extern struct timeval speed_calculation_time;
|
||||
extern struct timeval drawing_thread_execution_time;
|
||||
extern time_t last_call_of_traffic_recalculation;
|
||||
extern bool process_incoming_traffic;
|
||||
extern bool process_outgoing_traffic;
|
||||
extern uint64_t total_unparsed_packets;
|
||||
extern uint64_t total_unparsed_packets_speed;
|
||||
extern bool enable_conection_tracking;
|
||||
extern bool enable_afpacket_collection;
|
||||
extern bool enable_data_collection_from_mirror;
|
||||
extern bool enable_netmap_collection;
|
||||
extern bool enable_sflow_collection;
|
||||
extern bool enable_netflow_collection;
|
||||
extern bool enable_pcap_collection;
|
||||
extern uint64_t incoming_total_flows_speed;
|
||||
extern uint64_t outgoing_total_flows_speed;
|
||||
extern total_counter_element total_counters[4];
|
||||
extern total_counter_element total_speed_counters[4];
|
||||
extern total_counter_element total_speed_average_counters[4];
|
||||
extern host_group_ban_settings_map_t host_group_ban_settings_map;
|
||||
extern bool exabgp_announce_whole_subnet;
|
||||
extern subnet_to_host_group_map_t subnet_to_host_groups;
|
||||
@ -66,6 +108,19 @@ extern std::string redis_prefix;
|
||||
extern bool redis_enabled;
|
||||
#endif
|
||||
|
||||
|
||||
#ifdef MONGO
|
||||
extern std::string mongodb_host;
|
||||
extern unsigned int mongodb_port;
|
||||
extern bool mongodb_enabled;
|
||||
extern std::string mongodb_database_name;
|
||||
#endif
|
||||
|
||||
extern bool pfring_hardware_filters_enabled;
|
||||
extern bool notify_script_pass_details;
|
||||
extern unsigned int number_of_packets_for_pcap_attack_dump;
|
||||
extern patricia_tree_t *lookup_tree_ipv4, *whitelist_tree_ipv4;
|
||||
extern patricia_tree_t *lookup_tree_ipv6, *whitelist_tree_ipv6;
|
||||
extern bool process_pcap_attack_dumps_with_dpi;
|
||||
extern std::map<uint32_t, std::vector<simple_packet_t> > ban_list_details;
|
||||
extern map_for_subnet_counters PerSubnetAverageSpeedMap;
|
||||
@ -99,6 +154,32 @@ extern map_for_subnet_counters PerSubnetSpeedMap;
|
||||
extern unsigned int ban_details_records_count;
|
||||
extern FastnetmonPlatformConfigurtion fastnetmon_platform_configuration;
|
||||
|
||||
#define my_max_on_defines(a, b) (a > b ? a : b)
|
||||
unsigned int get_max_used_protocol(uint64_t tcp, uint64_t udp, uint64_t icmp) {
|
||||
unsigned int max = my_max_on_defines(my_max_on_defines(udp, tcp), icmp);
|
||||
|
||||
if (max == tcp) {
|
||||
return IPPROTO_TCP;
|
||||
} else if (max == udp) {
|
||||
return IPPROTO_UDP;
|
||||
} else if (max == icmp) {
|
||||
return IPPROTO_ICMP;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
unsigned int detect_attack_protocol(map_element& speed_element, direction_t attack_direction) {
|
||||
if (attack_direction == INCOMING) {
|
||||
return get_max_used_protocol(speed_element.tcp_in_packets, speed_element.udp_in_packets,
|
||||
speed_element.icmp_in_packets);
|
||||
} else {
|
||||
// OUTGOING
|
||||
return get_max_used_protocol(speed_element.tcp_out_packets, speed_element.udp_out_packets,
|
||||
speed_element.icmp_out_packets);
|
||||
}
|
||||
}
|
||||
|
||||
// We calculate speed from packet counters here
|
||||
void build_speed_counters_from_packet_counters(map_element& new_speed_element,
|
||||
map_element* vector_itr,
|
||||
@ -1610,3 +1691,1426 @@ redisContext* redis_init_connection() {
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
void execute_ip_ban(uint32_t client_ip, map_element average_speed_element, std::string flow_attack_details, subnet_t customer_subnet) {
|
||||
struct attack_details current_attack;
|
||||
uint64_t pps = 0;
|
||||
|
||||
uint64_t in_pps = average_speed_element.in_packets;
|
||||
uint64_t out_pps = average_speed_element.out_packets;
|
||||
uint64_t in_bps = average_speed_element.in_bytes;
|
||||
uint64_t out_bps = average_speed_element.out_bytes;
|
||||
uint64_t in_flows = average_speed_element.in_flows;
|
||||
uint64_t out_flows = average_speed_element.out_flows;
|
||||
|
||||
direction_t data_direction;
|
||||
|
||||
if (!global_ban_settings.enable_ban) {
|
||||
logger << log4cpp::Priority::INFO << "We do not ban: " << convert_ip_as_uint_to_string(client_ip)
|
||||
<< " because ban disabled completely";
|
||||
return;
|
||||
}
|
||||
|
||||
// Detect attack direction with simple heuristic
|
||||
if (abs(int((int)in_pps - (int)out_pps)) < 1000) {
|
||||
// If difference between pps speed is so small we should do additional investigation using
|
||||
// bandwidth speed
|
||||
if (in_bps > out_bps) {
|
||||
data_direction = INCOMING;
|
||||
pps = in_pps;
|
||||
} else {
|
||||
data_direction = OUTGOING;
|
||||
pps = out_pps;
|
||||
}
|
||||
} else {
|
||||
if (in_pps > out_pps) {
|
||||
data_direction = INCOMING;
|
||||
pps = in_pps;
|
||||
} else {
|
||||
data_direction = OUTGOING;
|
||||
pps = out_pps;
|
||||
}
|
||||
}
|
||||
|
||||
current_attack.attack_protocol = detect_attack_protocol(average_speed_element, data_direction);
|
||||
|
||||
if (ban_list.count(client_ip) > 0) {
|
||||
if (ban_list[client_ip].attack_direction != data_direction) {
|
||||
logger << log4cpp::Priority::INFO << "We expected very strange situation: attack direction for "
|
||||
<< convert_ip_as_uint_to_string(client_ip) << " was changed";
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// update attack power
|
||||
if (pps > ban_list[client_ip].max_attack_power) {
|
||||
ban_list[client_ip].max_attack_power = pps;
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
prefix_t prefix_for_check_adreess;
|
||||
prefix_for_check_adreess.add.sin.s_addr = client_ip;
|
||||
prefix_for_check_adreess.family = AF_INET;
|
||||
prefix_for_check_adreess.bitlen = 32;
|
||||
|
||||
bool in_white_list = (patricia_search_best2(whitelist_tree_ipv4, &prefix_for_check_adreess, 1) != NULL);
|
||||
|
||||
if (in_white_list) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::string data_direction_as_string = get_direction_name(data_direction);
|
||||
|
||||
logger << log4cpp::Priority::INFO << "We run execute_ip_ban code with following params "
|
||||
<< " in_pps: " << in_pps << " out_pps: " << out_pps << " in_bps: " << in_bps
|
||||
<< " out_bps: " << out_bps << " and we decide it's " << data_direction_as_string << " attack";
|
||||
|
||||
std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip);
|
||||
std::string pps_as_string = convert_int_to_string(pps);
|
||||
|
||||
// Store information about subnet
|
||||
current_attack.customer_network = customer_subnet;
|
||||
|
||||
// Store ban time
|
||||
time(¤t_attack.ban_timestamp);
|
||||
// set ban time in seconds
|
||||
current_attack.ban_time = global_ban_time;
|
||||
current_attack.unban_enabled = unban_enabled;
|
||||
|
||||
// Pass main information about attack
|
||||
current_attack.attack_direction = data_direction;
|
||||
current_attack.attack_power = pps;
|
||||
current_attack.max_attack_power = pps;
|
||||
|
||||
current_attack.in_packets = in_pps;
|
||||
current_attack.out_packets = out_pps;
|
||||
|
||||
current_attack.in_bytes = in_bps;
|
||||
current_attack.out_bytes = out_bps;
|
||||
|
||||
// pass flow information
|
||||
current_attack.in_flows = in_flows;
|
||||
current_attack.out_flows = out_flows;
|
||||
|
||||
current_attack.fragmented_in_packets = average_speed_element.fragmented_in_packets;
|
||||
current_attack.tcp_in_packets = average_speed_element.tcp_in_packets;
|
||||
current_attack.tcp_syn_in_packets = average_speed_element.tcp_syn_in_packets;
|
||||
current_attack.udp_in_packets = average_speed_element.udp_in_packets;
|
||||
current_attack.icmp_in_packets = average_speed_element.icmp_in_packets;
|
||||
|
||||
current_attack.fragmented_out_packets = average_speed_element.fragmented_out_packets;
|
||||
current_attack.tcp_out_packets = average_speed_element.tcp_out_packets;
|
||||
current_attack.tcp_syn_out_packets = average_speed_element.tcp_syn_out_packets;
|
||||
current_attack.udp_out_packets = average_speed_element.udp_out_packets;
|
||||
current_attack.icmp_out_packets = average_speed_element.icmp_out_packets;
|
||||
|
||||
current_attack.fragmented_out_bytes = average_speed_element.fragmented_out_bytes;
|
||||
current_attack.tcp_out_bytes = average_speed_element.tcp_out_bytes;
|
||||
current_attack.tcp_syn_out_bytes = average_speed_element.tcp_syn_out_bytes;
|
||||
current_attack.udp_out_bytes = average_speed_element.udp_out_bytes;
|
||||
current_attack.icmp_out_bytes = average_speed_element.icmp_out_bytes;
|
||||
|
||||
current_attack.fragmented_in_bytes = average_speed_element.fragmented_in_bytes;
|
||||
current_attack.tcp_in_bytes = average_speed_element.tcp_in_bytes;
|
||||
current_attack.tcp_syn_in_bytes = average_speed_element.tcp_syn_in_bytes;
|
||||
current_attack.udp_in_bytes = average_speed_element.udp_in_bytes;
|
||||
current_attack.icmp_in_bytes = average_speed_element.icmp_in_bytes;
|
||||
|
||||
current_attack.average_in_packets = average_speed_element.in_packets;
|
||||
current_attack.average_in_bytes = average_speed_element.in_bytes;
|
||||
current_attack.average_in_flows = average_speed_element.in_flows;
|
||||
|
||||
current_attack.average_out_packets = average_speed_element.out_packets;
|
||||
current_attack.average_out_bytes = average_speed_element.out_bytes;
|
||||
current_attack.average_out_flows = average_speed_element.out_flows;
|
||||
|
||||
if (collect_attack_pcap_dumps) {
|
||||
bool buffer_allocation_result =
|
||||
current_attack.pcap_attack_dump.allocate_buffer(number_of_packets_for_pcap_attack_dump);
|
||||
|
||||
if (!buffer_allocation_result) {
|
||||
logger << log4cpp::Priority::ERROR
|
||||
<< "Can't allocate buffer for attack, switch off this option completely ";
|
||||
collect_attack_pcap_dumps = false;
|
||||
}
|
||||
}
|
||||
|
||||
ban_list_mutex.lock();
|
||||
ban_list[client_ip] = current_attack;
|
||||
ban_list_mutex.unlock();
|
||||
|
||||
ban_list_details_mutex.lock();
|
||||
ban_list_details[client_ip] = std::vector<simple_packet_t>();
|
||||
ban_list_details_mutex.unlock();
|
||||
|
||||
logger << log4cpp::Priority::INFO << "Attack with direction: " << data_direction_as_string
|
||||
<< " IP: " << client_ip_as_string << " Power: " << pps_as_string;
|
||||
|
||||
call_ban_handlers(client_ip, ban_list[client_ip], flow_attack_details);
|
||||
}
|
||||
|
||||
void call_ban_handlers(uint32_t client_ip, attack_details& current_attack, std::string flow_attack_details) {
|
||||
std::string client_ip_as_string = convert_ip_as_uint_to_string(client_ip);
|
||||
std::string pps_as_string = convert_int_to_string(current_attack.attack_power);
|
||||
std::string data_direction_as_string = get_direction_name(current_attack.attack_direction);
|
||||
|
||||
bool store_attack_details_to_file = true;
|
||||
|
||||
std::string basic_attack_information = get_attack_description(client_ip, current_attack);
|
||||
|
||||
std::string basic_attack_information_in_json = get_attack_description_in_json(client_ip, current_attack);
|
||||
|
||||
std::string full_attack_description = basic_attack_information + flow_attack_details;
|
||||
|
||||
if (store_attack_details_to_file) {
|
||||
print_attack_details_to_file(full_attack_description, client_ip_as_string, current_attack);
|
||||
}
|
||||
|
||||
if (pfring_hardware_filters_enabled) {
|
||||
#ifdef PF_RING
|
||||
logger << log4cpp::Priority::INFO << "We will block traffic to/from this IP with hardware filters";
|
||||
pfring_hardware_filter_action_block(client_ip_as_string);
|
||||
#else
|
||||
logger << log4cpp::Priority::ERROR << "You haven't compiled PF_RING hardware filters support";
|
||||
#endif
|
||||
}
|
||||
|
||||
if (notify_script_enabled) {
|
||||
std::string script_call_params = fastnetmon_platform_configuration.notify_script_path + " " + client_ip_as_string + " " +
|
||||
data_direction_as_string + " " + pps_as_string + " " + "ban";
|
||||
logger << log4cpp::Priority::INFO << "Call script for ban client: " << client_ip_as_string;
|
||||
|
||||
// We should execute external script in separate thread because any lag in this code will be
|
||||
// very distructive
|
||||
|
||||
if (notify_script_pass_details) {
|
||||
// We will pass attack details over stdin
|
||||
boost::thread exec_thread(exec_with_stdin_params, script_call_params, full_attack_description);
|
||||
exec_thread.detach();
|
||||
} else {
|
||||
// Do not pass anything to script
|
||||
boost::thread exec_thread(exec, script_call_params);
|
||||
exec_thread.detach();
|
||||
}
|
||||
|
||||
logger << log4cpp::Priority::INFO << "Script for ban client is finished: " << client_ip_as_string;
|
||||
}
|
||||
|
||||
if (exabgp_enabled) {
|
||||
logger << log4cpp::Priority::INFO << "Call ExaBGP for ban client started: " << client_ip_as_string;
|
||||
|
||||
boost::thread exabgp_thread(exabgp_ban_manage, "ban", client_ip_as_string, current_attack);
|
||||
exabgp_thread.detach();
|
||||
|
||||
logger << log4cpp::Priority::INFO << "Call to ExaBGP for ban client is finished: " << client_ip_as_string;
|
||||
}
|
||||
|
||||
#ifdef ENABLE_GOBGP
|
||||
if (gobgp_enabled) {
|
||||
logger << log4cpp::Priority::INFO << "Call GoBGP for ban client started: " << client_ip_as_string;
|
||||
|
||||
boost::thread gobgp_thread(gobgp_ban_manage, "ban", client_ip_as_string, current_attack);
|
||||
gobgp_thread.detach();
|
||||
|
||||
logger << log4cpp::Priority::INFO << "Call to GoBGP for ban client is finished: " << client_ip_as_string;
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef REDIS
|
||||
if (redis_enabled) {
|
||||
std::string redis_key_name = client_ip_as_string + "_information";
|
||||
|
||||
if (!redis_prefix.empty()) {
|
||||
redis_key_name = redis_prefix + "_" + client_ip_as_string + "_information";
|
||||
}
|
||||
|
||||
logger << log4cpp::Priority::INFO << "Start data save in Redis in key: " << redis_key_name;
|
||||
boost::thread redis_store_thread(store_data_in_redis, redis_key_name, basic_attack_information_in_json);
|
||||
redis_store_thread.detach();
|
||||
logger << log4cpp::Priority::INFO << "Finish data save in Redis in key: " << redis_key_name;
|
||||
|
||||
// If we have flow dump put in redis too
|
||||
if (!flow_attack_details.empty()) {
|
||||
std::string redis_key_name = client_ip_as_string + "_flow_dump";
|
||||
|
||||
if (!redis_prefix.empty()) {
|
||||
redis_key_name = redis_prefix + "_" + client_ip_as_string + "_flow_dump";
|
||||
}
|
||||
|
||||
logger << log4cpp::Priority::INFO << "Start data save in redis in key: " << redis_key_name;
|
||||
boost::thread redis_store_thread(store_data_in_redis, redis_key_name, flow_attack_details);
|
||||
redis_store_thread.detach();
|
||||
logger << log4cpp::Priority::INFO << "Finish data save in redis in key: " << redis_key_name;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef MONGO
|
||||
if (mongodb_enabled) {
|
||||
std::string mongo_key_name = client_ip_as_string + "_information_" +
|
||||
print_time_t_in_fastnetmon_format(current_attack.ban_timestamp);
|
||||
|
||||
// We could not use dot in key names: http://docs.mongodb.org/manual/core/document/#dot-notation
|
||||
std::replace(mongo_key_name.begin(), mongo_key_name.end(), '.', '_');
|
||||
|
||||
logger << log4cpp::Priority::INFO << "Start data save in Mongo in key: " << mongo_key_name;
|
||||
boost::thread mongo_store_thread(store_data_in_mongo, mongo_key_name, basic_attack_information_in_json);
|
||||
mongo_store_thread.detach();
|
||||
logger << log4cpp::Priority::INFO << "Finish data save in Mongo in key: " << mongo_key_name;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
#ifdef MONGO
|
||||
void store_data_in_mongo(std::string key_name, std::string attack_details_json) {
|
||||
mongoc_client_t* client;
|
||||
mongoc_collection_t* collection;
|
||||
mongoc_cursor_t* cursor;
|
||||
bson_error_t error;
|
||||
bson_oid_t oid;
|
||||
bson_t* doc;
|
||||
|
||||
mongoc_init();
|
||||
|
||||
std::string collection_name = "attacks";
|
||||
std::string connection_string =
|
||||
"mongodb://" + mongodb_host + ":" + convert_int_to_string(mongodb_port) + "/";
|
||||
|
||||
client = mongoc_client_new(connection_string.c_str());
|
||||
|
||||
if (!client) {
|
||||
logger << log4cpp::Priority::ERROR << "Can't connect to MongoDB database";
|
||||
return;
|
||||
}
|
||||
|
||||
bson_error_t bson_from_json_error;
|
||||
bson_t* bson_data = bson_new_from_json((const uint8_t*)attack_details_json.c_str(),
|
||||
attack_details_json.size(), &bson_from_json_error);
|
||||
if (!bson_data) {
|
||||
logger << log4cpp::Priority::ERROR << "Could not convert JSON to BSON";
|
||||
return;
|
||||
}
|
||||
|
||||
// logger << log4cpp::Priority::INFO << bson_as_json(bson_data, NULL);
|
||||
|
||||
collection =
|
||||
mongoc_client_get_collection(client, mongodb_database_name.c_str(), collection_name.c_str());
|
||||
|
||||
doc = bson_new();
|
||||
bson_oid_init(&oid, NULL);
|
||||
BSON_APPEND_OID(doc, "_id", &oid);
|
||||
bson_append_document(doc, key_name.c_str(), key_name.size(), bson_data);
|
||||
|
||||
// logger << log4cpp::Priority::INFO << bson_as_json(doc, NULL);
|
||||
|
||||
if (!mongoc_collection_insert(collection, MONGOC_INSERT_NONE, doc, NULL, &error)) {
|
||||
logger << log4cpp::Priority::ERROR << "Could not store data to MongoDB: " << error.message;
|
||||
}
|
||||
|
||||
// TODO: destroy bson_data too!
|
||||
|
||||
bson_destroy(doc);
|
||||
mongoc_collection_destroy(collection);
|
||||
mongoc_client_destroy(client);
|
||||
}
|
||||
#endif
|
||||
|
||||
// pretty print channel speed in pps and MBit
|
||||
std::string print_channel_speed(std::string traffic_type, direction_t packet_direction) {
|
||||
uint64_t speed_in_pps = total_speed_average_counters[packet_direction].packets;
|
||||
uint64_t speed_in_bps = total_speed_average_counters[packet_direction].bytes;
|
||||
|
||||
unsigned int number_of_tabs = 1;
|
||||
// We need this for correct alignment of blocks
|
||||
if (traffic_type == "Other traffic") {
|
||||
number_of_tabs = 2;
|
||||
}
|
||||
|
||||
std::stringstream stream;
|
||||
stream << traffic_type;
|
||||
|
||||
for (unsigned int i = 0; i < number_of_tabs; i++) {
|
||||
stream << "\t";
|
||||
}
|
||||
|
||||
uint64_t speed_in_mbps = convert_speed_to_mbps(speed_in_bps);
|
||||
|
||||
stream << std::setw(6) << speed_in_pps << " pps " << std::setw(6) << speed_in_mbps << " mbps";
|
||||
|
||||
if (traffic_type == "Incoming traffic" or traffic_type == "Outgoing traffic") {
|
||||
if (packet_direction == INCOMING) {
|
||||
stream << " " << std::setw(6) << incoming_total_flows_speed << " flows";
|
||||
} else if (packet_direction == OUTGOING) {
|
||||
stream << " " << std::setw(6) << outgoing_total_flows_speed << " flows";
|
||||
}
|
||||
|
||||
if (graphite_enabled) {
|
||||
graphite_data_t graphite_data;
|
||||
|
||||
std::string direction_as_string;
|
||||
|
||||
if (packet_direction == INCOMING) {
|
||||
direction_as_string = "incoming";
|
||||
|
||||
graphite_data[graphite_prefix + ".total." + direction_as_string + ".flows"] =
|
||||
incoming_total_flows_speed;
|
||||
} else if (packet_direction == OUTGOING) {
|
||||
direction_as_string = "outgoing";
|
||||
|
||||
graphite_data[graphite_prefix + ".total." + direction_as_string + ".flows"] =
|
||||
outgoing_total_flows_speed;
|
||||
}
|
||||
|
||||
graphite_data[graphite_prefix + ".total." + direction_as_string + ".pps"] = speed_in_pps;
|
||||
graphite_data[graphite_prefix + ".total." + direction_as_string + ".bps"] = speed_in_bps * 8;
|
||||
|
||||
bool graphite_put_result = store_data_to_graphite(graphite_port, graphite_host, graphite_data);
|
||||
|
||||
if (!graphite_put_result) {
|
||||
logger << log4cpp::Priority::ERROR << "Can't store data to Graphite";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return stream.str();
|
||||
}
|
||||
|
||||
|
||||
void traffic_draw_program() {
|
||||
std::stringstream output_buffer;
|
||||
|
||||
// logger<<log4cpp::Priority::INFO<<"Draw table call";
|
||||
|
||||
struct timeval start_calc_time;
|
||||
gettimeofday(&start_calc_time, NULL);
|
||||
|
||||
sort_type sorter;
|
||||
if (sort_parameter == "packets") {
|
||||
sorter = PACKETS;
|
||||
} else if (sort_parameter == "bytes") {
|
||||
sorter = BYTES;
|
||||
} else if (sort_parameter == "flows") {
|
||||
sorter = FLOWS;
|
||||
} else {
|
||||
logger << log4cpp::Priority::INFO << "Unexpected sorter type: " << sort_parameter;
|
||||
sorter = PACKETS;
|
||||
}
|
||||
|
||||
output_buffer << "FastNetMon " << fastnetmon_platform_configuration.fastnetmon_version << " Try Advanced edition: https://fastnetmon.com"
|
||||
<< "\n"
|
||||
<< "IPs ordered by: " << sort_parameter << "\n";
|
||||
|
||||
output_buffer << print_channel_speed("Incoming traffic", INCOMING) << std::endl;
|
||||
|
||||
if (process_incoming_traffic) {
|
||||
output_buffer << draw_table(INCOMING, true, sorter);
|
||||
output_buffer << std::endl;
|
||||
}
|
||||
|
||||
output_buffer << print_channel_speed("Outgoing traffic", OUTGOING) << std::endl;
|
||||
|
||||
if (process_outgoing_traffic) {
|
||||
output_buffer << draw_table(OUTGOING, false, sorter);
|
||||
output_buffer << std::endl;
|
||||
}
|
||||
|
||||
output_buffer << print_channel_speed("Internal traffic", INTERNAL) << std::endl;
|
||||
|
||||
output_buffer << std::endl;
|
||||
|
||||
output_buffer << print_channel_speed("Other traffic", OTHER) << std::endl;
|
||||
|
||||
output_buffer << std::endl;
|
||||
|
||||
// Application statistics
|
||||
output_buffer << "Screen updated in:\t\t" << drawing_thread_execution_time.tv_sec << " sec "
|
||||
<< drawing_thread_execution_time.tv_usec << " microseconds\n";
|
||||
|
||||
output_buffer << "Traffic calculated in:\t\t" << speed_calculation_time.tv_sec << " sec "
|
||||
<< speed_calculation_time.tv_usec << " microseconds\n";
|
||||
|
||||
if (speed_calculation_time.tv_sec > 0) {
|
||||
output_buffer
|
||||
<< "ALERT! Toolkit working incorrectly! We should calculate speed in ~1 second\n";
|
||||
}
|
||||
|
||||
#ifdef IPV6_HASH_COUNTERS
|
||||
output_buffer << "Total amount of IPv6 packets: " << total_ipv6_packets << "\n";
|
||||
#endif
|
||||
|
||||
output_buffer << "Total amount of IPv6 packets related to our own network: " << our_ipv6_packets << "\n";
|
||||
output_buffer << "Not processed packets: " << total_unparsed_packets_speed << " pps\n";
|
||||
|
||||
// Print backend stats
|
||||
if (enable_pcap_collection) {
|
||||
output_buffer << get_pcap_stats() << "\n";
|
||||
}
|
||||
|
||||
#ifdef PF_RING
|
||||
if (enable_data_collection_from_mirror) {
|
||||
output_buffer << get_pf_ring_stats();
|
||||
}
|
||||
#endif
|
||||
|
||||
// Print thresholds
|
||||
if (print_configuration_params_on_the_screen) {
|
||||
output_buffer << "\n" << print_ban_thresholds(global_ban_settings);
|
||||
}
|
||||
|
||||
if (!ban_list.empty()) {
|
||||
output_buffer << std::endl << "Ban list:" << std::endl;
|
||||
output_buffer << print_ddos_attack_details();
|
||||
}
|
||||
|
||||
if (enable_subnet_counters) {
|
||||
output_buffer << std::endl << "Subnet load:" << std::endl;
|
||||
output_buffer << print_subnet_load() << "\n";
|
||||
}
|
||||
|
||||
screen_data_stats = output_buffer.str();
|
||||
|
||||
// Print screen contents into file
|
||||
print_screen_contents_into_file(screen_data_stats);
|
||||
|
||||
struct timeval end_calc_time;
|
||||
gettimeofday(&end_calc_time, NULL);
|
||||
|
||||
timeval_subtract(&drawing_thread_execution_time, &end_calc_time, &start_calc_time);
|
||||
}
|
||||
|
||||
|
||||
/* Calculate speed for all connnections */
|
||||
void recalculate_speed() {
|
||||
// logger<< log4cpp::Priority::INFO<<"We run recalculate_speed";
|
||||
|
||||
struct timeval start_calc_time;
|
||||
gettimeofday(&start_calc_time, NULL);
|
||||
|
||||
double speed_calc_period = recalculate_speed_timeout;
|
||||
time_t start_time;
|
||||
time(&start_time);
|
||||
|
||||
// If we got 1+ seconds lag we should use new "delta" or skip this step
|
||||
double time_difference = difftime(start_time, last_call_of_traffic_recalculation);
|
||||
|
||||
if (time_difference < 0) {
|
||||
// It may happen when you adjust time
|
||||
logger << log4cpp::Priority::ERROR << "Negative delay for traffic calculation " << time_difference << " Skipped iteration";
|
||||
return;
|
||||
} else if (time_difference < recalculate_speed_timeout) {
|
||||
// It could occur on toolkit start or in some weird cases of Linux scheduler
|
||||
// I really saw cases when sleep executed in zero zeconds:
|
||||
// [WARN] Sleep time expected: 1. Sleep time experienced: 0
|
||||
// But we have handlers for such case and should not bother client about with it
|
||||
// And we are using DEBUG level here
|
||||
logger << log4cpp::Priority::DEBUG
|
||||
<< "We skip one iteration of speed_calc because it runs so early! That's "
|
||||
"really impossible! Please ask support.";
|
||||
logger << log4cpp::Priority::DEBUG << "Sleep time expected: " << recalculate_speed_timeout
|
||||
<< ". Sleep time experienced: " << time_difference;
|
||||
return;
|
||||
} else if (int(time_difference) == int(speed_calc_period)) {
|
||||
// All fine, we run on time
|
||||
} else {
|
||||
// logger << log4cpp::Priority::INFO << "Time from last run of speed_recalc is soooo big, we got ugly lags: " <<
|
||||
// time_difference << " seconds";
|
||||
speed_calc_period = time_difference;
|
||||
}
|
||||
|
||||
map_element zero_map_element;
|
||||
memset(&zero_map_element, 0, sizeof(zero_map_element));
|
||||
|
||||
uint64_t incoming_total_flows = 0;
|
||||
uint64_t outgoing_total_flows = 0;
|
||||
|
||||
if (enable_subnet_counters) {
|
||||
for (map_for_subnet_counters::iterator itr = PerSubnetSpeedMap.begin();
|
||||
itr != PerSubnetSpeedMap.end(); ++itr) {
|
||||
subnet_t current_subnet = itr->first;
|
||||
|
||||
map_for_subnet_counters::iterator iter_subnet = PerSubnetCountersMap.find(current_subnet);
|
||||
|
||||
if (iter_subnet == PerSubnetCountersMap.end()) {
|
||||
logger << log4cpp::Priority::INFO << "Can't find traffic counters for subnet";
|
||||
break;
|
||||
}
|
||||
|
||||
subnet_counter_t* subnet_traffic = &iter_subnet->second;
|
||||
|
||||
subnet_counter_t new_speed_element;
|
||||
|
||||
new_speed_element.in_packets = uint64_t((double)subnet_traffic->in_packets / speed_calc_period);
|
||||
new_speed_element.in_bytes = uint64_t((double)subnet_traffic->in_bytes / speed_calc_period);
|
||||
|
||||
new_speed_element.out_packets = uint64_t((double)subnet_traffic->out_packets / speed_calc_period);
|
||||
new_speed_element.out_bytes = uint64_t((double)subnet_traffic->out_bytes / speed_calc_period);
|
||||
|
||||
/* Moving average recalculation for subnets */
|
||||
/* http://en.wikipedia.org/wiki/Moving_average#Application_to_measuring_computer_performance */
|
||||
double exp_power_subnet = -speed_calc_period / average_calculation_amount_for_subnets;
|
||||
double exp_value_subnet = exp(exp_power_subnet);
|
||||
|
||||
map_element* current_average_speed_element = &PerSubnetAverageSpeedMap[current_subnet];
|
||||
|
||||
current_average_speed_element->in_bytes = uint64_t(
|
||||
new_speed_element.in_bytes + exp_value_subnet * ((double)current_average_speed_element->in_bytes -
|
||||
(double)new_speed_element.in_bytes));
|
||||
|
||||
current_average_speed_element->out_bytes = uint64_t(
|
||||
new_speed_element.out_bytes + exp_value_subnet * ((double)current_average_speed_element->out_bytes -
|
||||
(double)new_speed_element.out_bytes));
|
||||
|
||||
current_average_speed_element->in_packets = uint64_t(
|
||||
new_speed_element.in_packets + exp_value_subnet * ((double)current_average_speed_element->in_packets -
|
||||
(double)new_speed_element.in_packets));
|
||||
|
||||
current_average_speed_element->out_packets =
|
||||
uint64_t(new_speed_element.out_packets +
|
||||
exp_value_subnet * ((double)current_average_speed_element->out_packets -
|
||||
(double)new_speed_element.out_packets));
|
||||
|
||||
// Update speed calculation structure
|
||||
PerSubnetSpeedMap[current_subnet] = new_speed_element;
|
||||
*subnet_traffic = zero_map_element;
|
||||
|
||||
// logger << log4cpp::Priority::INFO<<convert_subnet_to_string(current_subnet)
|
||||
// << "in pps: " << new_speed_element.in_packets << " out pps: " << new_speed_element.out_packets;
|
||||
}
|
||||
}
|
||||
|
||||
for (map_of_vector_counters::iterator itr = SubnetVectorMap.begin(); itr != SubnetVectorMap.end(); ++itr) {
|
||||
for (vector_of_counters::iterator vector_itr = itr->second.begin();
|
||||
vector_itr != itr->second.end(); ++vector_itr) {
|
||||
int current_index = vector_itr - itr->second.begin();
|
||||
|
||||
// New element
|
||||
map_element new_speed_element;
|
||||
|
||||
// convert to host order for math operations
|
||||
uint32_t subnet_ip = ntohl(itr->first.first);
|
||||
uint32_t client_ip_in_host_bytes_order = subnet_ip + current_index;
|
||||
|
||||
// covnert to our standard network byte order
|
||||
uint32_t client_ip = htonl(client_ip_in_host_bytes_order);
|
||||
|
||||
// Calculate speed for IP or whole subnet
|
||||
build_speed_counters_from_packet_counters(new_speed_element, &*vector_itr, speed_calc_period);
|
||||
|
||||
conntrack_main_struct* flow_counter_ptr = &SubnetVectorMapFlow[itr->first][current_index];
|
||||
|
||||
if (enable_conection_tracking) {
|
||||
// todo: optimize this operations!
|
||||
// it's really bad and SLOW CODE
|
||||
uint64_t total_out_flows = (uint64_t)flow_counter_ptr->out_tcp.size() +
|
||||
(uint64_t)flow_counter_ptr->out_udp.size() +
|
||||
(uint64_t)flow_counter_ptr->out_icmp.size() +
|
||||
(uint64_t)flow_counter_ptr->out_other.size();
|
||||
|
||||
uint64_t total_in_flows =
|
||||
(uint64_t)flow_counter_ptr->in_tcp.size() + (uint64_t)flow_counter_ptr->in_udp.size() +
|
||||
(uint64_t)flow_counter_ptr->in_icmp.size() + (uint64_t)flow_counter_ptr->in_other.size();
|
||||
|
||||
new_speed_element.out_flows = uint64_t((double)total_out_flows / speed_calc_period);
|
||||
new_speed_element.in_flows = uint64_t((double)total_in_flows / speed_calc_period);
|
||||
|
||||
// Increment global counter
|
||||
outgoing_total_flows += new_speed_element.out_flows;
|
||||
incoming_total_flows += new_speed_element.in_flows;
|
||||
} else {
|
||||
new_speed_element.out_flows = 0;
|
||||
new_speed_element.in_flows = 0;
|
||||
}
|
||||
|
||||
/* Moving average recalculation */
|
||||
// http://en.wikipedia.org/wiki/Moving_average#Application_to_measuring_computer_performance
|
||||
// double speed_calc_period = 1;
|
||||
double exp_power = -speed_calc_period / average_calculation_amount;
|
||||
double exp_value = exp(exp_power);
|
||||
|
||||
map_element* current_average_speed_element = &SubnetVectorMapSpeedAverage[itr->first][current_index];
|
||||
|
||||
// Calculate average speed from per-second speed
|
||||
build_average_speed_counters_from_speed_counters(current_average_speed_element,
|
||||
new_speed_element, exp_value, exp_power);
|
||||
|
||||
if (enable_conection_tracking) {
|
||||
current_average_speed_element->out_flows = uint64_t(
|
||||
new_speed_element.out_flows + exp_value * ((double)current_average_speed_element->out_flows -
|
||||
(double)new_speed_element.out_flows));
|
||||
|
||||
current_average_speed_element->in_flows = uint64_t(
|
||||
new_speed_element.in_flows + exp_value * ((double)current_average_speed_element->in_flows -
|
||||
(double)new_speed_element.in_flows));
|
||||
}
|
||||
|
||||
/* Moving average recalculation end */
|
||||
std::string host_group_name;
|
||||
ban_settings_t current_ban_settings = get_ban_settings_for_this_subnet(itr->first, host_group_name);
|
||||
|
||||
if (we_should_ban_this_ip(current_average_speed_element, current_ban_settings)) {
|
||||
logger << log4cpp::Priority::DEBUG
|
||||
<< "We have found host group for this host as: " << host_group_name;
|
||||
|
||||
std::string flow_attack_details = "";
|
||||
|
||||
if (enable_conection_tracking) {
|
||||
flow_attack_details =
|
||||
print_flow_tracking_for_ip(*flow_counter_ptr, convert_ip_as_uint_to_string(client_ip));
|
||||
}
|
||||
|
||||
// TODO: we should pass type of ddos ban source (pps, flowd, bandwidth)!
|
||||
execute_ip_ban(client_ip, *current_average_speed_element, flow_attack_details, itr->first);
|
||||
}
|
||||
|
||||
SubnetVectorMapSpeed[itr->first][current_index] = new_speed_element;
|
||||
|
||||
*vector_itr = zero_map_element;
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate global flow speed
|
||||
incoming_total_flows_speed = uint64_t((double)incoming_total_flows / (double)speed_calc_period);
|
||||
outgoing_total_flows_speed = uint64_t((double)outgoing_total_flows / (double)speed_calc_period);
|
||||
|
||||
if (enable_conection_tracking) {
|
||||
// Clean Flow Counter
|
||||
flow_counter.lock();
|
||||
zeroify_all_flow_counters();
|
||||
flow_counter.unlock();
|
||||
}
|
||||
|
||||
total_unparsed_packets_speed = uint64_t((double)total_unparsed_packets / (double)speed_calc_period);
|
||||
total_unparsed_packets = 0;
|
||||
|
||||
for (unsigned int index = 0; index < 4; index++) {
|
||||
total_speed_counters[index].bytes =
|
||||
uint64_t((double)total_counters[index].bytes / (double)speed_calc_period);
|
||||
|
||||
total_speed_counters[index].packets =
|
||||
uint64_t((double)total_counters[index].packets / (double)speed_calc_period);
|
||||
|
||||
double exp_power = -speed_calc_period / average_calculation_amount;
|
||||
double exp_value = exp(exp_power);
|
||||
|
||||
total_speed_average_counters[index].bytes = uint64_t(
|
||||
total_speed_counters[index].bytes + exp_value * ((double)total_speed_average_counters[index].bytes -
|
||||
(double)total_speed_counters[index].bytes));
|
||||
|
||||
total_speed_average_counters[index].packets =
|
||||
uint64_t(total_speed_counters[index].packets +
|
||||
exp_value * ((double)total_speed_average_counters[index].packets -
|
||||
(double)total_speed_counters[index].packets));
|
||||
|
||||
// nullify data counters after speed calculation
|
||||
total_counters[index].bytes = 0;
|
||||
total_counters[index].packets = 0;
|
||||
}
|
||||
|
||||
// Set time of previous startup
|
||||
time(&last_call_of_traffic_recalculation);
|
||||
|
||||
struct timeval finish_calc_time;
|
||||
gettimeofday(&finish_calc_time, NULL);
|
||||
|
||||
timeval_subtract(&speed_calculation_time, &finish_calc_time, &start_calc_time);
|
||||
}
|
||||
|
||||
std::string draw_table(direction_t data_direction, bool do_redis_update, sort_type sort_item) {
|
||||
std::vector<pair_of_map_elements> vector_for_sort;
|
||||
|
||||
std::stringstream output_buffer;
|
||||
|
||||
// Preallocate memory for sort vector
|
||||
// We use total networks size for this vector
|
||||
vector_for_sort.reserve(total_number_of_hosts_in_our_networks);
|
||||
|
||||
// Switch to Average speed there!!!
|
||||
map_of_vector_counters* current_speed_map = NULL;
|
||||
|
||||
if (print_average_traffic_counts) {
|
||||
current_speed_map = &SubnetVectorMapSpeedAverage;
|
||||
} else {
|
||||
current_speed_map = &SubnetVectorMapSpeed;
|
||||
}
|
||||
|
||||
map_element zero_map_element;
|
||||
memset(&zero_map_element, 0, sizeof(zero_map_element));
|
||||
|
||||
unsigned int count_of_zero_speed_packets = 0;
|
||||
for (map_of_vector_counters::iterator itr = current_speed_map->begin();
|
||||
itr != current_speed_map->end(); ++itr) {
|
||||
for (vector_of_counters::iterator vector_itr = itr->second.begin();
|
||||
vector_itr != itr->second.end(); ++vector_itr) {
|
||||
int current_index = vector_itr - itr->second.begin();
|
||||
|
||||
// convert to host order for math operations
|
||||
uint32_t subnet_ip = ntohl(itr->first.first);
|
||||
uint32_t client_ip_in_host_bytes_order = subnet_ip + current_index;
|
||||
|
||||
// covnert to our standard network byte order
|
||||
uint32_t client_ip = htonl(client_ip_in_host_bytes_order);
|
||||
|
||||
// Do not add zero speed packets to sort list
|
||||
if (memcmp((void*)&zero_map_element, &*vector_itr, sizeof(map_element)) != 0) {
|
||||
vector_for_sort.push_back(std::make_pair(client_ip, *vector_itr));
|
||||
} else {
|
||||
count_of_zero_speed_packets++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sort only first X elements in this vector
|
||||
unsigned int shift_for_sort = max_ips_in_list;
|
||||
|
||||
if (data_direction == INCOMING or data_direction == OUTGOING) {
|
||||
// Because in another case we will got segmentation fault
|
||||
unsigned int vector_size = vector_for_sort.size();
|
||||
|
||||
if (vector_size < shift_for_sort) {
|
||||
shift_for_sort = vector_size;
|
||||
}
|
||||
|
||||
std::partial_sort(vector_for_sort.begin(), vector_for_sort.begin() + shift_for_sort,
|
||||
vector_for_sort.end(),
|
||||
TrafficComparatorClass<pair_of_map_elements>(data_direction, sort_item));
|
||||
} else {
|
||||
logger << log4cpp::Priority::ERROR << "Unexpected bahaviour on sort function";
|
||||
return "Internal error";
|
||||
}
|
||||
|
||||
unsigned int element_number = 0;
|
||||
|
||||
// In this loop we print only top X talkers in our subnet to screen buffer
|
||||
for (std::vector<pair_of_map_elements>::iterator ii = vector_for_sort.begin();
|
||||
ii != vector_for_sort.end(); ++ii) {
|
||||
// Print first max_ips_in_list elements in list, we will show top X "huge" channel loaders
|
||||
if (element_number >= max_ips_in_list) {
|
||||
break;
|
||||
}
|
||||
|
||||
uint32_t client_ip = (*ii).first;
|
||||
std::string client_ip_as_string = convert_ip_as_uint_to_string((*ii).first);
|
||||
|
||||
uint64_t pps = 0;
|
||||
uint64_t bps = 0;
|
||||
uint64_t flows = 0;
|
||||
|
||||
uint64_t pps_average = 0;
|
||||
uint64_t bps_average = 0;
|
||||
uint64_t flows_average = 0;
|
||||
|
||||
// Here we could have average or instantaneous speed
|
||||
map_element* current_speed_element = &ii->second;
|
||||
|
||||
// Create polymorphic pps, byte and flow counters
|
||||
if (data_direction == INCOMING) {
|
||||
pps = current_speed_element->in_packets;
|
||||
bps = current_speed_element->in_bytes;
|
||||
flows = current_speed_element->in_flows;
|
||||
} else if (data_direction == OUTGOING) {
|
||||
pps = current_speed_element->out_packets;
|
||||
bps = current_speed_element->out_bytes;
|
||||
flows = current_speed_element->out_flows;
|
||||
}
|
||||
|
||||
uint64_t mbps = convert_speed_to_mbps(bps);
|
||||
uint64_t mbps_average = convert_speed_to_mbps(bps_average);
|
||||
|
||||
std::string is_banned = ban_list.count(client_ip) > 0 ? " *banned* " : "";
|
||||
|
||||
// We use setw for alignment
|
||||
output_buffer << client_ip_as_string << "\t\t";
|
||||
|
||||
output_buffer << std::setw(6) << pps << " pps ";
|
||||
output_buffer << std::setw(6) << mbps << " mbps ";
|
||||
output_buffer << std::setw(6) << flows << " flows ";
|
||||
|
||||
output_buffer << is_banned << std::endl;
|
||||
|
||||
element_number++;
|
||||
}
|
||||
|
||||
graphite_data_t graphite_data;
|
||||
|
||||
// TODO: add graphite operations time to the config file
|
||||
if (graphite_enabled) {
|
||||
for (std::vector<pair_of_map_elements>::iterator ii = vector_for_sort.begin();
|
||||
ii != vector_for_sort.end(); ++ii) {
|
||||
uint32_t client_ip = (*ii).first;
|
||||
std::string client_ip_as_string = convert_ip_as_uint_to_string((*ii).first);
|
||||
|
||||
uint64_t pps = 0;
|
||||
uint64_t bps = 0;
|
||||
uint64_t flows = 0;
|
||||
|
||||
// Here we could have average or instantaneous speed
|
||||
map_element* current_speed_element = &ii->second;
|
||||
|
||||
// Create polymorphic pps, byte and flow counters
|
||||
if (data_direction == INCOMING) {
|
||||
pps = current_speed_element->in_packets;
|
||||
bps = current_speed_element->in_bytes;
|
||||
flows = current_speed_element->in_flows;
|
||||
} else if (data_direction == OUTGOING) {
|
||||
pps = current_speed_element->out_packets;
|
||||
bps = current_speed_element->out_bytes;
|
||||
flows = current_speed_element->out_flows;
|
||||
}
|
||||
|
||||
std::string direction_as_string;
|
||||
|
||||
if (data_direction == INCOMING) {
|
||||
direction_as_string = "incoming";
|
||||
} else if (data_direction == OUTGOING) {
|
||||
direction_as_string = "outgoing";
|
||||
}
|
||||
|
||||
std::string ip_as_string_with_dash_delimiters = client_ip_as_string;
|
||||
// Replace dots by dashes
|
||||
std::replace(ip_as_string_with_dash_delimiters.begin(),
|
||||
ip_as_string_with_dash_delimiters.end(), '.', '_');
|
||||
|
||||
std::string graphite_current_prefix =
|
||||
graphite_prefix + ".hosts." + ip_as_string_with_dash_delimiters + "." + direction_as_string;
|
||||
|
||||
if (print_average_traffic_counts) {
|
||||
graphite_current_prefix = graphite_current_prefix + ".average";
|
||||
}
|
||||
|
||||
// We do not store zero data to Graphite
|
||||
if (pps != 0) {
|
||||
graphite_data[graphite_current_prefix + ".pps"] = pps;
|
||||
}
|
||||
|
||||
if (bps != 0) {
|
||||
graphite_data[graphite_current_prefix + ".bps"] = bps * 8;
|
||||
}
|
||||
|
||||
if (flows != 0) {
|
||||
graphite_data[graphite_current_prefix + ".flows"] = flows;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: we should switch to piclke format instead text
|
||||
// TODO: we should check packet size for Graphite
|
||||
// logger << log4cpp::Priority::INFO << "We will write " << graphite_data.size() << " records to Graphite";
|
||||
|
||||
if (graphite_enabled) {
|
||||
bool graphite_put_result = store_data_to_graphite(graphite_port, graphite_host, graphite_data);
|
||||
|
||||
if (!graphite_put_result) {
|
||||
logger << log4cpp::Priority::ERROR << "Can't store data to Graphite";
|
||||
}
|
||||
}
|
||||
|
||||
return output_buffer.str();
|
||||
}
|
||||
|
||||
void print_screen_contents_into_file(std::string screen_data_stats_param) {
|
||||
std::ofstream screen_data_file;
|
||||
screen_data_file.open(cli_stats_file_path.c_str(), std::ios::trunc);
|
||||
|
||||
if (screen_data_file.is_open()) {
|
||||
// Set 660 permissions to file for security reasons
|
||||
chmod(cli_stats_file_path.c_str(), S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
|
||||
|
||||
screen_data_file << screen_data_stats_param;
|
||||
screen_data_file.close();
|
||||
} else {
|
||||
logger << log4cpp::Priority::ERROR << "Can't print program screen into file";
|
||||
}
|
||||
}
|
||||
|
||||
void zeroify_all_flow_counters() {
|
||||
// On creating it initilizes by zeros
|
||||
conntrack_main_struct zero_conntrack_main_struct;
|
||||
|
||||
// Iterate over map
|
||||
for (map_of_vector_counters_for_flow::iterator itr = SubnetVectorMapFlow.begin();
|
||||
itr != SubnetVectorMapFlow.end(); ++itr) {
|
||||
// Iterate over vector
|
||||
for (vector_of_flow_counters::iterator vector_iterator = itr->second.begin();
|
||||
vector_iterator != itr->second.end(); ++vector_iterator) {
|
||||
// TODO: rewrite this monkey code
|
||||
vector_iterator->in_tcp.clear();
|
||||
vector_iterator->in_udp.clear();
|
||||
vector_iterator->in_icmp.clear();
|
||||
vector_iterator->in_other.clear();
|
||||
|
||||
vector_iterator->out_tcp.clear();
|
||||
vector_iterator->out_udp.clear();
|
||||
vector_iterator->out_icmp.clear();
|
||||
vector_iterator->out_other.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Process simple unified packet */
|
||||
void process_packet(simple_packet_t& current_packet) {
|
||||
// Packets dump is very useful for bug hunting
|
||||
if (DEBUG_DUMP_ALL_PACKETS) {
|
||||
logger << log4cpp::Priority::INFO << "Dump: " << print_simple_packet(current_packet);
|
||||
}
|
||||
|
||||
if (current_packet.ip_protocol_version == 6) {
|
||||
#ifdef IPV6_HASH_COUNTERS
|
||||
current_packet.packet_direction =
|
||||
get_packet_direction_ipv6(lookup_tree_ipv6, current_packet.src_ipv6, current_packet.dst_ipv6);
|
||||
|
||||
// TODO: move to bulk operations here!
|
||||
multi_process_queue_for_ipv6_counters.enqueue(current_packet);
|
||||
#else
|
||||
|
||||
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(&total_ipv6_packets, 1, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(&total_ipv6_packets, 1);
|
||||
#endif
|
||||
|
||||
#endif
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// We do not process IPv6 at all on this mement
|
||||
if (current_packet.ip_protocol_version != 4) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Subnet for found IPs
|
||||
unsigned long subnet = 0;
|
||||
unsigned int subnet_cidr_mask = 0;
|
||||
|
||||
// We use these variables to track subnets for internal traffic because we have two of them
|
||||
unsigned long destination_subnet_host = 0;
|
||||
unsigned int destination_subnet_cidr_mask = 0;
|
||||
unsigned long source_subnet_host = 0;
|
||||
unsigned int source_subnet_cidr_mask = 0;
|
||||
|
||||
|
||||
direction_t packet_direction = get_packet_direction(lookup_tree_ipv4, current_packet.src_ip,
|
||||
current_packet.dst_ip, subnet, subnet_cidr_mask,
|
||||
destination_subnet_host, destination_subnet_cidr_mask,
|
||||
source_subnet_host, source_subnet_cidr_mask);
|
||||
|
||||
// It's useful in case when we can't find what packets do not processed correctly
|
||||
if (DEBUG_DUMP_OTHER_PACKETS && packet_direction == OTHER) {
|
||||
logger << log4cpp::Priority::INFO << "Dump other: " << print_simple_packet(current_packet);
|
||||
}
|
||||
|
||||
// Skip processing of specific traffic direction
|
||||
if ((packet_direction == INCOMING && !process_incoming_traffic) or
|
||||
(packet_direction == OUTGOING && !process_outgoing_traffic)) {
|
||||
return;
|
||||
}
|
||||
|
||||
subnet_t current_subnet = std::make_pair(subnet, subnet_cidr_mask);
|
||||
|
||||
// We will use them for INTERNAL traffic type
|
||||
subnet_t source_subnet = std::make_pair(source_subnet_host, source_subnet_cidr_mask);
|
||||
subnet_t destination_subnet = std::make_pair(destination_subnet_host, destination_subnet_cidr_mask);
|
||||
|
||||
// Iterator for subnet counter
|
||||
subnet_counter_t* subnet_counter = NULL;
|
||||
|
||||
if (packet_direction == OUTGOING or packet_direction == INCOMING) {
|
||||
if (enable_subnet_counters) {
|
||||
map_for_subnet_counters::iterator subnet_iterator;
|
||||
|
||||
// Find element in map of subnet counters
|
||||
subnet_iterator = PerSubnetCountersMap.find(current_subnet);
|
||||
|
||||
if (subnet_iterator == PerSubnetCountersMap.end()) {
|
||||
logger << log4cpp::Priority::ERROR << "Can't find counter structure for subnet";
|
||||
return;
|
||||
}
|
||||
|
||||
subnet_counter = &subnet_iterator->second;
|
||||
}
|
||||
}
|
||||
|
||||
map_of_vector_counters_for_flow::iterator itr_flow;
|
||||
|
||||
if (enable_conection_tracking) {
|
||||
if (packet_direction == OUTGOING or packet_direction == INCOMING) {
|
||||
itr_flow = SubnetVectorMapFlow.find(current_subnet);
|
||||
|
||||
if (itr_flow == SubnetVectorMapFlow.end()) {
|
||||
logger << log4cpp::Priority::ERROR << "Can't find vector address in subnet flow map";
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Because we support mirroring, sflow and netflow we should support different cases:
|
||||
- One packet passed for processing (mirror)
|
||||
- Multiple packets ("flows") passed for processing (netflow)
|
||||
- One sampled packed passed for processing (netflow)
|
||||
- Another combinations of this three options
|
||||
*/
|
||||
|
||||
uint64_t sampled_number_of_packets = current_packet.number_of_packets * current_packet.sample_ratio;
|
||||
uint64_t sampled_number_of_bytes = current_packet.length * current_packet.sample_ratio;
|
||||
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(&total_counters[packet_direction].packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(&total_counters[packet_direction].bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(&total_counters[packet_direction].packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(&total_counters[packet_direction].bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
|
||||
// Increment main and per protocol packet counters
|
||||
// Below we will implement different logic according to packet direction
|
||||
// We cannot use if / else if / else in this case because same conditions may trigger twice
|
||||
// For internal traffic type we trigger incoming and outgoing processing paths in same time
|
||||
if (packet_direction == OUTGOING or (process_internal_traffic_as_external && packet_direction == INTERNAL)) {
|
||||
uint32_t subnet_in_host_byte_order = 0;
|
||||
|
||||
// Try to find map key for this subnet
|
||||
map_of_vector_counters::iterator itr;
|
||||
|
||||
if (packet_direction == OUTGOING) {
|
||||
// We operate in host bytes order and need to convert subnet
|
||||
if (subnet != 0) {
|
||||
subnet_in_host_byte_order = ntohl(current_subnet.first);
|
||||
}
|
||||
|
||||
// Find element in map of vectors
|
||||
itr = SubnetVectorMap.find(current_subnet);
|
||||
}
|
||||
|
||||
// In this case we need to use another subnet
|
||||
if (packet_direction == INTERNAL) {
|
||||
subnet_in_host_byte_order = ntohl(source_subnet.first);
|
||||
|
||||
// Lookup another subnet in this case
|
||||
itr = SubnetVectorMap.find(source_subnet);
|
||||
}
|
||||
|
||||
if (itr == SubnetVectorMap.end()) {
|
||||
logger << log4cpp::Priority::ERROR << "Can't find vector address in subnet map";
|
||||
return;
|
||||
}
|
||||
|
||||
int64_t shift_in_vector = (int64_t)ntohl(current_packet.src_ip) - (int64_t)subnet_in_host_byte_order;
|
||||
|
||||
if (shift_in_vector < 0 or shift_in_vector >= itr->second.size()) {
|
||||
logger << log4cpp::Priority::ERROR << "We tried to access to element with index " << shift_in_vector
|
||||
<< " which located outside allocated vector with size " << itr->second.size();
|
||||
|
||||
logger << log4cpp::Priority::ERROR << "We expect issues with this packet in OUTGOING direction: "
|
||||
<< print_simple_packet(current_packet);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
map_element* current_element = &itr->second[shift_in_vector];
|
||||
|
||||
// Main packet/bytes counter
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(¤t_element->out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(¤t_element->out_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(¤t_element->out_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(¤t_element->out_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
|
||||
// Fragmented IP packets
|
||||
if (current_packet.ip_fragmented) {
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(¤t_element->fragmented_out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(¤t_element->fragmented_out_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(¤t_element->fragmented_out_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(¤t_element->fragmented_out_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
}
|
||||
|
||||
// TODO: add another counters
|
||||
if (enable_subnet_counters) {
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(&subnet_counter->out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(&subnet_counter->out_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(&subnet_counter->out_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(&subnet_counter->out_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
}
|
||||
|
||||
conntrack_main_struct* current_element_flow = NULL;
|
||||
if (enable_conection_tracking) {
|
||||
current_element_flow = &itr_flow->second[shift_in_vector];
|
||||
}
|
||||
|
||||
// Collect data when ban client
|
||||
if (!ban_list_details.empty() && ban_list_details.count(current_packet.src_ip) > 0 &&
|
||||
ban_list_details[current_packet.src_ip].size() < ban_details_records_count) {
|
||||
|
||||
ban_list_details_mutex.lock();
|
||||
|
||||
if (collect_attack_pcap_dumps) {
|
||||
// this code SHOULD NOT be called without mutex!
|
||||
if (current_packet.packet_payload_length > 0 && current_packet.packet_payload_pointer != NULL) {
|
||||
ban_list[current_packet.src_ip].pcap_attack_dump.write_packet(current_packet.packet_payload_pointer,
|
||||
current_packet.packet_payload_length);
|
||||
}
|
||||
}
|
||||
|
||||
ban_list_details[current_packet.src_ip].push_back(current_packet);
|
||||
ban_list_details_mutex.unlock();
|
||||
}
|
||||
|
||||
uint64_t connection_tracking_hash = 0;
|
||||
|
||||
if (enable_conection_tracking) {
|
||||
packed_conntrack_hash flow_tracking_structure;
|
||||
flow_tracking_structure.opposite_ip = current_packet.dst_ip;
|
||||
flow_tracking_structure.src_port = current_packet.source_port;
|
||||
flow_tracking_structure.dst_port = current_packet.destination_port;
|
||||
|
||||
// convert this struct to 64 bit integer
|
||||
connection_tracking_hash = convert_conntrack_hash_struct_to_integer(&flow_tracking_structure);
|
||||
}
|
||||
|
||||
if (current_packet.protocol == IPPROTO_TCP) {
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(¤t_element->tcp_out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(¤t_element->tcp_out_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(¤t_element->tcp_out_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(¤t_element->tcp_out_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
|
||||
if (extract_bit_value(current_packet.flags, TCP_SYN_FLAG_SHIFT)) {
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(¤t_element->tcp_syn_out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(¤t_element->tcp_syn_out_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(¤t_element->tcp_syn_out_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(¤t_element->tcp_syn_out_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
}
|
||||
|
||||
if (enable_conection_tracking) {
|
||||
flow_counter.lock();
|
||||
conntrack_key_struct* conntrack_key_struct_ptr =
|
||||
¤t_element_flow->out_tcp[connection_tracking_hash];
|
||||
|
||||
conntrack_key_struct_ptr->packets += sampled_number_of_packets;
|
||||
conntrack_key_struct_ptr->bytes += sampled_number_of_bytes;
|
||||
|
||||
flow_counter.unlock();
|
||||
}
|
||||
} else if (current_packet.protocol == IPPROTO_UDP) {
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(¤t_element->udp_out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(¤t_element->udp_out_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(¤t_element->udp_out_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(¤t_element->udp_out_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
|
||||
if (enable_conection_tracking) {
|
||||
flow_counter.lock();
|
||||
conntrack_key_struct* conntrack_key_struct_ptr =
|
||||
¤t_element_flow->out_udp[connection_tracking_hash];
|
||||
|
||||
conntrack_key_struct_ptr->packets += sampled_number_of_packets;
|
||||
conntrack_key_struct_ptr->bytes += sampled_number_of_bytes;
|
||||
|
||||
flow_counter.unlock();
|
||||
}
|
||||
} else if (current_packet.protocol == IPPROTO_ICMP) {
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(¤t_element->icmp_out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(¤t_element->icmp_out_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(¤t_element->icmp_out_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(¤t_element->icmp_out_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
// no flow tracking for icmp
|
||||
} else {
|
||||
}
|
||||
}
|
||||
|
||||
if (packet_direction == INCOMING or (process_internal_traffic_as_external && packet_direction == INTERNAL)) {
|
||||
uint32_t subnet_in_host_byte_order = 0;
|
||||
|
||||
// Try to find map key for this subnet
|
||||
map_of_vector_counters::iterator itr;
|
||||
|
||||
if (packet_direction == INCOMING) {
|
||||
// We operate in host bytes order and need to convert subnet
|
||||
if (subnet != 0) {
|
||||
subnet_in_host_byte_order = ntohl(current_subnet.first);
|
||||
}
|
||||
|
||||
// Find element in map of vectors
|
||||
itr = SubnetVectorMap.find(current_subnet);
|
||||
}
|
||||
|
||||
// In this case we need to use another subnet
|
||||
if (packet_direction == INTERNAL) {
|
||||
subnet_in_host_byte_order = ntohl(destination_subnet.first);
|
||||
|
||||
// Lookup destination subnet in this case
|
||||
itr = SubnetVectorMap.find(destination_subnet);
|
||||
}
|
||||
|
||||
if (itr == SubnetVectorMap.end()) {
|
||||
logger << log4cpp::Priority::ERROR << "Can't find vector address in subnet map";
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
int64_t shift_in_vector = (int64_t)ntohl(current_packet.dst_ip) - (int64_t)subnet_in_host_byte_order;
|
||||
|
||||
if (shift_in_vector < 0 or shift_in_vector >= itr->second.size()) {
|
||||
logger << log4cpp::Priority::ERROR << "We tried to access to element with index " << shift_in_vector
|
||||
<< " which located outside allocated vector with size " << itr->second.size();
|
||||
|
||||
logger << log4cpp::Priority::ERROR << "We expect issues with this packet in INCOMING direction: "
|
||||
<< print_simple_packet(current_packet);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
map_element* current_element = &itr->second[shift_in_vector];
|
||||
|
||||
// Main packet/bytes counter
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(¤t_element->in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(¤t_element->in_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(¤t_element->in_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(¤t_element->in_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
|
||||
if (enable_subnet_counters) {
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(&subnet_counter->in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(&subnet_counter->in_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(&subnet_counter->in_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(&subnet_counter->in_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
}
|
||||
|
||||
// Count fragmented IP packets
|
||||
if (current_packet.ip_fragmented) {
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(¤t_element->fragmented_in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(¤t_element->fragmented_in_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(¤t_element->fragmented_in_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(¤t_element->fragmented_in_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
}
|
||||
|
||||
conntrack_main_struct* current_element_flow = NULL;
|
||||
|
||||
if (enable_conection_tracking) {
|
||||
current_element_flow = &itr_flow->second[shift_in_vector];
|
||||
}
|
||||
|
||||
uint64_t connection_tracking_hash = 0;
|
||||
if (enable_conection_tracking) {
|
||||
packed_conntrack_hash flow_tracking_structure;
|
||||
flow_tracking_structure.opposite_ip = current_packet.src_ip;
|
||||
flow_tracking_structure.src_port = current_packet.source_port;
|
||||
flow_tracking_structure.dst_port = current_packet.destination_port;
|
||||
|
||||
// convert this struct to 64 bit integer
|
||||
connection_tracking_hash = convert_conntrack_hash_struct_to_integer(&flow_tracking_structure);
|
||||
}
|
||||
|
||||
// Collect attack details
|
||||
if (!ban_list_details.empty() && ban_list_details.count(current_packet.dst_ip) > 0 &&
|
||||
ban_list_details[current_packet.dst_ip].size() < ban_details_records_count) {
|
||||
|
||||
ban_list_details_mutex.lock();
|
||||
|
||||
if (collect_attack_pcap_dumps) {
|
||||
// this code SHOULD NOT be called without mutex!
|
||||
if (current_packet.packet_payload_length > 0 && current_packet.packet_payload_pointer != NULL) {
|
||||
ban_list[current_packet.dst_ip].pcap_attack_dump.write_packet(current_packet.packet_payload_pointer,
|
||||
current_packet.packet_payload_length);
|
||||
}
|
||||
}
|
||||
|
||||
ban_list_details[current_packet.dst_ip].push_back(current_packet);
|
||||
ban_list_details_mutex.unlock();
|
||||
}
|
||||
|
||||
if (current_packet.protocol == IPPROTO_TCP) {
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(¤t_element->tcp_in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(¤t_element->tcp_in_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(¤t_element->tcp_in_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(¤t_element->tcp_in_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
|
||||
if (extract_bit_value(current_packet.flags, TCP_SYN_FLAG_SHIFT)) {
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(¤t_element->tcp_syn_in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(¤t_element->tcp_syn_in_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(¤t_element->tcp_syn_in_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(¤t_element->tcp_syn_in_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
}
|
||||
|
||||
if (enable_conection_tracking) {
|
||||
flow_counter.lock();
|
||||
conntrack_key_struct* conntrack_key_struct_ptr =
|
||||
¤t_element_flow->in_tcp[connection_tracking_hash];
|
||||
|
||||
conntrack_key_struct_ptr->packets += sampled_number_of_packets;
|
||||
conntrack_key_struct_ptr->bytes += sampled_number_of_bytes;
|
||||
|
||||
flow_counter.unlock();
|
||||
}
|
||||
} else if (current_packet.protocol == IPPROTO_UDP) {
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(¤t_element->udp_in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(¤t_element->udp_in_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(¤t_element->udp_in_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(¤t_element->udp_in_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
|
||||
if (enable_conection_tracking) {
|
||||
flow_counter.lock();
|
||||
conntrack_key_struct* conntrack_key_struct_ptr =
|
||||
¤t_element_flow->in_udp[connection_tracking_hash];
|
||||
|
||||
conntrack_key_struct_ptr->packets += sampled_number_of_packets;
|
||||
conntrack_key_struct_ptr->bytes += sampled_number_of_bytes;
|
||||
flow_counter.unlock();
|
||||
}
|
||||
} else if (current_packet.protocol == IPPROTO_ICMP) {
|
||||
#ifdef USE_NEW_ATOMIC_BUILTINS
|
||||
__atomic_add_fetch(¤t_element->icmp_in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
|
||||
__atomic_add_fetch(¤t_element->icmp_in_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
|
||||
#else
|
||||
__sync_fetch_and_add(¤t_element->icmp_in_packets, sampled_number_of_packets);
|
||||
__sync_fetch_and_add(¤t_element->icmp_in_bytes, sampled_number_of_bytes);
|
||||
#endif
|
||||
// no flow tracking for icmp
|
||||
} else {
|
||||
// TBD
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if (packet_direction == INTERNAL) {
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -85,3 +85,18 @@ bool exabgp_flow_spec_ban_manage(std::string action, std::string flow_spec_rule_
|
||||
void store_data_in_redis(std::string key_name, std::string attack_details);
|
||||
redisContext* redis_init_connection();
|
||||
#endif
|
||||
|
||||
void execute_ip_ban(uint32_t client_ip, map_element average_speed_element, std::string flow_attack_details, subnet_t customer_subnet);
|
||||
void call_ban_handlers(uint32_t client_ip, attack_details& current_attack, std::string flow_attack_details);
|
||||
|
||||
#ifdef MONGO
|
||||
void store_data_in_mongo(std::string key_name, std::string attack_details_json);
|
||||
#endif
|
||||
|
||||
std::string print_channel_speed(std::string traffic_type, direction_t packet_direction);
|
||||
void traffic_draw_program();
|
||||
void recalculate_speed();
|
||||
std::string draw_table(direction_t data_direction, bool do_redis_update, sort_type sort_item);
|
||||
void print_screen_contents_into_file(std::string screen_data_stats_param);
|
||||
void zeroify_all_flow_counters();
|
||||
void process_packet(simple_packet_t& current_packet) ;
|
||||
|
Loading…
Reference in New Issue
Block a user