home-manager: inline qutebrowser userscripts
This commit is contained in:
parent
1e3230c268
commit
630a54f5f7
@ -392,7 +392,7 @@ c.fonts.web.size.minimum = 15
|
|||||||
# Bindings for normal mode
|
# Bindings for normal mode
|
||||||
config.bind(';#', 'hint code userscript code_select.py')
|
config.bind(';#', 'hint code userscript code_select.py')
|
||||||
config.bind(';D', 'spawn --userscript dark_mode.user ;; greasemonkey-reload')
|
config.bind(';D', 'spawn --userscript dark_mode.user ;; greasemonkey-reload')
|
||||||
config.bind(';G', 'hint links userscript qute-gemini-tab')
|
config.bind(';G', 'hint links userscript qute-gemini')
|
||||||
config.bind(';b', 'hint links userscript getbib')
|
config.bind(';b', 'hint links userscript getbib')
|
||||||
config.bind(';g', 'hint links userscript qute-gemini')
|
config.bind(';g', 'hint links userscript qute-gemini')
|
||||||
config.bind(';m', 'hint links spawn nohup mpv --gpu-context=wayland --hwdec=auto {hint-url}')
|
config.bind(';m', 'hint links spawn nohup mpv --gpu-context=wayland --hwdec=auto {hint-url}')
|
||||||
|
@ -1,52 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
|
|
||||||
import os
|
|
||||||
import html
|
|
||||||
import re
|
|
||||||
import sys
|
|
||||||
import xml.etree.ElementTree as ET
|
|
||||||
try:
|
|
||||||
import pyperclip
|
|
||||||
except ImportError:
|
|
||||||
PYPERCLIP = False
|
|
||||||
else:
|
|
||||||
PYPERCLIP = True
|
|
||||||
|
|
||||||
|
|
||||||
def parse_text_content(element):
|
|
||||||
root = ET.fromstring(element)
|
|
||||||
text = ET.tostring(root, encoding="unicode", method="text")
|
|
||||||
text = html.unescape(text)
|
|
||||||
return text
|
|
||||||
|
|
||||||
|
|
||||||
def send_command_to_qute(command):
|
|
||||||
with open(os.environ.get("QUTE_FIFO"), "w") as f:
|
|
||||||
f.write(command)
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
delimiter = sys.argv[1] if len(sys.argv) > 1 else ";"
|
|
||||||
# For info on qute environment vairables, see
|
|
||||||
# https://github.com/qutebrowser/qutebrowser/blob/master/doc/userscripts.asciidoc
|
|
||||||
element = os.environ.get("QUTE_SELECTED_HTML")
|
|
||||||
code_text = parse_text_content(element)
|
|
||||||
if PYPERCLIP:
|
|
||||||
pyperclip.copy(code_text)
|
|
||||||
send_command_to_qute(
|
|
||||||
"message-info 'copied to clipboard: {info}{suffix}'".format(
|
|
||||||
info=code_text.splitlines()[0],
|
|
||||||
suffix="..." if len(code_text.splitlines()) > 1 else ""
|
|
||||||
)
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
# Qute's yank command won't copy accross multiple lines so we
|
|
||||||
# compromise by placing lines on a single line seperated by the
|
|
||||||
# specified delimiter
|
|
||||||
code_text = re.sub("(\n)+", delimiter, code_text)
|
|
||||||
code_text = code_text.replace("'", "\"")
|
|
||||||
send_command_to_qute("yank inline '{code}'\n".format(code=code_text))
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
@ -1,65 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
"""Qutebrowser userscript scraping the current web page for DOIs and downloading
|
|
||||||
corresponding bibtex information.
|
|
||||||
Set the environment variable 'QUTE_BIB_FILEPATH' to indicate the path to
|
|
||||||
download to. Otherwise, bibtex information is downloaded to '/tmp' and hence
|
|
||||||
deleted at reboot.
|
|
||||||
Installation: see qute://help/userscripts.html
|
|
||||||
Inspired by
|
|
||||||
https://ocefpaf.github.io/python4oceanographers/blog/2014/05/19/doi2bibtex/
|
|
||||||
"""
|
|
||||||
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
import re
|
|
||||||
from collections import Counter
|
|
||||||
from urllib import parse as url_parse
|
|
||||||
from urllib import request as url_request
|
|
||||||
|
|
||||||
|
|
||||||
FIFO_PATH = os.getenv("QUTE_FIFO")
|
|
||||||
|
|
||||||
def message_fifo(message, level="warning"):
|
|
||||||
"""Send message to qutebrowser FIFO. The level must be one of 'info',
|
|
||||||
'warning' (default) or 'error'."""
|
|
||||||
with open(FIFO_PATH, "w") as fifo:
|
|
||||||
fifo.write("message-{} '{}'".format(level, message))
|
|
||||||
|
|
||||||
|
|
||||||
source = os.getenv("QUTE_TEXT")
|
|
||||||
with open(source) as f:
|
|
||||||
text = f.read()
|
|
||||||
|
|
||||||
# find DOIs on page using regex
|
|
||||||
dval = re.compile(r'(10\.(\d)+/([^(\s\>\"\<)])+)')
|
|
||||||
# https://stackoverflow.com/a/10324802/3865876, too strict
|
|
||||||
# dval = re.compile(r'\b(10[.][0-9]{4,}(?:[.][0-9]+)*/(?:(?!["&\'<>])\S)+)\b')
|
|
||||||
dois = dval.findall(text)
|
|
||||||
dois = Counter(e[0] for e in dois)
|
|
||||||
try:
|
|
||||||
doi = dois.most_common(1)[0][0]
|
|
||||||
except IndexError:
|
|
||||||
message_fifo("No DOIs found on page")
|
|
||||||
sys.exit()
|
|
||||||
message_fifo("Found {} DOIs on page, selecting {}".format(len(dois), doi),
|
|
||||||
level="info")
|
|
||||||
|
|
||||||
# get bibtex data corresponding to DOI
|
|
||||||
url = "https://dx.doi.org/" + url_parse.quote(doi)
|
|
||||||
headers = dict(Accept='text/bibliography; style=bibtex')
|
|
||||||
request = url_request.Request(url, headers=headers)
|
|
||||||
response = url_request.urlopen(request)
|
|
||||||
status_code = response.getcode()
|
|
||||||
if status_code >= 400:
|
|
||||||
message_fifo("Request returned {}".format(status_code))
|
|
||||||
sys.exit()
|
|
||||||
|
|
||||||
# obtain content and format it
|
|
||||||
bibtex = response.read().decode("utf-8").strip()
|
|
||||||
bibtex = bibtex.replace(" ", "\n ", 1).\
|
|
||||||
replace("}, ", "},\n ").replace("}}", "}\n}")
|
|
||||||
|
|
||||||
# append to file
|
|
||||||
bib_filepath = os.getenv("QUTE_BIB_FILEPATH", "/tmp/qute.bib")
|
|
||||||
with open(bib_filepath, "a") as f:
|
|
||||||
f.write(bibtex + "\n\n")
|
|
@ -1,340 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
# qute-gemini - Open Gemini links in qutebrowser and render them as HTML
|
|
||||||
#
|
|
||||||
# SPDX-FileCopyrightText: 2019-2020 solderpunk
|
|
||||||
# SPDX-FileCopyrightText: 2020 Aaron Janse
|
|
||||||
# SPDX-FileCopyrightText: 2020 petedussin
|
|
||||||
# SPDX-FileCopyrightText: 2020-2021 Sotiris Papatheodorou
|
|
||||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
||||||
|
|
||||||
import cgi
|
|
||||||
import html
|
|
||||||
import os
|
|
||||||
import socket
|
|
||||||
import ssl
|
|
||||||
import sys
|
|
||||||
import tempfile
|
|
||||||
import urllib.parse
|
|
||||||
|
|
||||||
from typing import Tuple
|
|
||||||
|
|
||||||
|
|
||||||
_version = "1.0.0"
|
|
||||||
|
|
||||||
_max_redirects = 5
|
|
||||||
|
|
||||||
_error_page_template = '''<?xml version="1.0" encoding="UTF-8"?>
|
|
||||||
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en">
|
|
||||||
<head>
|
|
||||||
<title>Error opening page: URL</title>
|
|
||||||
<style>
|
|
||||||
CSS
|
|
||||||
</style>
|
|
||||||
</head>
|
|
||||||
<body>
|
|
||||||
<h1>qute-gemini error</h1>
|
|
||||||
<p>Error while opening:<br/><a href="URL">URL_TEXT</a></p>
|
|
||||||
<p>DESCRIPTION</p>
|
|
||||||
</body>
|
|
||||||
</html>
|
|
||||||
'''
|
|
||||||
|
|
||||||
_status_code_desc = {
|
|
||||||
"1": "Gemini status code 1 Input. This is not implemented in qute-gemini.",
|
|
||||||
"10": "Gemini status code 10 Input. This is not implemented in qute-gemini.",
|
|
||||||
"11": "Gemini status code 11 Sensitive Input. This is not implemented in qute-gemini.",
|
|
||||||
"3": "Gemini status code 3 Redirect. Stopped after " + str(_max_redirects) + " redirects.",
|
|
||||||
"30": "Gemini status code 30 Temporary Redirect. Stopped after " + str(_max_redirects) + " redirects.",
|
|
||||||
"31": "Gemini status code 31 Permanent Redirect. Stopped after " + str(_max_redirects) + " redirects.",
|
|
||||||
"4": "Gemini status code 4 Temporary Failure. Server message: META",
|
|
||||||
"40": "Gemini status code 40 Temporary Failure. Server message: META",
|
|
||||||
"41": "Gemini status code 41 Server Unavailable. The server is unavailable due to overload or maintenance. Server message: META",
|
|
||||||
"42": "Gemini status code 42 CGI Error. A CGI process, or similar system for generating dynamic content, died unexpectedly or timed out. Server message: META",
|
|
||||||
"43": "Gemini status code 43 Proxy Error. A proxy request failed because the server was unable to successfully complete a transaction with the remote host. Server message: META",
|
|
||||||
"44": "Gemini status code 44 Slow Down. Rate limiting is in effect. Please wait META seconds before making another request to this server.",
|
|
||||||
"5": "Gemini status code 5 Permanent Failure. Server message: META",
|
|
||||||
"50": "Gemini status code 50 Permanent Failure. Server message: META",
|
|
||||||
"51": "Gemini status code 51 Not Found. he requested resource could not be found but may be available in the future. Server message: META",
|
|
||||||
"52": "Gemini status code 52 Gone. The resource requested is no longer available and will not be available again. Server message: META",
|
|
||||||
"53": "Gemini status code 53 Proxy Request Refused. The request was for a resource at a domain not served by the server and the server does not accept proxy requests. Server message: META",
|
|
||||||
"59": "Gemini status code 59 Bad Request. The server was unable to parse the client's request, presumably due to a malformed request. Server message: META",
|
|
||||||
"6": "Gemini status code 6 Client Certificate Required. This is not implemented in qute-gemini.",
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def qute_url() -> str:
|
|
||||||
"""Get the URL passed to the script by qutebrowser."""
|
|
||||||
return os.environ["QUTE_URL"]
|
|
||||||
|
|
||||||
|
|
||||||
def qute_fifo() -> str:
|
|
||||||
"""Get the FIFO or file to write qutebrowser commands to."""
|
|
||||||
return os.environ["QUTE_FIFO"]
|
|
||||||
|
|
||||||
|
|
||||||
def html_href(url: str, description: str) -> str:
|
|
||||||
return "".join(['<a href="', url, '">', description, "</a>"])
|
|
||||||
|
|
||||||
|
|
||||||
def qute_gemini_css_path() -> str:
|
|
||||||
"""Return the path where the custom CSS file is expected to be."""
|
|
||||||
try:
|
|
||||||
base_dir = os.environ["XDG_DATA_HOME"]
|
|
||||||
except KeyError:
|
|
||||||
base_dir = os.path.join(os.environ["HOME"], ".local/share")
|
|
||||||
return os.path.join(base_dir, "qutebrowser/userscripts/qute-gemini.css")
|
|
||||||
|
|
||||||
|
|
||||||
def gemini_absolutise_url(base_url: str, relative_url: str) -> str:
|
|
||||||
"""Absolutise relative gemini URLs.
|
|
||||||
|
|
||||||
Adapted from gcat: https://github.com/aaronjanse/gcat
|
|
||||||
"""
|
|
||||||
if "://" not in relative_url:
|
|
||||||
# Python's URL tools somehow only work with known schemes?
|
|
||||||
base_url = base_url.replace("gemini://", "http://")
|
|
||||||
relative_url = urllib.parse.urljoin(base_url, relative_url)
|
|
||||||
relative_url = relative_url.replace("http://", "gemini://")
|
|
||||||
return relative_url
|
|
||||||
|
|
||||||
|
|
||||||
def gemini_fetch_url(url: str) -> Tuple[str, str, str, str, str]:
|
|
||||||
"""Fetch a Gemini URL and return the content as a string.
|
|
||||||
|
|
||||||
url: URL with gemini:// or no scheme.
|
|
||||||
Returns 4 strings: the content, the URL the content was fetched from, the
|
|
||||||
Gemini status code, the value of the meta field and an error message.
|
|
||||||
|
|
||||||
Adapted from gcat: https://github.com/aaronjanse/gcat
|
|
||||||
"""
|
|
||||||
# Parse the URL to get the hostname and port
|
|
||||||
parsed_url = urllib.parse.urlparse(url)
|
|
||||||
if not parsed_url.scheme:
|
|
||||||
url = "gemini://" + url
|
|
||||||
parsed_url = urllib.parse.urlparse(url)
|
|
||||||
if parsed_url.scheme != "gemini":
|
|
||||||
return "", "Received non-gemini:// URL: " + url
|
|
||||||
if parsed_url.port is not None:
|
|
||||||
useport = parsed_url.port
|
|
||||||
else:
|
|
||||||
useport = 1965
|
|
||||||
# Do the Gemini transaction, looping for redirects
|
|
||||||
redirects = 0
|
|
||||||
while True:
|
|
||||||
# Send the request
|
|
||||||
s = socket.create_connection((parsed_url.hostname, useport))
|
|
||||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
|
||||||
context.check_hostname = False
|
|
||||||
context.verify_mode = ssl.CERT_NONE
|
|
||||||
s = context.wrap_socket(s, server_hostname = parsed_url.netloc)
|
|
||||||
s.sendall((url + "\r\n").encode("UTF-8"))
|
|
||||||
# Get the status code and meta
|
|
||||||
fp = s.makefile("rb")
|
|
||||||
header = fp.readline().decode("UTF-8").strip()
|
|
||||||
status, meta = header.split()[:2]
|
|
||||||
# Follow up to 5 redirects
|
|
||||||
if status.startswith("3"):
|
|
||||||
url = gemini_absolutise_url(url, meta)
|
|
||||||
parsed_url = urllib.parse.urlparse(url)
|
|
||||||
redirects += 1
|
|
||||||
if redirects > _max_redirects:
|
|
||||||
# Too many redirects
|
|
||||||
break
|
|
||||||
# Otherwise we're done
|
|
||||||
else:
|
|
||||||
break
|
|
||||||
# Process the response
|
|
||||||
content = ""
|
|
||||||
error_msg = ""
|
|
||||||
# 2x Success
|
|
||||||
if status.startswith("2"):
|
|
||||||
media_type, media_type_opts = cgi.parse_header(meta)
|
|
||||||
# Decode according to declared charset defaulting to UTF-8
|
|
||||||
if meta.startswith("text/gemini"):
|
|
||||||
charset = media_type_opts.get("charset", "UTF-8")
|
|
||||||
content = fp.read().decode(charset)
|
|
||||||
else:
|
|
||||||
error_msg = "Expected media type text/gemini but received " \
|
|
||||||
+ media_type
|
|
||||||
# Handle errors
|
|
||||||
else:
|
|
||||||
# Try matching a 2-digit and then a 1-digit status code
|
|
||||||
try:
|
|
||||||
error_msg = _status_code_desc[status[0:2]]
|
|
||||||
except KeyError:
|
|
||||||
try:
|
|
||||||
error_msg = _status_code_desc[status[0]]
|
|
||||||
except KeyError:
|
|
||||||
error_msg = "The server sent back something weird."
|
|
||||||
# Substitute the contents of meta into the error message if needed
|
|
||||||
error_msg = error_msg.replace("META", meta)
|
|
||||||
return content, url, status, meta, error_msg
|
|
||||||
|
|
||||||
|
|
||||||
def gemtext_to_html(gemtext: str, url: str, original_url: str,
|
|
||||||
status: str, meta: str) -> str:
|
|
||||||
"""Convert gemtext to HTML.
|
|
||||||
|
|
||||||
title: Used as the document title.
|
|
||||||
url: The URL the gemtext was received from. Used to resolve
|
|
||||||
relative URLs in the gemtext content.
|
|
||||||
original_url: The URL the original request was made at.
|
|
||||||
status: The Gemini status code returned by the server.
|
|
||||||
meta: The meta returned by the server.
|
|
||||||
Returns the HTML representation as a string.
|
|
||||||
"""
|
|
||||||
# Accumulate converted gemtext lines
|
|
||||||
lines = ['<?xml version="1.0" encoding="UTF-8"?>',
|
|
||||||
'<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en">',
|
|
||||||
"\t<head>",
|
|
||||||
"\t\t<title>" + html.escape(url) + "</title>",
|
|
||||||
"\t\t<style>",
|
|
||||||
get_css(),
|
|
||||||
"\t\t</style>",
|
|
||||||
"\t</head>",
|
|
||||||
"\t<body>",
|
|
||||||
"\t<article>"]
|
|
||||||
in_pre = False
|
|
||||||
in_list = False
|
|
||||||
# Add an extra newline to ensure list tags are closed properly
|
|
||||||
for line in (gemtext + "\n").splitlines():
|
|
||||||
# Add the list closing tag
|
|
||||||
if not line.startswith("*") and in_list:
|
|
||||||
lines.append("\t\t</ul>")
|
|
||||||
in_list = False
|
|
||||||
# Blank line, ignore
|
|
||||||
if not line:
|
|
||||||
pass
|
|
||||||
# Link
|
|
||||||
elif line.startswith("=>"):
|
|
||||||
l = line[2:].split(None, 1)
|
|
||||||
# Use the URL itself as the description if there is none
|
|
||||||
if len(l) == 1:
|
|
||||||
l.append(l[0])
|
|
||||||
# Encode the link description
|
|
||||||
l[1] = html.escape(l[1])
|
|
||||||
# Resolve relative URLs
|
|
||||||
l[0] = gemini_absolutise_url(url, l[0])
|
|
||||||
lines.append("\t\t<p>" + html_href(l[0], l[1]) + "</p>")
|
|
||||||
# Preformated toggle
|
|
||||||
elif line.startswith("```"):
|
|
||||||
if in_pre:
|
|
||||||
lines.append("\t\t</pre>")
|
|
||||||
else:
|
|
||||||
lines.append("\t\t<pre>")
|
|
||||||
in_pre = not in_pre
|
|
||||||
# Preformated
|
|
||||||
elif in_pre:
|
|
||||||
lines.append(line)
|
|
||||||
# Header
|
|
||||||
elif line.startswith("###"):
|
|
||||||
lines.append("\t\t<h3>" + html.escape(line[3:].strip()) + "</h3>")
|
|
||||||
elif line.startswith("##"):
|
|
||||||
lines.append("\t\t<h2>" + html.escape(line[2:].strip()) + "</h2>")
|
|
||||||
elif line.startswith("#"):
|
|
||||||
lines.append("\t\t<h1>" + html.escape(line[1:].strip()) + "</h1>")
|
|
||||||
# List
|
|
||||||
elif line.startswith("*"):
|
|
||||||
if not in_list:
|
|
||||||
lines.append("\t\t<ul>")
|
|
||||||
in_list = True
|
|
||||||
lines.append("\t\t\t<li>" + html.escape(line[1:].strip()) + "</li>")
|
|
||||||
# Quote
|
|
||||||
elif line.startswith(">"):
|
|
||||||
lines.extend(["\t\t<blockquote>",
|
|
||||||
"\t\t\t<p>" + line[1:].strip() + "</p>",
|
|
||||||
"\t\t</blockquote>"])
|
|
||||||
# Normal text
|
|
||||||
else:
|
|
||||||
lines.append("\t\t<p>" + html.escape(line.strip()) + "</p>")
|
|
||||||
url_html = html_href(url, html.escape(url))
|
|
||||||
original_url_html = html_href(original_url, html.escape(original_url))
|
|
||||||
lines.extend(["",
|
|
||||||
"\t</article>",
|
|
||||||
"\t<details>",
|
|
||||||
"\t\t<summary>",
|
|
||||||
"\t\t\tContent from " + url_html,
|
|
||||||
"\t\t</summary>",
|
|
||||||
"\t\t<dl>",
|
|
||||||
"\t\t\t<dt>Original URL</dt>",
|
|
||||||
"\t\t\t<dd>" + original_url_html + "</dd>",
|
|
||||||
"\t\t\t<dt>Status</dt>",
|
|
||||||
"\t\t\t<dd>" + status + "</dd>",
|
|
||||||
"\t\t\t<dt>Meta</dt>",
|
|
||||||
"\t\t\t<dd>" + meta + "</dd>",
|
|
||||||
"\t\t\t<dt>Fetched by</dt>",
|
|
||||||
'\t\t\t<dd><a href="https://git.sr.ht/~sotirisp/qute-gemini">qute-gemini ' + str(_version) + "</a></dd>",
|
|
||||||
"\t\t</dl>",
|
|
||||||
"\t</details>",
|
|
||||||
"\t</body>",
|
|
||||||
"</html>"])
|
|
||||||
return "\n".join(lines)
|
|
||||||
|
|
||||||
|
|
||||||
def get_css() -> str:
|
|
||||||
# Search for qute-gemini.css in the directory this script is located in
|
|
||||||
css_file = qute_gemini_css_path()
|
|
||||||
if os.path.isfile(css_file):
|
|
||||||
# Return the file contents
|
|
||||||
with open(css_file, "r") as f:
|
|
||||||
return f.read().strip()
|
|
||||||
else:
|
|
||||||
# Use no CSS
|
|
||||||
return ""
|
|
||||||
|
|
||||||
|
|
||||||
def qute_error_page(url: str, description: str) -> str:
|
|
||||||
"""Return a data URI error page like qutebrowser does.
|
|
||||||
|
|
||||||
url: The URL of the page that failed to load.
|
|
||||||
description: A description of the error.
|
|
||||||
Returns a data URI containing the error page.
|
|
||||||
"""
|
|
||||||
# Generate the HTML error page
|
|
||||||
html_page = _error_page_template.replace("URL", url)
|
|
||||||
html_page = html_page.replace("URL_TEXT", html.escape(url))
|
|
||||||
html_page = html_page.replace("DESCRIPTION", html.escape(description))
|
|
||||||
html_page = html_page.replace("CSS", get_css())
|
|
||||||
# URL encode and return as a data URI
|
|
||||||
return "data:text/html;charset=UTF-8," + urllib.parse.quote(html_page)
|
|
||||||
|
|
||||||
|
|
||||||
def open_gemini(url: str, open_args: str) -> None:
|
|
||||||
"""Open Gemini URL in qutebrowser."""
|
|
||||||
# Get the Gemini content
|
|
||||||
content, content_url, status, meta, error_msg = gemini_fetch_url(url)
|
|
||||||
if error_msg:
|
|
||||||
# Generate an error page in a data URI
|
|
||||||
open_url = qute_error_page(url, error_msg)
|
|
||||||
else:
|
|
||||||
# Success, convert to HTML in a temporary file
|
|
||||||
tmpf = tempfile.NamedTemporaryFile("w", suffix=".html", delete=False)
|
|
||||||
tmp_filename = tmpf.name
|
|
||||||
tmpf.close()
|
|
||||||
with open(tmp_filename, "w") as f:
|
|
||||||
f.write(gemtext_to_html(content, content_url, url, status, meta))
|
|
||||||
open_url = " file://" + tmp_filename
|
|
||||||
# Open the HTML file in qutebrowser
|
|
||||||
with open(qute_fifo(), "w") as qfifo:
|
|
||||||
qfifo.write("open " + open_args + open_url)
|
|
||||||
|
|
||||||
|
|
||||||
def open_other(url: str, open_args: str) -> None:
|
|
||||||
"""Open non-Gemini URL in qutebrowser."""
|
|
||||||
with open(qute_fifo(), "w") as qfifo:
|
|
||||||
qfifo.write("open " + open_args + " " + url)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
# Open in the current or a new tab depending on the script name
|
|
||||||
if sys.argv[0].endswith("-tab"):
|
|
||||||
open_args = "-t"
|
|
||||||
else:
|
|
||||||
open_args = ""
|
|
||||||
# Select how to open the URL depending on its scheme
|
|
||||||
url = qute_url()
|
|
||||||
parsed_url = urllib.parse.urlparse(url)
|
|
||||||
if parsed_url.scheme == "gemini":
|
|
||||||
open_gemini(url, open_args)
|
|
||||||
else:
|
|
||||||
open_other(url, open_args)
|
|
@ -1 +0,0 @@
|
|||||||
qute-gemini
|
|
@ -1,361 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
|
|
||||||
# Copyright (c) 2018-2021 Markus Blöchl <ususdei@gmail.com>
|
|
||||||
#
|
|
||||||
# This file is part of qutebrowser.
|
|
||||||
#
|
|
||||||
# qutebrowser is free software: you can redistribute it and/or modify
|
|
||||||
# it under the terms of the GNU General Public License as published by
|
|
||||||
# the Free Software Foundation, either version 3 of the License, or
|
|
||||||
# (at your option) any later version.
|
|
||||||
#
|
|
||||||
# qutebrowser is distributed in the hope that it will be useful,
|
|
||||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
# GNU General Public License for more details.
|
|
||||||
#
|
|
||||||
# You should have received a copy of the GNU General Public License
|
|
||||||
# along with qutebrowser. If not, see <https://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
"""
|
|
||||||
# Introduction
|
|
||||||
|
|
||||||
This is a [qutebrowser][2] [userscript][5] to fill website credentials from a [KeepassXC][1] password database.
|
|
||||||
|
|
||||||
|
|
||||||
# Installation
|
|
||||||
|
|
||||||
First, you need to enable [KeepassXC-Browser][6] extensions in your KeepassXC config.
|
|
||||||
|
|
||||||
|
|
||||||
Second, you must make sure to have a working private-public-key-pair in your [GPG keyring][3].
|
|
||||||
|
|
||||||
|
|
||||||
Third, install the python module `pynacl`.
|
|
||||||
|
|
||||||
|
|
||||||
Finally, adapt your qutebrowser config.
|
|
||||||
You can e.g. add the following lines to your `~/.config/qutebrowser/config.py`
|
|
||||||
Remember to replace `ABC1234` with your actual GPG key.
|
|
||||||
|
|
||||||
```python
|
|
||||||
config.bind('<Alt-Shift-u>', 'spawn --userscript qute-keepassxc --key ABC1234', mode='insert')
|
|
||||||
config.bind('pw', 'spawn --userscript qute-keepassxc --key ABC1234', mode='normal')
|
|
||||||
```
|
|
||||||
|
|
||||||
|
|
||||||
# Usage
|
|
||||||
|
|
||||||
If you are on a webpage with a login form, simply activate one of the configured key-bindings.
|
|
||||||
|
|
||||||
The first time you run this script, KeepassXC will ask you for authentication like with any other browser extension.
|
|
||||||
Just provide a name of your choice and accept the request if nothing looks fishy.
|
|
||||||
|
|
||||||
|
|
||||||
# How it works
|
|
||||||
|
|
||||||
This script will talk to KeepassXC using the native [KeepassXC-Browser protocol][4].
|
|
||||||
|
|
||||||
|
|
||||||
This script needs to store the key used to associate with your KeepassXC instance somewhere.
|
|
||||||
Unlike most browser extensions which only use plain local storage, this one attempts to do so in a safe way
|
|
||||||
by storing the key in encrypted form using GPG.
|
|
||||||
Therefore you need to have a public-key-pair readily set up.
|
|
||||||
|
|
||||||
GPG might then ask for your private-key passwort whenever you query the database for login credentials.
|
|
||||||
|
|
||||||
|
|
||||||
[1]: https://keepassxc.org/
|
|
||||||
[2]: https://qutebrowser.org/
|
|
||||||
[3]: https://gnupg.org/
|
|
||||||
[4]: https://github.com/keepassxreboot/keepassxc-browser/blob/develop/keepassxc-protocol.md
|
|
||||||
[5]: https://github.com/qutebrowser/qutebrowser/blob/master/doc/userscripts.asciidoc
|
|
||||||
[6]: https://keepassxc.org/docs/KeePassXC_GettingStarted.html#_setup_browser_integration
|
|
||||||
"""
|
|
||||||
|
|
||||||
import sys
|
|
||||||
import os
|
|
||||||
import socket
|
|
||||||
import json
|
|
||||||
import base64
|
|
||||||
import subprocess
|
|
||||||
import argparse
|
|
||||||
|
|
||||||
import nacl.utils
|
|
||||||
import nacl.public
|
|
||||||
|
|
||||||
|
|
||||||
def parse_args():
|
|
||||||
parser = argparse.ArgumentParser(description="Full passwords from KeepassXC")
|
|
||||||
parser.add_argument('url', nargs='?', default=os.environ.get('QUTE_URL'))
|
|
||||||
parser.add_argument('--socket', '-s', default='/run/user/{}/org.keepassxc.KeePassXC.BrowserServer'.format(os.getuid()),
|
|
||||||
help='Path to KeepassXC browser socket')
|
|
||||||
parser.add_argument('--key', '-k', default='alice@example.com',
|
|
||||||
help='GPG key to encrypt KeepassXC auth key with')
|
|
||||||
parser.add_argument('--insecure', action='store_true',
|
|
||||||
help="Do not encrypt auth key")
|
|
||||||
return parser.parse_args()
|
|
||||||
|
|
||||||
|
|
||||||
class KeepassError(Exception):
|
|
||||||
def __init__(self, code, desc):
|
|
||||||
self.code = code
|
|
||||||
self.description = desc
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return f"KeepassXC Error [{self.code}]: {self.description}"
|
|
||||||
|
|
||||||
|
|
||||||
class KeepassXC:
|
|
||||||
""" Wrapper around the KeepassXC socket API """
|
|
||||||
def __init__(self, id=None, *, key, socket_path):
|
|
||||||
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
||||||
self.id = id
|
|
||||||
self.socket_path = socket_path
|
|
||||||
self.client_key = nacl.public.PrivateKey.generate()
|
|
||||||
self.id_key = nacl.public.PrivateKey.from_seed(key)
|
|
||||||
self.cryptobox = None
|
|
||||||
|
|
||||||
def connect(self):
|
|
||||||
if not os.path.exists(self.socket_path):
|
|
||||||
raise KeepassError(-1, "KeepassXC Browser socket does not exists")
|
|
||||||
self.client_id = base64.b64encode(nacl.utils.random(nacl.public.Box.NONCE_SIZE)).decode('utf-8')
|
|
||||||
self.sock.connect(self.socket_path)
|
|
||||||
|
|
||||||
self.send_raw_msg(dict(
|
|
||||||
action = 'change-public-keys',
|
|
||||||
publicKey = base64.b64encode(self.client_key.public_key.encode()).decode('utf-8'),
|
|
||||||
nonce = base64.b64encode(nacl.utils.random(nacl.public.Box.NONCE_SIZE)).decode('utf-8'),
|
|
||||||
clientID = self.client_id
|
|
||||||
))
|
|
||||||
|
|
||||||
resp = self.recv_raw_msg()
|
|
||||||
assert resp['action'] == 'change-public-keys'
|
|
||||||
assert resp['success'] == 'true'
|
|
||||||
assert resp['nonce']
|
|
||||||
self.cryptobox = nacl.public.Box(
|
|
||||||
self.client_key,
|
|
||||||
nacl.public.PublicKey(base64.b64decode(resp['publicKey']))
|
|
||||||
)
|
|
||||||
|
|
||||||
def get_databasehash(self):
|
|
||||||
self.send_msg(dict(action='get-databasehash'))
|
|
||||||
return self.recv_msg()['hash']
|
|
||||||
|
|
||||||
def lock_database(self):
|
|
||||||
self.send_msg(dict(action='lock-database'))
|
|
||||||
try:
|
|
||||||
self.recv_msg()
|
|
||||||
except KeepassError as e:
|
|
||||||
if e.code == 1:
|
|
||||||
return True
|
|
||||||
raise
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def test_associate(self):
|
|
||||||
if not self.id:
|
|
||||||
return False
|
|
||||||
self.send_msg(dict(
|
|
||||||
action = 'test-associate',
|
|
||||||
id = self.id,
|
|
||||||
key = base64.b64encode(self.id_key.public_key.encode()).decode('utf-8')
|
|
||||||
))
|
|
||||||
return self.recv_msg()['success'] == 'true'
|
|
||||||
|
|
||||||
def associate(self):
|
|
||||||
self.send_msg(dict(
|
|
||||||
action = 'associate',
|
|
||||||
key = base64.b64encode(self.client_key.public_key.encode()).decode('utf-8'),
|
|
||||||
idKey = base64.b64encode(self.id_key.public_key.encode()).decode('utf-8')
|
|
||||||
))
|
|
||||||
resp = self.recv_msg()
|
|
||||||
self.id = resp['id']
|
|
||||||
|
|
||||||
def get_logins(self, url):
|
|
||||||
self.send_msg(dict(
|
|
||||||
action = 'get-logins',
|
|
||||||
url = url,
|
|
||||||
keys = [{ 'id': self.id, 'key': base64.b64encode(self.id_key.public_key.encode()).decode('utf-8') }]
|
|
||||||
))
|
|
||||||
return self.recv_msg()['entries']
|
|
||||||
|
|
||||||
def send_raw_msg(self, msg):
|
|
||||||
self.sock.send( json.dumps(msg).encode('utf-8') )
|
|
||||||
|
|
||||||
def recv_raw_msg(self):
|
|
||||||
return json.loads( self.sock.recv(4096).decode('utf-8') )
|
|
||||||
|
|
||||||
def send_msg(self, msg, **extra):
|
|
||||||
nonce = nacl.utils.random(nacl.public.Box.NONCE_SIZE)
|
|
||||||
self.send_raw_msg(dict(
|
|
||||||
action = msg['action'],
|
|
||||||
message = base64.b64encode(self.cryptobox.encrypt(json.dumps(msg).encode('utf-8'), nonce).ciphertext).decode('utf-8'),
|
|
||||||
nonce = base64.b64encode(nonce).decode('utf-8'),
|
|
||||||
clientID = self.client_id,
|
|
||||||
**extra
|
|
||||||
))
|
|
||||||
|
|
||||||
def recv_msg(self):
|
|
||||||
resp = self.recv_raw_msg()
|
|
||||||
if 'error' in resp:
|
|
||||||
raise KeepassError(resp['errorCode'], resp['error'])
|
|
||||||
assert resp['action']
|
|
||||||
return json.loads(self.cryptobox.decrypt(base64.b64decode(resp['message']), base64.b64decode(resp['nonce'])).decode('utf-8'))
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class SecretKeyStore:
|
|
||||||
def __init__(self, gpgkey):
|
|
||||||
self.gpgkey = gpgkey
|
|
||||||
if gpgkey is None:
|
|
||||||
self.path = os.path.join(os.environ['QUTE_DATA_DIR'], 'keepassxc.key')
|
|
||||||
else:
|
|
||||||
self.path = os.path.join(os.environ['QUTE_DATA_DIR'], 'keepassxc.key.gpg')
|
|
||||||
|
|
||||||
def load(self):
|
|
||||||
"Load existing association key from file"
|
|
||||||
if self.gpgkey is None:
|
|
||||||
jsondata = open(self.path, 'r').read()
|
|
||||||
else:
|
|
||||||
jsondata = subprocess.check_output(['gpg', '--decrypt', self.path]).decode('utf-8')
|
|
||||||
data = json.loads(jsondata)
|
|
||||||
self.id = data['id']
|
|
||||||
self.key = base64.b64decode(data['key'])
|
|
||||||
|
|
||||||
def create(self):
|
|
||||||
"Create new association key"
|
|
||||||
self.key = nacl.utils.random(32)
|
|
||||||
self.id = None
|
|
||||||
|
|
||||||
def store(self, id):
|
|
||||||
"Store newly created association key in file"
|
|
||||||
self.id = id
|
|
||||||
jsondata = json.dumps({'id':self.id, 'key':base64.b64encode(self.key).decode('utf-8')})
|
|
||||||
if self.gpgkey is None:
|
|
||||||
open(self.path, "w").write(jsondata)
|
|
||||||
else:
|
|
||||||
subprocess.run(['gpg', '--encrypt', '-o', self.path, '-r', self.gpgkey], input=jsondata.encode('utf-8'), check=True)
|
|
||||||
|
|
||||||
|
|
||||||
def qute(cmd):
|
|
||||||
with open(os.environ['QUTE_FIFO'], 'w') as fifo:
|
|
||||||
fifo.write(cmd)
|
|
||||||
fifo.write('\n')
|
|
||||||
fifo.flush()
|
|
||||||
|
|
||||||
def error(msg):
|
|
||||||
print(msg, file=sys.stderr)
|
|
||||||
qute('message-error "{}"'.format(msg))
|
|
||||||
|
|
||||||
|
|
||||||
def connect_to_keepassxc(args):
|
|
||||||
assert args.key or args.insecure, "Missing GPG key to use for auth key encryption"
|
|
||||||
keystore = SecretKeyStore(args.key)
|
|
||||||
if os.path.isfile(keystore.path):
|
|
||||||
keystore.load()
|
|
||||||
kp = KeepassXC(keystore.id, key=keystore.key, socket_path=args.socket)
|
|
||||||
kp.connect()
|
|
||||||
if not kp.test_associate():
|
|
||||||
error('No KeepassXC association')
|
|
||||||
return None
|
|
||||||
else:
|
|
||||||
keystore.create()
|
|
||||||
kp = KeepassXC(key=keystore.key, socket_path=args.socket)
|
|
||||||
kp.connect()
|
|
||||||
kp.associate()
|
|
||||||
if not kp.test_associate():
|
|
||||||
error('No KeepassXC association')
|
|
||||||
return None
|
|
||||||
keystore.store(kp.id)
|
|
||||||
return kp
|
|
||||||
|
|
||||||
|
|
||||||
def make_js_code(username, password):
|
|
||||||
return ' '.join("""
|
|
||||||
function isVisible(elem) {
|
|
||||||
var style = elem.ownerDocument.defaultView.getComputedStyle(elem, null);
|
|
||||||
|
|
||||||
if (style.getPropertyValue("visibility") !== "visible" ||
|
|
||||||
style.getPropertyValue("display") === "none" ||
|
|
||||||
style.getPropertyValue("opacity") === "0") {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return elem.offsetWidth > 0 && elem.offsetHeight > 0;
|
|
||||||
};
|
|
||||||
|
|
||||||
function hasPasswordField(form) {
|
|
||||||
var inputs = form.getElementsByTagName("input");
|
|
||||||
for (var j = 0; j < inputs.length; j++) {
|
|
||||||
var input = inputs[j];
|
|
||||||
if (input.type === "password") {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
|
|
||||||
function loadData2Form (form) {
|
|
||||||
var inputs = form.getElementsByTagName("input");
|
|
||||||
for (var j = 0; j < inputs.length; j++) {
|
|
||||||
var input = inputs[j];
|
|
||||||
if (isVisible(input) && (input.type === "text" || input.type === "email")) {
|
|
||||||
input.focus();
|
|
||||||
input.value = %s;
|
|
||||||
input.dispatchEvent(new Event('input', { 'bubbles': true }));
|
|
||||||
input.dispatchEvent(new Event('change', { 'bubbles': true }));
|
|
||||||
input.blur();
|
|
||||||
}
|
|
||||||
if (input.type === "password") {
|
|
||||||
input.focus();
|
|
||||||
input.value = %s;
|
|
||||||
input.dispatchEvent(new Event('input', { 'bubbles': true }));
|
|
||||||
input.dispatchEvent(new Event('change', { 'bubbles': true }));
|
|
||||||
input.blur();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
function fillFirstForm() {
|
|
||||||
var forms = document.getElementsByTagName("form");
|
|
||||||
for (i = 0; i < forms.length; i++) {
|
|
||||||
if (hasPasswordField(forms[i])) {
|
|
||||||
loadData2Form(forms[i]);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
alert("No Credentials Form found");
|
|
||||||
};
|
|
||||||
|
|
||||||
fillFirstForm()
|
|
||||||
""".splitlines()) % (json.dumps(username), json.dumps(password))
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
if 'QUTE_FIFO' not in os.environ:
|
|
||||||
print(f"No QUTE_FIFO found - {sys.argv[0]} must be run as a qutebrowser userscript")
|
|
||||||
sys.exit(-1)
|
|
||||||
|
|
||||||
try:
|
|
||||||
args = parse_args()
|
|
||||||
assert args.url, "Missing URL"
|
|
||||||
kp = connect_to_keepassxc(args)
|
|
||||||
if not kp:
|
|
||||||
error('Could not connect to KeepassXC')
|
|
||||||
return
|
|
||||||
creds = kp.get_logins(args.url)
|
|
||||||
if not creds:
|
|
||||||
error('No credentials found')
|
|
||||||
return
|
|
||||||
# TODO: handle multiple matches
|
|
||||||
name, pw = creds[0]['login'], creds[0]['password']
|
|
||||||
if name and pw:
|
|
||||||
qute('jseval -q ' + make_js_code(name, pw))
|
|
||||||
except Exception as e:
|
|
||||||
error(str(e))
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
main()
|
|
||||||
|
|
859
home-surtur.nix
859
home-surtur.nix
@ -738,6 +738,8 @@ in {
|
|||||||
'';
|
'';
|
||||||
executable = true;
|
executable = true;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
# qutebrowser userscripts start.
|
||||||
".config/qutebrowser/userscripts/localhost" = {
|
".config/qutebrowser/userscripts/localhost" = {
|
||||||
executable = true;
|
executable = true;
|
||||||
text = ''
|
text = ''
|
||||||
@ -790,6 +792,846 @@ in {
|
|||||||
fi
|
fi
|
||||||
'';
|
'';
|
||||||
};
|
};
|
||||||
|
".config/qutebrowser/userscripts/code_select.py" = {
|
||||||
|
executable = true;
|
||||||
|
# source = .local/share/qutebrowser/userscripts/code_select.py;
|
||||||
|
text = ''
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
import os
|
||||||
|
import html
|
||||||
|
import re
|
||||||
|
import sys
|
||||||
|
import xml.etree.ElementTree as ET
|
||||||
|
try:
|
||||||
|
import pyperclip
|
||||||
|
except ImportError:
|
||||||
|
PYPERCLIP = False
|
||||||
|
else:
|
||||||
|
PYPERCLIP = True
|
||||||
|
|
||||||
|
|
||||||
|
def parse_text_content(element):
|
||||||
|
root = ET.fromstring(element)
|
||||||
|
text = ET.tostring(root, encoding="unicode", method="text")
|
||||||
|
text = html.unescape(text)
|
||||||
|
return text
|
||||||
|
|
||||||
|
|
||||||
|
def send_command_to_qute(command):
|
||||||
|
with open(os.environ.get("QUTE_FIFO"), "w") as f:
|
||||||
|
f.write(command)
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
delimiter = sys.argv[1] if len(sys.argv) > 1 else ";"
|
||||||
|
# For info on qute environment vairables, see
|
||||||
|
# https://github.com/qutebrowser/qutebrowser/blob/master/doc/userscripts.asciidoc
|
||||||
|
element = os.environ.get("QUTE_SELECTED_HTML")
|
||||||
|
code_text = parse_text_content(element)
|
||||||
|
if PYPERCLIP:
|
||||||
|
pyperclip.copy(code_text)
|
||||||
|
send_command_to_qute(
|
||||||
|
"message-info 'copied to clipboard: {info}{suffix}'".format(
|
||||||
|
info=code_text.splitlines()[0],
|
||||||
|
suffix="..." if len(code_text.splitlines()) > 1 else ""
|
||||||
|
)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# Qute's yank command won't copy accross multiple lines so we
|
||||||
|
# compromise by placing lines on a single line seperated by the
|
||||||
|
# specified delimiter
|
||||||
|
code_text = re.sub("(\n)+", delimiter, code_text)
|
||||||
|
code_text = code_text.replace("'", "\"")
|
||||||
|
send_command_to_qute("yank inline '{code}'\n".format(code=code_text))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
'';
|
||||||
|
};
|
||||||
|
".config/qutebrowser/userscripts/getbib" = {
|
||||||
|
executable = true;
|
||||||
|
text = ''
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Qutebrowser userscript scraping the current web page for DOIs and downloading
|
||||||
|
corresponding bibtex information.
|
||||||
|
Set the environment variable 'QUTE_BIB_FILEPATH' to indicate the path to
|
||||||
|
download to. Otherwise, bibtex information is downloaded to '/tmp' and hence
|
||||||
|
deleted at reboot.
|
||||||
|
Installation: see qute://help/userscripts.html
|
||||||
|
Inspired by
|
||||||
|
https://ocefpaf.github.io/python4oceanographers/blog/2014/05/19/doi2bibtex/
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import re
|
||||||
|
from collections import Counter
|
||||||
|
from urllib import parse as url_parse
|
||||||
|
from urllib import request as url_request
|
||||||
|
|
||||||
|
|
||||||
|
FIFO_PATH = os.getenv("QUTE_FIFO")
|
||||||
|
|
||||||
|
def message_fifo(message, level="warning"):
|
||||||
|
"""Send message to qutebrowser FIFO. The level must be one of 'info',
|
||||||
|
'warning' (default) or 'error'."""
|
||||||
|
with open(FIFO_PATH, "w") as fifo:
|
||||||
|
fifo.write("message-{} '{}'".format(level, message))
|
||||||
|
|
||||||
|
|
||||||
|
source = os.getenv("QUTE_TEXT")
|
||||||
|
with open(source) as f:
|
||||||
|
text = f.read()
|
||||||
|
|
||||||
|
# find DOIs on page using regex
|
||||||
|
dval = re.compile(r'(10\.(\d)+/([^(\s\>\"\<)])+)')
|
||||||
|
# https://stackoverflow.com/a/10324802/3865876, too strict
|
||||||
|
# dval = re.compile(r'\b(10[.][0-9]{4,}(?:[.][0-9]+)*/(?:(?!["&\'<>])\S)+)\b')
|
||||||
|
dois = dval.findall(text)
|
||||||
|
dois = Counter(e[0] for e in dois)
|
||||||
|
try:
|
||||||
|
doi = dois.most_common(1)[0][0]
|
||||||
|
except IndexError:
|
||||||
|
message_fifo("No DOIs found on page")
|
||||||
|
sys.exit()
|
||||||
|
message_fifo("Found {} DOIs on page, selecting {}".format(len(dois), doi),
|
||||||
|
level="info")
|
||||||
|
|
||||||
|
# get bibtex data corresponding to DOI
|
||||||
|
url = "https://dx.doi.org/" + url_parse.quote(doi)
|
||||||
|
headers = dict(Accept='text/bibliography; style=bibtex')
|
||||||
|
request = url_request.Request(url, headers=headers)
|
||||||
|
response = url_request.urlopen(request)
|
||||||
|
status_code = response.getcode()
|
||||||
|
if status_code >= 400:
|
||||||
|
message_fifo("Request returned {}".format(status_code))
|
||||||
|
sys.exit()
|
||||||
|
|
||||||
|
# obtain content and format it
|
||||||
|
bibtex = response.read().decode("utf-8").strip()
|
||||||
|
bibtex = bibtex.replace(" ", "\n ", 1).\
|
||||||
|
replace("}, ", "},\n ").replace("}}", "}\n}")
|
||||||
|
|
||||||
|
# append to file
|
||||||
|
bib_filepath = os.getenv("QUTE_BIB_FILEPATH", "/tmp/qute.bib")
|
||||||
|
with open(bib_filepath, "a") as f:
|
||||||
|
f.write(bibtex + "\n\n")
|
||||||
|
'';
|
||||||
|
};
|
||||||
|
".config/qutebrowser/userscripts/qute-gemini" = {
|
||||||
|
executable = true;
|
||||||
|
text = ''
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
# qute-gemini - Open Gemini links in qutebrowser and render them as HTML
|
||||||
|
#
|
||||||
|
# SPDX-FileCopyrightText: 2019-2020 solderpunk
|
||||||
|
# SPDX-FileCopyrightText: 2020 Aaron Janse
|
||||||
|
# SPDX-FileCopyrightText: 2020 petedussin
|
||||||
|
# SPDX-FileCopyrightText: 2020-2021 Sotiris Papatheodorou
|
||||||
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||||
|
|
||||||
|
import cgi
|
||||||
|
import html
|
||||||
|
import os
|
||||||
|
import socket
|
||||||
|
import ssl
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
import urllib.parse
|
||||||
|
|
||||||
|
from typing import Tuple
|
||||||
|
|
||||||
|
|
||||||
|
_version = "1.0.0"
|
||||||
|
|
||||||
|
_max_redirects = 5
|
||||||
|
|
||||||
|
_error_page_template = ''''<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en">
|
||||||
|
<head>
|
||||||
|
<title>Error opening page: URL</title>
|
||||||
|
<style>
|
||||||
|
CSS
|
||||||
|
</style>
|
||||||
|
</head>
|
||||||
|
<body>
|
||||||
|
<h1>qute-gemini error</h1>
|
||||||
|
<p>Error while opening:<br/><a href="URL">URL_TEXT</a></p>
|
||||||
|
<p>DESCRIPTION</p>
|
||||||
|
</body>
|
||||||
|
</html>
|
||||||
|
''''
|
||||||
|
|
||||||
|
_status_code_desc = {
|
||||||
|
"1": "Gemini status code 1 Input. This is not implemented in qute-gemini.",
|
||||||
|
"10": "Gemini status code 10 Input. This is not implemented in qute-gemini.",
|
||||||
|
"11": "Gemini status code 11 Sensitive Input. This is not implemented in qute-gemini.",
|
||||||
|
"3": "Gemini status code 3 Redirect. Stopped after " + str(_max_redirects) + " redirects.",
|
||||||
|
"30": "Gemini status code 30 Temporary Redirect. Stopped after " + str(_max_redirects) + " redirects.",
|
||||||
|
"31": "Gemini status code 31 Permanent Redirect. Stopped after " + str(_max_redirects) + " redirects.",
|
||||||
|
"4": "Gemini status code 4 Temporary Failure. Server message: META",
|
||||||
|
"40": "Gemini status code 40 Temporary Failure. Server message: META",
|
||||||
|
"41": "Gemini status code 41 Server Unavailable. The server is unavailable due to overload or maintenance. Server message: META",
|
||||||
|
"42": "Gemini status code 42 CGI Error. A CGI process, or similar system for generating dynamic content, died unexpectedly or timed out. Server message: META",
|
||||||
|
"43": "Gemini status code 43 Proxy Error. A proxy request failed because the server was unable to successfully complete a transaction with the remote host. Server message: META",
|
||||||
|
"44": "Gemini status code 44 Slow Down. Rate limiting is in effect. Please wait META seconds before making another request to this server.",
|
||||||
|
"5": "Gemini status code 5 Permanent Failure. Server message: META",
|
||||||
|
"50": "Gemini status code 50 Permanent Failure. Server message: META",
|
||||||
|
"51": "Gemini status code 51 Not Found. he requested resource could not be found but may be available in the future. Server message: META",
|
||||||
|
"52": "Gemini status code 52 Gone. The resource requested is no longer available and will not be available again. Server message: META",
|
||||||
|
"53": "Gemini status code 53 Proxy Request Refused. The request was for a resource at a domain not served by the server and the server does not accept proxy requests. Server message: META",
|
||||||
|
"59": "Gemini status code 59 Bad Request. The server was unable to parse the client's request, presumably due to a malformed request. Server message: META",
|
||||||
|
"6": "Gemini status code 6 Client Certificate Required. This is not implemented in qute-gemini.",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def qute_url() -> str:
|
||||||
|
"""Get the URL passed to the script by qutebrowser."""
|
||||||
|
return os.environ["QUTE_URL"]
|
||||||
|
|
||||||
|
|
||||||
|
def qute_fifo() -> str:
|
||||||
|
"""Get the FIFO or file to write qutebrowser commands to."""
|
||||||
|
return os.environ["QUTE_FIFO"]
|
||||||
|
|
||||||
|
|
||||||
|
def html_href(url: str, description: str) -> str:
|
||||||
|
return "".join(['<a href="', url, '">', description, "</a>"])
|
||||||
|
|
||||||
|
|
||||||
|
def qute_gemini_css_path() -> str:
|
||||||
|
"""Return the path where the custom CSS file is expected to be."""
|
||||||
|
try:
|
||||||
|
base_dir = os.environ["XDG_DATA_HOME"]
|
||||||
|
except KeyError:
|
||||||
|
base_dir = os.path.join(os.environ["HOME"], ".local/share")
|
||||||
|
return os.path.join(base_dir, "qutebrowser/userscripts/qute-gemini.css")
|
||||||
|
|
||||||
|
|
||||||
|
def gemini_absolutise_url(base_url: str, relative_url: str) -> str:
|
||||||
|
"""Absolutise relative gemini URLs.
|
||||||
|
|
||||||
|
Adapted from gcat: https://github.com/aaronjanse/gcat
|
||||||
|
"""
|
||||||
|
if "://" not in relative_url:
|
||||||
|
# Python's URL tools somehow only work with known schemes?
|
||||||
|
base_url = base_url.replace("gemini://", "http://")
|
||||||
|
relative_url = urllib.parse.urljoin(base_url, relative_url)
|
||||||
|
relative_url = relative_url.replace("http://", "gemini://")
|
||||||
|
return relative_url
|
||||||
|
|
||||||
|
|
||||||
|
def gemini_fetch_url(url: str) -> Tuple[str, str, str, str, str]:
|
||||||
|
"""Fetch a Gemini URL and return the content as a string.
|
||||||
|
|
||||||
|
url: URL with gemini:// or no scheme.
|
||||||
|
Returns 4 strings: the content, the URL the content was fetched from, the
|
||||||
|
Gemini status code, the value of the meta field and an error message.
|
||||||
|
|
||||||
|
Adapted from gcat: https://github.com/aaronjanse/gcat
|
||||||
|
"""
|
||||||
|
# Parse the URL to get the hostname and port
|
||||||
|
parsed_url = urllib.parse.urlparse(url)
|
||||||
|
if not parsed_url.scheme:
|
||||||
|
url = "gemini://" + url
|
||||||
|
parsed_url = urllib.parse.urlparse(url)
|
||||||
|
if parsed_url.scheme != "gemini":
|
||||||
|
return "", "Received non-gemini:// URL: " + url
|
||||||
|
if parsed_url.port is not None:
|
||||||
|
useport = parsed_url.port
|
||||||
|
else:
|
||||||
|
useport = 1965
|
||||||
|
# Do the Gemini transaction, looping for redirects
|
||||||
|
redirects = 0
|
||||||
|
while True:
|
||||||
|
# Send the request
|
||||||
|
s = socket.create_connection((parsed_url.hostname, useport))
|
||||||
|
context = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
||||||
|
context.check_hostname = False
|
||||||
|
context.verify_mode = ssl.CERT_NONE
|
||||||
|
s = context.wrap_socket(s, server_hostname = parsed_url.netloc)
|
||||||
|
s.sendall((url + "\r\n").encode("UTF-8"))
|
||||||
|
# Get the status code and meta
|
||||||
|
fp = s.makefile("rb")
|
||||||
|
header = fp.readline().decode("UTF-8").strip()
|
||||||
|
status, meta = header.split()[:2]
|
||||||
|
# Follow up to 5 redirects
|
||||||
|
if status.startswith("3"):
|
||||||
|
url = gemini_absolutise_url(url, meta)
|
||||||
|
parsed_url = urllib.parse.urlparse(url)
|
||||||
|
redirects += 1
|
||||||
|
if redirects > _max_redirects:
|
||||||
|
# Too many redirects
|
||||||
|
break
|
||||||
|
# Otherwise we're done
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
# Process the response
|
||||||
|
content = ""
|
||||||
|
error_msg = ""
|
||||||
|
# 2x Success
|
||||||
|
if status.startswith("2"):
|
||||||
|
media_type, media_type_opts = cgi.parse_header(meta)
|
||||||
|
# Decode according to declared charset defaulting to UTF-8
|
||||||
|
if meta.startswith("text/gemini"):
|
||||||
|
charset = media_type_opts.get("charset", "UTF-8")
|
||||||
|
content = fp.read().decode(charset)
|
||||||
|
else:
|
||||||
|
error_msg = "Expected media type text/gemini but received " \
|
||||||
|
+ media_type
|
||||||
|
# Handle errors
|
||||||
|
else:
|
||||||
|
# Try matching a 2-digit and then a 1-digit status code
|
||||||
|
try:
|
||||||
|
error_msg = _status_code_desc[status[0:2]]
|
||||||
|
except KeyError:
|
||||||
|
try:
|
||||||
|
error_msg = _status_code_desc[status[0]]
|
||||||
|
except KeyError:
|
||||||
|
error_msg = "The server sent back something weird."
|
||||||
|
# Substitute the contents of meta into the error message if needed
|
||||||
|
error_msg = error_msg.replace("META", meta)
|
||||||
|
return content, url, status, meta, error_msg
|
||||||
|
|
||||||
|
|
||||||
|
def gemtext_to_html(gemtext: str, url: str, original_url: str,
|
||||||
|
status: str, meta: str) -> str:
|
||||||
|
"""Convert gemtext to HTML.
|
||||||
|
|
||||||
|
title: Used as the document title.
|
||||||
|
url: The URL the gemtext was received from. Used to resolve
|
||||||
|
relative URLs in the gemtext content.
|
||||||
|
original_url: The URL the original request was made at.
|
||||||
|
status: The Gemini status code returned by the server.
|
||||||
|
meta: The meta returned by the server.
|
||||||
|
Returns the HTML representation as a string.
|
||||||
|
"""
|
||||||
|
# Accumulate converted gemtext lines
|
||||||
|
lines = ['<?xml version="1.0" encoding="UTF-8"?>',
|
||||||
|
'<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en">',
|
||||||
|
"\t<head>",
|
||||||
|
"\t\t<title>" + html.escape(url) + "</title>",
|
||||||
|
"\t\t<style>",
|
||||||
|
get_css(),
|
||||||
|
"\t\t</style>",
|
||||||
|
"\t</head>",
|
||||||
|
"\t<body>",
|
||||||
|
"\t<article>"]
|
||||||
|
in_pre = False
|
||||||
|
in_list = False
|
||||||
|
# Add an extra newline to ensure list tags are closed properly
|
||||||
|
for line in (gemtext + "\n").splitlines():
|
||||||
|
# Add the list closing tag
|
||||||
|
if not line.startswith("*") and in_list:
|
||||||
|
lines.append("\t\t</ul>")
|
||||||
|
in_list = False
|
||||||
|
# Blank line, ignore
|
||||||
|
if not line:
|
||||||
|
pass
|
||||||
|
# Link
|
||||||
|
elif line.startswith("=>"):
|
||||||
|
l = line[2:].split(None, 1)
|
||||||
|
# Use the URL itself as the description if there is none
|
||||||
|
if len(l) == 1:
|
||||||
|
l.append(l[0])
|
||||||
|
# Encode the link description
|
||||||
|
l[1] = html.escape(l[1])
|
||||||
|
# Resolve relative URLs
|
||||||
|
l[0] = gemini_absolutise_url(url, l[0])
|
||||||
|
lines.append("\t\t<p>" + html_href(l[0], l[1]) + "</p>")
|
||||||
|
# Preformated toggle
|
||||||
|
elif line.startswith("```"):
|
||||||
|
if in_pre:
|
||||||
|
lines.append("\t\t</pre>")
|
||||||
|
else:
|
||||||
|
lines.append("\t\t<pre>")
|
||||||
|
in_pre = not in_pre
|
||||||
|
# Preformated
|
||||||
|
elif in_pre:
|
||||||
|
lines.append(line)
|
||||||
|
# Header
|
||||||
|
elif line.startswith("###"):
|
||||||
|
lines.append("\t\t<h3>" + html.escape(line[3:].strip()) + "</h3>")
|
||||||
|
elif line.startswith("##"):
|
||||||
|
lines.append("\t\t<h2>" + html.escape(line[2:].strip()) + "</h2>")
|
||||||
|
elif line.startswith("#"):
|
||||||
|
lines.append("\t\t<h1>" + html.escape(line[1:].strip()) + "</h1>")
|
||||||
|
# List
|
||||||
|
elif line.startswith("*"):
|
||||||
|
if not in_list:
|
||||||
|
lines.append("\t\t<ul>")
|
||||||
|
in_list = True
|
||||||
|
lines.append("\t\t\t<li>" + html.escape(line[1:].strip()) + "</li>")
|
||||||
|
# Quote
|
||||||
|
elif line.startswith(">"):
|
||||||
|
lines.extend(["\t\t<blockquote>",
|
||||||
|
"\t\t\t<p>" + line[1:].strip() + "</p>",
|
||||||
|
"\t\t</blockquote>"])
|
||||||
|
# Normal text
|
||||||
|
else:
|
||||||
|
lines.append("\t\t<p>" + html.escape(line.strip()) + "</p>")
|
||||||
|
url_html = html_href(url, html.escape(url))
|
||||||
|
original_url_html = html_href(original_url, html.escape(original_url))
|
||||||
|
lines.extend(["",
|
||||||
|
"\t</article>",
|
||||||
|
"\t<details>",
|
||||||
|
"\t\t<summary>",
|
||||||
|
"\t\t\tContent from " + url_html,
|
||||||
|
"\t\t</summary>",
|
||||||
|
"\t\t<dl>",
|
||||||
|
"\t\t\t<dt>Original URL</dt>",
|
||||||
|
"\t\t\t<dd>" + original_url_html + "</dd>",
|
||||||
|
"\t\t\t<dt>Status</dt>",
|
||||||
|
"\t\t\t<dd>" + status + "</dd>",
|
||||||
|
"\t\t\t<dt>Meta</dt>",
|
||||||
|
"\t\t\t<dd>" + meta + "</dd>",
|
||||||
|
"\t\t\t<dt>Fetched by</dt>",
|
||||||
|
'\t\t\t<dd><a href="https://git.sr.ht/~sotirisp/qute-gemini">qute-gemini ' + str(_version) + "</a></dd>",
|
||||||
|
"\t\t</dl>",
|
||||||
|
"\t</details>",
|
||||||
|
"\t</body>",
|
||||||
|
"</html>"])
|
||||||
|
return "\n".join(lines)
|
||||||
|
|
||||||
|
|
||||||
|
def get_css() -> str:
|
||||||
|
# Search for qute-gemini.css in the directory this script is located in
|
||||||
|
css_file = qute_gemini_css_path()
|
||||||
|
if os.path.isfile(css_file):
|
||||||
|
# Return the file contents
|
||||||
|
with open(css_file, "r") as f:
|
||||||
|
return f.read().strip()
|
||||||
|
else:
|
||||||
|
# Use no CSS
|
||||||
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
def qute_error_page(url: str, description: str) -> str:
|
||||||
|
"""Return a data URI error page like qutebrowser does.
|
||||||
|
|
||||||
|
url: The URL of the page that failed to load.
|
||||||
|
description: A description of the error.
|
||||||
|
Returns a data URI containing the error page.
|
||||||
|
"""
|
||||||
|
# Generate the HTML error page
|
||||||
|
html_page = _error_page_template.replace("URL", url)
|
||||||
|
html_page = html_page.replace("URL_TEXT", html.escape(url))
|
||||||
|
html_page = html_page.replace("DESCRIPTION", html.escape(description))
|
||||||
|
html_page = html_page.replace("CSS", get_css())
|
||||||
|
# URL encode and return as a data URI
|
||||||
|
return "data:text/html;charset=UTF-8," + urllib.parse.quote(html_page)
|
||||||
|
|
||||||
|
|
||||||
|
def open_gemini(url: str, open_args: str) -> None:
|
||||||
|
"""Open Gemini URL in qutebrowser."""
|
||||||
|
# Get the Gemini content
|
||||||
|
content, content_url, status, meta, error_msg = gemini_fetch_url(url)
|
||||||
|
if error_msg:
|
||||||
|
# Generate an error page in a data URI
|
||||||
|
open_url = qute_error_page(url, error_msg)
|
||||||
|
else:
|
||||||
|
# Success, convert to HTML in a temporary file
|
||||||
|
tmpf = tempfile.NamedTemporaryFile("w", suffix=".html", delete=False)
|
||||||
|
tmp_filename = tmpf.name
|
||||||
|
tmpf.close()
|
||||||
|
with open(tmp_filename, "w") as f:
|
||||||
|
f.write(gemtext_to_html(content, content_url, url, status, meta))
|
||||||
|
open_url = " file://" + tmp_filename
|
||||||
|
# Open the HTML file in qutebrowser
|
||||||
|
with open(qute_fifo(), "w") as qfifo:
|
||||||
|
qfifo.write("open " + open_args + open_url)
|
||||||
|
|
||||||
|
|
||||||
|
def open_other(url: str, open_args: str) -> None:
|
||||||
|
"""Open non-Gemini URL in qutebrowser."""
|
||||||
|
with open(qute_fifo(), "w") as qfifo:
|
||||||
|
qfifo.write("open " + open_args + " " + url)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
# Open in the current or a new tab depending on the script name
|
||||||
|
if sys.argv[0].endswith("-tab"):
|
||||||
|
open_args = "-t"
|
||||||
|
else:
|
||||||
|
open_args = ""
|
||||||
|
# Select how to open the URL depending on its scheme
|
||||||
|
url = qute_url()
|
||||||
|
parsed_url = urllib.parse.urlparse(url)
|
||||||
|
if parsed_url.scheme == "gemini":
|
||||||
|
open_gemini(url, open_args)
|
||||||
|
else:
|
||||||
|
open_other(url, open_args)
|
||||||
|
'';
|
||||||
|
};
|
||||||
|
".config/qutebrowser/userscripts/qute-keepassxc" = {
|
||||||
|
executable = true;
|
||||||
|
text = ''
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
# Copyright (c) 2018-2021 Markus Blöchl <ususdei@gmail.com>
|
||||||
|
#
|
||||||
|
# This file is part of qutebrowser.
|
||||||
|
#
|
||||||
|
# qutebrowser is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# qutebrowser is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with qutebrowser. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
"""
|
||||||
|
# Introduction
|
||||||
|
|
||||||
|
This is a [qutebrowser][2] [userscript][5] to fill website credentials from a [KeepassXC][1] password database.
|
||||||
|
|
||||||
|
|
||||||
|
# Installation
|
||||||
|
|
||||||
|
First, you need to enable [KeepassXC-Browser][6] extensions in your KeepassXC config.
|
||||||
|
|
||||||
|
|
||||||
|
Second, you must make sure to have a working private-public-key-pair in your [GPG keyring][3].
|
||||||
|
|
||||||
|
|
||||||
|
Third, install the python module `pynacl`.
|
||||||
|
|
||||||
|
|
||||||
|
Finally, adapt your qutebrowser config.
|
||||||
|
You can e.g. add the following lines to your `~/.config/qutebrowser/config.py`
|
||||||
|
Remember to replace `ABC1234` with your actual GPG key.
|
||||||
|
|
||||||
|
```python
|
||||||
|
config.bind('<Alt-Shift-u>', 'spawn --userscript qute-keepassxc --key ABC1234', mode='insert')
|
||||||
|
config.bind('pw', 'spawn --userscript qute-keepassxc --key ABC1234', mode='normal')
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
|
# Usage
|
||||||
|
|
||||||
|
If you are on a webpage with a login form, simply activate one of the configured key-bindings.
|
||||||
|
|
||||||
|
The first time you run this script, KeepassXC will ask you for authentication like with any other browser extension.
|
||||||
|
Just provide a name of your choice and accept the request if nothing looks fishy.
|
||||||
|
|
||||||
|
|
||||||
|
# How it works
|
||||||
|
|
||||||
|
This script will talk to KeepassXC using the native [KeepassXC-Browser protocol][4].
|
||||||
|
|
||||||
|
|
||||||
|
This script needs to store the key used to associate with your KeepassXC instance somewhere.
|
||||||
|
Unlike most browser extensions which only use plain local storage, this one attempts to do so in a safe way
|
||||||
|
by storing the key in encrypted form using GPG.
|
||||||
|
Therefore you need to have a public-key-pair readily set up.
|
||||||
|
|
||||||
|
GPG might then ask for your private-key passwort whenever you query the database for login credentials.
|
||||||
|
|
||||||
|
|
||||||
|
[1]: https://keepassxc.org/
|
||||||
|
[2]: https://qutebrowser.org/
|
||||||
|
[3]: https://gnupg.org/
|
||||||
|
[4]: https://github.com/keepassxreboot/keepassxc-browser/blob/develop/keepassxc-protocol.md
|
||||||
|
[5]: https://github.com/qutebrowser/qutebrowser/blob/master/doc/userscripts.asciidoc
|
||||||
|
[6]: https://keepassxc.org/docs/KeePassXC_GettingStarted.html#_setup_browser_integration
|
||||||
|
"""
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import socket
|
||||||
|
import json
|
||||||
|
import base64
|
||||||
|
import subprocess
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
import nacl.utils
|
||||||
|
import nacl.public
|
||||||
|
|
||||||
|
|
||||||
|
def parse_args():
|
||||||
|
parser = argparse.ArgumentParser(description="Full passwords from KeepassXC")
|
||||||
|
parser.add_argument('url', nargs='?', default=os.environ.get('QUTE_URL'))
|
||||||
|
parser.add_argument('--socket', '-s', default='/run/user/{}/org.keepassxc.KeePassXC.BrowserServer'.format(os.getuid()),
|
||||||
|
help='Path to KeepassXC browser socket')
|
||||||
|
parser.add_argument('--key', '-k', default='alice@example.com',
|
||||||
|
help='GPG key to encrypt KeepassXC auth key with')
|
||||||
|
parser.add_argument('--insecure', action='store_true',
|
||||||
|
help="Do not encrypt auth key")
|
||||||
|
return parser.parse_args()
|
||||||
|
|
||||||
|
|
||||||
|
class KeepassError(Exception):
|
||||||
|
def __init__(self, code, desc):
|
||||||
|
self.code = code
|
||||||
|
self.description = desc
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return f"KeepassXC Error [{self.code}]: {self.description}"
|
||||||
|
|
||||||
|
|
||||||
|
class KeepassXC:
|
||||||
|
""" Wrapper around the KeepassXC socket API """
|
||||||
|
def __init__(self, id=None, *, key, socket_path):
|
||||||
|
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
|
self.id = id
|
||||||
|
self.socket_path = socket_path
|
||||||
|
self.client_key = nacl.public.PrivateKey.generate()
|
||||||
|
self.id_key = nacl.public.PrivateKey.from_seed(key)
|
||||||
|
self.cryptobox = None
|
||||||
|
|
||||||
|
def connect(self):
|
||||||
|
if not os.path.exists(self.socket_path):
|
||||||
|
raise KeepassError(-1, "KeepassXC Browser socket does not exists")
|
||||||
|
self.client_id = base64.b64encode(nacl.utils.random(nacl.public.Box.NONCE_SIZE)).decode('utf-8')
|
||||||
|
self.sock.connect(self.socket_path)
|
||||||
|
|
||||||
|
self.send_raw_msg(dict(
|
||||||
|
action = 'change-public-keys',
|
||||||
|
publicKey = base64.b64encode(self.client_key.public_key.encode()).decode('utf-8'),
|
||||||
|
nonce = base64.b64encode(nacl.utils.random(nacl.public.Box.NONCE_SIZE)).decode('utf-8'),
|
||||||
|
clientID = self.client_id
|
||||||
|
))
|
||||||
|
|
||||||
|
resp = self.recv_raw_msg()
|
||||||
|
assert resp['action'] == 'change-public-keys'
|
||||||
|
assert resp['success'] == 'true'
|
||||||
|
assert resp['nonce']
|
||||||
|
self.cryptobox = nacl.public.Box(
|
||||||
|
self.client_key,
|
||||||
|
nacl.public.PublicKey(base64.b64decode(resp['publicKey']))
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_databasehash(self):
|
||||||
|
self.send_msg(dict(action='get-databasehash'))
|
||||||
|
return self.recv_msg()['hash']
|
||||||
|
|
||||||
|
def lock_database(self):
|
||||||
|
self.send_msg(dict(action='lock-database'))
|
||||||
|
try:
|
||||||
|
self.recv_msg()
|
||||||
|
except KeepassError as e:
|
||||||
|
if e.code == 1:
|
||||||
|
return True
|
||||||
|
raise
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def test_associate(self):
|
||||||
|
if not self.id:
|
||||||
|
return False
|
||||||
|
self.send_msg(dict(
|
||||||
|
action = 'test-associate',
|
||||||
|
id = self.id,
|
||||||
|
key = base64.b64encode(self.id_key.public_key.encode()).decode('utf-8')
|
||||||
|
))
|
||||||
|
return self.recv_msg()['success'] == 'true'
|
||||||
|
|
||||||
|
def associate(self):
|
||||||
|
self.send_msg(dict(
|
||||||
|
action = 'associate',
|
||||||
|
key = base64.b64encode(self.client_key.public_key.encode()).decode('utf-8'),
|
||||||
|
idKey = base64.b64encode(self.id_key.public_key.encode()).decode('utf-8')
|
||||||
|
))
|
||||||
|
resp = self.recv_msg()
|
||||||
|
self.id = resp['id']
|
||||||
|
|
||||||
|
def get_logins(self, url):
|
||||||
|
self.send_msg(dict(
|
||||||
|
action = 'get-logins',
|
||||||
|
url = url,
|
||||||
|
keys = [{ 'id': self.id, 'key': base64.b64encode(self.id_key.public_key.encode()).decode('utf-8') }]
|
||||||
|
))
|
||||||
|
return self.recv_msg()['entries']
|
||||||
|
|
||||||
|
def send_raw_msg(self, msg):
|
||||||
|
self.sock.send( json.dumps(msg).encode('utf-8') )
|
||||||
|
|
||||||
|
def recv_raw_msg(self):
|
||||||
|
return json.loads( self.sock.recv(4096).decode('utf-8') )
|
||||||
|
|
||||||
|
def send_msg(self, msg, **extra):
|
||||||
|
nonce = nacl.utils.random(nacl.public.Box.NONCE_SIZE)
|
||||||
|
self.send_raw_msg(dict(
|
||||||
|
action = msg['action'],
|
||||||
|
message = base64.b64encode(self.cryptobox.encrypt(json.dumps(msg).encode('utf-8'), nonce).ciphertext).decode('utf-8'),
|
||||||
|
nonce = base64.b64encode(nonce).decode('utf-8'),
|
||||||
|
clientID = self.client_id,
|
||||||
|
**extra
|
||||||
|
))
|
||||||
|
|
||||||
|
def recv_msg(self):
|
||||||
|
resp = self.recv_raw_msg()
|
||||||
|
if 'error' in resp:
|
||||||
|
raise KeepassError(resp['errorCode'], resp['error'])
|
||||||
|
assert resp['action']
|
||||||
|
return json.loads(self.cryptobox.decrypt(base64.b64decode(resp['message']), base64.b64decode(resp['nonce'])).decode('utf-8'))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class SecretKeyStore:
|
||||||
|
def __init__(self, gpgkey):
|
||||||
|
self.gpgkey = gpgkey
|
||||||
|
if gpgkey is None:
|
||||||
|
self.path = os.path.join(os.environ['QUTE_DATA_DIR'], 'keepassxc.key')
|
||||||
|
else:
|
||||||
|
self.path = os.path.join(os.environ['QUTE_DATA_DIR'], 'keepassxc.key.gpg')
|
||||||
|
|
||||||
|
def load(self):
|
||||||
|
"Load existing association key from file"
|
||||||
|
if self.gpgkey is None:
|
||||||
|
jsondata = open(self.path, 'r').read()
|
||||||
|
else:
|
||||||
|
jsondata = subprocess.check_output(['gpg', '--decrypt', self.path]).decode('utf-8')
|
||||||
|
data = json.loads(jsondata)
|
||||||
|
self.id = data['id']
|
||||||
|
self.key = base64.b64decode(data['key'])
|
||||||
|
|
||||||
|
def create(self):
|
||||||
|
"Create new association key"
|
||||||
|
self.key = nacl.utils.random(32)
|
||||||
|
self.id = None
|
||||||
|
|
||||||
|
def store(self, id):
|
||||||
|
"Store newly created association key in file"
|
||||||
|
self.id = id
|
||||||
|
jsondata = json.dumps({'id':self.id, 'key':base64.b64encode(self.key).decode('utf-8')})
|
||||||
|
if self.gpgkey is None:
|
||||||
|
open(self.path, "w").write(jsondata)
|
||||||
|
else:
|
||||||
|
subprocess.run(['gpg', '--encrypt', '-o', self.path, '-r', self.gpgkey], input=jsondata.encode('utf-8'), check=True)
|
||||||
|
|
||||||
|
|
||||||
|
def qute(cmd):
|
||||||
|
with open(os.environ['QUTE_FIFO'], 'w') as fifo:
|
||||||
|
fifo.write(cmd)
|
||||||
|
fifo.write('\n')
|
||||||
|
fifo.flush()
|
||||||
|
|
||||||
|
def error(msg):
|
||||||
|
print(msg, file=sys.stderr)
|
||||||
|
qute('message-error "{}"'.format(msg))
|
||||||
|
|
||||||
|
|
||||||
|
def connect_to_keepassxc(args):
|
||||||
|
assert args.key or args.insecure, "Missing GPG key to use for auth key encryption"
|
||||||
|
keystore = SecretKeyStore(args.key)
|
||||||
|
if os.path.isfile(keystore.path):
|
||||||
|
keystore.load()
|
||||||
|
kp = KeepassXC(keystore.id, key=keystore.key, socket_path=args.socket)
|
||||||
|
kp.connect()
|
||||||
|
if not kp.test_associate():
|
||||||
|
error('No KeepassXC association')
|
||||||
|
return None
|
||||||
|
else:
|
||||||
|
keystore.create()
|
||||||
|
kp = KeepassXC(key=keystore.key, socket_path=args.socket)
|
||||||
|
kp.connect()
|
||||||
|
kp.associate()
|
||||||
|
if not kp.test_associate():
|
||||||
|
error('No KeepassXC association')
|
||||||
|
return None
|
||||||
|
keystore.store(kp.id)
|
||||||
|
return kp
|
||||||
|
|
||||||
|
|
||||||
|
def make_js_code(username, password):
|
||||||
|
return ' '.join("""
|
||||||
|
function isVisible(elem) {
|
||||||
|
var style = elem.ownerDocument.defaultView.getComputedStyle(elem, null);
|
||||||
|
|
||||||
|
if (style.getPropertyValue("visibility") !== "visible" ||
|
||||||
|
style.getPropertyValue("display") === "none" ||
|
||||||
|
style.getPropertyValue("opacity") === "0") {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return elem.offsetWidth > 0 && elem.offsetHeight > 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
function hasPasswordField(form) {
|
||||||
|
var inputs = form.getElementsByTagName("input");
|
||||||
|
for (var j = 0; j < inputs.length; j++) {
|
||||||
|
var input = inputs[j];
|
||||||
|
if (input.type === "password") {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
|
||||||
|
function loadData2Form (form) {
|
||||||
|
var inputs = form.getElementsByTagName("input");
|
||||||
|
for (var j = 0; j < inputs.length; j++) {
|
||||||
|
var input = inputs[j];
|
||||||
|
if (isVisible(input) && (input.type === "text" || input.type === "email")) {
|
||||||
|
input.focus();
|
||||||
|
input.value = %s;
|
||||||
|
input.dispatchEvent(new Event('input', { 'bubbles': true }));
|
||||||
|
input.dispatchEvent(new Event('change', { 'bubbles': true }));
|
||||||
|
input.blur();
|
||||||
|
}
|
||||||
|
if (input.type === "password") {
|
||||||
|
input.focus();
|
||||||
|
input.value = %s;
|
||||||
|
input.dispatchEvent(new Event('input', { 'bubbles': true }));
|
||||||
|
input.dispatchEvent(new Event('change', { 'bubbles': true }));
|
||||||
|
input.blur();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
function fillFirstForm() {
|
||||||
|
var forms = document.getElementsByTagName("form");
|
||||||
|
for (i = 0; i < forms.length; i++) {
|
||||||
|
if (hasPasswordField(forms[i])) {
|
||||||
|
loadData2Form(forms[i]);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
alert("No Credentials Form found");
|
||||||
|
};
|
||||||
|
|
||||||
|
fillFirstForm()
|
||||||
|
""".splitlines()) % (json.dumps(username), json.dumps(password))
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
if 'QUTE_FIFO' not in os.environ:
|
||||||
|
print(f"No QUTE_FIFO found - {sys.argv[0]} must be run as a qutebrowser userscript")
|
||||||
|
sys.exit(-1)
|
||||||
|
|
||||||
|
try:
|
||||||
|
args = parse_args()
|
||||||
|
assert args.url, "Missing URL"
|
||||||
|
kp = connect_to_keepassxc(args)
|
||||||
|
if not kp:
|
||||||
|
error('Could not connect to KeepassXC')
|
||||||
|
return
|
||||||
|
creds = kp.get_logins(args.url)
|
||||||
|
if not creds:
|
||||||
|
error('No credentials found')
|
||||||
|
return
|
||||||
|
# TODO: handle multiple matches
|
||||||
|
name, pw = creds[0]['login'], creds[0]['password']
|
||||||
|
if name and pw:
|
||||||
|
qute('jseval -q ' + make_js_code(name, pw))
|
||||||
|
except Exception as e:
|
||||||
|
error(str(e))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
|
|
||||||
|
'';
|
||||||
|
};
|
||||||
|
# qutebrowser userscripts end.
|
||||||
|
|
||||||
".config/qutebrowser/greasemonkey/DR.js" = {
|
".config/qutebrowser/greasemonkey/DR.js" = {
|
||||||
text = ''
|
text = ''
|
||||||
// ==UserScript==
|
// ==UserScript==
|
||||||
@ -827,22 +1669,7 @@ in {
|
|||||||
DarkReader.disable();
|
DarkReader.disable();
|
||||||
'';
|
'';
|
||||||
};
|
};
|
||||||
".config/qutebrowser/userscripts/code_select.py" = {
|
|
||||||
executable = true;
|
|
||||||
source = .local/share/qutebrowser/userscripts/code_select.py;
|
|
||||||
};
|
|
||||||
".config/qutebrowser/userscripts/getbib" = {
|
|
||||||
executable = true;
|
|
||||||
source = .local/share/qutebrowser/userscripts/getbib;
|
|
||||||
};
|
|
||||||
".config/qutebrowser/userscripts/qute-gemini" = {
|
|
||||||
executable = true;
|
|
||||||
source = .local/share/qutebrowser/userscripts/qute-gemini;
|
|
||||||
};
|
|
||||||
".config/qutebrowser/userscripts/qute-gemini-tab" = {
|
|
||||||
executable = true;
|
|
||||||
source = .local/share/qutebrowser/userscripts/qute-gemini;
|
|
||||||
};
|
|
||||||
".config/qutebrowser/config.py" = {
|
".config/qutebrowser/config.py" = {
|
||||||
source = .config/qutebrowser/config.py;
|
source = .config/qutebrowser/config.py;
|
||||||
};
|
};
|
||||||
|
Loading…
Reference in New Issue
Block a user