# selfconsumption_import_wizard.py (8.76 KiB)
# Authored by emanuel buzey
import base64
import logging
from csv import reader
from datetime import datetime
from io import StringIO
import chardet
from odoo import _, api, fields, models
from odoo.exceptions import UserError, ValidationError
logger = logging.getLogger(__name__)
class SelfconsumptionImportWizard(models.TransientModel):
    """Wizard that imports inscriptions and supply points from a CSV file.

    The first row of the file is treated as a header and skipped. Every
    following row is mapped to a dict by :meth:`get_line_dict` and processed by
    :meth:`import_line`. Rows that cannot be imported do not abort the import;
    their error messages are collected and posted as a single message on the
    project record taken from the ``active_id`` context key.
    """

    _name = "energy_selfconsumption.selfconsumption_import.wizard"
    _description = "Selfconsumption Import Wizard"

    # Keys of the dict built by ``get_line_dict``, in CSV column order.
    _CSV_COLUMNS = (
        "partner_vat",
        "effective_date",
        "code",
        "street",
        "street2",
        "city",
        "state",
        "postal_code",
        "country",
        "owner_vat",
        "owner_firstname",
        "owner_lastname",
    )

    import_file = fields.Binary(string="Import File (*.csv)")
    fname = fields.Char(string="File Name")
    delimiter = fields.Char(
        default=",",
        required=True,
        string="File Delimiter",
        help="Delimiter in import CSV file.",
    )
    quotechar = fields.Char(
        default='"',
        required=True,
        string="File Quotechar",
        help="Quotechar in import CSV file.",
    )
    encoding = fields.Char(
        default="utf-8",
        required=True,
        string="File Encoding",
        help="Enconding format in import CSV file.",
    )
    date_format = fields.Char(
        default="%d/%m/%Y",
        required=True,
        string="Date Format",
        help="Date format for effective date.",
    )

    @api.constrains("import_file", "fname")
    def _constrains_import_file(self):
        """Reject uploaded files whose name does not end in ``.csv``.

        Only the extension after the *last* dot is considered, so names such as
        ``my.data.csv`` are accepted and names without any dot are rejected
        instead of raising ``IndexError``.

        :raises ValidationError: if a file name is set and is not a csv name.
        """
        for wizard in self:
            if wizard.fname and not wizard.fname.lower().endswith(".csv"):
                raise ValidationError(_("Only csv format files are accepted."))

    def import_file_button(self):
        """Import every data row of the uploaded file into the active project.

        Row numbers reported in errors are 1-based file lines (the header is
        line 1, so data starts at line 2). Completely empty rows are skipped.

        :raises UserError: if no file was uploaded or the file cannot be parsed.
        :return: ``True``
        """
        self.ensure_one()
        self.flush()
        if not self.import_file:
            # b64decode(False) would otherwise fail with an opaque TypeError.
            raise UserError(_("Please select a file to import."))
        file_data = base64.b64decode(self.import_file)
        parsing_data = self.with_context(active_id=self.ids[0])._parse_file(file_data)
        active_id = self.env.context.get("active_id")
        project = self.env["energy_selfconsumption.selfconsumption"].browse(active_id)

        error_items = []
        for index, line in enumerate(parsing_data[1:], start=2):
            if not any(line):
                # csv.reader yields [] for blank lines (e.g. a trailing newline).
                continue
            import_dict = self.get_line_dict(line)
            result = self.import_line(import_dict, project)
            if not result[0]:
                error_items.append(
                    _("<li>Line {line}: {error}</li>\n").format(
                        line=index, error=result[1]
                    )
                )

        if error_items:
            project.message_post(
                subject=_("Import Errors"),
                body=_("Import errors found: <ul>{list}</ul>").format(
                    list="".join(error_items)
                ),
            )
        return True

    def _attachment_download_action(self, xml_id):
        """Return a URL action that downloads the record referenced by *xml_id*."""
        attachment = self.env.ref(xml_id)
        return {
            "type": "ir.actions.act_url",
            "url": "/web/content/{}/?download=true".format(attachment.id),
            "target": "new",
        }

    def download_template_button(self):
        """Return the action that downloads the example distribution table."""
        return self._attachment_download_action(
            "energy_selfconsumption.selfconsumption_table_example_attachment"
        )

    def download_list_button(self):
        """Return the action that downloads the state list attachment."""
        return self._attachment_download_action(
            "energy_selfconsumption.list_state_attachment"
        )

    def get_line_dict(self, line):
        """Map one parsed CSV row to a dict keyed by ``_CSV_COLUMNS``.

        Empty cells become ``False``. Rows shorter than the expected number of
        columns are padded with empty cells (so missing trailing columns also
        become ``False``); extra trailing columns are ignored.

        :param line: list of cell strings as produced by ``csv.reader``.
        :return: dict with one entry per name in ``_CSV_COLUMNS``.
        """
        column_count = len(self._CSV_COLUMNS)
        cells = list(line[:column_count])
        cells.extend([""] * (column_count - len(cells)))
        return {key: cell or False for key, cell in zip(self._CSV_COLUMNS, cells)}

    def _parse_file(self, data_file):
        """Decode *data_file* and parse it as CSV.

        The wizard's ``encoding`` is tried first; if decoding fails, the
        encoding guessed by ``chardet`` is used instead.

        :param data_file: raw file content as ``bytes``.
        :return: list of rows, each row a list of cell strings.
        :raises UserError: if no encoding can be determined or parsing fails.
        """
        self.ensure_one()
        try:
            try:
                decoded_file = data_file.decode(self.encoding)
            except UnicodeDecodeError:
                detected_encoding = chardet.detect(data_file).get("encoding", False)
                if not detected_encoding:
                    raise UserError(
                        _("No valid encoding was found for the attached file")
                    )
                decoded_file = data_file.decode(detected_encoding)
            rows = reader(
                StringIO(decoded_file),
                delimiter=self.delimiter,
                quotechar=self.quotechar,
            )
            return list(rows)
        except UserError:
            # Keep the specific message raised above instead of masking it.
            raise
        except Exception as error:
            # Exception (not BaseException) so KeyboardInterrupt/SystemExit
            # are never converted into a user-facing error.
            logger.warning("Parser error", exc_info=True)
            raise UserError(_("Error parsing the file")) from error

    def import_line(self, line_dict, project):
        """Import a single row: ensure an inscription and a supply point exist.

        :param line_dict: row dict as returned by :meth:`get_line_dict`.
        :param project: project record the inscription is attached to.
        :return: ``(True, False)`` on success, ``(False, message)`` on failure.
        """
        partner_vat = line_dict["partner_vat"]
        if not partner_vat:
            # Searching vat = False would match any partner without a VAT.
            return False, _("Partner VAT is missing.")
        partner = self.env["res.partner"].search(
            [
                "|",
                ("vat", "=", partner_vat),
                ("vat", "=ilike", partner_vat),
            ],
            limit=1,
        )
        if not partner:
            return False, _("Partner with VAT:<b>{vat}</b> was not found.").format(
                vat=partner_vat
            )

        already_inscribed = project.inscription_ids.filtered_domain(
            [("partner_id", "=", partner.id)]
        )
        if not already_inscribed:
            try:
                if line_dict["effective_date"]:
                    effective_date = datetime.strptime(
                        line_dict["effective_date"], self.date_format
                    ).date()
                else:
                    effective_date = fields.Date.today()
                # Savepoint: a failed create must not leave the transaction
                # aborted (or half-written) for the remaining rows.
                with self.env.cr.savepoint():
                    self.env["energy_project.inscription"].create(
                        {
                            "project_id": project.id,
                            "partner_id": partner.id,
                            "effective_date": effective_date,
                        }
                    )
            except Exception as e:
                return False, _(
                    "Could not create inscription for {vat}. {error}"
                ).format(vat=partner_vat, error=e)

        # limit=1 keeps ``supply_point`` a singleton even if codes are duplicated.
        supply_point = self.env["energy_selfconsumption.supply_point"].search(
            [("code", "=", line_dict["code"])], limit=1
        )
        if supply_point and supply_point.partner_id != partner:
            return False, _(
                "The supply point partner {supply_partner} and the partner {vat} in the inscription are different."
            ).format(supply_partner=supply_point.partner_id.vat, vat=partner.vat)
        if not supply_point:
            result = self.create_supply_point(line_dict, partner)
            if not result[0]:
                return result
        return True, False

    def create_supply_point(self, line_dict, partner):
        """Create the supply point described by *line_dict* for *partner*.

        The owner is looked up by ``owner_vat`` and created when not found;
        when no ``owner_vat`` is given, *partner* is used as the owner.

        :param line_dict: row dict as returned by :meth:`get_line_dict`.
        :param partner: partner record the supply point belongs to.
        :return: the created supply point record on success, or a
            ``(False, message)`` tuple on failure. Callers test ``result[0]``,
            which is truthy for a created record.
        """
        if line_dict["owner_vat"]:
            owner = self.env["res.partner"].search(
                [
                    "|",
                    ("vat", "=", line_dict["owner_vat"]),
                    ("vat", "=ilike", line_dict["owner_vat"]),
                ],
                limit=1,
            )
            if not owner:
                try:
                    with self.env.cr.savepoint():
                        owner = self.env["res.partner"].create(
                            {
                                "vat": line_dict["owner_vat"],
                                "firstname": line_dict["owner_firstname"],
                                "lastname": line_dict["owner_lastname"],
                                "company_type": "person",
                            }
                        )
                except Exception as e:
                    return False, _("Owner could not be created: {error}").format(
                        error=e
                    )
        else:
            owner = partner

        country = self.env["res.country"].search([("code", "=", line_dict["country"])])
        if not country:
            return False, _("Country code was not found: {country}").format(
                country=line_dict["country"]
            )
        state = self.env["res.country.state"].search(
            [("code", "=", line_dict["state"]), ("country_id", "=", country.id)]
        )
        if not state:
            return False, _("State code was not found: {state}").format(
                state=line_dict["state"]
            )

        try:
            # Same per-row isolation as the owner creation above: report the
            # failure for this line instead of aborting the whole import.
            with self.env.cr.savepoint():
                return self.env["energy_selfconsumption.supply_point"].create(
                    {
                        "code": line_dict["code"],
                        "name": line_dict["street"],
                        "street": line_dict["street"],
                        "street2": line_dict["street2"],
                        "city": line_dict["city"],
                        "zip": line_dict["postal_code"],
                        "state_id": state.id,
                        "country_id": country.id,
                        "owner_id": owner.id,
                        "partner_id": partner.id,
                    }
                )
        except Exception as e:
            return False, _("Supply point could not be created: {error}").format(
                error=e
            )