1
0
mirror of https://github.com/pavel-odintsov/fastnetmon synced 2024-11-23 00:52:00 +01:00

Migrated Protobuf API to new schema

This commit is contained in:
Pavel Odintsov 2024-07-13 21:06:11 +03:00
parent a8187a7164
commit 8cb11c967f
5 changed files with 331 additions and 37 deletions

@ -769,8 +769,8 @@ if (ENABLE_GOBGP_SUPPORT)
message(STATUS "Protoc return code for Protobuf fastnetmon_internal_api.proto: ${PROTOC_RETURN_CODE} std err: ${PROTOC_STDERR}") message(STATUS "Protoc return code for Protobuf fastnetmon_internal_api.proto: ${PROTOC_RETURN_CODE} std err: ${PROTOC_STDERR}")
add_library(fastnetmon_grpc_pb_cc STATIC fastnetmon.grpc.pb.cc) add_library(fastnetmon_grpc_pb_cc STATIC fastnetmon_internal_api.grpc.pb.cc)
add_library(fastnetmon_pb_cc STATIC fastnetmon.pb.cc) add_library(fastnetmon_pb_cc STATIC fastnetmon_internal_api.pb.cc)
add_executable(fastnetmon_api_client fastnetmon_api_client.cpp) add_executable(fastnetmon_api_client fastnetmon_api_client.cpp)

@ -8,9 +8,9 @@
#include "ban_list.hpp" #include "ban_list.hpp"
Status FastnetmonApiServiceImpl::GetBanlist(::grpc::ServerContext* context, ::grpc::Status FastnetmonApiServiceImpl::GetBanlist(::grpc::ServerContext* context,
const ::fastmitigation::BanListRequest* request, const ::fastnetmoninternal::BanListRequest* request,
::grpc::ServerWriter<::fastmitigation::BanListReply>* writer) { ::grpc::ServerWriter<::fastnetmoninternal::BanListReply>* writer) {
extern blackhole_ban_list_t<subnet_ipv6_cidr_mask_t> ban_list_ipv6; extern blackhole_ban_list_t<subnet_ipv6_cidr_mask_t> ban_list_ipv6;
extern blackhole_ban_list_t<uint32_t> ban_list_ipv4; extern blackhole_ban_list_t<uint32_t> ban_list_ipv4;
@ -24,7 +24,7 @@ Status FastnetmonApiServiceImpl::GetBanlist(::grpc::ServerContext* context,
ban_list_ipv4.get_whole_banlist(ban_list_ipv4_copy); ban_list_ipv4.get_whole_banlist(ban_list_ipv4_copy);
for (auto itr : ban_list_ipv4_copy) { for (auto itr : ban_list_ipv4_copy) {
BanListReply reply; fastnetmoninternal::BanListReply reply;
reply.set_ip_address(convert_ip_as_uint_to_string(itr.first) + "/32"); reply.set_ip_address(convert_ip_as_uint_to_string(itr.first) + "/32");
@ -39,17 +39,17 @@ Status FastnetmonApiServiceImpl::GetBanlist(::grpc::ServerContext* context,
for (auto itr : ban_list_ipv6_copy) { for (auto itr : ban_list_ipv6_copy) {
BanListReply reply; fastnetmoninternal::BanListReply reply;
reply.set_ip_address(print_ipv6_cidr_subnet(itr.first)); reply.set_ip_address(print_ipv6_cidr_subnet(itr.first));
writer->Write(reply); writer->Write(reply);
} }
return Status::OK; return grpc::Status::OK;
} }
Status FastnetmonApiServiceImpl::ExecuteBan(ServerContext* context, ::grpc::Status FastnetmonApiServiceImpl::ExecuteBan(ServerContext* context,
const fastmitigation::ExecuteBanRequest* request, const fastnetmoninternal::ExecuteBanRequest* request,
fastmitigation::ExecuteBanReply* reply) { fastnetmoninternal::ExecuteBanReply* reply) {
extern blackhole_ban_list_t<subnet_ipv6_cidr_mask_t> ban_list_ipv6; extern blackhole_ban_list_t<subnet_ipv6_cidr_mask_t> ban_list_ipv6;
extern blackhole_ban_list_t<uint32_t> ban_list_ipv4; extern blackhole_ban_list_t<uint32_t> ban_list_ipv4;
extern patricia_tree_t *lookup_tree_ipv4; extern patricia_tree_t *lookup_tree_ipv4;
@ -131,12 +131,12 @@ Status FastnetmonApiServiceImpl::ExecuteBan(ServerContext* context,
call_blackhole_actions_per_host(attack_action_t::ban, client_ip, ipv6_address, ipv6, current_attack, call_blackhole_actions_per_host(attack_action_t::ban, client_ip, ipv6_address, ipv6, current_attack,
attack_detection_source_t::Automatic, flow_attack_details, empty_simple_packets_buffer, empty_raw_packets_buffer); attack_detection_source_t::Automatic, flow_attack_details, empty_simple_packets_buffer, empty_raw_packets_buffer);
return Status::OK; return grpc::Status::OK;
} }
Status FastnetmonApiServiceImpl::ExecuteUnBan(ServerContext* context, ::grpc::Status FastnetmonApiServiceImpl::ExecuteUnBan(ServerContext* context,
const fastmitigation::ExecuteBanRequest* request, const fastnetmoninternal::ExecuteBanRequest* request,
fastmitigation::ExecuteBanReply* reply) { fastnetmoninternal::ExecuteBanReply* reply) {
extern blackhole_ban_list_t<subnet_ipv6_cidr_mask_t> ban_list_ipv6; extern blackhole_ban_list_t<subnet_ipv6_cidr_mask_t> ban_list_ipv6;
extern blackhole_ban_list_t<uint32_t> ban_list_ipv4; extern blackhole_ban_list_t<uint32_t> ban_list_ipv4;
@ -176,7 +176,7 @@ Status FastnetmonApiServiceImpl::ExecuteUnBan(ServerContext* context,
if (!is_blackholed_ipv4) { if (!is_blackholed_ipv4) {
logger << log4cpp::Priority::ERROR << "API: Could not find IPv4 address in ban list"; logger << log4cpp::Priority::ERROR << "API: Could not find IPv4 address in ban list";
return Status::CANCELLED; return grpc::Status::CANCELLED;
} }
bool get_details = ban_list_ipv4.get_blackhole_details(client_ip, current_attack); bool get_details = ban_list_ipv4.get_blackhole_details(client_ip, current_attack);
@ -198,7 +198,7 @@ Status FastnetmonApiServiceImpl::ExecuteUnBan(ServerContext* context,
if (!is_blackholed_ipv6) { if (!is_blackholed_ipv6) {
logger << log4cpp::Priority::ERROR << "API: Could not find IPv6 address in ban list"; logger << log4cpp::Priority::ERROR << "API: Could not find IPv6 address in ban list";
return Status::CANCELLED; return grpc::Status::CANCELLED;
} }
bool get_details = ban_list_ipv6.get_blackhole_details(ipv6_address, current_attack); bool get_details = ban_list_ipv6.get_blackhole_details(ipv6_address, current_attack);
@ -220,5 +220,288 @@ Status FastnetmonApiServiceImpl::ExecuteUnBan(ServerContext* context,
call_blackhole_actions_per_host(attack_action_t::unban, client_ip, ipv6_address, ipv6, call_blackhole_actions_per_host(attack_action_t::unban, client_ip, ipv6_address, ipv6,
current_attack, attack_detection_source_t::Automatic, flow_attack_details, simple_packets_buffer, raw_packets_buffer); current_attack, attack_detection_source_t::Automatic, flow_attack_details, simple_packets_buffer, raw_packets_buffer);
return Status::OK; return grpc::Status::OK;
} }
void fill_total_traffic_counters_api(::grpc::ServerWriter<::fastnetmoninternal::SixtyFourNamedCounter>* writer,
const direction_t& packet_direction,
const total_speed_counters_t& total_counters,
bool return_per_protocol_metrics,
const std::string& unit) {
std::string direction_as_string = get_direction_name(packet_direction);
fastnetmoninternal::SixtyFourNamedCounter reply;
reply.set_counter_name(direction_as_string + " traffic");
reply.set_counter_value(total_counters.total_speed_average_counters[packet_direction].total.packets);
reply.set_counter_unit("pps");
writer->Write(reply);
if (return_per_protocol_metrics) {
// tcp
reply.set_counter_name(direction_as_string + " tcp traffic");
reply.set_counter_value(total_counters.total_speed_average_counters[packet_direction].tcp.packets);
reply.set_counter_unit("pps");
writer->Write(reply);
// udp
reply.set_counter_name(direction_as_string + " udp traffic");
reply.set_counter_value(total_counters.total_speed_average_counters[packet_direction].udp.packets);
reply.set_counter_unit("pps");
writer->Write(reply);
// icmp
reply.set_counter_name(direction_as_string + " icmp traffic");
reply.set_counter_value(total_counters.total_speed_average_counters[packet_direction].icmp.packets);
reply.set_counter_unit("pps");
writer->Write(reply);
// fragmented
reply.set_counter_name(direction_as_string + " fragmented traffic");
reply.set_counter_value(total_counters.total_speed_average_counters[packet_direction].fragmented.packets);
reply.set_counter_unit("pps");
writer->Write(reply);
// tcp_syn
reply.set_counter_name(direction_as_string + " tcp_syn traffic");
reply.set_counter_value(total_counters.total_speed_average_counters[packet_direction].tcp_syn.packets);
reply.set_counter_unit("pps");
writer->Write(reply);
// dropped
reply.set_counter_name(direction_as_string + " dropped traffic");
reply.set_counter_value(total_counters.total_speed_average_counters[packet_direction].dropped.packets);
reply.set_counter_unit("pps");
writer->Write(reply);
}
// Write traffic speed with same name but with other unit
reply.set_counter_name(direction_as_string + " traffic");
if (unit == "bps") {
reply.set_counter_value(total_counters.total_speed_average_counters[packet_direction].total.bytes * 8);
reply.set_counter_unit("bps");
} else {
reply.set_counter_value(
convert_speed_to_mbps(total_counters.total_speed_average_counters[packet_direction].total.bytes));
reply.set_counter_unit("mbps");
}
writer->Write(reply);
if (return_per_protocol_metrics) {
// tcp
reply.set_counter_name(direction_as_string + " tcp traffic");
if (unit == "bps") {
reply.set_counter_value(total_counters.total_speed_average_counters[packet_direction].tcp.bytes * 8);
reply.set_counter_unit("bps");
} else {
reply.set_counter_value(
convert_speed_to_mbps(total_counters.total_speed_average_counters[packet_direction].tcp.bytes));
reply.set_counter_unit("mbps");
}
writer->Write(reply);
// udp
reply.set_counter_name(direction_as_string + " udp traffic");
if (unit == "bps") {
reply.set_counter_unit("bps");
reply.set_counter_value(total_counters.total_speed_average_counters[packet_direction].udp.bytes * 8);
} else {
reply.set_counter_value(
convert_speed_to_mbps(total_counters.total_speed_average_counters[packet_direction].udp.bytes));
reply.set_counter_unit("mbps");
}
writer->Write(reply);
// icmp
reply.set_counter_name(direction_as_string + " icmp traffic");
if (unit == "bps") {
reply.set_counter_value(total_counters.total_speed_average_counters[packet_direction].icmp.bytes * 8);
reply.set_counter_unit("bps");
} else {
reply.set_counter_value(
convert_speed_to_mbps(total_counters.total_speed_average_counters[packet_direction].icmp.bytes));
reply.set_counter_unit("mbps");
}
writer->Write(reply);
// fragmented
reply.set_counter_name(direction_as_string + " fragmented traffic");
if (unit == "bps") {
reply.set_counter_value(total_counters.total_speed_average_counters[packet_direction].fragmented.bytes * 8);
reply.set_counter_unit("bps");
} else {
reply.set_counter_value(
convert_speed_to_mbps(total_counters.total_speed_average_counters[packet_direction].fragmented.bytes));
reply.set_counter_unit("mbps");
}
writer->Write(reply);
// tcp_syn
reply.set_counter_name(direction_as_string + " tcp_syn traffic");
if (unit == "bps") {
reply.set_counter_value(total_counters.total_speed_average_counters[packet_direction].tcp_syn.bytes * 8);
reply.set_counter_unit("bps");
} else {
reply.set_counter_value(
convert_speed_to_mbps(total_counters.total_speed_average_counters[packet_direction].tcp_syn.bytes));
reply.set_counter_unit("mbps");
}
writer->Write(reply);
// dropped
reply.set_counter_name(direction_as_string + " dropped traffic");
if (unit == "bps") {
reply.set_counter_value(total_counters.total_speed_average_counters[packet_direction].dropped.bytes * 8);
reply.set_counter_unit("bps");
} else {
reply.set_counter_value(
convert_speed_to_mbps(total_counters.total_speed_average_counters[packet_direction].dropped.bytes));
reply.set_counter_unit("mbps");
}
writer->Write(reply);
}
}
::grpc::Status
FastnetmonApiServiceImpl::GetTotalTrafficCounters([[maybe_unused]] ::grpc::ServerContext* context,
const ::fastnetmoninternal::GetTotalTrafficCountersRequest* request,
::grpc::ServerWriter<::fastnetmoninternal::SixtyFourNamedCounter>* writer) {
extern uint64_t incoming_total_flows_speed;
extern uint64_t outgoing_total_flows_speed;
extern total_speed_counters_t total_counters;
logger << log4cpp::Priority::DEBUG << "API we asked for GetTotalTrafficCounters";
extern total_speed_counters_t total_counters;
std::vector<direction_t> directions = { INCOMING, OUTGOING, INTERNAL, OTHER };
bool get_per_protocol_metrics = request->get_per_protocol_metrics();
extern bool enable_connection_tracking;
std::string unit = request->unit();
for (auto packet_direction : directions) {
// Forward our total counters to API format
fill_total_traffic_counters_api(writer, packet_direction, total_counters, get_per_protocol_metrics, unit);
if (enable_connection_tracking) {
fastnetmoninternal::SixtyFourNamedCounter reply;
std::string direction_as_string = get_direction_name(packet_direction);
reply.set_counter_name(direction_as_string + " traffic");
// Populate flow per second rates
if (packet_direction == INCOMING) {
reply.set_counter_unit("flows");
reply.set_counter_value(incoming_total_flows_speed);
writer->Write(reply);
} else if (packet_direction == OUTGOING) {
reply.set_counter_unit("flows");
reply.set_counter_value(outgoing_total_flows_speed);
writer->Write(reply);
}
}
}
return grpc::Status::OK;
}
::grpc::Status
FastnetmonApiServiceImpl::GetTotalTrafficCountersV6([[maybe_unused]] ::grpc::ServerContext* context,
const ::fastnetmoninternal::GetTotalTrafficCountersRequest* request,
::grpc::ServerWriter<::fastnetmoninternal::SixtyFourNamedCounter>* writer) {
extern total_speed_counters_t total_counters_ipv6;
logger << log4cpp::Priority::DEBUG << "API we asked for GetTotalTrafficCountersV6";
std::vector<direction_t> directions = { INCOMING, OUTGOING, INTERNAL, OTHER };
bool get_per_protocol_metrics = request->get_per_protocol_metrics();
std::string unit = request->unit();
for (auto packet_direction : directions) {
// Forward our total counters to API format
fill_total_traffic_counters_api(writer, packet_direction, total_counters_ipv6, get_per_protocol_metrics, unit);
}
return grpc::Status::OK;
}
::grpc::Status
FastnetmonApiServiceImpl::GetTotalTrafficCountersV4([[maybe_unused]] ::grpc::ServerContext* context,
const ::fastnetmoninternal::GetTotalTrafficCountersRequest* request,
::grpc::ServerWriter<::fastnetmoninternal::SixtyFourNamedCounter>* writer) {
extern uint64_t incoming_total_flows_speed;
extern uint64_t outgoing_total_flows_speed;
extern total_speed_counters_t total_counters_ipv4;
logger << log4cpp::Priority::DEBUG << "API we asked for GetTotalTrafficCounters";
extern total_speed_counters_t total_counters_ipv4;
extern bool enable_connection_tracking;
std::vector<direction_t> directions = { INCOMING, OUTGOING, INTERNAL, OTHER };
bool get_per_protocol_metrics = request->get_per_protocol_metrics();
std::string unit = request->unit();
for (auto packet_direction : directions) {
fill_total_traffic_counters_api(writer, packet_direction, total_counters_ipv4, get_per_protocol_metrics, unit);
if (enable_connection_tracking) {
fastnetmoninternal::SixtyFourNamedCounter reply;
std::string direction_as_string = get_direction_name(packet_direction);
reply.set_counter_name(direction_as_string + " traffic");
// Populate flow per second rates
if (packet_direction == INCOMING) {
reply.set_counter_unit("flows");
reply.set_counter_value(incoming_total_flows_speed);
writer->Write(reply);
} else if (packet_direction == OUTGOING) {
reply.set_counter_unit("flows");
reply.set_counter_value(outgoing_total_flows_speed);
writer->Write(reply);
}
}
}
return grpc::Status::OK;
}

@ -1,22 +1,30 @@
#include "fastnetmon.grpc.pb.h" #include "fastnetmon_internal_api.grpc.pb.h"
#include <grpc++/grpc++.h> #include <grpc++/grpc++.h>
// API declaration // API declaration
using fastmitigation::BanListReply;
using fastmitigation::BanListRequest;
using fastmitigation::Fastnetmon;
using grpc::Server; using grpc::Server;
using grpc::ServerBuilder; using grpc::ServerBuilder;
using grpc::ServerContext; using grpc::ServerContext;
using grpc::Status;
class FastnetmonApiServiceImpl final : public Fastnetmon::Service { class FastnetmonApiServiceImpl final : public fastnetmoninternal::Fastnetmon::Service {
Status GetBanlist(::grpc::ServerContext* context, ::grpc::Status GetBanlist(::grpc::ServerContext* context,
const ::fastmitigation::BanListRequest* request, const ::fastnetmoninternal::BanListRequest* request,
::grpc::ServerWriter<::fastmitigation::BanListReply>* writer) override; ::grpc::ServerWriter<::fastnetmoninternal::BanListReply>* writer) override;
::grpc::Status ExecuteBan(ServerContext* context, const fastnetmoninternal::ExecuteBanRequest* request, fastnetmoninternal::ExecuteBanReply* reply) override;
::grpc::Status ExecuteUnBan(ServerContext* context,
const fastnetmoninternal::ExecuteBanRequest* request,
fastnetmoninternal::ExecuteBanReply* reply) override;
::grpc::Status GetTotalTrafficCounters([[maybe_unused]] ::grpc::ServerContext* context,
const ::fastnetmoninternal::GetTotalTrafficCountersRequest* request,
::grpc::ServerWriter<::fastnetmoninternal::SixtyFourNamedCounter>* writer) override;
::grpc::Status GetTotalTrafficCountersV6([[maybe_unused]] ::grpc::ServerContext* context,
const ::fastnetmoninternal::GetTotalTrafficCountersRequest* request,
::grpc::ServerWriter<::fastnetmoninternal::SixtyFourNamedCounter>* writer) override;
::grpc::Status GetTotalTrafficCountersV4([[maybe_unused]] ::grpc::ServerContext* context,
const ::fastnetmoninternal::GetTotalTrafficCountersRequest* request,
::grpc::ServerWriter<::fastnetmoninternal::SixtyFourNamedCounter>* writer) override;
Status ExecuteBan(ServerContext* context, const fastmitigation::ExecuteBanRequest* request, fastmitigation::ExecuteBanReply* reply) override;
Status ExecuteUnBan(ServerContext* context,
const fastmitigation::ExecuteBanRequest* request,
fastmitigation::ExecuteBanReply* reply) override;
}; };

@ -4,11 +4,11 @@
#include <grpc++/grpc++.h> #include <grpc++/grpc++.h>
#include "fastnetmon.grpc.pb.h" #include "fastnetmon_internal_api.grpc.pb.h"
using fastmitigation::BanListReply; using fastnetmoninternal::BanListReply;
using fastmitigation::BanListRequest; using fastnetmoninternal::BanListRequest;
using fastmitigation::Fastnetmon; using fastnetmoninternal::Fastnetmon;
using grpc::Channel; using grpc::Channel;
using grpc::ClientContext; using grpc::ClientContext;
using grpc::Status; using grpc::Status;
@ -22,8 +22,8 @@ class FastnetmonClient {
void ExecuteBan(std::string host, bool is_ban) { void ExecuteBan(std::string host, bool is_ban) {
ClientContext context; ClientContext context;
fastmitigation::ExecuteBanRequest request; fastnetmoninternal::ExecuteBanRequest request;
fastmitigation::ExecuteBanReply reply; fastnetmoninternal::ExecuteBanReply reply;
request.set_ip_address(host); request.set_ip_address(host);

@ -5,6 +5,9 @@ package fastnetmoninternal;
option go_package = "./;fastnetmoninternal"; option go_package = "./;fastnetmoninternal";
service Fastnetmon { service Fastnetmon {
// TODO: legacy to remove and replace by DisableMitigation
rpc ExecuteUnBan(ExecuteBanRequest) returns (ExecuteBanReply) {}
// Pings gRPC server to check availability // Pings gRPC server to check availability
rpc Ping(PingRequest) returns (PingReply) {} rpc Ping(PingRequest) returns (PingReply) {}