1
0
Fork 0
mirror of https://github.com/snovvcrash/usbrip.git synced 2024-06-07 22:46:03 +02:00
This commit is contained in:
TheHappyAkita 2023-08-03 16:42:06 +03:00 committed by GitHub
commit 7370d54db0
Signed by: GitHub
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 62 additions and 34 deletions

6
.gitignore vendored
View File

@ -25,3 +25,9 @@ venv.bak/
# Custom files
TODO.txt
#IntelliJ folder
.idea/*
#my auth folder
auth.json

View File

@ -354,8 +354,9 @@ def _validate_io_args(args):
elif not os.path.isfile(args.input):
usbrip_arg_error(args.input + ': Not a regular file')
if hasattr(args, 'output') and os.path.exists(args.output):
usbrip_arg_error(args.output + ': Path already exists')
# TODO seems to be only relevant for generate_auth call - if so => remove
# if hasattr(args, 'output') and os.path.exists(args.output):
# usbrip_arg_error(args.output + ': Path already exists')
def _validate_attribute_args(args):

View File

@ -43,6 +43,7 @@ import itertools
import operator
import os
import stat
import json
from datetime import datetime
from collections import OrderedDict, defaultdict
from string import printable
@ -207,18 +208,19 @@ class USBEvents:
if not self._events_to_show:
print_info('No USB devices found!')
rand_id = f'usbrip-{randint(1000, 9999)}'
self._events_to_show += [{
'conn': rand_id,
'host': rand_id,
'vid': rand_id,
'pid': rand_id,
'prod': rand_id,
'manufact': rand_id,
'serial': rand_id,
'port': rand_id,
'disconn': rand_id
}]
# TODO seems just for debug purposes => Remove
# rand_id = f'usbrip-{randint(1000, 9999)}'
# self._events_to_show += [{
# 'conn': rand_id,
# 'host': rand_id,
# 'vid': rand_id,
# 'pid': rand_id,
# 'prod': rand_id,
# 'manufact': rand_id,
# 'serial': rand_id,
# 'port': rand_id,
# 'disconn': rand_id
# }]
abs_output_auth = os.path.abspath(output_auth)
@ -231,31 +233,50 @@ class USBEvents:
else:
print_info(f'Created directory "{dirname}/"')
# create file if not exists already
if not (os.path.isfile(abs_output_auth) and os.access(abs_output_auth, os.R_OK)):
try:
auth_json = open(abs_output_auth, 'w', encoding='utf-8')
auth_json.close()
except PermissionError as e:
print_critical(f'Permission denied: "{abs_output_auth}". Retry with sudo', initial_error=str(e))
return 1
# read content of file and append
try:
auth_json = open(abs_output_auth, 'w', encoding='utf-8')
with open(abs_output_auth, 'r+') as json_file:
auth = defaultdict(set)
try:
auth = json.load(json_file, encoding='utf-8')
except ValueError as vErr:
# ignore error and recreate auth list
pass
print_info('Generating authorized device list (JSON)')
if not attributes:
attributes = ('vid', 'pid', 'prod', 'manufact', 'serial')
for event in tqdm(self._events_to_show, ncols=80, unit='dev'):
for key, val in event.items():
if key in attributes and val is not None:
if not auth[key]:
auth[key] = []
if not val in auth[key]:
auth[key].append(val)
auth = {key: list(vals) for key, vals in auth.items()}
for key in auth.keys():
auth[key].sort()
json_file.seek(0)
json.dump(auth, json_file, sort_keys=True, indent=indent)
json_file.truncate()
except PermissionError as e:
print_critical(f'Permission denied: "{abs_output_auth}". Retry with sudo', initial_error=str(e))
return 1
print_info('Generating authorized device list (JSON)')
if not attributes:
attributes = ('vid', 'pid', 'prod', 'manufact', 'serial')
auth = defaultdict(set)
for event in tqdm(self._events_to_show, ncols=80, unit='dev'):
for key, val in event.items():
if key in attributes and val is not None:
auth[key].add(val)
auth = {key: list(vals) for key, vals in auth.items()}
for key in auth.keys():
auth[key].sort()
json.dump(auth, auth_json, sort_keys=True, indent=indent)
auth_json.close()
os.chmod(abs_output_auth, stat.S_IRUSR | stat.S_IWUSR) # 600
print_info(f'New authorized device list: "{abs_output_auth}"')