1
0
mirror of https://github.com/pavel-odintsov/fastnetmon synced 2024-11-22 20:42:03 +01:00

Sync of sFlow plugin with Advanced edition

This commit is contained in:
Pavel Odintsov 2024-07-17 18:26:45 +03:00
parent 4a9fc72729
commit 7be16b0a19
4 changed files with 73 additions and 81 deletions

@ -14,6 +14,7 @@ class fastnetmon_configuration_t {
std::vector<unsigned int> sflow_ports{};
std::string sflow_host{ "0.0.0.0" };
bool sflow_read_packet_length_from_ip_header{ false };
bool sflow_extract_tunnel_traffic{ false };
// Netflow / IPFIX
bool netflow{ false };

@ -5,7 +5,7 @@
class fixed_size_packet_storage_t {
public:
fixed_size_packet_storage_t() = default;
fixed_size_packet_storage_t(void* payload_pointer, unsigned int captured_length, unsigned int real_packet_length) {
fixed_size_packet_storage_t(const void* payload_pointer, unsigned int captured_length, unsigned int real_packet_length) {
// TODO: performance killer! Check it!
bool we_do_timestamps = true;

@ -183,7 +183,7 @@ void start_sflow_collection(process_packet_pointer func_ptr) {
boost::thread_group sflow_collector_threads;
logger << log4cpp::Priority::DEBUG << plugin_log_prefix << "We will listen on "
<< fastnetmon_global_configuration.sflow_ports.size() << " ports";
<< fastnetmon_global_configuration.sflow_ports.size() << " ports";
for (auto sflow_port : fastnetmon_global_configuration.sflow_ports) {
sflow_collector_threads.add_thread(
@ -231,7 +231,7 @@ void start_sflow_collector(const std::string& sflow_host, unsigned int sflow_por
if (bind_result != 0) {
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "cannot bind on " << sflow_port << ":" << sflow_host
<< " with errno: " << errno << " error: " << strerror(errno);
<< " with errno: " << errno << " error: " << strerror(errno);
return;
}
@ -380,13 +380,11 @@ bool process_sflow_flow_sample(const uint8_t* data_pointer,
if (sflow_raw_protocol_header.header_protocol == SFLOW_HEADER_PROTOCOL_ETHERNET) {
bool unpack_gre = false;
// We could enable this new parser for testing purpose
auto result = parse_raw_packet_to_simple_packet_full_ng(header_payload_pointer,
sflow_raw_protocol_header.frame_length_before_sampling,
sflow_raw_protocol_header.header_size, packet,
unpack_gre, fastnetmon_global_configuration.sflow_read_packet_length_from_ip_header);
auto result =
parse_raw_packet_to_simple_packet_full_ng(header_payload_pointer, sflow_raw_protocol_header.frame_length_before_sampling,
sflow_raw_protocol_header.header_size, packet,
fastnetmon_global_configuration.sflow_extract_tunnel_traffic,
fastnetmon_global_configuration.sflow_read_packet_length_from_ip_header);
if (result != network_data_stuctures::parser_code_t::success) {
sflow_parse_error_nested_header++;
@ -401,10 +399,11 @@ bool process_sflow_flow_sample(const uint8_t* data_pointer,
sflow_ipv4_header_protocol++;
// We parse this packet using special version of our parser which looks only on IPv4 packet
auto result = parse_raw_ipv4_packet_to_simple_packet_full_ng(header_payload_pointer,
sflow_raw_protocol_header.frame_length_before_sampling,
sflow_raw_protocol_header.header_size, packet,
fastnetmon_global_configuration.sflow_read_packet_length_from_ip_header);
auto result =
parse_raw_ipv4_packet_to_simple_packet_full_ng(header_payload_pointer,
sflow_raw_protocol_header.frame_length_before_sampling,
sflow_raw_protocol_header.header_size, packet,
fastnetmon_global_configuration.sflow_read_packet_length_from_ip_header);
if (result != network_data_stuctures::parser_code_t::success) {
sflow_parse_error_nested_header++;
@ -427,10 +426,13 @@ bool process_sflow_flow_sample(const uint8_t* data_pointer,
return false;
}
// That's actually WEIRD as we touch these fields in packet parser logic too.
// So basically we do not use logic from parsers above and override values after parser finishes work
// Pass pointer to raw header to FastNetMon processing functions
packet.payload_pointer = header_payload_pointer;
packet.payload_full_length = sflow_raw_protocol_header.frame_length_before_sampling;
packet.captured_payload_length = sflow_raw_protocol_header.header_size;
packet.payload_pointer = header_payload_pointer;
packet.payload_full_length = sflow_raw_protocol_header.frame_length_before_sampling;
packet.captured_payload_length = sflow_raw_protocol_header.header_size;
packet.sample_ratio = sflow_sample_header_unified_accessor.sampling_rate;
@ -470,7 +472,7 @@ bool process_sflow_flow_sample(const uint8_t* data_pointer,
}
// We're ready to parse it
const sflow_extended_gateway_information_t* gateway_details = (sflow_extended_gateway_information_t*)payload_ptr;
const sflow_extended_gateway_information_t* gateway_details = (const sflow_extended_gateway_information_t*)payload_ptr;
packet.src_asn = fast_ntoh(gateway_details->router_asn);
packet.dst_asn = fast_ntoh(gateway_details->source_asn);
@ -501,14 +503,14 @@ void parse_sflow_v5_packet(const uint8_t* payload_ptr, unsigned int payload_leng
bool read_sflow_header_result = read_sflow_header(payload_ptr, payload_length, sflow_header_accessor);
if (!read_sflow_header_result) {
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "could not read sflow packet header correctly";
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "could not read sFlow packet header correctly";
sflow_bad_packets++;
return;
}
if (sflow_header_accessor.get_datagram_samples_count() <= 0) {
logger << log4cpp::Priority::ERROR << plugin_log_prefix
<< "Strange number of sFLOW samples: " << sflow_header_accessor.get_datagram_samples_count();
<< "Strange number of sFlow samples: " << sflow_header_accessor.get_datagram_samples_count();
sflow_bad_packets++;
return;
}
@ -539,10 +541,10 @@ void parse_sflow_v5_packet(const uint8_t* payload_ptr, unsigned int payload_leng
// << " enterprise " << enterprise
// << " and length " << sample_length << std::endl;
int32_t enterprise = std::get<0>(sample);
int32_t integer_format = std::get<1>(sample);
uint8_t* data_pointer = std::get<2>(sample);
size_t data_length = std::get<3>(sample);
int32_t enterprise = std::get<0>(sample);
int32_t integer_format = std::get<1>(sample);
const uint8_t* data_pointer = std::get<2>(sample);
size_t data_length = std::get<3>(sample);
if (enterprise != 0) {
// We do not support vendor specific additions
@ -552,36 +554,33 @@ void parse_sflow_v5_packet(const uint8_t* payload_ptr, unsigned int payload_leng
sflow_sample_type_t sample_format = sflow_sample_type_from_integer(integer_format);
if (sample_format == sflow_sample_type_t::BROKEN_TYPE) {
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we got broken format type number: " << integer_format;
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we got broken format type number: " << integer_format;
continue;
}
// Move this code to separate function!!!
if (sample_format == sflow_sample_type_t::FLOW_SAMPLE) {
// std::cout << "We got flow sample" << std::endl;
// logger << log4cpp::Priority::DEBUG << plugin_log_prefix << "We got flow sample";
process_sflow_flow_sample(data_pointer, data_length, false, sflow_header_accessor, client_ipv4_address);
sflow_flow_samples++;
} else if (sample_format == sflow_sample_type_t::COUNTER_SAMPLE) {
// std::cout << "We got counter sample" << std::endl;
// TODO: add support for sflow counetrs
// process_sflow_counter_sample(data_pointer, data_length, false, sflow_header_accessor);
// logger << log4cpp::Priority::DEBUG << plugin_log_prefix << "We got counter sample";
process_sflow_counter_sample(data_pointer, data_length, false, sflow_header_accessor);
sflow_counter_sample++;
} else if (sample_format == sflow_sample_type_t::EXPANDED_FLOW_SAMPLE) {
// std::cout << "We got expanded flow sample" << std::endl;
// logger << log4cpp::Priority::DEBUG << plugin_log_prefix << "We got expanded flow sample";
process_sflow_flow_sample(data_pointer, data_length, true, sflow_header_accessor, client_ipv4_address);
sflow_flow_samples++;
} else if (sample_format == sflow_sample_type_t::EXPANDED_COUNTER_SAMPLE) {
// TODO:add support for sflow counetrs
// std::cout << "We got expanded counter sample" << std::endl;
////process_sflow_counter_sample(data_pointer, data_length, true, sflow_header_accessor);
sflow_counter_sample++;
// logger << log4cpp::Priority::DEBUG << plugin_log_prefix << "We got expanded counter sample";
// process_sflow_counter_sample(data_pointer, data_length, true, sflow_header_accessor);
sflow_expanded_counter_sample++;
} else {
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we got broken format type: " << integer_format;
}
}
}
bool process_sflow_counter_sample(uint8_t* data_pointer,
bool process_sflow_counter_sample(const uint8_t* data_pointer,
size_t data_length,
bool expanded,
const sflow_packet_header_unified_accessor& sflow_header_accessor) {
@ -591,7 +590,7 @@ bool process_sflow_counter_sample(uint8_t* data_pointer,
read_sflow_counter_header(data_pointer, data_length, expanded, sflow_counter_header_unified_accessor);
if (!read_sflow_counter_header_result) {
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "could not read sflow counter header";
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "could not read sFlow counter header";
return false;
}
@ -614,53 +613,45 @@ bool process_sflow_counter_sample(uint8_t* data_pointer,
return false;
}
for (auto counter_record : counter_record_sample_vector) {
uint32_t enterprise = 0;
uint32_t format = 0;
ssize_t length = 0;
uint8_t* data_pointer = nullptr;
std::tie(enterprise, format, length, data_pointer) = counter_record;
if (enterprise == 0) {
sample_counter_types_t sample_type = sample_counter_types_t::BROKEN_COUNTER;
;
if (format == 1) {
sample_type = sample_counter_types_t::GENERIC_INTERFACE_COUNTERS;
} else if (format == 2) {
sample_type = sample_counter_types_t::ETHERNET_INTERFACE_COUNTERS;
}
if (sample_type == sample_counter_types_t::ETHERNET_INTERFACE_COUNTERS) {
// std::cout << "ETHERNET_INTERFACE_COUNTERS" << std::endl;
if (sizeof(ethernet_sflow_interface_counters_t) != length) {
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we haven't enough data for ethernet counter packet";
return false;
}
ethernet_sflow_interface_counters_t ethernet_counters(data_pointer);
// std::cout << ethernet_counters.print() << std::endl;
}
if (sample_type == sample_counter_types_t::GENERIC_INTERFACE_COUNTERS) {
// std::cout << "GENERIC_INTERFACE_COUNTERS" << std::endl;
if (sizeof(generic_sflow_interface_counters_t) != length) {
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we haven't enough data for generic packet";
return false;
}
generic_sflow_interface_counters_t generic_sflow_interface_counters(data_pointer);
// std::cout << generic_sflow_interface_counters.print() << std::endl;
}
} else {
for (const auto& counter_record : counter_record_sample_vector) {
if (counter_record.enterprise != 0) {
logger << log4cpp::Priority::ERROR << plugin_log_prefix << "we do not support vendor specific enterprise numbers";
continue;
}
// std::cout << "Counter record" << std::endl;
sample_counter_types_t sample_type = sample_counter_types_t::BROKEN_COUNTER;
if (counter_record.format == 1) {
sample_type = sample_counter_types_t::GENERIC_INTERFACE_COUNTERS;
} else if (counter_record.format == 2) {
sample_type = sample_counter_types_t::ETHERNET_INTERFACE_COUNTERS;
}
if (sample_type == sample_counter_types_t::GENERIC_INTERFACE_COUNTERS) {
// logger << log4cpp::Priority::DEBUG << plugin_log_prefix << "GENERIC_INTERFACE_COUNTERS";
if (sizeof(generic_sflow_interface_counters_t) != counter_record.length) {
logger << log4cpp::Priority::ERROR << plugin_log_prefix
<< "length mismatch for generic interface counter packet: " << counter_record.length;
continue;
}
sflow_generic_interface_counter_sample++;
const generic_sflow_interface_counters_t* generic_sflow_interface_counters =
(const generic_sflow_interface_counters_t*)(counter_record.pointer);
if (logger.getPriority() == log4cpp::Priority::DEBUG) {
logger << log4cpp::Priority::DEBUG << generic_sflow_interface_counters->print();
}
// TODO: we need to get relevant fields in special intermediate structure and then pass them for processing
// somwhere else I think we need one dedicated thread to implement such metrics pusshes to Clickhouse
} else if (sample_type == sample_counter_types_t::ETHERNET_INTERFACE_COUNTERS) {
// These counters provide too detailed information about interface and we do not need it on such level for DDoS detection purposes
}
}
return true;
}

@ -8,5 +8,5 @@ void init_sflow_module();
void deinit_sflow_module();
// New code for v5 only
void parse_sflow_v5_packet(uint8_t* payload_ptr, unsigned int payload_length, uint32_t client_ipv4_address);
void parse_sflow_v5_packet(const uint8_t* payload_ptr, unsigned int payload_length, uint32_t client_ipv4_address);
std::vector<system_counter_t> get_sflow_stats();