1
0
mirror of https://github.com/pavel-odintsov/fastnetmon synced 2024-09-20 03:33:04 +02:00

Added logic to send all traffic in JSON format to Kafka

This commit is contained in:
Pavel Odintsov 2023-02-05 20:14:44 +00:00
parent 8ed555546a
commit 38bf9d15dd
3 changed files with 50 additions and 1 deletions

View File

@ -162,3 +162,4 @@ bool execute_web_request_secure(std::string address,
std::string& error_text);
std::string forwarding_status_to_string(forwarding_status_t status);
std::string country_static_string_to_dynamic_string(const boost::beast::static_string<2>& country_code);
bool serialize_simple_packet_to_json(const simple_packet_t& packet, nlohmann::json& json_packet);

View File

@ -137,6 +137,7 @@ bool enable_api = false;
#ifdef KAFKA
cppkafka::Producer* kafka_traffic_export_producer = nullptr;
#endif
// Traffic export to Kafka
bool kafka_traffic_export = false;
@ -144,7 +145,6 @@ bool kafka_traffic_export = false;
std::string kafka_traffic_export_topic = "fastnetmon";
kafka_traffic_export_format_t kafka_traffic_export_format = kafka_traffic_export_format_t::JSON;
std::vector<std::string> kafka_traffic_export_brokers;
#endif
std::chrono::steady_clock::time_point last_call_of_traffic_recalculation;

View File

@ -46,6 +46,9 @@
#include "ban_list.hpp"
#ifdef KAFKA
#include <cppkafka/cppkafka.h>
#endif
extern uint64_t influxdb_writes_total;
extern uint64_t influxdb_writes_failed;
@ -2599,8 +2602,43 @@ void zeroify_all_flow_counters() {
}
}
// Exportst traffic to Kafka
void export_to_kafka(const simple_packet_t& current_packet) {
extern std::string kafka_traffic_export_topic;
#ifdef KAFKA
extern cppkafka::Producer* kafka_traffic_export_producer;
#endif
nlohmann::json json_packet;
if (!serialize_simple_packet_to_json(current_packet, json_packet)) {
return;
}
std::string simple_packet_as_json_string = json_packet.dump();
try {
kafka_traffic_export_producer->produce(
cppkafka::MessageBuilder(kafka_traffic_export_topic)
.partition(RD_KAFKA_PARTITION_UA)
.payload(simple_packet_as_json_string));
} catch (...) {
// We do not log it as it will flood log files
// logger << log4cpp::Priority::ERROR << "Kafka write failed";
}
try {
kafka_traffic_export_producer->flush();
} catch (...) {
// We do not log it as it will flood log files
// logger << log4cpp::Priority::ERROR << "Kafka flush failed";
}
}
// Process IPv6 traffic
void process_ipv6_packet(simple_packet_t& current_packet) {
extern bool kafka_traffic_export;
uint64_t sampled_number_of_packets = current_packet.number_of_packets * current_packet.sample_ratio;
uint64_t sampled_number_of_bytes = current_packet.length * current_packet.sample_ratio;
@ -2609,6 +2647,10 @@ void process_ipv6_packet(simple_packet_t& current_packet) {
current_packet.packet_direction =
get_packet_direction_ipv6(lookup_tree_ipv6, current_packet.src_ipv6, current_packet.dst_ipv6, ipv6_cidr_subnet);
if (kafka_traffic_export) {
export_to_kafka(current_packet);
}
#ifdef USE_NEW_ATOMIC_BUILTINS
__atomic_add_fetch(&total_counters_ipv6.total_counters[current_packet.packet_direction].packets, sampled_number_of_packets, __ATOMIC_RELAXED);
__atomic_add_fetch(&total_counters_ipv6.total_counters[current_packet.packet_direction].bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
@ -2669,6 +2711,8 @@ void process_ipv6_packet(simple_packet_t& current_packet) {
// Process simple unified packet
void process_packet(simple_packet_t& current_packet) {
extern bool kafka_traffic_export;
// Packets dump is very useful for bug hunting
if (DEBUG_DUMP_ALL_PACKETS) {
logger << log4cpp::Priority::INFO << "Dump: " << print_simple_packet(current_packet);
@ -2719,6 +2763,10 @@ void process_packet(simple_packet_t& current_packet) {
current_packet.packet_direction =
get_packet_direction(lookup_tree_ipv4, current_packet.src_ip, current_packet.dst_ip, current_subnet);
if (kafka_traffic_export) {
export_to_kafka(current_packet);
}
// It's useful in case when we can't find what packets do not processed correctly
if (DEBUG_DUMP_OTHER_PACKETS && current_packet.packet_direction == OTHER) {
logger << log4cpp::Priority::INFO << "Dump other: " << print_simple_packet(current_packet);