Added logic to track IPFIX sampling rate per device basis

This commit is contained in:
Pavel Odintsov 2021-01-23 17:30:35 +00:00
parent e3253963c0
commit a82cd92cdb

View File

@ -40,6 +40,10 @@ unsigned int netflow_sampling_ratio = 1;
std::mutex netflow9_sampling_rates_mutex;
std::map<std::string, uint32_t> netflow9_sampling_rates;
// and IPFIX
std::mutex ipfix_sampling_rates_mutex;
std::map<std::string, uint32_t> ipfix_sampling_rates;
std::string netflow_plugin_name = "netflow";
std::string netflow_plugin_log_prefix = netflow_plugin_name + ": ";
@ -1087,6 +1091,77 @@ bool nf10_rec_to_flow(uint32_t record_type, uint32_t record_length, uint8_t* dat
return true;
}
// Read options data packet with known templat
bool nf10_options_flowset_to_store(uint8_t* pkt, size_t len, nf10_header_t* nf10_hdr, peer_nf9_template* flow_template, std::string client_addres_in_string_format) {
// Skip scope fields, I really do not want to parse this informations
pkt += flow_template->option_scope_length;
auto template_records = flow_template->records;
uint32_t sampling_rate = 0;
uint32_t offset = 0;
for (auto elem : template_records) {
uint8_t* data_shift = pkt + offset;
// Time to extract sampling rate
if (elem.record_type == NF10_SAMPLING_INTERVAL) {
// RFC suggest that this field is 4 byte: https://www.iana.org/assignments/ipfix/ipfix.xhtml
if (elem.record_length > sizeof(sampling_rate)) {
logger << log4cpp::Priority::ERROR << "Unexpectedly big size for IPFIX_SAMPLING_INTERVAL: " << elem.record_length;
return false;
}
bool result = be_copy_function(data_shift, (uint8_t*)&sampling_rate, sizeof(sampling_rate), elem.record_length);
if (!result) {
logger << log4cpp::Priority::ERROR
<< "Prevented attempt to read outside of allowed memory region for IPFIX_SAMPLING_INTERVAL";
return false;
}
} else if (elem.record_type == NF10_SAMPLING_PACKET_INTERVAL) {
// RFC suggest that this field is 4 byte: https://www.iana.org/assignments/ipfix/ipfix.xhtml
if (elem.record_length > sizeof(sampling_rate)) {
logger << log4cpp::Priority::ERROR
<< "Unexpectedly big size for IPFIX_SAMPLING_PACKET_INTERVAL: " << elem.record_length;
return false;
}
bool result = be_copy_function(data_shift, (uint8_t*)&sampling_rate, sizeof(sampling_rate), elem.record_length);
if (!result) {
logger << log4cpp::Priority::ERROR << "Prevented attempt to read outside of allowed memory region for IPFIX_SAMPLING_PACKET_INTERVAL";
return false;
}
}
offset += elem.record_length;
}
if (sampling_rate != 0) {
auto new_sampling_rate = fast_ntoh(sampling_rate);
ipfix_custom_sampling_rate_received++;
logger << log4cpp::Priority::DEBUG << "I extracted sampling rate: " << new_sampling_rate << " for "
<< client_addres_in_string_format;
// Replace old sampling rate value
std::lock_guard<std::mutex> lock(ipfix_sampling_rates_mutex);
auto old_sampling_rate = ipfix_sampling_rates[client_addres_in_string_format];
if (old_sampling_rate != new_sampling_rate) {
ipfix_sampling_rates[client_addres_in_string_format] = new_sampling_rate;
ipfix_sampling_rate_changes++;
logger << log4cpp::Priority::DEBUG << "Change IPFIX sampling rate from " << old_sampling_rate << " to "
<< new_sampling_rate << " for " << client_addres_in_string_format;
}
}
return true;
}
// We should rewrite nf9_flowset_to_store accroding to fixes here
void nf10_flowset_to_store(uint8_t* pkt, size_t len, nf10_header_t* nf10_hdr, peer_nf9_template* field_template, uint32_t client_ipv4_address) {