Add Redis support to site tracker

This commit is contained in:
Pavel Odintsov 2015-08-03 18:41:16 +03:00
parent c633116c24
commit 5c6ced5932
2 changed files with 96 additions and 7 deletions

View File

@ -1,6 +1,8 @@
#!/bin/bash
# apt-get install -y libhiredis-dev redis-server
g++ -O3 ../../fastnetmon_packet_parser.c -c -o fastnetmon_packet_parser.o -fPIC
g++ -O3 ../../fastnetmon_pcap_format.cpp -c -o fastnetmon_pcap_format.o -fPIC
g++ -O3 ../../fast_dpi.cpp -c -o fast_dpi.o `PKG_CONFIG_PATH=/opt/ndpi/lib/pkgconfig pkg-config pkg-config --cflags --libs libndpi` -fPIC
g++ -O3 -shared -o ndpicallback.so -fPIC ndpicallback.cpp fastnetmon_pcap_format.o fast_dpi.o fastnetmon_packet_parser.o `PKG_CONFIG_PATH=/opt/ndpi/lib/pkgconfig pkg-config pkg-config --cflags --libs libndpi` -std=c++11 -fPIC
g++ -O3 -shared -o ndpicallback.so -fPIC ndpicallback.cpp fastnetmon_pcap_format.o fast_dpi.o fastnetmon_packet_parser.o `PKG_CONFIG_PATH=/opt/ndpi/lib/pkgconfig pkg-config pkg-config --cflags --libs libndpi` -std=c++11 -fPIC -lhiredis

View File

@ -15,20 +15,77 @@
#include <boost/functional/hash.hpp>
#include <hiredis/hiredis.h>
#include "../../fastnetmon_pcap_format.h"
#include "../../fastnetmon_types.h"
#include "../../fastnetmon_packet_parser.h"
#include "../../fast_dpi.h"
unsigned int redis_port = 6379;
std::string redis_host = "127.0.0.1";
u_int32_t size_flow_struct = 0;
u_int32_t size_id_struct = 0;
double last_timestamp = 0;
double system_tsc_resolution_hz = 0;
redisContext* redis_context = NULL;
#ifdef __cplusplus
extern "C" {
redisContext* redis_init_connection();
void store_data_in_redis(std::string key_name, std::string value) {
redisReply* reply = NULL;
//redisContext* redis_context = redis_init_connection();
if (!redis_context) {
printf("Could not initiate connection to Redis\n");
return;
}
reply = (redisReply*)redisCommand(redis_context, "SET %s %s", key_name.c_str(), value.c_str());
// If we store data correctly ...
if (!reply) {
std::cout << "Can't increment traffic in redis error_code: " << redis_context->err
<< " error_string: " << redis_context->errstr;
// Handle redis server restart corectly
if (redis_context->err == 1 or redis_context->err == 3) {
// Connection refused
printf("Unfortunately we can't store data in Redis because server reject connection\n");
}
} else {
freeReplyObject(reply);
}
//redisFree(redis_context);
}
redisContext* redis_init_connection() {
struct timeval timeout = { 1, 500000 }; // 1.5 seconds
redisContext* redis_context = redisConnectWithTimeout(redis_host.c_str(), redis_port, timeout);
if (redis_context->err) {
std::cout << "Connection error:" << redis_context->errstr;
return NULL;
}
// We should check connection with ping because redis do not check connection
redisReply* reply = (redisReply*)redisCommand(redis_context, "PING");
if (reply) {
freeReplyObject(reply);
} else {
return NULL;
}
return redis_context;
}
#endif
inline uint64_t rte_rdtsc(void) {
@ -129,6 +186,9 @@ class ndpi_tracking_flow_t {
typedef std::unordered_map<conntrack_hash_struct_for_simple_packet_t, ndpi_tracking_flow_t> my_connection_tracking_storage_t;
my_connection_tracking_storage_t my_connection_tracking_storage;
typedef std::unordered_map<std::string, unsigned int> known_http_hosts_t;
known_http_hosts_t known_http_hosts;
// For correct compilation with g++
#ifdef __cplusplus
extern "C" {
@ -236,6 +296,7 @@ void* speed_printer(void* ptr) {
printf("We process: %llu pps %.2f Gbps\n", (long long)pps, (float)bps/1024/1024/1024 * 8);
// std::cout << "Hash size: " << my_connection_tracking_storage.size() << std::endl;
std::cout << "Uniq hosts: " << known_http_hosts.size() << std::endl;
}
}
@ -243,6 +304,13 @@ void* speed_printer(void* ptr) {
void firehose_start() {
my_ndpi_struct = init_ndpi();
// Connect to the Redis
redis_context = redis_init_connection();
if (!redis_context) {
printf("Can't connect to the Redis\n");
}
// Tune timer
set_tsc_freq_fallback();
@ -451,21 +519,21 @@ void firehose_packet(const char *pciaddr, char *data, int length) {
ndpi_tracking_flow_t& dpi_tracking_structure = my_connection_tracking_storage[ conntrack_structure ];
// Protocol already detected
/*
if (dpi_tracking_structure.protocol_detected && dpi_tracking_structure.detected_protocol.protocol == NDPI_PROTOCOL_IRC) {
char print_buffer[512];
fastnetmon_print_parsed_pkt(print_buffer, 512, (u_char*)data, &packet_header);
printf("packet: %s\n", print_buffer);
/*
for (unsigned int index = packet_header.extended_hdr.parsed_pkt.offset.payload_offset; index < packet_header.len; index++) {
printf("%c", data[index]);
}
printf("\n");
*/
return;
}
*/
dpi_tracking_structure.update_timestamp();
@ -490,7 +558,29 @@ void firehose_packet(const char *pciaddr, char *data, int length) {
char* protocol_name = ndpi_get_proto_name(my_ndpi_struct, detected_protocol.protocol);
char* master_protocol_name = ndpi_get_proto_name(my_ndpi_struct, detected_protocol.master_protocol);
// printf("Protocol: %s master protocol: %s\n", protocol_name, master_protocol_name);
if (detected_protocol.protocol == NDPI_PROTOCOL_HTTP) {
std::string host_name = std::string((const char*)dpi_tracking_structure.flow->host_server_name);
//printf("server name: %s\n", dpi_tracking_structure.flow->host_server_name);
if (redis_context != NULL) {
known_http_hosts_t::iterator itr = known_http_hosts.find(host_name);
if (itr == known_http_hosts.end()) {
// Not defined in internal cache
// Add in local cache:
known_http_hosts[ host_name ] = 1;
// Add to Redis
store_data_in_redis(host_name, "1");
} else {
// Already stored
}
}
}
//printf("Protocol: %s master protocol: %s\n", protocol_name, master_protocol_name);
bool its_bad_protocol = false;
//if(ndpi_is_proto(detected_protocol, NDPI_PROTOCOL_TOR)) {
@ -507,14 +597,11 @@ void firehose_packet(const char *pciaddr, char *data, int length) {
fastnetmon_print_parsed_pkt(print_buffer, 512, (u_char*)data, &packet_header);
printf("packet: %s\n", print_buffer);
/*
for (unsigned int index = packet_header.extended_hdr.parsed_pkt.offset.payload_offset; index < packet_header.len; index++) {
printf("%c", data[index]);
}
printf("\n");
*/
}
}
}