diff --git a/schedulers/BF.py b/schedulers/BF.py new file mode 100644 index 0000000..17ef504 --- /dev/null +++ b/schedulers/BF.py @@ -0,0 +1,193 @@ +""" +Implementation of the BF algorithm. +""" +from simso.core import Scheduler, Timer + + +class BF(Scheduler): + def init(self): + self.t_f = 0 + self.waiting_schedule = False + self.mirroring = False + self.allocations = [] + self.timers = {} + + def reschedule(self, cpu=None): + """ + Ask for a scheduling decision. Don't call if not necessary. + """ + if not self.waiting_schedule: + if cpu is None: + cpu = self.processors[0] + cpu.resched() + self.waiting_schedule = True + + def on_activate(self, job): + self.reschedule() + + def alpha_plus_one(self, job): + deadlines = sorted([x.job.absolute_deadline for x in self.task_list]) + bk2 = deadlines[1] + bk1 = deadlines[0] + ui = job.wcet / job.deadline + val = bk2 * ui - int(bk1 * ui) - bk2 + bk1 + if val == 0: + return 0 + if val > 0: + return 1 + return -1 + + def uf_plus_one(self, job): + bk1 = min([x.job.absolute_deadline for x in self.task_list]) + ui = job.wcet / job.deadline + return (1. - (bk1 * ui - int(bk1 * ui))) / ui + + def nuf_plus_one(self, job): + bk1 = min([x.job.absolute_deadline for x in self.task_list]) + ui = 1 - (job.wcet / job.deadline) + return (1. - (bk1 * ui - int(bk1 * ui))) / ui + + def compare(self, job_i, job_j): + ai = self.alpha_plus_one(job_i) + aj = self.alpha_plus_one(job_j) + if ai > aj: + return -1 + elif ai < aj: + return 1 + elif ai == 0: + return -1 + elif ai == -1: + if self.uf_plus_one(job_i) > self.uf_plus_one(job_j): + return 1 + else: + return -1 + else: + if self.nuf_plus_one(job_i) >= self.nuf_plus_one(job_j): + return -1 + else: + return 1 + + def init_interval(self): + """ + Determine the end of the interval and compute allocation of the jobs + to the processors using the McNaughton algorithm. + """ + self.allocations = [[0, []] for _ in self.processors] + self.t_f = int(min([x.job.absolute_deadline for x in self.task_list]) + * self.sim.cycles_per_ms) + # Duration that can be allocated for each processor. + w = int(self.t_f - self.sim.now()) + p = 0 # Processor id. + mand = {} + available = w * len(self.processors) + eligible = [] + + for task in self.task_list: + job = task.job + if not job.is_active(): + continue + + fluid = (self.sim.now() - job.activation_date * + self.sim.cycles_per_ms) * job.wcet / job.period + lag = fluid - job.computation_time_cycles + mand[task.identifier] = max(0, + int(lag + w * job.wcet / job.period)) + available -= mand[task.identifier] + if mand[task.identifier] < w: + eligible.append(task) + + while available > 0 and eligible: + job_m = None + for task in eligible: + if job_m is None or self.compare(task.job, job_m) == -1: + job_m = task.job + mand[job_m.task.identifier] += 1 + eligible.remove(job_m.task) + available -= 1 + + for task in self.task_list: + # The "fair" duration for this job on that interval. Rounded to the + # upper integer to avoid durations that are not multiples of + # cycles. + job = task.job + if not job.is_active(): + continue + duration = mand[task.identifier] + if self.allocations[p][0] + duration <= w: + self.allocations[p][1].append((job, duration)) + self.allocations[p][0] += duration + else: + # Add first part to the end of the current processor p: + duration1 = w - self.allocations[p][0] + if duration1 > 0: + self.allocations[p][1].append((job, duration1)) + self.allocations[p][0] = w + + if p + 1 < len(self.processors): + # Add the second part: + duration2 = duration - duration1 + self.allocations[p + 1][1].append((job, duration2)) + self.allocations[p + 1][0] += duration2 + else: + # Because every durations are rounded to the upper value, + # the last job may have not enough space left. + # This could probably be improved. + print("Warning: didn't allowed enough time to last task.", + duration - duration1) + break + + p += 1 + + for allocation in self.allocations: + if allocation[0] < w: + allocation[1].append((None, w - allocation[0])) + + if self.mirroring: + for allocation in self.allocations: + # Rerverse the order of the jobs. + # Note: swapping the first and last items should be enough. + allocation[1].reverse() + self.mirroring = not self.mirroring + + def end_event(self, z, job): + """ + Called when a job's budget has expired. + """ + del self.timers[job] + l = self.allocations[z][1] + if l and l[0][0] is job: + l.pop(0) + self.reschedule(self.processors[z]) + + def schedule(self, cpu): + """ + Schedule main method. + """ + self.waiting_schedule = False + # At the end of the interval: + if self.sim.now() >= self.t_f: + self.init_interval() + + # Stop current timers. + for job, timer in self.timers.items(): + timer.stop() + self.timers = {} + + # Set timers to stop the jobs that will run. + for z, proc in enumerate(self.processors): + l = self.allocations[z][1] + if l and l[0][0] not in self.timers: + timer = Timer(self.sim, BF.end_event, + (self, z, l[0][0]), l[0][1], + cpu=proc, in_ms=False) + timer.start() + self.timers[l[0][0]] = timer + + # Schedule the activated tasks on each processor. + decisions = [] + for z, proc in enumerate(self.processors): + l = self.allocations[z][1] + if not l[0][0] or l[0][0].is_active(): + decisions.append((l[0][0] if l else None, proc)) + + return decisions diff --git a/schedulers/CC_EDF.py b/schedulers/CC_EDF.py new file mode 100644 index 0000000..86ef30b --- /dev/null +++ b/schedulers/CC_EDF.py @@ -0,0 +1,41 @@ +""" +Cycle-Conserving EDF. A DVFS variant of EDF (uniprocessor). +""" +from simso.core import Scheduler + + +class CC_EDF(Scheduler): + def init(self): + self.ready_list = [] + + self.ui = {} + for task in self.task_list: + self.ui[task] = float(task.wcet) / task.period + + self.adjust_speed() + + def adjust_speed(self): + # Compute processor speed + utilization = min(1.0, sum(self.ui.values())) + self.processors[0].set_speed(utilization) + + def on_activate(self, job): + self.ui[job.task] = job.wcet / job.period + self.ready_list.append(job) + self.adjust_speed() + job.cpu.resched() + + def on_terminated(self, job): + self.ui[job.task] = job.actual_computation_time / job.period + self.ready_list.remove(job) + self.adjust_speed() + job.cpu.resched() + + def schedule(self, cpu): + if self.ready_list: + # job with the highest priority + job = min(self.ready_list, key=lambda x: x.absolute_deadline) + else: + job = None + + return (job, cpu) diff --git a/schedulers/DP_WRAP.py b/schedulers/DP_WRAP.py new file mode 100644 index 0000000..0660f19 --- /dev/null +++ b/schedulers/DP_WRAP.py @@ -0,0 +1,126 @@ +""" +Implementation of the DP-WRAP algorithm as presented by Levin et al. in +"DP-FAIR: A Simple Model for Understanding Optimal Multiprocessor Scheduling". +""" +from simso.core import Scheduler, Timer +from math import ceil + + +class DP_WRAP(Scheduler): + def init(self): + self.t_f = 0 + self.waiting_schedule = False + self.mirroring = False + self.allocations = [] + self.timers = {} + + def reschedule(self, cpu=None): + """ + Ask for a scheduling decision. Don't call if not necessary. + """ + if not self.waiting_schedule: + if cpu is None: + cpu = self.processors[0] + cpu.resched() + self.waiting_schedule = True + + def on_activate(self, job): + self.reschedule() + + def init_interval(self): + """ + Determine the end of the interval and compute allocation of the jobs + to the processors using the McNaughton algorithm. + """ + self.allocations = [[0, []] for _ in self.processors] + self.t_f = ceil(min([x.job.absolute_deadline for x in self.task_list] + ) * self.sim.cycles_per_ms) + # Duration that can be allocated for each processor. + w = int(self.t_f - self.sim.now()) + p = 0 # Processor id. + for task in self.task_list: + job = task.job + if not job.is_active(): + continue + # The "fair" duration for this job on that interval. Rounded to the + # upper integer to avoid durations that are not multiples of + # cycles. + duration = ceil(w * job.wcet / job.period) + if self.allocations[p][0] + duration <= w: + self.allocations[p][1].append((job, duration)) + self.allocations[p][0] += duration + else: + # Add first part to the end of the current processor p: + duration1 = w - self.allocations[p][0] + if duration1 > 0: + self.allocations[p][1].append((job, duration1)) + self.allocations[p][0] = w + + if p + 1 < len(self.processors): + # Add the second part: + duration2 = duration - duration1 + self.allocations[p + 1][1].append((job, duration2)) + self.allocations[p + 1][0] += duration2 + else: + # Because every durations are rounded to the upper value, + # the last job may have not enough space left. + # This could probably be improved. + print("Warning: didn't allowed enough time to last task.", + duration - duration1) + break + + p += 1 + + for allocation in self.allocations: + if allocation[0] < w: + allocation[1].append((None, w - allocation[0])) + + if self.mirroring: + for allocation in self.allocations: + # Rerverse the order of the jobs. + # Note: swapping the first and last items should be enough. + allocation[1].reverse() + self.mirroring = not self.mirroring + + def end_event(self, z, job): + """ + Called when a job's budget has expired. + """ + del self.timers[job] + l = self.allocations[z][1] + if l and l[0][0] is job: + l.pop(0) + self.reschedule(self.processors[z]) + + def schedule(self, cpu): + """ + Schedule main method. + """ + self.waiting_schedule = False + # At the end of the interval: + if self.sim.now() >= self.t_f: + self.init_interval() + + # Stop current timers. + for job, timer in self.timers.items(): + timer.stop() + self.timers = {} + + # Set timers to stop the jobs that will run. + for z, proc in enumerate(self.processors): + l = self.allocations[z][1] + if l and l[0][0] not in self.timers: + timer = Timer(self.sim, DP_WRAP.end_event, + (self, z, l[0][0]), l[0][1], + cpu=proc, in_ms=False) + timer.start() + self.timers[l[0][0]] = timer + + # Schedule the activated tasks on each processor. + decisions = [] + for z, proc in enumerate(self.processors): + l = self.allocations[z][1] + if not l[0][0] or l[0][0].is_active(): + decisions.append((l[0][0] if l else None, proc)) + + return decisions diff --git a/schedulers/EDF.py b/schedulers/EDF.py new file mode 100644 index 0000000..5c7caf5 --- /dev/null +++ b/schedulers/EDF.py @@ -0,0 +1,45 @@ +""" +Implementation of the Global-EDF (Earliest Deadline First) for multiprocessor +architectures. +""" +from simso.core import Scheduler + + +class EDF(Scheduler): + """Earliest Deadline First""" + def init(self): + self.ready_list = [] + + def on_activate(self, job): + self.ready_list.append(job) + job.cpu.resched() + + def on_terminated(self, job): + if job in self.ready_list: + self.ready_list.remove(job) + else: + job.cpu.resched() + + def schedule(self, cpu): + ready_jobs = [j for j in self.ready_list if j.is_active()] + + if ready_jobs: + # Key explanations: + # First the free processors + # Among the others, get the one with the greatest deadline + # If equal, take the one used to schedule + key = lambda x: ( + 1 if not x.running else 0, + x.running.absolute_deadline if x.running else 0, + 1 if x is cpu else 0 + ) + cpu_min = max(self.processors, key=key) + + job = min(ready_jobs, key=lambda x: x.absolute_deadline) + + if (cpu_min.running is None or + cpu_min.running.absolute_deadline > job.absolute_deadline): + self.ready_list.remove(job) + if cpu_min.running: + self.ready_list.append(cpu_min.running) + return (job, cpu_min) diff --git a/schedulers/EDF2.py b/schedulers/EDF2.py new file mode 100644 index 0000000..6c0e267 --- /dev/null +++ b/schedulers/EDF2.py @@ -0,0 +1,56 @@ +""" +Implementation of the Global-EDF (Earliest Deadline First) for multiprocessor +architectures. +""" +from simso.core import Scheduler + + +class EDF2(Scheduler): + """Earliest Deadline First""" + def init(self): + self.running_jobs = [] + self.sched_to_do = False + + def on_activate(self, job): + self.resched(job.cpu) + + def on_terminated(self, job): + self.resched(job.cpu) + + def resched(self, cpu): + if not self.sched_to_do: + cpu.resched() + self.sched_to_do = True + + def schedule(self, cpu): + self.sched_to_do = False + decisions = [] + + ready_jobs = sorted( + [t.job for t in self.task_list if t.job.is_active()], + key=lambda x: x.absolute_deadline) + jobs = ready_jobs[:len(self.processors)] + + # Bind jobs to processors: + available_procs = list(self.processors) + was_not_running = [] + for job in jobs: + if job in self.running_jobs: + available_procs.remove(job.cpu) + else: + was_not_running.append(job) + + remaining_jobs = [] + for job in was_not_running: + if job.cpu in available_procs: + decisions.append((job, job.cpu)) + available_procs.remove(job.cpu) + else: + remaining_jobs.append(job) + + for p, job in enumerate(remaining_jobs): + decisions.append((job, available_procs[p])) + + self.running_jobs = jobs + + return decisions diff --git a/schedulers/EDF_US.py b/schedulers/EDF_US.py new file mode 100644 index 0000000..9cb5424 --- /dev/null +++ b/schedulers/EDF_US.py @@ -0,0 +1,46 @@ +""" +Implementation of EDF-US[1/2]. +""" +from simso.core import Scheduler + + +class EDF_US(Scheduler): + def init(self): + self.ready_list = [] + + def on_activate(self, job): + if job.wcet > job.period / 2: + job.priority = 0 + else: + job.priority = job.absolute_deadline + self.ready_list.append(job) + job.cpu.resched() + + def on_terminated(self, job): + if job in self.ready_list: + self.ready_list.remove(job) + else: + job.cpu.resched() + + def schedule(self, cpu): + if self.ready_list: + # Key explanations: + # First the free processors + # Among the others, get the one with the greatest deadline + # If equal, take the one used to schedule + key = lambda x: ( + 1 if not x.running else 0, + x.running.priority if x.running else 0, + 1 if x is cpu else 0 + ) + cpu_min = max(self.processors, key=key) + + job = min([j for j in self.ready_list if j.is_active()], + key=lambda x: x.priority) + + if (cpu_min.running is None or + cpu_min.running.priority > job.priority): + self.ready_list.remove(job) + if cpu_min.running: + self.ready_list.append(cpu_min.running) + return (job, cpu_min) diff --git a/schedulers/EDF_mono.py b/schedulers/EDF_mono.py new file mode 100644 index 0000000..d69fd1a --- /dev/null +++ b/schedulers/EDF_mono.py @@ -0,0 +1,26 @@ +""" +Earliest Deadline First algorithm for uniprocessor architectures. +""" +from simso.core import Scheduler + + +class EDF_mono(Scheduler): + def init(self): + self.ready_list = [] + + def on_activate(self, job): + self.ready_list.append(job) + job.cpu.resched() + + def on_terminated(self, job): + self.ready_list.remove(job) + job.cpu.resched() + + def schedule(self, cpu): + if self.ready_list: + # job with the highest priority + job = min(self.ready_list, key=lambda x: x.absolute_deadline) + else: + job = None + + return (job, cpu) diff --git a/schedulers/EDHS.py b/schedulers/EDHS.py new file mode 100644 index 0000000..59e7505 --- /dev/null +++ b/schedulers/EDHS.py @@ -0,0 +1,145 @@ +""" +Implementation of the EDHS Scheduler. + +EDHS is a semi-partitionned scheduler proposed by Kato et al in +"Semi-Partitioning Technique for Multiprocessor Real-Time Scheduling". +""" +from simso.core import Scheduler, Timer +from simso.core.Scheduler import SchedulerInfo +from fractions import Fraction +from math import ceil + +migrating_tasks = {} + +# Mapping processor to scheduler. +map_cpu_sched = {} + + +class EDF_modified(Scheduler): + """ + An EDF mono-processor scheduler modified to accept migrating jobs. + A migrating job has an infinite priority. + """ + def init(self): + self.ready_list = [] + self.migrating_job = None + + def _resched(self): + self.processors[0].resched() + + def on_activate(self, job): + self.ready_list.append(job) + self._resched() + + def on_terminated(self, job): + if job is self.migrating_job: + self.migrating_job = None + elif job in self.ready_list: + self.ready_list.remove(job) + self._resched() + + def accept_migrating_job(self, i, job, budget): + self.migrating_job = job + job.task.cpu = self.processors[0] + + # Set timer for end. + self.timer = Timer(self.sim, EDF_modified.end_migrating_job, + (self, i), budget, cpu=self.processors[0], + in_ms=False) + self.timer.start() + + self._resched() + + def end_migrating_job(self, i): + self.processors[0].resched() + if self.migrating_job and i < len(migrating_tasks[self.migrating_job.task]) - 1: + ncpu, nbudget = migrating_tasks[self.migrating_job.task][i + 1] + sched = map_cpu_sched[ncpu] + sched.accept_migrating_job(i + 1, self.migrating_job, nbudget) + self.migrating_job = None + + def schedule(self, cpu): + if self.migrating_job: + job = self.migrating_job + elif self.ready_list: + job = min(self.ready_list, key=lambda x: x.absolute_deadline) + else: + job = None + + return (job, cpu) + + +class EDHS(Scheduler): + def init(self): + # Mapping task to scheduler. + self.map_task_sched = {} + + cpus = [] + for cpu in self.processors: + # Append the processor to a list with an initial utilization of 0. + cpus.append([cpu, Fraction(0)]) + + # Instantiate a scheduler. + sched = EDF_modified(self.sim, SchedulerInfo("EDF_modified", + EDF_modified)) + sched.add_processor(cpu) + sched.init() + + # Affect the scheduler to the processor. + map_cpu_sched[cpu] = sched + + # First Fit + for task in self.task_list: + j = 0 + # Find a processor with free space. + while cpus[j][1] + Fraction(task.wcet) / Fraction(task.period) > 1.0: + j += 1 + if j >= len(self.processors): + migrating_tasks[task] = [] + break + if j == len(self.processors): + continue + + # Get the scheduler for this processor. + sched = map_cpu_sched[cpus[j][0]] + + # Affect it to the task. + self.map_task_sched[task.identifier] = sched + sched.add_task(task) + + # Put the task on that processor. + task.cpu = cpus[j][0] + self.sim.logger.log("task " + task.name + " on " + task.cpu.name) + + # Update utilization. + cpus[j][1] += Fraction(task.wcet) / Fraction(task.period) + + for task, l in migrating_tasks.items(): + rem = Fraction(task.wcet) / Fraction(task.period) + for cpu, cpu_u in cpus: + if cpu_u < 1 and rem > 0: + u = min(rem, 1 - cpu_u) + l.append((cpu, ceil(u * task.period * self.sim.cycles_per_ms))) + rem -= u + + def get_lock(self): + # No lock mechanism is needed. + return True + + def schedule(self, cpu): + return map_cpu_sched[cpu].schedule(cpu) + + def on_activate(self, job): + try: + self.map_task_sched[job.task.identifier].on_activate(job) + except KeyError: + cpu, budget = migrating_tasks[job.task][0] + sched = map_cpu_sched[cpu] + sched.accept_migrating_job(0, job, budget) + + def on_terminated(self, job): + try: + self.map_task_sched[job.task.identifier].on_terminated(job) + except KeyError: + sched = map_cpu_sched[job.task.cpu] + sched.on_terminated(job) diff --git a/schedulers/EDZL.py b/schedulers/EDZL.py new file mode 100644 index 0000000..1423125 --- /dev/null +++ b/schedulers/EDZL.py @@ -0,0 +1,74 @@ +#!/usr/bin/python +# coding=utf-8 + +from simso.core import Scheduler, Timer + + +class EDZL(Scheduler): + """ + EDZL Scheduler. EDF Scheduler with zero laxity events. + """ + + def init(self): + self.ready_list = [] + self.zl_timer = None + + def on_activate(self, job): + job.priority = job.absolute_deadline + self.ready_list.append(job) + job.cpu.resched() + + def on_terminated(self, job): + # ce test est peut-être utile en cas d'avortement de tâche. + if job in self.ready_list: + self.ready_list.remove(job) + if self.zl_timer and job == self.zl_timer[0]: + self.zl_timer[1].stop() + job.cpu.resched() + + def zero_laxity(self, job): + if job in self.ready_list: + job.priority = 0 + job.cpu.resched() + else: + print(self.sim.now(), job.name) + + def schedule(self, cpu): + """ + Basically a EDF scheduling but using a priority attribute. + """ + ready_jobs = [j for j in self.ready_list if j.is_active()] + if ready_jobs: + selected_job = None + + key = lambda x: ( + 1 if x.running else -1, + -x.running.priority if x.running else 0, + -1 if x is cpu else 1) + cpu_min = min(self.processors, key=key) + + job = min(ready_jobs, key=lambda x: x.priority) + if cpu_min.running is None or \ + cpu_min.running.priority > job.priority: + self.ready_list.remove(job) + if cpu_min.running: + self.ready_list.append(cpu_min.running) + selected_job = (job, cpu_min) + + # Recherche du prochain event ZeroLaxity pour configurer le timer. + minimum = None + for job in self.ready_list: + zl_date = int((job.absolute_deadline - job.ret + ) * self.sim.cycles_per_ms - self.sim.now()) + if (minimum is None or minimum[0] > zl_date) and zl_date > 0: + minimum = (zl_date, job) + + if self.zl_timer: + self.zl_timer[1].stop() + if minimum: + self.zl_timer = (minimum[0], Timer( + self.sim, EDZL.zero_laxity, (self, minimum[1]), + minimum[0], cpu=cpu, in_ms=False)) + self.zl_timer[1].start() + + return selected_job diff --git a/schedulers/EKG.py b/schedulers/EKG.py new file mode 100644 index 0000000..0476424 --- /dev/null +++ b/schedulers/EKG.py @@ -0,0 +1,209 @@ +from simso.core import Scheduler, Timer +from simso.core.Scheduler import SchedulerInfo +from fractions import Fraction +from math import ceil + + +class Modified_EDF(Scheduler): + def init(self): + self.ready_list = [] + + self.migrating_task1 = None # sous la forme (task, rate) + self.migrating_task2 = None + self.migrating_job1 = None + self.migrating_job2 = None + + self.next_deadline = 0 + + def on_activate(self, job): + self.ready_list.append(job) + job.cpu.resched() + + def on_terminated(self, job): + self.ready_list.remove(job) + job.cpu.resched() + + def conf(self, next_deadline): + if next_deadline > self.next_deadline: + self.next_deadline = next_deadline + self.migrating_task1, self.migrating_task2 = \ + self.migrating_task2, self.migrating_task1 + if self.migrating_task1: + time_a = ceil((next_deadline - self.sim.now()) + * self.migrating_task1[1]) + self.timer_a = Timer(self.sim, Modified_EDF.on_end_migrating1, + (self,), time_a, cpu=self.processors[0], + in_ms=False) + self.timer_a.start() + + self.migrating_job2 = None + if self.migrating_task2: + time_b = int((next_deadline - self.sim.now()) + * (1 - self.migrating_task2[1])) + self.timer_b = Timer( + self.sim, Modified_EDF.on_start_migrating2, (self,), + time_b, cpu=self.processors[0], in_ms=False) + self.timer_b.start() + self.processors[0].resched() + + if self.migrating_task1: + self.migrating_job1 = self.migrating_task1[0].job + self.processors[0].resched() + else: + self.migrating_job1 = None + + def on_end_migrating1(self): + self.migrating_job1 = None + self.processors[0].resched() + + def on_start_migrating2(self): + self.migrating_job2 = self.migrating_task2[0].job + self.processors[0].resched() + + def schedule(self, cpu): + if self.migrating_job1 and self.migrating_job1.is_active(): + return (self.migrating_job1, cpu) + if self.migrating_job2 and self.migrating_job2.is_active(): + return (self.migrating_job2, cpu) + + if self.ready_list: + job = min(self.ready_list, key=lambda x: x.absolute_deadline) + else: + job = None + + return (job, cpu) + + +class Group(object): + def __init__(self, sim): + self.tasks = [] + self.schedulers = [] + self.sim = sim + + def compute_next_deadline(self): + return min([task.jobs[-1].absolute_deadline + for task in self.tasks if task.jobs]) \ + * self.sim.cycles_per_ms + + +class EKG(Scheduler): + def init(self): + self.groups = [] + self.task_to_group = {} + try: + k = self.data['K'] + except KeyError: + k = len(self.processors) + m = len(self.processors) + + sep = Fraction(k) / Fraction(1 + k) if k < m else 1 + + light_tasks = [t for t in self.task_list if t.wcet < sep * t.period] + heavy_tasks = [t for t in self.task_list if t.wcet >= sep * t.period] + + # Mapping task to scheduler. + self.map_task_sched = {} + self.map_cpu_sched = {} + + cpus = [] + for i, cpu in enumerate(self.processors): + # Instantiate a scheduler. + sched = Modified_EDF(self.sim, SchedulerInfo("Modified_EDF", + Modified_EDF)) + sched.add_processor(cpu) + sched.init() + + # Append the processor to a list with an initial utilization of 0. + cpus.append([cpu, sched, Fraction(0)]) + + # Affect the scheduler to the processor. + self.map_cpu_sched[cpu.identifier] = sched + + # Affect to the correct group. + if i >= len(heavy_tasks): + if (i - len(heavy_tasks)) % k == 0: + group = Group(self.sim) + group.schedulers.append(sched) + self.groups.append(group) + else: + self.groups[-1].schedulers.append(sched) + + # Affect heavy tasks to individual processors. + p = 0 + for task in heavy_tasks: + cpu, sched, _ = cpus[p] + + # Affect the task to the processor. + self.map_task_sched[task.identifier] = sched + sched.add_task(task) + + # Put the task on that processor. + task.cpu = cpu + p += 1 + + self.task_to_group[task] = None + + # Custom Next Fit + for task in light_tasks: + g = (p - len(heavy_tasks)) // k + if cpus[p][2] + Fraction(task.wcet) / Fraction(task.period) <= 1.0: + cpu, sched, _ = cpus[p] + # Affect the task to the processors. + self.map_task_sched[task.identifier] = sched + sched.add_task(task) + + # Put the task on that processor. + task.cpu = cpu + + cpus[p][2] += Fraction(task.wcet) / Fraction(task.period) + + self.groups[g].tasks.append(task) + self.task_to_group[task] = self.groups[g] + + if cpus[p][2] == 1: + p += 1 + else: + if (p + 1 - len(heavy_tasks)) % k == 0: + cpu, sched, _ = cpus[p + 1] + # Affect the task to the processor. + self.map_task_sched[task.identifier] = sched + sched.add_task(task) + + # Put the task on that processor. + task.cpu = cpu + + cpus[p + 1][2] += \ + Fraction(task.wcet) / Fraction(task.period) + self.groups[g + 1].tasks.append(task) + self.task_to_group[task] = self.groups[g + 1] + else: + # Split in 2. + u1 = 1 - cpus[p][2] + u2 = Fraction(task.wcet) / Fraction(task.period) - u1 + cpus[p][1].migrating_task2 = (task, u1) + cpus[p + 1][1].migrating_task1 = (task, u2) + cpus[p + 1][2] = u2 + self.groups[g].tasks.append(task) + self.task_to_group[task] = self.groups[g] + + p += 1 + + def schedule(self, cpu): + return self.map_cpu_sched[cpu.identifier].schedule(cpu) + + def on_activate(self, job): + group = self.task_to_group[job.task] + if group: + nd = group.compute_next_deadline() + for sched in group.schedulers: + sched.conf(nd) + try: + self.map_task_sched[job.task.identifier].on_activate(job) + except KeyError: + job.cpu.resched() + + def on_terminated(self, job): + try: + self.map_task_sched[job.task.identifier].on_terminated(job) + except KeyError: + job.cpu.resched() diff --git a/schedulers/EPDF.py b/schedulers/EPDF.py new file mode 100644 index 0000000..d1531da --- /dev/null +++ b/schedulers/EPDF.py @@ -0,0 +1,89 @@ +# coding=utf-8 + +from simso.core import Scheduler, Timer +from math import ceil + + +class EPDF(Scheduler): + """Earliest Pseudo-Deadline First""" + + quantum = 1 # ms + + class PseudoJob(object): + def __init__(self, job, seq): + self.job = job + self.release_date = int((seq - 1) / (job.wcet / job.deadline)) + self.deadline = int(ceil(seq / (job.wcet / job.deadline))) + self.seq = seq + + def cmp_key(self): + return self.deadline * EPDF.quantum + self.job.activation_date + + def init(self): + self.ready_list = [] + self.pseudo_job = {} + self.timers = [] + + def pseudo_terminate(self, pseudo_job): + if self.pseudo_job[pseudo_job.job.cpu] == pseudo_job: + self.pseudo_job[pseudo_job.job.cpu] = None + pseudo_job.job.cpu.resched() + + def pseudo_activate(self, pseudo_job): + pseudo_job.job.cpu.resched() + self.ready_list.append(pseudo_job) + + def on_activate(self, job): + # First pseudo-activation + pseudo_job = EPDF.PseudoJob(job, 1) + + self.pseudo_activate(pseudo_job) + + # Set next pseudo activations : + while pseudo_job.seq * self.quantum < job.wcet: + pseudo_job = EPDF.PseudoJob(job, pseudo_job.seq + 1) + timer = Timer(self.sim, EPDF.pseudo_activate, (self, pseudo_job), + pseudo_job.release_date * self.quantum - + self.sim.now() / self.sim.cycles_per_ms + + job.activation_date, cpu=job.cpu, + in_ms=True) + timer.start() + self.timers.append(timer) + + def on_terminated(self, job): + self.ready_list = [x for x in self.ready_list if x.job is not job] + + def schedule(self, cpu): + if len(self.ready_list) > 0: + # Explication sur la key: + # En priorité, on met tous les processeurs libres au début. + # Ensuite, on trie tout par ordre décroissant de la deadline. + # Et on départage en préférant le processeur "cpu". + key = lambda x: ( + 1 if (not x.running) or (not self.pseudo_job[x]) else 0, + self.pseudo_job[x].cmp_key() if x.running and + self.pseudo_job[x] else None, + 1 if x is cpu else 0 + ) + cpu_min = max(self.processors, key=key) + + pjob = min(self.ready_list, key=lambda x: x.cmp_key()) + + if (cpu_min.running is None or + self.pseudo_job[cpu_min] is None or + self.pseudo_job[cpu_min].cmp_key() > pjob.cmp_key()): + self.ready_list.remove(pjob) + if cpu_min.running and self.pseudo_job[cpu_min]: + self.ready_list.append(self.pseudo_job[cpu_min]) + self.pseudo_job[cpu_min] = pjob + + timer = Timer( + self.sim, EPDF.pseudo_terminate, (self, pjob), + pjob.seq * self.quantum - pjob.job.computation_time, + cpu=cpu_min, in_ms=True) + timer.start() + self.timers.append(timer) + + return (pjob.job, cpu_min) + elif self.pseudo_job[cpu] is None: + return (None, cpu) diff --git a/schedulers/ER_PD2.py b/schedulers/ER_PD2.py new file mode 100644 index 0000000..15b94ca --- /dev/null +++ b/schedulers/ER_PD2.py @@ -0,0 +1,181 @@ +# coding=utf-8 + +from simso.core import Scheduler, Timer +from math import ceil + + +def rounded_wcet(job, q=None): + return rounded_wcet_cycles(job, q) / job.sim.cycles_per_ms + + +def rounded_wcet_cycles(job, q=None): + if q is None: + q = ER_PD2.quantum + wcet_cycles = job.wcet * job.sim.cycles_per_ms + if wcet_cycles % q: + return wcet_cycles + q - (wcet_cycles % q) + else: + return wcet_cycles + + +class VirtualJob(object): + """ + A Virtual Job contains the list of the pseudo jobs for an actual job. + """ + def __init__(self, job): + self.job = job + self.pseudo_jobs = [] + self.cur = 0 + seq = 0 + + # Create pseudo jobs: + while seq * ER_PD2.quantum <= job.wcet * job.sim.cycles_per_ms: + self.pseudo_jobs.append(PseudoJob(job, seq + 1)) + seq += 1 + + def get_next_job(self): + if self.cur < len(self.pseudo_jobs) - 1: + self.cur += 1 + job = self.pseudo_jobs[self.cur] + return job + + def get_current_job(self): + if self.cur < len(self.pseudo_jobs): + return self.pseudo_jobs[self.cur] + + +class PseudoJob(object): + def __init__(self, job, seq): + self.job = job + self.release_date = int(job.deadline * (seq - 1) / rounded_wcet(job) + ) * ER_PD2.quantum + self.deadline = int(ceil(job.deadline * seq / rounded_wcet(job)) + ) * ER_PD2.quantum + self.seq = seq + self.succ_bit = PseudoJob.succ_bit(job, seq) + if rounded_wcet(job) / job.deadline < 0.5: + self.group_deadline = 0 + else: + j = seq + 1 + while (j * ER_PD2.quantum <= rounded_wcet_cycles(job) and + PseudoJob.succ_bit(job, j - 1) == 1 and + PseudoJob.win_size(job, j) == 2): + # while the window size is 2 and succ_bit of the prev is 1 + j += 1 + self.group_deadline = int(ceil( + job.deadline * (j - 1) / rounded_wcet(job) + * job.sim.cycles_per_ms)) + + @property + def absolute_releasedate(self): + return self.release_date + \ + self.job.activation_date * self.job.sim.cycles_per_ms + + def cmp_key(self): + # Si le premier parametre est identique, il regarde le second, etc. + cycles_per_ms = self.job.sim.cycles_per_ms + return (self.deadline + self.job.activation_date * cycles_per_ms, + -self.succ_bit, + -(self.job.activation_date + self.group_deadline)) + + @staticmethod + def succ_bit(job, seq): + return int(ceil(seq * job.deadline / rounded_wcet(job))) \ + - int(seq * job.deadline / rounded_wcet(job)) + + @staticmethod + def win_size(job, seq): + return int(ceil(seq * job.deadline / rounded_wcet(job))) \ + - int((seq - 1) * job.deadline / rounded_wcet(job)) + + +class ER_PD2(Scheduler): + def init(self): + self.ready_list = [] + self.timers = [] + self.terminate_timers = [] + self.waiting_schedule = False + self.running_vjobs = [] + + ER_PD2.quantum = self.sim.cycles_per_ms // 10 # 0.1ms. + + #ER_PD2.quantum = 1000000 + #while not self.is_schedulable() and ER_PD2.quantum > 1000: + # ER_PD2.quantum /= 2 + + self.timer = Timer( + self.sim, ER_PD2.reschedule, (self, ), ER_PD2.quantum, + cpu=self.processors[0], in_ms=False, one_shot=False) + self.timer.start() + + def is_schedulable(self, q=None): + load = 0.0 + cycles_per_ms = self.sim.cycles_per_ms + for task in self.task_list: + wcet = rounded_wcet_cycles(task, q) + load += wcet / task.period + if wcet > task.period * cycles_per_ms \ + or load > len(self.processors) * cycles_per_ms: + return False + return True + + def reschedule(self, cpu=None): + """ + Ask for a scheduling decision. Don't call if not necessary. + """ + if not self.waiting_schedule: + if cpu is None: + cpu = self.processors[0] + cpu.resched() + self.waiting_schedule = True + + def virtual_terminate(self, virtual_job): + pjob = virtual_job.get_next_job() + if not pjob or not virtual_job.job.is_active(): + self.ready_list.remove(virtual_job) + + def on_activate(self, job): + virtual_job = VirtualJob(job) + self.ready_list.append(virtual_job) + + if self.sim.now() == 0: + self.reschedule() + + def schedule(self, cpu): + self.waiting_schedule = False + + decisions = [] + + for vjob in self.running_vjobs: + self.virtual_terminate(vjob) + + vjobs = [vjob for vjob in self.ready_list if vjob.job.is_active()] + + self.running_vjobs = sorted( + vjobs, + key=lambda x: x.get_current_job().cmp_key() + )[:len(self.processors)] + + selected_jobs = [vjob.job for vjob in self.running_vjobs] + remaining_jobs = selected_jobs[:] + available_procs = [] + + # Remove from the list of remaining jobs the jobs that already runs. + for proc in self.processors: + if proc.running in selected_jobs: + # This processor keeps running the same job. + remaining_jobs.remove(proc.running) + else: + # This processor is not running a selected job. + available_procs.append(proc) + + # Jobs not currently running + for vjob in self.running_vjobs: + if vjob.job in remaining_jobs: + decisions.append((vjob.job, available_procs.pop())) + + # Unused processors + for proc in available_procs: + decisions.append((None, proc)) + + return decisions diff --git a/schedulers/FP.py b/schedulers/FP.py new file mode 100644 index 0000000..d5670f0 --- /dev/null +++ b/schedulers/FP.py @@ -0,0 +1,39 @@ +#!/usr/bin/python +# coding=utf-8 + +from simso.core import Scheduler + + +class FP(Scheduler): + """ Fixed Priority (use 'priority' field) """ + def init(self): + self.ready_list = [] + + def on_activate(self, job): + self.ready_list.append(job) + job.cpu.resched() + + def on_terminated(self, job): + job.cpu.resched() + + def schedule(self, cpu): + if self.ready_list: + # Get a free processor or a processor running a low priority job. + key = lambda x: ( + 1 if x.running else 0, + x.running.data['priority'] if x.running else 0, + 0 if x is cpu else 1 + ) + cpu_min = min(self.processors, key=key) + + # Get the job with the highest priority. + job = max(self.ready_list, key=lambda x: x.data['priority']) + + if (cpu_min.running is None or + cpu_min.running.data['priority'] < job.data['priority']): + self.ready_list.remove(job) + if cpu_min.running: + self.ready_list.append(cpu_min.running) + return (job, cpu_min) + + return None diff --git a/schedulers/Fixed_PEDF.py b/schedulers/Fixed_PEDF.py new file mode 100644 index 0000000..a896531 --- /dev/null +++ b/schedulers/Fixed_PEDF.py @@ -0,0 +1,19 @@ +#!/usr/bin/python +# coding=utf-8 + +from simso.core.Scheduler import SchedulerInfo +from EDF_mono import EDF_mono +from simso.utils import PartitionedScheduler + + +class Fixed_PEDF(PartitionedScheduler): + def init(self): + PartitionedScheduler.init(self, SchedulerInfo("EDF_mono", EDF_mono)) + + def packer(self): + for task in self.task_list: + # Affect it to the task. + cpu = next(proc for proc in self.processors + if proc.identifier == task.data["cpu"]) + self.affect_task_to_processor(task, cpu) + return True diff --git a/schedulers/G_FL.py b/schedulers/G_FL.py new file mode 100644 index 0000000..e2d36c9 --- /dev/null +++ b/schedulers/G_FL.py @@ -0,0 +1,48 @@ +""" +Implementation of the Global-Fair Lateness as presented by Erickson and +Anderson in Fair lateness scheduling: Reducing maximum lateness in G-EDF-like +scheduling. +""" +from simso.core import Scheduler + + +class G_FL(Scheduler): + """Earliest Deadline First""" + def init(self): + self.ready_list = [] + + def on_activate(self, job): + job.priority = job.activation_date + job.deadline - \ + ((len(self.processors) - 1.0) / len(self.processors)) * job.wcet + self.ready_list.append(job) + job.cpu.resched() + + def on_terminated(self, job): + if job in self.ready_list: + self.ready_list.remove(job) + else: + job.cpu.resched() + + def schedule(self, cpu): + ready_jobs = [j for j in self.ready_list if j.is_active()] + + if ready_jobs: + # Key explanations: + # First the free processors + # Among the others, get the one with the greatest deadline + # If equal, take the one used to schedule + key = lambda x: ( + 1 if not x.running else 0, + x.running.priority if x.running else 0, + 1 if x is cpu else 0 + ) + cpu_min = max(self.processors, key=key) + + job = min(ready_jobs, key=lambda x: x.priority) + + if (cpu_min.running is None or + cpu_min.running.priority > job.priority): + self.ready_list.remove(job) + if cpu_min.running: + self.ready_list.append(cpu_min.running) + return (job, cpu_min) diff --git a/schedulers/G_FL_ZL.py b/schedulers/G_FL_ZL.py new file mode 100644 index 0000000..b6f35f7 --- /dev/null +++ b/schedulers/G_FL_ZL.py @@ -0,0 +1,69 @@ +from simso.core import Scheduler, Timer + + +class G_FL_ZL(Scheduler): + """ + G_FL with Zero Laxity Scheduler. + """ + + def init(self): + self.ready_list = [] + self.zl_timer = None + + def on_activate(self, job): + job.priority = job.activation_date + job.deadline - \ + ((len(self.processors) - 1.0) / len(self.processors)) * job.wcet + self.ready_list.append(job) + job.cpu.resched() + + def on_terminated(self, job): + if job in self.ready_list: + self.ready_list.remove(job) + if self.zl_timer and job == self.zl_timer[0]: + self.zl_timer[1].stop() + job.cpu.resched() + + def zero_laxity(self, job): + if job in self.ready_list: + job.priority = 0 + job.cpu.resched() + else: + print(self.sim.now(), job.name) + + def schedule(self, cpu): + """ + Basically a EDF scheduling but using a priority attribute. + """ + if self.ready_list: + selected_job = None + + key = lambda x: ( + 1 if x.running else -1, + -x.running.priority if x.running else 0, + -1 if x is cpu else 1) + cpu_min = min(self.processors, key=key) + + job = min(self.ready_list, key=lambda x: x.priority) + if cpu_min.running is None or \ + cpu_min.running.priority > job.priority: + self.ready_list.remove(job) + if cpu_min.running: + self.ready_list.append(cpu_min.running) + selected_job = (job, cpu_min) + + minimum = None + for job in self.ready_list: + zl_date = int((job.absolute_deadline - job.ret + ) * self.sim.cycles_per_ms - self.sim.now()) + if (minimum is None or minimum[0] > zl_date) and zl_date > 0: + minimum = (zl_date, job) + + if self.zl_timer: + self.zl_timer[1].stop() + if minimum: + self.zl_timer = (minimum[0], Timer( + self.sim, G_FL_ZL.zero_laxity, (self, minimum[1]), + minimum[0], cpu=cpu, in_ms=False)) + self.zl_timer[1].start() + + return selected_job diff --git a/schedulers/LB_P_EDF.py b/schedulers/LB_P_EDF.py new file mode 100644 index 0000000..171751b --- /dev/null +++ b/schedulers/LB_P_EDF.py @@ -0,0 +1,31 @@ +""" +Partitionned EDF using PartitionedScheduler. +Try to load balance the tasks among the processors. +""" +from simso.core.Scheduler import SchedulerInfo +from EDF_mono import EDF_mono +from simso.utils import PartitionedScheduler + + +class LB_P_EDF(PartitionedScheduler): + def init(self): + PartitionedScheduler.init(self, SchedulerInfo("EDF_mono", EDF_mono)) + + def packer(self): + # First Fit + cpus = [[cpu, 0] for cpu in self.processors] + for task in self.task_list: + m = cpus[0][1] + j = 0 + # Find the processor with the lowest load. + for i, c in enumerate(cpus): + if c[1] < m: + m = c[1] + j = i + + # Affect it to the task. + self.affect_task_to_processor(task, cpus[j][0]) + + # Update utilization. + cpus[j][1] += float(task.wcet) / task.period + return True diff --git a/schedulers/LLF.py b/schedulers/LLF.py new file mode 100644 index 0000000..740de5f --- /dev/null +++ b/schedulers/LLF.py @@ -0,0 +1,47 @@ +from simso.core import Scheduler, Timer + + +class LLF(Scheduler): + """Least Laxity First""" + def init(self): + self.ready_list = [] + self.timer = Timer(self.sim, LLF.compute_laxity, + (self, self.processors[0]), 1, one_shot=False, + cpu=self.processors[0], overhead=.01) + self.timer.start() + + def compute_laxity(self, cpu): + if self.ready_list: + for job in self.ready_list: + job.laxity = job.absolute_deadline - \ + (job.ret + self.sim.now_ms()) + cpu.resched() + + def on_activate(self, job): + self.ready_list.append(job) + self.compute_laxity(job.cpu) + + def on_terminated(self, job): + self.ready_list.remove(job) + self.compute_laxity(job.cpu) + + def schedule(self, cpu): + decisions = [] + if self.ready_list: + # Sort according to the laxity. + self.ready_list.sort(key=lambda x: (x.laxity, x.task.identifier)) + + # m : Nombre de processeurs. + m = len(self.processors) + + # Available processors: + l = (proc for proc in self.processors + if proc.running not in self.ready_list[:m]) + + # The first m jobs should be running: + for job in self.ready_list[:m]: + if not job.is_running(): + proc = next(l) + decisions.append((job, proc)) + + return decisions diff --git a/schedulers/LLREF.py b/schedulers/LLREF.py new file mode 100644 index 0000000..cb473ac --- /dev/null +++ b/schedulers/LLREF.py @@ -0,0 +1,121 @@ +""" +Implementation of the LLREF scheduler as presented by Cho et al. in +"An Optimal Real-Time Scheduling Algorithm for Multiprocessors". +""" + +from simso.core import Scheduler, Timer +from math import ceil + + +class LLREF(Scheduler): + def init(self): + self.selected_jobs = [] # Jobs currently running. + self.budget = {} # Budgets for the active jobs. + self.next_deadline = 0 + self.waiting_schedule = False + self.last_update = 0 # Used to update the budget. + + def reschedule(self, cpu=None): + """ + Ask for a scheduling decision. Don't call if not necessary. + """ + if not self.waiting_schedule: + if cpu is None: + cpu = self.processors[0] + cpu.resched() + self.waiting_schedule = True + + def on_activate(self, job): + """ + Deal with a job activation. The budget for this job is computed. + """ + # Compute budget for this newly activated job + window = self.next_deadline - self.sim.now() + self.budget[job] = window * job.wcet / job.period + + # Find the next absolute deadline among the ready jobs. + m_dl = min([rjob.absolute_deadline for rjob in self.budget.keys()]) \ + * self.sim.cycles_per_ms + + # Refill the budgets if we change the interval + if m_dl != self.next_deadline: + window = m_dl - self.next_deadline + self.next_deadline = m_dl + for j in self.budget.keys(): + self.budget[j] += window * j.wcet / j.period + + # There's a new job, the system should be rescheduled. + self.reschedule() + + def on_terminated(self, job): + if job in self.budget: + del self.budget[job] + + def update_budget(self): + """ + Remove budget from the currently executing jobs. + """ + time_since_last_update = self.sim.now() - self.last_update + for job in self.selected_jobs: + if job in self.budget: + if job.is_active(): + self.budget[job] -= time_since_last_update + else: + del self.budget[job] + self.last_update = self.sim.now() + + def date_next_event(self, selected, not_selected): + next_event = 0 + + if selected: + next_bottom_hitting = selected[-1][1] + next_event = next_bottom_hitting + + if not_selected: + next_ceiling_hitting = self.next_deadline - self.sim.now() \ + - not_selected[0][1] + if next_ceiling_hitting < next_bottom_hitting: + next_event = next_ceiling_hitting + + return next_event + + def schedule(self, cpu): + self.waiting_schedule = False + self.update_budget() + + # Sort the jobs by budgets. + sorted_budgets = sorted( + [(x, ceil(y)) for x, y in self.budget.items()], + key=lambda x: (-x[1], x[0].name)) + selected = sorted_budgets[:len(self.processors)] + not_selected = sorted_budgets[len(self.processors):] + + # Compute the (relative) date of the next event. + next_event = self.date_next_event(selected, not_selected) + if next_event > 0: + # Set a timer to reschedule the system at that date. + self.timer_a = Timer(self.sim, LLREF.reschedule, (self,), + next_event, cpu=cpu, in_ms=False) + self.timer_a.start() + + # Allocate the selected jobs to the processors. + # The processors already running selected jobs are not changed. + available_procs = [] + self.selected_jobs = [s[0] for s in selected if s[1] > 0] + remaining_jobs = self.selected_jobs[:] + for proc in self.processors: + if proc.running in self.selected_jobs: + # This processor keeps running the same job. + remaining_jobs.remove(proc.running) + else: + # This processor is not running a selected job. + available_procs.append(proc) + + # The remaining processors are running the remaining jobs or None. + padded_remaining_jobs = remaining_jobs + \ + [None] * (len(available_procs) - len(remaining_jobs)) + # zip create a list of couples (job, proc) using the list of remaining + # jobs and the list of available processors. + decisions = list(zip(padded_remaining_jobs, available_procs)) + + return decisions diff --git a/schedulers/LLREF2.py b/schedulers/LLREF2.py new file mode 100644 index 0000000..3f01db6 --- /dev/null +++ b/schedulers/LLREF2.py @@ -0,0 +1,130 @@ +""" +Implementation of the LLREF scheduler as presented by Cho et al. in +"An Optimal Real-Time Scheduling Algorithm for Multiprocessors". +""" + +from simso.core import Scheduler, Timer +from math import ceil + + +class LLREF2(Scheduler): + def init(self): + self.selected_jobs = [] # Jobs currently running. + self.budget = {} # Budgets for the active jobs. + self.next_deadline = 0 + self.waiting_schedule = False + self.last_update = 0 # Used to update the budget. + + def reschedule(self, cpu=None): + """ + Ask for a scheduling decision. Don't call if not necessary. + """ + if not self.waiting_schedule: + if cpu is None: + cpu = self.processors[0] + cpu.resched() + self.waiting_schedule = True + + def on_activate(self, job): + """ + Deal with a job activation. The budget for this job is computed. + """ + if job.wcet == 0: + return + + # Compute budget for this newly activated job + window = self.next_deadline - self.sim.now() + self.budget[job] = window * job.wcet / job.period + + # Find the next absolute deadline among the ready jobs. + m_dl = min([rjob.absolute_deadline for rjob in self.budget.keys()]) \ + * self.sim.cycles_per_ms + + # Refill the budgets if we change the interval + if m_dl != self.next_deadline: + window = m_dl - self.next_deadline + self.next_deadline = m_dl + for j in self.budget.keys(): + self.budget[j] += window * j.wcet / j.period + + # There's a new job, the system should be rescheduled. + self.reschedule() + + def on_terminated(self, job): + if job in self.budget: + del self.budget[job] + + def update_budget(self): + """ + Remove budget from the currently executing jobs. + """ + time_since_last_update = self.sim.now() - self.last_update + for job in self.selected_jobs: + if job in self.budget: + if job.is_active(): + self.budget[job] -= time_since_last_update + else: + del self.budget[job] + self.last_update = self.sim.now() + + def date_next_event(self, selected, not_selected): + next_event = None + + if selected: + next_bottom_hitting = min(ceil(y) for _, y in selected) + next_event = next_bottom_hitting + + if not_selected: + next_ceiling_hitting = self.next_deadline - self.sim.now() \ + - ceil(max(y for _, y in not_selected)) + if next_event is None or next_ceiling_hitting < next_event: + next_event = next_ceiling_hitting + + return next_event if next_event else 0 + + def select_jobs(self): + window = self.next_deadline - self.sim.now() + res = [(job, b) for job, b in self.budget.items() + if window <= ceil(b) and job.is_active()] + for job, b in sorted(self.budget.items(), key=lambda x: -x[1]): + if b > 0 and (job, b) not in res and job.is_active(): + res.append((job, b)) + + return (res[:len(self.processors)], res[len(self.processors):]) + + def schedule(self, cpu): + self.waiting_schedule = False + self.update_budget() + + # Sort the jobs by budgets. + selected, not_selected = self.select_jobs() + + # Compute the (relative) date of the next event. + next_event = self.date_next_event(selected, not_selected) + if next_event > 0: + # Set a timer to reschedule the system at that date. + self.timer_a = Timer(self.sim, LLREF2.reschedule, (self,), + next_event, cpu=cpu, in_ms=False) + self.timer_a.start() + + # Allocate the selected jobs to the processors. + # The processors already running selected jobs are not changed. + available_procs = [] + self.selected_jobs = [s[0] for s in selected] + remaining_jobs = self.selected_jobs[:] + for proc in self.processors: + if proc.running in self.selected_jobs: + # This processor keeps running the same job. + remaining_jobs.remove(proc.running) + else: + # This processor is not running a selected job. + available_procs.append(proc) + + # The remaining processors are running the remaining jobs or None. + padded_remaining_jobs = remaining_jobs + \ + [None] * (len(available_procs) - len(remaining_jobs)) + # zip create a list of couples (job, proc) using the list of remaining + # jobs and the list of available processors. + decisions = list(zip(padded_remaining_jobs, available_procs)) + + return decisions diff --git a/schedulers/LRE_TL.py b/schedulers/LRE_TL.py new file mode 100644 index 0000000..7f5c7c4 --- /dev/null +++ b/schedulers/LRE_TL.py @@ -0,0 +1,173 @@ +""" +Implementation of the LRE-TL scheduler as presented by S. Funk in "LRE-TL: An +Optimal Multiprocessor Scheduling Algorithm for Sporadic Task Sets". +""" + +from simso.core import Scheduler, Timer +from heapq import heappush, heapreplace, heappop, heapify +from math import ceil + + +class LRE_TL(Scheduler): + def init(self): + """ + Initialization of the scheduler. This function is called when the + system is ready to run. + """ + self.t_f = 0 + self.h_b = [] # Heap of running tasks. + self.h_c = [] # Heap of waiting tasks. + self.h_d = [] # Heap of deadlines. + self.pmin = min([task.period for task in self.task_list]) \ + * self.sim.cycles_per_ms + self.evt_bc = False + self.activations = [] + self.waiting_schedule = False + + def reschedule(self, cpu=None): + """ + Ask for a scheduling decision. Don't call if not necessary. + """ + if not self.waiting_schedule: + if cpu is None: + cpu = self.processors[0] + cpu.resched() + self.waiting_schedule = True + + def on_activate(self, job): + """ + A-event. + """ + self.activations.append(job.task) + self.reschedule() + + def init_tl_plane(self): + decisions = [] + + for task in self.activations: + dl = int(task.job.absolute_deadline * self.sim.cycles_per_ms) + if dl not in self.h_d: + heappush(self.h_d, dl) + + self.t_f = self.sim.now() + self.pmin + if self.h_d[0] <= self.t_f: + self.t_f = heappop(self.h_d) + + z = 0 + self.h_b = [] + self.h_c = [] + for task in self.task_list: + l = ceil(task.wcet * (self.t_f - self.sim.now()) / task.period) + if z < len(self.processors) and task.job.is_active(): + heappush(self.h_b, (self.sim.now() + l, task)) + decisions.append((task.job, self.processors[z])) + z += 1 + else: + heappush(self.h_c, (self.t_f - l, task)) + + while z < len(self.processors): + decisions.append((None, self.processors[z])) + z += 1 + + return decisions + + def handle_evt_a(self, task): + """ + Handle an "A-Event". + """ + decisions = [] + + tasks_h_c = [t for _, t in self.h_c] + tasks_h_b = [t for _, t in self.h_b] + + if task not in tasks_h_b and task not in tasks_h_c: + l = ceil(task.wcet * (self.t_f - self.sim.now()) / task.period) + if len(self.h_b) < len(self.processors): + idle_proc = [z for z in self.processors + if not z.is_running()][0] + decisions.append((task.job, idle_proc)) + heappush(self.h_b, (self.sim.now() + l, task)) + else: + if task.wcet < task.period: + heappush(self.h_c, ((self.t_f - l), task)) + else: + key_b, task_b = heapreplace(self.h_b, (self.t_f + l, task)) + heappush(self.h_c, (self.t_f - key_b + self.sim.now())) + + dl = int(task.job.absolute_deadline * self.sim.cycles_per_ms) + if dl not in self.h_d: + heappush(self.h_d, dl) + + return decisions + + def handle_evt_bc(self): + """ + Handle a "BC-Event". + """ + + decisions = [] + while self.h_b and self.h_b[0][0] == self.sim.now(): + task_b = heappop(self.h_b)[1] + + if self.h_c: + key_c, task_c = heappop(self.h_c) + heappush(self.h_b, (self.t_f - key_c + self.sim.now(), task_c)) + decisions.append((task_c.job, task_b.cpu)) + else: + decisions.append((None, task_b.cpu)) + + if self.h_c: + while self.h_c[0][0] == self.sim.now(): + key_b, task_b = heappop(self.h_b) + key_c, task_c = heappop(self.h_c) + key_b = self.t_f - key_b + self.sim.now() + assert key_c != key_b, "Handle Evt BC failed." + key_c = self.t_f - key_c + self.sim.now() + heappush(self.h_b, (key_c, task_c)) + heappush(self.h_c, (key_b, task_b)) + decisions.append((task_c.job, task_b.cpu)) + + return decisions + + def event_bc(self): + """ + B or C event. + """ + self.evt_bc = True + self.reschedule() + + def schedule(self, cpu): + """ + Take the scheduling decisions. + """ + self.waiting_schedule = False + decisions = [] + self.h_c = [(d, t) for d, t in self.h_c if t.job.is_active()] + heapify(self.h_c) + + if self.sim.now() == self.t_f: + decisions = self.init_tl_plane() + else: + for task in self.activations: + decisions += self.handle_evt_a(task) + if self.evt_bc: + decisions += self.handle_evt_bc() + + self.activations = [] + self.evt_bc = False + + if self.h_b: + t_next = self.h_b[0][0] + if self.h_c: + t_next = min(t_next, self.h_c[0][0]) + + self.timer = Timer(self.sim, LRE_TL.event_bc, (self,), + t_next - self.sim.now(), + cpu=self.processors[0], in_ms=False) + else: + self.timer = Timer(self.sim, LRE_TL.reschedule, (self,), + self.t_f - self.sim.now(), + cpu=self.processors[0], in_ms=False) + self.timer.start() + + return decisions diff --git a/schedulers/MLLF.py b/schedulers/MLLF.py new file mode 100644 index 0000000..01de27b --- /dev/null +++ b/schedulers/MLLF.py @@ -0,0 +1,59 @@ +from simso.core import Scheduler, Timer + + +class MLLF(Scheduler): + """Modified Least Laxity First""" + def init(self): + self.ready_list = [] + self.timer = None + + def compute_laxity(self, cpu): + if self.ready_list: + for job in self.ready_list: + job.laxity = (job.absolute_deadline - job.ret) * \ + self.sim.cycles_per_ms - self.sim.now() + cpu.resched() + + def on_activate(self, job): + self.ready_list.append(job) + self.compute_laxity(job.cpu) + + def on_terminated(self, job): + self.ready_list.remove(job) + self.compute_laxity(job.cpu) + + def schedule(self, cpu): + decisions = [] + if self.ready_list: + # Sort according to the laxity. + self.ready_list.sort( + key=lambda x: (x.laxity, x.absolute_deadline)) + + # m : Nombre de processeurs. + m = len(self.processors) + + # Available processors: + l = (proc for proc in self.processors + if proc.running not in self.ready_list[:m]) + + if len(self.ready_list) > m: + ta = self.ready_list[m - 1] + dmin = self.ready_list[m].absolute_deadline * \ + self.sim.cycles_per_ms - self.sim.now() + + if self.timer: + self.timer.stop() + self.timer = Timer( + self.sim, MLLF.compute_laxity, + (self, self.processors[0]), dmin - ta.laxity, + one_shot=True, + cpu=self.processors[0]) + self.timer.start() + + # The first m jobs should be running: + for job in self.ready_list[:m]: + if not job.is_running(): + proc = next(l) + decisions.append((job, proc)) + + return decisions diff --git a/schedulers/NVNLF.py b/schedulers/NVNLF.py new file mode 100644 index 0000000..0895e12 --- /dev/null +++ b/schedulers/NVNLF.py @@ -0,0 +1,150 @@ +""" +The NVNLF scheduler is a work-conserving variant of LLREF, introduced by +Funaoka et al. in "Work-Conversing Optimal Real-Time Scheduling on +Multiprocessors." +""" + +from simso.core import Scheduler, Timer +from math import ceil + + +class NVNLF(Scheduler): + def init(self): + self.selected_jobs = [] # Jobs currently running. + self.budget = {} # Budgets for the active jobs. + self.next_deadline = 0 + self.waiting_schedule = False + self.last_update = 0 # Used to update the budget. + + def reschedule(self, cpu=None): + """ + Ask for a scheduling decision. Don't call if not necessary. + """ + if not self.waiting_schedule: + if cpu is None: + cpu = self.processors[0] + cpu.resched() + self.waiting_schedule = True + + def on_activate(self, job): + """ + Deal with a job activation. The budget for this job is computed. + """ + if job.wcet == 0: + return + + self.budget[job] = 0 + + # Find the next absolute deadline among the ready jobs. + self.next_deadline = min([task.job.absolute_deadline + for task in self.task_list]) \ + * self.sim.cycles_per_ms + + window = self.next_deadline - self.sim.now() + + for j in self.budget.keys(): + self.budget[j] = ceil(window * j.wcet / j.period) + + L = window * len(self.processors) - sum(self.budget.values()) + rem = [] + for j, l in self.budget.items(): + e = ceil(j.ret * self.sim.cycles_per_ms) + if e <= l: + a = e - l + L -= a + self.budget[j] = e + else: + rem.append(j) + for j in rem: + e = ceil(j.ret * self.sim.cycles_per_ms) + l = self.budget[j] + if e <= window: + a = min(e - l, L) + else: + a = min(window - l, L) + self.budget[j] += a + L -= a + + self.last_update = self.sim.now() + + # There's a new job, the system should be rescheduled. + self.reschedule() + + def on_terminated(self, job): + if job in self.budget: + del self.budget[job] + + def update_budget(self): + """ + Remove budget from the currently executing jobs. + """ + time_since_last_update = self.sim.now() - self.last_update + for job in self.selected_jobs: + if job in self.budget: + if job.is_active(): + self.budget[job] -= time_since_last_update + else: + del self.budget[job] + self.last_update = self.sim.now() + + def date_next_event(self, selected, not_selected): + next_event = None + + if selected: + next_bottom_hitting = min(ceil(y) for _, y in selected) + next_event = next_bottom_hitting + + if not_selected: + next_ceiling_hitting = self.next_deadline - self.sim.now() \ + - ceil(max(y for _, y in not_selected)) + if next_event is None or next_ceiling_hitting < next_event: + next_event = next_ceiling_hitting + + return next_event if next_event else 0 + + def select_jobs(self): + window = self.next_deadline - self.sim.now() + res = [(job, b) for job, b in self.budget.items() + if window <= ceil(b) and job.is_active()] + for job, b in self.budget.items(): + if b > 0 and (job, b) not in res and job.is_active(): + res.append((job, b)) + + return (res[:len(self.processors)], res[len(self.processors):]) + + def schedule(self, cpu): + self.waiting_schedule = False + self.update_budget() + + # Sort the jobs by budgets. + selected, not_selected = self.select_jobs() + + # Compute the (relative) date of the next event. + next_event = self.date_next_event(selected, not_selected) + if next_event > 0: + # Set a timer to reschedule the system at that date. + self.timer_a = Timer(self.sim, NVNLF.reschedule, (self,), + next_event, cpu=cpu, in_ms=False) + self.timer_a.start() + + # Allocate the selected jobs to the processors. + # The processors already running selected jobs are not changed. + available_procs = [] + self.selected_jobs = [s[0] for s in selected] + remaining_jobs = self.selected_jobs[:] + for proc in self.processors: + if proc.running in self.selected_jobs: + # This processor keeps running the same job. + remaining_jobs.remove(proc.running) + else: + # This processor is not running a selected job. + available_procs.append(proc) + + # The remaining processors are running the remaining jobs or None. + padded_remaining_jobs = remaining_jobs + \ + [None] * (len(available_procs) - len(remaining_jobs)) + # zip create a list of couples (job, proc) using the list of remaining + # jobs and the list of available processors. + decisions = list(zip(padded_remaining_jobs, available_procs)) + + return decisions diff --git a/schedulers/PD2.py b/schedulers/PD2.py new file mode 100644 index 0000000..a6cb2a1 --- /dev/null +++ b/schedulers/PD2.py @@ -0,0 +1,184 @@ +# coding=utf-8 + +from simso.core import Scheduler, Timer +from math import ceil + + +def rounded_wcet(job, q=None): + return rounded_wcet_cycles(job, q) / job.sim.cycles_per_ms + + +def rounded_wcet_cycles(job, q=None): + if q is None: + q = PD2.quantum + wcet_cycles = job.wcet * job.sim.cycles_per_ms + if wcet_cycles % q: + return wcet_cycles + q - (wcet_cycles % q) + else: + return wcet_cycles + + +class VirtualJob(object): + """ + A Virtual Job contains the list of the pseudo jobs for an actual job. + """ + def __init__(self, job): + self.job = job + self.pseudo_jobs = [] + self.cur = 0 + seq = 0 + + # Create pseudo jobs: + while seq * PD2.quantum <= job.wcet * job.sim.cycles_per_ms: + self.pseudo_jobs.append(PseudoJob(job, seq + 1)) + seq += 1 + + def get_next_job(self): + if self.cur < len(self.pseudo_jobs) - 1: + self.cur += 1 + job = self.pseudo_jobs[self.cur] + return job + + def get_current_job(self): + if self.cur < len(self.pseudo_jobs): + return self.pseudo_jobs[self.cur] + + +class PseudoJob(object): + def __init__(self, job, seq): + self.job = job + self.release_date = int(job.deadline * (seq - 1) / rounded_wcet(job) + ) * PD2.quantum + self.deadline = int(ceil(job.deadline * seq / rounded_wcet(job)) + ) * PD2.quantum + self.seq = seq + self.succ_bit = PseudoJob.succ_bit(job, seq) + if rounded_wcet(job) / job.deadline < 0.5: + self.group_deadline = 0 + else: + j = seq + 1 + while (j * PD2.quantum <= rounded_wcet_cycles(job) and + PseudoJob.succ_bit(job, j - 1) == 1 and + PseudoJob.win_size(job, j) == 2): + # while the window size is 2 and succ_bit of the prev is 1 + j += 1 + self.group_deadline = int(ceil( + job.deadline * (j - 1) / rounded_wcet(job) + * job.sim.cycles_per_ms)) + + @property + def absolute_releasedate(self): + return self.release_date + \ + self.job.activation_date * self.job.sim.cycles_per_ms + + def cmp_key(self): + # Si le premier parametre est identique, il regarde le second, etc. + cycles_per_ms = self.job.sim.cycles_per_ms + return (self.deadline + self.job.activation_date * cycles_per_ms, + -self.succ_bit, + -(self.job.activation_date + self.group_deadline)) + + @staticmethod + def succ_bit(job, seq): + return int(ceil(seq * job.deadline / rounded_wcet(job))) \ + - int(seq * job.deadline / rounded_wcet(job)) + + @staticmethod + def win_size(job, seq): + return int(ceil(seq * job.deadline / rounded_wcet(job))) \ + - int((seq - 1) * job.deadline / rounded_wcet(job)) + + +class PD2(Scheduler): + quantum = 100000 # cycles + + def init(self): + self.ready_list = [] + self.timers = [] + self.terminate_timers = [] + self.waiting_schedule = False + self.running_vjobs = [] + + # PD2.quantum = 1000000 + # while not self.is_schedulable() and PD2.quantum > 1000: + # PD2.quantum /= 2 + + PD2.quantum = self.sim.cycles_per_ms // 10 + + self.timer = Timer( + self.sim, PD2.reschedule, (self, ), PD2.quantum, + cpu=self.processors[0], in_ms=False, one_shot=False) + self.timer.start() + + def is_schedulable(self, q=None): + load = 0.0 + cycles_per_ms = self.sim.cycles_per_ms + for task in self.task_list: + wcet = rounded_wcet_cycles(task, q) + load += wcet / task.period + if wcet > task.period * cycles_per_ms \ + or load > len(self.processors) * cycles_per_ms: + return False + return True + + def reschedule(self, cpu=None): + """ + Ask for a scheduling decision. Don't call if not necessary. + """ + if not self.waiting_schedule: + if cpu is None: + cpu = self.processors[0] + cpu.resched() + self.waiting_schedule = True + + def virtual_terminate(self, virtual_job): + pjob = virtual_job.get_next_job() + if not pjob or not virtual_job.job.is_active(): + self.ready_list.remove(virtual_job) + + def on_activate(self, job): + virtual_job = VirtualJob(job) + self.ready_list.append(virtual_job) + + if self.sim.now() == 0: + self.reschedule() + + def schedule(self, cpu): + self.waiting_schedule = False + + decisions = [] + + for vjob in self.running_vjobs: + self.virtual_terminate(vjob) + + vjobs = [vjob for vjob in self.ready_list if vjob.job.is_active() and + self.sim.now() >= vjob.get_current_job().absolute_releasedate] + + self.running_vjobs = sorted( + vjobs, + key=lambda x: x.get_current_job().cmp_key() + )[:len(self.processors)] + + selected_jobs = [vjob.job for vjob in self.running_vjobs] + remaining_jobs = selected_jobs[:] + available_procs = [] + + # Remove from the list of remaining jobs the jobs that already runs. + for proc in self.processors: + if proc.running in selected_jobs: + # This processor keeps running the same job. + remaining_jobs.remove(proc.running) + else: + # This processor is not running a selected job. + available_procs.append(proc) + + # Jobs not currently running + for vjob in self.running_vjobs: + if vjob.job in remaining_jobs: + decisions.append((vjob.job, available_procs.pop())) + + # Unused processors + for proc in available_procs: + decisions.append((None, proc)) + + return decisions diff --git a/schedulers/P_EDF.py b/schedulers/P_EDF.py new file mode 100644 index 0000000..91447bb --- /dev/null +++ b/schedulers/P_EDF.py @@ -0,0 +1,13 @@ +""" +Partitionned EDF using PartitionedScheduler. +""" +from simso.core.Scheduler import SchedulerInfo +from EDF_mono import EDF_mono +from simso.utils import PartitionedScheduler +from simso.utils.PartitionedScheduler import decreasing_first_fit + + +class P_EDF(PartitionedScheduler): + def init(self): + PartitionedScheduler.init( + self, SchedulerInfo("EDF_mono", EDF_mono), decreasing_first_fit) diff --git a/schedulers/P_EDF2.py b/schedulers/P_EDF2.py new file mode 100644 index 0000000..c7cf5d1 --- /dev/null +++ b/schedulers/P_EDF2.py @@ -0,0 +1,66 @@ +""" +Partitionned EDF without the helping class. + +Use EDF_mono. +""" +from simso.core import Scheduler +from simso.core.Scheduler import SchedulerInfo +from EDF_mono import EDF_mono + + +class P_EDF2(Scheduler): + def init(self): + # Mapping processor to scheduler. + self.map_cpu_sched = {} + # Mapping task to scheduler. + self.map_task_sched = {} + + cpus = [] + for cpu in self.processors: + # Append the processor to a list with an initial utilization of 0. + cpus.append([cpu, 0]) + + # Instantiate a scheduler. + sched = EDF_mono(self.sim, SchedulerInfo("EDF_mono", EDF_mono)) + sched.add_processor(cpu) + sched.init() + + # Affect the scheduler to the processor. + self.map_cpu_sched[cpu.identifier] = sched + + # First Fit + for task in self.task_list: + j = 0 + # Find a processor with free space. + while cpus[j][1] + float(task.wcet) / task.period > 1.0: + j += 1 + if j >= len(self.processors): + print("oops bin packing failed.") + return + + # Get the scheduler for this processor. + sched = self.map_cpu_sched[cpus[j][0].identifier] + + # Affect it to the task. + self.map_task_sched[task.identifier] = sched + sched.add_task(task) + + # Put the task on that processor. + task.cpu = cpus[j][0] + self.sim.logger.log("task " + task.name + " on " + task.cpu.name) + + # Update utilization. + cpus[j][1] += float(task.wcet) / task.period + + def get_lock(self): + # No lock mechanism is needed. + return True + + def schedule(self, cpu): + return self.map_cpu_sched[cpu.identifier].schedule(cpu) + + def on_activate(self, job): + self.map_task_sched[job.task.identifier].on_activate(job) + + def on_terminated(self, job): + self.map_task_sched[job.task.identifier].on_terminated(job) diff --git a/schedulers/P_EDF_WF.py b/schedulers/P_EDF_WF.py new file mode 100644 index 0000000..8c0d3f6 --- /dev/null +++ b/schedulers/P_EDF_WF.py @@ -0,0 +1,13 @@ +""" +Partitionned EDF using PartitionedScheduler. +""" +from simso.core.Scheduler import SchedulerInfo +from EDF_mono import EDF_mono +from simso.utils import PartitionedScheduler +from simso.utils.PartitionedScheduler import decreasing_worst_fit + + +class P_EDF_WF(PartitionedScheduler): + def init(self): + PartitionedScheduler.init( + self, SchedulerInfo("EDF_mono", EDF_mono), decreasing_worst_fit) diff --git a/schedulers/P_RM.py b/schedulers/P_RM.py new file mode 100644 index 0000000..f6027f4 --- /dev/null +++ b/schedulers/P_RM.py @@ -0,0 +1,31 @@ +""" +Partitionned EDF using PartitionedScheduler. +""" +from simso.core.Scheduler import SchedulerInfo +from RM_mono import RM_mono +from simso.utils import PartitionedScheduler + + +class P_RM(PartitionedScheduler): + def init(self): + PartitionedScheduler.init( + self, SchedulerInfo("RM_mono", RM_mono)) + + def packer(self): + # First Fit + cpus = [[cpu, 0] for cpu in self.processors] + for task in self.task_list: + m = cpus[0][1] + j = 0 + # Find the processor with the lowest load. + for i, c in enumerate(cpus): + if c[1] < m: + m = c[1] + j = i + + # Affect it to the task. + self.affect_task_to_processor(task, cpus[j][0]) + + # Update utilization. + cpus[j][1] += float(task.wcet) / task.period + return True diff --git a/schedulers/PriD.py b/schedulers/PriD.py new file mode 100644 index 0000000..fdc1b17 --- /dev/null +++ b/schedulers/PriD.py @@ -0,0 +1,63 @@ +""" +Implementation of the PriD scheduler as introduced by Goossens et al. in +Priority-Driven Scheduling of Periodic Task Systems on Multiprocessors. +""" +from simso.core import Scheduler +from math import ceil + + +class PriD(Scheduler): + """EDF(k) scheduler""" + def init(self): + self.ready_list = [] + self.kfirst = [] + + tasks = sorted(self.task_list, key=lambda x: -x.wcet / x.period) + kmin = 1 + mmin = None + u = sum(x.wcet / x.period for x in tasks) + for k, task in enumerate(tasks): + u -= task.wcet / task.period + m = k + ceil(u / (1 - task.wcet / task.period)) + if mmin is None or mmin > m: + kmin = k + 1 + mmin = m + + self.kfirst = tasks[:kmin] + + def on_activate(self, job): + if job.task in self.kfirst: + job.priority = 0 + else: + job.priority = job.absolute_deadline + self.ready_list.append(job) + job.cpu.resched() + + def on_terminated(self, job): + if job in self.ready_list: + self.ready_list.remove(job) + else: + job.cpu.resched() + + def schedule(self, cpu): + if self.ready_list: + # Key explanations: + # First the free processors + # Among the others, get the one with the greatest deadline + # If equal, take the one used to schedule + key = lambda x: ( + 1 if not x.running else 0, + x.running.priority if x.running else 0, + 1 if x is cpu else 0 + ) + cpu_min = max(self.processors, key=key) + + job = min([j for j in self.ready_list if j.is_active()], + key=lambda x: x.priority) + + if (cpu_min.running is None or + cpu_min.running.priority > job.priority): + self.ready_list.remove(job) + if cpu_min.running: + self.ready_list.append(cpu_min.running) + return (job, cpu_min) diff --git a/schedulers/RM.py b/schedulers/RM.py new file mode 100644 index 0000000..1c8a5ac --- /dev/null +++ b/schedulers/RM.py @@ -0,0 +1,40 @@ +from simso.core import Scheduler + + +class RM(Scheduler): + """ Rate monotonic """ + def init(self): + self.ready_list = [] + + def on_activate(self, job): + self.ready_list.append(job) + job.cpu.resched() + + def on_terminated(self, job): + if job in self.ready_list: + self.ready_list.remove(job) + else: + job.cpu.resched() + + def schedule(self, cpu): + decision = None + if self.ready_list: + # Get a free processor or a processor running a low priority job. + key = lambda x: ( + 0 if x.running is None else 1, + -x.running.period if x.running else 0, + 0 if x is cpu else 1 + ) + cpu_min = min(self.processors, key=key) + + # Job with highest priority. + job = min(self.ready_list, key=lambda x: x.period) + + if (cpu_min.running is None or + cpu_min.running.period > job.period): + self.ready_list.remove(job) + if cpu_min.running: + self.ready_list.append(cpu_min.running) + decision = (job, cpu_min) + + return decision diff --git a/schedulers/RM_mono.py b/schedulers/RM_mono.py new file mode 100644 index 0000000..d0e8e45 --- /dev/null +++ b/schedulers/RM_mono.py @@ -0,0 +1,26 @@ +""" +Rate Monotic algorithm for uniprocessor architectures. +""" +from simso.core import Scheduler + + +class RM_mono(Scheduler): + def init(self): + self.ready_list = [] + + def on_activate(self, job): + self.ready_list.append(job) + job.cpu.resched() + + def on_terminated(self, job): + self.ready_list.remove(job) + job.cpu.resched() + + def schedule(self, cpu): + if self.ready_list: + # job with the highest priority + job = min(self.ready_list, key=lambda x: x.period) + else: + job = None + + return (job, cpu) diff --git a/schedulers/RUN.py b/schedulers/RUN.py new file mode 100644 index 0000000..8e86131 --- /dev/null +++ b/schedulers/RUN.py @@ -0,0 +1,229 @@ +""" +Implementation of the RUN scheduler as introduced in RUN: Optimal +Multiprocessor Real-Time Scheduling via Reduction to Uniprocessor +by Regnier et al. + +RUN is a global multiprocessors scheduler for periodic-preemptive-independent +tasks with implicit deadlines. +""" + +from simso.core import Scheduler, Timer +from RUNServer import EDFServer, TaskServer, DualServer, select_jobs, \ + add_job, get_child_tasks + + +class RUN(Scheduler): + """ + RUN scheduler. The offline part is done here but the online part is mainly + done in the SubSystem objects. The RUN object is a proxy for the + sub-systems. + """ + + def init(self): + """ + Initialization of the scheduler. This function is called when the + system is ready to run. + """ + self.subsystems = [] # List of all the sub-systems. + self.available_cpus = self.processors[:] # Not yet affected cpus. + self.task_to_subsystem = {} # map: Task -> SubSystem + + # Create the Task Servers. Those are the leaves of the reduction tree. + list_servers = [TaskServer(task) for task in self.task_list] + + # map: Task -> TaskServer. Used to quickly find the servers to update. + self.servers = dict(zip(self.task_list, list_servers)) + + assert (sum([s.utilization for s in list_servers]) + <= len(self.processors)), "Load exceeds 100%!" + + # Instanciate the reduction tree and the various sub-systems. + self.reduce_iterations(list_servers) + + def add_idle_tasks(self, servers): + """ + Create IdleTasks in order to reach 100% system utilization. + """ + from collections import namedtuple + # pylint: disable-msg=C0103 + IdleTask = namedtuple('IdleTask', ['utilization']) + + idle = len(self.processors) - sum([s.utilization for s in servers]) + for server in servers: + if server.utilization < 1 and idle > 0: + task = IdleTask(min(1 - server.utilization, idle)) + server.add_child(TaskServer(task)) + idle -= task.utilization + while idle > 0: + task = IdleTask(min(1, idle)) + server = EDFServer() + server.add_child(TaskServer(task)) + idle -= task.utilization + servers.append(server) + + def add_proper_subsystem(self, server): + """ + Create a proper sub-system from a unit server. + """ + tasks_servers = get_child_tasks(server) + subsystem_utilization = sum([t.utilization for t in tasks_servers]) + cpus = [] + while subsystem_utilization > 0: + cpus.append(self.available_cpus.pop()) + subsystem_utilization -= 1 + + subsystem = ProperSubsystem(self.sim, server, cpus) + for server in tasks_servers: + self.task_to_subsystem[server.task] = subsystem + self.subsystems.append(subsystem) + + def remove_unit_servers(self, servers): + """ + Remove all the unit servers for a list and create a proper sub-system + instead. + """ + for server in servers: + if server.utilization == 1: + self.add_proper_subsystem(server) + servers[:] = [s for s in servers if s.utilization < 1] + + def reduce_iterations(self, servers): + """ + Offline part of the RUN Scheduler. Create the reduction tree. + """ + servers = pack(servers) + self.add_idle_tasks(servers) + self.remove_unit_servers(servers) + + while servers: + servers = pack(dual(servers)) + self.remove_unit_servers(servers) + + def on_activate(self, job): + """ + Deal with a (real) task activation. + """ + subsystem = self.task_to_subsystem[job.task] + subsystem.update_budget() + add_job(self.sim, job, self.servers[job.task]) + subsystem.resched(job.cpu) + + def on_terminated(self, job): + """ + Deal with a (real) job termination. + """ + subsystem = self.task_to_subsystem[job.task] + self.task_to_subsystem[job.task].update_budget() + subsystem.resched(job.cpu) + + def schedule(self, _): + """ + This method is called by the simulator. The sub-systems that should be + rescheduled are also scheduled. + """ + decisions = [] + for subsystem in self.subsystems: + if subsystem.to_reschedule: + decisions += subsystem.schedule() + + return decisions + + +def pack(servers): + """ + Create a list of EDF Servers by packing servers. Currently use a + First-Fit but the original article states they used a Worst-Fit packing + algorithm. According to the article, a First-Fit should also work. + """ + packed_servers = [] + for server in servers: + for p_server in packed_servers: + if p_server.utilization + server.utilization <= 1: + p_server.add_child(server) + break + else: + p_server = EDFServer() + p_server.add_child(server) + packed_servers.append(p_server) + + return packed_servers + + +def dual(servers): + """ + From a list of servers, returns a list of corresponding DualServers. + """ + return [DualServer(s) for s in servers] + + +class ProperSubsystem(object): + """ + Proper sub-system. A proper sub-system is the set of the tasks belonging to + a unit server (server with utilization of 1) and a set of processors. + """ + + def __init__(self, sim, root, processors): + self.sim = sim + self.root = root + self.processors = processors + self.virtual = [] + self.last_update = 0 + self.to_reschedule = False + self.timer = None + + def update_budget(self): + """ + Update the budget of the servers. + """ + time_since_last_update = self.sim.now() - self.last_update + for server in self.virtual: + server.budget -= time_since_last_update + self.last_update = self.sim.now() + + def resched(self, cpu): + """ + Plannify a scheduling decision on processor cpu. Ignore it if already + planned. + """ + if not self.to_reschedule: + self.to_reschedule = True + cpu.resched() + + def virtual_event(self, cpu): + """ + Virtual scheduling event. Happens when a virtual job terminates. + """ + self.update_budget() + self.resched(cpu) + + def schedule(self): + """ + Schedule this proper sub-system. + """ + self.to_reschedule = False + decision = [] + + self.virtual = [] + jobs = select_jobs(self.root, self.virtual) + + wakeup_delay = min(self.virtual, key=lambda s: s.budget).budget + if wakeup_delay > 0: + self.timer = Timer(self.sim, ProperSubsystem.virtual_event, + (self, self.processors[0]), wakeup_delay, + cpu=self.processors[0], in_ms=False) + self.timer.start() + + cpus = [] + for cpu in self.processors: + if cpu.running in jobs: + jobs.remove(cpu.running) + else: + cpus.append(cpu) + + for cpu in cpus: + if jobs: + decision.append((jobs.pop(), cpu)) + else: + decision.append((None, cpu)) + + return decision diff --git a/schedulers/RUNServer.py b/schedulers/RUNServer.py new file mode 100644 index 0000000..0856dcf --- /dev/null +++ b/schedulers/RUNServer.py @@ -0,0 +1,138 @@ +""" +This module is part of the RUN implementation (see RUN.py). +""" + +from fractions import Fraction + + +class _Server(object): + """ + Abstract class that represents a Server. + """ + next_id = 1 + + def __init__(self, is_dual, task=None): + self.parent = None + self.is_dual = is_dual + self.utilization = Fraction(0, 1) + self.task = task + self.job = None + self.deadlines = [0] + self.budget = 0 + self.next_deadline = 0 + self.identifier = _Server.next_id + _Server.next_id += 1 + if task: + if hasattr(task, 'utilization'): + self.utilization += task.utilization + else: + self.utilization += Fraction(task.wcet) / Fraction(task.period) + + def add_deadline(self, current_instant, deadline): + """ + Add a deadline to this server. + """ + self.deadlines.append(deadline) + + self.deadlines = [d for d in self.deadlines if d > current_instant] + self.next_deadline = min(self.deadlines) + + def create_job(self, current_instant): + """ + Replenish the budget. + """ + self.budget = int(self.utilization * (self.next_deadline - + current_instant)) + + +class TaskServer(_Server): + """ + A Task Server is a Server that contains a real Task. + """ + def __init__(self, task): + super(TaskServer, self).__init__(False, task) + + +class EDFServer(_Server): + """ + An EDF Server is a Server with multiple children scheduled with EDF. + """ + def __init__(self): + super(EDFServer, self).__init__(False) + self.children = [] + + def add_child(self, server): + """ + Add a child to this EDFServer (used by the packing function). + """ + self.children.append(server) + self.utilization += server.utilization + server.parent = self + + +class DualServer(_Server): + """ + A Dual server is the opposite of its child. + """ + def __init__(self, child): + super(DualServer, self).__init__(True) + self.child = child + child.parent = self + self.utilization = 1 - child.utilization + + +def add_job(sim, job, server): + """ + Recursively update the deadlines of the parents of server. + """ + server.job = job + while server: + server.add_deadline(sim.now(), job.absolute_deadline * + sim.cycles_per_ms) + server.create_job(sim.now()) + server = server.parent + + +def select_jobs(server, virtual, execute=True): + """ + Select the jobs that should run according to RUN. The virtual jobs are + appended to the virtual list passed as argument. + """ + jobs = [] + if execute: + virtual.append(server) + + if server.task: + if execute and server.budget > 0 and server.job.is_active(): + jobs.append(server.job) + else: + if server.is_dual: + jobs += select_jobs(server.child, virtual, not execute) + else: + active_servers = [s for s in server.children if s.budget > 0] + if active_servers: + min_server = min(active_servers, key=lambda s: s.next_deadline) + else: + min_server = None + + for child in server.children: + jobs += select_jobs(child, virtual, + execute and child is min_server) + + return jobs + + +def get_child_tasks(server): + """ + Get the tasks scheduled by this server. + """ + if server.task: + return [server] + else: + if server.is_dual: + return get_child_tasks(server.child) + else: + tasks = [] + for child in server.children: + tasks += get_child_tasks(child) + return tasks diff --git a/schedulers/Static_EDF.py b/schedulers/Static_EDF.py new file mode 100644 index 0000000..6f4f8ba --- /dev/null +++ b/schedulers/Static_EDF.py @@ -0,0 +1,29 @@ +""" +Static EDF. A DVFS variant of EDF (uniprocessor). +""" +from simso.core import Scheduler + + +class Static_EDF(Scheduler): + def init(self): + self.ready_list = [] + # Compute processor speed + utilization = sum([t.wcet / t.period for t in self.task_list], 0.0) + self.processors[0].set_speed(utilization) + + def on_activate(self, job): + self.ready_list.append(job) + job.cpu.resched() + + def on_terminated(self, job): + self.ready_list.remove(job) + job.cpu.resched() + + def schedule(self, cpu): + if self.ready_list: + # job with the highest priority + job = min(self.ready_list, key=lambda x: x.absolute_deadline) + else: + job = None + + return (job, cpu) diff --git a/schedulers/U_EDF.py b/schedulers/U_EDF.py new file mode 100644 index 0000000..6ecd418 --- /dev/null +++ b/schedulers/U_EDF.py @@ -0,0 +1,146 @@ +""" +Implementation of the U-EDF scheduler as presented by Nelissen et al. in +"U-EDF an unfair but optimal multiprocessor scheduling algorithm for sporadic +tasks" +""" + + +from simso.core import Scheduler, Timer +from math import ceil + + +class U_EDF(Scheduler): + def init(self): + self.al = {} + self.timer = None + self.toresched = False + self.running_jobs = {} + self.last_event = 0 + self.newly_activated = False + + self.ui = {task: task.wcet / task.period for task in self.task_list} + + def reschedule(self): + if not self.toresched: + self.processors[0].resched() + self.toresched = True + + def hp(self, job): + for task in self.sorted_task_list: + if task.job is job: + return + else: + yield task.job + + def res(self, job, j, t1, t2): + s = self.s[job] + u = max(0, min(1, (s + self.ui[job.task] - j)))\ + - max(0, min(1, (s - j))) + return (t2 - t1) * u + + def bdg(self, job, j, t2): + return self.al[job][j] + self.res( + job, j, job.absolute_deadline * self.sim.cycles_per_ms, t2) + + def compute_al(self): + t = self.sim.now() + cycles_per_ms = self.sim.cycles_per_ms + + self.sorted_task_list = sorted( + self.task_list, + key=lambda t: (t.job.absolute_deadline, t.identifier)) + + self.s = {} + for task in self.task_list: + self.al[task.job] = [0] * len(self.processors) + self.s[task.job] = sum(self.ui[x.task] for x in self.hp(task.job)) + + for task in self.sorted_task_list: + job = task.job + + if not job.is_active(): + continue + + for j in range(len(self.processors)): + almax = (job.absolute_deadline * cycles_per_ms - t) \ + - sum(self.bdg(x, j, job.absolute_deadline * cycles_per_ms) + for x in self.hp(job)) - sum(self.al[job]) + self.al[job][j] = int(ceil(min( + almax, job.ret * self.sim.cycles_per_ms + - sum(self.al[job])))) + + def on_activate(self, job): + self.newly_activated = True + self.reschedule() + + def update_al(self): + delta = self.sim.now() - self.last_event + + for job, j in self.running_jobs.items(): + self.al[job][j] -= delta + + def schedule(self, cpu): + self.toresched = False + + if self.newly_activated: + self.compute_al() + self.newly_activated = False + else: + self.update_al() + + self.last_event = self.sim.now() + + next_event = None + decisions = [] + selected_jobs = {} + + # Select the jobs: + for j, proc in enumerate(self.processors): + eligible = [task.job for task in self.task_list + if task.job.is_active() + and task.job not in selected_jobs + and task.job in self.al + and self.al[task.job][j] > 0] + + if not eligible: + continue + + job = min(eligible, + key=lambda x: (x.absolute_deadline, x.task.identifier)) + if next_event is None or next_event > self.al[job][j]: + next_event = self.al[job][j] + selected_jobs[job] = j + + # Set the timer for the next event: + if self.timer: + self.timer.stop() + self.timer = None + if next_event is not None: + self.timer = Timer(self.sim, U_EDF.reschedule, (self,), + next_event, self.processors[0], in_ms=False) + self.timer.start() + + # Bind jobs to processors: + jobs = list(selected_jobs.keys()) + available_procs = list(self.processors) + was_not_running = [] + for job in jobs: + if job in self.running_jobs: + available_procs.remove(job.cpu) + else: + was_not_running.append(job) + + remaining_jobs = [] + for job in was_not_running: + if job.cpu in available_procs: + decisions.append((job, job.cpu)) + available_procs.remove(job.cpu) + else: + remaining_jobs.append(job) + + for p, job in enumerate(remaining_jobs): + decisions.append((job, available_procs[p])) + + self.running_jobs = selected_jobs + + return decisions diff --git a/schedulers/WC_RUN.py b/schedulers/WC_RUN.py new file mode 100644 index 0000000..1584d23 --- /dev/null +++ b/schedulers/WC_RUN.py @@ -0,0 +1,78 @@ +""" +Work-Conserving version of U-EDF. +""" + +from RUN import RUN + + +class WC_RUN(RUN): + def init(self): + RUN.init(self) + self.state = {} + + def on_terminated(self, job): + RUN.on_terminated(self, job) + if job in self.state: + del self.state[job] + + def on_abort(self, job): + RUN.on_abort(self, job) + if job in self.state: + del self.state[job] + + def schedule(self, cpu): + decisions = RUN.schedule(self, cpu) + +# print(".") +# print([(id(job), job.name if job else None, proc.name) +# for job, proc in self.state.items()]) +# +# print("decisions :") +# print([(job.name if job else None, proc.name) +# for job, proc in decisions]) + + rstate = {proc: job for job, proc in self.state.items()} + for djob, dproc in decisions: + if dproc in rstate: + del self.state[rstate[dproc]] + if djob is not None: + self.state[djob] = dproc + rstate[dproc] = djob + +# print([(id(job), job.name if job else None, proc.name) +# for job, proc in self.state.items()]) + + running_jobs = list(self.state.keys()) + + # Get active jobs. + jobs = sorted( + (task.job for task in self.task_list if task.job.is_active()), + key=lambda j: j.absolute_deadline) + + # Bind jobs to processors: + available_procs = list(self.processors) + was_not_running = [] + for job in jobs: + if job in running_jobs: + available_procs.remove(self.state[job]) + else: + was_not_running.append(job) + + remaining_jobs = [] + for job in was_not_running: + if job.cpu in available_procs: + decisions.append((job, job.cpu)) + available_procs.remove(job.cpu) + self.state[job] = job.cpu + else: + remaining_jobs.append(job) + + for p, proc in enumerate(available_procs): + if p < len(remaining_jobs): + job = remaining_jobs[p] + self.state[job] = proc + else: + job = None + decisions.append((job, proc)) + + return decisions diff --git a/schedulers/WC_U_EDF.py b/schedulers/WC_U_EDF.py new file mode 100644 index 0000000..879066d --- /dev/null +++ b/schedulers/WC_U_EDF.py @@ -0,0 +1,51 @@ +""" +Work-Conserving version of U-EDF. +""" + +from U_EDF import U_EDF + + +class WC_U_EDF(U_EDF): +# def on_terminated(self, job): +# self.reschedule() +# +# def on_aborted(self, job): +# self.reschedule() + + def schedule(self, cpu): + decisions = U_EDF.schedule(self, cpu) + + # Get active jobs. + jobs = sorted( + (task.job for task in self.task_list if task.job.is_active()), + key=lambda j: j.absolute_deadline) + + # Bind jobs to processors: + available_procs = list(self.processors) + was_not_running = [] + for job in jobs: + if job in self.running_jobs: + for djob, dproc in decisions: + if djob == job: + available_procs.remove(dproc) + break + else: + available_procs.remove(job.cpu) + else: + was_not_running.append(job) + + remaining_jobs = [] + for job in was_not_running: + if job.cpu in available_procs: + decisions.append((job, job.cpu)) + available_procs.remove(job.cpu) + else: + remaining_jobs.append(job) + + try: + for p, job in enumerate(remaining_jobs): + decisions.append((job, available_procs[p])) + except IndexError: + pass + + return decisions