# Copyright 2011-2015 Therp BV # Copyright 2016-2020 Opener B.V. # Copyright 2019 ForgeFlow # Copyright 2020 GRAP # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). # flake8: noqa: C901 import logging import os from copy import deepcopy from lxml import etree from odoo import fields, models from odoo.exceptions import UserError, ValidationError from odoo.modules import get_module_path from odoo.tools import config from odoo.tools.convert import nodeattr2bool from odoo.tools.translate import _ try: from odoo.addons.openupgrade_scripts.apriori import merged_modules, renamed_modules except ImportError: renamed_modules = {} merged_modules = {} from .. import compare _logger = logging.getLogger(__name__) _IGNORE_MODULES = ["openupgrade_records", "upgrade_analysis"] class UpgradeAnalysis(models.Model): _name = "upgrade.analysis" _description = "Upgrade Analyses" analysis_date = fields.Datetime(readonly=True) state = fields.Selection( [("draft", "draft"), ("done", "Done")], readonly=True, default="draft" ) config_id = fields.Many2one( string="Comparison Config", comodel_name="upgrade.comparison.config", readonly=True, required=True, ) log = fields.Text(readonly=True) upgrade_path = fields.Char( compute="_compute_upgrade_path", readonly=True, help=( "The base file path to save the analyse files of Odoo modules. " "Taken from Odoo's --upgrade-path command line option or the " "'scripts' subdirectory in the openupgrade_scripts addon." ), ) write_files = fields.Boolean( help="Write analysis files to the module directories", default=True ) def _compute_upgrade_path(self): """Return the --upgrade-path configuration option or the `scripts` directory in `openupgrade_scripts` if available """ res = config.get("upgrade_path", False) if not res: module_path = get_module_path("openupgrade_scripts", display_warning=False) if module_path: res = os.path.join(module_path, "scripts") self.upgrade_path = res def _get_remote_model(self, connection, model): self.ensure_one() if model == "record": if float(self.config_id.version) < 14.0: return connection.env["openupgrade.record"] else: return connection.env["upgrade.record"] return False def _write_file( self, module_name, version, content, filename="upgrade_analysis.txt" ): module = self.env["ir.module.module"].search([("name", "=", module_name)])[0] if module.is_odoo_module: if not self.upgrade_path: return ( "ERROR: no upgrade_path set when writing analysis of %s\n" % module_name ) full_path = os.path.join(self.upgrade_path, module_name, version) else: full_path = os.path.join( get_module_path(module_name, "migrations", version) ) if not os.path.exists(full_path): try: os.makedirs(full_path) except os.error: return "ERROR: could not create migrations directory %s:\n" % ( full_path ) logfile = os.path.join(full_path, filename) try: f = open(logfile, "w") except Exception: return "ERROR: could not open file %s for writing:\n" % logfile _logger.debug("Writing analysis to %s", logfile) f.write(content) f.close() return None def analyze(self): """ Retrieve both sets of database representations, perform the comparison and register the resulting change set """ self.ensure_one() self.write( { "analysis_date": fields.Datetime.now(), } ) connection = self.config_id.get_connection() RemoteRecord = self._get_remote_model(connection, "record") LocalRecord = self.env["upgrade.record"] # Retrieve field representations and compare remote_records = RemoteRecord.field_dump() local_records = LocalRecord.field_dump() res = compare.compare_sets(remote_records, local_records) # Retrieve xml id representations and compare flds = [ "module", "model", "name", "noupdate", "prefix", "suffix", "domain", ] local_xml_records = [ {field: record[field] for field in flds} for record in LocalRecord.search([("type", "=", "xmlid")]) ] remote_xml_record_ids = RemoteRecord.search([("type", "=", "xmlid")]) remote_xml_records = [ {field: record[field] for field in flds} for record in RemoteRecord.read(remote_xml_record_ids, flds) ] res_xml = compare.compare_xml_sets(remote_xml_records, local_xml_records) # Retrieve model representations and compare flds = [ "module", "model", "name", "model_original_module", "model_type", ] local_model_records = [ {field: record[field] for field in flds} for record in LocalRecord.search([("type", "=", "model")]) ] remote_model_record_ids = RemoteRecord.search([("type", "=", "model")]) remote_model_records = [ {field: record[field] for field in flds} for record in RemoteRecord.read(remote_model_record_ids, flds) ] res_model = compare.compare_model_sets( remote_model_records, local_model_records ) affected_modules = sorted( { record["module"] for record in remote_records + local_records + remote_xml_records + local_xml_records + remote_model_records + local_model_records } ) if "base" in affected_modules: try: from odoo.addons.openupgrade_scripts import apriori except ImportError: _logger.error( "You are using upgrade_analysis on core modules without " " having openupgrade_scripts module available." " The analysis process will not work properly," " if you are generating analysis for the odoo modules" " in an openupgrade context." ) # reorder and output the result keys = ["general"] + affected_modules modules = { module["name"]: module for module in self.env["ir.module.module"].search( [("state", "=", "installed")] ) } general_log = "" for ignore_module in _IGNORE_MODULES: if ignore_module in keys: keys.remove(ignore_module) for key in keys: contents = "---Models in module '%s'---\n" % key if key in res_model: contents += "\n".join([str(line) for line in res_model[key]]) if res_model[key]: contents += "\n" contents += "---Fields in module '%s'---\n" % key if key in res: contents += "\n".join([str(line) for line in sorted(res[key])]) if res[key]: contents += "\n" contents += "---XML records in module '%s'---\n" % key if key in res_xml: contents += "\n".join([str(line) for line in res_xml[key]]) if res_xml[key]: contents += "\n" if key not in res and key not in res_xml and key not in res_model: contents += "---nothing has changed in this module--\n" if key == "general": general_log += contents continue if compare.module_map(key) not in modules: general_log += ( "ERROR: module not in list of installed modules:\n" + contents ) continue if key not in modules: # no need to log in full log the merged/renamed modules continue if self.write_files: error = self._write_file(key, modules[key].installed_version, contents) if error: general_log += error general_log += contents else: general_log += contents # Store the full log if self.write_files and "base" in modules: self._write_file( "base", modules["base"].installed_version, general_log, "upgrade_general_log.txt", ) try: self.generate_noupdate_changes() except Exception as e: _logger.exception("Error generating noupdate changes: %s" % e) general_log += "ERROR: error when generating noupdate changes: %s\n" % e self.write( { "state": "done", "log": general_log, } ) return True @staticmethod def _get_node_dict(element): res = {} if element is None: return res for child in element: if "name" in child.attrib: key = "./{}[@name='{}']".format(child.tag, child.attrib["name"]) res[key] = child return res @staticmethod def _get_node_value(element): if "eval" in element.attrib.keys(): return element.attrib["eval"] if "ref" in element.attrib.keys(): return element.attrib["ref"] if not len(element): return element.text return etree.tostring(element) def _get_xml_diff( self, remote_update, remote_noupdate, local_update, local_noupdate ): odoo = etree.Element("odoo") for xml_id in sorted(local_noupdate.keys()): local_record = local_noupdate[xml_id] remote_record = None if xml_id in remote_update and xml_id not in remote_noupdate: remote_record = remote_update[xml_id] elif xml_id in remote_noupdate: remote_record = remote_noupdate[xml_id] if "." in xml_id: module_xmlid = xml_id.split(".", 1)[0] else: module_xmlid = "" if remote_record is None and not module_xmlid: continue element = etree.Element( "record", id=xml_id, model=local_record.attrib["model"] ) # Add forcecreate attribute if exists if local_record.attrib.get("forcecreate"): element.attrib["forcecreate"] = local_record.attrib["forcecreate"] record_remote_dict = self._get_node_dict(remote_record) record_local_dict = self._get_node_dict(local_record) for key in sorted(record_remote_dict.keys()): if not local_record.xpath(key): # The element is no longer present. # Does the field still exist? if record_remote_dict[key].tag == "field": field_name = remote_record.xpath(key)[0].attrib.get("name") if ( field_name not in self.env[local_record.attrib["model"]]._fields.keys() ): continue # Overwrite an existing value with an empty one. attribs = deepcopy(record_remote_dict[key]).attrib for attr in ["eval", "ref"]: if attr in attribs: del attribs[attr] element.append(etree.Element(record_remote_dict[key].tag, attribs)) else: oldrepr = self._get_node_value(record_remote_dict[key]) newrepr = self._get_node_value(record_local_dict[key]) if oldrepr != newrepr: element.append(deepcopy(record_local_dict[key])) for key in sorted(record_local_dict.keys()): if remote_record is None or not remote_record.xpath(key): element.append(deepcopy(record_local_dict[key])) if len(element): odoo.append(element) if not len(odoo): return "" return etree.tostring( etree.ElementTree(odoo), pretty_print=True, xml_declaration=True, encoding="utf-8", ).decode("utf-8") @staticmethod def _update_node(target, source): for element in source: if "name" in element.attrib: query = "./{}[@name='{}']".format(element.tag, element.attrib["name"]) else: # query = "./{}".format(element.tag) continue for existing in target.xpath(query): target.remove(existing) target.append(element) @classmethod def _process_data_node( self, data_node, records_update, records_noupdate, module_name ): noupdate = nodeattr2bool(data_node, "noupdate", False) for record in data_node.xpath("./record"): self._process_record_node( record, noupdate, records_update, records_noupdate, module_name ) @classmethod def _process_record_node( self, record, noupdate, records_update, records_noupdate, module_name ): xml_id = record.get("id") if not xml_id: return if "." in xml_id and xml_id.startswith(module_name + "."): xml_id = xml_id[len(module_name) + 1 :] for records in records_noupdate, records_update: # records can occur multiple times in the same module # with different noupdate settings if xml_id in records: # merge records (overwriting an existing element # with the same tag). The order processing the # various directives from the manifest is # important here self._update_node(records[xml_id], record) break else: target_dict = records_noupdate if noupdate else records_update target_dict[xml_id] = record @classmethod def _parse_paths(self, xml_paths, module_name): records_update = {} records_noupdate = {} parser = etree.XMLParser( remove_blank_text=True, strip_cdata=False, ) for xml_path in xml_paths: try: # This is for a final correct pretty print # Ref.: https://stackoverflow.com/a/7904066 # Also don't strip CDATA tags as needed for HTML content tree = etree.parse(xml_path, parser=parser) except etree.XMLSyntaxError: continue # Support xml files with root Element either odoo or openerp # Condition: each xml file should have only one root element # {, or —rarely— }; root_node = tree.getroot() root_node_noupdate = nodeattr2bool(root_node, "noupdate", False) if root_node.tag not in ("openerp", "odoo", "data"): raise ValidationError( _( "Unexpected root Element: %s in file: %s" % (tree.getroot(), xml_path) ) ) for node in root_node: if node.tag == "data": self._process_data_node( node, records_update, records_noupdate, module_name ) elif node.tag == "record": self._process_record_node( node, root_node_noupdate, records_update, records_noupdate, module_name, ) return records_update, records_noupdate def generate_noupdate_changes(self): """Communicate with the remote server to fetch all xml data records per module, and generate a diff in XML format that can be imported from the module's migration script using openupgrade.load_data() """ self.ensure_one() connection = self.config_id.get_connection() remote_record_obj = self._get_remote_model(connection, "record") local_record_obj = self.env["upgrade.record"] local_modules = local_record_obj.list_modules() all_remote_modules = remote_record_obj.list_modules() for local_module in local_modules: remote_paths = [] remote_modules = [] remote_update, remote_noupdate = {}, {} for remote_module in all_remote_modules: if local_module == renamed_modules.get( remote_module, merged_modules.get(remote_module, remote_module) ): remote_paths.extend( remote_record_obj.get_xml_records(remote_module) ) remote_modules.append(remote_module) add_remote_update, add_remote_noupdate = self._parse_paths( remote_paths, remote_module ) remote_update.update(add_remote_update) remote_noupdate.update(add_remote_noupdate) if not remote_modules: continue local_paths = local_record_obj.get_xml_records(local_module) local_update, local_noupdate = self._parse_paths(local_paths, local_module) diff = self._get_xml_diff( remote_update, remote_noupdate, local_update, local_noupdate ) if diff: module = self.env["ir.module.module"].search( [("name", "=", local_module)] ) self._write_file( local_module, module.installed_version, diff, filename="noupdate_changes.xml", ) return True