{ config, lib, pkgs, homeage, ... }: let hostName = "surtur"; swayTgt = "sway-session.target"; in { home.username = "$USER"; home.sessionVariables.HOSTNAME = "${hostName}"; home.homeDirectory = "/home/$USER"; home.stateVersion = "23.11"; # build configuration and switch: # ➜ home-manager switch --no-out-link -b backup --flake~/utils/dotfiles#$HOST # alternatively, install HM with: # nix profile install --priority 0 home-manager # hit the issue described here, waiting until resolved: # https://github.com/nix-community/home-manager/issues/2848 programs.home-manager.enable = true; home.packages = with pkgs; [ direnv # lorri and arion are apparently provided by cachix#devenv alejandra statix niv rnix-lsp exa ripgrep starship sheldon duf dua du-dust himalaya b3sum cargo-watch zellij cloak headscale btop sops neovim nautilus-open-any-terminal dhall ccache zathura autotiling bemenu swayr kanshi waybar albert ]; homeage = { # Absolute path to identity (created not through home-manager) identityPaths = [ "~/.ssh/theEd" ]; # "activation" if system doesn't support systemd installationType = "systemd"; file."sops-age-keys.txt" = { # Path to encrypted file tracked by the git repository source = ./secrets/sops-keys.age; # can be "copies" or "symlink" symlinks = [".config/sops/age/keys.txt"]; }; file."envs" = { source = ./secrets/envs.age; }; # infra secrets. file."infra-backend" = { source = ./secrets/infra-backend.age; }; file."infra-vars" = { source = ./secrets/infra-vars.age; }; file."pcmt_gitea_token" = { source = ./secrets/pcmt_gitea_token.age; }; }; imports = [ ./nix/programs.nix ]; services = { kdeconnect = { enable = true; indicator = true; }; }; systemd.user.services = { kanshi = { Unit = { Description = "Dynamic output configuration for Wayland compositors"; # Documentation = "man:kanshi(1)"; Documentation = "https://sr.ht/~emersion/kanshi"; BindsTo = config.services.kanshi.systemdTarget; }; Service = { Type = "simple"; # ExecStart = "/usr/sbin/kanshi"; ExecStart = "${config.services.kanshi.package}/bin/kanshi"; Restart = "always"; RestartSec = "5s"; LockPersonality = true; PrivateTmp = "yes"; DevicePolicy = "closed"; }; Install = {WantedBy = [config.services.kanshi.systemdTarget];}; }; waybar = { Unit = { Description = "Highly customizable Wayland bar for Sway and Wlroots based compositors."; Documentation = "https://github.com/Alexays/Waybar/wiki/"; PartOf = swayTgt; After = swayTgt; }; Service = { ExecStart = "${pkgs.waybar}/bin/waybar"; # ExecReload = "kill -SIGUSR2 $MAINPID"; ExecReload = "kill -SIGUSR2 ''$MAINPID"; Restart = "on-failure"; RestartSec = "3s"; LockPersonality = true; PrivateTmp = "yes"; DevicePolicy = "closed"; }; Install = {WantedBy = [swayTgt];}; }; autotiling = { Unit = { Description = "Script for sway and i3 to automatically switch the horizontal / vertical window split orientation"; Documentation = "https://github.com/nwg-piotr/autotiling"; BindsTo = swayTgt; }; Service = { Type = "simple"; ExecStart = "${pkgs.autotiling}/bin/autotiling"; Restart = "always"; RestartSec = "5s"; LockPersonality = true; PrivateTmp = "yes"; DevicePolicy = "closed"; }; Install = {WantedBy = [swayTgt];}; }; albert = { Unit = { Description = "A C++/Qt based desktop agnostic keyboard launcher that helps you to accomplish your workflows in a breeze"; Documentation = "https://albertlauncher.github.io/"; BindsTo = swayTgt; }; Service = { Type = "simple"; # after hm stateVersion bump to 23.05, albert doesn't seem to support explicit wayland. Environment = "QT_QPA_PLATFORM=xcb"; ExecStart = "${pkgs.albert}/bin/albert"; Restart = "always"; RestartSec = "3s"; LockPersonality = true; PrivateTmp = "yes"; DevicePolicy = "closed"; }; Install = {WantedBy = [swayTgt];}; }; gopls = { Unit = { Description = "Go language server"; }; Service = { Type = "simple"; ExecStartPre = "bash -c 'rm -v -f /tmp/.gopls-daemon.sock || true'"; ExecStart = "%h/go/bin/gopls -listen='unix;/tmp/.gopls-daemon.sock'"; ExecStopPost = "bash -c 'rm -v -f /tmp/.gopls-daemon.sock || true'"; Restart = "on-failure"; RestartSec = "1m"; TimeoutStopFailureMode = "abort"; SystemCallFilter = "~@reboot @obsolete"; ProtectProc = "invisible"; ProcSubset = "pid"; ProtectHome = true; RestrictNamespaces = true; NoNewPrivileges = "yes"; ProtectSystem = "strict"; DevicePolicy = "closed"; LockPersonality = true; MemoryDenyWriteExecute = true; #RestrictAddressFamilies="AF_UNIX AF_INET AF_INET6"; RestrictAddressFamilies = "AF_UNIX"; RestrictRealtime = true; RestrictSUIDSGID = true; SystemCallArchitectures = "native"; }; # Install = {WantedBy = [swayTgt];}; Install = {WantedBy = ["default.target"];}; }; ff_nn = { Unit = { Description = "sh*tbrowser"; PartOf = swayTgt; After = swayTgt; }; Service = { # Type = "simple"; Environment = [ "MOZ_ENABLE_WAYLAND=1" "MOZ_DBUS_REMOTE=1" "MOZ_USE_XINPUT2=1" "QT_QPA_PLATFORM=wayland" "XDG_SESSION_TYPE=wayland" "SDL_VIDEODRIVER=wayland" "NO_AT_BRIDGE=1" ]; ExecStart = "-%h/Downloads/firefox-nightly/firefox-bin -desktop"; Restart = "on-failure"; RestartSec = "5s"; StartLimitBurst = 3; StartLimitInterval = "60s"; TimeoutStopFailureMode = "abort"; # RestrictNamespaces=true; DevicePolicy = "closed"; ProtectHome = true; ProtectSystem = "strict"; ReadWritePaths = [ "-%h/Downloads/firefox-nightly" "-%h/Downloads" ]; NoNewPrivileges = true; ProtectProc = "invisible"; # ProcSubset = "pid"; PrivateTmp = "yes"; LockPersonality = true; SystemCallFilter = "~@reboot @obsolete"; SystemCallArchitectures = "native"; }; Install = {WantedBy = [swayTgt];}; }; battery = { Unit = { Description = "Power Profiles service"; PartOf = swayTgt; }; Service = { Type = "simple"; ExecStart = "%h/.local/bin/battery.sh"; Restart = "on-failure"; RestartSec = "15s"; TimeoutStopFailureMode = "abort"; LockPersonality = true; PrivateTmp = "yes"; DevicePolicy = "closed"; }; }; nextcloud = { Unit = { Description = "Podman container Nextcloud"; PartOf = swayTgt; Wants = "network-online.target"; After = "network-online.target"; RequiresMountsFor = "/run/user/1000/containers"; }; Service = { CPUQuota = "2%"; Slice = "nextcloud.slice"; Environment = "PODMAN_SYSTEMD_UNIT=%n"; Restart = "on-failure"; RestartSec = 5; TimeoutStartSec = 600; # TimeoutStopSec=10; ExecStartPre = "/usr/bin/podman-compose -f %h/.nextcloud/docker-compose.yml -p nextcloud down"; ExecStart = "/usr/bin/podman-compose -f %h/.nextcloud/docker-compose.yml -p nextcloud up --remove-orphans"; ExecStop = "/usr/bin/podman-compose -f %h/.nextcloud/docker-compose.yml -p nextcloud down"; Type = "simple"; Delegate = "no"; ProtectSystem = "strict"; ProtectProc = "invisible"; ProcSubset = "pid"; DevicePolicy = "closed"; NoNewPrivileges = true; LockPersonality = true; InaccessiblePaths = [ "-/lost+found" "/dev/shm" "-%h/.ssh" ]; KeyringMode = "private"; SystemCallFilter = "~memfd_create @reboot"; TimeoutStopFailureMode = "abort"; }; Install = { WantedBy = ["default.target"]; }; }; himalaya-watch = { Unit = { Description = "Himalaya watcher"; After = "network.target"; }; Service = { ExecStart = "${pkgs.himalaya}/bin/himalaya watch -a a_mirre"; Restart = "always"; RestartSec = 10; Environment = "RUST_LOG=debug"; ReadWritePaths = "/tmp/himalaya-counter-am"; ProtectHome = "true"; ProtectSystem = "true"; DevicePolicy = "closed"; NoNewPrivileges = true; LockPersonality = true; InaccessiblePaths = [ "-/lost+found" "/dev/shm" "-%h/.ssh" ]; # KeyringMode = "private"; }; Install = { WantedBy = ["default.target"]; }; }; himalaya-notify = { Unit = { Description = "Himalaya new message notifier"; After = "network.target"; }; Service = { ExecStart = "${pkgs.himalaya}/bin/himalaya notify"; Restart = "always"; RestartSec = 10; Environment = "RUST_LOG=debug"; ProtectHome = "true"; ProtectSystem = "true"; DevicePolicy = "closed"; NoNewPrivileges = true; LockPersonality = true; InaccessiblePaths = [ "-/lost+found" "/dev/shm" "-%h/.ssh" ]; # KeyringMode = "private"; }; Install = { WantedBy = ["default.target"]; }; }; trackerMask = { Unit = {Description = "";}; Service = { Type = "oneshot"; ExecStart = "bash -c 'systemctl --user mask tracker-extract-3.service tracker-miner-fs-3.service tracker-miner-rss-3.service tracker-writeback-3.service tracker-xdg-portal-3.service tracker-miner-fs-control-3.service'"; DevicePolicy = "closed"; NoNewPrivileges = true; LockPersonality = true; InaccessiblePaths = [ "-/lost+found" "/dev/shm" "-%h/.ssh" ]; KeyringMode = "private"; }; }; appr120Mask = { Unit = {Description = "";}; Service = { Type = "oneshot"; ExecStart = "bash -c 'systemctl --user mask app-r120@autostart'"; DevicePolicy = "closed"; NoNewPrivileges = true; LockPersonality = true; InaccessiblePaths = [ "-/lost+found" "/dev/shm" "-%h/.ssh" ]; KeyringMode = "private"; }; }; }; systemd.user.slices = { chromium = { Unit = { Description = "Slice that limits chromium's resources"; Before = "slices.target"; }; Slice = { CPUAccounting = "yes"; CPUQuota = "220%"; MemoryAccounting = "yes"; MemoryHigh = "6G"; MemoryMax = "6.1G"; }; }; nextcloud = { Unit = {Description = "Slice that limits nextcloud's resources";}; Slice = { MemoryAccounting = "yes"; # MemoryHigh works only in "unified" cgroups mode, NOT in "hybrid" mode MemoryHigh = "250M"; # MemoryMax works in "hybrid" cgroups mode, too MemoryMax = "300M"; CPUAccounting = "yes"; # CPUQuota=15%; CPUQuota = "3%"; }; }; podman = { # refs: # https://baykara.medium.com/docker-resource-management-via-cgroups-and-systemd-633b093a835c # https://docs.docker.com/engine/reference/commandline/dockerd/#docker-runtime-execution-op> Unit = { Description = "Slice that limits podman resources"; Before = "slices.target"; }; Slice = { MemoryAccounting = "yes"; MemoryHigh = "10G"; MemoryMax = "12G"; MemorySwapMax = "1G"; # 100% is an equivalent of full utilization on a single core # we allow for 85% here - applies to all docker.service-spawn # processes cumulatively CPUAccounting = "yes"; # CPUQuota=85%; CPUQuota = "50%"; }; }; }; systemd.user.timers = { battery = { Unit = { Description = "Power Profiles timer"; }; Timer = { OnActiveSec = "20s"; OnUnitActiveSec = "5m"; Unit = "battery.service"; }; Install = { WantedBy = ["timers.target"]; }; }; }; systemd.user.targets = { sway-session = { Unit = { Description = "Sway compositor session"; Documentation = "man:systemd.special(7)"; BindsTo = "graphical-session.target"; Wants = "graphical-session-pre.target"; After = "graphical-session-pre.target"; Before = "xdg-desktop-autostart.target"; }; }; }; home.file = { ".config/kitty/kitty.conf" = { source = .config/kitty/kitty.conf; }; ".vimrc" = { source = ./.vim/vimrc; }; ".vim/deoplete.vimrc.vim" = { source = ./.vim/deoplete.vimrc.vim; }; ".vim/gotags.vimrc.vim" = { source = ./.vim/gotags.vimrc.vim; }; ".vim/python.vimrc.vim" = { source = ./.vim/python.vimrc.vim; }; ".config/nvim/init.vim" = { source = ./.config/nvim/init.vim; }; ".config/systemd/user.conf" = { text = '' [Manager] DefaultTimeoutStarSec=15s DefaultTimeoutStopSec=15s ''; }; ".config/sway/config" = { source = ./.config/sway/config; }; ".config/sway/env" = { source = ./.config/sway/env; }; ".config/sway/inputs" = { source = ./.config/sway/inputs; }; ".config/sway/config.d/dracula.sway" = { source = ./.config/sway/config.d/dracula.sway; }; ".config/waybar/config" = { source = ./.config/waybar/config; }; ".config/waybar/style.css" = { source = ./.config/waybar/style.css; }; ".config/waybar/modules/storage.sh" = { source = ./.config/waybar/modules/storage.sh; }; ".config/kanshi/config" = { source = ./.config/kanshi/config; }; ".config/mako/config" = { source = ./.config/mako/config; }; ".local/bin/swws.py" = { source = ./bin/swws.py; }; ".local/bin/sway-locker" = { source = ./bin/sway-locker; }; ".config/albert.conf" = { source = ./.config/albert/albert.conf; }; ".config/direnv/direnv.toml" = { source = ./.config/direnv/direnv.toml; }; ".config/fusuma/config-wl.yml" = { source = ./.config/fusuma/config-wl.yml; }; ".config/fusuma/config.yml" = { source = ./.config/fusuma/config.yml; }; ".config/git/config-common" = { source = ./.config/git/config; }; # host-specific gitconfig. ".config/git/config.${hostName}" = { text = '' [user] name = ${hostName} email = wanderer@dotya.ml signingkey = ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIBtG6NCgdLHX4ztpfvYNRaslKWZcl6KdTc1DehVH4kAL ''; }; ".config/git/config" = { text = '' [include] path = ~/.config/git/config-common [include] path = ~/.config/git/config.${hostName} ''; }; ".config/git/allowed_signers" = { source = ./.config/git/allowed_signers; }; # begin zsh-related. ".zshrc" = { source = ./.zshrc; }; ".zshenv" = { source = ./.zshenv; }; ".zprofile" = { source = ./.zprofile; }; ".zsh" = { source = ./.zsh; recursive = true; }; ".zsh/bemenu-dracula" = { source = pkgs.fetchFromGitHub { owner = "dracula"; repo = "bemenu"; rev = "9b1165b3d97e3b2a74c6ce220781b78d8a11febf"; sha256 = "sha256-TwfkEZ1aTkHur+jCqRsaqvzOw6qpH0L4pvYqkx7iCDk="; }; }; # end zsh-related. ".cargo/config.toml" = { source = .cargo/config.toml; }; ".npmrc" = { text = '' prefix=''${HOME}/.npm-packages audit=false fund=false ''; }; ".config/swaylock/config" = { source = .config/swaylock/config; }; ".config/tridactyl/tridactylrc" = { source = .config/tridactyl/tridactylrc; }; ".config/bat/config" = { source = .config/bat/config; }; ".ncpamixer.conf" = { source = .config/ncpamixer.conf; }; ".gdbinit" = { text = '' set auto-load safe-path /nix/store set history save on set history size 10000 set history remove-duplicates 100 set history filename ~/.gdb_history ''; }; ".searchsploit_rc" = { text = '' ##-- Program Settings progname="$( basename "$0" )" ##-- Exploits files_array+=("files_exploits.csv") path_array+=("''${HOME}/utils/exploit-database") name_array+=("Exploit") git_array+=("https://gitlab.com/exploit-database/exploitdb.git") package_array+=("exploitdb") ##-- Shellcodes files_array+=("files_shellcodes.csv") path_array+=("''${HOME}/utils/exploit-database") name_array+=("Shellcode") git_array+=("https://gitlab.com/exploit-database/exploitdb.git") package_array+=("exploitdb") # ##-- Papers # files_array+=("files_papers.csv") # path_array+=("''${HOME}/utils/exploitdb-papers") # name_array+=("Paper") # git_array+=("https://gitlab.com/exploit-database/exploitdb-papers.git") # package_array+=("exploitdb-papers") ''; }; ".local/bin/fuzzypassage" = { executable = true; text = '' PREFIX="''${PASSAGE_DIR:-$HOME/.passage/store}" FZF_DEFAULT_OPTS="" name="$(find "$PREFIX" -type f -name '*.age' | \ sed -e "s|$PREFIX/||" -e 's|\.age$||' | \ fzf --height 40% --reverse --no-multi)" passage "''${@}" "$name" ''; }; ".local/bin/battery.sh" = { source = bin/battery.sh; executable = true; }; ".local/bin/localbsync" = { source = bin/localbsync; executable = true; }; ".local/bin/parec-wr" = { source = bin/parec-wr; executable = true; }; ".local/bin/pscbg" = { source = bin/pscbg; executable = true; }; ".local/bin/qst_up" = { source = bin/qst_up; executable = true; }; ".local/bin/winprint.sh" = { source = bin/winprint.sh; executable = true; }; ".local/bin/authenticator.sh" = { text = '' #!/bin/sh # adopted from https://wiki.archlinux.org/index.php/Google_Authenticator # This is the path to the Google Authenticator app file. It's typically located # in /data under Android. Copy it to your PC in a safe location and specify the # path to it here. #DB="/path/to/com.google.android.apps.authenticator/databases/databases" DB="$1" if [ $# -ne 1 ]; then printf "authenticator\n" printf "usage: authenticator \n" printf "\tThis is the path to the Authenticator app owned SQLite db file.\n" printf "\tCopy it to your PC to a safe location and specify the path to it here.\n" exit 1 fi # On most Android systems with sufficient user access, the Google Authenticator # database can be copied off the device and accessed directly, as it is an # sqlite3 database. This shell script will read a Google Authenticator database # and generate live codes for each key found: sqlite3 "$DB" 'SELECT email,secret FROM accounts;' | while read A do NAME=`echo "$A" | cut -d '|' -f 1` KEY=`echo "$A" | cut -d '|' -f 2` CODE=`oathtool --totp -b "$KEY"` echo -e "\e[1;32m$CODE\e[0m - \e[1;33m$NAME\e[0m" done ''; executable = true; }; ".local/bin/createarchive.sh" = { text = '' #!/bin/bash if [ $# -ne 1 ]; then printf "createarchive\n" printf "usage: createarchive \n" printf "warning: the archive will be moved to "backups" directory (`echo $dest`)\n" exit 1 fi # what this does in short: tar, compress, timestamp, shred the tar, mv .xz to pwd and display it logdate="$(date +%Y%m%dT%H%M%S)" basedir="$1" tmpdir=$(mktemp -d "${TMPDIR:-/tmp/}$(basename $0).XXXXXXXXXX") #/run/user/$(id -u) tmpfs 0700 perms f="`cd $basedir; pwd | tr '/' ' ' | sed 's/^.* / /' | cut -c2-`" > /dev/null g="$logdate-$f.tar" dest=~/MEGA/Private/backups doathing() { cd $basedir/.. tar cfv "$tmpdir/$g" "$f" && \ xz -vzk9e "$tmpdir/$g" -S .xz && \ rsync -avP "$tmpdir/$g.xz" "$dest" && \ shred -zuv "$tmpdir/$g" "$tmpdir/$g.xz" && \ printf "\n" ls -latr "$dest/$g.xz" } if [ ! -d $1 ]; then echo "$1 is not a directory" exit 1 else echo `pwd` echo "$f" echo "$1" doathing trap "rm -rfv $tmpdir" 0 1 3 15 exit $? fi ''; executable = true; }; # qutebrowser userscripts start. ".config/qutebrowser/userscripts/localhost" = { executable = true; text = '' #!/usr/bin/env bash export BEMENU_OPTS="--tb '#6272a4' --tf '#f8f8f2' --fb '#282a36' --ff '#f8f8f2' --nb '#282a36' --nf '#6272a4' --hb '#44475a' --hf '#50fa7b' --sb '#44475a' --sf '#50fa7b' --scb '#282a36' --scf '#ff79c6' -p 'localhost: ▶' --fork -l 5 --fn 'FiraCode Retina 17'" if [[ $1 -eq 'list' ]] && [[ -z $QUTE_COUNT ]]; then PORTS="$(ss -nltp | tail -n +2 | awk '{print $4}' | awk -F: '{print $2}')" QUTE_COUNT=$(echo "$PORTS" | bemenu -n ) fi # echo open -t localhost:''${QUTE_COUNT:-8080} > $QUTE_FIFO [ -n "$QUTE_COUNT" ] && echo open -t localhost:"''${QUTE_COUNT}" > $QUTE_FIFO ''; }; ".config/qutebrowser/userscripts/speak" = { executable = true; text = '' #!/bin/bash export IFS=$'\n' pkill -f qute_speak || { ~/.local/bin/gtts-cli "$QUTE_SELECTED_TEXT" | mpv --no-video --speed=1.26 - # ~/.local/bin/gtts-cli "$QUTE_SELECTED_TEXT" > /tmp/qute_speak.mp3 # mpv /tmp/qute_speak.mp3 } ''; }; ".config/qutebrowser/userscripts/dark_mode.user" = { executable = true; text = '' #!/bin/zsh # on a new system cp DR.js DarkReader.user.js darkreader_file="$HOME/.config/qutebrowser/greasemonkey/DarkReader.user.js" enabled="^//DarkReader.disable();" darkreader_enabled="$(grep -q -e "$enabled" $darkreader_file; echo $?)" # echo $darkreader_enabled if [[ "$(echo $darkreader_enabled)" == "1" ]]; then # enable DarkReader by commenting out the line that disables it. sed -i --follow-symlink 's/DarkReader.disable()/\/\/DarkReader.disable()/' "$darkreader_file" else # disable DarkReader sed -i --follow-symlink 's/\/\/DarkReader.disable()/DarkReader.disable()/' "$darkreader_file" fi ''; }; ".config/qutebrowser/userscripts/code_select.py" = { executable = true; # source = .local/share/qutebrowser/userscripts/code_select.py; text = '' #!/usr/bin/env python3 import os import html import re import sys import xml.etree.ElementTree as ET try: import pyperclip except ImportError: PYPERCLIP = False else: PYPERCLIP = True def parse_text_content(element): root = ET.fromstring(element) text = ET.tostring(root, encoding="unicode", method="text") text = html.unescape(text) return text def send_command_to_qute(command): with open(os.environ.get("QUTE_FIFO"), "w") as f: f.write(command) def main(): delimiter = sys.argv[1] if len(sys.argv) > 1 else ";" # For info on qute environment vairables, see # https://github.com/qutebrowser/qutebrowser/blob/master/doc/userscripts.asciidoc element = os.environ.get("QUTE_SELECTED_HTML") code_text = parse_text_content(element) if PYPERCLIP: pyperclip.copy(code_text) send_command_to_qute( "message-info 'copied to clipboard: {info}{suffix}'".format( info=code_text.splitlines()[0], suffix="..." if len(code_text.splitlines()) > 1 else "" ) ) else: # Qute's yank command won't copy accross multiple lines so we # compromise by placing lines on a single line seperated by the # specified delimiter code_text = re.sub("(\n)+", delimiter, code_text) code_text = code_text.replace("'", "\"") send_command_to_qute("yank inline '{code}'\n".format(code=code_text)) if __name__ == "__main__": main() ''; }; ".config/qutebrowser/userscripts/getbib" = { executable = true; text = '' #!/usr/bin/env python3 """Qutebrowser userscript scraping the current web page for DOIs and downloading corresponding bibtex information. Set the environment variable 'QUTE_BIB_FILEPATH' to indicate the path to download to. Otherwise, bibtex information is downloaded to '/tmp' and hence deleted at reboot. Installation: see qute://help/userscripts.html Inspired by https://ocefpaf.github.io/python4oceanographers/blog/2014/05/19/doi2bibtex/ """ import os import sys import re from collections import Counter from urllib import parse as url_parse from urllib import request as url_request FIFO_PATH = os.getenv("QUTE_FIFO") def message_fifo(message, level="warning"): """Send message to qutebrowser FIFO. The level must be one of 'info', 'warning' (default) or 'error'.""" with open(FIFO_PATH, "w") as fifo: fifo.write("message-{} '{}'".format(level, message)) source = os.getenv("QUTE_TEXT") with open(source) as f: text = f.read() # find DOIs on page using regex dval = re.compile(r'(10\.(\d)+/([^(\s\>\"\<)])+)') # https://stackoverflow.com/a/10324802/3865876, too strict # dval = re.compile(r'\b(10[.][0-9]{4,}(?:[.][0-9]+)*/(?:(?!["&\'<>])\S)+)\b') dois = dval.findall(text) dois = Counter(e[0] for e in dois) try: doi = dois.most_common(1)[0][0] except IndexError: message_fifo("No DOIs found on page") sys.exit() message_fifo("Found {} DOIs on page, selecting {}".format(len(dois), doi), level="info") # get bibtex data corresponding to DOI url = "https://dx.doi.org/" + url_parse.quote(doi) headers = dict(Accept='text/bibliography; style=bibtex') request = url_request.Request(url, headers=headers) response = url_request.urlopen(request) status_code = response.getcode() if status_code >= 400: message_fifo("Request returned {}".format(status_code)) sys.exit() # obtain content and format it bibtex = response.read().decode("utf-8").strip() bibtex = bibtex.replace(" ", "\n ", 1).\ replace("}, ", "},\n ").replace("}}", "}\n}") # append to file bib_filepath = os.getenv("QUTE_BIB_FILEPATH", "/tmp/qute.bib") with open(bib_filepath, "a") as f: f.write(bibtex + "\n\n") ''; }; ".config/qutebrowser/userscripts/qute-gemini" = { executable = true; text = '' #!/usr/bin/env python3 # qute-gemini - Open Gemini links in qutebrowser and render them as HTML # # SPDX-FileCopyrightText: 2019-2020 solderpunk # SPDX-FileCopyrightText: 2020 Aaron Janse # SPDX-FileCopyrightText: 2020 petedussin # SPDX-FileCopyrightText: 2020-2021 Sotiris Papatheodorou # SPDX-License-Identifier: GPL-3.0-or-later import cgi import html import os import socket import ssl import sys import tempfile import urllib.parse from typing import Tuple _version = "1.0.0" _max_redirects = 5 _error_page_template = '''' Error opening page: URL

qute-gemini error

Error while opening:
URL_TEXT

DESCRIPTION

'''' _status_code_desc = { "1": "Gemini status code 1 Input. This is not implemented in qute-gemini.", "10": "Gemini status code 10 Input. This is not implemented in qute-gemini.", "11": "Gemini status code 11 Sensitive Input. This is not implemented in qute-gemini.", "3": "Gemini status code 3 Redirect. Stopped after " + str(_max_redirects) + " redirects.", "30": "Gemini status code 30 Temporary Redirect. Stopped after " + str(_max_redirects) + " redirects.", "31": "Gemini status code 31 Permanent Redirect. Stopped after " + str(_max_redirects) + " redirects.", "4": "Gemini status code 4 Temporary Failure. Server message: META", "40": "Gemini status code 40 Temporary Failure. Server message: META", "41": "Gemini status code 41 Server Unavailable. The server is unavailable due to overload or maintenance. Server message: META", "42": "Gemini status code 42 CGI Error. A CGI process, or similar system for generating dynamic content, died unexpectedly or timed out. Server message: META", "43": "Gemini status code 43 Proxy Error. A proxy request failed because the server was unable to successfully complete a transaction with the remote host. Server message: META", "44": "Gemini status code 44 Slow Down. Rate limiting is in effect. Please wait META seconds before making another request to this server.", "5": "Gemini status code 5 Permanent Failure. Server message: META", "50": "Gemini status code 50 Permanent Failure. Server message: META", "51": "Gemini status code 51 Not Found. he requested resource could not be found but may be available in the future. Server message: META", "52": "Gemini status code 52 Gone. The resource requested is no longer available and will not be available again. Server message: META", "53": "Gemini status code 53 Proxy Request Refused. The request was for a resource at a domain not served by the server and the server does not accept proxy requests. Server message: META", "59": "Gemini status code 59 Bad Request. The server was unable to parse the client's request, presumably due to a malformed request. Server message: META", "6": "Gemini status code 6 Client Certificate Required. This is not implemented in qute-gemini.", } def qute_url() -> str: """Get the URL passed to the script by qutebrowser.""" return os.environ["QUTE_URL"] def qute_fifo() -> str: """Get the FIFO or file to write qutebrowser commands to.""" return os.environ["QUTE_FIFO"] def html_href(url: str, description: str) -> str: return "".join(['', description, ""]) def qute_gemini_css_path() -> str: """Return the path where the custom CSS file is expected to be.""" try: base_dir = os.environ["XDG_DATA_HOME"] except KeyError: base_dir = os.path.join(os.environ["HOME"], ".local/share") return os.path.join(base_dir, "qutebrowser/userscripts/qute-gemini.css") def gemini_absolutise_url(base_url: str, relative_url: str) -> str: """Absolutise relative gemini URLs. Adapted from gcat: https://github.com/aaronjanse/gcat """ if "://" not in relative_url: # Python's URL tools somehow only work with known schemes? base_url = base_url.replace("gemini://", "http://") relative_url = urllib.parse.urljoin(base_url, relative_url) relative_url = relative_url.replace("http://", "gemini://") return relative_url def gemini_fetch_url(url: str) -> Tuple[str, str, str, str, str]: """Fetch a Gemini URL and return the content as a string. url: URL with gemini:// or no scheme. Returns 4 strings: the content, the URL the content was fetched from, the Gemini status code, the value of the meta field and an error message. Adapted from gcat: https://github.com/aaronjanse/gcat """ # Parse the URL to get the hostname and port parsed_url = urllib.parse.urlparse(url) if not parsed_url.scheme: url = "gemini://" + url parsed_url = urllib.parse.urlparse(url) if parsed_url.scheme != "gemini": return "", "Received non-gemini:// URL: " + url if parsed_url.port is not None: useport = parsed_url.port else: useport = 1965 # Do the Gemini transaction, looping for redirects redirects = 0 while True: # Send the request s = socket.create_connection((parsed_url.hostname, useport)) context = ssl.SSLContext(ssl.PROTOCOL_TLS) context.check_hostname = False context.verify_mode = ssl.CERT_NONE s = context.wrap_socket(s, server_hostname = parsed_url.netloc) s.sendall((url + "\r\n").encode("UTF-8")) # Get the status code and meta fp = s.makefile("rb") header = fp.readline().decode("UTF-8").strip() status, meta = header.split()[:2] # Follow up to 5 redirects if status.startswith("3"): url = gemini_absolutise_url(url, meta) parsed_url = urllib.parse.urlparse(url) redirects += 1 if redirects > _max_redirects: # Too many redirects break # Otherwise we're done else: break # Process the response content = "" error_msg = "" # 2x Success if status.startswith("2"): media_type, media_type_opts = cgi.parse_header(meta) # Decode according to declared charset defaulting to UTF-8 if meta.startswith("text/gemini"): charset = media_type_opts.get("charset", "UTF-8") content = fp.read().decode(charset) else: error_msg = "Expected media type text/gemini but received " \ + media_type # Handle errors else: # Try matching a 2-digit and then a 1-digit status code try: error_msg = _status_code_desc[status[0:2]] except KeyError: try: error_msg = _status_code_desc[status[0]] except KeyError: error_msg = "The server sent back something weird." # Substitute the contents of meta into the error message if needed error_msg = error_msg.replace("META", meta) return content, url, status, meta, error_msg def gemtext_to_html(gemtext: str, url: str, original_url: str, status: str, meta: str) -> str: """Convert gemtext to HTML. title: Used as the document title. url: The URL the gemtext was received from. Used to resolve relative URLs in the gemtext content. original_url: The URL the original request was made at. status: The Gemini status code returned by the server. meta: The meta returned by the server. Returns the HTML representation as a string. """ # Accumulate converted gemtext lines lines = ['', '', "\t", "\t\t" + html.escape(url) + "", "\t\t", "\t", "\t", "\t
"] in_pre = False in_list = False # Add an extra newline to ensure list tags are closed properly for line in (gemtext + "\n").splitlines(): # Add the list closing tag if not line.startswith("*") and in_list: lines.append("\t\t") in_list = False # Blank line, ignore if not line: pass # Link elif line.startswith("=>"): l = line[2:].split(None, 1) # Use the URL itself as the description if there is none if len(l) == 1: l.append(l[0]) # Encode the link description l[1] = html.escape(l[1]) # Resolve relative URLs l[0] = gemini_absolutise_url(url, l[0]) lines.append("\t\t

" + html_href(l[0], l[1]) + "

") # Preformated toggle elif line.startswith("```"): if in_pre: lines.append("\t\t") else: lines.append("\t\t
")
                    in_pre = not in_pre
                # Preformated
                elif in_pre:
                    lines.append(line)
                # Header
                elif line.startswith("###"):
                    lines.append("\t\t

" + html.escape(line[3:].strip()) + "

") elif line.startswith("##"): lines.append("\t\t

" + html.escape(line[2:].strip()) + "

") elif line.startswith("#"): lines.append("\t\t

" + html.escape(line[1:].strip()) + "

") # List elif line.startswith("*"): if not in_list: lines.append("\t\t
    ") in_list = True lines.append("\t\t\t
  • " + html.escape(line[1:].strip()) + "
  • ") # Quote elif line.startswith(">"): lines.extend(["\t\t
    ", "\t\t\t

    " + line[1:].strip() + "

    ", "\t\t
    "]) # Normal text else: lines.append("\t\t

    " + html.escape(line.strip()) + "

    ") url_html = html_href(url, html.escape(url)) original_url_html = html_href(original_url, html.escape(original_url)) lines.extend(["", "\t
", "\t
", "\t\t", "\t\t\tContent from " + url_html, "\t\t", "\t\t
", "\t\t\t
Original URL
", "\t\t\t
" + original_url_html + "
", "\t\t\t
Status
", "\t\t\t
" + status + "
", "\t\t\t
Meta
", "\t\t\t
" + meta + "
", "\t\t\t
Fetched by
", '\t\t\t
qute-gemini ' + str(_version) + "
", "\t\t
", "\t
", "\t", ""]) return "\n".join(lines) def get_css() -> str: # Search for qute-gemini.css in the directory this script is located in css_file = qute_gemini_css_path() if os.path.isfile(css_file): # Return the file contents with open(css_file, "r") as f: return f.read().strip() else: # Use no CSS return "" def qute_error_page(url: str, description: str) -> str: """Return a data URI error page like qutebrowser does. url: The URL of the page that failed to load. description: A description of the error. Returns a data URI containing the error page. """ # Generate the HTML error page html_page = _error_page_template.replace("URL", url) html_page = html_page.replace("URL_TEXT", html.escape(url)) html_page = html_page.replace("DESCRIPTION", html.escape(description)) html_page = html_page.replace("CSS", get_css()) # URL encode and return as a data URI return "data:text/html;charset=UTF-8," + urllib.parse.quote(html_page) def open_gemini(url: str, open_args: str) -> None: """Open Gemini URL in qutebrowser.""" # Get the Gemini content content, content_url, status, meta, error_msg = gemini_fetch_url(url) if error_msg: # Generate an error page in a data URI open_url = qute_error_page(url, error_msg) else: # Success, convert to HTML in a temporary file tmpf = tempfile.NamedTemporaryFile("w", suffix=".html", delete=False) tmp_filename = tmpf.name tmpf.close() with open(tmp_filename, "w") as f: f.write(gemtext_to_html(content, content_url, url, status, meta)) open_url = " file://" + tmp_filename # Open the HTML file in qutebrowser with open(qute_fifo(), "w") as qfifo: qfifo.write("open " + open_args + open_url) def open_other(url: str, open_args: str) -> None: """Open non-Gemini URL in qutebrowser.""" with open(qute_fifo(), "w") as qfifo: qfifo.write("open " + open_args + " " + url) if __name__ == "__main__": # Open in the current or a new tab depending on the script name if sys.argv[0].endswith("-tab"): open_args = "-t" else: open_args = "" # Select how to open the URL depending on its scheme url = qute_url() parsed_url = urllib.parse.urlparse(url) if parsed_url.scheme == "gemini": open_gemini(url, open_args) else: open_other(url, open_args) ''; }; ".config/qutebrowser/userscripts/qute-keepassxc" = { executable = true; text = '' #!/usr/bin/env python3 # Copyright (c) 2018-2021 Markus Blöchl # # This file is part of qutebrowser. # # qutebrowser is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # qutebrowser is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with qutebrowser. If not, see . """ # Introduction This is a [qutebrowser][2] [userscript][5] to fill website credentials from a [KeepassXC][1] password database. # Installation First, you need to enable [KeepassXC-Browser][6] extensions in your KeepassXC config. Second, you must make sure to have a working private-public-key-pair in your [GPG keyring][3]. Third, install the python module `pynacl`. Finally, adapt your qutebrowser config. You can e.g. add the following lines to your `~/.config/qutebrowser/config.py` Remember to replace `ABC1234` with your actual GPG key. ```python config.bind('', 'spawn --userscript qute-keepassxc --key ABC1234', mode='insert') config.bind('pw', 'spawn --userscript qute-keepassxc --key ABC1234', mode='normal') ``` # Usage If you are on a webpage with a login form, simply activate one of the configured key-bindings. The first time you run this script, KeepassXC will ask you for authentication like with any other browser extension. Just provide a name of your choice and accept the request if nothing looks fishy. # How it works This script will talk to KeepassXC using the native [KeepassXC-Browser protocol][4]. This script needs to store the key used to associate with your KeepassXC instance somewhere. Unlike most browser extensions which only use plain local storage, this one attempts to do so in a safe way by storing the key in encrypted form using GPG. Therefore you need to have a public-key-pair readily set up. GPG might then ask for your private-key passwort whenever you query the database for login credentials. [1]: https://keepassxc.org/ [2]: https://qutebrowser.org/ [3]: https://gnupg.org/ [4]: https://github.com/keepassxreboot/keepassxc-browser/blob/develop/keepassxc-protocol.md [5]: https://github.com/qutebrowser/qutebrowser/blob/master/doc/userscripts.asciidoc [6]: https://keepassxc.org/docs/KeePassXC_GettingStarted.html#_setup_browser_integration """ import sys import os import socket import json import base64 import subprocess import argparse import nacl.utils import nacl.public def parse_args(): parser = argparse.ArgumentParser(description="Full passwords from KeepassXC") parser.add_argument('url', nargs='?', default=os.environ.get('QUTE_URL')) parser.add_argument('--socket', '-s', default='/run/user/{}/org.keepassxc.KeePassXC.BrowserServer'.format(os.getuid()), help='Path to KeepassXC browser socket') parser.add_argument('--key', '-k', default='alice@example.com', help='GPG key to encrypt KeepassXC auth key with') parser.add_argument('--insecure', action='store_true', help="Do not encrypt auth key") return parser.parse_args() class KeepassError(Exception): def __init__(self, code, desc): self.code = code self.description = desc def __str__(self): return f"KeepassXC Error [{self.code}]: {self.description}" class KeepassXC: """ Wrapper around the KeepassXC socket API """ def __init__(self, id=None, *, key, socket_path): self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self.id = id self.socket_path = socket_path self.client_key = nacl.public.PrivateKey.generate() self.id_key = nacl.public.PrivateKey.from_seed(key) self.cryptobox = None def connect(self): if not os.path.exists(self.socket_path): raise KeepassError(-1, "KeepassXC Browser socket does not exists") self.client_id = base64.b64encode(nacl.utils.random(nacl.public.Box.NONCE_SIZE)).decode('utf-8') self.sock.connect(self.socket_path) self.send_raw_msg(dict( action = 'change-public-keys', publicKey = base64.b64encode(self.client_key.public_key.encode()).decode('utf-8'), nonce = base64.b64encode(nacl.utils.random(nacl.public.Box.NONCE_SIZE)).decode('utf-8'), clientID = self.client_id )) resp = self.recv_raw_msg() assert resp['action'] == 'change-public-keys' assert resp['success'] == 'true' assert resp['nonce'] self.cryptobox = nacl.public.Box( self.client_key, nacl.public.PublicKey(base64.b64decode(resp['publicKey'])) ) def get_databasehash(self): self.send_msg(dict(action='get-databasehash')) return self.recv_msg()['hash'] def lock_database(self): self.send_msg(dict(action='lock-database')) try: self.recv_msg() except KeepassError as e: if e.code == 1: return True raise return False def test_associate(self): if not self.id: return False self.send_msg(dict( action = 'test-associate', id = self.id, key = base64.b64encode(self.id_key.public_key.encode()).decode('utf-8') )) return self.recv_msg()['success'] == 'true' def associate(self): self.send_msg(dict( action = 'associate', key = base64.b64encode(self.client_key.public_key.encode()).decode('utf-8'), idKey = base64.b64encode(self.id_key.public_key.encode()).decode('utf-8') )) resp = self.recv_msg() self.id = resp['id'] def get_logins(self, url): self.send_msg(dict( action = 'get-logins', url = url, keys = [{ 'id': self.id, 'key': base64.b64encode(self.id_key.public_key.encode()).decode('utf-8') }] )) return self.recv_msg()['entries'] def send_raw_msg(self, msg): self.sock.send( json.dumps(msg).encode('utf-8') ) def recv_raw_msg(self): return json.loads( self.sock.recv(4096).decode('utf-8') ) def send_msg(self, msg, **extra): nonce = nacl.utils.random(nacl.public.Box.NONCE_SIZE) self.send_raw_msg(dict( action = msg['action'], message = base64.b64encode(self.cryptobox.encrypt(json.dumps(msg).encode('utf-8'), nonce).ciphertext).decode('utf-8'), nonce = base64.b64encode(nonce).decode('utf-8'), clientID = self.client_id, **extra )) def recv_msg(self): resp = self.recv_raw_msg() if 'error' in resp: raise KeepassError(resp['errorCode'], resp['error']) assert resp['action'] return json.loads(self.cryptobox.decrypt(base64.b64decode(resp['message']), base64.b64decode(resp['nonce'])).decode('utf-8')) class SecretKeyStore: def __init__(self, gpgkey): self.gpgkey = gpgkey if gpgkey is None: self.path = os.path.join(os.environ['QUTE_DATA_DIR'], 'keepassxc.key') else: self.path = os.path.join(os.environ['QUTE_DATA_DIR'], 'keepassxc.key.gpg') def load(self): "Load existing association key from file" if self.gpgkey is None: jsondata = open(self.path, 'r').read() else: jsondata = subprocess.check_output(['gpg', '--decrypt', self.path]).decode('utf-8') data = json.loads(jsondata) self.id = data['id'] self.key = base64.b64decode(data['key']) def create(self): "Create new association key" self.key = nacl.utils.random(32) self.id = None def store(self, id): "Store newly created association key in file" self.id = id jsondata = json.dumps({'id':self.id, 'key':base64.b64encode(self.key).decode('utf-8')}) if self.gpgkey is None: open(self.path, "w").write(jsondata) else: subprocess.run(['gpg', '--encrypt', '-o', self.path, '-r', self.gpgkey], input=jsondata.encode('utf-8'), check=True) def qute(cmd): with open(os.environ['QUTE_FIFO'], 'w') as fifo: fifo.write(cmd) fifo.write('\n') fifo.flush() def error(msg): print(msg, file=sys.stderr) qute('message-error "{}"'.format(msg)) def connect_to_keepassxc(args): assert args.key or args.insecure, "Missing GPG key to use for auth key encryption" keystore = SecretKeyStore(args.key) if os.path.isfile(keystore.path): keystore.load() kp = KeepassXC(keystore.id, key=keystore.key, socket_path=args.socket) kp.connect() if not kp.test_associate(): error('No KeepassXC association') return None else: keystore.create() kp = KeepassXC(key=keystore.key, socket_path=args.socket) kp.connect() kp.associate() if not kp.test_associate(): error('No KeepassXC association') return None keystore.store(kp.id) return kp def make_js_code(username, password): return ' '.join(""" function isVisible(elem) { var style = elem.ownerDocument.defaultView.getComputedStyle(elem, null); if (style.getPropertyValue("visibility") !== "visible" || style.getPropertyValue("display") === "none" || style.getPropertyValue("opacity") === "0") { return false; } return elem.offsetWidth > 0 && elem.offsetHeight > 0; }; function hasPasswordField(form) { var inputs = form.getElementsByTagName("input"); for (var j = 0; j < inputs.length; j++) { var input = inputs[j]; if (input.type === "password") { return true; } } return false; }; function loadData2Form (form) { var inputs = form.getElementsByTagName("input"); for (var j = 0; j < inputs.length; j++) { var input = inputs[j]; if (isVisible(input) && (input.type === "text" || input.type === "email")) { input.focus(); input.value = %s; input.dispatchEvent(new Event('input', { 'bubbles': true })); input.dispatchEvent(new Event('change', { 'bubbles': true })); input.blur(); } if (input.type === "password") { input.focus(); input.value = %s; input.dispatchEvent(new Event('input', { 'bubbles': true })); input.dispatchEvent(new Event('change', { 'bubbles': true })); input.blur(); } } }; function fillFirstForm() { var forms = document.getElementsByTagName("form"); for (i = 0; i < forms.length; i++) { if (hasPasswordField(forms[i])) { loadData2Form(forms[i]); return; } } alert("No Credentials Form found"); }; fillFirstForm() """.splitlines()) % (json.dumps(username), json.dumps(password)) def main(): if 'QUTE_FIFO' not in os.environ: print(f"No QUTE_FIFO found - {sys.argv[0]} must be run as a qutebrowser userscript") sys.exit(-1) try: args = parse_args() assert args.url, "Missing URL" kp = connect_to_keepassxc(args) if not kp: error('Could not connect to KeepassXC') return creds = kp.get_logins(args.url) if not creds: error('No credentials found') return # TODO: handle multiple matches name, pw = creds[0]['login'], creds[0]['password'] if name and pw: qute('jseval -q ' + make_js_code(name, pw)) except Exception as e: error(str(e)) if __name__ == '__main__': main() ''; }; # qutebrowser userscripts end. ".config/qutebrowser/greasemonkey/DR.js" = { text = '' // ==UserScript== // @name Dark Reader (Unofficial) // @icon https://darkreader.org/images/darkreader-icon-256x256.png // @namespace DarkReader // @description Inverts the brightness of pages to reduce eye strain // @version 4.9.52 // @author https://github.com/darkreader/darkreader#contributors // @homepageURL https://darkreader.org/ | https://github.com/darkreader/darkreader // @run-at document-end // @grant none // @exclude https://git.dotya.ml* // @exclude https://dotya.ml* // @exclude https://status.dotya.ml* // @exclude https://searxng.dotya.ml* // @exclude https://grafana.dotya.ml* // @exclude https://github.com* // @exclude https://dnswatch.com* // @exclude https://docs.immudb.io* // @exclude https://woodpecker-ci.org* // @exclude https://duckduckgo.com* // @exclude https://www.redit.com* // @exclude https://codeberg.org* // @include http* // @require https://cdn.jsdelivr.net/npm/darkreader/darkreader.min.js // @noframes // ==/UserScript== DarkReader.enable({ brightness: 105, contrast: 105, sepia: 0 }); DarkReader.disable(); ''; }; ".config/qutebrowser/config.py" = { source = .config/qutebrowser/config.py; }; ".local/bin/workqb" = { text = '' #!/bin/zsh qutebrowser \ --restore work \ --config ~/.config/qutebrowser/config.py \ --basedir ~/.config/qutebrowser-work \ & disown ''; executable = true; }; # ref: https://go.dev/blog/pprof ".local/bin/xtime" = { text = '' #!/bin/sh /usr/bin/time -f '%Uu %Ss %er %MkB %C' "$@" ''; executable = true; }; ".local/bin/xdp-screen-cast" = { text = '' #!/usr/bin/python3 # ref: https://gitlab.gnome.org/-/snippets/19 import re import signal import dbus from gi.repository import GLib from dbus.mainloop.glib import DBusGMainLoop import gi gi.require_version('Gst', '1.0') from gi.repository import GObject, Gst DBusGMainLoop(set_as_default=True) Gst.init(None) loop = GLib.MainLoop() bus = dbus.SessionBus() request_iface = 'org.freedesktop.portal.Request' screen_cast_iface = 'org.freedesktop.portal.ScreenCast' pipeline = None def terminate(): if pipeline is not None: self.player.set_state(Gst.State.NULL) loop.quit() request_token_counter = 0 session_token_counter = 0 sender_name = re.sub(r'\.', r'_', bus.get_unique_name()[1:]) def new_request_path(): global request_token_counter request_token_counter = request_token_counter + 1 token = 'u%d'%request_token_counter path = '/org/freedesktop/portal/desktop/request/%s/%s'%(sender_name, token) return (path, token) def new_session_path(): global session_token_counter session_token_counter = session_token_counter + 1 token = 'u%d'%session_token_counter path = '/org/freedesktop/portal/desktop/session/%s/%s'%(sender_name, token) return (path, token) def screen_cast_call(method, callback, *args, options={}): (request_path, request_token) = new_request_path() bus.add_signal_receiver(callback, 'Response', request_iface, 'org.freedesktop.portal.Desktop', request_path) options['handle_token'] = request_token method(*(args + (options, )), dbus_interface=screen_cast_iface) def on_gst_message(bus, message): type = message.type if type == Gst.MessageType.EOS or type == Gst.MessageType.ERROR: terminate() def play_pipewire_stream(node_id): empty_dict = dbus.Dictionary(signature="sv") fd_object = portal.OpenPipeWireRemote(session, empty_dict, dbus_interface=screen_cast_iface) fd = fd_object.take() pipeline = Gst.parse_launch('pipewiresrc fd=%d path=%u ! videoconvert ! xvimagesink'%(fd, node_id)) pipeline.set_state(Gst.State.PLAYING) pipeline.get_bus().connect('message', on_gst_message) def on_start_response(response, results): if response != 0: print("Failed to start: %s"%response) terminate() return print("streams:") for (node_id, stream_properties) in results['streams']: print("stream {}".format(node_id)) play_pipewire_stream(node_id) def on_select_sources_response(response, results): if response != 0: print("Failed to select sources: %d"%response) terminate() return print("sources selected") global session screen_cast_call(portal.Start, on_start_response, session, ''') def on_create_session_response(response, results): if response != 0: print("Failed to create session: %d"%response) terminate() return global session session = results['session_handle'] print("session %s created"%session) screen_cast_call(portal.SelectSources, on_select_sources_response, session, options={ 'multiple': False, 'types': dbus.UInt32(1|2) }) portal = bus.get_object('org.freedesktop.portal.Desktop', '/org/freedesktop/portal/desktop') (session_path, session_token) = new_session_path() screen_cast_call(portal.CreateSession, on_create_session_response, options={ 'session_handle_token': session_token }) try: loop.run() except KeyboardInterrupt: terminate() ''; executable = true; }; ".local/bin/playerctl.sh" = { source = ./bin/playerctl.sh; executable = true; }; }; xdg = (import ./nix/modules/xdg.nix) {inherit pkgs config hostName;}; }