From 65a6d96f273c9890f3a91cfdc8622529f9efa545 Mon Sep 17 00:00:00 2001
From: emanuel buzey <emanuel.buzey@somit.coop>
Date: Fri, 8 Mar 2024 22:15:51 +0100
Subject: [PATCH] [IMP] energy_selfconsumption: Optimize the CUPS import
 process and handle queued jobs

This commit improves the CUPS data import process by dividing the work
into small groups, which improves memory management and prevents
timeouts in Odoo workers.

Changes made:
- Split CUPS data into groups of 20 for asynchronous processing (see
  the sketches after the patch).
- Fix CUPS ID handling so that coefficients are assigned correctly.
- Add detailed logs to facilitate problem diagnosis.
---
 .../distribution_table_import_wizard.py       | 95 ++++++++++++-------
 1 file changed, 60 insertions(+), 35 deletions(-)

diff --git a/energy_selfconsumption/wizards/distribution_table_import_wizard.py b/energy_selfconsumption/wizards/distribution_table_import_wizard.py
index eeca1d7c3..f3b841cec 100644
--- a/energy_selfconsumption/wizards/distribution_table_import_wizard.py
+++ b/energy_selfconsumption/wizards/distribution_table_import_wizard.py
@@ -56,23 +56,32 @@ class DistributionTableImportWizard(models.TransientModel):
 
         if type == 'variable_schedule':
             # For 'variable_schedule', queue a job
-            self.with_delay().import_all_lines(parsing_data, active_id)
-            message = 'La importación de la programación variable se está procesando en segundo plano.'
+            header, data_rows = parsing_data[0], parsing_data[1:]
+            cups_groups = self._divide_in_groups(data_rows, 20)  # split parsing_data into groups to optimize memory use
+            for group in cups_groups:
+                # Re-attach the header before processing each group
+                group_with_header = [header] + group
+                self.with_delay().process_variable_schedule(group_with_header, active_id)
+            message = 'The variable schedule import is being processed in the background.'
         else:
             # Other types are imported synchronously
             self.import_all_lines(parsing_data, active_id)
-            message = 'La importación ha finalizado.'
+            message = 'The import is complete.'
 
         return {
             'type': 'ir.actions.client',
             'tag': 'display_notification',
             'params': {
-                'title': _('Importación Iniciada'),
+                'title': _('Import Started'),
                 'message': message,
                 'sticky': False,
             }
         }
 
+    def _divide_in_groups(self, data, group_size):
+        """Divide the data into groups to process them separately."""
+        return [data[i:i + group_size] for i in range(0, len(data), group_size)]
+
     def download_template_button(self):
         type = self.env.context.get("type")
         if type == "fixed":
@@ -115,9 +124,8 @@ class DistributionTableImportWizard(models.TransientModel):
             raise UserError(_("Error parsing the file"))
 
     def import_all_lines(self, data, distribution_table):
-        logger.debug('Inicio de la importación para la Distribution Table con ID: %s', distribution_table)
+        logger.debug('Starting import for Distribution Table with ID: %s', distribution_table)
         type = self.env['energy_selfconsumption.distribution_table'].browse(distribution_table).type
-        logger.debug('El valor de type es: %s', type)
 
         if type == 'fixed':
             supply_point_assignation_values_list = []
@@ -130,41 +138,58 @@ class DistributionTableImportWizard(models.TransientModel):
                 {"supply_point_assignation_ids": supply_point_assignation_values_list}
             )
         elif type == 'variable_schedule':
-            logger.debug('Procesando importación tipo "variable_schedule"')
-            DistributionTableVariable = self.env['energy_selfconsumption.distribution_table_variable']
-            DistributionTableVariableCoefficient = self.env['energy_selfconsumption.distribution_table_var_coeff']
-            cups_ids = data[0][1:] # Get CUPS
-            logger.debug('CUPS IDs encontrados: %s', cups_ids)
-            hours_data = data[1:] # Get hours and coefficients
-
-            for cups_index, cups_id in enumerate(cups_ids, start=1):
-                # Create or find the record in DistributionTableVariable for the current CUPS
-                logger.debug('Procesando CUPS ID: %s', cups_id)
-                # Check if record exist
-                variable_record = DistributionTableVariable.search([
-                    ('distribution_table_id', '=', distribution_table),
-                    ('cups_id', '=', cups_id),
-                ], limit=1)
-
-                # If not exist, create
+            logger.debug('Processing import type "variable_schedule"')
+            self.process_variable_schedule(data, distribution_table)
+
+
+    def process_variable_schedule(self, data, distribution_table):
+        DistributionTableVariable = self.env['energy_selfconsumption.distribution_table_variable']
+        DistributionTableVariableCoefficient = self.env['energy_selfconsumption.distribution_table_var_coeff']
+        cups_ids = data[0][1:]  # The header row holds the CUPS IDs
+        logger.debug('CUPS IDs found: %s', cups_ids)
+        hours_data = data[1:]  # The remaining rows hold hours and coefficients
+        coefficients_batch = []
+        batch_size = 50
+
+        # Preload existing records to avoid repeated searches inside the loop
+        existing_variables = DistributionTableVariable.search([
+            ('cups_id', 'in', cups_ids),
+            ('distribution_table_id', '=', distribution_table),
+        ])
+        existing_cups_map = {var.cups_id: var for var in existing_variables}
+        logger.debug('Preloaded %d existing DistributionTableVariable records', len(existing_variables))
+
+        for row in hours_data:
+            hour = int(row[0])
+            logger.debug('Processing data for hour: %d', hour)
+            for cups_index, coefficient in enumerate(row[1:], start=1):
+                cups_id = cups_ids[cups_index - 1]
+                variable_record = existing_cups_map.get(cups_id)
                 if not variable_record:
                     variable_record = DistributionTableVariable.create({
                         'distribution_table_id': distribution_table,
                         'cups_id': cups_id,
                     })
+                    existing_cups_map[cups_id] = variable_record
+                    logger.debug('Created new DistributionTableVariable record for CUPS ID: %s', cups_id)
+
+                coefficients_batch.append({
+                    'distribution_table_variable_id': variable_record.id,
+                    'hour': hour,
+                    'coefficient': float(coefficient),
+                })
+
+                if len(coefficients_batch) >= batch_size:
+                    DistributionTableVariableCoefficient.create(coefficients_batch)
+                    logger.debug('Created a batch of %d coefficients up to hour %d', len(coefficients_batch), hour)
+                    coefficients_batch.clear()
+
+        if coefficients_batch:
+            DistributionTableVariableCoefficient.create(coefficients_batch)
+            logger.debug('Created the last batch of %d coefficients', len(coefficients_batch))
+
+        logger.debug('Completed the import for Distribution Table with ID: %s', distribution_table)
 
-            # Proccess every hour and coefficient for this CUPS
-            for row in hours_data:
-                hour = int(row[0])
-                coefficient = float(row[cups_index]) # Get the coefficient for the current hour and CUPS
-                logger.debug('Añadiendo coeficiente %s para la hora %s del CUPS ID: %s', coefficient, hour, cups_id)
-                # Create the coefficient record
-                DistributionTableVariableCoefficient.create({
-                    'distribution_table_variable_id': variable_record.id,
-                    'hour': hour,
-                    'coefficient': coefficient,
-                })
-        logger.debug('Finalización de la importación para la Distribution Table con ID: %s', distribution_table)
 
     def get_supply_point_assignation_values(self, line):
         return {
--
GitLab
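
A minimal, self-contained sketch of the chunking pattern above, runnable
outside Odoo. divide_in_groups mirrors the patch's _divide_in_groups helper
and its group size of 20; enqueue_import and the enqueue callable are
hypothetical stand-ins for self.with_delay().process_variable_schedule(...),
which relies on the OCA queue_job module.

    def divide_in_groups(data, group_size):
        """Split a list into consecutive chunks of at most group_size items."""
        return [data[i:i + group_size] for i in range(0, len(data), group_size)]

    def enqueue_import(parsing_data, enqueue, group_size=20):
        """Re-attach the header row to every chunk so each queued job can
        map coefficient columns to CUPS IDs on its own (enqueue is a
        hypothetical stand-in for with_delay())."""
        header, data_rows = parsing_data[0], parsing_data[1:]
        for group in divide_in_groups(data_rows, group_size):
            enqueue([header] + group)

    if __name__ == "__main__":
        rows = [["hour", "CUPS-A", "CUPS-B"]] + [[h, 0.5, 0.5] for h in range(45)]
        enqueue_import(rows, lambda chunk: print(len(chunk)))
        # Prints 21, 21, 6: each job gets the header plus up to 20 data rows.

Copying the header into every group is what keeps the jobs independent:
any chunk can be retried on any worker without shared state, at the cost
of one duplicated row per job.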
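
The batched create in process_variable_schedule can be sketched the same
way. StubModel is hypothetical and only records what it receives; the real
code depends on the Odoo ORM accepting a list of value dicts in create()
(supported since Odoo 12), so each flush turns up to 50 coefficient rows
into a single create call instead of 50.

    BATCH_SIZE = 50  # same flush threshold as the patch

    class StubModel:
        """Hypothetical stand-in for an Odoo model; records create() calls."""
        def __init__(self):
            self.calls = []

        def create(self, vals_list):
            self.calls.append(list(vals_list))

    def import_coefficients(hours_data, cups_ids, model, batch_size=BATCH_SIZE):
        batch = []
        for row in hours_data:
            hour = int(row[0])
            for idx, coefficient in enumerate(row[1:]):
                batch.append({
                    "cups_id": cups_ids[idx],
                    "hour": hour,
                    "coefficient": float(coefficient),
                })
                if len(batch) >= batch_size:
                    model.create(batch)  # flush a full batch
                    batch.clear()
        if batch:
            model.create(batch)  # flush whatever is left

    model = StubModel()
    import_coefficients([[h, "0.4", "0.6"] for h in range(60)], ["CUPS-A", "CUPS-B"], model)
    print([len(call) for call in model.calls])  # [50, 50, 20]

Flushing at a fixed size keeps peak memory bounded regardless of file
length, which is the same reason the wizard splits the CSV into groups
of 20 before queuing.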