# Copyright 2011-2015 Therp BV
# Copyright 2016-2020 Opener B.V.
# Copyright 2019 ForgeFlow
# Copyright 2020 GRAP
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
# flake8: noqa: C901

import importlib
import logging
import os
from copy import deepcopy

from lxml import etree
from mako.template import Template

from odoo import fields, models, release
from odoo.exceptions import ValidationError
from odoo.modules import get_module_path
from odoo.tools import config
from odoo.tools.convert import nodeattr2bool
from odoo.tools.translate import _

try:
    from odoo.addons.openupgrade_scripts.apriori import merged_modules, renamed_modules
except ImportError:
    renamed_modules = {}
    merged_modules = {}

from .. import compare

_logger = logging.getLogger(__name__)
_IGNORE_MODULES = ["openupgrade_records", "upgrade_analysis"]


class UpgradeAnalysis(models.Model):
    """One comparison run between a remote database and the local one.

    The remote side is reached through the connection provided by
    ``config_id``; the local side is read from ``upgrade.record``.
    """

    _name = "upgrade.analysis"
    _description = "Upgrade Analyses"

    analysis_date = fields.Datetime(readonly=True)

    state = fields.Selection(
        [("draft", "draft"), ("done", "Done")], readonly=True, default="draft"
    )
    config_id = fields.Many2one(
        string="Comparison Config",
        comodel_name="upgrade.comparison.config",
        readonly=True,
        required=True,
    )

    log = fields.Text(readonly=True)
    upgrade_path = fields.Char(
        compute="_compute_upgrade_path",
        readonly=True,
        help=(
            "The base file path to save the analyse files of Odoo modules. "
            "Taken from Odoo's --upgrade-path command line option or the "
            "'scripts' subdirectory in the openupgrade_scripts addon."
        ),
    )

    write_files = fields.Boolean(
        help="Write analysis files to the module directories", default=True
    )

    def _compute_upgrade_path(self):
        """Set ``upgrade_path`` from the ``upgrade_path`` config option.

        Falls back to the ``scripts`` directory of the ``openupgrade_scripts``
        module when the option is empty and that module can be located.
        Otherwise the field is left falsy.
        """
        res = config.get("upgrade_path", False)
        if not res:
            module_path = get_module_path("openupgrade_scripts", display_warning=False)
            if module_path:
                res = os.path.join(module_path, "scripts")
        self.upgrade_path = res

    def _get_remote_model(self, connection, model):
        """Return the remote model proxy for ``model``, or ``False``.

        Only ``"record"`` is supported: remote versions below 14.0 expose it
        as ``openupgrade.record``, later ones as ``upgrade.record``.
        """
        self.ensure_one()
        if model == "record":
            if float(self.config_id.version) < 14.0:
                return connection.env["openupgrade.record"]
            return connection.env["upgrade.record"]
        return False

    def _write_file(
        self, module_name, version, content, filename="upgrade_analysis.txt"
    ):
        """Write ``content`` to ``filename`` in the module's analysis directory.

        The target directory is ``<upgrade_path>/<module_name>/<version>`` for
        modules flagged ``is_odoo_module`` and
        ``<module path>/migrations/<version>`` for all others. It is created
        when missing.

        :return: ``None`` on success, otherwise an error message (ending in a
            newline) suitable for appending to the analysis log.
        """
        module = self.env["ir.module.module"].search([("name", "=", module_name)])[0]
        if module.is_odoo_module:
            if not self.upgrade_path:
                return (
                    "ERROR: no upgrade_path set when writing analysis of %s\n"
                    % module_name
                )
            full_path = os.path.join(self.upgrade_path, module_name, version)
        else:
            full_path = os.path.join(
                get_module_path(module_name), "migrations", version
            )
        if not os.path.exists(full_path):
            try:
                os.makedirs(full_path)
            except os.error:
                return "ERROR: could not create migrations directory %s:\n" % (
                    full_path
                )
        logfile = os.path.join(full_path, filename)
        try:
            f = open(logfile, "w")
        except Exception:
            return "ERROR: could not open file %s for writing:\n" % logfile
        _logger.debug("Writing analysis to %s", logfile)
        # The context manager closes the handle even if the write fails.
        with f:
            f.write(content)
        return None

    def analyze(self):
        """Compare remote and local database representations and log the result.

        Field, XML id and model records are fetched from both sides and
        compared per module. Depending on ``write_files`` the per-module
        report is written to disk or appended to the general log. The
        noupdate changes and the module coverage file are generated last,
        after which the analysis is marked as done.

        :return: ``True``
        """
        self.ensure_one()
        self.write(
            {
                "analysis_date": fields.Datetime.now(),
            }
        )

        connection = self.config_id.get_connection()
        RemoteRecord = self._get_remote_model(connection, "record")
        LocalRecord = self.env["upgrade.record"]

        # Retrieve field representations and compare
        remote_records = RemoteRecord.field_dump()
        local_records = LocalRecord.field_dump()
        res = compare.compare_sets(remote_records, local_records)

        # Retrieve xml id representations and compare
        flds = [
            "module",
            "model",
            "name",
            "noupdate",
            "prefix",
            "suffix",
            "domain",
        ]
        local_xml_records = [
            {field: record[field] for field in flds}
            for record in LocalRecord.search([("type", "=", "xmlid")])
        ]
        remote_xml_record_ids = RemoteRecord.search([("type", "=", "xmlid")])
        remote_xml_records = [
            {field: record[field] for field in flds}
            for record in RemoteRecord.read(remote_xml_record_ids, flds)
        ]
        res_xml = compare.compare_xml_sets(remote_xml_records, local_xml_records)

        # Retrieve model representations and compare
        flds = [
            "module",
            "model",
            "name",
            "model_original_module",
            "model_type",
        ]
        local_model_records = [
            {field: record[field] for field in flds}
            for record in LocalRecord.search([("type", "=", "model")])
        ]
        remote_model_record_ids = RemoteRecord.search([("type", "=", "model")])
        remote_model_records = [
            {field: record[field] for field in flds}
            for record in RemoteRecord.read(remote_model_record_ids, flds)
        ]
        res_model = compare.compare_model_sets(
            remote_model_records, local_model_records
        )

        affected_modules = sorted(
            {
                record["module"]
                for record in remote_records
                + local_records
                + remote_xml_records
                + local_xml_records
                + remote_model_records
                + local_model_records
            }
        )
        if "base" in affected_modules:
            try:
                # Only the availability of the module matters here; using
                # import_module avoids an unused-import that linters strip.
                importlib.import_module("odoo.addons.openupgrade_scripts.apriori")
            except ImportError:
                _logger.error(
                    "You are using upgrade_analysis on core modules without "
                    " having openupgrade_scripts module available."
                    " The analysis process will not work properly,"
                    " if you are generating analysis for the odoo modules"
                    " in an openupgrade context."
                )

        # reorder and output the result
        keys = ["general"] + affected_modules
        modules = {
            module["name"]: module
            for module in self.env["ir.module.module"].search(
                [("state", "=", "installed")]
            )
        }
        general_log = ""

        for ignore_module in _IGNORE_MODULES:
            if ignore_module in keys:
                keys.remove(ignore_module)

        for key in keys:
            contents = "---Models in module '%s'---\n" % key
            if key in res_model:
                contents += "\n".join([str(line) for line in res_model[key]])
                if res_model[key]:
                    contents += "\n"
            contents += "---Fields in module '%s'---\n" % key
            if key in res:
                contents += "\n".join([str(line) for line in sorted(res[key])])
                if res[key]:
                    contents += "\n"
            contents += "---XML records in module '%s'---\n" % key
            if key in res_xml:
                contents += "\n".join([str(line) for line in res_xml[key]])
                if res_xml[key]:
                    contents += "\n"
            if key not in res and key not in res_xml and key not in res_model:
                contents += "---nothing has changed in this module--\n"
            if key == "general":
                general_log += contents
                continue
            if compare.module_map(key) not in modules:
                general_log += (
                    "ERROR: module not in list of installed modules:\n" + contents
                )
                continue
            if key not in modules:
                # no need to log in full log the merged/renamed modules
                continue
            if self.write_files:
                # When writing files, the report only lands in the general
                # log if the file could not be written.
                error = self._write_file(key, modules[key].installed_version, contents)
                if error:
                    general_log += error
                    general_log += contents
            else:
                general_log += contents

        # Store the full log
        if self.write_files and "base" in modules:
            self._write_file(
                "base",
                modules["base"].installed_version,
                general_log,
                "upgrade_general_log.txt",
            )

        try:
            self.generate_noupdate_changes()
        except Exception as e:
            _logger.exception("Error generating noupdate changes: %s", e)
            general_log += "ERROR: error when generating noupdate changes: %s\n" % e

        try:
            self.generate_module_coverage_file()
        except Exception as e:
            _logger.exception("Error generating module coverage file: %s", e)
            general_log += "ERROR: error when generating module coverage file: %s\n" % e

        self.write(
            {
                "state": "done",
                "log": general_log,
            }
        )
        return True

    @staticmethod
    def _get_node_dict(element):
        """Map an XPath query to each child of ``element`` that has a ``name``.

        Keys have the form ``./<tag>[@name='<name>']``. Children without a
        ``name`` attribute are skipped, and ``None`` yields an empty dict.
        """
        res = {}
        if element is None:
            return res
        for child in element:
            if "name" in child.attrib:
                key = "./{}[@name='{}']".format(child.tag, child.attrib["name"])
                res[key] = child
        return res

    @staticmethod
    def _get_node_value(element):
        """Return a comparable representation of a node's value.

        Precedence: the ``eval`` attribute, then the ``ref`` attribute, then
        the text of a childless node, and finally the serialized node.
        """
        if "eval" in element.attrib:
            return element.attrib["eval"]
        if "ref" in element.attrib:
            return element.attrib["ref"]
        if not len(element):
            return element.text
        return etree.tostring(element)

    def _get_xml_diff(
        self, remote_update, remote_noupdate, local_update, local_noupdate
    ):
        """Build the XML describing changes to the local noupdate records.

        Each local noupdate record is compared with its remote counterpart,
        looked up in ``remote_noupdate`` first and ``remote_update`` second.
        Records that have no remote counterpart and no module prefix in their
        XML id are skipped.

        :return: a pretty-printed XML document as ``str``, or ``""`` when no
            record differs.
        """
        odoo = etree.Element("odoo")
        for xml_id in sorted(local_noupdate.keys()):
            local_record = local_noupdate[xml_id]
            remote_record = None
            if xml_id in remote_update and xml_id not in remote_noupdate:
                remote_record = remote_update[xml_id]
            elif xml_id in remote_noupdate:
                remote_record = remote_noupdate[xml_id]

            module_xmlid = xml_id.split(".", 1)[0] if "." in xml_id else ""

            if remote_record is None and not module_xmlid:
                continue

            model_name = local_record.attrib["model"]
            element = etree.Element("record", id=xml_id, model=model_name)
            # Add forcecreate attribute if exists
            if local_record.attrib.get("forcecreate"):
                element.attrib["forcecreate"] = local_record.attrib["forcecreate"]
            record_remote_dict = self._get_node_dict(remote_record)
            record_local_dict = self._get_node_dict(local_record)
            for key in sorted(record_remote_dict.keys()):
                remote_node = record_remote_dict[key]
                if not local_record.xpath(key):
                    # The element is no longer present.
                    # Does the field still exist?
                    if remote_node.tag == "field":
                        field_name = remote_record.xpath(key)[0].attrib.get("name")
                        if field_name not in self.env[model_name]._fields:
                            continue
                    # Overwrite an existing value with an empty one.
                    attribs = deepcopy(remote_node).attrib
                    for attr in ["eval", "ref"]:
                        if attr in attribs:
                            del attribs[attr]
                    element.append(etree.Element(remote_node.tag, attribs))
                else:
                    oldrepr = self._get_node_value(remote_node)
                    newrepr = self._get_node_value(record_local_dict[key])

                    if oldrepr != newrepr:
                        element.append(deepcopy(record_local_dict[key]))

            # Nodes that only exist locally are copied as-is.
            for key in sorted(record_local_dict.keys()):
                if remote_record is None or not remote_record.xpath(key):
                    element.append(deepcopy(record_local_dict[key]))

            if len(element):
                odoo.append(element)

        if not len(odoo):
            return ""
        return etree.tostring(
            etree.ElementTree(odoo),
            pretty_print=True,
            xml_declaration=True,
            encoding="utf-8",
        ).decode("utf-8")

    @staticmethod
    def _update_node(target, source):
        """Merge the named children of ``source`` into ``target``.

        A child of ``target`` with the same tag and ``name`` is removed before
        the child from ``source`` is appended. Children of ``source`` without
        a ``name`` attribute are ignored.
        """
        for element in source:
            if "name" not in element.attrib:
                # Unnamed children cannot be matched reliably, e.g. with
                # query = "./{}".format(element.tag)
                continue
            query = "./{}[@name='{}']".format(element.tag, element.attrib["name"])
            for existing in target.xpath(query):
                target.remove(existing)
            target.append(element)

    @classmethod
    def _process_data_node(
        cls, data_node, records_update, records_noupdate, module_name
    ):
        """Register every ``<record>`` directly under a ``<data>`` node.

        The ``noupdate`` attribute of the data node (default ``False``) is
        applied to all of its records.
        """
        noupdate = nodeattr2bool(data_node, "noupdate", False)
        for record in data_node.xpath("./record"):
            cls._process_record_node(
                record, noupdate, records_update, records_noupdate, module_name
            )

    @classmethod
    def _process_record_node(
        cls, record, noupdate, records_update, records_noupdate, module_name
    ):
        """Store ``record`` in ``records_noupdate`` or ``records_update``.

        The ``<module_name>.`` prefix is stripped from the XML id. A record
        whose id is already known in either dict is merged into the existing
        entry; otherwise it is added to the dict selected by ``noupdate``.
        Records without an ``id`` are ignored.
        """
        xml_id = record.get("id")
        if not xml_id:
            return
        if xml_id.startswith(module_name + "."):
            xml_id = xml_id[len(module_name) + 1 :]
        for records in records_noupdate, records_update:
            # records can occur multiple times in the same module
            # with different noupdate settings
            if xml_id in records:
                # merge records (overwriting an existing element
                # with the same tag). The order processing the
                # various directives from the manifest is
                # important here
                cls._update_node(records[xml_id], record)
                break
        else:
            target_dict = records_noupdate if noupdate else records_update
            target_dict[xml_id] = record

    @classmethod
    def _parse_files(cls, xml_files, module_name):
        """Parse XML contents into update and noupdate record dicts.

        :param xml_files: iterable of XML documents as ``str``; documents
            that are not well-formed are skipped.
        :param module_name: module whose prefix is stripped from XML ids.
        :return: tuple ``(records_update, records_noupdate)``, each mapping an
            XML id to its ``<record>`` element.
        :raises ValidationError: when a document's root tag is not one of
            ``openerp``, ``odoo`` or ``data``.
        """
        records_update = {}
        records_noupdate = {}
        parser = etree.XMLParser(
            remove_blank_text=True,
            strip_cdata=False,
        )
        for xml_file in xml_files:
            try:
                # This is for a final correct pretty print
                # Ref.: https://stackoverflow.com/a/7904066
                # Also don't strip CDATA tags as needed for HTML content
                root_node = etree.fromstring(xml_file.encode("utf-8"), parser=parser)
            except etree.XMLSyntaxError:
                continue
            # Support xml files with root Element either odoo or openerp
            # Condition: each xml file should have only one root element
            # (<odoo>, <openerp> or, rarely, <data>).
            root_node_noupdate = nodeattr2bool(root_node, "noupdate", False)
            if root_node.tag not in ("openerp", "odoo", "data"):
                # root_node is an element, not a tree: report its tag.
                raise ValidationError(
                    _("Unexpected root Element: %(root)s in file: %(file)s")
                    % {"root": root_node.tag, "file": xml_file}
                )
            for node in root_node:
                if node.tag == "data":
                    cls._process_data_node(
                        node, records_update, records_noupdate, module_name
                    )
                elif node.tag == "record":
                    cls._process_record_node(
                        node,
                        root_node_noupdate,
                        records_update,
                        records_noupdate,
                        module_name,
                    )

        return records_update, records_noupdate

    def generate_noupdate_changes(self):
        """Communicate with the remote server to fetch all xml data records
        per module, and generate a diff in XML format that can be imported
        from the module's migration script using openupgrade.load_data()

        Remote modules are matched to a local module through
        ``renamed_modules`` and ``merged_modules``. A non-empty diff is
        written to ``noupdate_changes.xml`` for the local module.

        :return: ``True``
        """
        self.ensure_one()
        connection = self.config_id.get_connection()
        remote_record_obj = self._get_remote_model(connection, "record")
        local_record_obj = self.env["upgrade.record"]
        local_modules = local_record_obj.list_modules()
        all_remote_modules = remote_record_obj.list_modules()
        for local_module in local_modules:
            remote_files = []
            remote_modules = []
            remote_update, remote_noupdate = {}, {}
            for remote_module in all_remote_modules:
                if local_module == renamed_modules.get(
                    remote_module, merged_modules.get(remote_module, remote_module)
                ):
                    remote_files.extend(
                        remote_record_obj.get_xml_records(remote_module)
                    )
                    remote_modules.append(remote_module)
                    # NOTE(review): remote_files accumulates across matching
                    # remote modules, so earlier files are parsed again under
                    # a later module name -- confirm this is intended.
                    add_remote_update, add_remote_noupdate = self._parse_files(
                        remote_files, remote_module
                    )
                    remote_update.update(add_remote_update)
                    remote_noupdate.update(add_remote_noupdate)
            if not remote_modules:
                continue
            local_files = local_record_obj.get_xml_records(local_module)
            local_update, local_noupdate = self._parse_files(local_files, local_module)
            diff = self._get_xml_diff(
                remote_update, remote_noupdate, local_update, local_noupdate
            )
            if diff:
                module = self.env["ir.module.module"].search(
                    [("name", "=", local_module)]
                )
                self._write_file(
                    local_module,
                    module.installed_version,
                    diff,
                    filename="noupdate_changes.xml",
                )
        return True

    def generate_module_coverage_file(self):
        """Render the module coverage file for this analysis.

        Does nothing unless the ``module_coverage_file_folder`` config option
        is set. Installed modules of both databases are listed, with those
        found only locally marked ``|new|`` and those found only remotely
        marked ``|del|``. The result is written to
        ``modules<start>-<end>.rst`` in the configured folder.
        """
        self.ensure_one()

        module_coverage_file_folder = config.get("module_coverage_file_folder", False)

        if not module_coverage_file_folder:
            return

        file_template = Template(
            filename=os.path.join(
                get_module_path("upgrade_analysis"),
                "static",
                "src",
                "module_coverage_template.rst.mako",
            )
        )

        module_domain = [
            ("state", "=", "installed"),
            ("name", "not in", ["upgrade_analysis", "openupgrade_records"]),
        ]

        connection = self.config_id.get_connection()
        all_local_modules = (
            self.env["ir.module.module"].search(module_domain).mapped("name")
        )
        all_remote_modules = (
            connection.env["ir.module.module"]
            .browse(connection.env["ir.module.module"].search(module_domain))
            .mapped("name")
        )

        start_version = connection.version
        end_version = release.major_version

        # Sets give constant-time membership tests in the loop below.
        local_names = set(all_local_modules)
        remote_names = set(all_remote_modules)
        module_descriptions = []
        for module in sorted(local_names | remote_names):
            if module in local_names and module in remote_names:
                module_description = " %s" % module
            elif module in local_names:
                module_description = " |new| %s" % module
            else:
                module_description = " |del| %s" % module
            module_descriptions.append(module_description.ljust(49, " "))

        rendered_text = file_template.render(
            start_version=start_version,
            end_version=end_version,
            module_descriptions=module_descriptions,
        )

        file_name = "modules{}-{}.rst".format(
            start_version.replace(".", ""),
            end_version.replace(".", ""),
        )
        file_path = os.path.join(module_coverage_file_folder, file_name)
        with open(file_path, "w+") as f:
            f.write(rendered_text)