Moved logic to unified functions (#876)

This commit is contained in:
Pavel Odintsov 2020-11-14 23:19:13 +00:00 committed by GitHub
parent e3344cb726
commit 6f62a77b16
Signed by: GitHub
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 181 additions and 59 deletions

View File

@ -130,6 +130,9 @@ std::string cli_stats_file_path = "/tmp/fastnetmon.dat";
unsigned int stats_thread_sleep_time = 3600;
unsigned int stats_thread_initial_call_delay = 30;
// Current time with pretty low precision, we use separate thread to update it
time_t current_inaccurate_time = 0;
bool process_internal_traffic_as_external = false;
unsigned int recalculate_speed_timeout = 1;
@ -1699,6 +1702,12 @@ int main(int argc, char** argv) {
}
#endif
// Set inaccurate time value which will be used in process_packet() from capture backends
time(&current_inaccurate_time);
// Start system speed recalculation thread
service_thread_group.add_thread(new boost::thread(system_counters_speed_thread_handler));
// Run screen draw thread
service_thread_group.add_thread(new boost::thread(screen_draw_thread));

View File

@ -67,6 +67,7 @@ extern time_t last_call_of_traffic_recalculation;
extern bool process_incoming_traffic;
extern bool process_outgoing_traffic;
extern uint64_t total_unparsed_packets;
extern time_t current_inaccurate_time;
extern uint64_t total_unparsed_packets_speed;
extern bool enable_conection_tracking;
extern bool enable_afpacket_collection;
@ -2740,9 +2741,6 @@ void process_packet(simple_packet_t& current_packet) {
subnet_t source_subnet = std::make_pair(source_subnet_host, source_subnet_cidr_mask);
subnet_t destination_subnet = std::make_pair(destination_subnet_host, destination_subnet_cidr_mask);
// Iterator for subnet counter
subnet_counter_t* subnet_counter = NULL;
if (packet_direction == OUTGOING or packet_direction == INCOMING) {
if (enable_subnet_counters) {
map_for_subnet_counters::iterator subnet_iterator;
@ -2755,7 +2753,14 @@ void process_packet(simple_packet_t& current_packet) {
return;
}
subnet_counter = &subnet_iterator->second;
subnet_counter_t* subnet_counter = &subnet_iterator->second;
// Increment countres for each subnet
if (current_packet.packet_direction == OUTGOING) {
increment_outgoing_counters(subnet_counter, current_packet, sampled_number_of_packets, sampled_number_of_bytes);
} else if (current_packet.packet_direction == INCOMING) {
increment_incoming_counters(subnet_counter, current_packet, sampled_number_of_packets, sampled_number_of_bytes);
}
}
}
@ -2854,17 +2859,6 @@ void process_packet(simple_packet_t& current_packet) {
#endif
}
// TODO: add another counters
if (enable_subnet_counters) {
#ifdef USE_NEW_ATOMIC_BUILTINS
__atomic_add_fetch(&subnet_counter->out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
__atomic_add_fetch(&subnet_counter->out_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
#else
__sync_fetch_and_add(&subnet_counter->out_packets, sampled_number_of_packets);
__sync_fetch_and_add(&subnet_counter->out_bytes, sampled_number_of_bytes);
#endif
}
conntrack_main_struct* current_element_flow = NULL;
if (enable_conection_tracking) {
current_element_flow = &itr_flow->second[shift_in_vector];
@ -3014,16 +3008,6 @@ void process_packet(simple_packet_t& current_packet) {
__sync_fetch_and_add(&current_element->in_bytes, sampled_number_of_bytes);
#endif
if (enable_subnet_counters) {
#ifdef USE_NEW_ATOMIC_BUILTINS
__atomic_add_fetch(&subnet_counter->in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
__atomic_add_fetch(&subnet_counter->in_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
#else
__sync_fetch_and_add(&subnet_counter->in_packets, sampled_number_of_packets);
__sync_fetch_and_add(&subnet_counter->in_bytes, sampled_number_of_bytes);
#endif
}
// Count fragmented IP packets
if (current_packet.ip_fragmented) {
#ifdef USE_NEW_ATOMIC_BUILTINS
@ -3136,3 +3120,94 @@ void process_packet(simple_packet_t& current_packet) {
}
}
// Increment fields using data from specified packet
void increment_outgoing_counters(map_element_t* current_element,
simple_packet_t& current_packet,
uint64_t sampled_number_of_packets,
uint64_t sampled_number_of_bytes) {
// Update last update time
current_element->last_update_time = current_inaccurate_time;
// Main packet/bytes counter
__sync_fetch_and_add(&current_element->out_packets, sampled_number_of_packets);
__sync_fetch_and_add(&current_element->out_bytes, sampled_number_of_bytes);
// Fragmented IP packets
if (current_packet.ip_fragmented) {
__sync_fetch_and_add(&current_element->fragmented_out_packets, sampled_number_of_packets);
__sync_fetch_and_add(&current_element->fragmented_out_bytes, sampled_number_of_bytes);
}
if (current_packet.protocol == IPPROTO_TCP) {
__sync_fetch_and_add(&current_element->tcp_out_packets, sampled_number_of_packets);
__sync_fetch_and_add(&current_element->tcp_out_bytes, sampled_number_of_bytes);
if (extract_bit_value(current_packet.flags, TCP_SYN_FLAG_SHIFT)) {
__sync_fetch_and_add(&current_element->tcp_syn_out_packets, sampled_number_of_packets);
__sync_fetch_and_add(&current_element->tcp_syn_out_bytes, sampled_number_of_bytes);
}
} else if (current_packet.protocol == IPPROTO_UDP) {
__sync_fetch_and_add(&current_element->udp_out_packets, sampled_number_of_packets);
__sync_fetch_and_add(&current_element->udp_out_bytes, sampled_number_of_bytes);
} else if (current_packet.protocol == IPPROTO_ICMP) {
__sync_fetch_and_add(&current_element->icmp_out_packets, sampled_number_of_packets);
__sync_fetch_and_add(&current_element->icmp_out_bytes, sampled_number_of_bytes);
// no flow tracking for icmp
} else {
}
}
// This function increments all our accumulators according to data from packet
void increment_incoming_counters(map_element_t* current_element,
simple_packet_t& current_packet,
uint64_t sampled_number_of_packets,
uint64_t sampled_number_of_bytes) {
// Uodate last update time
current_element->last_update_time = current_inaccurate_time;
// Main packet/bytes counter
__sync_fetch_and_add(&current_element->in_packets, sampled_number_of_packets);
__sync_fetch_and_add(&current_element->in_bytes, sampled_number_of_bytes);
// Count fragmented IP packets
if (current_packet.ip_fragmented) {
__sync_fetch_and_add(&current_element->fragmented_in_packets, sampled_number_of_packets);
__sync_fetch_and_add(&current_element->fragmented_in_bytes, sampled_number_of_bytes);
}
// Count per protocol packets
if (current_packet.protocol == IPPROTO_TCP) {
__sync_fetch_and_add(&current_element->tcp_in_packets, sampled_number_of_packets);
__sync_fetch_and_add(&current_element->tcp_in_bytes, sampled_number_of_bytes);
if (extract_bit_value(current_packet.flags, TCP_SYN_FLAG_SHIFT)) {
__sync_fetch_and_add(&current_element->tcp_syn_in_packets, sampled_number_of_packets);
__sync_fetch_and_add(&current_element->tcp_syn_in_bytes, sampled_number_of_bytes);
}
} else if (current_packet.protocol == IPPROTO_UDP) {
__sync_fetch_and_add(&current_element->udp_in_packets, sampled_number_of_packets);
__sync_fetch_and_add(&current_element->udp_in_bytes, sampled_number_of_bytes);
} else if (current_packet.protocol == IPPROTO_ICMP) {
__sync_fetch_and_add(&current_element->icmp_in_packets, sampled_number_of_packets);
__sync_fetch_and_add(&current_element->icmp_in_bytes, sampled_number_of_bytes);
} else {
// TBD
}
}
void system_counters_speed_thread_handler() {
while (true) {
// We recalculate it each second to avoid confusion
boost::this_thread::sleep(boost::posix_time::seconds(1));
// We use this thread to update time each second
time_t current_time = 0;
time(&current_time);
// Update global time, yes, it may became inaccurate due to thread syncronization but that's OK for our purposes
current_inaccurate_time = current_time;
}
}

View File

@ -100,3 +100,15 @@ std::string draw_table(direction_t data_direction, bool do_redis_update, sort_ty
void print_screen_contents_into_file(std::string screen_data_stats_param);
void zeroify_all_flow_counters();
void process_packet(simple_packet_t& current_packet) ;
void increment_outgoing_counters(map_element_t* current_element,
simple_packet_t& current_packet,
uint64_t sampled_number_of_packets,
uint64_t sampled_number_of_bytes);
void increment_incoming_counters(map_element_t* current_element,
simple_packet_t& current_packet,
uint64_t sampled_number_of_packets,
uint64_t sampled_number_of_bytes);
void system_counters_speed_thread_handler();

View File

@ -1,52 +1,48 @@
#include <stdint.h>
#include <boost/serialization/nvp.hpp>
// main data structure for storing traffic and speed data for all our IPs
class map_element_t {
public:
map_element_t()
: in_bytes(0), out_bytes(0), in_packets(0), out_packets(0), tcp_in_packets(0), tcp_out_packets(0),
tcp_in_bytes(0), tcp_out_bytes(0), tcp_syn_in_packets(0), tcp_syn_out_packets(0),
tcp_syn_in_bytes(0), tcp_syn_out_bytes(0), udp_in_packets(0), udp_out_packets(0),
udp_in_bytes(0), udp_out_bytes(0), in_flows(0), out_flows(0), fragmented_in_packets(0),
fragmented_out_packets(0), fragmented_in_bytes(0), fragmented_out_bytes(0),
icmp_in_packets(0), icmp_out_packets(0), icmp_in_bytes(0), icmp_out_bytes(0) {
}
uint64_t in_bytes;
uint64_t out_bytes;
uint64_t in_packets;
uint64_t out_packets;
// We use inaccurate time source for it becasue we do not care about precise time in this case
time_t last_update_time = 0;
uint64_t in_bytes = 0;
uint64_t out_bytes = 0;
uint64_t in_packets = 0;
uint64_t out_packets = 0;
// Fragmented traffic is so recently used for attacks
uint64_t fragmented_in_packets;
uint64_t fragmented_out_packets;
uint64_t fragmented_in_bytes;
uint64_t fragmented_out_bytes;
uint64_t fragmented_in_packets = 0;
uint64_t fragmented_out_packets = 0;
uint64_t fragmented_in_bytes = 0;
uint64_t fragmented_out_bytes = 0;
// Additional data for correct attack protocol detection
uint64_t tcp_in_packets;
uint64_t tcp_out_packets;
uint64_t tcp_in_bytes;
uint64_t tcp_out_bytes;
uint64_t tcp_in_packets = 0;
uint64_t tcp_out_packets = 0;
uint64_t tcp_in_bytes = 0;
uint64_t tcp_out_bytes = 0;
// Additional details about one of most popular atatck type
uint64_t tcp_syn_in_packets;
uint64_t tcp_syn_out_packets;
uint64_t tcp_syn_in_bytes;
uint64_t tcp_syn_out_bytes;
uint64_t tcp_syn_in_packets = 0;
uint64_t tcp_syn_out_packets = 0;
uint64_t tcp_syn_in_bytes = 0;
uint64_t tcp_syn_out_bytes = 0;
uint64_t udp_in_packets;
uint64_t udp_out_packets;
uint64_t udp_in_bytes;
uint64_t udp_out_bytes;
uint64_t udp_in_packets = 0;
uint64_t udp_out_packets = 0;
uint64_t udp_in_bytes = 0;
uint64_t udp_out_bytes = 0;
uint64_t icmp_in_packets;
uint64_t icmp_out_packets;
uint64_t icmp_in_bytes;
uint64_t icmp_out_bytes;
uint64_t icmp_in_packets = 0;
uint64_t icmp_out_packets = 0;
uint64_t icmp_in_bytes = 0;
uint64_t icmp_out_bytes = 0;
uint64_t in_flows;
uint64_t out_flows;
uint64_t in_flows = 0;
uint64_t out_flows = 0;
// Is total counters fields are zero? We are not handling per protocol counters here because we assume they should
// be counted twice
@ -90,4 +86,34 @@ class map_element_t {
in_flows = 0;
out_flows = 0;
}
template <class Archive> void serialize(Archive& ar, const unsigned int version) {
ar& BOOST_SERIALIZATION_NVP(in_bytes);
ar& BOOST_SERIALIZATION_NVP(out_bytes);
ar& BOOST_SERIALIZATION_NVP(in_packets);
ar& BOOST_SERIALIZATION_NVP(out_packets);
ar& BOOST_SERIALIZATION_NVP(fragmented_in_packets);
ar& BOOST_SERIALIZATION_NVP(fragmented_out_packets);
ar& BOOST_SERIALIZATION_NVP(fragmented_in_bytes);
ar& BOOST_SERIALIZATION_NVP(fragmented_out_bytes);
ar& BOOST_SERIALIZATION_NVP(tcp_in_packets);
ar& BOOST_SERIALIZATION_NVP(tcp_out_packets);
ar& BOOST_SERIALIZATION_NVP(tcp_in_bytes);
ar& BOOST_SERIALIZATION_NVP(tcp_out_bytes);
ar& BOOST_SERIALIZATION_NVP(tcp_syn_in_packets);
ar& BOOST_SERIALIZATION_NVP(tcp_syn_out_packets);
ar& BOOST_SERIALIZATION_NVP(tcp_syn_in_bytes);
ar& BOOST_SERIALIZATION_NVP(tcp_syn_out_bytes);
ar& BOOST_SERIALIZATION_NVP(udp_in_packets);
ar& BOOST_SERIALIZATION_NVP(udp_out_packets);
ar& BOOST_SERIALIZATION_NVP(udp_in_bytes);
ar& BOOST_SERIALIZATION_NVP(udp_out_bytes);
ar& BOOST_SERIALIZATION_NVP(icmp_in_packets);
ar& BOOST_SERIALIZATION_NVP(icmp_out_packets);
ar& BOOST_SERIALIZATION_NVP(icmp_in_bytes);
ar& BOOST_SERIALIZATION_NVP(icmp_out_bytes);
ar& BOOST_SERIALIZATION_NVP(in_flows);
ar& BOOST_SERIALIZATION_NVP(out_flows);
}
};