[FIX] mail_tracking: permissions

Refine which mail tracking record is allowed to see. We can rely on the
ACLs of the related records so we minimize inconvenient message subject
leaks.

A regular user can read these mail.tracking.email records:

- Those with a linked mail.message that the user can read
- Those with a linked mail.mail that the user can read
- Those with no message/mail link but a linked partner that the user can
read.
- Those with no linked records.

TT31399
pull/966/head
david 2022-09-07 19:22:48 +02:00
parent c05f81f2c7
commit b4ab1600fe
5 changed files with 114 additions and 25 deletions

View File

@ -81,8 +81,10 @@ class MailTrackingController(MailController):
metadata = self._request_metadata()
with db_env(db) as env:
try:
tracking_email = env["mail.tracking.email"].search(
[("id", "=", tracking_email_id), ("token", "=", token)]
tracking_email = (
env["mail.tracking.email"]
.sudo()
.search([("id", "=", tracking_email_id), ("token", "=", token)])
)
if not tracking_email:
_logger.warning(

View File

@ -8,7 +8,8 @@ import urllib.parse
import uuid
from datetime import datetime
from odoo import api, fields, models, tools
from odoo import _, api, fields, models, tools
from odoo.exceptions import AccessError
_logger = logging.getLogger(__name__)
@ -130,11 +131,88 @@ class MailTrackingEmail(models.Model):
if state and state in self.env["mail.message"].get_failed_states():
self.mapped("mail_message_id").write({"mail_tracking_needs_action": True})
def _find_allowed_tracking_ids(self):
"""Filter trackings based on related records ACLs"""
# Admins passby this filter
if not self or self.env.user.has_group("base.group_system"):
return self.ids
# Override ORM to get the values directly
self._cr.execute(
"""
SELECT id, mail_message_id, mail_id, partner_id
FROM mail_tracking_email WHERE id IN %s
""",
(tuple(self.ids),),
)
msg_linked = self._cr.fetchall()
if not msg_linked:
return []
ids, msg_ids, mail_ids, partner_ids = zip(*msg_linked)
# Filter messages with their ACL rules
msg_ids = self.env["mail.message"]._search([("id", "in", msg_ids)])
mail_ids = self.env["mail.mail"]._search([("id", "in", mail_ids)])
partner_ids = self.env["res.partner"]._search([("id", "in", partner_ids)])
return [
x[0]
for x in msg_linked
if (x[1] in msg_ids) # We can read the linked message
or (x[2] in mail_ids) # We can read the linked mail
or (
not any({x[1], x[2]}) and x[3] in partner_ids
) # No linked msg/mail but we can read the linked partner
or (not any({x[1], x[2], x[3]})) # No linked record
]
@api.model
def _search(
self,
args,
offset=0,
limit=None,
order=None,
count=False,
access_rights_uid=None,
):
"""Filter ids based on related records ACLs"""
ids = super()._search(
args, offset, limit, order, count=count, access_rights_uid=access_rights_uid
)
if not self.env.user.has_group("base.group_system") and not count:
ids = self.browse(ids)._find_allowed_tracking_ids()
return ids
def check_access_rule(self, operation):
"""Rely on related messages ACLs"""
super().check_access_rule(operation)
allowed_ids = self._search([("id", "in", self._find_allowed_tracking_ids())])
disallowed_ids = set(self.exists().ids).difference(set(allowed_ids))
if not disallowed_ids:
return
raise AccessError(
_(
"The requested operation cannot be completed due to security "
"restrictions. Please contact your system administrator.\n\n"
"(Document type: %s, Operation: %s)"
)
% (self._description, operation)
+ " - ({} {}, {} {})".format(
_("Records:"), list(disallowed_ids), _("User:"), self._uid
)
)
def read(self, fields=None, load="_classic_read"):
"""Override to explicitly call check_access_rule, that is not called
by the ORM. It instead directly fetches ir.rules and apply them.
"""
if not self.env.user.has_group("base.group_system"):
self.check_access_rule("read")
return super().read(fields=fields, load=load)
@api.model
def email_is_bounced(self, email):
if not email:
return False
res = self._email_last_tracking_state(email)
res = self.sudo()._email_last_tracking_state(email)
return res and res[0].get("state", "") in {
"rejected",
"error",
@ -155,13 +233,13 @@ class MailTrackingEmail(models.Model):
def email_score_from_email(self, email):
if not email:
return 0.0
data = self.read_group(
data = self.sudo().read_group(
[("recipient_address", "=", email.lower())],
["recipient_address", "state"],
["state"],
)
mapped_data = {state["state"]: state["state_count"] for state in data}
return self.with_context(mt_states=mapped_data).email_score()
return self.with_context(mt_states=mapped_data).sudo().email_score()
@api.model
def _email_score_weights(self):

View File

@ -17,14 +17,16 @@ class ResPartner(models.Model):
@api.depends("email")
def _compute_email_score_and_count(self):
self.email_score = 50.0
self.tracking_emails_count = 0
partners_mail = self.filtered("email")
mail_tracking_obj = self.env["mail.tracking.email"]
mt_obj = self.env["mail.tracking.email"].sudo()
for partner in partners_mail:
partner.email_score = self.env[
"mail.tracking.email"
].email_score_from_email(partner.email)
partner.tracking_emails_count = mail_tracking_obj.search_count(
[("recipient_address", "=", partner.email.lower())]
partner.email_score = mt_obj.email_score_from_email(partner.email)
# We don't want performance issues due to heavy ACLs check for large
# recordsets. Our option is to hide the number for regular users.
if not self.env.user.has_group("base.group_system"):
continue
partner.tracking_emails_count = len(
mt_obj._search([("recipient_address", "=", partner.email.lower())])
)
partners_no_mail = self - partners_mail
partners_no_mail.update({"email_score": 50.0, "tracking_emails_count": 0})

View File

@ -10,6 +10,7 @@ import psycopg2.errorcodes
from lxml import etree
from odoo import http
from odoo.tests import users
from odoo.tests.common import TransactionCase
from odoo.tools import mute_logger
@ -75,9 +76,13 @@ class TestMailTracking(TransactionCase):
tracking.write({"recipient": False})
self.assertEqual(False, tracking.recipient_address)
@users("admin", "demo")
def test_message_post(self):
# This message will generate a notification for recipient
message = self.env["mail.message"].create(
message = (
self.env["mail.message"]
.sudo()
.create(
{
"subject": "Message test",
"author_id": self.sender.id,
@ -89,6 +94,7 @@ class TestMailTracking(TransactionCase):
"body": "<p>This is a test message</p>",
}
)
)
message._moderate_accept()
# Search tracking created
tracking_email = self.env["mail.tracking.email"].search(

View File

@ -23,6 +23,7 @@
name="tracking_emails_count"
widget="statinfo"
string="Tracking emails"
attrs="{'invisible': [('tracking_emails_count', '=', False)]}"
/>
</button>
</div>