qutebrowser: add userscripts + bindings

This commit is contained in:
surtur 2021-06-14 11:26:21 +02:00
parent 49c6542e34
commit b4173d8a39
Signed by: wanderer
GPG Key ID: 19CE1EC1D9E0486D
6 changed files with 831 additions and 0 deletions

View File

@ -157,6 +157,11 @@ config.set('content.javascript.enabled', True, 'qute://*/*')
# Type: Bool
c.content.pdfjs = False
# CSS selectors used to determine which elements on a page should have
# hints.
# Type: Dict
c.hints.selectors = {'all': ['a', 'area', 'textarea', 'select', 'input:not([type="hidden"])', 'button', 'frame', 'iframe', 'img', 'link', 'summary', '[contenteditable]:not([contenteditable="false"])', '[onclick]', '[onmousedown]', '[role="link"]', '[role="option"]', '[role="button"]', '[ng-click]', '[ngClick]', '[data-ng-click]', '[x-ng-click]', '[tabindex]'], 'links': ['a[href]', 'area[href]', 'link[href]', '[role="link"][href]'], 'images': ['img'], 'media': ['audio', 'img', 'video'], 'url': ['[src]', '[href]'], 'inputs': ['input[type="text"]', 'input[type="date"]', 'input[type="datetime-local"]', 'input[type="email"]', 'input[type="month"]', 'input[type="number"]', 'input[type="password"]', 'input[type="search"]', 'input[type="tel"]', 'input[type="time"]', 'input[type="url"]', 'input[type="week"]', 'input:not([type])', '[contenteditable]:not([contenteditable="false"])', 'textarea'], 'code': [':not(pre) > code', 'pre']}
# Position of the tab bar.
# Type: Position
# Valid values:
@ -236,3 +241,10 @@ c.fonts.web.size.default_fixed = 20
# Hard minimum font size (in pixels).
# Type: Int
c.fonts.web.size.minimum = 15
# Bindings for normal mode
config.bind(';#', 'hint code userscript code_select.py')
config.bind(';g', 'hint links userscript qute-gemini')
config.bind(';G', 'hint links userscript qute-gemini-tab')
config.bind(';b', 'hint links userscript getbib')
config.bind(';m', 'hint links spawn mpv --gpu-context=wayland --hwdec=auto {hint-url}')

View File

@ -0,0 +1,52 @@
#!/usr/bin/env python3
import os
import html
import re
import sys
import xml.etree.ElementTree as ET
try:
import pyperclip
except ImportError:
PYPERCLIP = False
else:
PYPERCLIP = True
def parse_text_content(element):
root = ET.fromstring(element)
text = ET.tostring(root, encoding="unicode", method="text")
text = html.unescape(text)
return text
def send_command_to_qute(command):
with open(os.environ.get("QUTE_FIFO"), "w") as f:
f.write(command)
def main():
delimiter = sys.argv[1] if len(sys.argv) > 1 else ";"
# For info on qute environment vairables, see
# https://github.com/qutebrowser/qutebrowser/blob/master/doc/userscripts.asciidoc
element = os.environ.get("QUTE_SELECTED_HTML")
code_text = parse_text_content(element)
if PYPERCLIP:
pyperclip.copy(code_text)
send_command_to_qute(
"message-info 'copied to clipboard: {info}{suffix}'".format(
info=code_text.splitlines()[0],
suffix="..." if len(code_text.splitlines()) > 1 else ""
)
)
else:
# Qute's yank command won't copy accross multiple lines so we
# compromise by placing lines on a single line seperated by the
# specified delimiter
code_text = re.sub("(\n)+", delimiter, code_text)
code_text = code_text.replace("'", "\"")
send_command_to_qute("yank inline '{code}'\n".format(code=code_text))
if __name__ == "__main__":
main()

View File

@ -0,0 +1,65 @@
#!/usr/bin/env python3
"""Qutebrowser userscript scraping the current web page for DOIs and downloading
corresponding bibtex information.
Set the environment variable 'QUTE_BIB_FILEPATH' to indicate the path to
download to. Otherwise, bibtex information is downloaded to '/tmp' and hence
deleted at reboot.
Installation: see qute://help/userscripts.html
Inspired by
https://ocefpaf.github.io/python4oceanographers/blog/2014/05/19/doi2bibtex/
"""
import os
import sys
import re
from collections import Counter
from urllib import parse as url_parse
from urllib import request as url_request
FIFO_PATH = os.getenv("QUTE_FIFO")
def message_fifo(message, level="warning"):
"""Send message to qutebrowser FIFO. The level must be one of 'info',
'warning' (default) or 'error'."""
with open(FIFO_PATH, "w") as fifo:
fifo.write("message-{} '{}'".format(level, message))
source = os.getenv("QUTE_TEXT")
with open(source) as f:
text = f.read()
# find DOIs on page using regex
dval = re.compile(r'(10\.(\d)+/([^(\s\>\"\<)])+)')
# https://stackoverflow.com/a/10324802/3865876, too strict
# dval = re.compile(r'\b(10[.][0-9]{4,}(?:[.][0-9]+)*/(?:(?!["&\'<>])\S)+)\b')
dois = dval.findall(text)
dois = Counter(e[0] for e in dois)
try:
doi = dois.most_common(1)[0][0]
except IndexError:
message_fifo("No DOIs found on page")
sys.exit()
message_fifo("Found {} DOIs on page, selecting {}".format(len(dois), doi),
level="info")
# get bibtex data corresponding to DOI
url = "https://dx.doi.org/" + url_parse.quote(doi)
headers = dict(Accept='text/bibliography; style=bibtex')
request = url_request.Request(url, headers=headers)
response = url_request.urlopen(request)
status_code = response.getcode()
if status_code >= 400:
message_fifo("Request returned {}".format(status_code))
sys.exit()
# obtain content and format it
bibtex = response.read().decode("utf-8").strip()
bibtex = bibtex.replace(" ", "\n ", 1).\
replace("}, ", "},\n ").replace("}}", "}\n}")
# append to file
bib_filepath = os.getenv("QUTE_BIB_FILEPATH", "/tmp/qute.bib")
with open(bib_filepath, "a") as f:
f.write(bibtex + "\n\n")

View File

@ -0,0 +1,340 @@
#!/usr/bin/env python3
# qute-gemini - Open Gemini links in qutebrowser and render them as HTML
#
# SPDX-FileCopyrightText: 2019-2020 solderpunk
# SPDX-FileCopyrightText: 2020 Aaron Janse
# SPDX-FileCopyrightText: 2020 petedussin
# SPDX-FileCopyrightText: 2020-2021 Sotiris Papatheodorou
# SPDX-License-Identifier: GPL-3.0-or-later
import cgi
import html
import os
import socket
import ssl
import sys
import tempfile
import urllib.parse
from typing import Tuple
_version = "1.0.0"
_max_redirects = 5
_error_page_template = '''<?xml version="1.0" encoding="UTF-8"?>
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en">
<head>
<title>Error opening page: URL</title>
<style>
CSS
</style>
</head>
<body>
<h1>qute-gemini error</h1>
<p>Error while opening:<br/><a href="URL">URL_TEXT</a></p>
<p>DESCRIPTION</p>
</body>
</html>
'''
_status_code_desc = {
"1": "Gemini status code 1 Input. This is not implemented in qute-gemini.",
"10": "Gemini status code 10 Input. This is not implemented in qute-gemini.",
"11": "Gemini status code 11 Sensitive Input. This is not implemented in qute-gemini.",
"3": "Gemini status code 3 Redirect. Stopped after " + str(_max_redirects) + " redirects.",
"30": "Gemini status code 30 Temporary Redirect. Stopped after " + str(_max_redirects) + " redirects.",
"31": "Gemini status code 31 Permanent Redirect. Stopped after " + str(_max_redirects) + " redirects.",
"4": "Gemini status code 4 Temporary Failure. Server message: META",
"40": "Gemini status code 40 Temporary Failure. Server message: META",
"41": "Gemini status code 41 Server Unavailable. The server is unavailable due to overload or maintenance. Server message: META",
"42": "Gemini status code 42 CGI Error. A CGI process, or similar system for generating dynamic content, died unexpectedly or timed out. Server message: META",
"43": "Gemini status code 43 Proxy Error. A proxy request failed because the server was unable to successfully complete a transaction with the remote host. Server message: META",
"44": "Gemini status code 44 Slow Down. Rate limiting is in effect. Please wait META seconds before making another request to this server.",
"5": "Gemini status code 5 Permanent Failure. Server message: META",
"50": "Gemini status code 50 Permanent Failure. Server message: META",
"51": "Gemini status code 51 Not Found. he requested resource could not be found but may be available in the future. Server message: META",
"52": "Gemini status code 52 Gone. The resource requested is no longer available and will not be available again. Server message: META",
"53": "Gemini status code 53 Proxy Request Refused. The request was for a resource at a domain not served by the server and the server does not accept proxy requests. Server message: META",
"59": "Gemini status code 59 Bad Request. The server was unable to parse the client's request, presumably due to a malformed request. Server message: META",
"6": "Gemini status code 6 Client Certificate Required. This is not implemented in qute-gemini.",
}
def qute_url() -> str:
"""Get the URL passed to the script by qutebrowser."""
return os.environ["QUTE_URL"]
def qute_fifo() -> str:
"""Get the FIFO or file to write qutebrowser commands to."""
return os.environ["QUTE_FIFO"]
def html_href(url: str, description: str) -> str:
return "".join(['<a href="', url, '">', description, "</a>"])
def qute_gemini_css_path() -> str:
"""Return the path where the custom CSS file is expected to be."""
try:
base_dir = os.environ["XDG_DATA_HOME"]
except KeyError:
base_dir = os.path.join(os.environ["HOME"], ".local/share")
return os.path.join(base_dir, "qutebrowser/userscripts/qute-gemini.css")
def gemini_absolutise_url(base_url: str, relative_url: str) -> str:
"""Absolutise relative gemini URLs.
Adapted from gcat: https://github.com/aaronjanse/gcat
"""
if "://" not in relative_url:
# Python's URL tools somehow only work with known schemes?
base_url = base_url.replace("gemini://", "http://")
relative_url = urllib.parse.urljoin(base_url, relative_url)
relative_url = relative_url.replace("http://", "gemini://")
return relative_url
def gemini_fetch_url(url: str) -> Tuple[str, str, str, str, str]:
"""Fetch a Gemini URL and return the content as a string.
url: URL with gemini:// or no scheme.
Returns 4 strings: the content, the URL the content was fetched from, the
Gemini status code, the value of the meta field and an error message.
Adapted from gcat: https://github.com/aaronjanse/gcat
"""
# Parse the URL to get the hostname and port
parsed_url = urllib.parse.urlparse(url)
if not parsed_url.scheme:
url = "gemini://" + url
parsed_url = urllib.parse.urlparse(url)
if parsed_url.scheme != "gemini":
return "", "Received non-gemini:// URL: " + url
if parsed_url.port is not None:
useport = parsed_url.port
else:
useport = 1965
# Do the Gemini transaction, looping for redirects
redirects = 0
while True:
# Send the request
s = socket.create_connection((parsed_url.hostname, useport))
context = ssl.SSLContext(ssl.PROTOCOL_TLS)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
s = context.wrap_socket(s, server_hostname = parsed_url.netloc)
s.sendall((url + "\r\n").encode("UTF-8"))
# Get the status code and meta
fp = s.makefile("rb")
header = fp.readline().decode("UTF-8").strip()
status, meta = header.split()[:2]
# Follow up to 5 redirects
if status.startswith("3"):
url = gemini_absolutise_url(url, meta)
parsed_url = urllib.parse.urlparse(url)
redirects += 1
if redirects > _max_redirects:
# Too many redirects
break
# Otherwise we're done
else:
break
# Process the response
content = ""
error_msg = ""
# 2x Success
if status.startswith("2"):
media_type, media_type_opts = cgi.parse_header(meta)
# Decode according to declared charset defaulting to UTF-8
if meta.startswith("text/gemini"):
charset = media_type_opts.get("charset", "UTF-8")
content = fp.read().decode(charset)
else:
error_msg = "Expected media type text/gemini but received " \
+ media_type
# Handle errors
else:
# Try matching a 2-digit and then a 1-digit status code
try:
error_msg = _status_code_desc[status[0:2]]
except KeyError:
try:
error_msg = _status_code_desc[status[0]]
except KeyError:
error_msg = "The server sent back something weird."
# Substitute the contents of meta into the error message if needed
error_msg = error_msg.replace("META", meta)
return content, url, status, meta, error_msg
def gemtext_to_html(gemtext: str, url: str, original_url: str,
status: str, meta: str) -> str:
"""Convert gemtext to HTML.
title: Used as the document title.
url: The URL the gemtext was received from. Used to resolve
relative URLs in the gemtext content.
original_url: The URL the original request was made at.
status: The Gemini status code returned by the server.
meta: The meta returned by the server.
Returns the HTML representation as a string.
"""
# Accumulate converted gemtext lines
lines = ['<?xml version="1.0" encoding="UTF-8"?>',
'<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en">',
"\t<head>",
"\t\t<title>" + html.escape(url) + "</title>",
"\t\t<style>",
get_css(),
"\t\t</style>",
"\t</head>",
"\t<body>",
"\t<article>"]
in_pre = False
in_list = False
# Add an extra newline to ensure list tags are closed properly
for line in (gemtext + "\n").splitlines():
# Add the list closing tag
if not line.startswith("*") and in_list:
lines.append("\t\t</ul>")
in_list = False
# Blank line, ignore
if not line:
pass
# Link
elif line.startswith("=>"):
l = line[2:].split(None, 1)
# Use the URL itself as the description if there is none
if len(l) == 1:
l.append(l[0])
# Encode the link description
l[1] = html.escape(l[1])
# Resolve relative URLs
l[0] = gemini_absolutise_url(url, l[0])
lines.append("\t\t<p>" + html_href(l[0], l[1]) + "</p>")
# Preformated toggle
elif line.startswith("```"):
if in_pre:
lines.append("\t\t</pre>")
else:
lines.append("\t\t<pre>")
in_pre = not in_pre
# Preformated
elif in_pre:
lines.append(line)
# Header
elif line.startswith("###"):
lines.append("\t\t<h3>" + html.escape(line[3:].strip()) + "</h3>")
elif line.startswith("##"):
lines.append("\t\t<h2>" + html.escape(line[2:].strip()) + "</h2>")
elif line.startswith("#"):
lines.append("\t\t<h1>" + html.escape(line[1:].strip()) + "</h1>")
# List
elif line.startswith("*"):
if not in_list:
lines.append("\t\t<ul>")
in_list = True
lines.append("\t\t\t<li>" + html.escape(line[1:].strip()) + "</li>")
# Quote
elif line.startswith(">"):
lines.extend(["\t\t<blockquote>",
"\t\t\t<p>" + line[1:].strip() + "</p>",
"\t\t</blockquote>"])
# Normal text
else:
lines.append("\t\t<p>" + html.escape(line.strip()) + "</p>")
url_html = html_href(url, html.escape(url))
original_url_html = html_href(original_url, html.escape(original_url))
lines.extend(["",
"\t</article>",
"\t<details>",
"\t\t<summary>",
"\t\t\tContent from " + url_html,
"\t\t</summary>",
"\t\t<dl>",
"\t\t\t<dt>Original URL</dt>",
"\t\t\t<dd>" + original_url_html + "</dd>",
"\t\t\t<dt>Status</dt>",
"\t\t\t<dd>" + status + "</dd>",
"\t\t\t<dt>Meta</dt>",
"\t\t\t<dd>" + meta + "</dd>",
"\t\t\t<dt>Fetched by</dt>",
'\t\t\t<dd><a href="https://git.sr.ht/~sotirisp/qute-gemini">qute-gemini ' + str(_version) + "</a></dd>",
"\t\t</dl>",
"\t</details>",
"\t</body>",
"</html>"])
return "\n".join(lines)
def get_css() -> str:
# Search for qute-gemini.css in the directory this script is located in
css_file = qute_gemini_css_path()
if os.path.isfile(css_file):
# Return the file contents
with open(css_file, "r") as f:
return f.read().strip()
else:
# Use no CSS
return ""
def qute_error_page(url: str, description: str) -> str:
"""Return a data URI error page like qutebrowser does.
url: The URL of the page that failed to load.
description: A description of the error.
Returns a data URI containing the error page.
"""
# Generate the HTML error page
html_page = _error_page_template.replace("URL", url)
html_page = html_page.replace("URL_TEXT", html.escape(url))
html_page = html_page.replace("DESCRIPTION", html.escape(description))
html_page = html_page.replace("CSS", get_css())
# URL encode and return as a data URI
return "data:text/html;charset=UTF-8," + urllib.parse.quote(html_page)
def open_gemini(url: str, open_args: str) -> None:
"""Open Gemini URL in qutebrowser."""
# Get the Gemini content
content, content_url, status, meta, error_msg = gemini_fetch_url(url)
if error_msg:
# Generate an error page in a data URI
open_url = qute_error_page(url, error_msg)
else:
# Success, convert to HTML in a temporary file
tmpf = tempfile.NamedTemporaryFile("w", suffix=".html", delete=False)
tmp_filename = tmpf.name
tmpf.close()
with open(tmp_filename, "w") as f:
f.write(gemtext_to_html(content, content_url, url, status, meta))
open_url = " file://" + tmp_filename
# Open the HTML file in qutebrowser
with open(qute_fifo(), "w") as qfifo:
qfifo.write("open " + open_args + open_url)
def open_other(url: str, open_args: str) -> None:
"""Open non-Gemini URL in qutebrowser."""
with open(qute_fifo(), "w") as qfifo:
qfifo.write("open " + open_args + " " + url)
if __name__ == "__main__":
# Open in the current or a new tab depending on the script name
if sys.argv[0].endswith("-tab"):
open_args = "-t"
else:
open_args = ""
# Select how to open the URL depending on its scheme
url = qute_url()
parsed_url = urllib.parse.urlparse(url)
if parsed_url.scheme == "gemini":
open_gemini(url, open_args)
else:
open_other(url, open_args)

View File

@ -0,0 +1 @@
qute-gemini

View File

@ -0,0 +1,361 @@
#!/usr/bin/env python3
# Copyright (c) 2018-2021 Markus Blöchl <ususdei@gmail.com>
#
# This file is part of qutebrowser.
#
# qutebrowser is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# qutebrowser is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with qutebrowser. If not, see <https://www.gnu.org/licenses/>.
"""
# Introduction
This is a [qutebrowser][2] [userscript][5] to fill website credentials from a [KeepassXC][1] password database.
# Installation
First, you need to enable [KeepassXC-Browser][6] extensions in your KeepassXC config.
Second, you must make sure to have a working private-public-key-pair in your [GPG keyring][3].
Third, install the python module `pynacl`.
Finally, adapt your qutebrowser config.
You can e.g. add the following lines to your `~/.config/qutebrowser/config.py`
Remember to replace `ABC1234` with your actual GPG key.
```python
config.bind('<Alt-Shift-u>', 'spawn --userscript qute-keepassxc --key ABC1234', mode='insert')
config.bind('pw', 'spawn --userscript qute-keepassxc --key ABC1234', mode='normal')
```
# Usage
If you are on a webpage with a login form, simply activate one of the configured key-bindings.
The first time you run this script, KeepassXC will ask you for authentication like with any other browser extension.
Just provide a name of your choice and accept the request if nothing looks fishy.
# How it works
This script will talk to KeepassXC using the native [KeepassXC-Browser protocol][4].
This script needs to store the key used to associate with your KeepassXC instance somewhere.
Unlike most browser extensions which only use plain local storage, this one attempts to do so in a safe way
by storing the key in encrypted form using GPG.
Therefore you need to have a public-key-pair readily set up.
GPG might then ask for your private-key passwort whenever you query the database for login credentials.
[1]: https://keepassxc.org/
[2]: https://qutebrowser.org/
[3]: https://gnupg.org/
[4]: https://github.com/keepassxreboot/keepassxc-browser/blob/develop/keepassxc-protocol.md
[5]: https://github.com/qutebrowser/qutebrowser/blob/master/doc/userscripts.asciidoc
[6]: https://keepassxc.org/docs/KeePassXC_GettingStarted.html#_setup_browser_integration
"""
import sys
import os
import socket
import json
import base64
import subprocess
import argparse
import nacl.utils
import nacl.public
def parse_args():
parser = argparse.ArgumentParser(description="Full passwords from KeepassXC")
parser.add_argument('url', nargs='?', default=os.environ.get('QUTE_URL'))
parser.add_argument('--socket', '-s', default='/run/user/{}/org.keepassxc.KeePassXC.BrowserServer'.format(os.getuid()),
help='Path to KeepassXC browser socket')
parser.add_argument('--key', '-k', default='alice@example.com',
help='GPG key to encrypt KeepassXC auth key with')
parser.add_argument('--insecure', action='store_true',
help="Do not encrypt auth key")
return parser.parse_args()
class KeepassError(Exception):
def __init__(self, code, desc):
self.code = code
self.description = desc
def __str__(self):
return f"KeepassXC Error [{self.code}]: {self.description}"
class KeepassXC:
""" Wrapper around the KeepassXC socket API """
def __init__(self, id=None, *, key, socket_path):
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.id = id
self.socket_path = socket_path
self.client_key = nacl.public.PrivateKey.generate()
self.id_key = nacl.public.PrivateKey.from_seed(key)
self.cryptobox = None
def connect(self):
if not os.path.exists(self.socket_path):
raise KeepassError(-1, "KeepassXC Browser socket does not exists")
self.client_id = base64.b64encode(nacl.utils.random(nacl.public.Box.NONCE_SIZE)).decode('utf-8')
self.sock.connect(self.socket_path)
self.send_raw_msg(dict(
action = 'change-public-keys',
publicKey = base64.b64encode(self.client_key.public_key.encode()).decode('utf-8'),
nonce = base64.b64encode(nacl.utils.random(nacl.public.Box.NONCE_SIZE)).decode('utf-8'),
clientID = self.client_id
))
resp = self.recv_raw_msg()
assert resp['action'] == 'change-public-keys'
assert resp['success'] == 'true'
assert resp['nonce']
self.cryptobox = nacl.public.Box(
self.client_key,
nacl.public.PublicKey(base64.b64decode(resp['publicKey']))
)
def get_databasehash(self):
self.send_msg(dict(action='get-databasehash'))
return self.recv_msg()['hash']
def lock_database(self):
self.send_msg(dict(action='lock-database'))
try:
self.recv_msg()
except KeepassError as e:
if e.code == 1:
return True
raise
return False
def test_associate(self):
if not self.id:
return False
self.send_msg(dict(
action = 'test-associate',
id = self.id,
key = base64.b64encode(self.id_key.public_key.encode()).decode('utf-8')
))
return self.recv_msg()['success'] == 'true'
def associate(self):
self.send_msg(dict(
action = 'associate',
key = base64.b64encode(self.client_key.public_key.encode()).decode('utf-8'),
idKey = base64.b64encode(self.id_key.public_key.encode()).decode('utf-8')
))
resp = self.recv_msg()
self.id = resp['id']
def get_logins(self, url):
self.send_msg(dict(
action = 'get-logins',
url = url,
keys = [{ 'id': self.id, 'key': base64.b64encode(self.id_key.public_key.encode()).decode('utf-8') }]
))
return self.recv_msg()['entries']
def send_raw_msg(self, msg):
self.sock.send( json.dumps(msg).encode('utf-8') )
def recv_raw_msg(self):
return json.loads( self.sock.recv(4096).decode('utf-8') )
def send_msg(self, msg, **extra):
nonce = nacl.utils.random(nacl.public.Box.NONCE_SIZE)
self.send_raw_msg(dict(
action = msg['action'],
message = base64.b64encode(self.cryptobox.encrypt(json.dumps(msg).encode('utf-8'), nonce).ciphertext).decode('utf-8'),
nonce = base64.b64encode(nonce).decode('utf-8'),
clientID = self.client_id,
**extra
))
def recv_msg(self):
resp = self.recv_raw_msg()
if 'error' in resp:
raise KeepassError(resp['errorCode'], resp['error'])
assert resp['action']
return json.loads(self.cryptobox.decrypt(base64.b64decode(resp['message']), base64.b64decode(resp['nonce'])).decode('utf-8'))
class SecretKeyStore:
def __init__(self, gpgkey):
self.gpgkey = gpgkey
if gpgkey is None:
self.path = os.path.join(os.environ['QUTE_DATA_DIR'], 'keepassxc.key')
else:
self.path = os.path.join(os.environ['QUTE_DATA_DIR'], 'keepassxc.key.gpg')
def load(self):
"Load existing association key from file"
if self.gpgkey is None:
jsondata = open(self.path, 'r').read()
else:
jsondata = subprocess.check_output(['gpg', '--decrypt', self.path]).decode('utf-8')
data = json.loads(jsondata)
self.id = data['id']
self.key = base64.b64decode(data['key'])
def create(self):
"Create new association key"
self.key = nacl.utils.random(32)
self.id = None
def store(self, id):
"Store newly created association key in file"
self.id = id
jsondata = json.dumps({'id':self.id, 'key':base64.b64encode(self.key).decode('utf-8')})
if self.gpgkey is None:
open(self.path, "w").write(jsondata)
else:
subprocess.run(['gpg', '--encrypt', '-o', self.path, '-r', self.gpgkey], input=jsondata.encode('utf-8'), check=True)
def qute(cmd):
with open(os.environ['QUTE_FIFO'], 'w') as fifo:
fifo.write(cmd)
fifo.write('\n')
fifo.flush()
def error(msg):
print(msg, file=sys.stderr)
qute('message-error "{}"'.format(msg))
def connect_to_keepassxc(args):
assert args.key or args.insecure, "Missing GPG key to use for auth key encryption"
keystore = SecretKeyStore(args.key)
if os.path.isfile(keystore.path):
keystore.load()
kp = KeepassXC(keystore.id, key=keystore.key, socket_path=args.socket)
kp.connect()
if not kp.test_associate():
error('No KeepassXC association')
return None
else:
keystore.create()
kp = KeepassXC(key=keystore.key, socket_path=args.socket)
kp.connect()
kp.associate()
if not kp.test_associate():
error('No KeepassXC association')
return None
keystore.store(kp.id)
return kp
def make_js_code(username, password):
return ' '.join("""
function isVisible(elem) {
var style = elem.ownerDocument.defaultView.getComputedStyle(elem, null);
if (style.getPropertyValue("visibility") !== "visible" ||
style.getPropertyValue("display") === "none" ||
style.getPropertyValue("opacity") === "0") {
return false;
}
return elem.offsetWidth > 0 && elem.offsetHeight > 0;
};
function hasPasswordField(form) {
var inputs = form.getElementsByTagName("input");
for (var j = 0; j < inputs.length; j++) {
var input = inputs[j];
if (input.type === "password") {
return true;
}
}
return false;
};
function loadData2Form (form) {
var inputs = form.getElementsByTagName("input");
for (var j = 0; j < inputs.length; j++) {
var input = inputs[j];
if (isVisible(input) && (input.type === "text" || input.type === "email")) {
input.focus();
input.value = %s;
input.dispatchEvent(new Event('input', { 'bubbles': true }));
input.dispatchEvent(new Event('change', { 'bubbles': true }));
input.blur();
}
if (input.type === "password") {
input.focus();
input.value = %s;
input.dispatchEvent(new Event('input', { 'bubbles': true }));
input.dispatchEvent(new Event('change', { 'bubbles': true }));
input.blur();
}
}
};
function fillFirstForm() {
var forms = document.getElementsByTagName("form");
for (i = 0; i < forms.length; i++) {
if (hasPasswordField(forms[i])) {
loadData2Form(forms[i]);
return;
}
}
alert("No Credentials Form found");
};
fillFirstForm()
""".splitlines()) % (json.dumps(username), json.dumps(password))
def main():
if 'QUTE_FIFO' not in os.environ:
print(f"No QUTE_FIFO found - {sys.argv[0]} must be run as a qutebrowser userscript")
sys.exit(-1)
try:
args = parse_args()
assert args.url, "Missing URL"
kp = connect_to_keepassxc(args)
if not kp:
error('Could not connect to KeepassXC')
return
creds = kp.get_logins(args.url)
if not creds:
error('No credentials found')
return
# TODO: handle multiple matches
name, pw = creds[0]['login'], creds[0]['password']
if name and pw:
qute('jseval -q ' + make_js_code(name, pw))
except Exception as e:
error(str(e))
if __name__ == '__main__':
main()