1
0
mirror of https://github.com/lineageos4microg/docker-lineage-cicd synced 2024-11-09 10:09:56 +01:00

Refactor init.py

- create Init class
- Update behavior to match lineage-20 init.sh (new keys)
This commit is contained in:
Travis Burrows 2022-11-29 21:23:05 -05:00
parent 26d84402ec
commit b97691d225
2 changed files with 125 additions and 95 deletions

@ -27,100 +27,131 @@ def make_key(key_path: str, key_subj: str) -> None:
)
def init() -> None:
# Copy the user scripts
root_scripts = "/root/user_scripts"
user_scripts = getvar("USERSCRIPTS_DIR")
shutil.copytree(user_scripts, root_scripts)
class Init:
def __init__(self):
self.root_scripts = "/root/user_scripts"
self.user_scripts = getvar("USERSCRIPTS_DIR")
self.use_ccache = getvar("USE_CCACHE").lower() in ["1", "true"]
self.sign_builds = getvar("SIGN_BUILDS").lower() == "true"
if self.sign_builds:
self.key_dir = Path(getvar("KEYS_DIR"))
if self.use_ccache:
self.ccache_size = getvar("CCACHE_SIZE")
self.cron_time = getvar("CRONTAB_TIME")
self.git_username = getvar("USER_NAME")
self.git_email = getvar("USER_MAIL")
self.key_subj = getvar("KEYS_SUBJECT")
self.key_names = [
"releasekey",
"platform",
"shared",
"media",
"networkstack",
"sdk_sandbox",
"bluetooth",
]
self.key_exts = [".pk8", ".x509.pem"]
self.key_aliases = ["cyngn-priv-app", "cyngn-app", "testkey"]
# New keys needed as of LOS20
self.new_key_names = [
"sdk_sandbox",
"bluetooth",
]
# Delete non-root files
to_delete = []
for path in Path(root_scripts).rglob("*"):
if path.isdir(path):
continue
# Check if not owned by root
if path.owner != "root":
logging.warning("File not owned by root. Removing %s", path)
to_delete.append(path)
continue
# Check if non-root can write (group or other)
perm = oct(path.stat().st_mode)
group_write = perm[-2] > "4"
other_write = perm[-1] > "4"
if group_write or other_write:
logging.warning("File writable by non root users. Removing %s", path)
to_delete.append(path)
for f in to_delete:
f.unlink()
# Initialize CCache if it will be used
use_ccache = getvar("USE_CCACHE").lower() in ['1', 'true']
if use_ccache:
size = getvar("CCACHE_SIZE")
subprocess.run(["ccache", "-M", size], check=True, stderr=subprocess.STDOUT)
# Initialize Git user information
subprocess.run(
["git", "config", "--global", "user.name", getvar("USER_NAME")], check=True
)
subprocess.run(
["git", "config", "--global", "user.email", getvar("USER_MAIL")], check=True
)
sign_builds = getvar("SIGN_BUILDS").lower() == "true"
if sign_builds:
key_dir = Path(getvar("KEYS_DIR"))
key_names = ["releasekey", "platform", "shared", "media", "networkstack"]
key_exts = [".pk8", ".x509.pem"]
key_aliases = ["cyngn-priv-app", "cyngn-app", "testkey"]
# Generate keys if directory empty
if not list(key_dir.glob("*")):
logging.info("SIGN_BUILDS = true but empty $KEYS_DIR, generating new keys")
key_subj = getvar("KEYS_SUBJECT")
for k in key_names:
logging.info("Generating %s..." % k)
make_key(str(key_dir.joinpath(k)), key_subj)
# Check that all expected key files exist
for k, e in product(key_names, key_exts):
path = key_dir.joinpath(k).with_suffix(e)
if not path.exists():
raise AssertionError('Expected key file "%s" does not exist' % path)
# Create releasekey aliases
for a, e in product(key_aliases, key_exts):
src = key_dir.joinpath("releasekey").with_suffix(e)
dst = key_dir.joinpath(a).with_suffix(e)
dst.symlink_to(src)
cron_time = getvar("CRONTAB_TIME")
if cron_time == "now":
build.build()
else:
scheduler = BlockingScheduler()
scheduler.add_job(
func=build.build,
trigger=CronTrigger.from_crontab(cron_time),
misfire_grace_time=None, # Allow job to run as long as it needs
coalesce=True,
max_instances=1, # Allow only one concurrent instance
logging.basicConfig(
stream=sys.stdout,
level=logging.INFO,
format="[%(asctime)s] %(levelname)s %(message)s",
datefmt="%c %Z",
)
# Run forever
scheduler.start()
def generate_key(self, key_name: str):
logging.info("Generating %s..." % key_name)
make_key(str(self.key_dir.joinpath(key_name)), self.key_subj)
def do(self):
# Copy the user scripts
shutil.copytree(self.user_scripts, self.root_scripts)
# Delete non-root files
to_delete = []
for path in Path(self.root_scripts).rglob("*"):
if path.isdir(path):
continue
# Check if not owned by root
if path.owner != "root":
logging.warning("File not owned by root. Removing %s", path)
to_delete.append(path)
continue
# Check if non-root can write (group or other)
perm = oct(path.stat().st_mode)
group_write = perm[-2] > "4"
other_write = perm[-1] > "4"
if group_write or other_write:
logging.warning("File writable by non root users. Removing %s", path)
to_delete.append(path)
for f in to_delete:
f.unlink()
# Initialize CCache if it will be used
if self.use_ccache:
subprocess.run(
["ccache", "-M", self.ccache_size], check=True, stderr=subprocess.STDOUT
)
# Initialize Git user information
subprocess.run(
["git", "config", "--global", "user.name", self.git_username], check=True
)
subprocess.run(
["git", "config", "--global", "user.email", self.git_email], check=True
)
if self.sign_builds:
# Generate keys if directory empty
if not list(self.key_dir.glob("*")):
logging.info(
"SIGN_BUILDS = true but empty $KEYS_DIR, generating new keys"
)
for k in self.key_names:
self.generate_key(k)
# Check that all expected key files exist. If a LOS20 key does not exist, create it.
for k, e in product(self.key_names, self.key_exts):
path = self.key_dir.joinpath(k).with_suffix(e)
if not path.exists():
if k in self.new_key_names:
self.generate_key(k)
else:
raise AssertionError(
'Expected key file "%s" does not exist' % path
)
# Create releasekey aliases
for a, e in product(self.key_aliases, self.key_exts):
src = self.key_dir.joinpath("releasekey").with_suffix(e)
dst = self.key_dir.joinpath(a).with_suffix(e)
dst.symlink_to(src)
if self.cron_time == "now":
build.build()
else:
scheduler = BlockingScheduler()
scheduler.add_job(
func=build.build,
trigger=CronTrigger.from_crontab(self.cron_time),
misfire_grace_time=None, # Allow job to run as long as it needs
coalesce=True,
max_instances=1, # Allow only one concurrent instance
)
# Run forever
scheduler.start()
if __name__ == "__main__":
logging.basicConfig(
stream=sys.stdout,
level=logging.INFO,
format="[%(asctime)s] %(levelname)s %(message)s",
datefmt="%c %Z",
)
init()
initialize = Init()
initialize.do()

@ -14,11 +14,10 @@ def test_key_gen(monkeypatch):
monkeypatch.setenv("SIGN_BUILDS", "true")
monkeypatch.setattr(build, "build", mock_build)
init.init()
initialize = init.Init()
initialize.do()
# Confirm all keys are generated
key_names = ["releasekey", "platform", "shared", "media", "networkstack"]
key_exts = [".pk8", ".x509.pem"]
for k, e in product(key_names, key_exts):
for k, e in product(initialize.key_names, initialize.key_exts):
path = Path("/srv/keys").joinpath(k).with_suffix(e)
assert path.exists()