# coding=utf-8
from xml.etree import ElementTree
from xml.dom.minidom import parse
import os.path

from simso.core.Task import TaskInfo, task_types
from simso.core.Processor import ProcInfo
from simso.core.Caches import Cache_LRU
from simso.core.Scheduler import SchedulerInfo

convert_function = {
    'int': int,
    'float': float,
    'bool': bool,
    'str': str
}


class Parser(object):
    """
    Simulation file parser.
    """
    def __init__(self, filename):
        self.filename = filename
        self.cur_dir = os.path.split(filename)[0]
        if not self.cur_dir:
            self.cur_dir = '.'
        self._dom = parse(filename)
        self._parse_etm()
        self._parse_duration()
        self._parse_cycles_per_ms()
        self._parse_caches()
        self._parse_tasks()
        self._parse_processors()
        self._parse_scheduler()
        self._parse_penalty()
        self._parse_faults()

    def _parse_faults(self):
        # TODO: implement parser for faults.
        self.fault_info_list = []
        self.fault_data_fields = {}

    def _parse_caches(self):
        self.caches_list = []
        caches_element = self._dom.getElementsByTagName('caches')[0]
        caches = caches_element.getElementsByTagName('cache')
        attr = caches_element.attributes

        self.memory_access_time = 100
        if 'memory_access_time' in attr:
            self.memory_access_time = int(attr['memory_access_time'].value)

        for cache in caches:
            attr = cache.attributes
            if attr['policy'].value == 'LRU' and attr['type'].value == 'data':
                access_time = 1
                associativity = int(attr['size'].value)
                if 'access_time' in attr:
                    access_time = int(attr['access_time'].value)
                if 'associativity' in attr:
                    associativity = int(attr['associativity'].value)
                cache = Cache_LRU(attr['name'].value, int(attr['id'].value),
                                  int(attr['size'].value), associativity,
                                  access_time)
                self.caches_list.append(cache)
            # TODO: generalize to the other cache types.

    def _parse_tasks(self):
        tasks_el = self._dom.getElementsByTagName('tasks')[0]

        self.task_data_fields = {}
        for field in tasks_el.getElementsByTagName('field'):
            attr = field.attributes
            self.task_data_fields[attr['name'].value] = attr['type'].value

        tasks = tasks_el.getElementsByTagName('task')
        self.task_info_list = []
        for task in tasks:
            attr = task.attributes
            data = dict(
                (k, convert_function[self.task_data_fields[k]](attr[k].value))
                for k in attr.keys() if k in self.task_data_fields)

            task_type = 'Periodic'
            if 'task_type' in attr and attr['task_type'].value in task_types:
                task_type = attr['task_type'].value
            elif 'periodic' in attr and attr['periodic'].value == 'no':
                task_type = 'APeriodic'

            list_activation_dates = []
            if ('list_activation_dates' in attr
                    and attr['list_activation_dates'].value != ''):
                list_activation_dates = sorted(
                    map(float, attr['list_activation_dates'].value.split(',')))

            t = TaskInfo(
                name=attr['name'].value,
                identifier=int(attr['id'].value),
                task_type=task_type,
                abort_on_miss='abort_on_miss' not in attr
                or attr['abort_on_miss'].value == 'yes',
                period=float(attr['period'].value),
                activation_date=float(attr['activationDate'].value)
                if 'activationDate' in attr else 0,
                n_instr=int(attr['instructions'].value),
                mix=float(attr['mix'].value),
                stack_file=(self.cur_dir + '/' + attr['stack'].value,
                            self.cur_dir) if 'stack' in attr
                else ("", self.cur_dir),
                wcet=float(attr['WCET'].value),
                acet=float(attr['ACET'].value) if 'ACET' in attr else 0,
                pwcet=float(attr['PWCET'].value) if 'PWCET' in attr else 0,
                et_stddev=float(attr['et_stddev'].value)
                if 'et_stddev' in attr else 0,
                deadline=float(attr['deadline'].value),
                base_cpi=float(attr['base_cpi'].value),
                followed_by=int(attr['followed_by'].value)
                if 'followed_by' in attr else None,
                list_activation_dates=list_activation_dates,
                preemption_cost=int(float(attr['preemption_cost'].value))
                if 'preemption_cost' in attr else 0,
                data=data)
            self.task_info_list.append(t)

    def _parse_processors(self):
        processors = self._dom.getElementsByTagName('processors')[0]
        attr = processors.attributes

        migration_overhead = 0
        if 'migration_overhead' in attr:
            migration_overhead = int(attr['migration_overhead'].value)

        self.proc_data_fields = {}
        for field in processors.getElementsByTagName('field'):
            attr = field.attributes
            self.proc_data_fields[attr['name'].value] = attr['type'].value

        cpus = processors.getElementsByTagName('processor')
        self.proc_info_list = []
        for cpu in cpus:
            attr = cpu.attributes
            data = dict(
                (k, convert_function[self.proc_data_fields[k]](attr[k].value))
                for k in attr.keys() if k in self.proc_data_fields)

            cl_overhead = 0
            cs_overhead = 0
            if 'cl_overhead' in attr:
                cl_overhead = int(float(attr['cl_overhead'].value))
            if 'cs_overhead' in attr:
                cs_overhead = int(float(attr['cs_overhead'].value))

            speed = 1.0
            if 'speed' in attr:
                speed = float(attr['speed'].value)

            proc = ProcInfo(name=attr['name'].value,
                            identifier=int(attr['id'].value),
                            cs_overhead=cs_overhead,
                            cl_overhead=cl_overhead,
                            migration_overhead=migration_overhead,
                            speed=speed,
                            data=data)

            caches = cpu.getElementsByTagName('cache')
            for cache_element in caches:
                attr = cache_element.attributes
                for cache in self.caches_list:
                    if cache.identifier == int(attr['ref'].value):
                        proc.add_cache(cache)

            self.proc_info_list.append(proc)

    def _parse_etm(self):
        simulation = self._dom.getElementsByTagName('simulation')[0]
        if 'etm' in simulation.attributes:
            self.etm = simulation.attributes['etm'].value
        else:
            use_wcet = True
            if 'use_wcet' in simulation.attributes:
                use_wcet = (simulation.attributes['use_wcet'].value
                            in ('true', 'yes'))
            if use_wcet:
                self.etm = "wcet"
            else:
                self.etm = "cache"

    def _parse_duration(self):
        simulation = self._dom.getElementsByTagName('simulation')[0]
        if 'duration' in simulation.attributes:
            self.duration = int(simulation.attributes['duration'].value)
        else:
            self.duration = 50000

    def _parse_penalty(self):
        simulation = self._dom.getElementsByTagName('simulation')[0]
        if 'penalty_preemption' in simulation.attributes:
            self.penalty_preemption = int(
                simulation.attributes['penalty_preemption'].value)
        else:
            self.penalty_preemption = 100000
        if 'penalty_migration' in simulation.attributes:
            self.penalty_migration = int(
                simulation.attributes['penalty_migration'].value)
        else:
            self.penalty_migration = 100000

    def _parse_cycles_per_ms(self):
        simulation = self._dom.getElementsByTagName('simulation')[0]
        if 'cycles_per_ms' in simulation.attributes:
            self.cycles_per_ms = int(
                simulation.attributes['cycles_per_ms'].value)
        else:
            self.cycles_per_ms = 1000000

    def _parse_scheduler(self):
        overhead = 0
        overhead_activate = 0
        overhead_terminate = 0
        sched = self._dom.getElementsByTagName('sched')[0]
        attr = sched.attributes

        if 'class' in attr:
            clas = attr['class'].value
        else:
            clas = ''

        if 'className' in attr:
            filename = attr['className'].value
        else:
            filename = ''

        if 'overhead' in attr:
            overhead = int(float(attr['overhead'].value))
        if 'overhead_activate' in attr:
            overhead_activate = int(float(attr['overhead_activate'].value))
        if 'overhead_terminate' in attr:
            overhead_terminate = int(float(attr['overhead_terminate'].value))

        data = {}
        fields = sched.getElementsByTagName('field')
        for field in fields:
            name = field.attributes['name'].value
            type_ = field.attributes['type'].value
            value = field.attributes['value'].value
            data[name] = (convert_function[type_](value), type_)

        self.scheduler_info = SchedulerInfo(
            clas=clas, overhead=overhead,
            overhead_activate=overhead_activate,
            overhead_terminate=overhead_terminate,
            fields=data)
        if filename and filename[0] != '/':
            filename = self.cur_dir + '/' + filename
        self.scheduler_info.filename = filename


class AmaltheaModelParser:
    def __init__(self, filename):
        self.filename = filename
        self.cur_dir = os.path.split(filename)[0]
        if not self.cur_dir:
            self.cur_dir = '.'
        tree = ElementTree.parse(filename)
        self.root = tree.getroot()
        self._parse_processors()
        self._parse_scheduler()
        # Tasks have to be parsed after processors due to allocation strategies.
        self._parse_tasks()

    def _parse_tasks(self):
        self.task_info_list = []
        self.task_mappings = {}
        identifier = 0
        for tag in self.root.findall('swModel/tasks'):
            name = tag.attrib['name']
            process = f"{tag.attrib['name']}?type=Task"
            activations = []
            stimuli = self.root.find(
                f"stimuliModel/stimuli/[@name='{tag.attrib['stimuli'].split('?')[0]}']")
            if stimuli.attrib['{http://www.w3.org/2001/XMLSchema-instance}type'] == 'am:PeriodicStimulus':
                task_type = "Periodic"
                period = stimuli.find(
                    f"[@name='{tag.attrib['stimuli'].split('?')[0]}']/recurrence")
                if period is not None:
                    unit = period.attrib['unit']
                    period = int(period.attrib['value'])
                    if unit == 's':
                        period *= 1000
                    elif unit == 'us':
                        period /= 1000
                    elif unit == 'ns':
                        period /= 1000000
                    elif unit == 'ps':
                        period /= 1000000000
            elif stimuli.attrib['{http://www.w3.org/2001/XMLSchema-instance}type'] == 'am:ArrivalCurveStimulus':
                task_type = 'Sporadic'
                activations = stimuli.find(
                    f"[@name='{tag.attrib['stimuli'].split('?')[0]}']/entries")
                # TODO: find example
            else:
                continue

            deadline = self.root.find(
                f"constraintsModel/requirements/[@process='{process}']/limit/limitValue")
            if deadline is not None:
                deadline = int(deadline.attrib['value'])
            else:
                deadline = period

            priority = self.root.find(
                f"mappingModel/taskAllocation/[@task='{process}']/schedulingParameters")
            if priority is not None:
                priority = int(priority.attrib['priority'])

            affinities = next(iter(self.root.findall(
                f"mappingModel/taskAllocation/[@task='{process}']"))).attrib['affinity']
            affinities = affinities.split(' ')
            affinity_list = []
            for i in affinities:
                proc = next((p for p in self.proc_info_list
                             if i.split('?')[0] in p.name), None)
                affinity_list.append(proc)

            wcet = 0
            acet = 0
            for r in tag.findall(f"./activityGraph/items/items/[@{{http://www.w3.org/2001/XMLSchema-instance}}type='am:RunnableCall']"):
                frequency_domain_list = set(
                    [p.name.split('_')[1] for p in affinity_list])
                processor = affinity_list[0]
                if len(frequency_domain_list) > 1:
                    processor = min(affinity_list, key=lambda x: x.speed)
                pname = processor.name.split('_')[1] + '?type=ProcessingUnitDefinition'
                runnable = r.attrib['runnable'].split('?')[0]
                runnable = self.root.find(
                    f"swModel/runnables/[@name='{runnable}']/activityGraph/items/"
                    f"[@{{http://www.w3.org/2001/XMLSchema-instance}}type='am:Ticks']/"
                    f"extended/[@key='{pname}']/value")
                if runnable is not None:
                    if runnable.attrib['{http://www.w3.org/2001/XMLSchema-instance}type'] == 'am:DiscreteValueConstant':
                        wcet += float(runnable.attrib['value'])
                        acet += float(runnable.attrib['value'])
                    elif runnable.attrib['{http://www.w3.org/2001/XMLSchema-instance}type'] == 'am:DiscreteValueStatistics':
                        wcet += float(runnable.attrib['upperBound'])
                        acet += float(runnable.attrib['average'])

            # wcet[ms] = Ticks (= wcet before) / frequency[Hz] * 1000 [s -> ms]
            wcet = wcet / processor.data['speed'] * 1000
            acet = acet / processor.data['speed'] * 1000
            task = TaskInfo(name=name,
                            identifier=identifier,
                            task_type=task_type,
                            abort_on_miss=True,
                            period=period,
                            activation_date=0,
                            n_instr=0,
                            mix=0,
                            stack_file=("", self.cur_dir),
                            wcet=wcet,
                            acet=acet,
                            pwcet=None,
                            et_stddev=0,
                            deadline=deadline,
                            base_cpi=0,
                            followed_by=None,
                            list_activation_dates=activations,
                            preemption_cost=0,
                            data={'priority': priority})
            identifier += 1
            self.task_info_list.append(task)
            self.task_mappings[task] = affinity_list

    def _parse_scheduler(self):
        scheduler = self.root.find(
            "osModel/operatingSystems/taskSchedulers/schedulingAlgorithm")
        clas = "simso.schedulers.RM"
        if scheduler is not None:
            scheduler = scheduler.attrib['{http://www.w3.org/2001/XMLSchema-instance}type']
            match scheduler:
                case "am:FixedPriorityPreemptive":
                    clas = "simso.schedulers.C_FP"
                case "am:EarliestDeadlineFirst":
                    clas = "simso.schedulers.EDF"
                case "am:PfairPD2":
                    clas = "simso.schedulers.PD2"
                case "am:EarlyReleaseFairPD2":
                    clas = "simso.schedulers.ER_PD2"
                case _:
                    pass

        self.scheduler_info = SchedulerInfo(
            clas=clas, overhead=0, overhead_activate=0,
            overhead_terminate=0, fields=None)

    def _parse_processors(self):
        self.proc_info_list = []
        num_proc = 0
        for tag in self.root.findall("hwModel/definitions/[@puType='CPU']"):
            for core in self.root.findall(
                    f"hwModel/structures/structures/modules/[@definition='{tag.attrib['name']}?type=ProcessingUnitDefinition']"):
                f_domain = self.root.find(
                    f"hwModel/domains/[@name='{core.attrib['frequencyDomain'].split('?')[0]}']/defaultValue")
                unit = f_domain.attrib['unit']
                speed = float(f_domain.attrib['value'])
                if unit == 'GHz':
                    speed *= 1000000000
                elif unit == 'MHz':
                    speed *= 1000000
                elif unit == 'KHz':
                    speed *= 1000
                proc = ProcInfo(name=core.attrib['name'] + "_" + tag.attrib['name'],
                                identifier=num_proc,
                                speed=speed,
                                data={'speed': speed})
                self.proc_info_list.append(proc)
                num_proc += 1

        # Adjust the speeds to range in 0 - 1.
        max_speed = max(p.speed for p in self.proc_info_list)
        for p in self.proc_info_list:
            p.speed = p.speed / max_speed
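

# Minimal usage sketch (not part of the original module). It assumes a SimSo
# XML configuration file at the hypothetical path "config.xml"; the path and
# the printed attributes are illustrative only.
if __name__ == "__main__":
    parser = Parser("config.xml")
    # After __init__ has run, the parsing results are exposed as attributes.
    print("Execution time model:", parser.etm)
    print("Duration:", parser.duration, "- cycles/ms:", parser.cycles_per_ms)
    print("Tasks parsed:", len(parser.task_info_list))
    for proc in parser.proc_info_list:
        print("Processor", proc.name, "speed:", proc.speed)
    print("Scheduler class:", parser.scheduler_info.clas)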