Added logic to apply IPFIX sampling rate

This commit is contained in:
Pavel Odintsov 2021-01-23 18:23:37 +00:00
parent a82cd92cdb
commit 72a4ec3d6f

View File

@ -514,7 +514,7 @@ bool process_netflow_v10_template(uint8_t* pkt, size_t len, uint32_t source_id,
uint32_t count = ntohs(tmplh->count); uint32_t count = ntohs(tmplh->count);
offset += sizeof(*tmplh); offset += sizeof(*tmplh);
std::vector<peer_nf9_record_t> template_records_map; std::vector<peer_nf9_record_t> template_records_map;
uint32_t total_size = 0; uint32_t total_size = 0;
for (uint32_t i = 0; i < count; i++) { for (uint32_t i = 0; i < count; i++) {
if (offset >= len) { if (offset >= len) {
@ -1164,7 +1164,7 @@ bool nf10_options_flowset_to_store(uint8_t* pkt, size_t len, nf10_header_t* nf10
} }
// We should rewrite nf9_flowset_to_store accroding to fixes here // We should rewrite nf9_flowset_to_store accroding to fixes here
void nf10_flowset_to_store(uint8_t* pkt, size_t len, nf10_header_t* nf10_hdr, peer_nf9_template* field_template, uint32_t client_ipv4_address) { void nf10_flowset_to_store(uint8_t* pkt, size_t len, nf10_header_t* nf10_hdr, peer_nf9_template* field_template, uint32_t client_ipv4_address, const std::string& client_addres_in_string_format) {
uint32_t offset = 0; uint32_t offset = 0;
if (len < field_template->total_len) { if (len < field_template->total_len) {
@ -1182,7 +1182,17 @@ void nf10_flowset_to_store(uint8_t* pkt, size_t len, nf10_header_t* nf10_hdr, pe
packet.number_of_packets = 0; packet.number_of_packets = 0;
packet.ts.tv_sec = ntohl(nf10_hdr->time_sec); packet.ts.tv_sec = ntohl(nf10_hdr->time_sec);
packet.sample_ratio = netflow_sampling_ratio; {
std::lock_guard<std::mutex> lock(ipfix_sampling_rates_mutex);
auto itr = ipfix_sampling_rates.find(client_addres_in_string_format);
if (itr == ipfix_sampling_rates.end()) {
// Use global value
packet.sample_ratio = netflow_sampling_ratio;
} else {
packet.sample_ratio = itr->second;
}
}
// By default, assume IPv4 traffic here // By default, assume IPv4 traffic here
// But code below can switch it to IPv6 // But code below can switch it to IPv6
@ -1433,7 +1443,7 @@ void nf9_flowset_to_store(uint8_t* pkt,
netflow_meta_info_t flow_meta; netflow_meta_info_t flow_meta;
// We should iterate over all available template fields // We should iterate over all available template fields
for (std::vector<peer_nf9_record_t> ::iterator iter = template_records.begin(); iter != template_records.end(); iter++) { for (std::vector<peer_nf9_record_t>::iterator iter = template_records.begin(); iter != template_records.end(); iter++) {
uint32_t record_type = iter->record_type; uint32_t record_type = iter->record_type;
uint32_t record_length = iter->record_length; uint32_t record_length = iter->record_length;
@ -1505,7 +1515,7 @@ void nf9_flowset_to_store(uint8_t* pkt,
netflow_process_func_ptr(packet); netflow_process_func_ptr(packet);
} }
int process_netflow_v10_data(uint8_t* pkt, bool process_netflow_v10_data(uint8_t* pkt,
size_t len, size_t len,
nf10_header_t* nf10_hdr, nf10_header_t* nf10_hdr,
uint32_t source_id, uint32_t source_id,
@ -1514,9 +1524,12 @@ int process_netflow_v10_data(uint8_t* pkt,
nf10_data_flowset_header_t* dath = (nf10_data_flowset_header_t*)pkt; nf10_data_flowset_header_t* dath = (nf10_data_flowset_header_t*)pkt;
// Store packet end, it's useful for sanity checks
uint8_t* packet_end = pkt + len;
if (len < sizeof(*dath)) { if (len < sizeof(*dath)) {
logger << log4cpp::Priority::INFO << "Short netflow v10 data flowset header"; logger << log4cpp::Priority::ERROR << "Short netflow v10 data flowset header. Agent: " << client_addres_in_string_format;
return 1; return false;
} }
uint32_t flowset_id = ntohs(dath->c.flowset_id); uint32_t flowset_id = ntohs(dath->c.flowset_id);
@ -1532,12 +1545,12 @@ int process_netflow_v10_data(uint8_t* pkt,
"seconds. We need some " "seconds. We need some "
"time to learn it!"; "time to learn it!";
return 1; return false;
} }
if (flowset_template->records.empty()) { if (flowset_template->records.empty()) {
logger << log4cpp::Priority::ERROR << "Blank records in template"; logger << log4cpp::Priority::ERROR << "Blank records in IPFIX template. Agent: " << client_addres_in_string_format;
return 1; return false;
} }
uint32_t offset = sizeof(*dath); uint32_t offset = sizeof(*dath);
@ -1545,17 +1558,35 @@ int process_netflow_v10_data(uint8_t* pkt,
if (num_flowsets == 0 || num_flowsets > 0x4000) { if (num_flowsets == 0 || num_flowsets > 0x4000) {
logger << log4cpp::Priority::ERROR << "Invalid number of data flowset, strange number of flows: " << num_flowsets; logger << log4cpp::Priority::ERROR << "Invalid number of data flowset, strange number of flows: " << num_flowsets;
return 1; return false;
} }
for (uint32_t i = 0; i < num_flowsets; i++) { if (flowset_template->type == netflow9_template_type::Data) {
// process whole flowset
nf10_flowset_to_store(pkt + offset, flowset_template->total_len, nf10_hdr, flowset_template, client_ipv4_address);
offset += flowset_template->total_len; for (uint32_t i = 0; i < num_flowsets; i++) {
// process whole flowset
nf10_flowset_to_store(pkt + offset, flowset_template->total_len, nf10_hdr, flowset_template,
client_ipv4_address, client_addres_in_string_format);
offset += flowset_template->total_len;
}
} else if (flowset_template->type == netflow9_template_type::Options) {
ipfix_options_packet_number++;
// Check that we will not read outside of packet
if (pkt + offset + flowset_template->total_len > packet_end) {
logger << log4cpp::Priority::ERROR << "We tried to read data outside packet for IPFIX options. "
<< "Agent: " << client_addres_in_string_format;
return 1;
}
// Process options packet
nf10_options_flowset_to_store(pkt + offset, flowset_template->total_len, nf10_hdr, flowset_template,
client_addres_in_string_format);
} }
return 0; return true;
} }
int process_netflow_v9_data(uint8_t* pkt, int process_netflow_v9_data(uint8_t* pkt,