1
0
mirror of https://github.com/pavel-odintsov/fastnetmon synced 2024-11-24 02:46:36 +01:00

Add working handle for filters flush for netmap-ipfw

This commit is contained in:
Pavel Odintsov 2015-05-18 01:01:30 +02:00
parent a50217515b
commit 080b9b2541
2 changed files with 81 additions and 57 deletions

@ -43,23 +43,38 @@ while True:
# u'source-ipv4': [u'10.0.0.1/32'],
# u'string': u'flow destination-ipv4 10.0.0.2/32 source-ipv4 10.0.0.1/32 protocol =tcp destination-port =3128'}
# Peer shutdown notification:
# { "exabgp": "3.5.0", "time": 1431900440, "host" : "filter.fv.ee", "pid" : 8637, "ppid" : 8435, "counter": 21, "type": "state", "neighbor": { "address": { "local": "10.0.3.115", "peer": "10.0.3.114" }, "asn": { "local": "1234", "peer": "65001" }, "state": "down", "reason": "in loop, peer reset, message [closing connection] error[the TCP connection was closed by the remote end]" } }
# Fix bug: https://github.com/Exa-Networks/exabgp/issues/269
line = line.replace('0x800900000000000A', '"0x800900000000000A"')
io = StringIO(line)
### print >> sys.stderr, line
decoded_update = json.load(io)
pp = pprint.PrettyPrinter(indent=4, stream=sys.stderr)
# pp.pprint(decoded_update)
pp.pprint(decoded_update)
try:
current_flow_announce = decoded_update["neighbor"]["message"]["update"]["announce"]["ipv4 flow"]
peer_ip = decoded_update['neighbor']['address']['peer']
for next_hop in current_flow_announce:
flow_announce_with_certain_hop = current_flow_announce[next_hop]
for flow in flow_announce_with_certain_hop:
pp.pprint(flow_announce_with_certain_hop[flow])
q.enqueue(firewall_queue.execute_ip_ban, flow_announce_with_certain_hop[flow])
q.enqueue(firewall_queue.manage_flow, 'announce', peer_ip, flow_announce_with_certain_hop[flow])
except KeyError:
pass
# We got notification about neighbor status
if 'type' in decoded_update and decoded_update['type'] == 'state':
if 'state' in decoded_update['neighbor'] and decoded_update['neighbor']['state'] == 'down':
peer_ip = decoded_update['neighbor']['address']['peer']
print >> sys.stderr, "We received notification about peer down for: " + peer_ip
q.enqueue(firewall_queue.manage_flow, 'withdrawal', peer_ip, {})
exabgp_log.write(line + "\n")
except KeyboardInterrupt:

@ -62,7 +62,70 @@ def ipfw_add_rule(action, protocol, source_host, source_port, target_host, targe
logger.info( "We generated this command: " + ipfw_command )
logger.info( "We have following number of processors: " + str(multiprocessing.cpu_count()) )
for cpu_number in range(0, multiprocessing.cpu_count() - 1):
execute_command_for_all_ipfw_backends(ipfw_command)
return True
def manage_flow(action, peer_ip, flow):
if action == 'announce':
pp = pprint.PrettyPrinter(indent=4)
logger.info(pp.pformat(flow))
# ipfw_add_rule(action, protocol, source_host, source_port, target_host, target_port, flags)
action = 'deny'
protocol = 'all'
source_port = ''
source_host = 'any'
target_port = ''
target_host = 'any'
flags = ''
# We support only one subnet for source and destination
if 'source-ipv4' in flow:
source_host = flow["source-ipv4"][0]
if 'destination-ipv4' in flow:
target_host = flow["destination-ipv4"][0]
if source_host == "any" and target_host == "any":
logger.info( "We can't process this rule because it will drop whole traffic to the network" )
return False
if 'destination-port' in flow:
target_port = flow['destination-port'][0].lstrip('=')
if 'source-port' in flow:
source_port = flow['source-port'][0].lstrip('=');
if 'fragment' in flow:
if '=is-fragment' in flow['fragment']:
flags = "fragmented"
if 'protocol' in flow:
global_result = True
for current_protocol in flow['protocol']:
logger.info("Call ipfw_add_rule")
result = ipfw_add_rule(action, current_protocol.lstrip('='), source_host, source_port, target_host, target_port, flags)
if result != True:
global_result = False
return global_result
else:
return ipfw_add_rule(action, "all", source_host, source_port, target_host, target_port, flags)
return False
elif action == 'withdrawal':
logger.info("We will flush all rules from peer " + peer_ip)
execute_command_for_all_ipfw_backends("-f flush")
return True
else:
logger.info("Unknown action: " + action)
return False
def execute_command_for_all_ipfw_backends(ipfw_command):
for cpu_number in range(0, multiprocessing.cpu_count() - 1):
port_for_current_cpu = 5550 + cpu_number
args = [ '/usr/src/netmap-ipfw/ipfw/ipfw' ]
@ -76,57 +139,3 @@ def ipfw_add_rule(action, protocol, source_host, source_port, target_host, targe
new_env['IPFW_PORT'] = str(port_for_current_cpu)
subprocess.Popen( args, env=new_env)
return True
def execute_ip_ban(flow):
pp = pprint.PrettyPrinter(indent=4)
logger.info(pp.pformat(flow))
# ipfw_add_rule(action, protocol, source_host, source_port, target_host, target_port, flags)
action = 'deny'
protocol = 'all'
source_port = ''
source_host = 'any'
target_port = ''
target_host = 'any'
flags = ''
# We support only one subnet for source and destination
if 'source-ipv4' in flow:
source_host = flow["source-ipv4"][0]
if 'destination-ipv4' in flow:
target_host = flow["destination-ipv4"][0]
if source_host == "any" and target_host == "any":
logger.info( "We can't process this rule because it will drop whole traffic to the network" )
return False
if 'destination-port' in flow:
target_port = flow['destination-port'][0].lstrip('=')
if 'source-port' in flow:
source_port = flow['source-port'][0].lstrip('=');
if 'fragment' in flow:
if '=is-fragment' in flow['fragment']:
flags = "fragmented"
if 'protocol' in flow:
global_result = True
for current_protocol in flow['protocol']:
logger.info("Call ipfw_add_rule")
result = ipfw_add_rule(action, current_protocol.lstrip('='), source_host, source_port, target_host, target_port, flags)
if result != True:
global_result = False
return global_result
else:
return ipfw_add_rule(action, "all", source_host, source_port, target_host, target_port, flags)
return False