# Copyright 2017-2018 Therp BV # Copyright 2020 Hunki Enterprises BV # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). import logging import ssl import traceback import urllib from collections import namedtuple import psycopg2 from odoo import _, api, exceptions, fields, models, tools from odoo.tools.safe_eval import safe_eval try: import odoorpc except ImportError: # pragma: no cover logging.info("Unable to import odoorpc, used in base_import_odoo") odoorpc = False _logger = logging.getLogger("base_import_odoo") import_context_tuple = namedtuple( "import_context", [ "remote", "model_line", "ids", "idmap", "dummies", "dummy_instances", "to_delete", "field_context", ], ) class ImportContext(import_context_tuple): def with_field_context(self, *args): return ImportContext(*(self[:-1] + (field_context(*args),))) field_context = namedtuple( "field_context", ["record_model", "field_name", "record_id"], ) mapping_key = namedtuple("mapping_key", ["model_name", "remote_id"]) dummy_instance = namedtuple( "dummy_instance", ["model_name", "field_name", "remote_id", "dummy_id"], ) class ImportOdooDatabase(models.Model): _name = "import.odoo.database" _description = "An Odoo database to import" url = fields.Char(required=True) ssl = fields.Boolean(compute="_compute_ssl", help="Use HTTPS") ssl_bypass = fields.Boolean(help="Bypass SSL certificate check") database = fields.Char(required=True) user = fields.Char(default="admin", required=True) password = fields.Char(default="admin") import_line_ids = fields.One2many( "import.odoo.database.model", "database_id", string="Import models", ) import_field_mappings = fields.One2many( "import.odoo.database.field", "database_id", string="Field mappings", ) cronjob_id = fields.Many2one( "ir.cron", string="Import job", readonly=True, copy=False, ) cronjob_running = fields.Boolean(compute="_compute_cronjob_running") status_data = fields.Serialized("Status", readonly=True, copy=False) status_html = fields.Html( compute="_compute_status_html", readonly=True, sanitize=False, ) duplicates = fields.Selection( [ ("skip", "Skip existing"), ("overwrite", "Overwrite existing"), ("overwrite_empty", "Overwrite empty fields"), ], "Duplicate handling", default="skip", required=True, ) null_password = fields.Boolean(default=True) @api.depends("url") def _compute_ssl(self): for this in self: this.ssl = this.url and this.url.lower().startswith("https") def action_import(self): """Create a cronjob to run the actual import""" self.ensure_one() if self.cronjob_id: return self.cronjob_id.write( {"numbercall": 1, "doall": True, "active": True} ) return self.write({"cronjob_id": self._create_cronjob().id}) @api.model def _run_import_cron(self, ids): return self.browse(ids)._run_import() def _run_import(self, commit=True, commit_threshold=100): """Run the import as cronjob, commit often""" self.ensure_one() if not self.password: self.write( {"status_data": dict(self.status_data, error="No password provided")} ) return # model name: [ids] remote_ids = {} # model name: count remote_counts = {} # model name: count done = {} # mapping_key: local_id idmap = {} # mapping_key: local_id # this are records created or linked when we need to fill a required # field, but the local record is not yet created dummies = {} # model name: [local_id] # this happens when we create a dummy we can throw away again to_delete = {} # dummy_instance dummy_instances = [] try: remote = self._get_connection() except odoorpc.error.RPCError as e: self.write({"status_data": dict(self.status_data, error=str(e))}) return if self.null_password: self.write({"password": False}) if commit and not tools.config["test_enable"]: # pylint: disable=invalid-commit self.env.cr.commit() for model_line in self.import_line_ids: model = model_line.model_id remote_ids[model.model] = remote.execute( model.model, "search", safe_eval(model_line.domain) if model_line.domain else [], None, None, "id asc", ) remote_counts[model.model] = len(remote_ids[model.model]) self.write( { "status_data": { "counts": remote_counts, "ids": remote_ids, "error": None, "dummies": None, "done": {}, } } ) if commit and not tools.config["test_enable"]: # pylint: disable=invalid-commit self.env.cr.commit() for model_line in self.import_line_ids: model = self.env[model_line.model_id.model] done[model._name] = 0 chunk_len = ( commit and (commit_threshold or 1) or len(remote_ids[model._name]) ) for start_index in range(int(len(remote_ids[model._name]) / chunk_len) + 1): index = start_index * chunk_len ids = remote_ids[model._name][index : index + chunk_len] context = ImportContext( remote, model_line, ids, idmap, dummies, dummy_instances, to_delete, field_context(None, None, None), ) try: self._run_import_model(context) except Exception: # pragma: no cover error = traceback.format_exc() self.env.cr.rollback() self.write({"status_data": dict(self.status_data, error=error)}) # pylint: disable=invalid-commit self.env.cr.commit() raise done[model._name] += len(ids) self.write({"status_data": dict(self.status_data, done=done)}) if commit and not tools.config["test_enable"]: # pylint: disable=invalid-commit self.env.cr.commit() missing = {} for dummy_model, remote_id in dummies.keys(): if remote_id: missing.setdefault(dummy_model, []).append(remote_id) self.write({"status_data": dict(self.status_data, dummies=dict(missing))}) def _run_import_model(self, context): """Import records of a configured model""" model = self.env[context.model_line.model_id.model] fields = self._run_import_model_get_fields(context) for data in context.remote.execute(model._name, "read", context.ids, fields): self._run_import_get_record( context, model, data, create_dummy=False, ) if (model._name, data["id"]) in context.idmap: # one of our mappings hit, create an xmlid to persist # this knowledge self._create_record_xmlid( model, context.idmap[(model._name, data["id"])], data["id"] ) if self.duplicates == "skip": # there's a mapping for this record, nothing to do continue data = self._run_import_map_values(context, data) _id = data["id"] record = self._create_record(context, model, data) self._run_import_model_cleanup_dummies( context, model, _id, record.id, ) def _create_record(self, context, model, record): """Create a record, add an xmlid""" _id = record.pop("id") xmlid = "%d-%s-%d" % (self.id, model._name.replace(".", "_"), _id or 0) record = self._create_record_filter_fields(model, record) model_defaults = {} if context.model_line.defaults: model_defaults.update(safe_eval(context.model_line.defaults)) for key, value in model_defaults.items(): record.setdefault(key, value) if context.model_line.postprocess: safe_eval( context.model_line.postprocess, { "vals": record, "env": self.env, "_id": _id, "remote": context.remote, }, mode="exec", ) new = self.env.ref("__base_import_odoo__.%s" % xmlid, False) if new and new.exists(): if self.duplicates == "overwrite_empty": record = {key: value for key, value in record.items() if not new[key]} new.with_context(**self._create_record_context(model, record)).write(record) _logger.debug("Updated record %s", xmlid) else: new = model.with_context( **self._create_record_context(model, record) ).create(record) self._create_record_xmlid(model, new.id, _id) _logger.debug("Created record %s", xmlid) context.idmap[mapping_key(model._name, _id)] = new.id return new def _create_record_xmlid(self, model, local_id, remote_id): xmlid = "%d-%s-%d" % (self.id, model._name.replace(".", "_"), remote_id or 0) if self.env.ref("__base_import_odoo__.%s" % xmlid, False): return return self.env["ir.model.data"].create( { "name": xmlid, "model": model._name, "module": "__base_import_odoo__", "res_id": local_id, "noupdate": True, "import_database_id": self.id, "import_database_record_id": remote_id, } ) def _create_record_filter_fields(self, model, record): """Return a version of record with unknown fields for model removed and required fields with no value set to the default if it exists""" defaults = model.default_get(list(record.keys())) return { key: value if value or not model._fields[key].required else defaults.get(key) for key, value in record.items() if key in model._fields } def _create_record_context(self, model, record): """Return a context that is used when creating a record""" context = { "tracking_disable": True, } if model._name == "res.users": context["no_reset_password"] = True return context def _run_import_get_record( self, context, model, record, create_dummy=True, ): """Find the local id of some remote record. Create a dummy if not available""" _id = context.idmap.get((model._name, record["id"])) logged = False if not _id: _id = context.dummies.get((model._name, record["id"])) if _id: context.dummy_instances.append( dummy_instance(*(context.field_context + (_id,))) ) else: logged = True _logger.debug( "Got %s(%d[%d]) from idmap", model._name, _id, record["id"] or 0, ) if not _id: _id = self._run_import_get_record_mapping( context, model, record, create_dummy=create_dummy, ) elif not logged: logged = True _logger.debug( "Got %s(%d[%d]) from dummies", model._name, _id, record["id"], ) if not _id: xmlid = self.env["ir.model.data"].search( [ ("import_database_id", "=", self.id), ("import_database_record_id", "=", record["id"]), ("model", "=", model._name), ], limit=1, ) if xmlid: _id = xmlid.res_id context.idmap[(model._name, record["id"])] = _id elif not logged: logged = True _logger.debug( "Got %s(%d[%d]) from mappings", model._name, _id, record["id"], ) if not _id and create_dummy: _id = self._run_import_create_dummy( context, model, record, forcecreate=record["id"] and record["id"] not in self.status_data["ids"].get(model._name, []), ) elif _id and not logged: _logger.debug( "Got %s(%d[%d]) from xmlid", model._name, _id, record["id"], ) return _id def _run_import_get_record_mapping( self, context, model, record, create_dummy=True, ): current_field = self.env["ir.model.fields"].search( [ ("name", "=", context.field_context.field_name), ("model_id.model", "=", context.field_context.record_model), ] ) mappings = self.import_field_mappings.filtered( lambda x: x.mapping_type == "fixed" and x.model_id.model == model._name and (not x.field_ids or current_field in x.field_ids) and x.local_id and (x.remote_id == record["id"] or not x.remote_id) or x.mapping_type == "by_field" and x.model_id.model == model._name ) _id = None for mapping in mappings: if mapping.mapping_type == "fixed": assert mapping.local_id _id = mapping.local_id context.idmap[(model._name, record["id"])] = _id break elif mapping.mapping_type == "by_field": assert mapping.field_ids if len(record) == 1: # just the id of a record we haven't seen yet. # read the whole record from remote to check if # this can be mapped to an existing record record = ( context.remote.execute( model._name, "read", record["id"], mapping.field_ids.mapped("name"), ) or None ) if not record: continue if isinstance(record, list): record = record[0] domain = [ (field.name, "=", record.get(field.name)) for field in mapping.field_ids if record.get(field.name) ] if len(domain) < len(mapping.field_ids): # play it save, only use mapping if we really select # something specific continue records = model.with_context(active_test=False).search( domain, limit=1, ) if records: _id = records.id context.idmap[(model._name, record["id"])] = _id break else: raise exceptions.UserError(_("Unknown mapping")) return _id def _run_import_create_dummy( self, context, model, record, forcecreate=False, ): """Either misuse some existing record or create an empty one to satisfy required links""" dummy = model.search( [ ( "id", "not in", [ v for (model_name, remote_id), v in context.dummies.items() if model_name == model._name ] + [ mapping.local_id for mapping in self.import_field_mappings if mapping.model_id.model == model._name and mapping.local_id ], ), ], limit=1, ) if dummy and not forcecreate: context.dummies[mapping_key(model._name, record["id"])] = dummy.id context.dummy_instances.append( dummy_instance(*(context.field_context + (dummy.id,))) ) _logger.debug( "Using %d as dummy for %s(%d[%d]).%s[%d]", dummy.id, context.field_context.record_model, context.idmap.get(context.field_context.record_id, 0), context.field_context.record_id, context.field_context.field_name, record["id"], ) return dummy.id required = [name for name, field in model._fields.items() if field.required] defaults = model.default_get(required) values = {"id": record["id"]} for name, field in model._fields.items(): if name not in required or defaults.get(name): continue value = None if field.type in ["char", "text", "html"]: value = "/" elif field.type in ["boolean"]: value = False elif field.type in ["integer", "float"]: value = 0 elif model._fields[name].type in ["date", "datetime"]: value = "2000-01-01" elif field.type in ["many2one"]: if name in model._inherits.values(): continue new_context = context.with_field_context( model._name, name, record["id"] ) value = self._run_import_get_record( new_context, self.env[model._fields[name].comodel_name], {"id": record.get(name, [None])[0]}, ) elif field.type in ["selection"] and not callable(field.selection): value = field.selection[0][0] elif field.type in ["selection"] and callable(field.selection): value = field.selection(model)[0][0] values[name] = value dummy = self._create_record(context, model, values) del context.idmap[mapping_key(model._name, record["id"])] context.dummies[mapping_key(model._name, record["id"])] = dummy.id context.to_delete.setdefault(model._name, []) context.to_delete[model._name].append(dummy.id) context.dummy_instances.append( dummy_instance(*(context.field_context + (dummy.id,))) ) _logger.debug( "Created %d as dummy for %s(%d[%d]).%s[%d]", dummy.id, context.field_context.record_model, context.idmap.get(context.field_context.record_id, 0), context.field_context.record_id or 0, context.field_context.field_name, record["id"], ) return dummy.id def _run_import_map_values(self, context, data): model = self.env[context.model_line.model_id.model] for field_name in list(data.keys()): if ( not isinstance(model._fields[field_name], fields._Relational) or not data[field_name] ): continue if model._fields[field_name].type == "one2many": # don't import one2many fields, use an own configuration # for this data.pop(field_name) continue ids = ( data[field_name] if (model._fields[field_name].type != "many2one") else [data[field_name][0]] ) new_context = context.with_field_context( model._name, field_name, data["id"] ) comodel = self.env[model._fields[field_name].comodel_name] data[field_name] = [ self._run_import_get_record( new_context, comodel, {"id": _id}, create_dummy=model._fields[field_name].required or any( m.model_id.model == comodel._name for m in self.import_line_ids ), ) for _id in ids ] data[field_name] = list(filter(None, data[field_name])) if model._fields[field_name].type == "many2one": if data[field_name]: data[field_name] = data[field_name] and data[field_name][0] else: data[field_name] = None else: data[field_name] = [(6, 0, data[field_name])] for mapping in self.import_field_mappings: if mapping.model_id.model != model._name: continue if mapping.mapping_type == "unique": for field in mapping.field_ids: value = data.get(field.name, "") counter = 1 while model.with_context(active_test=False).search( [(field.name, "=", data.get(field.name, value))] ): data[field.name] = "%s (%d)" % (value, counter) counter += 1 elif mapping.mapping_type == "by_reference": res_model = data.get(mapping.model_field_id.name) res_id = data.get(mapping.id_field_id.name) update = { mapping.model_field_id.name: None, mapping.id_field_id.name: None, } if res_model in self.env.registry and res_id: new_context = context.with_field_context( model._name, res_id, data["id"] ) record_id = self._run_import_get_record( new_context, self.env[res_model], {"id": res_id}, create_dummy=False, ) if record_id: update.update( { mapping.model_field_id.name: res_model, mapping.id_field_id.name: record_id, } ) data.update(update) return data def _run_import_model_get_fields(self, context): local_fields = { name: field for name, field in self.env[ context.model_line.model_id.model ]._fields.items() if not field.compute or field.inverse or not field.readonly } model_name = context.model_line.model_id.model remote_fields = context.remote.execute(model_name, "fields_get") extra = context.model_line.extra_fields extra_fields = ( [] if not extra else list(map(lambda a: a.strip(), extra.split(","))) ) return list( set(local_fields.keys()).intersection(set(remote_fields.keys())) | set(extra_fields) ) def _run_import_model_cleanup_dummies(self, context, model, remote_id, local_id): if not (model._name, remote_id) in context.dummies: return for instance in context.dummy_instances: key = mapping_key(instance.model_name, instance.remote_id) if key not in context.idmap: continue dummy_id = context.dummies[(model._name, remote_id)] record_model = self.env[instance.model_name] comodel = record_model._fields[instance.field_name].comodel_name if comodel != model._name or instance.dummy_id != dummy_id: continue record = record_model.browse(context.idmap[key]) field_name = instance.field_name _logger.debug( "Replacing dummy %d on %s(%d).%s with %d", dummy_id, record_model._name, record.id, field_name, local_id, ) if record._fields[field_name].type == "many2one": record.write({field_name: local_id}) elif record._fields[field_name].type == "many2many": record.write({field_name: [(3, dummy_id), (4, local_id)]}) else: raise exceptions.UserError( _("Unhandled field type %s") % record._fields[field_name].type ) context.dummy_instances.remove(instance) if dummy_id in context.to_delete: model.browse(dummy_id).unlink() _logger.debug("Deleting dummy %d", dummy_id) if (model._name, remote_id) in context.dummies: del context.dummies[(model._name, remote_id)] def _get_connection(self): self.ensure_one() url = urllib.parse.urlparse(self.url) hostport = url.netloc.split(":") if len(hostport) == 1: hostport.append("443" if self.ssl else "80") host, port = hostport if not odoorpc: # pragma: no cover raise exceptions.UserError( _('Please install the "odoorpc" libary in your environment') ) opener = None if self.ssl and self.ssl_bypass: ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE opener = urllib.request.build_opener( urllib.request.HTTPSHandler(context=ctx) ) remote = odoorpc.ODOO( host, protocol="jsonrpc+ssl" if self.ssl else "jsonrpc", port=int(port), opener=opener, ) remote.login(self.database, self.user, self.password) return remote @api.constrains("url", "database", "user", "password") def _constrain_url(self): for this in self: if this == self.env.ref("base_import_odoo.demodb", False): continue if tools.config["test_enable"]: continue if not this.password: continue this._get_connection() @api.depends("status_data") def _compute_status_html(self): template_ref = "base_import_odoo.view_import_odoo_database_qweb" for this in self: if not this.status_data: status = False else: status = self.env["ir.qweb"]._render(template_ref, {"object": this}) this.status_html = status @api.depends("cronjob_id") def _compute_cronjob_running(self): for this in self: this.cronjob_running = False if not this.cronjob_id: continue try: with self.env.cr.savepoint(): self.env.cr.execute( 'select id from "%s" where id=%%s for update nowait' % self.env["ir.cron"]._table, (this.cronjob_id.id,), log_exceptions=False, ) except psycopg2.OperationalError: this.cronjob_running = True def _create_cronjob(self): self.ensure_one() return self.env["ir.cron"].create( { "name": self.display_name, "model_id": self.env["ir.model"] .search([("model", "=", self._name)]) .id, "code": "model._run_import_cron({})".format(self.ids), "doall": True, } ) def name_get(self): return [ (this.id, "{}@{}, {}".format(this.user, this.url, this.database)) for this in self ]