1
0
Fork 0
mirror of https://github.com/snovvcrash/usbrip.git synced 2024-05-21 14:36:03 +02:00
usbrip/usbrip/__main__.py
2021-01-06 02:05:46 +03:00

408 lines
12 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Usage: python3 -m usbrip [-h]
"""LICENSE
Copyright (C) 2020 Sam Freeside
This file is part of usbrip.
usbrip is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
usbrip is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with usbrip. If not, see <http://www.gnu.org/licenses/>.
"""
__author__ = 'Sam Freeside (@snovvcrash)'
__email__ = 'snovvcrash@protonmail[.]ch'
__date__ = '2018-03-21'
__site__ = 'https://github.com/snovvcrash/usbrip'
__brief__ = 'usbrip project\'s driver program'
import os
import sys
import usbrip.lib.core.config as cfg; cfg.DEBUG = '--debug' in sys.argv
import usbrip.lib.utils.timing as timing
from usbrip.lib.core.usbevents import USBEvents
from usbrip.lib.core.usbstorage import USBStorage
from usbrip.lib.core.usbids import USBIDs
from usbrip.lib.core.common import BANNER
from usbrip.lib.core.common import COLUMN_NAMES
from usbrip.lib.core.common import print_critical
from usbrip.lib.core.common import USBRipError
from usbrip.lib.parse.argparser import get_arg_parser
from usbrip.lib.parse.configparser import get_config_parser
# ----------------------------------------------------------
# -------------------------- Main --------------------------
# ----------------------------------------------------------
def main():
if not len(sys.argv) > 1:
print(BANNER + '\n')
usbrip_arg_error()
arg_parser = get_arg_parser()
args = arg_parser.parse_args()
if hasattr(args, 'quiet') and not args.quiet:
if cfg.ISATTY:
print(BANNER + '\n')
else:
print(f'# Done as: usbrip {" ".join(sys.argv[1:])}')
else:
cfg.QUIET = True
# ----------------------------------------------------------
# ------------------------- Banner -------------------------
# ----------------------------------------------------------
if args.subparser == 'banner':
print(BANNER)
# ----------------------------------------------------------
# ----------------------- USB Events -----------------------
# ----------------------------------------------------------
elif args.subparser == 'events' and args.ue_subparser:
if (args.ue_subparser == 'genauth' or args.ue_subparser == 'violations') and os.geteuid() != 0:
sys.exit('Permission denied. Retry with sudo')
sieve, repres = validate_ue_args(args)
# ------------------- USB Events History -------------------
if args.ue_subparser == 'history':
timing.begin()
ue = USBEvents(args.file)
if ue:
ue.event_history(
args.column,
sieve=sieve,
repres=repres
)
# -------------------- USB Events Open ---------------------
elif args.ue_subparser == 'open':
timing.begin()
USBEvents.open_dump(
args.input,
args.column,
sieve=sieve,
repres=repres
)
# ------------------ USB Events GenAuth -------------------
elif args.ue_subparser == 'genauth':
timing.begin()
ue = USBEvents(args.file)
if ue:
if ue.generate_auth_json(
args.output,
args.attribute,
sieve=sieve
):
usbrip_internal_error()
else:
usbrip_internal_error()
# ----------------- USB Events Violations ------------------
elif args.ue_subparser == 'violations':
timing.begin()
ue = USBEvents(args.file)
if ue:
ue.search_violations(
args.input,
args.attribute,
args.column,
sieve=sieve,
repres=repres
)
# ----------------------------------------------------------
# ---------------------- USB Storage -----------------------
# ----------------------------------------------------------
elif args.subparser == 'storage' and args.us_subparser:
if os.geteuid() != 0:
sys.exit('Permission denied. Retry with sudo')
if any (not os.path.exists(p) for p in ('/opt/usbrip/', '/var/opt/usbrip', '/usr/local/bin/usbrip')):
sys.exit('The "storage" module can only be used when usbrip is installed via "install.sh" - https://git.io/JJfJq')
sieve, repres = validate_us_args(args)
timing.begin()
config_parser = get_config_parser()
us = USBStorage()
# -------------------- USB Storage List --------------------
if args.us_subparser == 'list':
us.list_storage(
args.storage_type,
config_parser[args.storage_type]['password']
)
# -------------------- USB Storage Open --------------------
elif args.us_subparser == 'open':
us.open_storage(
args.storage_type,
config_parser[args.storage_type]['password'],
args.column,
sieve=sieve,
repres=repres
)
# ------------------- USB Storage Update -------------------
elif args.us_subparser == 'update':
if us.update_storage(
args.storage_type,
config_parser[args.storage_type]['password'],
input_auth=args.input,
attributes=args.attribute,
compression_level=args.lvl,
sieve=sieve
):
usbrip_internal_error()
# ------------------- USB Storage Create -------------------
elif args.us_subparser == 'create':
if us.create_storage(
args.storage_type,
config_parser[args.storage_type]['password'],
input_auth=args.input,
attributes=args.attribute,
compression_level=args.lvl,
sieve=sieve
):
usbrip_internal_error()
# ------------------- USB Storage Passwd -------------------
elif args.us_subparser == 'passwd':
us.change_password(
args.storage_type,
compression_level=args.lvl
)
# ----------------------------------------------------------
# ------------------------ USB IDs -------------------------
# ----------------------------------------------------------
elif args.subparser == 'ids' and args.ui_subparser:
validate_ui_args(args)
timing.begin()
ui = USBIDs()
# --------------------- USB IDs Search ---------------------
if args.ui_subparser == 'search':
ui.search_ids(
args.vid,
args.pid,
offline=args.offline
)
# -------------------- USB IDs Download --------------------
elif args.ui_subparser == 'download':
try:
usb_ids = ui.prepare_database(offline=False)
except USBRipError as e:
print_critical(str(e), errcode=e.errors['errcode'], initial_error=e.errors['initial_error'])
else:
usb_ids.close()
else:
subparser = ' ' + args.subparser + ' '
usbrip_arg_error(f'Choose one of the usbrip {args.subparser} actions', subparser=subparser)
# ----------------------------------------------------------
# ------------------ Argument validation -------------------
# ----------------------------------------------------------
# ----------------------- USB Events -----------------------
def validate_ue_args(args):
_validate_column_args(args)
_validate_attribute_args(args)
_validate_io_args(args)
_validate_file_args(args)
return (_validate_sieve_args(args), _validate_repres_args(args))
# ---------------------- USB Storage -----------------------
def validate_us_args(args):
_validate_storage_type_args(args)
_validate_compression_level_args(args)
_validate_io_args(args)
_validate_attribute_args(args)
return (_validate_sieve_args(args), _validate_repres_args(args))
# ------------------------ USB IDs -------------------------
def validate_ui_args(args):
_validate_vid_pid_args(args)
# ----------------------------------------------------------
# ---------------- Error Message Generators ----------------
# ----------------------------------------------------------
def usbrip_arg_error(message=None, *, subparser=' '):
if message:
print(f'Usage: {sys.argv[0]}{subparser}[-h]\n')
print(sys.argv[0].rsplit('/', 1)[-1] + ': argument error: ' + message, file=sys.stderr)
else:
print(f'Usage: {sys.argv[0]} [-h]')
sys.exit(1)
def usbrip_internal_error():
print(sys.argv[0].rsplit('/', 1)[-1] + ': Internal error occurred', file=sys.stderr)
sys.exit(1)
# ----------------------------------------------------------
# ----------------------- Utilities ------------------------
# ----------------------------------------------------------
def _validate_column_args(args):
if hasattr(args, 'column') and args.column:
for column in args.column:
if column not in COLUMN_NAMES.keys():
usbrip_arg_error(column + ': Invalid column name')
def _validate_sieve_args(args):
if 'external' in args:
sieve = dict(
zip(('external', 'number', 'dates', 'fields'),
(args.external, args.number, args.date, {}))
)
if args.host:
sieve['fields']['host'] = args.host
if args.vid:
sieve['fields']['vid'] = args.vid
if args.pid:
sieve['fields']['pid'] = args.pid
if args.prod:
sieve['fields']['prod'] = args.prod
if args.manufact:
sieve['fields']['manufact'] = args.manufact
if args.serial:
sieve['fields']['serial'] = args.serial
if args.port:
sieve['fields']['port'] = args.port
return sieve
return None
def _validate_repres_args(args):
if hasattr(args, 'table') or hasattr(args, 'list'):
repres = dict.fromkeys(('table', 'list', 'smart'), False)
if hasattr(args, 'table') and args.table:
repres['table'] = True
elif hasattr(args, 'list') and args.list:
repres['list'] = True
else:
repres['smart'] = True
return repres
return None
def _validate_io_args(args):
if hasattr(args, 'input') and args.input and args.input != '/var/opt/usbrip/trusted/auth.json':
if not os.path.exists(args.input):
usbrip_arg_error(args.input + ': Path does not exist')
elif not os.path.isfile(args.input):
usbrip_arg_error(args.input + ': Not a regular file')
if hasattr(args, 'output') and os.path.exists(args.output):
usbrip_arg_error(args.output + ': Path already exists')
def _validate_attribute_args(args):
if hasattr(args, 'attribute') and args.attribute:
for attribute in args.attribute:
if attribute not in ('vid', 'pid', 'prod', 'manufact', 'serial'):
usbrip_arg_error(attribute + ': Invalid attribute name')
def _validate_storage_type_args(args):
if args.storage_type not in ('history', 'violations'):
usbrip_arg_error(args.storage_type + ': Invalid storage type')
if args.storage_type == 'history':
if hasattr(args, 'input') and args.input and args.input != '/var/opt/usbrip/trusted/auth.json':
usbrip_arg_error('Cannot use "--input" switch with history storage')
if hasattr(args, 'attribute') and args.attribute:
usbrip_arg_error('Cannot use "--attribute" switch with history storage')
elif args.storage_type == 'violations':
if hasattr(args, 'input') and args.input is None:
usbrip_arg_error('Please specify input path for the list of authorized devices (-i)')
def _validate_compression_level_args(args):
if hasattr(args, 'lvl') and args.lvl and (len(args.lvl) > 1 or args.lvl not in '0123456789'):
usbrip_arg_error(args.lvl + ': Invalid compression level')
def _validate_file_args(args):
if hasattr(args, 'file') and args.file:
for file in args.file:
if not os.path.exists(file):
usbrip_arg_error(file + ': Path does not exist')
elif not os.path.isfile(file):
usbrip_arg_error(file + ': Not a regular file')
def _validate_vid_pid_args(args):
if hasattr(args, 'vid') and hasattr(args, 'pid') and not args.vid and not args.pid:
usbrip_arg_error('At least one of --vid/--pid or --download option should be specified')
# ----------------------------------------------------------
# -------------------------- RUN ---------------------------
# ----------------------------------------------------------
if __name__ == '__main__':
main()