mirror of
https://github.com/snovvcrash/usbrip.git
synced 2024-06-03 12:46:03 +02:00
Add storage module
This commit is contained in:
parent
97d372bc7b
commit
d3aef763b0
|
@ -29,4 +29,5 @@ along with usbrip. If not, see <http://www.gnu.org/licenses/>.
|
|||
"""
|
||||
|
||||
from lib.core.usbevents import USBEvents
|
||||
from lib.core.usbstorage import USBStorage
|
||||
from lib.core.usbids import USBIDs
|
||||
|
|
|
@ -1,344 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: UTF-8 -*-
|
||||
|
||||
"""
|
||||
@file cliopts.py
|
||||
@author Sam Freeside <snovvcrash@protonmail.com>
|
||||
@date 2018-03
|
||||
|
||||
@brief Command line option parser.
|
||||
|
||||
@license
|
||||
Copyright (C) 2018 Sam Freeside
|
||||
|
||||
This file is part of usbrip.
|
||||
|
||||
usbrip is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
usbrip is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with usbrip. If not, see <http://www.gnu.org/licenses/>.
|
||||
@endlicense
|
||||
"""
|
||||
|
||||
from argparse import ArgumentParser
|
||||
|
||||
from lib.core.common import root_dir_join
|
||||
|
||||
|
||||
def cmd_line_options():
|
||||
parser = ArgumentParser()
|
||||
subparsers = parser.add_subparsers(dest='subparser')
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# ------------------------- Banner -------------------------
|
||||
# ----------------------------------------------------------
|
||||
|
||||
build_ub_parser(subparsers)
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# ----------------------- USB Events -----------------------
|
||||
# ----------------------------------------------------------
|
||||
|
||||
build_ue_parser(subparsers)
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# ------------------------ USB IDs -------------------------
|
||||
# ----------------------------------------------------------
|
||||
|
||||
build_uis_parser(subparsers)
|
||||
|
||||
return parser
|
||||
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# ------------------------- Banner -------------------------
|
||||
# ----------------------------------------------------------
|
||||
|
||||
|
||||
def build_ub_parser(subparsers):
|
||||
subparsers.add_parser('banner',
|
||||
help='show tool banner')
|
||||
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# ----------------------- USB Events -----------------------
|
||||
# ----------------------------------------------------------
|
||||
|
||||
|
||||
def build_ue_parser(subparsers):
|
||||
ue_parser = subparsers.add_parser('events',
|
||||
help='work with USB events')
|
||||
|
||||
ue_subparsers = ue_parser.add_subparsers(dest='ue_subparser')
|
||||
|
||||
build_ueh_parser(ue_subparsers)
|
||||
build_ueg_parser(ue_subparsers)
|
||||
build_uev_parser(ue_subparsers)
|
||||
|
||||
|
||||
# ------------------- USB Events History -------------------
|
||||
|
||||
|
||||
def build_ueh_parser(subparsers):
|
||||
ueh_parser = subparsers.add_parser('history',
|
||||
help='show USB event history')
|
||||
|
||||
ueh_parser.add_argument('-q',
|
||||
'--quiet',
|
||||
action='store_true',
|
||||
help='supress banner, info (geen) messages and user iteraction')
|
||||
|
||||
ueh_group_table_list = ueh_parser.add_mutually_exclusive_group()
|
||||
|
||||
ueh_group_table_list.add_argument('-t',
|
||||
'--table',
|
||||
action='store_true',
|
||||
help='represent as table (not list)')
|
||||
|
||||
ueh_group_table_list.add_argument('-l',
|
||||
'--list',
|
||||
action='store_true',
|
||||
help='represent as list (not table)')
|
||||
|
||||
ueh_parser.add_argument('-e',
|
||||
'--external',
|
||||
action='store_true',
|
||||
help='show only those devices which have \'disconnect\' date')
|
||||
|
||||
ueh_parser.add_argument('-n',
|
||||
'--number',
|
||||
type=int,
|
||||
default=-1,
|
||||
help='number of events to show')
|
||||
|
||||
ueh_parser.add_argument('-d',
|
||||
'--date',
|
||||
nargs='+',
|
||||
type=str,
|
||||
default=[],
|
||||
help='filter by DATES')
|
||||
|
||||
ueh_parser.add_argument('-c',
|
||||
'--column',
|
||||
nargs='+',
|
||||
type=str,
|
||||
default=[],
|
||||
help='columns to show (options: \'conn\', '
|
||||
'\'user\', '
|
||||
'\'vid\', '
|
||||
'\'pid\', '
|
||||
'\'prod\', '
|
||||
'\'manufact\', '
|
||||
'\'serial\', '
|
||||
'\'port\', '
|
||||
'\'disconn\'.)')
|
||||
|
||||
ueh_parser.add_argument('-f',
|
||||
'--file',
|
||||
nargs='+',
|
||||
type=str,
|
||||
default=[],
|
||||
help='obtain log from FILES')
|
||||
|
||||
|
||||
# ------------------ USB Events Gen Auth -------------------
|
||||
|
||||
|
||||
def build_ueg_parser(subparsers):
|
||||
ueg_parser = subparsers.add_parser('gen_auth',
|
||||
help='generate authorized device list (JSON)')
|
||||
|
||||
ueg_parser.add_argument('output',
|
||||
type=str,
|
||||
help='set output path')
|
||||
|
||||
ueg_parser.add_argument('-a',
|
||||
'--attribute',
|
||||
nargs='+',
|
||||
type=str,
|
||||
default=[],
|
||||
help='attributes to include in authorized device list '
|
||||
'(options: \'vid\', '
|
||||
'\'pid\', '
|
||||
'\'prod\', '
|
||||
'\'manufact\', '
|
||||
'\'serial\'.)')
|
||||
|
||||
ueg_parser.add_argument('-q',
|
||||
'--quiet',
|
||||
action='store_true',
|
||||
help='supress banner, info messages and user iteraction')
|
||||
|
||||
ueg_parser.add_argument('-e',
|
||||
'--external',
|
||||
action='store_true',
|
||||
help='show only those devices which have \'disconnect\' date')
|
||||
|
||||
ueg_parser.add_argument('-n',
|
||||
'--number',
|
||||
type=int,
|
||||
default=-1,
|
||||
help='number of events to show')
|
||||
|
||||
ueg_parser.add_argument('-d',
|
||||
'--date',
|
||||
nargs='+',
|
||||
type=str,
|
||||
default=[],
|
||||
help='filter by DATES')
|
||||
|
||||
ueg_parser.add_argument('-f',
|
||||
'--file',
|
||||
nargs='+',
|
||||
type=str,
|
||||
default=[],
|
||||
help='obtain log from FILES')
|
||||
|
||||
|
||||
# ----------------- USB Events Violations ------------------
|
||||
|
||||
|
||||
def build_uev_parser(subparsers):
|
||||
uev_parser = subparsers.add_parser('violations',
|
||||
help='search USB event history for violations '
|
||||
'(show USB devices that do appear in hist'
|
||||
'ory and do NOT appear in authorized devi'
|
||||
'ce list (JSON))')
|
||||
|
||||
uev_parser.add_argument('input',
|
||||
type=str,
|
||||
help='set input path')
|
||||
|
||||
uev_parser.add_argument('-a',
|
||||
'--attribute',
|
||||
nargs='+',
|
||||
type=str,
|
||||
default=[],
|
||||
help='attributes to look through when searching for USB violation events '
|
||||
'(options: \'vid\', '
|
||||
'\'pid\', '
|
||||
'\'prod\', '
|
||||
'\'manufact\', '
|
||||
'\'serial\'.)')
|
||||
|
||||
uev_parser.add_argument('-q',
|
||||
'--quiet',
|
||||
action='store_true',
|
||||
help='supress banner, info messages and user iteraction')
|
||||
|
||||
uev_group_table_list = uev_parser.add_mutually_exclusive_group()
|
||||
|
||||
uev_group_table_list.add_argument('-t',
|
||||
'--table',
|
||||
action='store_true',
|
||||
help='represent as table (not list)')
|
||||
|
||||
uev_group_table_list.add_argument('-l',
|
||||
'--list',
|
||||
action='store_true',
|
||||
help='represent as list (not table)')
|
||||
|
||||
uev_parser.add_argument('-e',
|
||||
'--external',
|
||||
action='store_true',
|
||||
help='show only those devices which have \'disconnect\' date')
|
||||
|
||||
uev_parser.add_argument('-n',
|
||||
'--number',
|
||||
type=int,
|
||||
default=-1,
|
||||
help='number of events to show')
|
||||
|
||||
uev_parser.add_argument('-d',
|
||||
'--date',
|
||||
nargs='+',
|
||||
type=str,
|
||||
default=[],
|
||||
help='filter by DATES')
|
||||
|
||||
uev_parser.add_argument('-c',
|
||||
'--column',
|
||||
nargs='+',
|
||||
type=str,
|
||||
default=[],
|
||||
help='columns to show (options: \'conn\', '
|
||||
'\'user\', '
|
||||
'\'vid\', '
|
||||
'\'pid\', '
|
||||
'\'prod\', '
|
||||
'\'manufact\', '
|
||||
'\'serial\', '
|
||||
'\'port\', '
|
||||
'\'disconn\'.)')
|
||||
|
||||
uev_parser.add_argument('-f',
|
||||
'--file',
|
||||
nargs='+',
|
||||
type=str,
|
||||
default=[],
|
||||
help='obtain log from FILES')
|
||||
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# ------------------------ USB IDs -------------------------
|
||||
# ----------------------------------------------------------
|
||||
|
||||
|
||||
def build_ui_parser(subparsers):
|
||||
ui_parser = subparsers.add_parser('ids',
|
||||
help='work with USB IDs')
|
||||
|
||||
ui_subparsers = ui_parser.add_subparsers(dest='ui_subparser')
|
||||
|
||||
build_uis_parser(ui_subparsers)
|
||||
build_uid_parser(ui_subparsers)
|
||||
|
||||
|
||||
# --------------------- USB IDs Search ---------------------
|
||||
|
||||
|
||||
def build_uis_parser(subparsers):
|
||||
uis_parser = subparsers.add_parser('search',
|
||||
help='search by VID and/or PID; '
|
||||
'ids database path is \'{}\''.format(root_dir_join('usb_ids/usb.ids')))
|
||||
|
||||
uis_parser.add_argument('-q',
|
||||
'--quiet',
|
||||
action='store_true',
|
||||
help='supress banner, info messages and user iteraction')
|
||||
|
||||
uis_parser.add_argument('--vid',
|
||||
type=str,
|
||||
default=None,
|
||||
help='vendor ID')
|
||||
|
||||
uis_parser.add_argument('--pid',
|
||||
type=str,
|
||||
default=None,
|
||||
help='product ID')
|
||||
|
||||
uis_parser.add_argument('--offline',
|
||||
action='store_true',
|
||||
help='offline mode (no database download/update)')
|
||||
|
||||
|
||||
# -------------------- USB IDs Download --------------------
|
||||
|
||||
|
||||
def build_uid_parser(subparsers):
|
||||
uid_parser = subparsers.add_parser('download',
|
||||
help='download/update database;'
|
||||
'ids database path is \'{}\''.format(root_dir_join('usb_ids/usb.ids')))
|
||||
|
||||
uid_parser.add_argument('-q',
|
||||
'--quiet',
|
||||
action='store_true',
|
||||
help='supress banner, info messages and user iteraction')
|
|
@ -32,6 +32,7 @@ import random
|
|||
import os
|
||||
import sys
|
||||
|
||||
from string import printable
|
||||
from calendar import month_name
|
||||
from collections import OrderedDict, Callable
|
||||
|
||||
|
@ -50,22 +51,28 @@ SEPARATOR = '\u2212' # '−', U_MINUS_SIGN
|
|||
# Enable colored text when terminal output (True), else (| or > for example) no color (False)
|
||||
ISATTY = True if sys.stdout.isatty() else False
|
||||
|
||||
DEBUG = False
|
||||
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# ------------------------- Banner -------------------------
|
||||
# ----------------------------------------------------------
|
||||
|
||||
|
||||
VERSION = '2.0'
|
||||
SITE = 'https://github.com/snovvcrash/usbrip'
|
||||
|
||||
VERSION_FORMATTED = '\033[1;37m{\033[1;34mv%s\033[1;37m}\033[1;33m' % VERSION
|
||||
SITE_FORMATTED = '\033[0m\033[4;37m%s\033[0m' % SITE
|
||||
|
||||
BANNER = """\033[1;33m\
|
||||
|
||||
_ {{4}}
|
||||
_ {{4}} %s
|
||||
_ _ ___| |_ ___[+]___
|
||||
| | |_ -| . | _[*] . |
|
||||
|___|___|___|_| [?] _|
|
||||
x[^]_| \033[;0m\033[4;37m%s\033[;0m\
|
||||
""" % SITE
|
||||
x[^]_| %s\
|
||||
""" % (VERSION_FORMATTED, SITE_FORMATTED)
|
||||
|
||||
E = ('E', 'e', '3')
|
||||
N = ('N', 'n')
|
||||
|
@ -75,9 +82,12 @@ I = ('I', 'i', '1', '!')
|
|||
E,N,S,I = list(map(lambda x: random.choice(x), (E,N,S,I)))
|
||||
|
||||
if ISATTY:
|
||||
E,N,S,I = list(map(lambda x: colored(x, 'green', 'on_blue')+'\033[1;33m', (E,N,S,I)))
|
||||
E,N,S,I = list(map(lambda x: colored(x, 'green', 'on_blue') + '\033[1;33m', (E,N,S,I)))
|
||||
else:
|
||||
BANNER = BANNER[7:151] + BANNER[163:199]
|
||||
mid_start = 55 + len(VERSION_FORMATTED)
|
||||
mid_end = mid_start + 97
|
||||
VERSION = '{v' + VERSION + '}'
|
||||
BANNER = BANNER[31:55] + VERSION + BANNER[mid_start:mid_end] + SITE
|
||||
|
||||
BANNER = BANNER.replace('+', E, 1)
|
||||
BANNER = BANNER.replace('*', N, 1)
|
||||
|
@ -90,12 +100,10 @@ BANNER = BANNER.replace('^', I, 1)
|
|||
# ----------------------------------------------------------
|
||||
|
||||
|
||||
DEBUG = False
|
||||
|
||||
|
||||
def time_it(func):
|
||||
import functools
|
||||
import time
|
||||
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
start = time.time()
|
||||
|
@ -171,7 +179,7 @@ MONTH_ENUM = {m[:3]: str(i+1) for i, m in enumerate(month_name[1:])}
|
|||
|
||||
class DefaultOrderedDict(OrderedDict):
|
||||
def __init__(self, *args, default_factory=None, **kwargs):
|
||||
if (default_factory is not None and not isinstance(default_factory, Callable)):
|
||||
if default_factory is not None and not isinstance(default_factory, Callable):
|
||||
raise TypeError('first argument must be callable')
|
||||
OrderedDict.__init__(self, *args, **kwargs)
|
||||
self._default_factory = default_factory
|
||||
|
@ -215,19 +223,23 @@ class DefaultOrderedDict(OrderedDict):
|
|||
|
||||
|
||||
def root_dir_join(name):
|
||||
return os.path.join(os.path.abspath(__file__).rsplit('/', 2)[0], name)
|
||||
return os.path.join(os.path.abspath(__file__).rsplit('/', 3)[0], name)
|
||||
|
||||
|
||||
def os_makedirs(dirname):
|
||||
try:
|
||||
os.makedirs(dirname)
|
||||
except PermissionError as e:
|
||||
raise USBRipError('Permission denied: \'{}\''.format(dirname),
|
||||
errors={'initial_error': str(e)})
|
||||
raise USBRipError(
|
||||
'Permission denied: \'{}\''.format(dirname),
|
||||
errors={'initial_error': str(e)}
|
||||
)
|
||||
except OSError as e: # exists
|
||||
if not os.path.isdir(dirname):
|
||||
raise USBRipError('Path exists and it is not a directory: \'{}\''.format(dirname),
|
||||
errors={'initial_error': str(e)})
|
||||
raise USBRipError(
|
||||
'Path exists and it is not a directory: \'{}\''.format(dirname),
|
||||
errors={'initial_error': str(e)}
|
||||
)
|
||||
|
||||
|
||||
def traverse_dir(source_dir):
|
||||
|
@ -242,6 +254,17 @@ def list_files(source_dir):
|
|||
if os.path.isfile(os.path.join(source_dir, filename))]
|
||||
|
||||
|
||||
def is_correct(password):
|
||||
if (len(password) < 8 or
|
||||
not any(c.islower() for c in password) or
|
||||
not any(c.isupper() for c in password) or
|
||||
not any(c.isdigit() for c in password) or
|
||||
any(c not in printable for c in password)):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# ------------------------ Messages ------------------------
|
||||
# ----------------------------------------------------------
|
||||
|
@ -257,7 +280,10 @@ def print_info(message, *, quiet=False):
|
|||
print('[INFO] {}'.format(message))
|
||||
|
||||
|
||||
def print_warning(message, *, errcode=0, initial_error=''):
|
||||
def print_warning(message, *, errcode=0, initial_error='', quiet=False):
|
||||
if quiet:
|
||||
return
|
||||
|
||||
if DEBUG:
|
||||
if errcode:
|
||||
print('ERRCODE: {}'.format(errcode))
|
||||
|
@ -281,3 +307,16 @@ def print_critical(message, *, errcode=0, initial_error=''):
|
|||
cprint('[CRITICAL] {}'. format(message), 'white', 'on_red', attrs=['bold'])
|
||||
else:
|
||||
print('[CRITICAL] {}'. format(message))
|
||||
|
||||
|
||||
def print_secret(message, *, secret=''):
|
||||
if ISATTY:
|
||||
cprint(
|
||||
'[SECRET] {} {}'.format(
|
||||
colored(message, 'white', attrs=['bold']),
|
||||
colored(secret, 'white', 'on_grey', attrs=['bold'])
|
||||
),
|
||||
'white', attrs=['bold']
|
||||
)
|
||||
else:
|
||||
print('[SECRET] {} {}'.format(message, secret))
|
||||
|
|
|
@ -76,14 +76,13 @@ from lib.core.common import time_it_if_debug
|
|||
|
||||
class USBEvents:
|
||||
|
||||
QUIET = False
|
||||
|
||||
# SingleTable (uses ANSI escape codes) when termianl output, else (| or > for example) AsciiTable (only ASCII)
|
||||
TableClass = SingleTable if ISATTY else AsciiTable
|
||||
|
||||
# If True -> supress banner, info messages and user iteraction
|
||||
QUIET = False
|
||||
|
||||
@time_it_if_debug(DEBUG, time_it)
|
||||
def __init__(self, files=None, *, quiet=False):
|
||||
def __new__(cls, files=None, *, quiet=False):
|
||||
if quiet:
|
||||
USBEvents.QUIET = quiet
|
||||
|
||||
|
@ -96,24 +95,23 @@ class USBEvents:
|
|||
raw_history = _get_raw_history()
|
||||
except USBRipError as e:
|
||||
print_critical(str(e))
|
||||
return None
|
||||
|
||||
divided_history = _divide_history(raw_history)
|
||||
all_events = _parse_history(divided_history)
|
||||
|
||||
self._all_events = _parse_history(divided_history)
|
||||
self._violations, self._events_to_show = [], None
|
||||
instance = super().__new__(cls)
|
||||
instance._all_events = all_events # self._all_events
|
||||
instance._violations = [] # self._violations
|
||||
instance._events_to_show = None # self._events_to_show
|
||||
return instance
|
||||
|
||||
# ------------------- USB Events History -------------------
|
||||
|
||||
@time_it_if_debug(DEBUG, time_it)
|
||||
def event_history(self, columns, *, sieve=None, repres=None):
|
||||
if columns:
|
||||
table_data = [[COLUMN_NAMES[name] for name in columns]]
|
||||
else:
|
||||
columns = [key for key in COLUMN_NAMES.keys()]
|
||||
table_data = [[val for val in COLUMN_NAMES.values()]]
|
||||
|
||||
def event_history(self, columns, *, indent=4, sieve=None, repres=None):
|
||||
self._events_to_show = _filter_events(self._all_events, sieve)
|
||||
if self._events_to_show is None:
|
||||
if not self._events_to_show:
|
||||
print_info('No USB events found!', quiet=USBEvents.QUIET)
|
||||
return
|
||||
|
||||
|
@ -123,37 +121,73 @@ class USBEvents:
|
|||
return
|
||||
elif number == 1:
|
||||
try:
|
||||
_json_dump(self._events_to_show, 'event history', filename)
|
||||
_dump_events(self._events_to_show, 'event history', filename, indent)
|
||||
except USBRipError as e:
|
||||
print_critical(str(e), initial_error=e.errors['initial_error'])
|
||||
return
|
||||
|
||||
if columns:
|
||||
table_data = [[COLUMN_NAMES[name] for name in columns]]
|
||||
else:
|
||||
columns = [key for key in COLUMN_NAMES.keys()]
|
||||
table_data = [[val for val in COLUMN_NAMES.values()]]
|
||||
|
||||
_represent_events(self._events_to_show, columns, table_data, 'USB-History-Events', repres)
|
||||
|
||||
# -------------------- USB Events Open ---------------------
|
||||
|
||||
@staticmethod
|
||||
@time_it_if_debug(DEBUG, time_it)
|
||||
def open_dump(input_dump, columns, *, sieve=None, repres=None):
|
||||
print_info('Opening USB event dump: \'{}\''.format(os.path.abspath(input_dump)), quiet=USBEvents.QUIET)
|
||||
|
||||
try:
|
||||
with open(input_dump, 'r', encoding='utf-8') as dump:
|
||||
events_dumped = json.load(dump)
|
||||
except json.decoder.JSONDecodeError as e:
|
||||
print_critical('Failed to decode event dump (JSON)', initial_error=str(e))
|
||||
return
|
||||
|
||||
if not events_dumped:
|
||||
print_critical('This dump is empty!')
|
||||
return
|
||||
|
||||
events_to_show = _filter_events(events_dumped, sieve)
|
||||
if not events_to_show:
|
||||
print_info('No USB events found!', quiet=USBEvents.QUIET)
|
||||
return
|
||||
|
||||
if columns:
|
||||
table_data = [[COLUMN_NAMES[name] for name in columns]]
|
||||
else:
|
||||
columns = [key for key in COLUMN_NAMES.keys()]
|
||||
table_data = [[val for val in COLUMN_NAMES.values()]]
|
||||
|
||||
_represent_events(events_to_show, columns, table_data, 'USB-Event-Dump', repres)
|
||||
|
||||
# ------------------ USB Events Gen Auth -------------------
|
||||
|
||||
@time_it_if_debug(DEBUG, time_it)
|
||||
def generate_auth_json(self, output_auth, attributes, *, sieve=None):
|
||||
def generate_auth_json(self, output_auth, attributes, *, indent=4, sieve=None):
|
||||
self._events_to_show = _filter_events(self._all_events, sieve)
|
||||
if not self._events_to_show:
|
||||
print_info('No USB devices found!', quiet=USBEvents.QUIET)
|
||||
return 1
|
||||
|
||||
try:
|
||||
dirname = os.path.dirname(output_auth)
|
||||
os_makedirs(dirname)
|
||||
except USBRipError as e:
|
||||
print_critical(str(e), initial_error=e.errors['initial_error'])
|
||||
return
|
||||
return 1
|
||||
else:
|
||||
print_info('Created \'{}\''.format(dirname))
|
||||
print_info('Created \'{}\''.format(dirname), quiet=USBEvents.QUIET)
|
||||
|
||||
try:
|
||||
auth_json = open(output_auth, 'w')
|
||||
auth_json = open(output_auth, 'w', encoding='utf-8')
|
||||
except PermissionError as e:
|
||||
print_critical('Permission denied: \'{}\''.format(output_auth), initial_error=str(e))
|
||||
return
|
||||
|
||||
self._events_to_show = _filter_events(self._all_events, sieve)
|
||||
if self._events_to_show is None:
|
||||
print_info('No USB violation events found!', quiet=USBEvents.QUIET)
|
||||
json.dump([], auth_json)
|
||||
auth_json.close()
|
||||
return
|
||||
print_critical('Permission denied: \'{}\'. Retry with sudo'.format(output_auth), initial_error=str(e))
|
||||
return 1
|
||||
|
||||
print_info('Generating authorized device list (JSON)', quiet=USBEvents.QUIET)
|
||||
|
||||
|
@ -163,35 +197,31 @@ class USBEvents:
|
|||
auth = defaultdict(list)
|
||||
for event in self._events_to_show:
|
||||
for key, val in event.items():
|
||||
if key in attributes and \
|
||||
val is not None and \
|
||||
val not in auth[key]:
|
||||
if (key in attributes and
|
||||
val is not None and
|
||||
val not in auth[key]):
|
||||
auth[key].append(val)
|
||||
|
||||
for key in auth.keys():
|
||||
auth[key].sort()
|
||||
|
||||
json.dump(auth, auth_json, sort_keys=True, indent=4)
|
||||
json.dump(auth, auth_json, sort_keys=True, indent=indent)
|
||||
auth_json.close()
|
||||
|
||||
print_info('New authorized device list: \'{}\''.format(output_auth), quiet=USBEvents.QUIET)
|
||||
print_info('New authorized device list: \'{}\''.format(os.path.abspath(output_auth)), quiet=USBEvents.QUIET)
|
||||
|
||||
# ----------------- USB Events Violations ------------------
|
||||
|
||||
@time_it_if_debug(DEBUG, time_it)
|
||||
def search_violations(self, input_auth, attributes, columns, *, sieve=None, repres=None):
|
||||
def search_violations(self, input_auth, attributes, columns, *, indent=4, sieve=None, repres=None):
|
||||
print_info('Opening authorized device list: \'{}\''.format(os.path.abspath(input_auth)), quiet=USBEvents.QUIET)
|
||||
|
||||
try:
|
||||
auth = _process_auth_json(input_auth)
|
||||
auth = _process_auth_list(input_auth, indent)
|
||||
except json.decoder.JSONDecodeError as e:
|
||||
print_critical('Failed to decode authorized device list (JSON)', initial_error=str(e))
|
||||
return
|
||||
|
||||
if columns:
|
||||
table_data = [[COLUMN_NAMES[name] for name in columns]]
|
||||
else:
|
||||
columns = [key for key in COLUMN_NAMES.keys()]
|
||||
table_data = [[val for val in COLUMN_NAMES.values()]]
|
||||
|
||||
print_info('Searching for violations', quiet=USBEvents.QUIET)
|
||||
|
||||
if not attributes:
|
||||
|
@ -199,14 +229,16 @@ class USBEvents:
|
|||
|
||||
for event in self._all_events:
|
||||
try:
|
||||
if any(event[key] not in vals and event[key] is not None for key, vals in zip(attributes, auth.values())):
|
||||
if any(event[key] not in vals and
|
||||
event[key] is not None
|
||||
for key, vals in zip(attributes, auth.values())):
|
||||
self._violations.append(event)
|
||||
except KeyError as e:
|
||||
print_critical('No such attribute in authorized device list', initial_error=str(e))
|
||||
return
|
||||
|
||||
self._events_to_show = _filter_events(self._violations, sieve)
|
||||
if self._events_to_show is None:
|
||||
if not self._events_to_show:
|
||||
print_info('No USB violation events found!', quiet=USBEvents.QUIET)
|
||||
return
|
||||
|
||||
|
@ -216,9 +248,16 @@ class USBEvents:
|
|||
return
|
||||
elif number == 1:
|
||||
try:
|
||||
_json_dump(self._events_to_show, 'violations', filename)
|
||||
_dump_events(self._events_to_show, 'violations', filename, indent)
|
||||
except USBRipError as e:
|
||||
print_critical(str(e), initial_error=e.errors['initial_error'])
|
||||
return
|
||||
|
||||
if columns:
|
||||
table_data = [[COLUMN_NAMES[name] for name in columns]]
|
||||
else:
|
||||
columns = [key for key in COLUMN_NAMES.keys()]
|
||||
table_data = [[val for val in COLUMN_NAMES.values()]]
|
||||
|
||||
_represent_events(self._events_to_show, columns, table_data, 'USB-Violation-Events', repres)
|
||||
|
||||
|
@ -259,16 +298,22 @@ def _read_log_file(filename):
|
|||
|
||||
if filename.endswith('.gz'):
|
||||
print_info('Unpacking \'{}\''.format(filename), quiet=USBEvents.QUIET)
|
||||
|
||||
try:
|
||||
log = gzip.open(filename, 'rb')
|
||||
except PermissionError as e:
|
||||
print_warning('Permission denied: \'{}\''.format(filename), initial_error=str(e))
|
||||
print_warning(
|
||||
'Permission denied: \'{}\'. Retry with sudo'.format(filename),
|
||||
initial_error=str(e),
|
||||
quiet=USBEvents.QUIET
|
||||
)
|
||||
return filtered
|
||||
else:
|
||||
sentinel = b''
|
||||
filename = filename[:-3]
|
||||
|
||||
else:
|
||||
log = open(filename, 'r')
|
||||
log = open(filename, 'r', encoding='utf-8')
|
||||
sentinel = ''
|
||||
|
||||
print_info('Reading \'{}\''.format(filename), quiet=USBEvents.QUIET)
|
||||
|
@ -316,15 +361,17 @@ def _parse_history(divided_history):
|
|||
pid = re_pid.search(line).group(1)
|
||||
port = re_port.search(line).group(1)
|
||||
|
||||
event = {'conn': date,
|
||||
'user': user,
|
||||
'vid': vid,
|
||||
'pid': pid,
|
||||
'prod': None,
|
||||
'manufact': None,
|
||||
'serial': None,
|
||||
'port': port,
|
||||
'disconn': None}
|
||||
event = {
|
||||
'conn': date,
|
||||
'user': user,
|
||||
'vid': vid,
|
||||
'pid': pid,
|
||||
'prod': None,
|
||||
'manufact': None,
|
||||
'serial': None,
|
||||
'port': port,
|
||||
'disconn': None
|
||||
}
|
||||
|
||||
record_collection.append(event)
|
||||
curr += 1
|
||||
|
@ -382,8 +429,8 @@ def _sort_by_date(unsorted_log):
|
|||
return sorted(unsorted_log, key=lambda i: MONTH_ENUM[i[0][0][:3]] + i[0][0][3:])
|
||||
|
||||
|
||||
def _process_auth_json(input_auth):
|
||||
with open(input_auth, 'r+') as auth_json:
|
||||
def _process_auth_list(input_auth, indent):
|
||||
with open(input_auth, 'r+', encoding='utf-8') as auth_json:
|
||||
#auth = json.load(auth_json, object_pairs_hook=OrderedDict)
|
||||
auth = json.load(auth_json)
|
||||
auth_json.seek(0)
|
||||
|
@ -391,7 +438,7 @@ def _process_auth_json(input_auth):
|
|||
auth[key] = list(filter(None, vals))
|
||||
if not _is_sorted(vals):
|
||||
auth[key].sort()
|
||||
json.dump(auth, auth_json, sort_keys=True, indent=4)
|
||||
json.dump(auth, auth_json, sort_keys=True, indent=indent)
|
||||
auth_json.truncate()
|
||||
|
||||
return auth
|
||||
|
@ -410,59 +457,74 @@ def _is_sorted(iterable, reverse=False):
|
|||
in pairwise(iterable))
|
||||
|
||||
|
||||
def _filter_events(all_events, sieve=None):
|
||||
def _filter_events(all_events, sieve):
|
||||
if sieve is None:
|
||||
sieve = {'external': False,
|
||||
'number': -1,
|
||||
'dates': []}
|
||||
else:
|
||||
sieve = {
|
||||
'external': False,
|
||||
'number': -1,
|
||||
'dates': [],
|
||||
'fields': {}
|
||||
}
|
||||
|
||||
if sieve != {'external': False, 'number': -1, 'dates': [], 'fields': {}}:
|
||||
print_info('Filtering events', quiet=USBEvents.QUIET)
|
||||
|
||||
events_to_show = all_events
|
||||
|
||||
if sieve['fields']:
|
||||
events_to_show = []
|
||||
for key, vals in sieve['fields'].items():
|
||||
events_to_show += [event for event in all_events for val in vals if event[key] == val]
|
||||
|
||||
if sieve['external']:
|
||||
events_to_show = [event for event in all_events if event['disconn'] is not None]
|
||||
else:
|
||||
events_to_show = all_events
|
||||
|
||||
if sieve['dates']:
|
||||
events_to_show = [event for date in sieve['dates'] for event in events_to_show if event['conn'][:6] == date]
|
||||
|
||||
if not events_to_show:
|
||||
return None
|
||||
return []
|
||||
|
||||
SIZE = len(events_to_show)
|
||||
if sieve['number'] == -1 or sieve['number'] >= SIZE:
|
||||
if sieve['number'] > SIZE:
|
||||
print_warning('USB action history has only {} entries instead of requested {}, ' \
|
||||
'displaying all of them...'.format(SIZE, sieve['number']))
|
||||
print_warning(
|
||||
'USB action history has only {} entries instead of requested {}, '
|
||||
'displaying all of them...'
|
||||
.format(SIZE, sieve['number']),
|
||||
quiet=USBEvents.QUIET
|
||||
)
|
||||
|
||||
sieve['number'] = SIZE
|
||||
|
||||
return [events_to_show[SIZE-i] for i in range(sieve['number'], 0, -1)]
|
||||
|
||||
|
||||
def _represent_events(events_to_show, columns, table_data, title, repres=None):
|
||||
def _represent_events(events_to_show, columns, table_data, title, repres):
|
||||
print_info('Preparing gathered events', quiet=USBEvents.QUIET)
|
||||
|
||||
if repres is None:
|
||||
repres = {'table': False,
|
||||
'list': False,
|
||||
'smart': True}
|
||||
repres = {
|
||||
'table': False,
|
||||
'list': False,
|
||||
'smart': True
|
||||
}
|
||||
|
||||
max_len = {'conn': 15,
|
||||
'user': max(max(len(event['user']) for event in events_to_show), len('User')),
|
||||
'vid': 4,
|
||||
'pid': 4,
|
||||
'prod': max(max(len(str(event['prod'])) for event in events_to_show), len('Product')),
|
||||
'manufact': max(max(len(str(event['manufact'])) for event in events_to_show), len('Manufacturer')),
|
||||
'serial': max(max(len(str(event['serial'])) for event in events_to_show), len('Serial Number')),
|
||||
'port': max(max(len(event['port']) for event in events_to_show), len('Port')),
|
||||
'disconn': 15}
|
||||
max_len = {
|
||||
'conn': 15,
|
||||
'user': max(max(len(event['user']) for event in events_to_show), len('User')),
|
||||
'vid': 4,
|
||||
'pid': 4,
|
||||
'prod': max(max(len(str(event['prod'])) for event in events_to_show), len('Product')),
|
||||
'manufact': max(max(len(str(event['manufact'])) for event in events_to_show), len('Manufacturer')),
|
||||
'serial': max(max(len(str(event['serial'])) for event in events_to_show), len('Serial Number')),
|
||||
'port': max(max(len(event['port']) for event in events_to_show), len('Port')),
|
||||
'disconn': 15
|
||||
}
|
||||
|
||||
prev_cday = ''
|
||||
for event in events_to_show:
|
||||
if 'conn' in columns:
|
||||
try:
|
||||
prev_cday
|
||||
except NameError:
|
||||
prev_cday = ''
|
||||
curr_cday = event['conn'][:6]
|
||||
if prev_cday != curr_cday:
|
||||
cday = ['{} {}'.format(curr_cday, BULLET*8)] # 8 == len(event['conn'] - event['conn'][:6] - 1)
|
||||
|
@ -497,11 +559,13 @@ def _represent_events(events_to_show, columns, table_data, title, repres=None):
|
|||
# Display as list
|
||||
elif repres['smart'] and not event_table.ok or repres['list']:
|
||||
if not event_table.ok:
|
||||
print_warning('Terminal window is too small to display table properly')
|
||||
print_warning('Representation: List')
|
||||
print_warning('Terminal window is too small to display table properly', quiet=USBEvents.QUIET)
|
||||
print_warning('Representation: List', quiet=USBEvents.QUIET)
|
||||
else:
|
||||
print_info('Representation: List', quiet=USBEvents.QUIET)
|
||||
|
||||
max_len = max(len(str(val)) for event in events_to_show for val in event.values()) + \
|
||||
len('Serial Number: ') # max length string
|
||||
len('Serial Number: ') # max length string
|
||||
if not max_len // 2: max_len += 1
|
||||
date_sep_len = (max_len - 8) // 2
|
||||
|
||||
|
@ -554,24 +618,26 @@ def _build_single_table(TableClass, table_data, title, align='right', inner_row_
|
|||
return single_table
|
||||
|
||||
|
||||
def _json_dump(events_to_show, list_name, filename):
|
||||
def _dump_events(events_to_show, list_name, filename, indent):
|
||||
print_info('Generating {} list (JSON)'.format(list_name), quiet=USBEvents.QUIET)
|
||||
|
||||
out = OrderedDict()
|
||||
out = []
|
||||
for event in events_to_show:
|
||||
out[event['conn']] = OrderedDict()
|
||||
for key, val in sorted(event.items()):
|
||||
if key != 'conn':
|
||||
out[event['conn']][key] = val
|
||||
tmp_event_dict = OrderedDict()
|
||||
for key in ('conn', 'user', 'vid', 'pid', 'prod', 'manufact', 'serial', 'port', 'disconn'):
|
||||
tmp_event_dict[key] = event[key]
|
||||
out.append(tmp_event_dict)
|
||||
|
||||
try:
|
||||
with open(filename, 'w') as out_json:
|
||||
json.dump(out, out_json, indent=4)
|
||||
with open(filename, 'w', encoding='utf-8') as out_json:
|
||||
json.dump(out, out_json, indent=indent)
|
||||
except PermissionError as e:
|
||||
raise USBRipError('Permission denied: \'{}\''.format(filename),
|
||||
errors={'initial_error': str(e)})
|
||||
raise USBRipError(
|
||||
'Permission denied: \'{}\'. Retry with sudo'.format(filename),
|
||||
errors={'initial_error': str(e)}
|
||||
)
|
||||
|
||||
print_info('New {} list: \'{}\''.format(list_name, filename), quiet=USBEvents.QUIET)
|
||||
print_info('New {} list: \'{}\''.format(list_name, os.path.abspath(filename)), quiet=USBEvents.QUIET)
|
||||
|
||||
|
||||
def _output_choice(list_name, default_filename, dirname):
|
||||
|
@ -586,7 +652,8 @@ def _output_choice(list_name, default_filename, dirname):
|
|||
if number == '1':
|
||||
while True:
|
||||
filename = input('[>] Please enter the base name for the output file '
|
||||
'(default is \'{}\'): '.format(default_filename))
|
||||
'(default is \'{}\'): '
|
||||
.format(default_filename))
|
||||
|
||||
if all(c in printable for c in filename) and len(filename) < 256:
|
||||
if not filename:
|
||||
|
@ -603,10 +670,10 @@ def _output_choice(list_name, default_filename, dirname):
|
|||
print_critical(str(e), initial_error=e.errors['initial_error'])
|
||||
return (None, '')
|
||||
else:
|
||||
print_info('Created \'{}\''.format(dirname))
|
||||
print_info('Created \'{}\''.format(dirname), quiet=USBEvents.QUIET)
|
||||
|
||||
overwrite = True
|
||||
if os.path.exists(filename):
|
||||
if os.path.isfile(filename):
|
||||
while True:
|
||||
overwrite = input('[?] File exists. Would you like to overwrite it? [Y/n]: ')
|
||||
if len(overwrite) == 1 and overwrite in 'Yy':
|
||||
|
|
|
@ -57,7 +57,6 @@ from lib.core.common import time_it_if_debug
|
|||
|
||||
class USBIDs:
|
||||
|
||||
# If True -> supress banner, info messages and user iteraction
|
||||
QUIET = False
|
||||
|
||||
_INTERNET_CONNECTION_ERROR = -1
|
||||
|
@ -72,7 +71,7 @@ class USBIDs:
|
|||
@time_it_if_debug(DEBUG, time_it)
|
||||
def search_ids(vid, pid, *, offline=True):
|
||||
if offline:
|
||||
print_warning('Offline mode')
|
||||
print_warning('Offline mode', quiet=USBIDs.QUIET)
|
||||
|
||||
try:
|
||||
usb_ids = USBIDs.prepare_database(offline=offline)
|
||||
|
@ -93,7 +92,7 @@ class USBIDs:
|
|||
elif file_exists and not offline:
|
||||
usb_ids = _update_database(filename)
|
||||
elif not file_exists and not offline:
|
||||
print_warning('No local database found, trying to download')
|
||||
print_warning('No local database found, trying to download', quiet=USBIDs.QUIET)
|
||||
usb_ids = _download_database(filename)
|
||||
elif not file_exists and offline:
|
||||
raise USBRipError('No local database found')
|
||||
|
@ -119,15 +118,32 @@ def _update_database(filename):
|
|||
print('Date: {}'.format(curr_date))
|
||||
|
||||
print_info('Checking local database for update', quiet=USBIDs.QUIET)
|
||||
db, latest_ver, latest_date, error, e = _get_latest_version()
|
||||
db, latest_ver, latest_date, errcode, e = _get_latest_version()
|
||||
|
||||
if errcode:
|
||||
if errcode == USBIDs._INTERNET_CONNECTION_ERROR:
|
||||
print_warning(
|
||||
'No internet connection, using current version',
|
||||
errcode=errcode,
|
||||
quiet=USBIDs.QUIET
|
||||
)
|
||||
|
||||
elif errcode == USBIDs._SERVER_TIMEOUT_ERROR:
|
||||
print_warning(
|
||||
'Server timeout, using current version',
|
||||
errcode=errcode,
|
||||
initial_error=e,
|
||||
quiet=USBIDs.QUIET
|
||||
)
|
||||
|
||||
elif errcode == USBIDs._SERVER_CONTENT_ERROR:
|
||||
print_warning(
|
||||
'Server error, using current version',
|
||||
errcode=errcode,
|
||||
initial_error=e,
|
||||
quiet=USBIDs.QUIET
|
||||
)
|
||||
|
||||
if error:
|
||||
if error == USBIDs._INTERNET_CONNECTION_ERROR:
|
||||
print_warning('No internet connection, using current version', errcode=error)
|
||||
elif error == USBIDs._SERVER_TIMEOUT_ERROR:
|
||||
print_warning('Server timeout, using current version', errcode=error, initial_error=e)
|
||||
elif error == USBIDs._SERVER_CONTENT_ERROR:
|
||||
print_warning('Server error, using current version', errcode=error, initial_error=e)
|
||||
return usb_ids
|
||||
|
||||
if curr_ver != latest_ver and curr_date != latest_date: # if there's newer database version
|
||||
|
@ -163,18 +179,20 @@ def _download_database(filename):
|
|||
print_critical('Permission denied: \'{}\''.format(filename), initial_error=str(e))
|
||||
return None
|
||||
|
||||
db, latest_ver, latest_date, error, e = _get_latest_version()
|
||||
db, latest_ver, latest_date, errcode, e = _get_latest_version()
|
||||
|
||||
if error:
|
||||
if errcode:
|
||||
usb_ids.close()
|
||||
os.remove(filename)
|
||||
if error == USBIDs._INTERNET_CONNECTION_ERROR:
|
||||
raise USBRipError('No internet connection')
|
||||
elif error == USBIDs._SERVER_TIMEOUT_ERROR:
|
||||
raise USBRipError('Server timeout', errors={'errcode': error, 'initial_error': e})
|
||||
elif error == USBIDs._SERVER_CONTENT_ERROR:
|
||||
raise USBRipError('Server content error: no version or date found',
|
||||
errors={'errcode': error, 'initial_error': e})
|
||||
|
||||
if errcode == USBIDs._INTERNET_CONNECTION_ERROR:
|
||||
errmsg = 'No internet connection'
|
||||
elif errcode == USBIDs._SERVER_TIMEOUT_ERROR:
|
||||
errmsg = 'Server timeout'
|
||||
elif errcode == USBIDs._SERVER_CONTENT_ERROR:
|
||||
errmsg = 'Server content error: no version or date found'
|
||||
|
||||
raise USBRipError(errmsg, errors={'errcode': errcode, 'initial_error': e})
|
||||
|
||||
usb_ids.write(db)
|
||||
usb_ids.seek(0)
|
||||
|
@ -195,16 +213,18 @@ def _get_current_version(usb_ids):
|
|||
curr_ver = re.search(r'^# Version:\s*(.*?$)', db, re.MULTILINE).group(1)
|
||||
curr_date = re.search(r'^# Date:\s*(.*?$)', db, re.MULTILINE).group(1)
|
||||
except AttributeError as e:
|
||||
raise USBRipError('Invalid database content structure: no version or date found',
|
||||
errors={'initial_error': str(e)})
|
||||
raise USBRipError(
|
||||
'Invalid database content structure: no version or date found',
|
||||
errors={'initial_error': str(e)}
|
||||
)
|
||||
|
||||
return (curr_ver, curr_date)
|
||||
|
||||
|
||||
def _get_latest_version():
|
||||
connected, error, e = _check_connection('www.google.com')
|
||||
connected, errcode, e = _check_connection('www.google.com')
|
||||
if not connected:
|
||||
return (None, -1, -1, error, e)
|
||||
return (None, -1, -1, errcode, e)
|
||||
|
||||
print_info('Getting latest version and date', quiet=USBIDs.QUIET)
|
||||
|
||||
|
|
|
@ -0,0 +1,448 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: UTF-8 -*-
|
||||
|
||||
"""
|
||||
@file usbstorage.py
|
||||
@author Sam Freeside <snovvcrash@protonmail.com>
|
||||
@date 2018-05
|
||||
|
||||
@brief USB Storage handler.
|
||||
|
||||
@license
|
||||
Copyright (C) 2018 Sam Freeside
|
||||
|
||||
This file is part of usbrip.
|
||||
|
||||
usbrip is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
usbrip is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with usbrip. If not, see <http://www.gnu.org/licenses/>.
|
||||
@endlicense
|
||||
"""
|
||||
|
||||
import re
|
||||
import json
|
||||
import subprocess
|
||||
import os
|
||||
|
||||
from base64 import b64encode
|
||||
from datetime import datetime
|
||||
|
||||
from lib.core import USBEvents
|
||||
|
||||
from lib.core.usbevents import _filter_events
|
||||
from lib.core.usbevents import _dump_events
|
||||
from lib.core.usbevents import _process_auth_list
|
||||
from lib.core.common import MONTH_ENUM
|
||||
from lib.core.common import is_correct
|
||||
from lib.core.common import print_info
|
||||
from lib.core.common import print_warning
|
||||
from lib.core.common import print_critical
|
||||
from lib.core.common import print_secret
|
||||
from lib.core.common import USBRipError
|
||||
from lib.core.common import DEBUG
|
||||
from lib.core.common import time_it
|
||||
from lib.core.common import time_it_if_debug
|
||||
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# ---------------------- USB Storage -----------------------
|
||||
# ----------------------------------------------------------
|
||||
|
||||
|
||||
class USBStorage:
|
||||
|
||||
QUIET = False
|
||||
|
||||
_STORAGE_BASE = '/var/opt/usbrip/storage'
|
||||
|
||||
_7Z_WRONG_PASSWORD_ERROR = -1
|
||||
_7Z_PERMISSION_ERROR = -2
|
||||
_7Z_UNKNOWN_ERROR = -3
|
||||
|
||||
def __init__(self, *, quiet=False):
|
||||
if quiet:
|
||||
USBStorage.QUIET = quiet
|
||||
|
||||
# -------------------- USB Storage List --------------------
|
||||
|
||||
@staticmethod
|
||||
@time_it_if_debug(DEBUG, time_it)
|
||||
def list_storage(storage_type, password):
|
||||
storage_full_path = '{}/{}.7z'.format(USBStorage._STORAGE_BASE, storage_type)
|
||||
if not os.path.isfile(storage_full_path):
|
||||
print_critical('Storage not found: \'{}\''.format(storage_full_path))
|
||||
return
|
||||
|
||||
try:
|
||||
out = _7zip_list(storage_full_path, password)
|
||||
except USBRipError as e:
|
||||
print_critical(str(e), errcode=e.errors['errcode'], initial_error=e.errors['initial_error'])
|
||||
return
|
||||
|
||||
if '--' in out:
|
||||
print(out[out.index('--'):] + '--')
|
||||
else:
|
||||
print_critical('Undefined behaviour while listing storage contents', initial_error=out)
|
||||
|
||||
# -------------------- USB Storage Open --------------------
|
||||
|
||||
@staticmethod
|
||||
@time_it_if_debug(DEBUG, time_it)
|
||||
def open_storage(storage_type, password, columns, *, sieve=None, repres=None):
|
||||
storage_full_path = '{}/{}.7z'.format(USBStorage._STORAGE_BASE, storage_type)
|
||||
if not os.path.isfile(storage_full_path):
|
||||
print_critical('Storage not found: \'{}\''.format(storage_full_path))
|
||||
return
|
||||
|
||||
try:
|
||||
out = _7zip_unpack(storage_full_path, password)
|
||||
except USBRipError as e:
|
||||
print_critical(str(e), errcode=e.errors['errcode'], initial_error=e.errors['initial_error'])
|
||||
return
|
||||
|
||||
if 'Everything is Ok' in out:
|
||||
base_filename = re.search(r'Extracting\s*(.*?$)', out, re.MULTILINE).group(1)
|
||||
json_file = '{}/{}'.format(USBStorage._STORAGE_BASE, base_filename)
|
||||
USBEvents.QUIET = USBStorage.QUIET
|
||||
USBEvents.open_dump(json_file, columns, sieve=sieve, repres=repres)
|
||||
os.remove(json_file)
|
||||
else:
|
||||
print_critical('Undefined behaviour while unpacking storage', initial_error=out)
|
||||
|
||||
# ------------------- USB Storage Update -------------------
|
||||
|
||||
@staticmethod
|
||||
@time_it_if_debug(DEBUG, time_it)
|
||||
def update_storage(
|
||||
storage_type,
|
||||
password=None,
|
||||
*,
|
||||
input_auth=None,
|
||||
attributes=None,
|
||||
compression_level='5',
|
||||
indent=4,
|
||||
sieve=None
|
||||
):
|
||||
if storage_type == 'history':
|
||||
events_to_show = _get_history_events(sieve)
|
||||
elif storage_type == 'violations':
|
||||
try:
|
||||
events_to_show = _get_violation_events(sieve, input_auth, attributes, indent)
|
||||
except USBRipError as e:
|
||||
print_critical(str(e), initial_error=e.errors['initial_error'])
|
||||
return 1
|
||||
|
||||
if events_to_show is None:
|
||||
return 1
|
||||
|
||||
if events_to_show:
|
||||
min_date, max_date = _get_dates(events_to_show)
|
||||
else:
|
||||
print_info('No events to append', quiet=USBStorage.QUIET)
|
||||
return 1
|
||||
|
||||
storage_full_path = '{}/{}.7z'.format(USBStorage._STORAGE_BASE, storage_type)
|
||||
if not os.path.isfile(storage_full_path):
|
||||
print_critical('Storage not found: \'{}\''.format(storage_full_path))
|
||||
return 1
|
||||
|
||||
print_info('Updating storage: \'{}\''.format(storage_full_path), quiet=USBStorage.QUIET)
|
||||
|
||||
try:
|
||||
out = _7zip_unpack(storage_full_path, password)
|
||||
except USBRipError as e:
|
||||
print_critical(str(e), errcode=e.errors['errcode'], initial_error=e.errors['initial_error'])
|
||||
return 1
|
||||
|
||||
if 'Everything is Ok' in out:
|
||||
os.remove(storage_full_path)
|
||||
base_filename = re.search(r'Extracting\s*(.*?$)', out, re.MULTILINE).group(1)
|
||||
json_file = '{}/{}'.format(USBStorage._STORAGE_BASE, base_filename)
|
||||
|
||||
with open(json_file, 'r', encoding='utf-8') as dump:
|
||||
events_dumped = json.load(dump)
|
||||
os.remove(json_file)
|
||||
|
||||
merged_events = _merge_json_events(events_dumped, events_to_show)
|
||||
|
||||
if len(base_filename) > 9: # len('mmdd.json') == 9
|
||||
min_date = base_filename[:4]
|
||||
|
||||
new_json_file = '{}/{}-{}.json'.format(USBStorage._STORAGE_BASE, min_date, max_date)
|
||||
_dump_events(merged_events, storage_type, new_json_file, indent)
|
||||
|
||||
try:
|
||||
out = _7zip_pack(storage_full_path, new_json_file, password, compression_level)
|
||||
except USBRipError as e:
|
||||
os.remove(new_json_file)
|
||||
print_critical(str(e), errcode=e.errors['errcode'], initial_error=e.errors['initial_error'])
|
||||
return 1
|
||||
|
||||
if 'Everything is Ok' in out:
|
||||
print_info('Storage was successfully updated', quiet=USBStorage.QUIET)
|
||||
else:
|
||||
print_critical('Undefined behaviour while creating storage', initial_error=out)
|
||||
|
||||
os.remove(new_json_file)
|
||||
|
||||
else:
|
||||
print_critical('Undefined behaviour while unpacking storage', initial_error=out)
|
||||
|
||||
# ------------------- USB Storage Create -------------------
|
||||
|
||||
@staticmethod
|
||||
@time_it_if_debug(DEBUG, time_it)
|
||||
def create_storage(
|
||||
storage_type,
|
||||
*,
|
||||
password=None,
|
||||
input_auth=None,
|
||||
attributes=None,
|
||||
compression_level='5',
|
||||
indent=4,
|
||||
sieve=None
|
||||
):
|
||||
if storage_type == 'history':
|
||||
events_to_show = _get_history_events(sieve)
|
||||
elif storage_type == 'violations':
|
||||
try:
|
||||
events_to_show = _get_violation_events(sieve, input_auth, attributes, indent)
|
||||
except USBRipError as e:
|
||||
print_critical(str(e), initial_error=e.errors['initial_error'])
|
||||
return 1
|
||||
|
||||
if events_to_show is None:
|
||||
return 1
|
||||
|
||||
if events_to_show:
|
||||
min_date, max_date = _get_dates(events_to_show)
|
||||
json_file = '{}/{}-{}.json'.format(USBStorage._STORAGE_BASE, min_date, max_date)
|
||||
else:
|
||||
json_file = '{}/{}.json'.format(USBStorage._STORAGE_BASE, datetime.now().strftime('%m%d'))
|
||||
|
||||
try:
|
||||
_dump_events(events_to_show, storage_type, json_file, indent)
|
||||
except USBRipError as e:
|
||||
print_critical(str(e), initial_error=e.errors['initial_error'])
|
||||
return 1
|
||||
|
||||
if password is None:
|
||||
print_warning('No password provided, generating random one', quiet=USBEvents.QUIET)
|
||||
password = _gen_random_password(12)
|
||||
|
||||
storage_full_path = '{}/{}.7z'.format(USBStorage._STORAGE_BASE, storage_type)
|
||||
if os.path.exists(storage_full_path):
|
||||
os.remove(storage_full_path)
|
||||
|
||||
try:
|
||||
out = _7zip_pack(storage_full_path, json_file, password, compression_level)
|
||||
except USBRipError as e:
|
||||
os.remove(json_file)
|
||||
print_critical(str(e), errcode=e.errors['errcode'], initial_error=e.errors['initial_error'])
|
||||
return 1
|
||||
|
||||
if 'Everything is Ok' in out:
|
||||
print_info('New {} storage: \'{}\''.format(storage_type, storage_full_path), quiet=USBStorage.QUIET)
|
||||
print_secret('Your password is', secret=password)
|
||||
os.remove(json_file)
|
||||
else:
|
||||
print_critical('Undefined behaviour while creating storage', initial_error=out)
|
||||
|
||||
# ------------------- USB Storage Passwd -------------------
|
||||
|
||||
@staticmethod
|
||||
@time_it_if_debug(DEBUG, time_it)
|
||||
def change_password(storage_type, old_password, new_password, *, compression_level='5'):
|
||||
storage_full_path = '{}/{}.7z'.format(USBStorage._STORAGE_BASE, storage_type)
|
||||
if not os.path.isfile(storage_full_path):
|
||||
print_critical('Storage not found: \'{}\''.format(storage_full_path))
|
||||
return
|
||||
|
||||
try:
|
||||
out = _7zip_unpack(storage_full_path, old_password)
|
||||
if 'Everything is Ok' in out:
|
||||
os.remove(storage_full_path)
|
||||
|
||||
base_filename = re.search(r'Extracting\s*(.*?$)', out, re.MULTILINE).group(1)
|
||||
json_file = '{}/{}'.format(USBStorage._STORAGE_BASE, base_filename)
|
||||
|
||||
out = _7zip_pack(storage_full_path, json_file, new_password, compression_level)
|
||||
if 'Everything is Ok' in out:
|
||||
print_info('Password was successfully changed', quiet=USBStorage.QUIET)
|
||||
else:
|
||||
print_critical('Undefined behaviour while creating storage', initial_error=out)
|
||||
|
||||
os.remove(json_file)
|
||||
|
||||
else:
|
||||
print_critical('Undefined behaviour while unpacking storage', initial_error=out)
|
||||
|
||||
except USBRipError as e:
|
||||
print_critical(str(e), errcode=e.errors['errcode'], initial_error=e.errors['initial_error'])
|
||||
return
|
||||
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# ----------------------- Utilities ------------------------
|
||||
# ----------------------------------------------------------
|
||||
|
||||
|
||||
def _gen_random_password(length):
|
||||
while True:
|
||||
b64 = b64encode(os.urandom(length)).decode('utf-8')
|
||||
password = re.sub(r'[+=/]', '', b64)[:length]
|
||||
if is_correct(password):
|
||||
return password
|
||||
|
||||
|
||||
def _get_history_events(sieve):
|
||||
ue = USBEvents(quiet=USBStorage.QUIET)
|
||||
if not ue:
|
||||
return None
|
||||
|
||||
return _filter_events(ue._all_events, sieve)
|
||||
|
||||
|
||||
def _get_violation_events(sieve, input_auth, attributes, indent):
|
||||
try:
|
||||
auth = _process_auth_list(input_auth, indent)
|
||||
except json.decoder.JSONDecodeError as e:
|
||||
raise USBRipError(
|
||||
'Failed to decode authorized device list (JSON)',
|
||||
errors={'initial_error': str(e)}
|
||||
)
|
||||
|
||||
if not attributes:
|
||||
attributes = auth.keys()
|
||||
|
||||
ue = USBEvents(quiet=USBStorage.QUIET)
|
||||
if not ue:
|
||||
return None
|
||||
|
||||
for event in ue._all_events:
|
||||
try:
|
||||
if any(event[key] not in vals and
|
||||
event[key] is not None
|
||||
for key, vals in zip(attributes, auth.values())):
|
||||
ue._violations.append(event)
|
||||
except KeyError as e:
|
||||
raise USBRipError(
|
||||
'No such attribute in authorized device list',
|
||||
errors={'initial_error': str(e)}
|
||||
)
|
||||
|
||||
return _filter_events(ue._violations, sieve)
|
||||
|
||||
|
||||
def _get_dates(events_to_show):
|
||||
dates = {event['conn'][:6] for event in events_to_show}
|
||||
min_date = min(dates, key=lambda i: MONTH_ENUM[i[:3]] + i[3:]).split()
|
||||
min_date = MONTH_ENUM[min_date[0]].zfill(2) + min_date[-1].zfill(2)
|
||||
max_date = max(dates, key=lambda i: MONTH_ENUM[i[:3]] + i[3:]).split()
|
||||
max_date = MONTH_ENUM[max_date[0]].zfill(2) + max_date[-1].zfill(2)
|
||||
|
||||
return (min_date, max_date)
|
||||
|
||||
|
||||
'''
|
||||
def _create_shadow(password, rounds):
|
||||
from bcrypt import hashpw, gensalt
|
||||
hashed = hashpw(password.encode('utf-8'), gensalt(rounds))
|
||||
with open('/var/opt/usbrip/shadow', 'wb') as f:
|
||||
f.write(hashed)
|
||||
'''
|
||||
|
||||
|
||||
def _merge_json_events(events_dumped, events_to_show):
|
||||
events_dumped_set = {json.dumps(event) for event in events_dumped}
|
||||
events_union_set = events_dumped_set.union([json.dumps(event) for event in events_to_show])
|
||||
events_union = [json.loads(event) for event in events_union_set]
|
||||
events_union_sorted = sorted(events_union, key=lambda i: MONTH_ENUM[i['conn'][:3]] + i['conn'][3:])
|
||||
|
||||
return events_union_sorted
|
||||
|
||||
|
||||
def _7zip_list(archive, password):
|
||||
print_info('Listing archive: \'{}\''.format(archive), quiet=USBStorage.QUIET)
|
||||
|
||||
cmd = [
|
||||
'7z',
|
||||
'l',
|
||||
archive,
|
||||
'-p' + password
|
||||
]
|
||||
|
||||
out, errcode, errmsg, e = _7zip_subprocess_handler(cmd)
|
||||
if errcode:
|
||||
raise USBRipError(errmsg, errors={'errcode': errcode, 'initial_error': e})
|
||||
|
||||
return out
|
||||
|
||||
|
||||
def _7zip_unpack(archive, password):
|
||||
print_info('Unpacking archive: \'{}\''.format(archive), quiet=USBStorage.QUIET)
|
||||
|
||||
cmd = [
|
||||
'7z',
|
||||
'e',
|
||||
archive,
|
||||
'-p' + password,
|
||||
'-o' + USBStorage._STORAGE_BASE,
|
||||
'-y'
|
||||
]
|
||||
|
||||
out, errcode, errmsg, e = _7zip_subprocess_handler(cmd)
|
||||
if errcode:
|
||||
raise USBRipError(errmsg, errors={'errcode': errcode, 'initial_error': e})
|
||||
|
||||
return out
|
||||
|
||||
|
||||
def _7zip_pack(archive, file, password, compression_level):
|
||||
print_info('Creating storage (7-Zip): \'{}\''.format(archive), quiet=USBStorage.QUIET)
|
||||
|
||||
cmd = [
|
||||
'7z',
|
||||
'a',
|
||||
archive,
|
||||
file,
|
||||
'-mhe=on',
|
||||
'-p' + password,
|
||||
'-mx=' + compression_level
|
||||
]
|
||||
|
||||
out, errcode, errmsg, e = _7zip_subprocess_handler(cmd)
|
||||
if errcode:
|
||||
raise USBRipError(errmsg, errors={'errcode': errcode, 'initial_error': e})
|
||||
|
||||
return out
|
||||
|
||||
|
||||
def _7zip_subprocess_handler(cmd):
|
||||
try:
|
||||
out = subprocess.check_output(cmd).decode('utf-8')
|
||||
except subprocess.CalledProcessError as e:
|
||||
initial_error = e.output.decode('utf-8')
|
||||
|
||||
if 'Wrong password?' in initial_error:
|
||||
errmsg = 'Can not open encrypted archive. Wrong password?'
|
||||
errcode = USBStorage._7Z_WRONG_PASSWORD_ERROR
|
||||
elif 'can not open output file' in initial_error:
|
||||
errmsg = 'Permission denied. Retry with sudo'
|
||||
errcode = USBStorage._7Z_PERMISSION_ERROR
|
||||
else:
|
||||
errmsg = 'Something went wrong while working with 7-Zip archive'
|
||||
errcode = USBStorage._7Z_UNKNOWN_ERROR
|
||||
|
||||
return ('', errcode, errmsg, initial_error)
|
||||
|
||||
return (out, 0, '', '')
|
|
@ -0,0 +1,598 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: UTF-8 -*-
|
||||
|
||||
"""
|
||||
@file cliopts.py
|
||||
@author Sam Freeside <snovvcrash@protonmail.com>
|
||||
@date 2018-03
|
||||
|
||||
@brief Command line option parser.
|
||||
|
||||
@license
|
||||
Copyright (C) 2018 Sam Freeside
|
||||
|
||||
This file is part of usbrip.
|
||||
|
||||
usbrip is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
usbrip is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with usbrip. If not, see <http://www.gnu.org/licenses/>.
|
||||
@endlicense
|
||||
"""
|
||||
|
||||
from argparse import ArgumentParser
|
||||
|
||||
from lib.core import USBStorage
|
||||
|
||||
from lib.core.common import root_dir_join
|
||||
|
||||
|
||||
def cmd_line_options():
|
||||
parser = ArgumentParser()
|
||||
subparsers = parser.add_subparsers(dest='subparser')
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# ------------------------- Banner -------------------------
|
||||
# ----------------------------------------------------------
|
||||
|
||||
build_ub_parser(subparsers)
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# ----------------------- USB Events -----------------------
|
||||
# ----------------------------------------------------------
|
||||
|
||||
build_ue_parser(subparsers)
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# ---------------------- USB Storage -----------------------
|
||||
# ----------------------------------------------------------
|
||||
|
||||
build_us_parser(subparsers)
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# ------------------------ USB IDs -------------------------
|
||||
# ----------------------------------------------------------
|
||||
|
||||
build_ui_parser(subparsers)
|
||||
|
||||
return parser
|
||||
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# ------------------------- Banner -------------------------
|
||||
# ----------------------------------------------------------
|
||||
|
||||
|
||||
def build_ub_parser(subparsers):
|
||||
subparsers.add_parser(
|
||||
'banner',
|
||||
help='show tool banner'
|
||||
)
|
||||
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# ----------------------- USB Events -----------------------
|
||||
# ----------------------------------------------------------
|
||||
|
||||
|
||||
def build_ue_parser(subparsers):
|
||||
ue_parser = subparsers.add_parser(
|
||||
'events',
|
||||
help='work with USB events'
|
||||
)
|
||||
|
||||
ue_subparsers = ue_parser.add_subparsers(dest='ue_subparser')
|
||||
|
||||
build_ueh_parser(ue_subparsers)
|
||||
build_ueo_parser(ue_subparsers)
|
||||
build_ueg_parser(ue_subparsers)
|
||||
build_uev_parser(ue_subparsers)
|
||||
|
||||
|
||||
# ------------------- USB Events History -------------------
|
||||
|
||||
|
||||
def build_ueh_parser(subparsers):
|
||||
ueh_parser = subparsers.add_parser(
|
||||
'history',
|
||||
help='show USB event history'
|
||||
)
|
||||
|
||||
_parse_quiet_args(ueh_parser)
|
||||
_parse_column_args(ueh_parser)
|
||||
_parse_sieve_args(ueh_parser)
|
||||
_parse_repres_args(ueh_parser)
|
||||
_parse_file_args(ueh_parser)
|
||||
|
||||
|
||||
# -------------------- USB Events Open ---------------------
|
||||
|
||||
|
||||
def build_ueo_parser(subparsers):
|
||||
ueo_parser = subparsers.add_parser(
|
||||
'open',
|
||||
help='open USB event dump'
|
||||
)
|
||||
|
||||
_parse_quiet_args(ueo_parser)
|
||||
_parse_column_args(ueo_parser)
|
||||
_parse_sieve_args(ueo_parser)
|
||||
_parse_repres_args(ueo_parser)
|
||||
_parse_file_args(ueo_parser)
|
||||
|
||||
ueo_parser.add_argument(
|
||||
'input',
|
||||
type=str,
|
||||
help='input path for the event dump (JSON)'
|
||||
)
|
||||
|
||||
|
||||
# ------------------ USB Events Gen Auth -------------------
|
||||
|
||||
|
||||
def build_ueg_parser(subparsers):
|
||||
ueg_parser = subparsers.add_parser(
|
||||
'gen_auth',
|
||||
help='generate authorized device list (JSON)'
|
||||
)
|
||||
|
||||
_parse_quiet_args(ueg_parser)
|
||||
_parse_sieve_args(ueg_parser)
|
||||
_parse_file_args(ueg_parser)
|
||||
_parse_attribute_args(
|
||||
ueg_parser,
|
||||
help_msg='attributes to include in authorized device list '
|
||||
'(options: \'vid\', '
|
||||
'\'pid\', '
|
||||
'\'prod\', '
|
||||
'\'manufact\', '
|
||||
'\'serial\'.)'
|
||||
)
|
||||
|
||||
ueg_parser.add_argument(
|
||||
'output',
|
||||
type=str,
|
||||
help='output path for the list of authorized devices (JSON)'
|
||||
)
|
||||
|
||||
|
||||
# ----------------- USB Events Violations ------------------
|
||||
|
||||
|
||||
def build_uev_parser(subparsers):
|
||||
uev_parser = subparsers.add_parser(
|
||||
'violations',
|
||||
help='search USB event history for violations '
|
||||
'(show USB devices that do appear in hist'
|
||||
'ory and do NOT appear in authorized devi'
|
||||
'ce list (JSON))'
|
||||
)
|
||||
|
||||
_parse_quiet_args(uev_parser)
|
||||
_parse_column_args(uev_parser)
|
||||
_parse_sieve_args(uev_parser)
|
||||
_parse_repres_args(uev_parser)
|
||||
_parse_file_args(uev_parser)
|
||||
_parse_attribute_args(
|
||||
uev_parser,
|
||||
help_msg='attributes to look through when searching for USB violation events '
|
||||
'(options: \'vid\', '
|
||||
'\'pid\', '
|
||||
'\'prod\', '
|
||||
'\'manufact\', '
|
||||
'\'serial\'.)'
|
||||
)
|
||||
|
||||
uev_parser.add_argument(
|
||||
'input',
|
||||
type=str,
|
||||
help='input path for the list of authorized devices (JSON)'
|
||||
)
|
||||
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# ---------------------- USB Storage -----------------------
|
||||
# ----------------------------------------------------------
|
||||
|
||||
|
||||
def build_us_parser(subparsers):
|
||||
us_parser = subparsers.add_parser(
|
||||
'storage',
|
||||
help='work with USB event storage'
|
||||
)
|
||||
|
||||
us_subparsers = us_parser.add_subparsers(dest='us_subparser')
|
||||
|
||||
build_usl_parser(us_subparsers)
|
||||
build_uso_parser(us_subparsers)
|
||||
build_usu_parser(us_subparsers)
|
||||
build_usc_parser(us_subparsers)
|
||||
build_usp_parser(us_subparsers)
|
||||
|
||||
|
||||
# -------------------- USB Storage List --------------------
|
||||
|
||||
|
||||
def build_usl_parser(subparsers):
|
||||
usl_parser = subparsers.add_parser(
|
||||
'list',
|
||||
help='list storage contents'
|
||||
)
|
||||
|
||||
_parse_quiet_args(usl_parser)
|
||||
_parse_storage_type_args(usl_parser)
|
||||
_parse_password_args(usl_parser, required=True)
|
||||
|
||||
|
||||
# -------------------- USB Storage Open --------------------
|
||||
|
||||
|
||||
def build_uso_parser(subparsers):
|
||||
uso_parser = subparsers.add_parser(
|
||||
'open',
|
||||
help='open storage contents'
|
||||
)
|
||||
|
||||
_parse_quiet_args(uso_parser)
|
||||
_parse_storage_type_args(uso_parser)
|
||||
_parse_password_args(uso_parser, required=True)
|
||||
_parse_column_args(uso_parser)
|
||||
_parse_sieve_args(uso_parser)
|
||||
_parse_repres_args(uso_parser)
|
||||
|
||||
|
||||
# ------------------- USB Storage Update -------------------
|
||||
|
||||
|
||||
def build_usu_parser(subparsers):
|
||||
usu_parser = subparsers.add_parser(
|
||||
'update',
|
||||
help='update current storage'
|
||||
)
|
||||
|
||||
_parse_quiet_args(usu_parser)
|
||||
_parse_storage_type_args(usu_parser)
|
||||
_parse_password_args(usu_parser, required=True)
|
||||
_parse_comperssion_level_args(usu_parser)
|
||||
_parse_sieve_args(usu_parser)
|
||||
_parse_attribute_args(
|
||||
usu_parser,
|
||||
help_msg='attributes to look through when searching for USB violation events '
|
||||
'(options: \'vid\', '
|
||||
'\'pid\', '
|
||||
'\'prod\', '
|
||||
'\'manufact\', '
|
||||
'\'serial\'.)'
|
||||
)
|
||||
|
||||
usu_parser.add_argument(
|
||||
'-i',
|
||||
'--input',
|
||||
type=str,
|
||||
default=None,
|
||||
help='input path for the list of authorized devices (JSON)'
|
||||
)
|
||||
|
||||
|
||||
# ------------------- USB Storage Create -------------------
|
||||
|
||||
|
||||
def build_usc_parser(subparsers):
|
||||
usc_parser = subparsers.add_parser(
|
||||
'create',
|
||||
help='create initial history/violations storage; '
|
||||
'storage path is \'{}\''
|
||||
.format(USBStorage._STORAGE_BASE)
|
||||
)
|
||||
|
||||
_parse_quiet_args(usc_parser)
|
||||
_parse_storage_type_args(usc_parser)
|
||||
_parse_password_args(usc_parser, required=False)
|
||||
_parse_comperssion_level_args(usc_parser)
|
||||
_parse_sieve_args(usc_parser)
|
||||
_parse_attribute_args(
|
||||
usc_parser,
|
||||
help_msg='attributes to look through when searching for USB violation events '
|
||||
'(options: \'vid\', '
|
||||
'\'pid\', '
|
||||
'\'prod\', '
|
||||
'\'manufact\', '
|
||||
'\'serial\'.)'
|
||||
)
|
||||
|
||||
usc_parser.add_argument(
|
||||
'-i',
|
||||
'--input',
|
||||
type=str,
|
||||
default=None,
|
||||
help='input path for the list of authorized devices (JSON)'
|
||||
)
|
||||
|
||||
|
||||
# ------------------- USB Storage Passwd -------------------
|
||||
|
||||
|
||||
def build_usp_parser(subparsers):
|
||||
usp_parser = subparsers.add_parser(
|
||||
'passwd',
|
||||
help='change storage password'
|
||||
)
|
||||
|
||||
_parse_quiet_args(usp_parser)
|
||||
_parse_storage_type_args(usp_parser)
|
||||
_parse_old_new_passwords_args(usp_parser)
|
||||
_parse_comperssion_level_args(usp_parser)
|
||||
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# ------------------------ USB IDs -------------------------
|
||||
# ----------------------------------------------------------
|
||||
|
||||
|
||||
def build_ui_parser(subparsers):
|
||||
ui_parser = subparsers.add_parser(
|
||||
'ids',
|
||||
help='work with USB IDs'
|
||||
)
|
||||
|
||||
ui_subparsers = ui_parser.add_subparsers(dest='ui_subparser')
|
||||
|
||||
build_uis_parser(ui_subparsers)
|
||||
build_uid_parser(ui_subparsers)
|
||||
|
||||
|
||||
# --------------------- USB IDs Search ---------------------
|
||||
|
||||
|
||||
def build_uis_parser(subparsers):
|
||||
uis_parser = subparsers.add_parser(
|
||||
'search',
|
||||
help='search by VID and/or PID; '
|
||||
'ids database path is \'{}\''
|
||||
.format(root_dir_join('usb_ids/usb.ids'))
|
||||
)
|
||||
|
||||
_parse_quiet_args(uis_parser)
|
||||
|
||||
uis_parser.add_argument(
|
||||
'--vid',
|
||||
type=str,
|
||||
default=None,
|
||||
help='vendor ID'
|
||||
)
|
||||
|
||||
uis_parser.add_argument(
|
||||
'--pid',
|
||||
type=str,
|
||||
default=None,
|
||||
help='product ID'
|
||||
)
|
||||
|
||||
uis_parser.add_argument(
|
||||
'--offline',
|
||||
action='store_true',
|
||||
help='offline mode (no database download/update)'
|
||||
)
|
||||
|
||||
|
||||
# -------------------- USB IDs Download --------------------
|
||||
|
||||
|
||||
def build_uid_parser(subparsers):
|
||||
uid_parser = subparsers.add_parser(
|
||||
'download',
|
||||
help='download/update database;'
|
||||
'ids database path is \'{}\''
|
||||
.format(root_dir_join('usb_ids/usb.ids'))
|
||||
)
|
||||
|
||||
_parse_quiet_args(uid_parser)
|
||||
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# ----------------------- Utilities ------------------------
|
||||
# ----------------------------------------------------------
|
||||
|
||||
|
||||
def _parse_quiet_args(parser):
|
||||
parser.add_argument(
|
||||
'-q',
|
||||
'--quiet',
|
||||
action='store_true',
|
||||
help='supress banner, some info messages, '
|
||||
'time capture and user iteraction'
|
||||
)
|
||||
|
||||
|
||||
def _parse_column_args(parser):
|
||||
parser.add_argument(
|
||||
'-c',
|
||||
'--column',
|
||||
nargs='+',
|
||||
type=str,
|
||||
default=[],
|
||||
help='columns to show (options: \'conn\', '
|
||||
'\'user\', '
|
||||
'\'vid\', '
|
||||
'\'pid\', '
|
||||
'\'prod\', '
|
||||
'\'manufact\', '
|
||||
'\'serial\', '
|
||||
'\'port\', '
|
||||
'\'disconn\'.)'
|
||||
)
|
||||
|
||||
|
||||
def _parse_sieve_args(parser):
|
||||
parser.add_argument(
|
||||
'-e',
|
||||
'--external',
|
||||
action='store_true',
|
||||
help='show only those devices which have \'disconnect\' date'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-n',
|
||||
'--number',
|
||||
type=int,
|
||||
default=-1,
|
||||
help='number of events to show'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-d',
|
||||
'--date',
|
||||
nargs='+',
|
||||
type=str,
|
||||
default=[],
|
||||
help='filter by dates'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--user',
|
||||
nargs='+',
|
||||
type=str,
|
||||
default=[],
|
||||
help='search by users'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--vid',
|
||||
nargs='+',
|
||||
type=str,
|
||||
default=[],
|
||||
help='search by VIDs'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--pid',
|
||||
nargs='+',
|
||||
type=str,
|
||||
default=[],
|
||||
help='search by PIDs'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--prod',
|
||||
nargs='+',
|
||||
type=str,
|
||||
default=[],
|
||||
help='search by products'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--manufact',
|
||||
nargs='+',
|
||||
type=str,
|
||||
default=[],
|
||||
help='search by manufacturers'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--serial',
|
||||
nargs='+',
|
||||
type=str,
|
||||
default=[],
|
||||
help='search by serial numbers'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--port',
|
||||
nargs='+',
|
||||
type=str,
|
||||
default=[],
|
||||
help='search by ports'
|
||||
)
|
||||
|
||||
|
||||
def _parse_repres_args(parser):
|
||||
group_table_list = parser.add_mutually_exclusive_group()
|
||||
|
||||
group_table_list.add_argument(
|
||||
'-t',
|
||||
'--table',
|
||||
action='store_true',
|
||||
help='represent as table (not list)'
|
||||
)
|
||||
|
||||
group_table_list.add_argument(
|
||||
'-l',
|
||||
'--list',
|
||||
action='store_true',
|
||||
help='represent as list (not table)'
|
||||
)
|
||||
|
||||
|
||||
def _parse_attribute_args(parser, *, help_msg):
|
||||
parser.add_argument(
|
||||
'-a',
|
||||
'--attribute',
|
||||
nargs='+',
|
||||
type=str,
|
||||
default=[],
|
||||
help=help_msg
|
||||
)
|
||||
|
||||
|
||||
def _parse_storage_type_args(parser):
|
||||
parser.add_argument(
|
||||
'storage_type',
|
||||
type=str,
|
||||
help='storage type (options: \'history\', \'violations\')'
|
||||
)
|
||||
|
||||
|
||||
def _parse_password_args(parser, *, required):
|
||||
parser.add_argument(
|
||||
'-p',
|
||||
'--password',
|
||||
type=str,
|
||||
required=required,
|
||||
help='storage password'
|
||||
)
|
||||
|
||||
|
||||
def _parse_old_new_passwords_args(parser):
|
||||
parser.add_argument(
|
||||
'-o',
|
||||
'--old',
|
||||
type=str,
|
||||
required=True,
|
||||
help='old storage password'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-n',
|
||||
'--new',
|
||||
type=str,
|
||||
required=True,
|
||||
help='new storage password'
|
||||
)
|
||||
|
||||
|
||||
def _parse_comperssion_level_args(parser):
|
||||
parser.add_argument(
|
||||
'--lvl',
|
||||
type=str,
|
||||
default='5',
|
||||
help='compression level (from 0 to 9, default is 0 = no compression)'
|
||||
)
|
||||
|
||||
|
||||
def _parse_file_args(parser):
|
||||
parser.add_argument(
|
||||
'-f',
|
||||
'--file',
|
||||
nargs='+',
|
||||
type=str,
|
||||
default=[],
|
||||
help='obtain log from the outer files'
|
||||
)
|
|
@ -34,17 +34,16 @@ import sys
|
|||
|
||||
import lib.utils.timing as timing
|
||||
|
||||
from string import printable
|
||||
|
||||
from lib.core import USBEvents
|
||||
from lib.core import USBStorage
|
||||
from lib.core import USBIDs
|
||||
|
||||
from lib.core.cliopts import cmd_line_options
|
||||
from lib.parse.cliopts import cmd_line_options
|
||||
from lib.core.common import BANNER
|
||||
from lib.core.common import COLUMN_NAMES
|
||||
from lib.core.common import USBRipError
|
||||
from lib.core.common import is_correct
|
||||
from lib.core.common import print_critical
|
||||
from lib.core.common import USBRipError
|
||||
|
||||
|
||||
# ----------------------------------------------------------
|
||||
|
@ -55,7 +54,7 @@ from lib.core.common import print_critical
|
|||
def main():
|
||||
if not len(sys.argv) > 1:
|
||||
print(BANNER + '\n')
|
||||
usbrip_error('No arguments were passed')
|
||||
usbrip_arg_error()
|
||||
|
||||
parser = cmd_line_options()
|
||||
args = parser.parse_args()
|
||||
|
@ -75,50 +74,99 @@ def main():
|
|||
# ----------------------------------------------------------
|
||||
|
||||
elif args.subparser == 'events' and args.ue_subparser:
|
||||
timing.begin(quiet=args.quiet)
|
||||
|
||||
sieve, repres = validate_ue_args(args)
|
||||
ue = USBEvents(args.file, quiet=args.quiet)
|
||||
|
||||
# ------------------- USB Events History -------------------
|
||||
|
||||
if args.ue_subparser == 'history':
|
||||
ue.event_history(args.column, sieve=sieve, repres=repres)
|
||||
timing.begin(quiet=args.quiet)
|
||||
ueh = USBEvents(args.file, quiet=args.quiet)
|
||||
if ueh:
|
||||
ueh.event_history(args.column, sieve=sieve, repres=repres)
|
||||
|
||||
# -------------------- USB Events Open ---------------------
|
||||
|
||||
elif args.ue_subparser == 'open':
|
||||
timing.begin(quiet=args.quiet)
|
||||
USBEvents.QUIET = args.quiet
|
||||
USBEvents.open_dump(args.input, args.column, sieve=sieve, repres=repres)
|
||||
|
||||
# ------------------ USB Events Gen Auth -------------------
|
||||
|
||||
elif args.ue_subparser == 'gen_auth':
|
||||
ue.generate_auth_json(args.output, args.attribute, sieve=sieve)
|
||||
timing.begin(quiet=args.quiet)
|
||||
ueg = USBEvents(args.file, quiet=args.quiet)
|
||||
if ueg:
|
||||
if ueg.generate_auth_json(args.output, args.attribute, sieve=sieve):
|
||||
usbrip_internal_error()
|
||||
else:
|
||||
usbrip_internal_error()
|
||||
|
||||
# ----------------- USB Events Violations ------------------
|
||||
|
||||
elif args.ue_subparser == 'violations':
|
||||
ue.search_violations(args.input, args.attribute, args.column, sieve=sieve, repres=repres)
|
||||
timing.begin(quiet=args.quiet)
|
||||
uev = USBEvents(args.file, quiet=args.quiet)
|
||||
if uev:
|
||||
uev.search_violations(args.input, args.attribute, args.column, sieve=sieve, repres=repres)
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# ---------------------- USB Storage -----------------------
|
||||
# ----------------------------------------------------------
|
||||
|
||||
'''elif args.subparser == 'storage' and args.us_subparser:
|
||||
elif args.subparser == 'storage' and args.us_subparser:
|
||||
sieve, repres = validate_us_args(args)
|
||||
timing.begin(quiet=args.quiet)
|
||||
|
||||
validate_us_args(args)
|
||||
us = USBStorage(quiet=args.quiet)
|
||||
|
||||
if args.us_subparser == 'create':
|
||||
us.create_storage(args.storage_type,
|
||||
passwd=args.password,
|
||||
input_auth=args.input,
|
||||
attributes=args.attribute)'''
|
||||
# -------------------- USB Storage List --------------------
|
||||
|
||||
if args.us_subparser == 'list':
|
||||
us.list_storage(args.storage_type, args.password)
|
||||
|
||||
# -------------------- USB Storage Open --------------------
|
||||
|
||||
elif args.us_subparser == 'open':
|
||||
us.open_storage(args.storage_type, args.password, args.column, sieve=sieve, repres=repres)
|
||||
|
||||
# ------------------- USB Storage Update -------------------
|
||||
|
||||
elif args.us_subparser == 'update':
|
||||
if us.update_storage(
|
||||
args.storage_type,
|
||||
args.password,
|
||||
input_auth=args.input,
|
||||
attributes=args.attribute,
|
||||
compression_level=args.lvl,
|
||||
sieve=sieve
|
||||
):
|
||||
usbrip_internal_error()
|
||||
|
||||
# ------------------- USB Storage Create -------------------
|
||||
|
||||
elif args.us_subparser == 'create':
|
||||
if us.create_storage(
|
||||
args.storage_type,
|
||||
password=args.password,
|
||||
input_auth=args.input,
|
||||
attributes=args.attribute,
|
||||
compression_level=args.lvl,
|
||||
sieve=sieve
|
||||
):
|
||||
usbrip_internal_error()
|
||||
|
||||
# ------------------- USB Storage Passwd -------------------
|
||||
|
||||
elif args.us_subparser == 'passwd':
|
||||
us.change_password(args.storage_type, args.old, args.new, compression_level=args.lvl)
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# ------------------------ USB IDs -------------------------
|
||||
# ----------------------------------------------------------
|
||||
|
||||
elif args.subparser == 'ids' and args.ui_subparser:
|
||||
timing.begin(quiet=args.quiet)
|
||||
|
||||
validate_ui_args(args)
|
||||
timing.begin(quiet=args.quiet)
|
||||
ui = USBIDs(quiet=args.quiet)
|
||||
|
||||
# --------------------- USB IDs Search ---------------------
|
||||
|
@ -138,93 +186,192 @@ def main():
|
|||
|
||||
else:
|
||||
subparser = ' ' + args.subparser + ' '
|
||||
usbrip_error('Choose one of the usbrip {} actions'.format(args.subparser), subparser=subparser)
|
||||
usbrip_arg_error('Choose one of the usbrip {} actions'.format(args.subparser), subparser=subparser)
|
||||
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# ------------------ Argument validation -------------------
|
||||
# ----------------------------------------------------------
|
||||
|
||||
# ----------------------- USB Events -----------------------
|
||||
|
||||
|
||||
def validate_ue_args(args):
|
||||
if 'column' in args and args.column:
|
||||
for column in args.column:
|
||||
if column not in COLUMN_NAMES.keys():
|
||||
usbrip_error(column + ': Invalid column name')
|
||||
_validate_column_args(args)
|
||||
_validate_attribute_args(args)
|
||||
_validate_io_args(args)
|
||||
_validate_file_args(args)
|
||||
|
||||
if 'attribute' in args and args.attribute:
|
||||
for attribute in args.attribute:
|
||||
if attribute not in ('vid', 'pid', 'prod', 'manufact', 'serial'):
|
||||
usbrip_error(attribute + ': Invalid attribute name')
|
||||
return (_validate_sieve_args(args), _validate_repres_args(args))
|
||||
|
||||
if 'date' in args and args.date:
|
||||
re_date = re.compile(r'^(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+[1-3]?[0-9]$')
|
||||
for i in range(len(args.date)):
|
||||
if not re_date.search(args.date[i]):
|
||||
usbrip_error(args.date[i] + ': Wrong date format')
|
||||
date_parts = args.date[i].split()
|
||||
args.date[i] = ' '.join(date_parts) if len(date_parts[-1]) == 2 else ' '.join(date_parts)
|
||||
|
||||
if 'file' in args and args.file:
|
||||
for file in args.file:
|
||||
if not os.path.exists(file):
|
||||
usbrip_error(file + ': Path does not exist')
|
||||
|
||||
if 'output' in args and os.path.exists(args.output):
|
||||
usbrip_error(args.output + ': Path already exists')
|
||||
|
||||
if 'input' in args and not os.path.exists(args.input):
|
||||
usbrip_error(args.input + ': Path does not exist')
|
||||
|
||||
sieve = dict(zip(('external', 'number', 'dates'), (args.external, args.number, args.date)))
|
||||
repres = dict.fromkeys(('table', 'list', 'smart'), False)
|
||||
|
||||
if 'table' in args and args.table:
|
||||
repres['table'] = True
|
||||
elif 'list' in args and args.list:
|
||||
repres['list'] = True
|
||||
else:
|
||||
repres['smart'] = True
|
||||
|
||||
return (sieve, repres)
|
||||
# ---------------------- USB Storage -----------------------
|
||||
|
||||
|
||||
def validate_us_args(args):
|
||||
if 'storage_type' in args and not args.storage_type in ('history', 'violations'):
|
||||
usbrip_error(args.storage_type + ': Invalid storage type')
|
||||
_validate_storage_type_args(args)
|
||||
_validate_password_args(args)
|
||||
_validate_compression_level_args(args)
|
||||
_validate_io_args(args)
|
||||
_validate_attribute_args(args)
|
||||
|
||||
if 'password' in args and args.password and \
|
||||
(len(args.password) < 8 or \
|
||||
not any(c.islower() for c in args.password) or \
|
||||
not any(c.isupper() for c in args.password) or \
|
||||
not any(c.isdigit() for c in args.password) or \
|
||||
any(c not in printable for c in args.password)):
|
||||
usbrip_error(args.password + ': Password must be at least 8 chars long and contain at least '
|
||||
'1 lowercase letter, at least 1 uppercase letter and at least '
|
||||
'1 digit')
|
||||
return (_validate_sieve_args(args), _validate_repres_args(args))
|
||||
|
||||
if 'attribute' in args and args.attribute:
|
||||
for attribute in args.attribute:
|
||||
if attribute not in ('vid', 'pid', 'prod', 'manufact', 'serial'):
|
||||
usbrip_error(attribute + ': Invalid attribute name')
|
||||
|
||||
# ------------------------ USB IDs -------------------------
|
||||
|
||||
|
||||
def validate_ui_args(args):
|
||||
if not args.vid and not args.pid:
|
||||
usbrip_error('At least one of --vid/--pid or --download option should be specified')
|
||||
_validate_vid_pid_args(args)
|
||||
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# ------------------- Error Message Gen --------------------
|
||||
# ---------------- Error Message Generators ----------------
|
||||
# ----------------------------------------------------------
|
||||
|
||||
|
||||
def usbrip_error(message, *, subparser=' '):
|
||||
print('usage: python3 {}{}[-h]\n'.format(sys.argv[0], subparser))
|
||||
print(sys.argv[0].rsplit('/', 1)[-1] + ': error: ' + message, file=sys.stderr)
|
||||
def usbrip_arg_error(message=None, *, subparser=' '):
|
||||
if message:
|
||||
print('usage: python3 {}{}[-h]\n'.format(sys.argv[0], subparser))
|
||||
print(sys.argv[0].rsplit('/', 1)[-1] + ': argument error: ' + message, file=sys.stderr)
|
||||
else:
|
||||
print('usage: python3 {} [-h]'.format(sys.argv[0]))
|
||||
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def usbrip_internal_error():
|
||||
print(sys.argv[0].rsplit('/', 1)[-1] + ': Internal error occured', file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# ----------------------- Utilities ------------------------
|
||||
# ----------------------------------------------------------
|
||||
|
||||
|
||||
def _validate_column_args(args):
|
||||
if 'column' in args and args.column:
|
||||
for column in args.column:
|
||||
if column not in COLUMN_NAMES.keys():
|
||||
usbrip_arg_error(column + ': Invalid column name')
|
||||
|
||||
|
||||
def _validate_sieve_args(args):
|
||||
if 'external' in args:
|
||||
sieve = dict(zip(('external', 'number', 'dates', 'fields'),
|
||||
(args.external, args.number, args.date, {})))
|
||||
|
||||
if args.date:
|
||||
re_date = re.compile(r'^(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+[1-3]?[0-9]$')
|
||||
for i in range(len(args.date)):
|
||||
if not re_date.search(args.date[i]):
|
||||
usbrip_arg_error(args.date[i] + ': Wrong date format')
|
||||
date_parts = args.date[i].split()
|
||||
args.date[i] = ' '.join(date_parts) if len(date_parts[-1]) == 2 else ' '.join(date_parts)
|
||||
|
||||
if args.user:
|
||||
sieve['fields']['user'] = args.user
|
||||
if args.vid:
|
||||
sieve['fields']['vid'] = args.vid
|
||||
if args.pid:
|
||||
sieve['fields']['pid'] = args.pid
|
||||
if args.prod:
|
||||
sieve['fields']['prod'] = args.prod
|
||||
if args.manufact:
|
||||
sieve['fields']['manufact'] = args.manufact
|
||||
if args.serial:
|
||||
sieve['fields']['serial'] = args.serial
|
||||
if args.port:
|
||||
sieve['fields']['port'] = args.port
|
||||
|
||||
return sieve
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _validate_repres_args(args):
|
||||
if 'table' in args or 'list' in args:
|
||||
repres = dict.fromkeys(('table', 'list', 'smart'), False)
|
||||
|
||||
if 'table' in args and args.table:
|
||||
repres['table'] = True
|
||||
elif 'list' in args and args.list:
|
||||
repres['list'] = True
|
||||
else:
|
||||
repres['smart'] = True
|
||||
|
||||
return repres
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _validate_io_args(args):
|
||||
if 'input' in args and args.input:
|
||||
if not os.path.exists(args.input):
|
||||
usbrip_arg_error(args.input + ': Path does not exist')
|
||||
elif not os.path.isfile(args.input):
|
||||
usbrip_arg_error(args.input + ': Not a regular file')
|
||||
|
||||
if 'output' in args and os.path.exists(args.output):
|
||||
usbrip_arg_error(args.output + ': Path already exists')
|
||||
|
||||
|
||||
def _validate_attribute_args(args):
|
||||
if 'attribute' in args and args.attribute:
|
||||
for attribute in args.attribute:
|
||||
if attribute not in ('vid', 'pid', 'prod', 'manufact', 'serial'):
|
||||
usbrip_arg_error(attribute + ': Invalid attribute name')
|
||||
|
||||
|
||||
def _validate_storage_type_args(args):
|
||||
if args.storage_type not in ('history', 'violations'):
|
||||
usbrip_arg_error(args.storage_type + ': Invalid storage type')
|
||||
|
||||
if args.storage_type == 'history':
|
||||
if 'input' in args and args.input:
|
||||
usbrip_arg_error('Cannot use \'--input\' swith with history storage')
|
||||
if 'attribute' in args and args.attribute:
|
||||
usbrip_arg_error('Cannot use \'--attribute\' swith with history storage')
|
||||
elif args.storage_type == 'violations':
|
||||
if 'input' in args and args.input is None:
|
||||
usbrip_arg_error('Please specify input path for the list of authorized devices (-i)')
|
||||
|
||||
|
||||
def _validate_password_args(args):
|
||||
errmsg = ': Password must be at least 8 chars long and contain at least ' \
|
||||
'1 lowercase letter, at least 1 uppercase letter and at least ' \
|
||||
'1 digit'
|
||||
|
||||
if 'password' in args and args.password and not is_correct(args.password):
|
||||
usbrip_arg_error(args.password + errmsg)
|
||||
|
||||
if 'new' in args and args.new and not is_correct(args.new):
|
||||
usbrip_arg_error(args.new + errmsg)
|
||||
|
||||
if 'old' in args and args.old and not is_correct(args.old):
|
||||
usbrip_arg_error(args.old + errmsg)
|
||||
|
||||
|
||||
def _validate_compression_level_args(args):
|
||||
if 'lvl' in args and args.lvl and (len(args.lvl) > 1 or args.lvl not in '0123456789'):
|
||||
usbrip_arg_error(args.lvl + ': Invalid compression level')
|
||||
|
||||
|
||||
def _validate_file_args(args):
|
||||
if 'file' in args and args.file:
|
||||
for file in args.file:
|
||||
if not os.path.exists(file):
|
||||
usbrip_arg_error(file + ': Path does not exist')
|
||||
elif not os.path.isfile(file):
|
||||
usbrip_arg_error(file + ': Not a regular file')
|
||||
|
||||
|
||||
def _validate_vid_pid_args(args):
|
||||
if not args.vid and not args.pid:
|
||||
usbrip_arg_error('At least one of --vid/--pid or --download option should be specified')
|
||||
|
||||
|
||||
# ----------------------------------------------------------
|
||||
# ------------------------- Start --------------------------
|
||||
# ----------------------------------------------------------
|
||||
|
|
Loading…
Reference in New Issue