362 lines
13 KiB
Python
362 lines
13 KiB
Python
# Copyright 2018 Therp BV <http://therp.nl>
|
|
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
|
|
|
|
import os
|
|
import shutil
|
|
|
|
from datetime import datetime, timedelta
|
|
from os import path
|
|
|
|
import mock
|
|
|
|
from odoo.exceptions import UserError, ValidationError
|
|
from odoo.tests import SingleTransactionCase
|
|
|
|
try:
|
|
import dns.resolver
|
|
except ImportError:
|
|
pass
|
|
|
|
from ..models.letsencrypt import _get_data_dir, _get_challenge_dir
|
|
|
|
|
|
# Path of the ``certs`` directory located next to this test module.
# NOTE(review): nothing in the visible part of this file reads CERT_DIR —
# confirm it is still needed before relying on it.
CERT_DIR = path.join(path.dirname(__file__), 'certs')
|
|
|
|
|
|
def _poll(order, deadline):
|
|
order_resource = mock.Mock(['fullchain_pem'])
|
|
order_resource.fullchain_pem = 'chain'
|
|
return order_resource
|
|
|
|
|
|
class TestLetsencrypt(SingleTransactionCase):
    """Tests for the ``letsencrypt`` model and its configuration settings.

    Every test starts from the settings written by :meth:`setUp` and ends
    with the data directory and the DNS marker file removed by
    :meth:`tearDown`.
    """

    # File created by the DNS shell script configured in ``setUp``
    # (``touch <marker>``); tests check for it to see whether the script ran.
    _DNS_MARKER = '/tmp/.letsencrypt_test'

    def setUp(self):
        """Set the base URL to www.example.com and store default settings."""
        super().setUp()
        self.env['ir.config_parameter'].set_param(
            'web.base.url', 'http://www.example.com'
        )
        self.env['res.config.settings'].create(
            {
                'letsencrypt_dns_provider': 'shell',
                'letsencrypt_dns_shell_script': 'touch %s' % self._DNS_MARKER,
                'letsencrypt_altnames': '*.example.com',
                'letsencrypt_reload_command': 'echo reloaded',
            }
        ).set_values()

    def _cert_path(self, common_name='www.example.com'):
        """Return the path of ``<common_name>.crt`` inside the data dir."""
        return path.join(_get_data_dir(), '%s.crt' % common_name)

    def test_config_settings(self):
        """Settings stored in ``setUp`` are read back; a bad script fails."""
        setting_vals = self.env['res.config.settings'].default_get([])
        self.assertEqual(setting_vals['letsencrypt_dns_provider'], 'shell')
        self.assertEqual(
            setting_vals['letsencrypt_dns_shell_script'],
            'touch %s' % self._DNS_MARKER,
        )
        self.assertEqual(setting_vals['letsencrypt_altnames'], '*.example.com')
        self.assertEqual(
            setting_vals['letsencrypt_reload_command'], 'echo reloaded'
        )
        self.assertTrue(setting_vals['letsencrypt_needs_dns_provider'])
        self.assertFalse(setting_vals['letsencrypt_prefer_dns'])

        # A script consisting only of a comment must be rejected.
        with self.assertRaises(ValidationError):
            self.env["res.config.settings"].create(
                {"letsencrypt_dns_shell_script": "# Empty script"}
            ).set_values()

    @mock.patch('acme.client.ClientV2.answer_challenge')
    @mock.patch('acme.client.ClientV2.poll_and_finalize', side_effect=_poll)
    def test_http_challenge(self, poll, _answer_challenge):
        """A non-wildcard altname is renewed without running the DNS script.

        After the cron run the challenge directory is non-empty, the DNS
        marker file is absent and the certificate file exists.
        """
        letsencrypt = self.env['letsencrypt']
        self.env['res.config.settings'].create(
            {'letsencrypt_altnames': 'test.example.com'}
        ).set_values()
        letsencrypt._cron()
        poll.assert_called()
        self.assertTrue(os.listdir(_get_challenge_dir()))
        self.assertFalse(path.isfile(self._DNS_MARKER))
        self.assertTrue(path.isfile(self._cert_path()))

    # pylint: disable=unused-argument
    @mock.patch('odoo.addons.letsencrypt.models.letsencrypt.DNSUpdate')
    @mock.patch('dns.resolver.query')
    @mock.patch('time.sleep')
    @mock.patch('acme.client.ClientV2.answer_challenge')
    @mock.patch('acme.client.ClientV2.poll_and_finalize', side_effect=_poll)
    def test_dns_challenge(self, poll, answer_challenge, sleep, query, dnsupd):
        """The cron keeps querying DNS until the expected TXT record shows up.

        The mocked resolver fails with NXDOMAIN on the first call, returns a
        wrong record on the second and the right one from the third call on;
        the test expects exactly three queries, the DNS marker file and the
        certificate file to exist afterwards.
        """
        # TXT record matching the token of the most recent DNSUpdate call.
        record = None

        def register_update(challenge, domain, token):
            nonlocal record
            record = mock.Mock()
            record.to_text.return_value = '"%s"' % token
            ret = mock.Mock()
            ret.challenge = challenge
            ret.domain = domain
            ret.token = token
            return ret

        dnsupd.side_effect = register_update

        ncalls = 0

        def query_effect(domain, rectype):
            nonlocal ncalls
            self.assertEqual(domain, "_acme-challenge.example.com.")
            self.assertEqual(rectype, "TXT")
            ncalls += 1
            if ncalls == 1:
                raise dns.resolver.NXDOMAIN
            if ncalls == 2:
                wrong_record = mock.Mock()
                wrong_record.to_text.return_value = '"not right"'
                return [wrong_record]
            return [record]

        query.side_effect = query_effect

        self.install_certificate(days_left=10)
        self.env['letsencrypt']._cron()
        poll.assert_called()
        self.assertEqual(ncalls, 3)
        self.assertTrue(path.isfile(self._DNS_MARKER))
        self.assertTrue(path.isfile(self._cert_path()))

    def test_dns_challenge_error_on_missing_provider(self):
        """A wildcard altname without a DNS provider makes the cron fail."""
        self.env['res.config.settings'].create(
            {
                'letsencrypt_altnames': '*.example.com',
                'letsencrypt_dns_provider': False,
            }
        ).set_values()
        with self.assertRaises(UserError):
            self.env['letsencrypt']._cron()

    def test_prefer_dns_setting(self):
        """With ``letsencrypt_prefer_dns`` set, the DNS scenario also passes
        for a non-wildcard altname."""
        self.env['res.config.settings'].create(
            {
                'letsencrypt_altnames': 'example.com',
                'letsencrypt_prefer_dns': True,
            }
        ).set_values()
        self.env['ir.config_parameter'].set_param(
            'web.base.url', 'http://example.com'
        )
        # The mock arguments are injected by the decorators of the callee.
        # pylint: disable=no-value-for-parameter
        self.test_dns_challenge()

    def test_cascading(self):
        """``_cascade_domains`` drops duplicates and names covered by a
        wildcard, returns a sorted list and rejects inner wildcards."""
        cascade = self.env['letsencrypt']._cascade_domains
        self.assertEqual(
            cascade(
                [
                    'www.example.com',
                    '*.example.com',
                    'example.com',
                    'example.com',
                    'notexample.com',
                    'multi.sub.example.com',
                    'www2.example.com',
                    'unrelated.com',
                ]
            ),
            [
                '*.example.com',
                'example.com',
                'multi.sub.example.com',
                'notexample.com',
                'unrelated.com',
            ],
        )
        self.assertEqual(cascade([]), [])
        self.assertEqual(cascade(['*.example.com']), ['*.example.com'])
        self.assertEqual(cascade(['www.example.com']), ['www.example.com'])
        self.assertEqual(
            cascade(['www.example.com', 'example.com']),
            ['example.com', 'www.example.com'],
        )

        with self.assertRaises(UserError):
            cascade(['www.*.example.com'])

        with self.assertRaises(UserError):
            cascade(['*.*.example.com'])

    def test_altnames_parsing(self):
        """``_get_altnames`` splits the stored parameter on commas,
        whitespace and newlines, and yields ``[]`` for an empty value."""
        config = self.env['ir.config_parameter']
        letsencrypt = self.env['letsencrypt']

        self.assertEqual(letsencrypt._get_altnames(), ['*.example.com'])

        config.set_param('letsencrypt.altnames', '')
        self.assertEqual(letsencrypt._get_altnames(), [])

        config.set_param('letsencrypt.altnames', 'www.example.com')
        self.assertEqual(letsencrypt._get_altnames(), ['www.example.com'])

        config.set_param(
            'letsencrypt.altnames', 'example.com,example.org,example.net'
        )
        self.assertEqual(
            letsencrypt._get_altnames(),
            ['example.com', 'example.org', 'example.net'],
        )

        config.set_param(
            'letsencrypt.altnames', 'example.com, example.org\nexample.net'
        )
        self.assertEqual(
            letsencrypt._get_altnames(),
            ['example.com', 'example.org', 'example.net'],
        )

    def test_key_generation_and_retrieval(self):
        """``_get_key`` returns bytes, is stable per file name, differs
        between names and writes the key into the data directory."""
        key_a1 = self.env['letsencrypt']._get_key('a.key')
        key_a2 = self.env['letsencrypt']._get_key('a.key')
        key_b = self.env['letsencrypt']._get_key('b.key')
        self.assertIsInstance(key_a1, bytes)
        self.assertIsInstance(key_a2, bytes)
        self.assertIsInstance(key_b, bytes)
        self.assertTrue(path.isfile(path.join(_get_data_dir(), 'a.key')))
        self.assertEqual(key_a1, key_a2)
        self.assertNotEqual(key_a1, key_b)

    @mock.patch('os.remove', side_effect=os.remove)
    @mock.patch(
        'odoo.addons.letsencrypt.models.letsencrypt.Letsencrypt._generate_key',
        side_effect=lambda: None,
    )
    def test_interrupted_key_writing(self, generate_key, remove):
        """A failure while writing the key leaves no key file behind.

        ``_generate_key`` is patched to return ``None``; the test expects the
        resulting ``TypeError`` to propagate, ``os.remove`` to have been
        called and the key file to be absent.
        """
        with self.assertRaises(TypeError):
            self.env['letsencrypt']._get_key('a.key')
        self.assertFalse(path.isfile(path.join(_get_data_dir(), 'a.key')))
        remove.assert_called()
        generate_key.assert_called()

    def test_domain_validation(self):
        """``_validate_domain`` accepts public host names and rejects IP
        addresses, local names and names without a dot."""
        validate = self.env['letsencrypt']._validate_domain
        validate('example.com')
        validate('www.example.com')

        rejected = (
            '1.1.1.1',
            '192.168.1.1',
            'localhost.localdomain',
            'testdomain',
            '::1',
        )
        for domain in rejected:
            # ``msg`` names the offending domain when the assertion fails.
            with self.assertRaises(UserError, msg=domain):
                validate(domain)

    def test_young_certificate(self):
        """A certificate with 60 days left does not trigger a run."""
        self.install_certificate(60)
        self.assertFalse(
            self.env['letsencrypt']._should_run(
                self._cert_path(), ['www.example.com', '*.example.com']
            )
        )

    def test_old_certificate(self):
        """A certificate with 20 days left triggers a run."""
        self.install_certificate(20)
        self.assertTrue(
            self.env['letsencrypt']._should_run(
                self._cert_path(), ['www.example.com', '*.example.com']
            )
        )

    def test_expired_certificate(self):
        """A certificate that expired 10 days ago triggers a run."""
        self.install_certificate(-10)
        self.assertTrue(
            self.env['letsencrypt']._should_run(
                self._cert_path(), ['www.example.com', '*.example.com']
            )
        )

    def test_missing_certificate(self):
        """A missing certificate file triggers a run."""
        self.assertTrue(
            self.env['letsencrypt']._should_run(
                self._cert_path(), ['www.example.com', '*.example.com']
            )
        )

    def test_new_altnames(self):
        """A young certificate lacking a requested altname triggers a run."""
        self.install_certificate(60, 'www.example.com', ())
        self.assertTrue(
            self.env['letsencrypt']._should_run(
                self._cert_path(), ['www.example.com', '*.example.com']
            )
        )

    def test_legacy_certificate_without_altnames(self):
        """A young certificate without a SAN extension is still accepted
        when only its common name is requested."""
        self.install_certificate(60, use_altnames=False)
        self.assertFalse(
            self.env['letsencrypt']._should_run(
                self._cert_path(), ['www.example.com']
            )
        )

    def install_certificate(
        self,
        days_left,
        common_name='www.example.com',
        altnames=('*.example.com',),
        use_altnames=True,
    ):
        """Write a freshly signed test certificate into the data directory.

        The certificate is signed with a throw-away RSA key, is valid for 90
        days in total and is stored as ``<common_name>.crt``.

        :param days_left: days until expiry, counted from now; a negative
            value produces an already expired certificate.
        :param common_name: subject common name, also used for the file name.
        :param altnames: extra DNS names for the subject alternative name
            extension (the common name is always included in it).
        :param use_altnames: when false, no SAN extension is added at all.
        """
        # Imported here so that only tests needing a certificate depend on
        # the ``cryptography`` package.
        from cryptography import x509
        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives import hashes, serialization
        from cryptography.hazmat.primitives.asymmetric import rsa

        # NOTE(review): this is naive local time, while the cryptography
        # builder documents its datetimes as UTC; harmless at the day
        # granularity used by the tests, but confirm if margins get tighter.
        not_after = datetime.now() + timedelta(days=days_left)
        not_before = not_after - timedelta(days=90)

        key = rsa.generate_private_key(
            public_exponent=65537, key_size=2048, backend=default_backend()
        )
        cert_builder = (
            x509.CertificateBuilder()
            .subject_name(
                x509.Name(
                    [x509.NameAttribute(x509.NameOID.COMMON_NAME, common_name)]
                )
            )
            .issuer_name(
                x509.Name(
                    [x509.NameAttribute(x509.NameOID.COMMON_NAME, 'myca.biz')]
                )
            )
            .not_valid_before(not_before)
            .not_valid_after(not_after)
            .serial_number(x509.random_serial_number())
            .public_key(key.public_key())
        )

        if use_altnames:
            cert_builder = cert_builder.add_extension(
                x509.SubjectAlternativeName(
                    [x509.DNSName(common_name)]
                    + [x509.DNSName(name) for name in altnames]
                ),
                critical=False,
            )

        cert = cert_builder.sign(key, hashes.SHA256(), default_backend())
        with open(self._cert_path(common_name), 'wb') as file_:
            file_.write(cert.public_bytes(serialization.Encoding.PEM))

    def tearDown(self):
        """Remove the data directory and the DNS marker file.

        The filesystem cleanup runs in a ``finally`` clause so that an error
        raised by the parent ``tearDown`` cannot leave files behind that
        would influence later tests.
        """
        try:
            super().tearDown()
        finally:
            shutil.rmtree(_get_data_dir(), ignore_errors=True)
            try:
                os.remove(self._DNS_MARKER)
            except FileNotFoundError:
                # The DNS script did not run in this test; nothing to remove.
                pass
|