# coding=utf-8
import contextlib
import os
from collections import namedtuple

from ppms import ppms
from ppms.constants import *
from ppms.dimmer import Dimmer
from ppms.global_setting import get_global_setting_value
from ppms.module_subclasses.base_class import Base
from ppms.text_constant import get_text_constant
from ppms.user_interface import ask_for_confirmation_via_dialog, ask_user_for_confirmation, ButtonList
from ppms.rights import user_has_access_to_module

from .constants import SystemSource
from .datazip_modifier import modify_datazip_content, import_datazip
from .schema import modify_schema_file_add_suffix
from .conflict import ConflictManager
from .update import UpdateManager

__all__ = ['DatazipModule', 'ModificationsModule', 'ApplicatorMappingModule']

# Bundle of the settings read from the module: the file to import and the
# system source it is imported for.
ImportParameters = namedtuple('ImportParameters', ['path', 'source'])

# Message id shown via ppms.ui_message_id once an import has finished.
MSG_ID_COMPLETED = '1279'
# Module id used for the access check that enables the debug prompt in generate_icou.
DEVELOPER_MODULE = '009D1D'


class DatazipModule(Base):
    """Module that imports a data.zip or schema file and generates ICOU data."""

    def on_load(self):
        """Filter the module as soon as it is loaded."""
        self.menu(MENU_FILTER)

    # TODO: This is overly complicated and a relic of when the module had multiple inputs
    @property
    def parameters(self):
        """Return the import settings of the module as ``ImportParameters``.

        The path is read from ``db_update_path`` of the first record of the
        ``import_parameters`` data area; the source is always
        ``SystemSource.REFERENCE``.

        Raises:
            ValueError: If the configured path does not point to an existing file.
        """
        records = self.get_da('import_parameters').get_records()
        record = records[0]
        path = record.db_update_path.get_raw_value()

        if not os.path.isfile(path):
            # Report the absolute path so the user can see where the file was looked up.
            absolute_path = os.path.abspath(path)
            text = get_text_constant('002199', default='Invalid file path: "{absolute_path}"')
            raise ValueError(text.format(absolute_path=absolute_path))

        source = SystemSource.REFERENCE
        return ImportParameters(path=path, source=source)

    def open_or_refresh_modification_module(self):
        """Focus and reset the modification module if it is open, otherwise open it.

        Returns:
            The module that was found in one of the panels, or the result of
            ``open_module`` when it had to be opened.
        """
        modification_module_id = '009D1D'

        for panel in ppms.get_panels():
            for module in panel.get_modules():
                if module.get_id() == modification_module_id:
                    module.focus()
                    module.menu(MENU_RESET)
                    return module

        return self.open_module(modification_module_id)

    def generate_icou(self, applied_dfc, clicked_df):
        """Generate the ICOU data after asking the user the necessary questions.

        The method returns without generating anything when conflicts exist
        and the user declines dialog '1299', or when required tables are
        missing and the user declines to continue.
        """
        conflict_manager = ConflictManager()
        if conflict_manager.conflicts_exist() and not ask_for_confirmation_via_dialog('1299'):
            return

        manager = UpdateManager()
        missing_tables = manager.get_missing_tables_necessary_for_icou()
        if missing_tables:
            # NOTE(review): the default text is reproduced on a single line;
            # confirm whether line breaks around {tables} were intended.
            text = get_text_constant("002217", default="""The following tables are needed for finding the individual modifications and don't exist: {tables} Continue anyway?""")
            # Each table name is quoted and placed on its own line.
            text = text.format(tables='"' + '"\n"'.join(missing_tables) + '"')
            if not ask_user_for_confirmation(message_text=text, button_enum=ButtonList.YES_NO):
                return

        user = ppms.uvar_get(SYS_VAR_LOGGED_IN_USER)
        debug = False
        # Only users with access to the developer module are asked dialog '1304';
        # the answer is passed on as ``dont_wipe``.
        if user_has_access_to_module(user=user, module=DEVELOPER_MODULE):
            debug = ask_for_confirmation_via_dialog('1304')

        venus_on_earth = ask_for_confirmation_via_dialog('1305')
        manager.generate_icou_data(dont_wipe=debug, venus_on_earth=venus_on_earth)

    def import_object(self, applied_dfc, clicked_df):
        """Import a data.zip/schema file based on the settings in the module.

        ``*.zip`` files are imported as data.zip, ``*.sql`` files as schema
        files. After a successful import the imported file (and, for a
        data.zip, the temporary output file) is removed on a best-effort
        basis, the completion message is shown, and the module is reset if a
        data.zip was imported.

        Raises:
            ValueError: If the file is neither a ``*.zip`` nor a ``*.sql`` file,
                or if ``parameters`` rejects the configured path.
        """
        parameters = self.parameters
        reset_module = False

        if parameters.path.endswith('.zip'):
            path_to_datazip = parameters.path
            suffix = parameters.source.get_suffix()
            output_path = parameters.source.get_temporary_name()
            self._import_datazip(path_to_datazip=path_to_datazip, suffix=suffix, output_path=output_path)
            reset_module = True

            # Best-effort cleanup: a file that cannot be removed is not an error.
            for filepath in [path_to_datazip, output_path]:
                with contextlib.suppress(OSError):
                    os.remove(filepath)
        elif parameters.path.endswith('.sql'):
            schema_file = parameters.path
            suffix = parameters.source.get_suffix()
            self._import_schema(schema_file=schema_file, suffix=suffix)

            with contextlib.suppress(OSError):
                os.remove(schema_file)
        else:
            text = get_text_constant('002200', default='Unknown file type. Valid types: *.zip and *.sql')
            raise ValueError(text)

        self.clear_statusbar()
        ppms.ui_message_id(MSG_ID_COMPLETED)
        if reset_module:
            self.menu(MENU_RESET)

    def _apply_statements(self, statements, ignore_errors=False):
        """Execute each statement via ``ppms.db_modify``.

        Args:
            statements: Iterable of statements to execute in order.
            ignore_errors: If true, a failing statement is skipped silently.
                Otherwise the failing statement is shown in a message box and
                the exception is re-raised.
        """
        for statement in statements:
            try:
                ppms.db_modify(statement)
            except Exception:
                # ``Exception`` instead of a bare ``except`` so that
                # KeyboardInterrupt/SystemExit are never swallowed when
                # ``ignore_errors`` is set.
                if not ignore_errors:
                    ppms.ui_message_box('Error while executing statement', statement)
                    raise

    def _import_datazip(self, path_to_datazip, suffix, output_path):
        """Write a modified copy of the data.zip to ``output_path`` and import it.

        A ``Dimmer`` displays the current step while the work is in progress.
        """
        text = get_text_constant('002204', default='Customizing deployment is being prepared')
        with Dimmer(text=text) as dimmer:
            modify_datazip_content(path_to_datazip=path_to_datazip, suffix=suffix, output_path=output_path)
            dimmer.text = get_text_constant('002205', default='Customizing deployment is being applied')
            import_datazip(output_path)
        self.clear_statusbar()

    def _import_schema(self, schema_file, suffix):
        """Apply the drop and create statements derived from ``schema_file``.

        Errors in drop statements are ignored; errors in create statements are
        reported and re-raised (see ``_apply_statements``).
        """
        text = get_text_constant('002202', default='Schema file is being prepared')
        with Dimmer(text=text) as dimmer:
            drop_statements, create_statements = modify_schema_file_add_suffix(schema_file=schema_file, suffix=suffix)
            dimmer.text = get_text_constant('002203', default='Schema file is being applied')
            self._apply_statements(drop_statements, ignore_errors=True)
            self._apply_statements(create_statements, ignore_errors=False)
        self.clear_statusbar()


class ModificationsModule(Base):
    """Module listing modifications, with actions to open a comparison module."""

    def compare_reference_with_customer(self, applied_dfc, clicked_df):
        """Open the comparison module configured under 'compare_reference_with_customer'."""
        module = get_global_setting_value('compare_reference_with_customer', 'alpha120')
        self._compare(module, clicked_df)

    def compare_customer_with_future(self, applied_dfc, clicked_df):
        """Open the comparison module configured under 'compare_customer_with_future'."""
        module = get_global_setting_value('compare_customer_with_future', 'alpha120')
        self._compare(module, clicked_df)

    def get_docked_module(self, module_id):
        """Return the docked module or None if it isn't docked yet"""
        panel = self.get_panel()
        for module in panel.get_modules():
            if module.get_id() == module_id:
                return module
        return None

    def _compare(self, module, datafield):
        """Show the comparison module for the record of ``datafield``.

        The uuid of the record is stored in L variable 4. A comparison module
        that is already docked is reset; otherwise it is opened docked at the
        bottom of the panel and filtered. In both cases it receives the focus.

        Args:
            module: Id of the comparison module.
            datafield: Data field whose record is to be compared.

        Returns:
            The comparison module.
        """
        record = datafield.get_record()
        record_id = record.uuid.get_raw_value()
        self.set_new_L_var(4, [record_id])

        comparison_module = self.get_docked_module(module_id=module)
        if comparison_module:
            comparison_module.menu(MENU_RESET)
        else:
            comparison_module = self.open_module(module,
                                                 forced_status=MODULE_FORCED_STATUS_SUB,
                                                 dock_to_module=DOCK_TO_MODULE_PANEL,
                                                 dock_style=DOCK_STYLE_BOTTOM,
                                                 dock_proportion=0.4)
            comparison_module.menu(MENU_FILTER)

        comparison_module.focus()
        return comparison_module

    def on_initial_focus(self):
        """Filter the module when it receives the focus for the first time."""
        self.menu(MENU_FILTER)

    def on_reset(self):
        """Treat a reset like the initial focus, i.e. filter the module again."""
        self.on_initial_focus()


class ApplicatorMappingModule(Base):
    """Module that filters itself on initial focus and on reset."""

    def on_initial_focus(self):
        """Filter the module when it receives the focus for the first time."""
        self.menu(MENU_FILTER)

    def on_reset(self):
        """Treat a reset like the initial focus, i.e. filter the module again."""
        self.on_initial_focus()