Add support for LUA into Netflow v5 plugin. Since this commit we could drop or pass flows to processing according to rich logic

This commit is contained in:
Pavel Odintsov 2015-06-22 04:19:04 -04:00
parent 8b7edb8a5c
commit 4eb385f21c
4 changed files with 136 additions and 32 deletions

View File

@ -103,6 +103,15 @@ add_library(sflow_plugin STATIC sflow_plugin/sflow_collector.cpp)
add_library(netflow_plugin STATIC netflow_plugin/netflow_collector.cpp)
target_link_libraries(netflow_plugin ipfix_rfc)
# We do not enable it by default, it's testing feature
# If you want it please build with:
# cmake -DENABLE_LUA_SUPPORT=ON ..
if (ENABLE_LUA_SUPPORT)
add_definitions(-DENABLE_LUA_HOOKS)
target_link_libraries(netflow_plugin luajit-5.1)
endif()
# pcap plugin
add_library(pcap_plugin STATIC pcap_plugin/pcap_collector.cpp)
target_link_libraries(pcap_plugin pcap)

43
src/netflow_hooks.lua Normal file
View File

@ -0,0 +1,43 @@
local json = require("json")
-- We have this library bundled only in luajit:
-- g++ lua_integration.cpp -lluajit-5.1
-- Before production use, please call your code with luajit CLI
local ffi = require("ffi")
-- Load declaration from the inside separate header file
-- This code should be in sync with https://github.com/FastVPSEestiOu/fastnetmon/blob/master/src/netflow_plugin/netflow.h
-- And we use uintXX_t instead u_intXX_t here
ffi.cdef([[typedef struct __attribute__((packed)) NF5_FLOW {
uint32_t src_ip, dest_ip, nexthop_ip;
uint16_t if_index_in, if_index_out;
uint32_t flow_packets, flow_octets;
uint32_t flow_start, flow_finish;
uint16_t src_port, dest_port;
uint8_t pad1;
uint8_t tcp_flags, protocol, tos;
uint16_t src_as, dest_as;
uint8_t src_mask, dst_mask;
uint16_t pad2;
} NF5_FLOW_t;]])
function process_netflow(flow)
local netlflow5_t = ffi.typeof('NF5_FLOW_t*')
local lua_flow = ffi.cast(netlflow5_t, flow)
-- TODO: PLEASE BE AWARE! Thid code will read json file for every netflow packet
print ("Flow packets and bytes: ", lua_flow.flow_packets, lua_flow.flow_octets)
print ("In interface :", lua_flow.if_index_in, " out interface: ", lua_flow.if_index_out)
local json_file = io.open("/usr/src/fastnetmon/src/tests/netflow_exclude.json", "r")
local decoded = json.decode(json_file:read("*all"))
for k,v in pairs(decoded) do
for kk, vv in pairs(v) do
--print(k, kk, vv)
end
end
return true
end

View File

@ -27,6 +27,22 @@
#include "log4cpp/PatternLayout.hh"
#include "log4cpp/Priority.hh"
#ifdef ENABLE_LUA_HOOKS
#include <lua5.1/lua.hpp>
#endif
#ifdef ENABLE_LUA_HOOKS
lua_State* lua_state = NULL;
bool lua_hooks_enabled = true;
std::string lua_hooks_path = "/usr/src/fastnetmon/src/netflow_hooks.lua";
bool call_netflow_process_lua_hook(lua_State* lua_state_param, struct NF5_FLOW* flow);
void init_lua_jit();
#endif
unsigned int netflow_port = 2055;
// Get it from main programm
extern log4cpp::Category& logger;
@ -892,9 +908,13 @@ void process_netflow_packet_v5(u_int8_t* packet, u_int len, std::string client_a
}
/* Decode to host encoding */
// TODO: move to separate function
nf5_flow->flow_octets = fast_ntoh(nf5_flow->flow_octets);
nf5_flow->flow_packets = fast_ntoh(nf5_flow->flow_packets);
nf5_flow->if_index_in = fast_ntoh(nf5_flow->if_index_in);
nf5_flow->if_index_out = fast_ntoh(nf5_flow->if_index_out);
// convert netflow to simple packet form
simple_packet current_packet;
@ -908,8 +928,8 @@ void process_netflow_packet_v5(u_int8_t* packet, u_int len, std::string client_a
current_packet.destination_port = 0;
// TODO: we should pass data about "flow" structure of this data
current_packet.length = fast_ntoh(nf5_flow->flow_octets);
current_packet.number_of_packets = fast_ntoh(nf5_flow->flow_packets);
current_packet.length = nf5_flow->flow_octets;
current_packet.number_of_packets = nf5_flow->flow_packets;
if (netflow_divide_counters_on_interval_length) {
// This interval in milliseconds, convert it to seconds
@ -967,6 +987,22 @@ void process_netflow_packet_v5(u_int8_t* packet, u_int len, std::string client_a
} break;
}
#ifdef ENABLE_LUA_HOOKS
if (lua_hooks_enabled) {
// TODO: remove it!!!
if (lua_state == NULL) {
init_lua_jit();
}
if (call_netflow_process_lua_hook(lua_state, nf5_flow)) {
// We will process this packet
} else {
logger << log4cpp::Priority::INFO << "We will drop this packets because LUA script decided to do it";
return;
}
}
#endif
// Call processing function for every flow in packet
netflow_process_func_ptr(current_packet);
}
@ -992,12 +1028,56 @@ void process_netflow_packet(u_int8_t* packet, u_int len, std::string client_addr
}
}
unsigned int netflow_port = 2055;
#ifdef ENABLE_LUA_HOOKS
void init_lua_jit() {
if (lua_hooks_enabled) {
lua_state = luaL_newstate();
// load libraries
luaL_openlibs(lua_state);
int lua_load_file_result = luaL_dofile(lua_state, lua_hooks_path.c_str());
if (lua_load_file_result != 0) {
logger << log4cpp::Priority::ERROR << "LuaJIT can't load file correctly from path: " << lua_hooks_path
<< " disable LUA support";
lua_hooks_enabled = false;
}
}
}
bool call_netflow_process_lua_hook(lua_State* lua_state_param, struct NF5_FLOW* flow) {
lua_getfield(lua_state_param, LUA_GLOBALSINDEX, "process_netflow");
lua_pushlightuserdata(lua_state_param, (void*)flow);
// Call with 1 argumnents and 1 result
lua_call(lua_state_param, 1, 1);
if (lua_gettop(lua_state_param) == 1) {
bool result = lua_toboolean(lua_state_param, -1) == 1 ? true : false;
// pop returned value
lua_pop(lua_state_param, 1);
return result;
} else {
logger << log4cpp::Priority::ERROR << "We got " << lua_gettop(lua_state_param) << " return values from the LUA, it's error, please check your LUA code";
return false;
}
return false;
}
#endif
// #include <sys/prctl.h>
void start_netflow_collection(process_packet_pointer func_ptr) {
logger << log4cpp::Priority::INFO << "netflow plugin started";
#ifdef ENABLE_LUA_HOOKS
init_lua_jit();
#endif
// prctl(PR_SET_NAME,"fastnetmon_netflow", 0, 0, 0);
netflow_process_func_ptr = func_ptr;

View File

@ -1,28 +0,0 @@
local json = require("json")
-- We have this library bundled only in luajit:
-- g++ lua_integration.cpp -lluajit-5.1
local ffi = require("ffi")
-- Load declaration from the inside separate header file
ffi.cdef("typedef struct netflow_struct { int packets; int bytes; } netflow_t;")
function process_netflow(flow)
local netlflow_t = ffi.typeof('netflow_t*')
local lua_flow = ffi.cast(netlflow_t, flow)
print ("Function param: ", lua_flow.packets, lua_flow.bytes)
local json_file = io.open("netflow_exclude.json", "r")
local decoded = json.decode(json_file:read("*all"))
for k,v in pairs(decoded) do
for kk, vv in pairs(v) do
--print(k, kk, vv)
end
end
return true
end
-- process_netflow("test")