# Copyright 2011-2015 Therp BV
# Copyright 2015-2016 Opener B.V.
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
# flake8: noqa: C901

#####################################################################
#   library providing a function to analyse two progressive database
#   layouts from the OpenUpgrade server.
#####################################################################

import collections
import copy

try:
    from odoo.addons.openupgrade_scripts import apriori
except ImportError:
    # Fall back to an empty "apriori" object so that every mapping helper
    # below degrades to the identity function.
    from dataclasses import dataclass, field as dc_field

    @dataclass
    class NullApriori:
        renamed_modules: dict = dc_field(default_factory=dict)
        merged_modules: dict = dc_field(default_factory=dict)
        renamed_models: dict = dc_field(default_factory=dict)
        merged_models: dict = dc_field(default_factory=dict)

    apriori = NullApriori()


def module_map(module):
    """Return the mapped name of ``module``.

    ``apriori.renamed_modules`` is consulted first, then
    ``apriori.merged_modules``; the name itself is returned when neither
    mapping knows it.
    """
    return apriori.renamed_modules.get(
        module, apriori.merged_modules.get(module, module)
    )


def model_rename_map(model):
    """Return the renamed name of ``model`` (renames only, merges ignored)."""
    return apriori.renamed_models.get(model, model)


def model_map(model):
    """Return the mapped name of ``model``, looking at renames, then merges."""
    return apriori.renamed_models.get(model, apriori.merged_models.get(model, model))


def inv_model_map(model):
    """Return the old name of a renamed ``model``, or ``model`` itself."""
    inv_model_map_dict = {v: k for k, v in apriori.renamed_models.items()}
    return inv_model_map_dict.get(model, model)


# Fields that are dropped from both sets before comparing (see compare_sets).
IGNORE_FIELDS = [
    "create_date",
    "create_uid",
    "id",
    "write_date",
    "write_uid",
]


def compare_records(dict_old, dict_new, fields):
    """
    Check equivalence of two OpenUpgrade field representations
    with respect to the keys in the 'fields' arguments.
    Take apriori knowledge into account for mapped modules or
    model names.
    Return True or False.
    """
    for field in fields:
        if field == "module":
            if module_map(dict_old["module"]) != dict_new["module"]:
                return False
        elif field == "model":
            if model_rename_map(dict_old["model"]) != dict_new["model"]:
                return False
        elif field == "other_prefix":
            # Pseudo-key: both records must carry their own module as prefix.
            if (
                dict_old["module"] != dict_old["prefix"]
                or dict_new["module"] != dict_new["prefix"]
            ):
                return False
            if dict_old["model"] == "ir.ui.view":
                # basically, to avoid the assets_backend case
                return False
        elif dict_old[field] != dict_new[field]:
            return False
    return True


def search(item, item_list, fields, get_all=None):
    """
    Find a match of a dictionary in a list of similar dictionaries
    with respect to the keys in the 'fields' arguments.

    Without ``get_all``: return the first matching item, or None.
    With a truthy ``get_all``: return the list of all matching items whose
    'module' differs from their 'prefix' (possibly empty).
    """
    all_found = []
    for other in item_list:
        if not compare_records(item, other, fields):
            continue
        if not get_all:
            return other
        if other["module"] != other["prefix"]:
            all_found.append(other)
    if get_all:
        return all_found
    # search for renamed fields
    if "field" in fields:
        for other in item_list:
            # NOTE(review): this guard is always true (a truthy field is
            # "not None", a falsy one satisfies the first clause), so the
            # renamed-field lookup below never runs. Kept as is: enabling
            # it would match any field against any other field.
            if not item["field"] or item["field"] is not None or item["isproperty"]:
                continue
            if compare_records(dict(item, field=other["field"]), other, fields):
                return other
    return None


def fieldprint(old, new, field, text, reprs):
    """Append a formatted remark about a field to ``reprs``.

    The line is filed under the mapped module of ``old``. When ``text`` is
    empty, a default "<field> is now '<new>' ('<old>')" message is built from
    ``new[field]`` and ``old[field]``. For ``field == "module"`` a second
    line ("previously in module ...") is filed under the module of ``new``.
    """
    fieldrepr = "{}".format(old["field"])
    if old["field"] not in ("_inherits", "_order"):
        fieldrepr += " ({})".format(old["type"])
    fullrepr = "{:<12} / {:<24} / {:<30}".format(old["module"], old["model"], fieldrepr)
    if not text:
        text = "{} is now '{}' ('{}')".format(field, new[field], old[field])
        if field in ("column1", "column2"):
            text += " [%s]" % old["table"]
        if field == "relation":
            text += " [nothing to do]"
    reprs[module_map(old["module"])].append("{}: {}".format(fullrepr, text))
    if field == "module":
        text = "previously in module %s" % old[field]
        fullrepr = "{:<12} / {:<24} / {:<30}".format(
            new["module"], old["model"], fieldrepr
        )
        reprs[module_map(new["module"])].append("{}: {}".format(fullrepr, text))


def report_generic(new, old, attrs, reprs):
    """Report, via fieldprint, every attribute in ``attrs`` that differs
    between the matched records ``old`` and ``new``.

    Some attributes get a dedicated wording (required, stored, isfunction,
    isproperty, isrelated); 'table' additionally reports column1/column2
    changes; everything else uses fieldprint's default message.
    """
    for attr in attrs:
        if attr == "required":
            # Only a field that *becomes* required is worth a remark.
            if old[attr] != new["required"] and new["required"]:
                text = "now required"
                if new["req_default"]:
                    text += ", req_default: %s" % new["req_default"]
                fieldprint(old, new, "", text, reprs)
        elif attr == "stored":
            if old[attr] != new[attr]:
                if new["stored"]:
                    text = "is now stored"
                else:
                    text = "not stored anymore"
                fieldprint(old, new, "", text, reprs)
        elif attr == "isfunction":
            if old[attr] != new[attr]:
                if new["isfunction"]:
                    text = "now a function"
                else:
                    text = "not a function anymore"
                fieldprint(old, new, "", text, reprs)
        elif attr == "isproperty":
            if old[attr] != new[attr]:
                if new[attr]:
                    text = "now a property"
                else:
                    text = "not a property anymore"
                fieldprint(old, new, "", text, reprs)
        elif attr == "isrelated":
            if old[attr] != new[attr]:
                if new[attr]:
                    text = "now related"
                else:
                    text = "not related anymore"
                fieldprint(old, new, "", text, reprs)
        elif attr == "table":
            if old[attr] != new[attr]:
                fieldprint(old, new, attr, "", reprs)
            if old[attr] and new[attr]:
                if old["column1"] != new["column1"]:
                    fieldprint(old, new, "column1", "", reprs)
                if old["column2"] != new["column2"]:
                    fieldprint(old, new, "column2", "", reprs)
        elif old[attr] != new[attr]:
            fieldprint(old, new, attr, "", reprs)


def compare_sets(old_records, new_records):
    """
    Compare a set of OpenUpgrade field representations.
    Try to match the equivalent fields in both sets.
    Return a textual representation of changes in a dictionary with
    module names as keys. Special case is the 'general' key
    which contains overall remarks and matching statistics.

    The input lists themselves are not modified (filtered copies are used),
    but the 'mode' key of unmatched records may be reset to "".
    """
    reprs = collections.defaultdict(list)

    def clean_records(records):
        """Return a new list without the records of IGNORE_FIELDS."""
        return [record for record in records if record["field"] not in IGNORE_FIELDS]

    def describe(column, keys):
        """Build the ' key: value, ...' suffix for a DEL/NEW line."""
        message = ", ".join(
            [
                k + ": " + str(column[k]) if k != str(column[k]) else k
                for k in keys
                if column[k]
            ]
        )
        return " " + message if message else message

    old_records = clean_records(old_records)
    new_records = clean_records(new_records)

    origlen = len(old_records)
    new_models = {column["model"] for column in new_records}
    old_models = {column["model"] for column in old_records}

    # Old models that exist in the new set neither under their own name nor
    # under their mapped name.
    obsolete_models = {
        model
        for model in old_models
        if model not in new_models and model_map(model) not in new_models
    }

    in_obsolete_models = 0
    non_obsolete_old_records = []
    for column in old_records:
        if column["model"] in obsolete_models:
            in_obsolete_models += 1
        else:
            non_obsolete_old_records.append(column)

    def match(match_fields, report_fields):
        """Pair old records with new ones on ``match_fields``, report the
        differences in ``report_fields`` and drop both from the work lists.
        Return the number of pairs found."""
        count = 0
        # Iterate over a copy: matched records are removed while looping.
        for column in copy.copy(non_obsolete_old_records):
            found = search(column, new_records, match_fields)
            if found:
                report_generic(found, column, report_fields, reprs)
                old_records.remove(column)
                non_obsolete_old_records.remove(column)
                new_records.remove(found)
                count += 1
        return count

    matched_direct = match(
        ["module", "mode", "model", "field"],
        [
            "relation",
            "type",
            "selection_keys",
            "_inherits",
            "stored",
            "isfunction",
            "isrelated",
            "required",
            "table",
            "_order",
        ],
    )

    # other module, same type and operation
    matched_other_module = match(
        ["mode", "model", "field", "type"],
        [
            "module",
            "relation",
            "selection_keys",
            "_inherits",
            "stored",
            "isfunction",
            "isrelated",
            "required",
            "table",
            "_order",
        ],
    )

    # other module, same operation, other type
    # The module must NOT be part of the match keys here, otherwise this
    # pass is identical to the direct match and can never find anything.
    matched_other_type = match(
        ["mode", "model", "field"],
        [
            "relation",
            "type",
            "module",
            "selection_keys",
            "_inherits",
            "stored",
            "isfunction",
            "isrelated",
            "required",
            "table",
            "_order",
        ],
    )

    printkeys = [
        "relation",
        "required",
        "selection_keys",
        "req_default",
        "_inherits",
        "mode",
        "attachment",
    ]
    for column in old_records:
        if column["field"] == "_order":
            continue
        # we do not care about removed non stored function fields
        if not column["stored"] and (column["isfunction"] or column["isrelated"]):
            continue
        if column["mode"] == "create":
            column["mode"] = ""
        fieldprint(column, "", "", "DEL" + describe(column, printkeys), reprs)

    printkeys.extend(
        [
            "hasdefault",
        ]
    )
    for column in new_records:
        if column["field"] == "_order":
            continue
        # we do not care about newly added non stored function fields
        if not column["stored"] and (column["isfunction"] or column["isrelated"]):
            continue
        if column["mode"] == "create":
            column["mode"] = ""
        printkeys_plus = printkeys.copy()
        if column["isfunction"] or column["isrelated"]:
            printkeys_plus.extend(["isfunction", "isrelated", "stored"])
        fieldprint(column, "", "", "NEW" + describe(column, printkeys_plus), reprs)

    for line in [
        "# %d fields matched," % (origlen - len(old_records)),
        "# Direct match: %d" % matched_direct,
        "# Found in other module: %d" % matched_other_module,
        "# Found with different type: %d" % matched_other_type,
        "# In obsolete models: %d" % in_obsolete_models,
        "# Not matched: %d" % len(old_records),
        "# New columns: %d" % len(new_records),
    ]:
        reprs["general"].append(line)
    return reprs


def compare_xml_sets(old_records, new_records):
    """
    Compare two lists of XML record representations.
    Return a dictionary with (mapped) module names as keys and lists of
    DEL / NEW / changed lines as values.

    Both input lists are modified in place: matched records are removed from
    them and bookkeeping keys ('old', 'new', 'moved', 'renamed', 'domain',
    'noupdate_switched') are written into the record dictionaries.
    """
    reprs = collections.defaultdict(list)

    def match_updates(match_fields):
        """Drop, from both lists, the records that search() returns in
        get_all mode (matching records whose module differs from their
        prefix). They are excluded from the report."""
        for records in (old_records, new_records):
            for column in copy.copy(records):
                for found in search(column, records, match_fields, True):
                    records.remove(found)

    def match(match_fields, match_type="direct"):
        """Pair old and new records on ``match_fields`` and remove them from
        the work lists. Return the records that must show up in the report."""
        matched_records = []
        for column in copy.copy(old_records):
            found = search(column, new_records, match_fields)
            if found:
                old_records.remove(column)
                new_records.remove(found)
                if match_type != "direct":
                    column["old"] = True
                    found["new"] = True
                    column[match_type] = found["module"]
                    found[match_type] = column["module"]
                # From here on 'domain' is a flag on the new record: the old
                # one had a non-empty domain and the new one has none.
                found["domain"] = (
                    column["domain"] != found["domain"]
                    and column["domain"] != "[]"
                    and found["domain"] is False
                )
                column["domain"] = False
                column["noupdate_switched"] = False
                found["noupdate_switched"] = column["noupdate"] != found["noupdate"]
                if match_type != "direct":
                    matched_records.append(column)
                    matched_records.append(found)
                elif found["domain"] or found["noupdate_switched"]:
                    # Direct matches are only reported when something changed.
                    matched_records.append(found)
        return matched_records

    # direct match
    modified_records = match(["module", "model", "name"])

    # updated records (will be excluded)
    match_updates(["model", "name"])

    # other module, same full xmlid
    moved_records = match(["model", "name"], "moved")

    # other module, same suffix, other prefix
    renamed_records = match(["model", "suffix", "other_prefix"], "renamed")

    for record in old_records:
        record["old"] = True
        record["domain"] = False
        record["noupdate_switched"] = False

    for record in new_records:
        record["new"] = True
        record["domain"] = False
        record["noupdate_switched"] = False

    sorted_records = sorted(
        old_records + new_records + moved_records + renamed_records + modified_records,
        key=lambda k: (k["model"], "old" in k, k["name"]),
    )
    for entry in sorted_records:
        content = ""
        if "old" in entry:
            content = "DEL %(model)s: %(name)s" % entry
            if "moved" in entry:
                content += " [moved to %(moved)s module]" % entry
            elif "renamed" in entry:
                content += " [renamed to %(renamed)s module]" % entry
        elif "new" in entry:
            content = "NEW %(model)s: %(name)s" % entry
            if "moved" in entry:
                content += " [moved from %(moved)s module]" % entry
            elif "renamed" in entry:
                content += " [renamed from %(renamed)s module]" % entry
        if "old" not in entry and "new" not in entry:
            content = "%(model)s: %(name)s" % entry
        if entry["domain"]:
            content += " (deleted domain)"
        if entry["noupdate"]:
            content += " (noupdate)"
        if entry["noupdate_switched"]:
            content += " (noupdate switched)"
        reprs[module_map(entry["module"])].append(content)
    return reprs


def compare_model_sets(old_records, new_records):
    """
    Compare a set of OpenUpgrade model representations.
    Return a dictionary with module names as keys and lists of remarks
    (obsolete / new / renamed / moved models) as values; obsolete and new
    models are also listed under the 'general' key.
    """
    reprs = collections.defaultdict(list)

    new_models = {column["model"]: column["module"] for column in new_records}
    old_models = {column["model"]: column["module"] for column in old_records}

    for column in old_records:
        model = column["model"]
        old_module = module_map(column["module"])
        if model in new_models:
            # Still there: only worth a remark when it changed module.
            if old_module != new_models[model]:
                text = "model {} (moved to {})".format(model, new_models[model])
                if column["model_type"]:
                    text += " [%s]" % column["model_type"]
                reprs[old_module].append(text)
        elif model_map(model) not in new_models:
            text = "obsolete model %s" % model
            if column["model_type"]:
                text += " [%s]" % column["model_type"]
            reprs[old_module].append(text)
            reprs["general"].append(
                "obsolete model %s [module %s]" % (model, old_module)
            )
        else:
            # Gone under its old name, but present under its mapped name.
            moved_module = ""
            if old_module != new_models[model_map(model)]:
                moved_module = " in module %s" % new_models[model_map(model)]
            text = "obsolete model {} (renamed to {}{})".format(
                model,
                model_map(model),
                moved_module,
            )
            if column["model_type"]:
                text += " [%s]" % column["model_type"]
            reprs[old_module].append(text)
            reprs["general"].append(
                "obsolete model %s (renamed to %s) [module %s]"
                % (model, model_map(model), old_module)
            )

    for column in new_records:
        model = column["model"]
        if model in old_models:
            if column["module"] != module_map(old_models[model]):
                text = "model {} (moved from {})".format(model, old_models[model])
                if column["model_type"]:
                    text += " [%s]" % column["model_type"]
                reprs[column["module"]].append(text)
        elif inv_model_map(model) not in old_models:
            text = "new model %s" % model
            if column["model_type"]:
                text += " [%s]" % column["model_type"]
            reprs[column["module"]].append(text)
            reprs["general"].append(
                "new model {} [module {}]".format(model, column["module"])
            )
        else:
            # New name of a model that existed under another name before.
            moved_module = ""
            if column["module"] != module_map(old_models[inv_model_map(model)]):
                moved_module = " in module %s" % old_models[inv_model_map(model)]
            text = "new model {} (renamed from {}{})".format(
                model,
                inv_model_map(model),
                moved_module,
            )
            if column["model_type"]:
                text += " [%s]" % column["model_type"]
            reprs[column["module"]].append(text)
            reprs["general"].append(
                "new model %s (renamed from %s) [module %s]"
                % (model, inv_model_map(model), column["module"])
            )
    return reprs