From 38bf9d15dd75ccbf1dce4bf13d14b9d2a93f10f3 Mon Sep 17 00:00:00 2001 From: Pavel Odintsov Date: Sun, 5 Feb 2023 20:14:44 +0000 Subject: [PATCH] Added logic to send all traffic in JSON format to Kafka --- src/fast_library.hpp | 1 + src/fastnetmon.cpp | 2 +- src/fastnetmon_logic.cpp | 48 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 50 insertions(+), 1 deletion(-) diff --git a/src/fast_library.hpp b/src/fast_library.hpp index afb8dca..c516915 100644 --- a/src/fast_library.hpp +++ b/src/fast_library.hpp @@ -162,3 +162,4 @@ bool execute_web_request_secure(std::string address, std::string& error_text); std::string forwarding_status_to_string(forwarding_status_t status); std::string country_static_string_to_dynamic_string(const boost::beast::static_string<2>& country_code); +bool serialize_simple_packet_to_json(const simple_packet_t& packet, nlohmann::json& json_packet); diff --git a/src/fastnetmon.cpp b/src/fastnetmon.cpp index 3451135..d4cfee8 100644 --- a/src/fastnetmon.cpp +++ b/src/fastnetmon.cpp @@ -137,6 +137,7 @@ bool enable_api = false; #ifdef KAFKA cppkafka::Producer* kafka_traffic_export_producer = nullptr; +#endif // Traffic export to Kafka bool kafka_traffic_export = false; @@ -144,7 +145,6 @@ bool kafka_traffic_export = false; std::string kafka_traffic_export_topic = "fastnetmon"; kafka_traffic_export_format_t kafka_traffic_export_format = kafka_traffic_export_format_t::JSON; std::vector kafka_traffic_export_brokers; -#endif std::chrono::steady_clock::time_point last_call_of_traffic_recalculation; diff --git a/src/fastnetmon_logic.cpp b/src/fastnetmon_logic.cpp index 244988a..71733c1 100644 --- a/src/fastnetmon_logic.cpp +++ b/src/fastnetmon_logic.cpp @@ -46,6 +46,9 @@ #include "ban_list.hpp" +#ifdef KAFKA +#include +#endif extern uint64_t influxdb_writes_total; extern uint64_t influxdb_writes_failed; @@ -2599,8 +2602,43 @@ void zeroify_all_flow_counters() { } } +// Exportst traffic to Kafka +void export_to_kafka(const simple_packet_t& current_packet) { + extern std::string kafka_traffic_export_topic; +#ifdef KAFKA + extern cppkafka::Producer* kafka_traffic_export_producer; +#endif + + nlohmann::json json_packet; + + if (!serialize_simple_packet_to_json(current_packet, json_packet)) { + return; + } + + std::string simple_packet_as_json_string = json_packet.dump(); + + try { + kafka_traffic_export_producer->produce( + cppkafka::MessageBuilder(kafka_traffic_export_topic) + .partition(RD_KAFKA_PARTITION_UA) + .payload(simple_packet_as_json_string)); + } catch (...) { + // We do not log it as it will flood log files + // logger << log4cpp::Priority::ERROR << "Kafka write failed"; + } + + try { + kafka_traffic_export_producer->flush(); + } catch (...) { + // We do not log it as it will flood log files + // logger << log4cpp::Priority::ERROR << "Kafka flush failed"; + } +} + // Process IPv6 traffic void process_ipv6_packet(simple_packet_t& current_packet) { + extern bool kafka_traffic_export; + uint64_t sampled_number_of_packets = current_packet.number_of_packets * current_packet.sample_ratio; uint64_t sampled_number_of_bytes = current_packet.length * current_packet.sample_ratio; @@ -2609,6 +2647,10 @@ void process_ipv6_packet(simple_packet_t& current_packet) { current_packet.packet_direction = get_packet_direction_ipv6(lookup_tree_ipv6, current_packet.src_ipv6, current_packet.dst_ipv6, ipv6_cidr_subnet); + if (kafka_traffic_export) { + export_to_kafka(current_packet); + } + #ifdef USE_NEW_ATOMIC_BUILTINS __atomic_add_fetch(&total_counters_ipv6.total_counters[current_packet.packet_direction].packets, sampled_number_of_packets, __ATOMIC_RELAXED); __atomic_add_fetch(&total_counters_ipv6.total_counters[current_packet.packet_direction].bytes, sampled_number_of_bytes, __ATOMIC_RELAXED); @@ -2669,6 +2711,8 @@ void process_ipv6_packet(simple_packet_t& current_packet) { // Process simple unified packet void process_packet(simple_packet_t& current_packet) { + extern bool kafka_traffic_export; + // Packets dump is very useful for bug hunting if (DEBUG_DUMP_ALL_PACKETS) { logger << log4cpp::Priority::INFO << "Dump: " << print_simple_packet(current_packet); @@ -2719,6 +2763,10 @@ void process_packet(simple_packet_t& current_packet) { current_packet.packet_direction = get_packet_direction(lookup_tree_ipv4, current_packet.src_ip, current_packet.dst_ip, current_subnet); + if (kafka_traffic_export) { + export_to_kafka(current_packet); + } + // It's useful in case when we can't find what packets do not processed correctly if (DEBUG_DUMP_OTHER_PACKETS && current_packet.packet_direction == OTHER) { logger << log4cpp::Priority::INFO << "Dump other: " << print_simple_packet(current_packet);