mirror of
https://github.com/snovvcrash/usbrip.git
synced 2024-06-08 06:56:02 +02:00
corrected file handling in generate_auth_json and added 2 hints
This commit is contained in:
parent
0f3701607b
commit
67751cb53c
|
@ -25,3 +25,9 @@ venv.bak/
|
|||
|
||||
# Custom files
|
||||
TODO.txt
|
||||
|
||||
#IntelliJ folder
|
||||
.idea/*
|
||||
|
||||
#my auth folder
|
||||
auth.json
|
||||
|
|
|
@ -354,8 +354,9 @@ def _validate_io_args(args):
|
|||
elif not os.path.isfile(args.input):
|
||||
usbrip_arg_error(args.input + ': Not a regular file')
|
||||
|
||||
if hasattr(args, 'output') and os.path.exists(args.output):
|
||||
usbrip_arg_error(args.output + ': Path already exists')
|
||||
# TODO seems to be only relevant for generate_auth call - if so => remove
|
||||
# if hasattr(args, 'output') and os.path.exists(args.output):
|
||||
# usbrip_arg_error(args.output + ': Path already exists')
|
||||
|
||||
|
||||
def _validate_attribute_args(args):
|
||||
|
|
|
@ -43,6 +43,7 @@ import itertools
|
|||
import operator
|
||||
import os
|
||||
import stat
|
||||
import json
|
||||
from datetime import datetime
|
||||
from collections import OrderedDict, defaultdict
|
||||
from string import printable
|
||||
|
@ -207,18 +208,19 @@ class USBEvents:
|
|||
if not self._events_to_show:
|
||||
print_info('No USB devices found!')
|
||||
|
||||
rand_id = f'usbrip-{randint(1000, 9999)}'
|
||||
self._events_to_show += [{
|
||||
'conn': rand_id,
|
||||
'host': rand_id,
|
||||
'vid': rand_id,
|
||||
'pid': rand_id,
|
||||
'prod': rand_id,
|
||||
'manufact': rand_id,
|
||||
'serial': rand_id,
|
||||
'port': rand_id,
|
||||
'disconn': rand_id
|
||||
}]
|
||||
# TODO seems just for debug purposes => Remove
|
||||
# rand_id = f'usbrip-{randint(1000, 9999)}'
|
||||
# self._events_to_show += [{
|
||||
# 'conn': rand_id,
|
||||
# 'host': rand_id,
|
||||
# 'vid': rand_id,
|
||||
# 'pid': rand_id,
|
||||
# 'prod': rand_id,
|
||||
# 'manufact': rand_id,
|
||||
# 'serial': rand_id,
|
||||
# 'port': rand_id,
|
||||
# 'disconn': rand_id
|
||||
# }]
|
||||
|
||||
abs_output_auth = os.path.abspath(output_auth)
|
||||
|
||||
|
@ -231,31 +233,50 @@ class USBEvents:
|
|||
else:
|
||||
print_info(f'Created directory "{dirname}/"')
|
||||
|
||||
# create file if not exists already
|
||||
if not (os.path.isfile(abs_output_auth) and os.access(abs_output_auth, os.R_OK)):
|
||||
try:
|
||||
auth_json = open(abs_output_auth, 'w', encoding='utf-8')
|
||||
auth_json.close()
|
||||
except PermissionError as e:
|
||||
print_critical(f'Permission denied: "{abs_output_auth}". Retry with sudo', initial_error=str(e))
|
||||
return 1
|
||||
|
||||
# read content of file and append
|
||||
try:
|
||||
auth_json = open(abs_output_auth, 'w', encoding='utf-8')
|
||||
with open(abs_output_auth, 'r+') as json_file:
|
||||
auth = defaultdict(set)
|
||||
try:
|
||||
auth = json.load(json_file, encoding='utf-8')
|
||||
except ValueError as vErr:
|
||||
# ignore error and recreate auth list
|
||||
pass
|
||||
|
||||
print_info('Generating authorized device list (JSON)')
|
||||
|
||||
if not attributes:
|
||||
attributes = ('vid', 'pid', 'prod', 'manufact', 'serial')
|
||||
|
||||
for event in tqdm(self._events_to_show, ncols=80, unit='dev'):
|
||||
for key, val in event.items():
|
||||
if key in attributes and val is not None:
|
||||
if not auth[key]:
|
||||
auth[key] = []
|
||||
if not val in auth[key]:
|
||||
auth[key].append(val)
|
||||
|
||||
auth = {key: list(vals) for key, vals in auth.items()}
|
||||
|
||||
for key in auth.keys():
|
||||
auth[key].sort()
|
||||
|
||||
json_file.seek(0)
|
||||
json.dump(auth, json_file, sort_keys=True, indent=indent)
|
||||
json_file.truncate()
|
||||
except PermissionError as e:
|
||||
print_critical(f'Permission denied: "{abs_output_auth}". Retry with sudo', initial_error=str(e))
|
||||
return 1
|
||||
|
||||
print_info('Generating authorized device list (JSON)')
|
||||
|
||||
if not attributes:
|
||||
attributes = ('vid', 'pid', 'prod', 'manufact', 'serial')
|
||||
|
||||
auth = defaultdict(set)
|
||||
for event in tqdm(self._events_to_show, ncols=80, unit='dev'):
|
||||
for key, val in event.items():
|
||||
if key in attributes and val is not None:
|
||||
auth[key].add(val)
|
||||
|
||||
auth = {key: list(vals) for key, vals in auth.items()}
|
||||
|
||||
for key in auth.keys():
|
||||
auth[key].sort()
|
||||
|
||||
json.dump(auth, auth_json, sort_keys=True, indent=indent)
|
||||
auth_json.close()
|
||||
|
||||
os.chmod(abs_output_auth, stat.S_IRUSR | stat.S_IWUSR) # 600
|
||||
|
||||
print_info(f'New authorized device list: "{abs_output_auth}"')
|
||||
|
|
Loading…
Reference in New Issue