[FIX] letsencrypt: satisfy pre-commit checks

- No silent passing of exceptions;
- Open files with encoding specified;
- Use with whit subprocess.Popen().
pull/2236/head
Ronald Portier 2022-02-17 16:03:30 +01:00
parent 7875475f94
commit b25af29c28
4 changed files with 52 additions and 33 deletions

View File

@ -1,21 +1,30 @@
# Copyright 2016 Therp BV <https://therp.nl>. # Copyright 2016,2022 Therp BV <https://therp.nl>.
# Copyright 2016 Antonio Espinosa <antonio.espinosa@tecnativa.com>. # Copyright 2016 Antonio Espinosa <antonio.espinosa@tecnativa.com>.
# Copyright 2018 Ignacio Ibeas <ignacio@acysos.com>. # Copyright 2018 Ignacio Ibeas <ignacio@acysos.com>.
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). # License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
# pylint: disable=too-few-public-methods,no-self-use
"""This controller handles the acme challenge call from Letsencrypt."""
import logging
import os import os
from odoo import http from odoo import _, http
from odoo.http import request from odoo.http import request
from ..models.letsencrypt import _get_challenge_dir from ..models.letsencrypt import _get_challenge_dir
_logger = logging.getLogger(__name__)
class Letsencrypt(http.Controller): class Letsencrypt(http.Controller):
"""This controller handles the acme challenge call from Letsencrypt."""
@http.route("/.well-known/acme-challenge/<filename>", auth="none") @http.route("/.well-known/acme-challenge/<filename>", auth="none")
def acme_challenge(self, filename): def acme_challenge(self, filename):
"""Handle the acme challenge."""
path = os.path.join(_get_challenge_dir(), filename)
try: try:
with open(os.path.join(_get_challenge_dir(), filename)) as key: with open(path, encoding="utf-8") as key:
return key.read() return key.read()
except IOError: except IOError:
pass _logger.exception(_("Error opening file %s"), path)
return request.not_found() return request.not_found()

View File

@ -1,7 +1,9 @@
# Copyright 2016-2021 Therp BV <https://therp.nl>. # Copyright 2016-2022 Therp BV <https://therp.nl>.
# Copyright 2016 Antonio Espinosa <antonio.espinosa@tecnativa.com>. # Copyright 2016 Antonio Espinosa <antonio.espinosa@tecnativa.com>.
# Copyright 2018 Ignacio Ibeas <ignacio@acysos.com>. # Copyright 2018 Ignacio Ibeas <ignacio@acysos.com>.
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). # License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
"""Fully automatic retrieval of Letsencrypt certificates."""
# pylint: disable=no-self-use,consider-using-f-string
import base64 import base64
import collections import collections
@ -68,6 +70,8 @@ def _get_challenge_dir():
class Letsencrypt(models.AbstractModel): class Letsencrypt(models.AbstractModel):
"""Fully automatic retrieval of Letsencrypt certificates."""
_name = "letsencrypt" _name = "letsencrypt"
_description = "Abstract model providing functions for letsencrypt" _description = "Abstract model providing functions for letsencrypt"
@ -178,7 +182,7 @@ class Letsencrypt(models.AbstractModel):
).value.get_values_for_type(x509.DNSName) ).value.get_values_for_type(x509.DNSName)
) )
except x509.extensions.ExtensionNotFound: except x509.extensions.ExtensionNotFound:
pass _logger.exception(_("Error updating name"))
domains = set(domains) domains = set(domains)
missing = domains - names missing = domains - names
@ -294,7 +298,7 @@ class Letsencrypt(models.AbstractModel):
self._respond_challenge_http(challenge, account_key) self._respond_challenge_http(challenge, account_key)
client.answer_challenge(challenge, acme.challenges.HTTP01Response()) client.answer_challenge(challenge, acme.challenges.HTTP01Response())
break break
elif challenge.chall.typ == TYPE_CHALLENGE_DNS: if challenge.chall.typ == TYPE_CHALLENGE_DNS:
domain = authorizations.body.identifier.value domain = authorizations.body.identifier.value
token = challenge.validation(account_key) token = challenge.validation(account_key)
self._respond_challenge_dns(domain, token) self._respond_challenge_dns(domain, token)
@ -323,7 +327,7 @@ class Letsencrypt(models.AbstractModel):
""" """
token = self._base64_encode(challenge.token) token = self._base64_encode(challenge.token)
challenge_file = os.path.join(_get_challenge_dir(), token) challenge_file = os.path.join(_get_challenge_dir(), token)
with open(challenge_file, "w") as file_: with open(challenge_file, "w", encoding="utf-8") as file_:
file_.write(challenge.validation(account_key)) file_.write(challenge.validation(account_key))
def _respond_challenge_dns(self, domain, token): def _respond_challenge_dns(self, domain, token):
@ -376,8 +380,7 @@ class Letsencrypt(models.AbstractModel):
value = record.to_text()[1:-1] value = record.to_text()[1:-1]
if value == token: if value == token:
return return
else: _logger.debug("Found %r instead of %r", value, token)
_logger.debug("Found %r instead of %r", value, token)
except dns.resolver.NXDOMAIN: except dns.resolver.NXDOMAIN:
_logger.debug("Record for %r does not exist yet", domain) _logger.debug("Record for %r does not exist yet", domain)
if attempt < 30: if attempt < 30:
@ -409,7 +412,7 @@ class Letsencrypt(models.AbstractModel):
def _save_and_reload(self, cert_file, order_resource): def _save_and_reload(self, cert_file, order_resource):
"""Save certfile and reload nginx or other webserver.""" """Save certfile and reload nginx or other webserver."""
ir_config_parameter = self.env["ir.config_parameter"] ir_config_parameter = self.env["ir.config_parameter"]
with open(cert_file, "w") as crt: with open(cert_file, "w", encoding="utf-8") as crt:
crt.write(order_resource.fullchain_pem) crt.write(order_resource.fullchain_pem)
_logger.info("SUCCESS: Certificate saved: %s", cert_file) _logger.info("SUCCESS: Certificate saved: %s", cert_file)
reload_cmd = ir_config_parameter.get_param("letsencrypt.reload_command", "") reload_cmd = ir_config_parameter.get_param("letsencrypt.reload_command", "")
@ -420,23 +423,29 @@ class Letsencrypt(models.AbstractModel):
def _call_cmdline(self, cmdline, env=None): def _call_cmdline(self, cmdline, env=None):
"""Call a shell command.""" """Call a shell command."""
process = subprocess.Popen( with subprocess.Popen(
cmdline, cmdline,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
env=env, env=env,
shell=True, shell=True,
) ) as process:
stdout, stderr = process.communicate() stdout, stderr = process.communicate()
stdout = stdout.strip() stdout = stdout.strip()
stderr = stderr.strip() stderr = stderr.strip()
if process.returncode: if process.returncode:
if stdout:
_logger.warning(stdout)
if stderr:
_logger.warning(stderr)
raise UserError(
_("Error calling %(cmdline)s: %(returncode)d")
% {
"cmdline": cmdline,
"returncode": process.returncode,
}
)
if stdout: if stdout:
_logger.warning(stdout) _logger.info(stdout)
if stderr: if stderr:
_logger.warning(stderr) _logger.info(stderr)
raise UserError(_("Error calling %s: %d") % (cmdline, process.returncode))
if stdout:
_logger.info(stdout)
if stderr:
_logger.info(stderr)

View File

@ -1,5 +1,6 @@
# Copyright 2018-2020 Therp BV <https://therp.nl>. # Copyright 2018-2022 Therp BV <https://therp.nl>.
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). # License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
"""Configuration of Letsencrypt."""
from odoo import _, api, exceptions, fields, models from odoo import _, api, exceptions, fields, models
@ -10,6 +11,8 @@ DNS_SCRIPT_DEFAULT = """# Write your script here
class ResConfigSettings(models.TransientModel): class ResConfigSettings(models.TransientModel):
"""Configuration of Letsencrypt."""
_inherit = "res.config.settings" _inherit = "res.config.settings"
letsencrypt_altnames = fields.Char( letsencrypt_altnames = fields.Char(
@ -66,6 +69,7 @@ class ResConfigSettings(models.TransientModel):
@api.onchange("letsencrypt_altnames", "letsencrypt_prefer_dns") @api.onchange("letsencrypt_altnames", "letsencrypt_prefer_dns")
def letsencrypt_check_dns_required(self): def letsencrypt_check_dns_required(self):
"""Check wether DNS required for Letsencrypt."""
altnames = self.letsencrypt_altnames or "" altnames = self.letsencrypt_altnames or ""
self.letsencrypt_needs_dns_provider = ( self.letsencrypt_needs_dns_provider = (
"*." in altnames or self.letsencrypt_prefer_dns "*." in altnames or self.letsencrypt_prefer_dns
@ -80,7 +84,8 @@ class ResConfigSettings(models.TransientModel):
return res return res
def set_values(self): def set_values(self):
super().set_values() """Set Letsencrypt values on settings object."""
result = super().set_values()
self.letsencrypt_check_dns_required() self.letsencrypt_check_dns_required()
if self.letsencrypt_dns_provider == "shell": if self.letsencrypt_dns_provider == "shell":
lines = [ lines = [
@ -90,3 +95,4 @@ class ResConfigSettings(models.TransientModel):
raise exceptions.ValidationError( raise exceptions.ValidationError(
_("You didn't write a DNS update script!") _("You didn't write a DNS update script!")
) )
return result

View File

@ -1,4 +1,4 @@
# Copyright 2018-2020 Therp BV <https://therp.nl>. # Copyright 2018-2022 Therp BV <https://therp.nl>.
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). # License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html).
import os import os
@ -6,6 +6,7 @@ import shutil
from datetime import datetime, timedelta from datetime import datetime, timedelta
from os import path from os import path
import dns.resolver
import mock import mock
from odoo.exceptions import UserError, ValidationError from odoo.exceptions import UserError, ValidationError
@ -13,12 +14,6 @@ from odoo.tests import SingleTransactionCase
from ..models.letsencrypt import _get_challenge_dir, _get_data_dir from ..models.letsencrypt import _get_challenge_dir, _get_data_dir
try:
import dns.resolver
except ImportError:
pass
CERT_DIR = path.join(path.dirname(__file__), "certs") CERT_DIR = path.join(path.dirname(__file__), "certs")