server-tools/excel_import_export/models/xlsx_import.py

# Copyright 2019 Ecosoft Co., Ltd (http://ecosoft.co.th/)
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html)
import base64
import uuid
from ast import literal_eval
from datetime import date, datetime as dt
from io import BytesIO

import xlrd
import xlwt

from odoo import _, api, models
from odoo.exceptions import ValidationError
from odoo.tools.float_utils import float_compare
from odoo.tools.safe_eval import safe_eval

from . import common as co


class XLSXImport(models.AbstractModel):
    _name = "xlsx.import"
    _description = "Excel Import AbstractModel"

    @api.model
    def get_eval_context(self, model=False, value=False):
        eval_context = {
            "float_compare": float_compare,
            "datetime": dt,
            "date": date,
            "env": self.env,
            "context": self._context,
            "value": False,
            "model": False,
        }
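        # Note: template condition expressions evaluated with this context
        # (see _get_line_vals() and _process_worksheet()) may reference any
        # of the keys above, e.g. ``value``, ``model``, ``env``, ``datetime``;
        # the expression syntax itself is parsed by co.get_field_condition().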
        if model:
            eval_context.update({"model": self.env[model]})
        if value:
            if isinstance(value, str):  # Replace non-ASCII (ord >= 128) characters
                value = "".join([i if ord(i) < 128 else " " for i in value])
            eval_context.update({"value": value})
        return eval_context

    @api.model
    def get_external_id(self, record):
        """Get the record's external ID, creating one if it does not exist yet"""
        ModelData = self.env["ir.model.data"]
        xml_id = record.get_external_id()
        if not xml_id or (record.id in xml_id and xml_id[record.id] == ""):
            ModelData.create(
                {
                    "name": "{}_{}".format(record._table, record.id),
                    "module": "excel_import_export",
                    "model": record._name,
                    "res_id": record.id,
                }
            )
            xml_id = record.get_external_id()
        return xml_id[record.id]

    @api.model
    def _get_field_type(self, model, field):
        try:
            record = self.env[model].new()
            for f in field.split("/"):
                field_type = record._fields[f].type
                if field_type in ("one2many", "many2many"):
                    record = record[f]
                else:
                    return field_type
        except Exception:
            raise ValidationError(
                _("Invalid declaration, %s has no valid field type") % field
            )

    @api.model
    def _delete_record_data(self, record, data_dict):
        """If a line field is not prefixed with _NODEL_, delete its existing lines before importing"""
        if not record or not data_dict:
            return
        try:
            for sheet_name in data_dict:
                worksheet = data_dict[sheet_name]
                line_fields = filter(lambda x: x != "_HEAD_", worksheet)
                for line_field in line_fields:
                    if "_NODEL_" not in line_field:
                        if line_field in record and record[line_field]:
                            record[line_field].unlink()
            # Remove _NODEL_ from dict
            for s, _sv in data_dict.copy().items():
                for f, _fv in data_dict[s].copy().items():
                    if "_NODEL_" in f:
                        new_fv = data_dict[s].pop(f)
                        data_dict[s][f.replace("_NODEL_", "")] = new_fv
        except Exception as e:
            raise ValidationError(_("Error deleting data\n%s") % e)

    @api.model
    def _get_end_row(self, st, worksheet, line_field):
        """Return the ending row: the capped max row or the next fully empty row"""
        _x, max_row = co.get_line_max(line_field)
        test_rows = {}
        max_end_row = 0
        for rc, _col in worksheet.get(line_field, {}).items():
            rc, key_eval_cond = co.get_field_condition(rc)
            row, col = co.pos2idx(rc)
            # If a max_row is given, i.e., order_line[5], use it. Otherwise, use st.nrows
            max_end_row = st.nrows if max_row is False else (row + max_row)
            for idx in range(row, max_row and max_end_row or st.nrows):
                cell_type = st.cell_type(idx, col)  # empty type = 0
                r_types = test_rows.get(idx, [])
                r_types.append(cell_type)
                test_rows[idx] = r_types
        empty_list = filter(lambda y: all(i == 0 for i in y[1]), test_rows.items())
        empty_rows = list(map(lambda z: z[0], empty_list))
        next_empty_row = empty_rows and min(empty_rows) or max_end_row
        return next_empty_row

    @api.model
    def _get_line_vals(self, st, worksheet, model, line_field):
        """Get values of this line field from the excel sheet"""
        vals = {}
        end_row = self._get_end_row(st, worksheet, line_field)
        for rc, columns in worksheet.get(line_field, {}).items():
            if not isinstance(columns, list):
                columns = [columns]
            for field in columns:
                rc, key_eval_cond = co.get_field_condition(rc)
                x_field, val_eval_cond = co.get_field_condition(field)
                row, col = co.pos2idx(rc)
                new_line_field, _x = co.get_line_max(line_field)
                out_field = "{}/{}".format(new_line_field, x_field)
                field_type = self._get_field_type(model, out_field)
                vals.update({out_field: []})
                for idx in range(row, end_row):
                    value = co._get_cell_value(st.cell(idx, col), field_type=field_type)
                    eval_context = self.get_eval_context(model=model, value=value)
                    if key_eval_cond:
                        value = safe_eval(key_eval_cond, eval_context)
                    if val_eval_cond:
                        value = safe_eval(val_eval_cond, eval_context)
                    vals[out_field].append(value)
                # Drop the column entirely if every cell evaluated to ""
                # (in Python 3, ``not filter(...)`` is always False, so use any())
                if not any(x != "" for x in vals[out_field]):
                    vals.pop(out_field)
        return vals

    @api.model
    def _process_worksheet(self, wb, out_wb, out_st, model, data_dict, header_fields):
        col_idx = 1
        for sheet_name in data_dict:  # For each Sheet
            worksheet = data_dict[sheet_name]
            st = False
            if isinstance(sheet_name, str):
                st = co.xlrd_get_sheet_by_name(wb, sheet_name)
            elif isinstance(sheet_name, int):
                st = wb.sheet_by_index(sheet_name - 1)
            if not st:
                raise ValidationError(_("Sheet %s not found") % sheet_name)
            # HEAD updates
            for rc, field in worksheet.get("_HEAD_", {}).items():
                rc, key_eval_cond = co.get_field_condition(rc)
                field, val_eval_cond = co.get_field_condition(field)
                field_type = self._get_field_type(model, field)
                value = False
                try:
                    row, col = co.pos2idx(rc)
                    value = co._get_cell_value(st.cell(row, col), field_type=field_type)
                except Exception:
                    pass
                eval_context = self.get_eval_context(model=model, value=value)
                if key_eval_cond:
                    value = str(safe_eval(key_eval_cond, eval_context))
                if val_eval_cond:
                    value = str(safe_eval(val_eval_cond, eval_context))
                out_st.write(0, col_idx, field)  # Next Column
                out_st.write(1, col_idx, value)  # Next Value
                header_fields.append(field)
                col_idx += 1
            # Line Items
            line_fields = filter(lambda x: x != "_HEAD_", worksheet)
            for line_field in line_fields:
                vals = self._get_line_vals(st, worksheet, model, line_field)
                for field in vals:
                    # Columns, i.e., line_ids/field_id
                    out_st.write(0, col_idx, field)
                    header_fields.append(field)
                    # Data
                    i = 1
                    for value in vals[field]:
                        out_st.write(i, col_idx, value)
                        i += 1
                    col_idx += 1

    @api.model
    def _import_record_data(self, import_file, record, data_dict):
        """From the complex excel, create a temp simple excel and do the import"""
        if not data_dict:
            return
        try:
            header_fields = []
            model = record._name
            decoded_data = base64.decodebytes(import_file)
            wb = xlrd.open_workbook(file_contents=decoded_data)
            out_wb = xlwt.Workbook()
            out_st = out_wb.add_sheet("Sheet 1")
            xml_id = (
                record
                and self.get_external_id(record)
                or "{}.{}".format("xls", uuid.uuid4())
            )
            out_st.write(0, 0, "id")  # id and xml_id on first column
            out_st.write(1, 0, xml_id)
            header_fields.append("id")
            # Process all worksheets
            self._process_worksheet(wb, out_wb, out_st, model, data_dict, header_fields)
            # --
            content = BytesIO()
            out_wb.save(content)
            content.seek(0)  # Set index to 0, and start reading
            xls_file = content.read()
            # Do the import
            Import = self.env["base_import.import"]
            imp = Import.create(
                {
                    "res_model": model,
                    "file": xls_file,
                    "file_type": "application/vnd.ms-excel",
                    "file_name": "temp.xls",
                }
            )
            errors = imp.do(
                header_fields,
                header_fields,
                {
                    "headers": True,
                    "advanced": True,
                    "keep_matches": False,
                    "encoding": "",
                    "separator": "",
                    "quoting": '"',
                    "date_format": "%Y-%m-%d",
                    "datetime_format": "%Y-%m-%d %H:%M:%S",
                    "float_thousand_separator": ",",
                    "float_decimal_separator": ".",
                    "fields": [],
                },
            )
            if errors.get("messages"):
                message = _("Error importing data")
                messages = errors["messages"]
                if isinstance(messages, dict):
                    message = messages["message"]
                if isinstance(messages, list):
                    message = ", ".join([x["message"] for x in messages])
                raise ValidationError(message)
            return self.env.ref(xml_id)
        except xlrd.XLRDError:
            raise ValidationError(
                _("Invalid file type, only .xls or .xlsx files allowed")
            )
        except Exception as e:
            raise e

    @api.model
    def _post_import_operation(self, record, operation):
        """Run python code after import"""
        if not record or not operation:
            return
        try:
            if "${" in operation:
                code = (operation.split("${"))[1].split("}")[0]
                eval_context = {"object": record}
                safe_eval(code, eval_context)
        except Exception as e:
            raise ValidationError(_("Post import operation error\n%s") % e)

    @api.model
    def import_xlsx(self, import_file, template, res_model=False, res_id=False):
        """
        - If res_id is False, a new document is created by the import
        - Delete line fields' existing data according to data_dict['__IMPORT__']
        - Import data from excel according to data_dict['__IMPORT__']
        """
        self = self.sudo()
        if res_model and template.res_model != res_model:
            raise ValidationError(_("Template's model mismatch"))
        record = self.env[template.res_model].browse(res_id)
        data_dict = literal_eval(template.instruction.strip())
        if not data_dict.get("__IMPORT__"):
            raise ValidationError(
                _("No data_dict['__IMPORT__'] in template %s") % template.name
            )
        if record:
            # Delete existing data first
            self._delete_record_data(record, data_dict["__IMPORT__"])
        # Fill up record with data from excel sheets
        record = self._import_record_data(import_file, record, data_dict["__IMPORT__"])
        # Post Import Operation, i.e., cleanup some data
        if data_dict.get("__POST_IMPORT__", False):
            self._post_import_operation(record, data_dict["__POST_IMPORT__"])
        return record