From 7db8ebadb4bde623f97a4af58cad45a65427a470 Mon Sep 17 00:00:00 2001 From: Ronald Portier Date: Thu, 19 Nov 2020 15:32:40 +0100 Subject: [PATCH] [IMP] letsencrypt: black, isort, prettier --- letsencrypt/__manifest__.py | 28 +- letsencrypt/controllers/main.py | 4 +- letsencrypt/data/ir_config_parameter.xml | 15 +- letsencrypt/data/ir_cron.xml | 4 +- letsencrypt/demo/ir_cron.xml | 2 +- letsencrypt/hooks.py | 2 +- .../migrations/12.0.2.0.0/post-migrate.py | 52 ---- letsencrypt/models/letsencrypt.py | 157 +++++------ letsencrypt/models/res_config_settings.py | 50 ++-- letsencrypt/tests/test_letsencrypt.py | 250 ++++++++---------- letsencrypt/views/res_config_settings.xml | 71 +++-- 11 files changed, 260 insertions(+), 375 deletions(-) delete mode 100644 letsencrypt/migrations/12.0.2.0.0/post-migrate.py diff --git a/letsencrypt/__manifest__.py b/letsencrypt/__manifest__.py index 5172113ee..8053da731 100644 --- a/letsencrypt/__manifest__.py +++ b/letsencrypt/__manifest__.py @@ -1,34 +1,20 @@ -# © 2016 Therp BV -# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). +# 2016-2020 Therp BV . +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). { "name": "Let's Encrypt", - "version": "12.0.2.0.0", - "author": "Therp BV," - "Tecnativa," - "Acysos S.L," - "Odoo Community Association (OCA)", + "version": "13.0.1.0.0", + "author": "Therp BV," "Tecnativa," "Acysos S.L," "Odoo Community Association (OCA)", "license": "AGPL-3", "category": "Hidden/Dependency", "summary": "Request SSL certificates from letsencrypt.org", - "depends": [ - "base_setup", - ], + "depends": ["base_setup"], "data": [ "data/ir_config_parameter.xml", "data/ir_cron.xml", "views/res_config_settings.xml", ], - "demo": [ - "demo/ir_cron.xml", - ], + "demo": ["demo/ir_cron.xml"], "post_init_hook": "post_init_hook", "installable": True, - "external_dependencies": { - "python": [ - "acme", - "cryptography", - "dns", - "josepy", - ], - }, + "external_dependencies": {"python": ["acme", "cryptography", "dns", "josepy"]}, } diff --git a/letsencrypt/controllers/main.py b/letsencrypt/controllers/main.py index bba661710..2dfbc01a1 100644 --- a/letsencrypt/controllers/main.py +++ b/letsencrypt/controllers/main.py @@ -3,13 +3,15 @@ # © 2018 Ignacio Ibeas # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). import os + from odoo import http from odoo.http import request + from ..models.letsencrypt import _get_challenge_dir class Letsencrypt(http.Controller): - @http.route('/.well-known/acme-challenge/', auth='none') + @http.route("/.well-known/acme-challenge/", auth="none") def acme_challenge(self, filename): try: with open(os.path.join(_get_challenge_dir(), filename)) as key: diff --git a/letsencrypt/data/ir_config_parameter.xml b/letsencrypt/data/ir_config_parameter.xml index 737ca4bec..9e7c28881 100644 --- a/letsencrypt/data/ir_config_parameter.xml +++ b/letsencrypt/data/ir_config_parameter.xml @@ -1,20 +1,11 @@ - + - - + letsencrypt.reload_command sudo /usr/sbin/service nginx reload - - + letsencrypt.backoff 3 - diff --git a/letsencrypt/data/ir_cron.xml b/letsencrypt/data/ir_cron.xml index cd8232c5d..64c2fc7ac 100644 --- a/letsencrypt/data/ir_cron.xml +++ b/letsencrypt/data/ir_cron.xml @@ -1,8 +1,8 @@ - + Check Let's Encrypt certificates - + code model._cron() days diff --git a/letsencrypt/demo/ir_cron.xml b/letsencrypt/demo/ir_cron.xml index 926e80d36..0228a5b72 100644 --- a/letsencrypt/demo/ir_cron.xml +++ b/letsencrypt/demo/ir_cron.xml @@ -1,4 +1,4 @@ - + diff --git a/letsencrypt/hooks.py b/letsencrypt/hooks.py index 087deaf77..a236a41e8 100644 --- a/letsencrypt/hooks.py +++ b/letsencrypt/hooks.py @@ -5,4 +5,4 @@ from odoo import SUPERUSER_ID, api def post_init_hook(cr, pool): env = api.Environment(cr, SUPERUSER_ID, {}) - env['letsencrypt']._get_key('account.key') + env["letsencrypt"]._get_key("account.key") diff --git a/letsencrypt/migrations/12.0.2.0.0/post-migrate.py b/letsencrypt/migrations/12.0.2.0.0/post-migrate.py deleted file mode 100644 index 14ae8e2eb..000000000 --- a/letsencrypt/migrations/12.0.2.0.0/post-migrate.py +++ /dev/null @@ -1,52 +0,0 @@ -# Copyright 2018 Therp BV -# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). -import urllib.parse - -from odoo import api, SUPERUSER_ID - - -def migrate_altnames(env): - config = env["ir.config_parameter"] - existing = config.search([("key", "=like", "letsencrypt.altname.%")]) - if not existing: - # If letsencrypt.altnames already exists it shouldn't be clobbered - return - domains = existing.mapped("value") - base_url = config.get_param("web.base.url", "http://localhost:8069") - base_domain = urllib.parse.urlparse(base_url).hostname - if ( - domains - and base_domain - and base_domain != "localhost" - and base_domain not in domains - ): - domains.insert(0, base_domain) - config.set_param("letsencrypt.altnames", "\n".join(domains)) - existing.unlink() - - -def migrate_cron(env): - # Any interval that was appropriate for the old version is inappropriate - # for the new one, so it's ok to clobber it. - # But tweaking it afterwards is fine, so noupdate="1" still makes sense. - jobs = ( - env["ir.cron"] - .with_context(active_test=False) - .search( - [ - ("ir_actions_server_id.model_id.model", "=", "letsencrypt"), - ("ir_actions_server_id.code", "=", "model.cron()"), - ] - ) - ) - if not jobs: - # ir.cron._try_lock doesn't handle empty recordsets well - return - jobs.write({"interval_type": "days", "interval_number": "1"}) - jobs.mapped("ir_actions_server_id").write({"code": "model._cron()"}) - - -def migrate(cr, version): - env = api.Environment(cr, SUPERUSER_ID, {}) - migrate_altnames(env) - migrate_cron(env) diff --git a/letsencrypt/models/letsencrypt.py b/letsencrypt/models/letsencrypt.py index 305204310..2df4c9bef 100644 --- a/letsencrypt/models/letsencrypt.py +++ b/letsencrypt/models/letsencrypt.py @@ -1,7 +1,7 @@ -# © 2016 Therp BV -# © 2016 Antonio Espinosa -# © 2018 Ignacio Ibeas -# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). +# Copyright 2016-2020 Therp BV . +# Copyright 2016 Antonio Espinosa . +# Copyright 2018 Ignacio Ibeas . +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). import base64 import collections @@ -11,7 +11,6 @@ import re import subprocess import time import urllib.parse - from datetime import datetime, timedelta import requests @@ -39,45 +38,41 @@ try: except ImportError as e: _logger.debug(e) -WILDCARD = '*.' # as defined in the spec +WILDCARD = "*." # as defined in the spec DEFAULT_KEY_LENGTH = 4096 -TYPE_CHALLENGE_HTTP = 'http-01' -TYPE_CHALLENGE_DNS = 'dns-01' -V2_STAGING_DIRECTORY_URL = ( - 'https://acme-staging-v02.api.letsencrypt.org/directory' -) -V2_DIRECTORY_URL = 'https://acme-v02.api.letsencrypt.org/directory' +TYPE_CHALLENGE_HTTP = "http-01" +TYPE_CHALLENGE_DNS = "dns-01" +V2_STAGING_DIRECTORY_URL = "https://acme-staging-v02.api.letsencrypt.org/directory" +V2_DIRECTORY_URL = "https://acme-v02.api.letsencrypt.org/directory" LOCAL_DOMAINS = { - 'localhost', - 'localhost.localdomain', - 'localhost6', - 'localhost6.localdomain6', - 'ip6-localhost', - 'ip6-loopback', + "localhost", + "localhost.localdomain", + "localhost6", + "localhost6.localdomain6", + "ip6-localhost", + "ip6-loopback", } -DNSUpdate = collections.namedtuple( - "DNSUpdate", ("challenge", "domain", "token") -) +DNSUpdate = collections.namedtuple("DNSUpdate", ("challenge", "domain", "token")) def _get_data_dir(): - dir_ = os.path.join(config.options.get('data_dir'), 'letsencrypt') + dir_ = os.path.join(config.options.get("data_dir"), "letsencrypt") if not os.path.isdir(dir_): os.makedirs(dir_) return dir_ def _get_challenge_dir(): - dir_ = os.path.join(_get_data_dir(), 'acme-challenge') + dir_ = os.path.join(_get_data_dir(), "acme-challenge") if not os.path.isdir(dir_): os.makedirs(dir_) return dir_ class Letsencrypt(models.AbstractModel): - _name = 'letsencrypt' - _description = 'Abstract model providing functions for letsencrypt' + _name = "letsencrypt" + _description = "Abstract model providing functions for letsencrypt" @api.model def _generate_key(self): @@ -100,7 +95,7 @@ class Letsencrypt(models.AbstractModel): _logger.info("Generating new key %s", key_name) key_bytes = self._generate_key() try: - with open(key_file, 'wb') as file_: + with open(key_file, "wb") as file_: os.fchmod(file_.fileno(), 0o600) file_.write(key_bytes) except BaseException: @@ -110,25 +105,21 @@ class Letsencrypt(models.AbstractModel): raise else: _logger.info("Getting existing key %s", key_name) - with open(key_file, 'rb') as file_: + with open(key_file, "rb") as file_: key_bytes = file_.read() return key_bytes @api.model def _validate_domain(self, domain): """Validate that a domain is publicly accessible.""" - if ':' in domain or all( - char.isdigit() or char == '.' for char in domain - ): + if ":" in domain or all(char.isdigit() or char == "." for char in domain): raise UserError( - _("Domain %s: Let's Encrypt doesn't support IP addresses!") - % domain + _("Domain %s: Let's Encrypt doesn't support IP addresses!") % domain ) - if domain in LOCAL_DOMAINS or '.' not in domain: + if domain in LOCAL_DOMAINS or "." not in domain: raise UserError( - _("Domain %s: Let's encrypt doesn't work with local domains!") - % domain + _("Domain %s: Let's encrypt doesn't work with local domains!") % domain ) @api.model @@ -140,10 +131,8 @@ class Letsencrypt(models.AbstractModel): _logger.info("No existing certificate found, creating a new one") return True - with open(cert_file, 'rb') as file_: - cert = x509.load_pem_x509_certificate( - file_.read(), default_backend() - ) + with open(cert_file, "rb") as file_: + cert = x509.load_pem_x509_certificate(file_.read(), default_backend()) expiry = cert.not_valid_after remaining = expiry - datetime.now() if remaining < timedelta(): @@ -181,8 +170,7 @@ class Letsencrypt(models.AbstractModel): missing = domains - names if missing: _logger.info( - "Found new domains %s, requesting new certificate", - ', '.join(missing), + "Found new domains %s, requesting new certificate", ", ".join(missing), ) return True @@ -195,10 +183,10 @@ class Letsencrypt(models.AbstractModel): @api.model def _cron(self): - ir_config_parameter = self.env['ir.config_parameter'] + ir_config_parameter = self.env["ir.config_parameter"] domains = self._get_altnames() domain = domains[0] - cert_file = os.path.join(_get_data_dir(), '%s.crt' % domain) + cert_file = os.path.join(_get_data_dir(), "%s.crt" % domain) domains = self._cascade_domains(domains) for dom in domains: @@ -207,8 +195,8 @@ class Letsencrypt(models.AbstractModel): if not self._should_run(cert_file, domains): return - account_key = josepy.JWKRSA.load(self._get_key('account.key')) - domain_key = self._get_key('%s.key' % domain) + account_key = josepy.JWKRSA.load(self._get_key("account.key")) + domain_key = self._get_key("%s.key" % domain) client = self._create_client(account_key) new_reg = acme.messages.NewRegistration( @@ -219,16 +207,12 @@ class Letsencrypt(models.AbstractModel): _logger.info("Successfully registered.") except acme.errors.ConflictError as err: reg = acme.messages.Registration(key=account_key.public_key()) - reg_res = acme.messages.RegistrationResource( - body=reg, uri=err.location - ) + reg_res = acme.messages.RegistrationResource(body=reg, uri=err.location) client.query_registration(reg_res) _logger.info("Reusing existing account.") - _logger.info('Making CSR for the following domains: %s', domains) - csr = acme.crypto_util.make_csr( - private_key_pem=domain_key, domains=domains - ) + _logger.info("Making CSR for the following domains: %s", domains) + csr = acme.crypto_util.make_csr(private_key_pem=domain_key, domains=domains) authzr = client.new_order(csr) # For each requested domain name we receive a list of challenges. @@ -260,9 +244,7 @@ class Letsencrypt(models.AbstractModel): for challenge in ordered_challenges: if challenge.chall.typ == TYPE_CHALLENGE_HTTP: self._respond_challenge_http(challenge, account_key) - client.answer_challenge( - challenge, acme.challenges.HTTP01Response() - ) + client.answer_challenge(challenge, acme.challenges.HTTP01Response()) break elif challenge.chall.typ == TYPE_CHALLENGE_DNS: domain = authorizations.body.identifier.value @@ -271,32 +253,24 @@ class Letsencrypt(models.AbstractModel): # We delay this because we wait for each domain. # That takes less time if they've all already been changed. pending_responses.append( - DNSUpdate( - challenge=challenge, domain=domain, token=token - ) + DNSUpdate(challenge=challenge, domain=domain, token=token) ) break else: - raise UserError( - _('Could not respond to letsencrypt challenges.') - ) + raise UserError(_("Could not respond to letsencrypt challenges.")) if pending_responses: for update in pending_responses: self._wait_for_record(update.domain, update.token) # 1 minute was not always enough during testing, even once records # were visible locally - _logger.info( - "All TXT records found, waiting 5 minutes more to make sure." - ) + _logger.info("All TXT records found, waiting 5 minutes more to make sure.") time.sleep(300) for update in pending_responses: - client.answer_challenge( - update.challenge, acme.challenges.DNSResponse() - ) + client.answer_challenge(update.challenge, acme.challenges.DNSResponse()) # let them know we are done and they should check - backoff = int(ir_config_parameter.get_param('letsencrypt.backoff', 3)) + backoff = int(ir_config_parameter.get_param("letsencrypt.backoff", 3)) deadline = datetime.now() + timedelta(minutes=backoff) try: order_resource = client.poll_and_finalize(authzr, deadline) @@ -307,12 +281,10 @@ class Letsencrypt(models.AbstractModel): _logger.error(str(challenge.error)) raise - with open(cert_file, 'w') as crt: + with open(cert_file, "w") as crt: crt.write(order_resource.fullchain_pem) - _logger.info('SUCCESS: Certificate saved: %s', cert_file) - reload_cmd = ir_config_parameter.get_param( - 'letsencrypt.reload_command', '' - ) + _logger.info("SUCCESS: Certificate saved: %s", cert_file) + reload_cmd = ir_config_parameter.get_param("letsencrypt.reload_command", "") if reload_cmd.strip(): self._call_cmdline(reload_cmd) else: @@ -328,9 +300,7 @@ class Letsencrypt(models.AbstractModel): while True: attempt += 1 try: - for record in dns.resolver.query( - "_acme-challenge." + domain, "TXT" - ): + for record in dns.resolver.query("_acme-challenge." + domain, "TXT"): value = record.to_text()[1:-1] if value == token: return @@ -350,9 +320,9 @@ class Letsencrypt(models.AbstractModel): @api.model def _create_client(self, account_key): - param = self.env['ir.config_parameter'] - testing_mode = param.get_param('letsencrypt.testing_mode') == 'True' - if config['test_enable'] or testing_mode: + param = self.env["ir.config_parameter"] + testing_mode = param.get_param("letsencrypt.testing_mode") == "True" + if config["test_enable"] or testing_mode: directory_url = V2_STAGING_DIRECTORY_URL else: directory_url = V2_DIRECTORY_URL @@ -382,7 +352,7 @@ class Letsencrypt(models.AbstractModel): continue if other.endswith(postfix): prefix = other[: -len(postfix)] # e.g. "www" - if '.' not in prefix: + if "." not in prefix: to_remove.add(other) return sorted(set(domains) - to_remove) @@ -390,12 +360,12 @@ class Letsencrypt(models.AbstractModel): @api.model def _get_altnames(self): """Get the configured altnames as a list of strings.""" - parameter = self.env['ir.config_parameter'] + parameter = self.env["ir.config_parameter"] altnames = parameter.get_param("letsencrypt.altnames") if not altnames: base_url = parameter.get_param("web.base.url", "http://localhost") return [urllib.parse.urlparse(base_url).hostname] - return re.split('(?:,|\n| |;)+', altnames) + return re.split("(?:,|\n| |;)+", altnames) @api.model def _respond_challenge_http(self, challenge, account_key): @@ -404,7 +374,7 @@ class Letsencrypt(models.AbstractModel): """ token = self._base64_encode(challenge.token) challenge_file = os.path.join(_get_challenge_dir(), token) - with open(challenge_file, 'w') as file_: + with open(challenge_file, "w") as file_: file_.write(challenge.validation(account_key)) @api.model @@ -413,9 +383,7 @@ class Letsencrypt(models.AbstractModel): Respond to the DNS challenge by creating the DNS record on the provider. """ - provider = self.env['ir.config_parameter'].get_param( - 'letsencrypt.dns_provider' - ) + provider = self.env["ir.config_parameter"].get_param("letsencrypt.dns_provider") if not provider: raise UserError( _("No DNS provider set, can't request wildcard certificate") @@ -441,9 +409,7 @@ class Letsencrypt(models.AbstractModel): _logger.warning(stdout) if stderr: _logger.warning(stderr) - raise UserError( - _('Error calling %s: %d') % (cmdline, process.returncode) - ) + raise UserError(_("Error calling %s: %d") % (cmdline, process.returncode)) if stdout: _logger.info(stdout) if stderr: @@ -452,20 +418,17 @@ class Letsencrypt(models.AbstractModel): @api.model def _respond_challenge_dns_shell(self, domain, token): """Respond to a DNS challenge using an arbitrary shell command.""" - script_str = self.env['ir.config_parameter'].get_param( - 'letsencrypt.dns_shell_script' + script_str = self.env["ir.config_parameter"].get_param( + "letsencrypt.dns_shell_script" ) if script_str: env = os.environ.copy() env.update( - LETSENCRYPT_DNS_DOMAIN=domain, - LETSENCRYPT_DNS_CHALLENGE=token, + LETSENCRYPT_DNS_DOMAIN=domain, LETSENCRYPT_DNS_CHALLENGE=token, ) self._call_cmdline(script_str, env=env) else: - raise UserError( - _("No shell command configured for updating DNS records") - ) + raise UserError(_("No shell command configured for updating DNS records")) @api.model def _base64_encode(self, data): @@ -475,4 +438,4 @@ class Letsencrypt(models.AbstractModel): https://github.com/ietf-wg-acme/acme/issues/64#issuecomment-168852757 and https://golang.org/pkg/encoding/base64/#RawURLEncoding """ - return base64.urlsafe_b64encode(data).rstrip(b'=').decode('ascii') + return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii") diff --git a/letsencrypt/models/res_config_settings.py b/letsencrypt/models/res_config_settings.py index 530ea81aa..327ff5591 100644 --- a/letsencrypt/models/res_config_settings.py +++ b/letsencrypt/models/res_config_settings.py @@ -3,7 +3,6 @@ from odoo import _, api, exceptions, fields, models - DNS_SCRIPT_DEFAULT = """# Write your script here # It should create a TXT record of $LETSENCRYPT_DNS_CHALLENGE # on _acme-challenge.$LETSENCRYPT_DNS_DOMAIN @@ -11,46 +10,45 @@ DNS_SCRIPT_DEFAULT = """# Write your script here class ResConfigSettings(models.TransientModel): - _inherit = 'res.config.settings' + _inherit = "res.config.settings" letsencrypt_altnames = fields.Text( string="Domain names", - default='', + default="", help=( - 'Domains to use for the certificate. ' - 'Separate with commas or newlines.' + "Domains to use for the certificate. " "Separate with commas or newlines." ), force_config_parameter="letsencrypt.altnames", ) letsencrypt_dns_provider = fields.Selection( - selection=[('shell', 'Shell script')], - string='DNS provider', + selection=[("shell", "Shell script")], + string="DNS provider", help=( - 'For wildcard certificates we need to add a TXT record on your ' + "For wildcard certificates we need to add a TXT record on your " 'DNS. If you set this to "Shell script" you can enter a shell ' - 'script. Other options can be added by installing additional ' - 'modules.' + "script. Other options can be added by installing additional " + "modules." ), config_parameter="letsencrypt.dns_provider", ) letsencrypt_dns_shell_script = fields.Text( - string='DNS update script', + string="DNS update script", help=( - 'Write a shell script that will update your DNS TXT records. ' - 'You can use the $LETSENCRYPT_DNS_CHALLENGE and ' - '$LETSENCRYPT_DNS_DOMAIN variables.' + "Write a shell script that will update your DNS TXT records. " + "You can use the $LETSENCRYPT_DNS_CHALLENGE and " + "$LETSENCRYPT_DNS_DOMAIN variables." ), default=DNS_SCRIPT_DEFAULT, force_config_parameter="letsencrypt.dns_shell_script", ) letsencrypt_needs_dns_provider = fields.Boolean() letsencrypt_reload_command = fields.Text( - string='Server reload command', - help='Fill this with the command to restart your web server.', + string="Server reload command", + help="Fill this with the command to restart your web server.", force_config_parameter="letsencrypt.reload_command", ) letsencrypt_testing_mode = fields.Boolean( - string='Use testing server', + string="Use testing server", help=( "Use the Let's Encrypt staging server, which has higher rate " "limits but doesn't create valid certificates." @@ -66,9 +64,9 @@ class ResConfigSettings(models.TransientModel): config_parameter="letsencrypt.prefer_dns", ) - @api.onchange('letsencrypt_altnames', 'letsencrypt_prefer_dns') + @api.onchange("letsencrypt_altnames", "letsencrypt_prefer_dns") def letsencrypt_check_dns_required(self): - altnames = self.letsencrypt_altnames or '' + altnames = self.letsencrypt_altnames or "" self.letsencrypt_needs_dns_provider = ( "*." in altnames or self.letsencrypt_prefer_dns ) @@ -77,8 +75,7 @@ class ResConfigSettings(models.TransientModel): def default_get(self, fields_list): res = super().default_get(fields_list) res["letsencrypt_needs_dns_provider"] = ( - "*." in res["letsencrypt_altnames"] - or res["letsencrypt_prefer_dns"] + "*." in res["letsencrypt_altnames"] or res["letsencrypt_prefer_dns"] ) return res @@ -88,12 +85,11 @@ class ResConfigSettings(models.TransientModel): self.letsencrypt_check_dns_required() - if self.letsencrypt_dns_provider == 'shell': + if self.letsencrypt_dns_provider == "shell": lines = [ - line.strip() - for line in self.letsencrypt_dns_shell_script.split('\n') + line.strip() for line in self.letsencrypt_dns_shell_script.split("\n") ] - if all(line == '' or line.startswith('#') for line in lines): + if all(line == "" or line.startswith("#") for line in lines): raise exceptions.ValidationError( _("You didn't write a DNS update script!") ) @@ -114,9 +110,7 @@ class ResConfigSettings(models.TransientModel): for name in classified["other"]: field = self._fields[name] if hasattr(field, "force_config_parameter"): - classified["config"].append( - (name, field.force_config_parameter) - ) + classified["config"].append((name, field.force_config_parameter)) else: new_other.append(name) classified["other"] = new_other diff --git a/letsencrypt/tests/test_letsencrypt.py b/letsencrypt/tests/test_letsencrypt.py index 4e43c1f71..ebf420b51 100644 --- a/letsencrypt/tests/test_letsencrypt.py +++ b/letsencrypt/tests/test_letsencrypt.py @@ -3,7 +3,6 @@ import os import shutil - from datetime import datetime, timedelta from os import path @@ -12,79 +11,76 @@ import mock from odoo.exceptions import UserError, ValidationError from odoo.tests import SingleTransactionCase +from ..models.letsencrypt import _get_challenge_dir, _get_data_dir + try: import dns.resolver except ImportError: pass -from ..models.letsencrypt import _get_data_dir, _get_challenge_dir - -CERT_DIR = path.join(path.dirname(__file__), 'certs') +CERT_DIR = path.join(path.dirname(__file__), "certs") def _poll(order, deadline): - order_resource = mock.Mock(['fullchain_pem']) - order_resource.fullchain_pem = 'chain' + order_resource = mock.Mock(["fullchain_pem"]) + order_resource.fullchain_pem = "chain" return order_resource class TestLetsencrypt(SingleTransactionCase): def setUp(self): super().setUp() - self.env['ir.config_parameter'].set_param( - 'web.base.url', 'http://www.example.com' + self.env["ir.config_parameter"].set_param( + "web.base.url", "http://www.example.com" ) - self.env['res.config.settings'].create( + self.env["res.config.settings"].create( { - 'letsencrypt_dns_provider': 'shell', - 'letsencrypt_dns_shell_script': 'touch /tmp/.letsencrypt_test', - 'letsencrypt_altnames': 'www.example.com,*.example.com', - 'letsencrypt_reload_command': 'echo reloaded', + "letsencrypt_dns_provider": "shell", + "letsencrypt_dns_shell_script": "touch /tmp/.letsencrypt_test", + "letsencrypt_altnames": "www.example.com,*.example.com", + "letsencrypt_reload_command": "echo reloaded", } ).set_values() def test_config_settings(self): - setting_vals = self.env['res.config.settings'].default_get([]) - self.assertEqual(setting_vals['letsencrypt_dns_provider'], 'shell') + setting_vals = self.env["res.config.settings"].default_get([]) + self.assertEqual(setting_vals["letsencrypt_dns_provider"], "shell") self.assertEqual( - setting_vals['letsencrypt_dns_shell_script'], - 'touch /tmp/.letsencrypt_test', + setting_vals["letsencrypt_dns_shell_script"], + "touch /tmp/.letsencrypt_test", ) self.assertEqual( - setting_vals['letsencrypt_altnames'], - 'www.example.com,*.example.com' + setting_vals["letsencrypt_altnames"], "www.example.com,*.example.com" ) - self.assertEqual(setting_vals['letsencrypt_reload_command'], 'echo reloaded') - self.assertTrue(setting_vals['letsencrypt_needs_dns_provider']) - self.assertFalse(setting_vals['letsencrypt_prefer_dns']) + self.assertEqual(setting_vals["letsencrypt_reload_command"], "echo reloaded") + self.assertTrue(setting_vals["letsencrypt_needs_dns_provider"]) + self.assertFalse(setting_vals["letsencrypt_prefer_dns"]) with self.assertRaises(ValidationError): self.env["res.config.settings"].create( {"letsencrypt_dns_shell_script": "# Empty script"} ).set_values() - @mock.patch('acme.client.ClientV2.answer_challenge') - @mock.patch('acme.client.ClientV2.poll_and_finalize', side_effect=_poll) + @mock.patch("acme.client.ClientV2.answer_challenge") + @mock.patch("acme.client.ClientV2.poll_and_finalize", side_effect=_poll) def test_http_challenge(self, poll, _answer_challenge): - letsencrypt = self.env['letsencrypt'] - self.env['res.config.settings'].create( - {'letsencrypt_altnames': ''} + letsencrypt = self.env["letsencrypt"] + self.env["res.config.settings"].create( + {"letsencrypt_altnames": ""} ).set_values() letsencrypt._cron() poll.assert_called() self.assertTrue(os.listdir(_get_challenge_dir())) - self.assertFalse(path.isfile('/tmp/.letsencrypt_test')) - self.assertTrue( - path.isfile(path.join(_get_data_dir(), 'www.example.com.crt')) - ) + self.assertFalse(path.isfile("/tmp/.letsencrypt_test")) + self.assertTrue(path.isfile(path.join(_get_data_dir(), "www.example.com.crt"))) # pylint: disable=unused-argument - @mock.patch('odoo.addons.letsencrypt.models.letsencrypt.DNSUpdate') - @mock.patch('dns.resolver.query') - @mock.patch('time.sleep') - @mock.patch('acme.client.ClientV2.answer_challenge') - @mock.patch('acme.client.ClientV2.poll_and_finalize', side_effect=_poll) + @mock.patch("odoo.addons.letsencrypt.models.letsencrypt.DNSUpdate") + @mock.patch("dns.resolver.query") + @mock.patch("time.sleep") + @mock.patch("acme.client.ClientV2.answer_challenge") + @mock.patch("acme.client.ClientV2.poll_and_finalize", side_effect=_poll) def test_dns_challenge(self, poll, answer_challenge, sleep, query, dnsupd): record = None @@ -120,204 +116,192 @@ class TestLetsencrypt(SingleTransactionCase): query.side_effect = query_effect self.install_certificate(days_left=10) - self.env['letsencrypt']._cron() + self.env["letsencrypt"]._cron() poll.assert_called() self.assertEqual(ncalls, 3) - self.assertTrue(path.isfile('/tmp/.letsencrypt_test')) - self.assertTrue( - path.isfile(path.join(_get_data_dir(), 'www.example.com.crt')) - ) + self.assertTrue(path.isfile("/tmp/.letsencrypt_test")) + self.assertTrue(path.isfile(path.join(_get_data_dir(), "www.example.com.crt"))) def test_dns_challenge_error_on_missing_provider(self): - self.env['res.config.settings'].create( + self.env["res.config.settings"].create( { - 'letsencrypt_altnames': '*.example.com', - 'letsencrypt_dns_provider': False, + "letsencrypt_altnames": "*.example.com", + "letsencrypt_dns_provider": False, } ).set_values() with self.assertRaises(UserError): - self.env['letsencrypt']._cron() + self.env["letsencrypt"]._cron() def test_prefer_dns_setting(self): - self.env['res.config.settings'].create( - { - 'letsencrypt_altnames': 'example.com', - 'letsencrypt_prefer_dns': True, - } + self.env["res.config.settings"].create( + {"letsencrypt_altnames": "example.com", "letsencrypt_prefer_dns": True} ).set_values() # pylint: disable=no-value-for-parameter self.test_dns_challenge() def test_cascading(self): - cascade = self.env['letsencrypt']._cascade_domains + cascade = self.env["letsencrypt"]._cascade_domains self.assertEqual( cascade( [ - 'www.example.com', - '*.example.com', - 'example.com', - 'example.com', - 'notexample.com', - 'multi.sub.example.com', - 'www2.example.com', - 'unrelated.com', + "www.example.com", + "*.example.com", + "example.com", + "example.com", + "notexample.com", + "multi.sub.example.com", + "www2.example.com", + "unrelated.com", ] ), [ - '*.example.com', - 'example.com', - 'multi.sub.example.com', - 'notexample.com', - 'unrelated.com', + "*.example.com", + "example.com", + "multi.sub.example.com", + "notexample.com", + "unrelated.com", ], ) self.assertEqual(cascade([]), []) - self.assertEqual(cascade(['*.example.com']), ['*.example.com']) - self.assertEqual(cascade(['www.example.com']), ['www.example.com']) + self.assertEqual(cascade(["*.example.com"]), ["*.example.com"]) + self.assertEqual(cascade(["www.example.com"]), ["www.example.com"]) self.assertEqual( - cascade(['www.example.com', 'example.com']), - ['example.com', 'www.example.com'], + cascade(["www.example.com", "example.com"]), + ["example.com", "www.example.com"], ) with self.assertRaises(UserError): - cascade(['www.*.example.com']) + cascade(["www.*.example.com"]) with self.assertRaises(UserError): - cascade(['*.*.example.com']) + cascade(["*.*.example.com"]) def test_altnames_parsing(self): - config = self.env['ir.config_parameter'] - letsencrypt = self.env['letsencrypt'] + config = self.env["ir.config_parameter"] + letsencrypt = self.env["letsencrypt"] self.assertEqual( - letsencrypt._get_altnames(), - ['www.example.com', '*.example.com'] + letsencrypt._get_altnames(), ["www.example.com", "*.example.com"] ) - config.set_param('letsencrypt.altnames', '') - self.assertEqual(letsencrypt._get_altnames(), ['www.example.com']) + config.set_param("letsencrypt.altnames", "") + self.assertEqual(letsencrypt._get_altnames(), ["www.example.com"]) - config.set_param('letsencrypt.altnames', 'foobar.example.com') - self.assertEqual(letsencrypt._get_altnames(), ['foobar.example.com']) + config.set_param("letsencrypt.altnames", "foobar.example.com") + self.assertEqual(letsencrypt._get_altnames(), ["foobar.example.com"]) - config.set_param( - 'letsencrypt.altnames', 'example.com,example.org,example.net' - ) + config.set_param("letsencrypt.altnames", "example.com,example.org,example.net") self.assertEqual( - letsencrypt._get_altnames(), - ['example.com', 'example.org', 'example.net'], + letsencrypt._get_altnames(), ["example.com", "example.org", "example.net"], ) config.set_param( - 'letsencrypt.altnames', 'example.com, example.org\nexample.net' + "letsencrypt.altnames", "example.com, example.org\nexample.net" ) self.assertEqual( - letsencrypt._get_altnames(), - ['example.com', 'example.org', 'example.net'], + letsencrypt._get_altnames(), ["example.com", "example.org", "example.net"], ) def test_key_generation_and_retrieval(self): - key_a1 = self.env['letsencrypt']._get_key('a.key') - key_a2 = self.env['letsencrypt']._get_key('a.key') - key_b = self.env['letsencrypt']._get_key('b.key') + key_a1 = self.env["letsencrypt"]._get_key("a.key") + key_a2 = self.env["letsencrypt"]._get_key("a.key") + key_b = self.env["letsencrypt"]._get_key("b.key") self.assertIsInstance(key_a1, bytes) self.assertIsInstance(key_a2, bytes) self.assertIsInstance(key_b, bytes) - self.assertTrue(path.isfile(path.join(_get_data_dir(), 'a.key'))) + self.assertTrue(path.isfile(path.join(_get_data_dir(), "a.key"))) self.assertEqual(key_a1, key_a2) self.assertNotEqual(key_a1, key_b) - @mock.patch('os.remove', side_effect=os.remove) + @mock.patch("os.remove", side_effect=os.remove) @mock.patch( - 'odoo.addons.letsencrypt.models.letsencrypt.Letsencrypt._generate_key', + "odoo.addons.letsencrypt.models.letsencrypt.Letsencrypt._generate_key", side_effect=lambda: None, ) def test_interrupted_key_writing(self, generate_key, remove): with self.assertRaises(TypeError): - self.env['letsencrypt']._get_key('a.key') - self.assertFalse(path.isfile(path.join(_get_data_dir(), 'a.key'))) + self.env["letsencrypt"]._get_key("a.key") + self.assertFalse(path.isfile(path.join(_get_data_dir(), "a.key"))) remove.assert_called() generate_key.assert_called() def test_domain_validation(self): - self.env['letsencrypt']._validate_domain('example.com') - self.env['letsencrypt']._validate_domain('www.example.com') + self.env["letsencrypt"]._validate_domain("example.com") + self.env["letsencrypt"]._validate_domain("www.example.com") with self.assertRaises(UserError): - self.env['letsencrypt']._validate_domain('1.1.1.1') + self.env["letsencrypt"]._validate_domain("1.1.1.1") with self.assertRaises(UserError): - self.env['letsencrypt']._validate_domain('192.168.1.1') + self.env["letsencrypt"]._validate_domain("192.168.1.1") with self.assertRaises(UserError): - self.env['letsencrypt']._validate_domain('localhost.localdomain') + self.env["letsencrypt"]._validate_domain("localhost.localdomain") with self.assertRaises(UserError): - self.env['letsencrypt']._validate_domain('testdomain') + self.env["letsencrypt"]._validate_domain("testdomain") with self.assertRaises(UserError): - self.env['letsencrypt']._validate_domain('::1') + self.env["letsencrypt"]._validate_domain("::1") def test_young_certificate(self): self.install_certificate(60) self.assertFalse( - self.env['letsencrypt']._should_run( - path.join(_get_data_dir(), 'www.example.com.crt'), - ['www.example.com', '*.example.com'], + self.env["letsencrypt"]._should_run( + path.join(_get_data_dir(), "www.example.com.crt"), + ["www.example.com", "*.example.com"], ) ) def test_old_certificate(self): self.install_certificate(20) self.assertTrue( - self.env['letsencrypt']._should_run( - path.join(_get_data_dir(), 'www.example.com.crt'), - ['www.example.com', '*.example.com'], + self.env["letsencrypt"]._should_run( + path.join(_get_data_dir(), "www.example.com.crt"), + ["www.example.com", "*.example.com"], ) ) def test_expired_certificate(self): self.install_certificate(-10) self.assertTrue( - self.env['letsencrypt']._should_run( - path.join(_get_data_dir(), 'www.example.com.crt'), - ['www.example.com', '*.example.com'], + self.env["letsencrypt"]._should_run( + path.join(_get_data_dir(), "www.example.com.crt"), + ["www.example.com", "*.example.com"], ) ) def test_missing_certificate(self): self.assertTrue( - self.env['letsencrypt']._should_run( - path.join(_get_data_dir(), 'www.example.com.crt'), - ['www.example.com', '*.example.com'], + self.env["letsencrypt"]._should_run( + path.join(_get_data_dir(), "www.example.com.crt"), + ["www.example.com", "*.example.com"], ) ) def test_new_altnames(self): - self.install_certificate(60, 'www.example.com', ()) + self.install_certificate(60, "www.example.com", ()) self.assertTrue( - self.env['letsencrypt']._should_run( - path.join(_get_data_dir(), 'www.example.com.crt'), - ['www.example.com', '*.example.com'], + self.env["letsencrypt"]._should_run( + path.join(_get_data_dir(), "www.example.com.crt"), + ["www.example.com", "*.example.com"], ) ) self.assertFalse( - self.env['letsencrypt']._should_run( - path.join(_get_data_dir(), 'www.example.com.crt'), - ['www.example.com'], + self.env["letsencrypt"]._should_run( + path.join(_get_data_dir(), "www.example.com.crt"), ["www.example.com"], ) ) def test_legacy_certificate_without_altnames(self): self.install_certificate(60, use_altnames=False) self.assertFalse( - self.env['letsencrypt']._should_run( - path.join(_get_data_dir(), 'www.example.com.crt'), - ['www.example.com'], + self.env["letsencrypt"]._should_run( + path.join(_get_data_dir(), "www.example.com.crt"), ["www.example.com"], ) ) def install_certificate( self, days_left, - common_name='www.example.com', - altnames=('*.example.com',), + common_name="www.example.com", + altnames=("*.example.com",), use_altnames=True, ): from cryptography import x509 @@ -334,14 +318,10 @@ class TestLetsencrypt(SingleTransactionCase): cert_builder = ( x509.CertificateBuilder() .subject_name( - x509.Name( - [x509.NameAttribute(x509.NameOID.COMMON_NAME, common_name)] - ) + x509.Name([x509.NameAttribute(x509.NameOID.COMMON_NAME, common_name)]) ) .issuer_name( - x509.Name( - [x509.NameAttribute(x509.NameOID.COMMON_NAME, 'myca.biz')] - ) + x509.Name([x509.NameAttribute(x509.NameOID.COMMON_NAME, "myca.biz")]) ) .not_valid_before(not_before) .not_valid_after(not_after) @@ -359,12 +339,12 @@ class TestLetsencrypt(SingleTransactionCase): ) cert = cert_builder.sign(key, hashes.SHA256(), default_backend()) - cert_file = path.join(_get_data_dir(), '%s.crt' % common_name) - with open(cert_file, 'wb') as file_: + cert_file = path.join(_get_data_dir(), "%s.crt" % common_name) + with open(cert_file, "wb") as file_: file_.write(cert.public_bytes(serialization.Encoding.PEM)) def tearDown(self): super().tearDown() shutil.rmtree(_get_data_dir(), ignore_errors=True) - if path.isfile('/tmp/.letsencrypt_test'): - os.remove('/tmp/.letsencrypt_test') + if path.isfile("/tmp/.letsencrypt_test"): + os.remove("/tmp/.letsencrypt_test") diff --git a/letsencrypt/views/res_config_settings.xml b/letsencrypt/views/res_config_settings.xml index eacc48d8d..9bf509e62 100644 --- a/letsencrypt/views/res_config_settings.xml +++ b/letsencrypt/views/res_config_settings.xml @@ -2,68 +2,89 @@ Letsencrypt settings view res.config.settings - + -
+

Let's Encrypt

+ invisible="1" + />
-
-
-
-
+
-
-