# Source listing metadata: 548 lines, 20 KiB, Python
# Copyright 2011-2015 Therp BV <https://therp.nl>
|
|
# Copyright 2015-2016 Opener B.V. <https://opener.am>
|
|
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
|
|
# flake8: noqa: C901
|
|
|
|
#####################################################################
|
|
# library providing a function to analyse two progressive database
|
|
# layouts from the OpenUpgrade server.
|
|
#####################################################################
|
|
|
|
import collections
|
|
import copy
|
|
|
|
# Apriori knowledge (known module/model renames and merges) is provided by the
# openupgrade_scripts addon. When that addon cannot be imported, fall back to
# an object exposing the same four mappings, all empty, so that the lookup
# helpers in this module behave as identity functions.
try:
    from odoo.addons.openupgrade_scripts import apriori
except ImportError:
    from dataclasses import dataclass, field as dc_field

    @dataclass
    class NullApriori:
        """Stand-in for the apriori module with no renames or merges."""

        # old module name -> new module name
        renamed_modules: dict = dc_field(default_factory=dict)
        # absorbed module name -> name of the module it was merged into
        merged_modules: dict = dc_field(default_factory=dict)
        # old model name -> new model name
        renamed_models: dict = dc_field(default_factory=dict)
        # absorbed model name -> name of the model it was merged into
        merged_models: dict = dc_field(default_factory=dict)

    apriori = NullApriori()
|
|
|
|
|
|
def module_map(module):
    """
    Return the current name of a module according to apriori knowledge.

    A rename takes precedence over a merge; a module that is neither
    renamed nor merged is returned unchanged.
    """
    merged_into = apriori.merged_modules.get(module, module)
    return apriori.renamed_modules.get(module, merged_into)
|
|
|
|
|
|
def model_rename_map(model):
    """
    Return the new name of a renamed model, or the model name itself
    when apriori knowledge lists no rename for it. Merges are ignored.
    """
    renames = apriori.renamed_models
    return renames.get(model, model)
|
|
|
|
|
|
def model_map(model):
    """
    Return the current name of a model according to apriori knowledge.

    A rename takes precedence over a merge; a model that is neither
    renamed nor merged is returned unchanged.
    """
    merged_into = apriori.merged_models.get(model, model)
    return apriori.renamed_models.get(model, merged_into)
|
|
|
|
|
|
def inv_model_map(model):
    """
    Return the previous name of a renamed model, or the model name itself
    when no rename in apriori knowledge leads to it.
    """
    # Invert the rename table: new name -> old name.
    previous_names = {
        new_name: old_name for old_name, new_name in apriori.renamed_models.items()
    }
    return previous_names.get(model, model)
|
|
|
|
|
|
# Field names that are left out of the field comparison altogether.
IGNORE_FIELDS = ["create_date", "create_uid", "id", "write_date", "write_uid"]
|
|
|
|
|
|
def compare_records(dict_old, dict_new, fields):
    """
    Check equivalence of two OpenUpgrade record representations
    with respect to the keys in the 'fields' argument.

    Some keys get special treatment:

    * "module": the old module is mapped through apriori knowledge
      (renames and merges) before being compared;
    * "model": the old model is mapped through known model renames
      before being compared;
    * "other_prefix": both records must have a 'prefix' equal to their
      own 'module', and the old record must not be an ir.ui.view.

    Any other key must simply have equal values in both records.
    Return True or False.
    """
    for key in fields:
        if key == "module":
            equivalent = module_map(dict_old["module"]) == dict_new["module"]
        elif key == "model":
            equivalent = model_rename_map(dict_old["model"]) == dict_new["model"]
        elif key == "other_prefix":
            equivalent = (
                dict_old["module"] == dict_old["prefix"]
                and dict_new["module"] == dict_new["prefix"]
                # basically, to avoid the assets_backend case
                and dict_old["model"] != "ir.ui.view"
            )
        else:
            equivalent = dict_old[key] == dict_new[key]
        if not equivalent:
            return False
    return True
|
|
|
|
|
|
def search(item, item_list, fields, get_all=None):
    """
    Find a match of a dictionary in a list of similar dictionaries
    with respect to the keys in the 'fields' argument.

    :param item: the record to look for
    :param item_list: the records to search in
    :param fields: keys to compare on, see compare_records()
    :param get_all: when truthy, return a list instead of a single record:
        every matching record whose 'module' differs from its 'prefix'
    :return: by default the first matching record, or None if there is
        none; with get_all, a (possibly empty) list
    """
    if get_all:
        return [
            other
            for other in item_list
            if compare_records(item, other, fields)
            and other["module"] != other["prefix"]
        ]
    for other in item_list:
        if compare_records(item, other, fields):
            return other
    # There is deliberately no fallback lookup for renamed fields: the loop
    # that used to follow here skipped every candidate unconditionally, so
    # an unmatched item has always resulted in None.
    return None
|
|
|
|
|
|
def fieldprint(old, new, field, text, reprs):
    """
    Append a report line about a field to 'reprs'.

    :param old: the old field representation; it provides the module,
        model, field name and type shown at the start of the line
    :param new: the new field representation (only read when 'text' is
        empty, or when 'field' is "module")
    :param field: the attribute that changed; used to generate a message
        when no 'text' is given
    :param text: the message to report; if empty, a message of the form
        "<field> is now '<new value>' ('<old value>')" is generated
    :param reprs: mapping of module name to list of report lines; the line
        is filed under the (mapped) module of the old record
    """
    line_format = "{:<12} / {:<24} / {:<30}"

    field_name = old["field"]
    if field_name in ("_inherits", "_order"):
        # pseudo fields have no type worth mentioning
        fieldrepr = "{}".format(field_name)
    else:
        fieldrepr = "{} ({})".format(field_name, old["type"])

    if not text:
        text = "{} is now '{}' ('{}')".format(field, new[field], old[field])
        if field in ("column1", "column2"):
            text += " [%s]" % old["table"]
        if field == "relation":
            text += " [nothing to do]"

    old_line = line_format.format(old["module"], old["model"], fieldrepr)
    reprs[module_map(old["module"])].append("{}: {}".format(old_line, text))

    if field == "module":
        # A field that changed module is mentioned under its new module too.
        new_line = line_format.format(new["module"], old["model"], fieldrepr)
        moved_text = "previously in module %s" % old[field]
        reprs[module_map(new["module"])].append(
            "{}: {}".format(new_line, moved_text)
        )
|
|
|
|
|
|
def report_generic(new, old, attrs, reprs):
    """
    Report the differences between two matched field representations.

    :param new: the new field representation
    :param old: the old field representation
    :param attrs: the attributes to compare
    :param reprs: mapping of module name to list of report lines, filled
        through fieldprint()

    Boolean-like attributes get a dedicated message, 'required' is only
    reported when the field became required, and a change of 'table' is
    followed by a comparison of 'column1' and 'column2' when both records
    have a table. Any other attribute is reported with its old and new
    value whenever the two differ.
    """
    # attribute -> (message when now set, message when not set anymore)
    flag_messages = {
        "stored": ("is now stored", "not stored anymore"),
        "isfunction": ("now a function", "not a function anymore"),
        "isproperty": ("now a property", "not a property anymore"),
        "isrelated": ("now related", "not related anymore"),
    }
    for attr in attrs:
        if attr == "required":
            # dropping the requirement is not worth a remark
            if old["required"] != new["required"] and new["required"]:
                fieldprint(old, new, "", "now required", reprs)
            continue

        if attr in flag_messages:
            if old[attr] != new[attr]:
                now_set, now_unset = flag_messages[attr]
                message = now_set if new[attr] else now_unset
                fieldprint(old, new, "", message, reprs)
            continue

        if attr == "table":
            if old["table"] != new["table"]:
                fieldprint(old, new, "table", "", reprs)
            if old["table"] and new["table"]:
                for column_key in ("column1", "column2"):
                    if old[column_key] != new[column_key]:
                        fieldprint(old, new, column_key, "", reprs)
            continue

        if old[attr] != new[attr]:
            fieldprint(old, new, attr, "", reprs)
|
|
|
|
|
|
def compare_sets(old_records, new_records):
    """
    Compare a set of OpenUpgrade field representations.
    Try to match the equivalent fields in both sets.

    Matching is done in three passes, each one only considering the
    records left over by the previous ones:

    1. same module, mode, model and field name;
    2. same mode, model, field name and type, in another module;
    3. same mode, model and field name, with another type (and possibly
       in another module).

    Old records that remain unmatched are reported as "DEL", remaining new
    records as "NEW". Fields in IGNORE_FIELDS are not considered at all,
    and old fields of models that do not exist anymore (not even under a
    mapped name) are kept out of the matching passes.

    Note that the 'mode' of reported DEL/NEW records is reset in place
    from "create" to an empty string.

    Return a textual representation of changes in a dictionary with
    module names as keys. Special case is the 'general' key
    which contains overall remarks and matching statistics.
    """
    reprs = collections.defaultdict(list)

    def clean_records(records):
        """Return the records that are not about an ignored field."""
        return [record for record in records if record["field"] not in IGNORE_FIELDS]

    old_records = clean_records(old_records)
    new_records = clean_records(new_records)

    origlen = len(old_records)
    new_models = {column["model"] for column in new_records}
    old_models = {column["model"] for column in old_records}

    # Models that are gone, even when taking renames and merges into account.
    obsolete_models = {
        model
        for model in old_models
        if model not in new_models and model_map(model) not in new_models
    }

    in_obsolete_models = 0
    non_obsolete_old_records = []
    for column in old_records:
        if column["model"] in obsolete_models:
            in_obsolete_models += 1
        else:
            non_obsolete_old_records.append(column)

    def match(match_fields, report_fields):
        """
        Pair up old and new records that agree on match_fields, report
        their differences in report_fields and take them out of the
        pending lists. Return the number of pairs found.
        """
        count = 0
        # iterate over a copy, as matched records are removed on the fly
        for column in copy.copy(non_obsolete_old_records):
            found = search(column, new_records, match_fields)
            if not found:
                continue
            report_generic(found, column, report_fields, reprs)
            old_records.remove(column)
            non_obsolete_old_records.remove(column)
            new_records.remove(found)
            count += 1
        return count

    # Attributes compared in every pass
    common_report_fields = [
        "selection_keys",
        "_inherits",
        "stored",
        "isfunction",
        "isrelated",
        "required",
        "table",
        "_order",
    ]

    # same module, same operation
    matched_direct = match(
        ["module", "mode", "model", "field"],
        ["relation", "type"] + common_report_fields,
    )

    # other module, same type and operation
    matched_other_module = match(
        ["mode", "model", "field", "type"],
        ["module", "relation"] + common_report_fields,
    )

    # other module, same operation, other type.
    # The module must not be part of the match keys here: with the same
    # keys as the direct match, this pass could never find anything.
    matched_other_type = match(
        ["mode", "model", "field"],
        ["relation", "type", "module"] + common_report_fields,
    )

    # Info that is displayed for deleted fields
    printkeys_old = [
        "relation",
        "required",
        "selection_keys",
        "_inherits",
        "mode",
        "attachment",
    ]
    # Info that is displayed for new fields
    printkeys_new = printkeys_old + [
        "hasdefault",
    ]

    def is_reportable(column):
        """Tell whether an unmatched field deserves a DEL/NEW line."""
        if column["field"] == "_order":
            return False
        # we do not care about non stored function fields
        if not column["stored"] and (column["isfunction"] or column["isrelated"]):
            return False
        return True

    def describe(column, keys):
        """
        Summarize the non-empty attributes of a field among 'keys', as
        "key: value" pairs (or just "key" when the value reads the same),
        preceded by a space when there is anything to say.
        """
        parts = [
            k + ": " + str(column[k]) if k != str(column[k]) else k
            for k in keys
            if column[k]
        ]
        return " " + ", ".join(parts) if parts else ""

    for column in old_records:
        if not is_reportable(column):
            continue
        if column["mode"] == "create":
            # 'create' is the regular case, so it is not mentioned
            column["mode"] = ""
        fieldprint(column, "", "", "DEL" + describe(column, printkeys_old), reprs)

    for column in new_records:
        if not is_reportable(column):
            continue
        if column["mode"] == "create":
            column["mode"] = ""
        printkeys = printkeys_new.copy()
        if column["isfunction"] or column["isrelated"]:
            printkeys.extend(["isfunction", "isrelated", "stored"])
        fieldprint(column, "", "", "NEW" + describe(column, printkeys), reprs)

    reprs["general"].extend(
        [
            "# %d fields matched," % (origlen - len(old_records)),
            "# Direct match: %d" % matched_direct,
            "# Found in other module: %d" % matched_other_module,
            "# Found with different type: %d" % matched_other_type,
            "# In obsolete models: %d" % in_obsolete_models,
            "# Not matched: %d" % len(old_records),
            "# New columns: %d" % len(new_records),
        ]
    )
    return reprs
|
|
|
|
|
|
def compare_xml_sets(old_records, new_records):
    """
    Compare a set of OpenUpgrade XML record representations.

    Records are paired up in successive passes: same module, model and
    name first; then same model and name in another module ("moved");
    then same model and suffix under another prefix ("renamed"). Before
    the last two passes, records that match on model and name within
    their own set and whose module differs from their prefix are dropped.

    Both given lists are consumed in the process (matched and dropped
    records are removed from them), and the record dictionaries get extra
    keys such as 'old', 'new', 'moved', 'renamed' and 'noupdate_switched'.

    Return a textual representation of changes in a dictionary with
    module names as keys.
    """
    reprs = collections.defaultdict(list)

    def match_updates(match_fields):
        """
        Within each set, drop every record found by a get_all search on
        match_fields. Nothing is collected, so the result is always empty.
        """
        for records in (old_records, new_records):
            # iterate over a copy, as records are removed on the fly
            for column in copy.copy(records):
                for duplicate in search(column, records, match_fields, True):
                    records.remove(duplicate)
        return []

    def match(match_fields, match_type="direct"):
        """
        Pair up old and new records on match_fields and take them out of
        the pending lists. Return the records worth reporting: both sides
        of a moved/renamed pair, or the new side of a direct match if its
        domain, definition or noupdate flag changed.
        """
        is_direct = match_type == "direct"
        matched_records = []
        for column in copy.copy(old_records):
            found = search(column, new_records, match_fields)
            if not found:
                continue
            old_records.remove(column)
            new_records.remove(found)
            if not is_direct:
                column["old"] = True
                found["new"] = True
                # each side remembers the module of its counterpart
                column[match_type] = found["module"]
                found[match_type] = column["module"]
            # From here on, 'domain' and 'definition' hold what to report
            # about the change instead of their original values.
            found["domain"] = (
                column["domain"] != found["domain"]
                and column["domain"] != "[]"
                and found["domain"] is False
            )
            column["domain"] = False
            found["definition"] = (
                column["definition"]
                and column["definition"] != found["definition"]
                and "is now '{}' ('{}')".format(
                    found["definition"], column["definition"]
                )
            )
            column["definition"] = False
            column["noupdate_switched"] = False
            found["noupdate_switched"] = column["noupdate"] != found["noupdate"]
            if not is_direct:
                matched_records.extend((column, found))
            elif found["domain"] or found["definition"] or found["noupdate_switched"]:
                matched_records.append(found)
        return matched_records

    # direct match
    modified_records = match(["module", "model", "name"])

    # updated records (will be excluded)
    match_updates(["model", "name"])

    # other module, same full xmlid
    moved_records = match(["model", "name"], "moved")

    # other module, same suffix, other prefix
    renamed_records = match(["model", "suffix", "other_prefix"], "renamed")

    # Whatever is left has no counterpart: deleted or new records.
    for leftovers, marker in ((old_records, "old"), (new_records, "new")):
        for record in leftovers:
            record[marker] = True
            record["domain"] = False
            record["definition"] = False
            record["noupdate_switched"] = False

    all_records = (
        old_records + new_records + moved_records + renamed_records + modified_records
    )
    all_records.sort(key=lambda k: (k["model"], "old" in k, k["name"]))

    for entry in all_records:
        is_old = "old" in entry
        is_new = "new" in entry
        if is_old:
            content = "DEL %(model)s: %(name)s" % entry
            if "moved" in entry:
                content += " [moved to %(moved)s module]" % entry
            elif "renamed" in entry:
                content += " [renamed to %(renamed)s module]" % entry
        elif is_new:
            content = "NEW %(model)s: %(name)s" % entry
            if "moved" in entry:
                content += " [moved from %(moved)s module]" % entry
            elif "renamed" in entry:
                content += " [renamed from %(renamed)s module]" % entry
        else:
            content = "%(model)s: %(name)s" % entry

        remarks = []
        if entry["domain"]:
            remarks.append(" (deleted domain)")
        if entry["definition"]:
            remarks.append(" (changed definition: %(definition)s)" % entry)
        if entry["noupdate"]:
            remarks.append(" (noupdate)")
        if entry["noupdate_switched"]:
            remarks.append(" (noupdate switched)")
        content += "".join(remarks)

        reprs[module_map(entry["module"])].append(content)
    return reprs
|
|
|
|
|
|
def compare_model_sets(old_records, new_records):
    """
    Compare a set of OpenUpgrade model representations.

    Old models are reported as obsolete (possibly renamed, according to
    apriori knowledge) or as moved to another module; new models are
    reported as new (possibly renamed from an old model) or as moved from
    another module. Obsolete and new models are additionally listed under
    the 'general' key.

    Return a textual representation of changes in a dictionary with
    module names as keys.
    """
    reprs = collections.defaultdict(list)

    # model name -> module, for each side
    new_models = {column["model"]: column["module"] for column in new_records}
    old_models = {column["model"]: column["module"] for column in old_records}

    def with_type(text, column):
        """Append the model type to the text, if the record has one."""
        if column["model_type"]:
            text += " [%s]" % column["model_type"]
        return text

    for column in old_records:
        model = column["model"]
        module = module_map(column["module"])

        if model in new_models:
            # The model still exists. Only the departure is noted here;
            # the arrival in the new module is reported from the loop
            # over the new records below.
            if module != new_models[model]:
                text = "model {} (moved to {})".format(model, new_models[model])
                reprs[module].append(with_type(text, column))
            continue

        new_name = model_map(model)
        if new_name not in new_models:
            reprs[module].append(with_type("obsolete model %s" % model, column))
            reprs["general"].append(
                "obsolete model %s [module %s]" % (model, module)
            )
            continue

        moved_module = ""
        if module != new_models[new_name]:
            moved_module = " in module %s" % new_models[new_name]
        text = "obsolete model {} (renamed to {}{})".format(
            model, new_name, moved_module
        )
        reprs[module].append(with_type(text, column))
        reprs["general"].append(
            "obsolete model %s (renamed to %s) [module %s]"
            % (model, new_name, module)
        )

    for column in new_records:
        model = column["model"]
        module = column["module"]

        if model in old_models:
            if module != module_map(old_models[model]):
                text = "model {} (moved from {})".format(model, old_models[model])
                reprs[module].append(with_type(text, column))
            continue

        old_name = inv_model_map(model)
        if old_name not in old_models:
            reprs[module].append(with_type("new model %s" % model, column))
            reprs["general"].append(
                "new model {} [module {}]".format(model, module)
            )
            continue

        moved_module = ""
        if module != module_map(old_models[old_name]):
            moved_module = " in module %s" % old_models[old_name]
        text = "new model {} (renamed from {}{})".format(
            model, old_name, moved_module
        )
        reprs[module].append(with_type(text, column))
        reprs["general"].append(
            "new model %s (renamed from %s) [module %s]" % (model, old_name, module)
        )
    return reprs
|