Commit 7f99c261 by Maxime Chéramy

Add a bunch of schedulers.

parent 67af12be
"""
Implementation of the BF algorithm.
"""
from simso.core import Scheduler, Timer
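# The scheduler below works on boundaries: at each boundary -- the earliest
# absolute deadline among the tasks (t_f) -- every active job receives a
# mandatory allocation derived from its lag and its utilization, the leftover
# capacity is handed out one unit at a time to the eligible jobs (see
# compare()), and the resulting durations are packed onto the processors with
# McNaughton's wrap-around rule (see init_interval()).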
class BF(Scheduler):
def init(self):
self.t_f = 0
self.waiting_schedule = False
self.mirroring = False
self.allocations = []
self.timers = {}
def reschedule(self, cpu=None):
"""
Ask for a scheduling decision. Don't call if not necessary.
"""
if not self.waiting_schedule:
if cpu is None:
cpu = self.processors[0]
cpu.resched()
self.waiting_schedule = True
def on_activate(self, job):
self.reschedule()
def alpha_plus_one(self, job):
deadlines = sorted([x.job.absolute_deadline for x in self.task_list])
bk2 = deadlines[1]
bk1 = deadlines[0]
ui = job.wcet / job.deadline
val = bk2 * ui - int(bk1 * ui) - bk2 + bk1
if val == 0:
return 0
if val > 0:
return 1
return -1
def uf_plus_one(self, job):
bk1 = min([x.job.absolute_deadline for x in self.task_list])
ui = job.wcet / job.deadline
return (1. - (bk1 * ui - int(bk1 * ui))) / ui
def nuf_plus_one(self, job):
bk1 = min([x.job.absolute_deadline for x in self.task_list])
ui = 1 - (job.wcet / job.deadline)
return (1. - (bk1 * ui - int(bk1 * ui))) / ui
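# compare() returns -1 when job_i should receive the next extra time unit in
# preference to job_j, and 1 otherwise: a larger alpha_plus_one() value wins;
# when both values are -1 the job with the smaller uf_plus_one() is preferred,
# and when both are +1 the job with the larger nuf_plus_one() is preferred
# (job_i wins ties in both cases).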
def compare(self, job_i, job_j):
ai = self.alpha_plus_one(job_i)
aj = self.alpha_plus_one(job_j)
if ai > aj:
return -1
elif ai < aj:
return 1
elif ai == 0:
return -1
elif ai == -1:
if self.uf_plus_one(job_i) > self.uf_plus_one(job_j):
return 1
else:
return -1
else:
if self.nuf_plus_one(job_i) >= self.nuf_plus_one(job_j):
return -1
else:
return 1
def init_interval(self):
"""
Determine the end of the interval and compute allocation of the jobs
to the processors using the McNaughton algorithm.
"""
self.allocations = [[0, []] for _ in self.processors]
self.t_f = int(min([x.job.absolute_deadline for x in self.task_list])
* self.sim.cycles_per_ms)
# Duration that can be allocated for each processor.
w = int(self.t_f - self.sim.now())
p = 0 # Processor id.
mand = {}
available = w * len(self.processors)
eligible = []
for task in self.task_list:
job = task.job
if not job.is_active():
continue
fluid = (self.sim.now() - job.activation_date *
self.sim.cycles_per_ms) * job.wcet / job.period
lag = fluid - job.computation_time_cycles
mand[task.identifier] = max(0,
int(lag + w * job.wcet / job.period))
available -= mand[task.identifier]
if mand[task.identifier] < w:
eligible.append(task)
while available > 0 and eligible:
job_m = None
for task in eligible:
if job_m is None or self.compare(task.job, job_m) == -1:
job_m = task.job
mand[job_m.task.identifier] += 1
eligible.remove(job_m.task)
available -= 1
for task in self.task_list:
# The "fair" duration for this job on that interval. Rounded to the
# upper integer to avoid durations that are not multiples of
# cycles.
job = task.job
if not job.is_active():
continue
duration = mand[task.identifier]
if self.allocations[p][0] + duration <= w:
self.allocations[p][1].append((job, duration))
self.allocations[p][0] += duration
else:
# Add first part to the end of the current processor p:
duration1 = w - self.allocations[p][0]
if duration1 > 0:
self.allocations[p][1].append((job, duration1))
self.allocations[p][0] = w
if p + 1 < len(self.processors):
# Add the second part:
duration2 = duration - duration1
self.allocations[p + 1][1].append((job, duration2))
self.allocations[p + 1][0] += duration2
else:
# Because the durations are rounded up, there may not be
# enough space left for the last job.
# This could probably be improved.
print("Warning: didn't allocate enough time to the last task.",
duration - duration1)
break
p += 1
for allocation in self.allocations:
if allocation[0] < w:
allocation[1].append((None, w - allocation[0]))
if self.mirroring:
for allocation in self.allocations:
# Reverse the order of the jobs.
# Note: swapping the first and last items should be enough.
allocation[1].reverse()
self.mirroring = not self.mirroring
def end_event(self, z, job):
"""
Called when a job's budget has expired.
"""
del self.timers[job]
l = self.allocations[z][1]
if l and l[0][0] is job:
l.pop(0)
self.reschedule(self.processors[z])
def schedule(self, cpu):
"""
Schedule main method.
"""
self.waiting_schedule = False
# At the end of the interval:
if self.sim.now() >= self.t_f:
self.init_interval()
# Stop current timers.
for job, timer in self.timers.items():
timer.stop()
self.timers = {}
# Set timers to stop the jobs that will run.
for z, proc in enumerate(self.processors):
l = self.allocations[z][1]
if l and l[0][0] not in self.timers:
timer = Timer(self.sim, BF.end_event,
(self, z, l[0][0]), l[0][1],
cpu=proc, in_ms=False)
timer.start()
self.timers[l[0][0]] = timer
# Schedule the activated tasks on each processor.
decisions = []
for z, proc in enumerate(self.processors):
l = self.allocations[z][1]
if not l or not l[0][0] or l[0][0].is_active():
decisions.append((l[0][0] if l else None, proc))
return decisions
"""
Cycle-Conserving EDF. A DVFS variant of EDF (uniprocessor).
"""
from simso.core import Scheduler
class CC_EDF(Scheduler):
def init(self):
self.ready_list = []
self.ui = {}
for task in self.task_list:
self.ui[task] = float(task.wcet) / task.period
self.adjust_speed()
def adjust_speed(self):
# Compute processor speed
utilization = min(1.0, sum(self.ui.values()))
self.processors[0].set_speed(utilization)
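# Illustrative example (numbers are not from the original code): with two
# tasks of utilization 2/10 and 3/10, the speed is set to 0.5. If the first
# task's job then finishes after only 1 ms of actual computation,
# on_terminated() lowers its utilization estimate to 0.1 and the speed drops
# to 0.4 until the next activation of that task restores it to 0.2.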
def on_activate(self, job):
self.ui[job.task] = job.wcet / job.period
self.ready_list.append(job)
self.adjust_speed()
job.cpu.resched()
def on_terminated(self, job):
self.ui[job.task] = job.actual_computation_time / job.period
self.ready_list.remove(job)
self.adjust_speed()
job.cpu.resched()
def schedule(self, cpu):
if self.ready_list:
# job with the highest priority
job = min(self.ready_list, key=lambda x: x.absolute_deadline)
else:
job = None
return (job, cpu)
"""
Implementation of the DP-WRAP algorithm as presented by Levin et al. in
"DP-FAIR: A Simple Model for Understanding Optimal Multiprocessor Scheduling".
"""
from simso.core import Scheduler, Timer
from math import ceil
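# DP-WRAP in short: time is divided into slices ending at the earliest next
# absolute deadline (t_f). In each slice of length w, every active job gets
# ceil(w * wcet / period) cycles, and these durations are packed onto the
# processors with McNaughton's wrap-around rule (fill a processor up to w,
# spill the remainder onto the next one). The order is mirrored on alternate
# slices (see self.mirroring), which limits migrations of the split jobs.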
class DP_WRAP(Scheduler):
def init(self):
self.t_f = 0
self.waiting_schedule = False
self.mirroring = False
self.allocations = []
self.timers = {}
def reschedule(self, cpu=None):
"""
Ask for a scheduling decision. Don't call if not necessary.
"""
if not self.waiting_schedule:
if cpu is None:
cpu = self.processors[0]
cpu.resched()
self.waiting_schedule = True
def on_activate(self, job):
self.reschedule()
def init_interval(self):
"""
Determine the end of the interval and compute allocation of the jobs
to the processors using the McNaughton algorithm.
"""
self.allocations = [[0, []] for _ in self.processors]
self.t_f = ceil(min([x.job.absolute_deadline for x in self.task_list]
) * self.sim.cycles_per_ms)
# Duration that can be allocated for each processor.
w = int(self.t_f - self.sim.now())
p = 0 # Processor id.
for task in self.task_list:
job = task.job
if not job.is_active():
continue
# The "fair" duration for this job on that interval. Rounded to the
# upper integer to avoid durations that are not multiples of
# cycles.
duration = ceil(w * job.wcet / job.period)
if self.allocations[p][0] + duration <= w:
self.allocations[p][1].append((job, duration))
self.allocations[p][0] += duration
else:
# Add first part to the end of the current processor p:
duration1 = w - self.allocations[p][0]
if duration1 > 0:
self.allocations[p][1].append((job, duration1))
self.allocations[p][0] = w
if p + 1 < len(self.processors):
# Add the second part:
duration2 = duration - duration1
self.allocations[p + 1][1].append((job, duration2))
self.allocations[p + 1][0] += duration2
else:
# Because the durations are rounded up, there may not be
# enough space left for the last job.
# This could probably be improved.
print("Warning: didn't allocate enough time to the last task.",
duration - duration1)
break
p += 1
for allocation in self.allocations:
if allocation[0] < w:
allocation[1].append((None, w - allocation[0]))
if self.mirroring:
for allocation in self.allocations:
# Reverse the order of the jobs.
# Note: swapping the first and last items should be enough.
allocation[1].reverse()
self.mirroring = not self.mirroring
def end_event(self, z, job):
"""
Called when a job's budget has expired.
"""
del self.timers[job]
l = self.allocations[z][1]
if l and l[0][0] is job:
l.pop(0)
self.reschedule(self.processors[z])
def schedule(self, cpu):
"""
Schedule main method.
"""
self.waiting_schedule = False
# At the end of the interval:
if self.sim.now() >= self.t_f:
self.init_interval()
# Stop current timers.
for job, timer in self.timers.items():
timer.stop()
self.timers = {}
# Set timers to stop the jobs that will run.
for z, proc in enumerate(self.processors):
l = self.allocations[z][1]
if l and l[0][0] not in self.timers:
timer = Timer(self.sim, DP_WRAP.end_event,
(self, z, l[0][0]), l[0][1],
cpu=proc, in_ms=False)
timer.start()
self.timers[l[0][0]] = timer
# Schedule the activated tasks on each processor.
decisions = []
for z, proc in enumerate(self.processors):
l = self.allocations[z][1]
if not l or not l[0][0] or l[0][0].is_active():
decisions.append((l[0][0] if l else None, proc))
return decisions
"""
Implementation of Global EDF (Earliest Deadline First) for multiprocessor
architectures.
"""
from simso.core import Scheduler
class EDF(Scheduler):
"""Earliest Deadline First"""
def init(self):
self.ready_list = []
def on_activate(self, job):
self.ready_list.append(job)
job.cpu.resched()
def on_terminated(self, job):
if job in self.ready_list:
self.ready_list.remove(job)
else:
job.cpu.resched()
def schedule(self, cpu):
ready_jobs = [j for j in self.ready_list if j.is_active()]
if ready_jobs:
# Key explanations:
# First the free processors
# Among the others, get the one with the greatest deadline
# If equal, take the one used to schedule
key = lambda x: (
1 if not x.running else 0,
x.running.absolute_deadline if x.running else 0,
1 if x is cpu else 0
)
cpu_min = max(self.processors, key=key)
job = min(ready_jobs, key=lambda x: x.absolute_deadline)
if (cpu_min.running is None or
cpu_min.running.absolute_deadline > job.absolute_deadline):
self.ready_list.remove(job)
if cpu_min.running:
self.ready_list.append(cpu_min.running)
return (job, cpu_min)
"""
Implementation of Global EDF (Earliest Deadline First) for multiprocessor
architectures.
"""
from simso.core import Scheduler
class EDF2(Scheduler):
"""Earliest Deadline First"""
def init(self):
self.running_jobs = []
self.sched_to_do = False
def on_activate(self, job):
self.resched(job.cpu)
def on_terminated(self, job):
self.resched(job.cpu)
def resched(self, cpu):
if not self.sched_to_do:
cpu.resched()
self.sched_to_do = True
def schedule(self, cpu):
self.sched_to_do = False
decisions = []
ready_jobs = sorted(
[t.job for t in self.task_list if t.job.is_active()],
key=lambda x: x.absolute_deadline)
jobs = ready_jobs[:len(self.processors)]
# Bind jobs to processors:
available_procs = list(self.processors)
was_not_running = []
for job in jobs:
if job in self.running_jobs:
available_procs.remove(job.cpu)
else:
was_not_running.append(job)
remaining_jobs = []
for job in was_not_running:
if job.cpu in available_procs:
decisions.append((job, job.cpu))
available_procs.remove(job.cpu)
else:
remaining_jobs.append(job)
for p, job in enumerate(remaining_jobs):
decisions.append((job, available_procs[p]))
self.running_jobs = jobs
return decisions
"""
Implementation of EDF-US[1/2].
"""
from simso.core import Scheduler
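# EDF-US[1/2] rule as implemented below: a job whose WCET exceeds half of its
# period (utilization > 1/2) gets priority 0, the highest; every other job
# uses its absolute deadline as priority, i.e. plain EDF among themselves.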
class EDF_US(Scheduler):
def init(self):
self.ready_list = []
def on_activate(self, job):
if job.wcet > job.period / 2:
job.priority = 0
else:
job.priority = job.absolute_deadline
self.ready_list.append(job)
job.cpu.resched()
def on_terminated(self, job):
if job in self.ready_list:
self.ready_list.remove(job)
else:
job.cpu.resched()
def schedule(self, cpu):
if self.ready_list:
# Key explanations:
# First the free processors
# Among the others, get the one with the greatest deadline
# If equal, take the one used to schedule
key = lambda x: (
1 if not x.running else 0,
x.running.priority if x.running else 0,
1 if x is cpu else 0
)
cpu_min = max(self.processors, key=key)
job = min([j for j in self.ready_list if j.is_active()],
key=lambda x: x.priority)
if (cpu_min.running is None or
cpu_min.running.priority > job.priority):
self.ready_list.remove(job)
if cpu_min.running:
self.ready_list.append(cpu_min.running)
return (job, cpu_min)
"""
Earliest Deadline First algorithm for uniprocessor architectures.
"""
from simso.core import Scheduler
class EDF_mono(Scheduler):
def init(self):
self.ready_list = []
def on_activate(self, job):
self.ready_list.append(job)
job.cpu.resched()
def on_terminated(self, job):
self.ready_list.remove(job)
job.cpu.resched()
def schedule(self, cpu):
if self.ready_list:
# job with the highest priority
job = min(self.ready_list, key=lambda x: x.absolute_deadline)
else:
job = None
return (job, cpu)
"""
Implementation of the EDHS Scheduler.
EDHS is a semi-partitioned scheduler proposed by Kato et al. in
"Semi-Partitioning Technique for Multiprocessor Real-Time Scheduling".
"""
from simso.core import Scheduler, Timer
from simso.core.Scheduler import SchedulerInfo
from fractions import Fraction
from math import ceil
migrating_tasks = {}
# Mapping processor to scheduler.
map_cpu_sched = {}
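# How EDHS works here: tasks are first partitioned onto per-processor
# EDF_modified schedulers with a First Fit on utilization. A task that does
# not fit anywhere becomes a "migrating task": its utilization is split over
# the processors that still have spare capacity, each portion getting a budget
# in cycles (see migrating_tasks). A migrating job runs with the highest
# priority on one processor until its local budget expires, then moves on to
# the next processor in its list (see accept_migrating_job/end_migrating_job).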
class EDF_modified(Scheduler):
"""
An EDF mono-processor scheduler modified to accept migrating jobs.
A migrating job has an infinite priority.
"""
def init(self):
self.ready_list = []
self.migrating_job = None
def _resched(self):
self.processors[0].resched()
def on_activate(self, job):
self.ready_list.append(job)
self._resched()
def on_terminated(self, job):
if job is self.migrating_job:
self.migrating_job = None
elif job in self.ready_list:
self.ready_list.remove(job)
self._resched()
def accept_migrating_job(self, i, job, budget):
self.migrating_job = job
job.task.cpu = self.processors[0]
# Set timer for end.
self.timer = Timer(self.sim, EDF_modified.end_migrating_job,
(self, i), budget, cpu=self.processors[0],
in_ms=False)
self.timer.start()
self._resched()
def end_migrating_job(self, i):
self.processors[0].resched()
if self.migrating_job and i < len(migrating_tasks[self.migrating_job.task]) - 1:
ncpu, nbudget = migrating_tasks[self.migrating_job.task][i + 1]
sched = map_cpu_sched[ncpu]
sched.accept_migrating_job(i + 1, self.migrating_job, nbudget)
self.migrating_job = None
def schedule(self, cpu):
if self.migrating_job:
job = self.migrating_job
elif self.ready_list:
job = min(self.ready_list, key=lambda x: x.absolute_deadline)
else:
job = None
return (job, cpu)
class EDHS(Scheduler):
def init(self):
# Mapping task to scheduler.
self.map_task_sched = {}
cpus = []
for cpu in self.processors:
# Append the processor to a list with an initial utilization of 0.
cpus.append([cpu, Fraction(0)])
# Instantiate a scheduler.
sched = EDF_modified(self.sim, SchedulerInfo("EDF_modified",
EDF_modified))
sched.add_processor(cpu)
sched.init()
# Assign the scheduler to the processor.
map_cpu_sched[cpu] = sched
# First Fit
for task in self.task_list:
j = 0
# Find a processor with free space.
while cpus[j][1] + Fraction(task.wcet) / Fraction(task.period) > 1.0:
j += 1
if j >= len(self.processors):
migrating_tasks[task] = []
break
if j == len(self.processors):
continue
# Get the scheduler for this processor.
sched = map_cpu_sched[cpus[j][0]]
# Assign this scheduler to the task.
self.map_task_sched[task.identifier] = sched
sched.add_task(task)
# Put the task on that processor.
task.cpu = cpus[j][0]
self.sim.logger.log("task " + task.name + " on " + task.cpu.name)
# Update utilization.
cpus[j][1] += Fraction(task.wcet) / Fraction(task.period)
for task, l in migrating_tasks.items():
rem = Fraction(task.wcet) / Fraction(task.period)
for cpu, cpu_u in cpus:
if cpu_u < 1 and rem > 0:
u = min(rem, 1 - cpu_u)
l.append((cpu, ceil(u * task.period * self.sim.cycles_per_ms)))
rem -= u
def get_lock(self):
# No lock mechanism is needed.
return True
def schedule(self, cpu):
return map_cpu_sched[cpu].schedule(cpu)
def on_activate(self, job):
try:
self.map_task_sched[job.task.identifier].on_activate(job)
except KeyError:
cpu, budget = migrating_tasks[job.task][0]
sched = map_cpu_sched[cpu]
sched.accept_migrating_job(0, job, budget)
def on_terminated(self, job):
try:
self.map_task_sched[job.task.identifier].on_terminated(job)
except KeyError:
sched = map_cpu_sched[job.task.cpu]
sched.on_terminated(job)
#!/usr/bin/python
# coding=utf-8
from simso.core import Scheduler, Timer
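# EDZL in short: jobs are scheduled by EDF (priority = absolute deadline),
# and a timer is armed for the earliest zero-laxity instant among the ready
# jobs; when a job reaches zero laxity, zero_laxity() raises its priority to 0
# so that it is scheduled immediately.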
class EDZL(Scheduler):
"""
EDZL Scheduler. EDF Scheduler with zero laxity events.
"""
def init(self):
self.ready_list = []
self.zl_timer = None
def on_activate(self, job):
job.priority = job.absolute_deadline
self.ready_list.append(job)
job.cpu.resched()
def on_terminated(self, job):
# This check may be useful if a job is aborted.
if job in self.ready_list:
self.ready_list.remove(job)
if self.zl_timer and job == self.zl_timer[0]:
self.zl_timer[1].stop()
job.cpu.resched()
def zero_laxity(self, job):
if job in self.ready_list:
job.priority = 0
job.cpu.resched()
else:
print(self.sim.now(), job.name)
def schedule(self, cpu):
"""
Essentially EDF scheduling, but based on a priority attribute.
"""
ready_jobs = [j for j in self.ready_list if j.is_active()]
if ready_jobs:
selected_job = None
key = lambda x: (
1 if x.running else -1,
-x.running.priority if x.running else 0,
-1 if x is cpu else 1)
cpu_min = min(self.processors, key=key)
job = min(ready_jobs, key=lambda x: x.priority)
if cpu_min.running is None or \
cpu_min.running.priority > job.priority:
self.ready_list.remove(job)
if cpu_min.running:
self.ready_list.append(cpu_min.running)
selected_job = (job, cpu_min)
# Find the next zero-laxity event to configure the timer.
minimum = None
for job in self.ready_list:
zl_date = int((job.absolute_deadline - job.ret
) * self.sim.cycles_per_ms - self.sim.now())
if (minimum is None or minimum[0] > zl_date) and zl_date > 0:
minimum = (zl_date, job)
if self.zl_timer:
self.zl_timer[1].stop()
if minimum:
self.zl_timer = (minimum[0], Timer(
self.sim, EDZL.zero_laxity, (self, minimum[1]),
minimum[0], cpu=cpu, in_ms=False))
self.zl_timer[1].start()
return selected_job
from simso.core import Scheduler, Timer
from simso.core.Scheduler import SchedulerInfo
from fractions import Fraction
from math import ceil
class Modified_EDF(Scheduler):
def init(self):
self.ready_list = []
self.migrating_task1 = None  # stored as a (task, rate) pair
self.migrating_task2 = None
self.migrating_job1 = None
self.migrating_job2 = None
self.next_deadline = 0
def on_activate(self, job):
self.ready_list.append(job)
job.cpu.resched()
def on_terminated(self, job):
self.ready_list.remove(job)
job.cpu.resched()
def conf(self, next_deadline):
if next_deadline > self.next_deadline:
self.next_deadline = next_deadline
self.migrating_task1, self.migrating_task2 = \
self.migrating_task2, self.migrating_task1
if self.migrating_task1:
time_a = ceil((next_deadline - self.sim.now())
* self.migrating_task1[1])
self.timer_a = Timer(self.sim, Modified_EDF.on_end_migrating1,
(self,), time_a, cpu=self.processors[0],
in_ms=False)
self.timer_a.start()
self.migrating_job2 = None
if self.migrating_task2:
time_b = int((next_deadline - self.sim.now())
* (1 - self.migrating_task2[1]))
self.timer_b = Timer(
self.sim, Modified_EDF.on_start_migrating2, (self,),
time_b, cpu=self.processors[0], in_ms=False)
self.timer_b.start()
self.processors[0].resched()
if self.migrating_task1:
self.migrating_job1 = self.migrating_task1[0].job
self.processors[0].resched()
else:
self.migrating_job1 = None
def on_end_migrating1(self):
self.migrating_job1 = None
self.processors[0].resched()
def on_start_migrating2(self):
self.migrating_job2 = self.migrating_task2[0].job
self.processors[0].resched()
def schedule(self, cpu):
if self.migrating_job1 and self.migrating_job1.is_active():
return (self.migrating_job1, cpu)
if self.migrating_job2 and self.migrating_job2.is_active():
return (self.migrating_job2, cpu)
if self.ready_list:
job = min(self.ready_list, key=lambda x: x.absolute_deadline)
else:
job = None
return (job, cpu)
class Group(object):
def __init__(self, sim):
self.tasks = []
self.schedulers = []
self.sim = sim
def compute_next_deadline(self):
return min([task.jobs[-1].absolute_deadline
for task in self.tasks if task.jobs]) \
* self.sim.cycles_per_ms
class EKG(Scheduler):
def init(self):
self.groups = []
self.task_to_group = {}
try:
k = self.data['K']
except KeyError:
k = len(self.processors)
m = len(self.processors)
sep = Fraction(k) / Fraction(1 + k) if k < m else 1
light_tasks = [t for t in self.task_list if t.wcet < sep * t.period]
heavy_tasks = [t for t in self.task_list if t.wcet >= sep * t.period]
# Mapping task to scheduler.
self.map_task_sched = {}
self.map_cpu_sched = {}
cpus = []
for i, cpu in enumerate(self.processors):
# Instantiate a scheduler.
sched = Modified_EDF(self.sim, SchedulerInfo("Modified_EDF",
Modified_EDF))
sched.add_processor(cpu)
sched.init()
# Append the processor to a list with an initial utilization of 0.
cpus.append([cpu, sched, Fraction(0)])
# Assign the scheduler to the processor.
self.map_cpu_sched[cpu.identifier] = sched
# Assign it to the correct group.
if i >= len(heavy_tasks):
if (i - len(heavy_tasks)) % k == 0:
group = Group(self.sim)
group.schedulers.append(sched)
self.groups.append(group)
else:
self.groups[-1].schedulers.append(sched)
# Affect heavy tasks to individual processors.
p = 0
for task in heavy_tasks:
cpu, sched, _ = cpus[p]
# Assign the task to the processor.
self.map_task_sched[task.identifier] = sched
sched.add_task(task)
# Put the task on that processor.
task.cpu = cpu
p += 1
self.task_to_group[task] = None
# Custom Next Fit
for task in light_tasks:
g = (p - len(heavy_tasks)) // k
if cpus[p][2] + Fraction(task.wcet) / Fraction(task.period) <= 1.0:
cpu, sched, _ = cpus[p]
# Assign the task to the processor.
self.map_task_sched[task.identifier] = sched
sched.add_task(task)
# Put the task on that processor.
task.cpu = cpu
cpus[p][2] += Fraction(task.wcet) / Fraction(task.period)
self.groups[g].tasks.append(task)
self.task_to_group[task] = self.groups[g]
if cpus[p][2] == 1:
p += 1
else:
if (p + 1 - len(heavy_tasks)) % k == 0:
cpu, sched, _ = cpus[p + 1]
# Assign the task to the processor.
self.map_task_sched[task.identifier] = sched
sched.add_task(task)
# Put the task on that processor.
task.cpu = cpu
cpus[p + 1][2] += \
Fraction(task.wcet) / Fraction(task.period)
self.groups[g + 1].tasks.append(task)
self.task_to_group[task] = self.groups[g + 1]
else:
# Split in 2.
u1 = 1 - cpus[p][2]
u2 = Fraction(task.wcet) / Fraction(task.period) - u1
cpus[p][1].migrating_task2 = (task, u1)
cpus[p + 1][1].migrating_task1 = (task, u2)
cpus[p + 1][2] = u2
self.groups[g].tasks.append(task)
self.task_to_group[task] = self.groups[g]
p += 1
def schedule(self, cpu):
return self.map_cpu_sched[cpu.identifier].schedule(cpu)
def on_activate(self, job):
group = self.task_to_group[job.task]
if group:
nd = group.compute_next_deadline()
for sched in group.schedulers:
sched.conf(nd)
try:
self.map_task_sched[job.task.identifier].on_activate(job)
except KeyError:
job.cpu.resched()
def on_terminated(self, job):
try:
self.map_task_sched[job.task.identifier].on_terminated(job)
except KeyError:
job.cpu.resched()
# coding=utf-8
from simso.core import Scheduler, Timer
from math import ceil
class EPDF(Scheduler):
"""Earliest Pseudo-Deadline First"""
quantum = 1 # ms
class PseudoJob(object):
def __init__(self, job, seq):
self.job = job
self.release_date = int((seq - 1) / (job.wcet / job.deadline))
self.deadline = int(ceil(seq / (job.wcet / job.deadline)))
self.seq = seq
def cmp_key(self):
return self.deadline * EPDF.quantum + self.job.activation_date
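# Illustrative example (values not from the original code): for a job with
# wcet = 2 and deadline = 5 (ratio 0.4), PseudoJob(job, 1) has release 0 and
# pseudo-deadline ceil(1 / 0.4) = 3, and PseudoJob(job, 2) has release
# int(1 / 0.4) = 2 and pseudo-deadline ceil(2 / 0.4) = 5, expressed in quanta
# of EPDF.quantum (1 ms).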
def init(self):
self.ready_list = []
self.pseudo_job = {}
self.timers = []
def pseudo_terminate(self, pseudo_job):
if self.pseudo_job[pseudo_job.job.cpu] == pseudo_job:
self.pseudo_job[pseudo_job.job.cpu] = None
pseudo_job.job.cpu.resched()
def pseudo_activate(self, pseudo_job):
pseudo_job.job.cpu.resched()
self.ready_list.append(pseudo_job)
def on_activate(self, job):
# First pseudo-activation
pseudo_job = EPDF.PseudoJob(job, 1)
self.pseudo_activate(pseudo_job)
# Set the next pseudo-activations:
while pseudo_job.seq * self.quantum < job.wcet:
pseudo_job = EPDF.PseudoJob(job, pseudo_job.seq + 1)
timer = Timer(self.sim, EPDF.pseudo_activate, (self, pseudo_job),
pseudo_job.release_date * self.quantum -
self.sim.now() / self.sim.cycles_per_ms +
job.activation_date, cpu=job.cpu,
in_ms=True)
timer.start()
self.timers.append(timer)
def on_terminated(self, job):
self.ready_list = [x for x in self.ready_list if x.job is not job]
def schedule(self, cpu):
if len(self.ready_list) > 0:
# Key explanation:
# Free processors come first.
# Then processors are sorted by decreasing pseudo-deadline.
# Ties are broken in favor of the processor "cpu".
key = lambda x: (
1 if (not x.running) or (not self.pseudo_job[x]) else 0,
self.pseudo_job[x].cmp_key() if x.running and
self.pseudo_job[x] else None,
1 if x is cpu else 0
)
cpu_min = max(self.processors, key=key)
pjob = min(self.ready_list, key=lambda x: x.cmp_key())
if (cpu_min.running is None or
self.pseudo_job[cpu_min] is None or
self.pseudo_job[cpu_min].cmp_key() > pjob.cmp_key()):
self.ready_list.remove(pjob)
if cpu_min.running and self.pseudo_job[cpu_min]:
self.ready_list.append(self.pseudo_job[cpu_min])
self.pseudo_job[cpu_min] = pjob
timer = Timer(
self.sim, EPDF.pseudo_terminate, (self, pjob),
pjob.seq * self.quantum - pjob.job.computation_time,
cpu=cpu_min, in_ms=True)
timer.start()
self.timers.append(timer)
return (pjob.job, cpu_min)
elif self.pseudo_job.get(cpu) is None:
return (None, cpu)
# coding=utf-8
from simso.core import Scheduler, Timer
from math import ceil
def rounded_wcet(job, q=None):
return rounded_wcet_cycles(job, q) / job.sim.cycles_per_ms
def rounded_wcet_cycles(job, q=None):
if q is None:
q = ER_PD2.quantum
wcet_cycles = job.wcet * job.sim.cycles_per_ms
if wcet_cycles % q:
return wcet_cycles + q - (wcet_cycles % q)
else:
return wcet_cycles
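# Illustrative example (values not from the original code): with
# 1,000,000 cycles per ms, ER_PD2.quantum is 100,000 cycles; a WCET of 2.05 ms
# (2,050,000 cycles) is not a multiple of the quantum, so it is rounded up to
# 2,100,000 cycles, i.e. rounded_wcet() returns 2.1 ms.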
class VirtualJob(object):
"""
A Virtual Job contains the list of the pseudo jobs for an actual job.
"""
def __init__(self, job):
self.job = job
self.pseudo_jobs = []
self.cur = 0
seq = 0
# Create pseudo jobs:
while seq * ER_PD2.quantum <= job.wcet * job.sim.cycles_per_ms:
self.pseudo_jobs.append(PseudoJob(job, seq + 1))
seq += 1
def get_next_job(self):
if self.cur < len(self.pseudo_jobs) - 1:
self.cur += 1
job = self.pseudo_jobs[self.cur]
return job
def get_current_job(self):
if self.cur < len(self.pseudo_jobs):
return self.pseudo_jobs[self.cur]
class PseudoJob(object):
def __init__(self, job, seq):
self.job = job
self.release_date = int(job.deadline * (seq - 1) / rounded_wcet(job)
) * ER_PD2.quantum
self.deadline = int(ceil(job.deadline * seq / rounded_wcet(job))
) * ER_PD2.quantum
self.seq = seq
self.succ_bit = PseudoJob.succ_bit(job, seq)
if rounded_wcet(job) / job.deadline < 0.5:
self.group_deadline = 0
else:
j = seq + 1
while (j * ER_PD2.quantum <= rounded_wcet_cycles(job) and
PseudoJob.succ_bit(job, j - 1) == 1 and
PseudoJob.win_size(job, j) == 2):
# while the window size is 2 and succ_bit of the prev is 1
j += 1
self.group_deadline = int(ceil(
job.deadline * (j - 1) / rounded_wcet(job)
* job.sim.cycles_per_ms))
@property
def absolute_releasedate(self):
return self.release_date + \
self.job.activation_date * self.job.sim.cycles_per_ms
def cmp_key(self):
# If the first element is equal, the second one is compared, and so on.
cycles_per_ms = self.job.sim.cycles_per_ms
return (self.deadline + self.job.activation_date * cycles_per_ms,
-self.succ_bit,
-(self.job.activation_date + self.group_deadline))
@staticmethod
def succ_bit(job, seq):
return int(ceil(seq * job.deadline / rounded_wcet(job))) \
- int(seq * job.deadline / rounded_wcet(job))
@staticmethod
def win_size(job, seq):
return int(ceil(seq * job.deadline / rounded_wcet(job))) \
- int((seq - 1) * job.deadline / rounded_wcet(job))
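# Note on PseudoJob.cmp_key() above: pseudo-jobs are compared as tuples, so
# the earliest pseudo-deadline wins first; among equal deadlines, a successor
# bit of 1 wins over 0 (the value is negated), and remaining ties go to the
# pseudo-job with the later group deadline.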
class ER_PD2(Scheduler):
def init(self):
self.ready_list = []
self.timers = []
self.terminate_timers = []
self.waiting_schedule = False
self.running_vjobs = []
ER_PD2.quantum = self.sim.cycles_per_ms // 10 # 0.1ms.
#ER_PD2.quantum = 1000000
#while not self.is_schedulable() and ER_PD2.quantum > 1000:
# ER_PD2.quantum /= 2
self.timer = Timer(
self.sim, ER_PD2.reschedule, (self, ), ER_PD2.quantum,
cpu=self.processors[0], in_ms=False, one_shot=False)
self.timer.start()
def is_schedulable(self, q=None):
load = 0.0
cycles_per_ms = self.sim.cycles_per_ms
for task in self.task_list:
wcet = rounded_wcet_cycles(task, q)
load += wcet / task.period
if wcet > task.period * cycles_per_ms \
or load > len(self.processors) * cycles_per_ms:
return False
return True
def reschedule(self, cpu=None):
"""
Ask for a scheduling decision. Don't call if not necessary.
"""
if not self.waiting_schedule:
if cpu is None:
cpu = self.processors[0]
cpu.resched()
self.waiting_schedule = True
def virtual_terminate(self, virtual_job):
pjob = virtual_job.get_next_job()
if not pjob or not virtual_job.job.is_active():
self.ready_list.remove(virtual_job)
def on_activate(self, job):
virtual_job = VirtualJob(job)
self.ready_list.append(virtual_job)
if self.sim.now() == 0:
self.reschedule()
def schedule(self, cpu):
self.waiting_schedule = False
decisions = []
for vjob in self.running_vjobs:
self.virtual_terminate(vjob)
vjobs = [vjob for vjob in self.ready_list if vjob.job.is_active()]
self.running_vjobs = sorted(
vjobs,
key=lambda x: x.get_current_job().cmp_key()
)[:len(self.processors)]
selected_jobs = [vjob.job for vjob in self.running_vjobs]
remaining_jobs = selected_jobs[:]
available_procs = []
# Remove the jobs that are already running from the list of remaining jobs.
for proc in self.processors:
if proc.running in selected_jobs:
# This processor keeps running the same job.
remaining_jobs.remove(proc.running)
else:
# This processor is not running a selected job.
available_procs.append(proc)
# Jobs not currently running
for vjob in self.running_vjobs:
if vjob.job in remaining_jobs:
decisions.append((vjob.job, available_procs.pop()))
# Unused processors
for proc in available_procs:
decisions.append((None, proc))
return decisions
#!/usr/bin/python
# coding=utf-8
from simso.core import Scheduler
class FP(Scheduler):
""" Fixed Priority (use 'priority' field) """
def init(self):
self.ready_list = []
def on_activate(self, job):
self.ready_list.append(job)
job.cpu.resched()
def on_terminated(self, job):
job.cpu.resched()
def schedule(self, cpu):
if self.ready_list:
# Get a free processor or a processor running a low priority job.
key = lambda x: (
1 if x.running else 0,
x.running.data['priority'] if x.running else 0,
0 if x is cpu else 1
)
cpu_min = min(self.processors, key=key)
# Get the job with the highest priority.
job = max(self.ready_list, key=lambda x: x.data['priority'])
if (cpu_min.running is None or
cpu_min.running.data['priority'] < job.data['priority']):
self.ready_list.remove(job)
if cpu_min.running:
self.ready_list.append(cpu_min.running)
return (job, cpu_min)
return None
#!/usr/bin/python
# coding=utf-8
from simso.core.Scheduler import SchedulerInfo
from EDF_mono import EDF_mono
from simso.utils import PartitionedScheduler
class Fixed_PEDF(PartitionedScheduler):
def init(self):
PartitionedScheduler.init(self, SchedulerInfo("EDF_mono", EDF_mono))
def packer(self):
for task in self.task_list:
# Assign the task to the processor specified in its data.
cpu = next(proc for proc in self.processors
if proc.identifier == task.data["cpu"])
self.affect_task_to_processor(task, cpu)
return True
"""
Implementation of Global Fair Lateness (G-FL) as presented by Erickson and
Anderson in "Fair Lateness Scheduling: Reducing Maximum Lateness in G-EDF-like
Scheduling".
"""
from simso.core import Scheduler
class G_FL(Scheduler):
"""Earliest Deadline First"""
def init(self):
self.ready_list = []
def on_activate(self, job):
job.priority = job.activation_date + job.deadline - \
((len(self.processors) - 1.0) / len(self.processors)) * job.wcet
self.ready_list.append(job)
job.cpu.resched()
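# Illustrative example (values not from the original code): with 4 processors,
# a job released at t = 0 with deadline 10 and WCET 3 gets priority
# 0 + 10 - (3/4) * 3 = 7.75, i.e. its deadline is moved earlier in proportion
# to its WCET; jobs are then dispatched like in global EDF on this priority.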
def on_terminated(self, job):
if job in self.ready_list:
self.ready_list.remove(job)
else:
job.cpu.resched()
def schedule(self, cpu):
ready_jobs = [j for j in self.ready_list if j.is_active()]
if ready_jobs:
# Key explanations:
# First the free processors
# Among the others, get the one with the greatest deadline
# If equal, take the one used to schedule
key = lambda x: (
1 if not x.running else 0,
x.running.priority if x.running else 0,
1 if x is cpu else 0
)
cpu_min = max(self.processors, key=key)
job = min(ready_jobs, key=lambda x: x.priority)
if (cpu_min.running is None or
cpu_min.running.priority > job.priority):
self.ready_list.remove(job)
if cpu_min.running:
self.ready_list.append(cpu_min.running)
return (job, cpu_min)
from simso.core import Scheduler, Timer
class G_FL_ZL(Scheduler):
"""
G_FL with Zero Laxity Scheduler.
"""
def init(self):
self.ready_list = []
self.zl_timer = None
def on_activate(self, job):
job.priority = job.activation_date + job.deadline - \
((len(self.processors) - 1.0) / len(self.processors)) * job.wcet
self.ready_list.append(job)
job.cpu.resched()
def on_terminated(self, job):
if job in self.ready_list:
self.ready_list.remove(job)
if self.zl_timer and job == self.zl_timer[0]:
self.zl_timer[1].stop()
job.cpu.resched()
def zero_laxity(self, job):
if job in self.ready_list:
job.priority = 0
job.cpu.resched()
else:
print(self.sim.now(), job.name)
def schedule(self, cpu):
"""
Essentially EDF scheduling, but based on a priority attribute.
"""
if self.ready_list:
selected_job = None
key = lambda x: (
1 if x.running else -1,
-x.running.priority if x.running else 0,
-1 if x is cpu else 1)
cpu_min = min(self.processors, key=key)
job = min(self.ready_list, key=lambda x: x.priority)
if cpu_min.running is None or \
cpu_min.running.priority > job.priority:
self.ready_list.remove(job)
if cpu_min.running:
self.ready_list.append(cpu_min.running)
selected_job = (job, cpu_min)
minimum = None
for job in self.ready_list:
zl_date = int((job.absolute_deadline - job.ret
) * self.sim.cycles_per_ms - self.sim.now())
if (minimum is None or minimum[0] > zl_date) and zl_date > 0:
minimum = (zl_date, job)
if self.zl_timer:
self.zl_timer[1].stop()
if minimum:
self.zl_timer = (minimum[0], Timer(
self.sim, G_FL_ZL.zero_laxity, (self, minimum[1]),
minimum[0], cpu=cpu, in_ms=False))
self.zl_timer[1].start()
return selected_job
"""
Partitioned EDF using PartitionedScheduler.
Tries to balance the task load among the processors.
"""
from simso.core.Scheduler import SchedulerInfo
from EDF_mono import EDF_mono
from simso.utils import PartitionedScheduler
class LB_P_EDF(PartitionedScheduler):
def init(self):
PartitionedScheduler.init(self, SchedulerInfo("EDF_mono", EDF_mono))
def packer(self):
# Load balancing: assign each task to the least-loaded processor.
cpus = [[cpu, 0] for cpu in self.processors]
for task in self.task_list:
m = cpus[0][1]
j = 0
# Find the processor with the lowest load.
for i, c in enumerate(cpus):
if c[1] < m:
m = c[1]
j = i
# Assign the task to that processor.
self.affect_task_to_processor(task, cpus[j][0])
# Update utilization.
cpus[j][1] += float(task.wcet) / task.period
return True
from simso.core import Scheduler, Timer
class LLF(Scheduler):
"""Least Laxity First"""
def init(self):
self.ready_list = []
self.timer = Timer(self.sim, LLF.compute_laxity,
(self, self.processors[0]), 1, one_shot=False,
cpu=self.processors[0], overhead=.01)
self.timer.start()
def compute_laxity(self, cpu):
if self.ready_list:
for job in self.ready_list:
job.laxity = job.absolute_deadline - \
(job.ret + self.sim.now_ms())
cpu.resched()
def on_activate(self, job):
self.ready_list.append(job)
self.compute_laxity(job.cpu)
def on_terminated(self, job):
self.ready_list.remove(job)
self.compute_laxity(job.cpu)
def schedule(self, cpu):
decisions = []
if self.ready_list:
# Sort according to the laxity.
self.ready_list.sort(key=lambda x: (x.laxity, x.task.identifier))
# m: number of processors.
m = len(self.processors)
# Available processors:
l = (proc for proc in self.processors
if proc.running not in self.ready_list[:m])
# The first m jobs should be running:
for job in self.ready_list[:m]:
if not job.is_running():
proc = next(l)
decisions.append((job, proc))
return decisions
"""
Implementation of the LLREF scheduler as presented by Cho et al. in
"An Optimal Real-Time Scheduling Algorithm for Multiprocessors".
"""
from simso.core import Scheduler, Timer
from math import ceil
class LLREF(Scheduler):
def init(self):
self.selected_jobs = [] # Jobs currently running.
self.budget = {} # Budgets for the active jobs.
self.next_deadline = 0
self.waiting_schedule = False
self.last_update = 0 # Used to update the budget.
def reschedule(self, cpu=None):
"""
Ask for a scheduling decision. Don't call if not necessary.
"""
if not self.waiting_schedule:
if cpu is None:
cpu = self.processors[0]
cpu.resched()
self.waiting_schedule = True
def on_activate(self, job):
"""
Deal with a job activation. The budget for this job is computed.
"""
# Compute budget for this newly activated job
window = self.next_deadline - self.sim.now()
self.budget[job] = window * job.wcet / job.period
# Find the next absolute deadline among the ready jobs.
m_dl = min([rjob.absolute_deadline for rjob in self.budget.keys()]) \
* self.sim.cycles_per_ms
# Refill the budgets if we change the interval
if m_dl != self.next_deadline:
window = m_dl - self.next_deadline
self.next_deadline = m_dl
for j in self.budget.keys():
self.budget[j] += window * j.wcet / j.period
# There's a new job, the system should be rescheduled.
self.reschedule()
def on_terminated(self, job):
if job in self.budget:
del self.budget[job]
def update_budget(self):
"""
Remove budget from the currently executing jobs.
"""
time_since_last_update = self.sim.now() - self.last_update
for job in self.selected_jobs:
if job in self.budget:
if job.is_active():
self.budget[job] -= time_since_last_update
else:
del self.budget[job]
self.last_update = self.sim.now()
def date_next_event(self, selected, not_selected):
next_event = 0
if selected:
next_bottom_hitting = selected[-1][1]
next_event = next_bottom_hitting
if not_selected:
next_ceiling_hitting = self.next_deadline - self.sim.now() \
- not_selected[0][1]
if next_ceiling_hitting < next_bottom_hitting:
next_event = next_ceiling_hitting
return next_event
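# date_next_event() returns the delay until the next scheduling event in the
# TL plane: either a "bottom hit" (the selected job with the smallest local
# budget exhausts it, selected[-1] because the list is sorted by decreasing
# budget) or a "ceiling hit" (the remaining time until the next deadline
# becomes equal to the largest budget among the non-selected jobs, which must
# then start running).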
def schedule(self, cpu):
self.waiting_schedule = False
self.update_budget()
# Sort the jobs by budgets.
sorted_budgets = sorted(
[(x, ceil(y)) for x, y in self.budget.items()],
key=lambda x: (-x[1], x[0].name))
selected = sorted_budgets[:len(self.processors)]
not_selected = sorted_budgets[len(self.processors):]
# Compute the (relative) date of the next event.
next_event = self.date_next_event(selected, not_selected)
if next_event > 0:
# Set a timer to reschedule the system at that date.
self.timer_a = Timer(self.sim, LLREF.reschedule, (self,),
next_event, cpu=cpu, in_ms=False)
self.timer_a.start()
# Allocate the selected jobs to the processors.
# The processors already running selected jobs are not changed.
available_procs = []
self.selected_jobs = [s[0] for s in selected if s[1] > 0]
remaining_jobs = self.selected_jobs[:]
for proc in self.processors:
if proc.running in self.selected_jobs:
# This processor keeps running the same job.
remaining_jobs.remove(proc.running)
else:
# This processor is not running a selected job.
available_procs.append(proc)
# The remaining processors are running the remaining jobs or None.
padded_remaining_jobs = remaining_jobs + \
[None] * (len(available_procs) - len(remaining_jobs))
# zip creates a list of (job, proc) pairs from the remaining jobs
# and the available processors.
decisions = list(zip(padded_remaining_jobs, available_procs))
return decisions
"""
Implementation of the LLREF scheduler as presented by Cho et al. in
"An Optimal Real-Time Scheduling Algorithm for Multiprocessors".
"""
from simso.core import Scheduler, Timer
from math import ceil
class LLREF2(Scheduler):
def init(self):
self.selected_jobs = [] # Jobs currently running.
self.budget = {} # Budgets for the active jobs.
self.next_deadline = 0
self.waiting_schedule = False
self.last_update = 0 # Used to update the budget.
def reschedule(self, cpu=None):
"""
Ask for a scheduling decision. Don't call if not necessary.
"""
if not self.waiting_schedule:
if cpu is None:
cpu = self.processors[0]
cpu.resched()
self.waiting_schedule = True
def on_activate(self, job):
"""
Deal with a job activation. The budget for this job is computed.
"""
if job.wcet == 0:
return
# Compute budget for this newly activated job
window = self.next_deadline - self.sim.now()
self.budget[job] = window * job.wcet / job.period
# Find the next absolute deadline among the ready jobs.
m_dl = min([rjob.absolute_deadline for rjob in self.budget.keys()]) \
* self.sim.cycles_per_ms
# Refill the budgets if we change the interval
if m_dl != self.next_deadline:
window = m_dl - self.next_deadline
self.next_deadline = m_dl
for j in self.budget.keys():
self.budget[j] += window * j.wcet / j.period
# There's a new job, the system should be rescheduled.
self.reschedule()
def on_terminated(self, job):
if job in self.budget:
del self.budget[job]
def update_budget(self):
"""
Remove budget from the currently executing jobs.
"""
time_since_last_update = self.sim.now() - self.last_update
for job in self.selected_jobs:
if job in self.budget:
if job.is_active():
self.budget[job] -= time_since_last_update
else:
del self.budget[job]
self.last_update = self.sim.now()
def date_next_event(self, selected, not_selected):
next_event = None
if selected:
next_bottom_hitting = min(ceil(y) for _, y in selected)
next_event = next_bottom_hitting
if not_selected:
next_ceiling_hitting = self.next_deadline - self.sim.now() \
- ceil(max(y for _, y in not_selected))
if next_event is None or next_ceiling_hitting < next_event:
next_event = next_ceiling_hitting
return next_event if next_event else 0
def select_jobs(self):
window = self.next_deadline - self.sim.now()
res = [(job, b) for job, b in self.budget.items()
if window <= ceil(b) and job.is_active()]
for job, b in sorted(self.budget.items(), key=lambda x: -x[1]):
if b > 0 and (job, b) not in res and job.is_active():
res.append((job, b))
return (res[:len(self.processors)], res[len(self.processors):])
def schedule(self, cpu):
self.waiting_schedule = False
self.update_budget()
# Sort the jobs by budgets.
selected, not_selected = self.select_jobs()
# Compute the (relative) date of the next event.
next_event = self.date_next_event(selected, not_selected)
if next_event > 0:
# Set a timer to reschedule the system at that date.
self.timer_a = Timer(self.sim, LLREF2.reschedule, (self,),
next_event, cpu=cpu, in_ms=False)
self.timer_a.start()
# Allocate the selected jobs to the processors.
# The processors already running selected jobs are not changed.
available_procs = []
self.selected_jobs = [s[0] for s in selected]
remaining_jobs = self.selected_jobs[:]
for proc in self.processors:
if proc.running in self.selected_jobs:
# This processor keeps running the same job.
remaining_jobs.remove(proc.running)
else:
# This processor is not running a selected job.
available_procs.append(proc)
# The remaining processors are running the remaining jobs or None.
padded_remaining_jobs = remaining_jobs + \
[None] * (len(available_procs) - len(remaining_jobs))
# zip creates a list of (job, proc) pairs from the remaining jobs
# and the available processors.
decisions = list(zip(padded_remaining_jobs, available_procs))
return decisions
"""
Implementation of the LRE-TL scheduler as presented by S. Funk in "LRE-TL: An
Optimal Multiprocessor Scheduling Algorithm for Sporadic Task Sets".
"""
from simso.core import Scheduler, Timer
from heapq import heappush, heapreplace, heappop, heapify
from math import ceil
class LRE_TL(Scheduler):
def init(self):
"""
Initialization of the scheduler. This function is called when the
system is ready to run.
"""
self.t_f = 0
self.h_b = [] # Heap of running tasks.
self.h_c = [] # Heap of waiting tasks.
self.h_d = [] # Heap of deadlines.
self.pmin = min([task.period for task in self.task_list]) \
* self.sim.cycles_per_ms
self.evt_bc = False
self.activations = []
self.waiting_schedule = False
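# Heaps used below: h_b stores (expiry time, task) pairs for the tasks that
# are currently running, i.e. the instant at which their local remaining
# execution is exhausted (B event); h_c stores (time, task) pairs for the
# waiting tasks, i.e. the instant at which their local laxity reaches zero and
# they must start running (C event); h_d stores the absolute deadlines (in
# cycles) used to pick the end of the next TL plane (t_f).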
def reschedule(self, cpu=None):
"""
Ask for a scheduling decision. Don't call if not necessary.
"""
if not self.waiting_schedule:
if cpu is None:
cpu = self.processors[0]
cpu.resched()
self.waiting_schedule = True
def on_activate(self, job):
"""
A-event.
"""
self.activations.append(job.task)
self.reschedule()
def init_tl_plane(self):
decisions = []
for task in self.activations:
dl = int(task.job.absolute_deadline * self.sim.cycles_per_ms)
if dl not in self.h_d:
heappush(self.h_d, dl)
self.t_f = self.sim.now() + self.pmin
if self.h_d[0] <= self.t_f:
self.t_f = heappop(self.h_d)
z = 0
self.h_b = []
self.h_c = []
for task in self.task_list:
l = ceil(task.wcet * (self.t_f - self.sim.now()) / task.period)
if z < len(self.processors) and task.job.is_active():
heappush(self.h_b, (self.sim.now() + l, task))
decisions.append((task.job, self.processors[z]))
z += 1
else:
heappush(self.h_c, (self.t_f - l, task))
while z < len(self.processors):
decisions.append((None, self.processors[z]))
z += 1
return decisions
def handle_evt_a(self, task):
"""
Handle an "A-Event".
"""
decisions = []
tasks_h_c = [t for _, t in self.h_c]
tasks_h_b = [t for _, t in self.h_b]
if task not in tasks_h_b and task not in tasks_h_c:
l = ceil(task.wcet * (self.t_f - self.sim.now()) / task.period)
if len(self.h_b) < len(self.processors):
idle_proc = [z for z in self.processors
if not z.is_running()][0]
decisions.append((task.job, idle_proc))
heappush(self.h_b, (self.sim.now() + l, task))
else:
if task.wcet < task.period:
heappush(self.h_c, ((self.t_f - l), task))
else:
key_b, task_b = heapreplace(self.h_b, (self.t_f + l, task))
heappush(self.h_c, (self.t_f - key_b + self.sim.now(), task_b))
dl = int(task.job.absolute_deadline * self.sim.cycles_per_ms)
if dl not in self.h_d:
heappush(self.h_d, dl)
return decisions
def handle_evt_bc(self):
"""
Handle a "BC-Event".
"""
decisions = []
while self.h_b and self.h_b[0][0] == self.sim.now():
task_b = heappop(self.h_b)[1]
if self.h_c:
key_c, task_c = heappop(self.h_c)
heappush(self.h_b, (self.t_f - key_c + self.sim.now(), task_c))
decisions.append((task_c.job, task_b.cpu))
else:
decisions.append((None, task_b.cpu))
if self.h_c:
while self.h_c[0][0] == self.sim.now():
key_b, task_b = heappop(self.h_b)
key_c, task_c = heappop(self.h_c)
key_b = self.t_f - key_b + self.sim.now()
assert key_c != key_b, "Handle Evt BC failed."
key_c = self.t_f - key_c + self.sim.now()
heappush(self.h_b, (key_c, task_c))
heappush(self.h_c, (key_b, task_b))
decisions.append((task_c.job, task_b.cpu))
return decisions
def event_bc(self):
"""
B or C event.
"""
self.evt_bc = True
self.reschedule()
def schedule(self, cpu):
"""
Take the scheduling decisions.
"""
self.waiting_schedule = False
decisions = []
self.h_c = [(d, t) for d, t in self.h_c if t.job.is_active()]
heapify(self.h_c)
if self.sim.now() == self.t_f:
decisions = self.init_tl_plane()
else:
for task in self.activations:
decisions += self.handle_evt_a(task)
if self.evt_bc:
decisions += self.handle_evt_bc()
self.activations = []
self.evt_bc = False
if self.h_b:
t_next = self.h_b[0][0]
if self.h_c:
t_next = min(t_next, self.h_c[0][0])
self.timer = Timer(self.sim, LRE_TL.event_bc, (self,),
t_next - self.sim.now(),
cpu=self.processors[0], in_ms=False)
else:
self.timer = Timer(self.sim, LRE_TL.reschedule, (self,),
self.t_f - self.sim.now(),
cpu=self.processors[0], in_ms=False)
self.timer.start()
return decisions
from simso.core import Scheduler, Timer
class MLLF(Scheduler):
"""Modified Least Laxity First"""
def init(self):
self.ready_list = []
self.timer = None
def compute_laxity(self, cpu):
if self.ready_list:
for job in self.ready_list:
job.laxity = (job.absolute_deadline - job.ret) * \
self.sim.cycles_per_ms - self.sim.now()
cpu.resched()
def on_activate(self, job):
self.ready_list.append(job)
self.compute_laxity(job.cpu)
def on_terminated(self, job):
self.ready_list.remove(job)
self.compute_laxity(job.cpu)
def schedule(self, cpu):
decisions = []
if self.ready_list:
# Sort according to the laxity.
self.ready_list.sort(
key=lambda x: (x.laxity, x.absolute_deadline))
# m: number of processors.
m = len(self.processors)
# Available processors:
l = (proc for proc in self.processors
if proc.running not in self.ready_list[:m])
if len(self.ready_list) > m:
ta = self.ready_list[m - 1]
dmin = self.ready_list[m].absolute_deadline * \
self.sim.cycles_per_ms - self.sim.now()
if self.timer:
self.timer.stop()
self.timer = Timer(
self.sim, MLLF.compute_laxity,
(self, self.processors[0]), dmin - ta.laxity,
one_shot=True,
cpu=self.processors[0])
self.timer.start()
# The first m jobs should be running:
for job in self.ready_list[:m]:
if not job.is_running():
proc = next(l)
decisions.append((job, proc))
return decisions
"""
The NVNLF scheduler is a work-conserving variant of LLREF, introduced by
Funaoka et al. in "Work-Conversing Optimal Real-Time Scheduling on
Multiprocessors."
"""
from simso.core import Scheduler, Timer
from math import ceil
class NVNLF(Scheduler):
def init(self):
self.selected_jobs = [] # Jobs currently running.
self.budget = {} # Budgets for the active jobs.
self.next_deadline = 0
self.waiting_schedule = False
self.last_update = 0 # Used to update the budget.
def reschedule(self, cpu=None):
"""
Ask for a scheduling decision. Don't call if not necessary.
"""
if not self.waiting_schedule:
if cpu is None:
cpu = self.processors[0]
cpu.resched()
self.waiting_schedule = True
def on_activate(self, job):
"""
Deal with a job activation. The budget for this job is computed.
"""
if job.wcet == 0:
return
self.budget[job] = 0
# Find the next absolute deadline among the ready jobs.
self.next_deadline = min([task.job.absolute_deadline
for task in self.task_list]) \
* self.sim.cycles_per_ms
window = self.next_deadline - self.sim.now()
for j in self.budget.keys():
self.budget[j] = ceil(window * j.wcet / j.period)
L = window * len(self.processors) - sum(self.budget.values())
rem = []
for j, l in self.budget.items():
e = ceil(j.ret * self.sim.cycles_per_ms)
if e <= l:
a = e - l
L -= a
self.budget[j] = e
else:
rem.append(j)
for j in rem:
e = ceil(j.ret * self.sim.cycles_per_ms)
l = self.budget[j]
if e <= window:
a = min(e - l, L)
else:
a = min(window - l, L)
self.budget[j] += a
L -= a
self.last_update = self.sim.now()
# There's a new job, the system should be rescheduled.
self.reschedule()
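# Budget distribution above: nominal budgets are ceil(window * u_i) and L is
# the idle time left in the TL plane. Jobs whose remaining execution is not
# larger than their nominal budget only keep what they actually need, and the
# surplus is added back to L; the reclaimed time is then given to the other
# jobs, up to their remaining execution time (or the window), which makes the
# schedule work-conserving.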
def on_terminated(self, job):
if job in self.budget:
del self.budget[job]
def update_budget(self):
"""
Remove budget from the currently executing jobs.
"""
time_since_last_update = self.sim.now() - self.last_update
for job in self.selected_jobs:
if job in self.budget:
if job.is_active():
self.budget[job] -= time_since_last_update
else:
del self.budget[job]
self.last_update = self.sim.now()
def date_next_event(self, selected, not_selected):
next_event = None
if selected:
next_bottom_hitting = min(ceil(y) for _, y in selected)
next_event = next_bottom_hitting
if not_selected:
next_ceiling_hitting = self.next_deadline - self.sim.now() \
- ceil(max(y for _, y in not_selected))
if next_event is None or next_ceiling_hitting < next_event:
next_event = next_ceiling_hitting
return next_event if next_event else 0
def select_jobs(self):
window = self.next_deadline - self.sim.now()
res = [(job, b) for job, b in self.budget.items()
if window <= ceil(b) and job.is_active()]
for job, b in self.budget.items():
if b > 0 and (job, b) not in res and job.is_active():
res.append((job, b))
return (res[:len(self.processors)], res[len(self.processors):])
def schedule(self, cpu):
self.waiting_schedule = False
self.update_budget()
# Sort the jobs by budgets.
selected, not_selected = self.select_jobs()
# Compute the (relative) date of the next event.
next_event = self.date_next_event(selected, not_selected)
if next_event > 0:
# Set a timer to reschedule the system at that date.
self.timer_a = Timer(self.sim, NVNLF.reschedule, (self,),
next_event, cpu=cpu, in_ms=False)
self.timer_a.start()
# Allocate the selected jobs to the processors.
# The processors already running selected jobs are not changed.
available_procs = []
self.selected_jobs = [s[0] for s in selected]
remaining_jobs = self.selected_jobs[:]
for proc in self.processors:
if proc.running in self.selected_jobs:
# This processor keeps running the same job.
remaining_jobs.remove(proc.running)
else:
# This processor is not running a selected job.
available_procs.append(proc)
# The remaining processors are running the remaining jobs or None.
padded_remaining_jobs = remaining_jobs + \
[None] * (len(available_procs) - len(remaining_jobs))
# zip creates a list of (job, proc) pairs from the remaining jobs
# and the available processors.
decisions = list(zip(padded_remaining_jobs, available_procs))
return decisions
# coding=utf-8
from simso.core import Scheduler, Timer
from math import ceil
def rounded_wcet(job, q=None):
return rounded_wcet_cycles(job, q) / job.sim.cycles_per_ms
def rounded_wcet_cycles(job, q=None):
if q is None:
q = PD2.quantum
wcet_cycles = job.wcet * job.sim.cycles_per_ms
if wcet_cycles % q:
return wcet_cycles + q - (wcet_cycles % q)
else:
return wcet_cycles
class VirtualJob(object):
"""
A Virtual Job contains the list of the pseudo jobs for an actual job.
"""
def __init__(self, job):
self.job = job
self.pseudo_jobs = []
self.cur = 0
seq = 0
# Create pseudo jobs:
while seq * PD2.quantum <= job.wcet * job.sim.cycles_per_ms:
self.pseudo_jobs.append(PseudoJob(job, seq + 1))
seq += 1
def get_next_job(self):
if self.cur < len(self.pseudo_jobs) - 1:
self.cur += 1
job = self.pseudo_jobs[self.cur]
return job
def get_current_job(self):
if self.cur < len(self.pseudo_jobs):
return self.pseudo_jobs[self.cur]
class PseudoJob(object):
def __init__(self, job, seq):
self.job = job
self.release_date = int(job.deadline * (seq - 1) / rounded_wcet(job)
) * PD2.quantum
self.deadline = int(ceil(job.deadline * seq / rounded_wcet(job))
) * PD2.quantum
self.seq = seq
self.succ_bit = PseudoJob.succ_bit(job, seq)
if rounded_wcet(job) / job.deadline < 0.5:
self.group_deadline = 0
else:
j = seq + 1
while (j * PD2.quantum <= rounded_wcet_cycles(job) and
PseudoJob.succ_bit(job, j - 1) == 1 and
PseudoJob.win_size(job, j) == 2):
# while the window size is 2 and succ_bit of the prev is 1
j += 1
self.group_deadline = int(ceil(
job.deadline * (j - 1) / rounded_wcet(job)
* job.sim.cycles_per_ms))
@property
def absolute_releasedate(self):
return self.release_date + \
self.job.activation_date * self.job.sim.cycles_per_ms
def cmp_key(self):
# If the first element is equal, the second one is compared, and so on.
cycles_per_ms = self.job.sim.cycles_per_ms
return (self.deadline + self.job.activation_date * cycles_per_ms,
-self.succ_bit,
-(self.job.activation_date + self.group_deadline))
@staticmethod
def succ_bit(job, seq):
return int(ceil(seq * job.deadline / rounded_wcet(job))) \
- int(seq * job.deadline / rounded_wcet(job))
@staticmethod
def win_size(job, seq):
return int(ceil(seq * job.deadline / rounded_wcet(job))) \
- int((seq - 1) * job.deadline / rounded_wcet(job))
class PD2(Scheduler):
quantum = 100000 # cycles
def init(self):
self.ready_list = []
self.timers = []
self.terminate_timers = []
self.waiting_schedule = False
self.running_vjobs = []
# PD2.quantum = 1000000
# while not self.is_schedulable() and PD2.quantum > 1000:
# PD2.quantum /= 2
PD2.quantum = self.sim.cycles_per_ms // 10
self.timer = Timer(
self.sim, PD2.reschedule, (self, ), PD2.quantum,
cpu=self.processors[0], in_ms=False, one_shot=False)
self.timer.start()
def is_schedulable(self, q=None):
load = 0.0
cycles_per_ms = self.sim.cycles_per_ms
for task in self.task_list:
wcet = rounded_wcet_cycles(task, q)
load += wcet / task.period
if wcet > task.period * cycles_per_ms \
or load > len(self.processors) * cycles_per_ms:
return False
return True
def reschedule(self, cpu=None):
"""
Ask for a scheduling decision. Don't call if not necessary.
"""
if not self.waiting_schedule:
if cpu is None:
cpu = self.processors[0]
cpu.resched()
self.waiting_schedule = True
def virtual_terminate(self, virtual_job):
pjob = virtual_job.get_next_job()
if not pjob or not virtual_job.job.is_active():
self.ready_list.remove(virtual_job)
def on_activate(self, job):
virtual_job = VirtualJob(job)
self.ready_list.append(virtual_job)
if self.sim.now() == 0:
self.reschedule()
def schedule(self, cpu):
self.waiting_schedule = False
decisions = []
for vjob in self.running_vjobs:
self.virtual_terminate(vjob)
vjobs = [vjob for vjob in self.ready_list if vjob.job.is_active() and
self.sim.now() >= vjob.get_current_job().absolute_releasedate]
self.running_vjobs = sorted(
vjobs,
key=lambda x: x.get_current_job().cmp_key()
)[:len(self.processors)]
selected_jobs = [vjob.job for vjob in self.running_vjobs]
remaining_jobs = selected_jobs[:]
available_procs = []
# Remove the jobs that are already running from the list of remaining jobs.
for proc in self.processors:
if proc.running in selected_jobs:
# This processor keeps running the same job.
remaining_jobs.remove(proc.running)
else:
# This processor is not running a selected job.
available_procs.append(proc)
# Jobs not currently running
for vjob in self.running_vjobs:
if vjob.job in remaining_jobs:
decisions.append((vjob.job, available_procs.pop()))
# Unused processors
for proc in available_procs:
decisions.append((None, proc))
return decisions
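The pseudo-jobs built above follow the usual Pfair windowing; below is a minimal standalone sketch with toy numbers (no simso objects, time expressed in quanta) showing the windows and successor bits that PD2 compares.
from math import ceil, floor
def pfair_window(wcet, deadline, seq):
    # Release, deadline and successor bit of pseudo-job `seq`, in quanta,
    # for a task of utilization u = wcet / deadline (same formulas as
    # PseudoJob above, without the cycle conversion).
    release = floor(deadline * (seq - 1) / wcet)
    dl = ceil(deadline * seq / wcet)
    succ_bit = ceil(seq * deadline / wcet) - floor(seq * deadline / wcet)
    return release, dl, succ_bit
# Toy task: wcet = 3, deadline = 5 (u = 0.6).
# seq 1 -> (0, 2, 1), seq 2 -> (1, 4, 1), seq 3 -> (3, 5, 0):
# consecutive windows overlap exactly when the successor bit is 1.
for seq in (1, 2, 3):
    print(seq, pfair_window(3, 5, seq))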
"""
Partitioned EDF using PartitionedScheduler with a decreasing first-fit packing.
"""
from simso.core.Scheduler import SchedulerInfo
from EDF_mono import EDF_mono
from simso.utils import PartitionedScheduler
from simso.utils.PartitionedScheduler import decreasing_first_fit
class P_EDF(PartitionedScheduler):
def init(self):
PartitionedScheduler.init(
self, SchedulerInfo("EDF_mono", EDF_mono), decreasing_first_fit)
"""
Partitioned EDF without the helper class.
Uses EDF_mono.
"""
from simso.core import Scheduler
from simso.core.Scheduler import SchedulerInfo
from EDF_mono import EDF_mono
class P_EDF2(Scheduler):
def init(self):
# Mapping processor to scheduler.
self.map_cpu_sched = {}
# Mapping task to scheduler.
self.map_task_sched = {}
cpus = []
for cpu in self.processors:
# Append the processor to a list with an initial utilization of 0.
cpus.append([cpu, 0])
# Instantiate a scheduler.
sched = EDF_mono(self.sim, SchedulerInfo("EDF_mono", EDF_mono))
sched.add_processor(cpu)
sched.init()
            # Assign the scheduler to the processor.
self.map_cpu_sched[cpu.identifier] = sched
# First Fit
for task in self.task_list:
j = 0
# Find a processor with free space.
while cpus[j][1] + float(task.wcet) / task.period > 1.0:
j += 1
if j >= len(self.processors):
print("oops bin packing failed.")
return
# Get the scheduler for this processor.
sched = self.map_cpu_sched[cpus[j][0].identifier]
            # Associate the scheduler with the task.
self.map_task_sched[task.identifier] = sched
sched.add_task(task)
# Put the task on that processor.
task.cpu = cpus[j][0]
self.sim.logger.log("task " + task.name + " on " + task.cpu.name)
# Update utilization.
cpus[j][1] += float(task.wcet) / task.period
def get_lock(self):
# No lock mechanism is needed.
return True
def schedule(self, cpu):
return self.map_cpu_sched[cpu.identifier].schedule(cpu)
def on_activate(self, job):
self.map_task_sched[job.task.identifier].on_activate(job)
def on_terminated(self, job):
self.map_task_sched[job.task.identifier].on_terminated(job)
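The first-fit placement done in P_EDF2.init can be exercised on its own; here is a minimal sketch using plain (wcet, period) tuples instead of simso tasks (the task data and the helper name are made up for illustration).
def first_fit(tasks, nb_procs):
    # Assign each (wcet, period) task to the first processor whose
    # utilization stays at or below 1; return the partition or None.
    load = [0.0] * nb_procs
    partition = [[] for _ in range(nb_procs)]
    for wcet, period in tasks:
        u = wcet / period
        for j in range(nb_procs):
            if load[j] + u <= 1.0:
                load[j] += u
                partition[j].append((wcet, period))
                break
        else:
            return None  # bin packing failed
    return partition
# Example: three tasks on two processors.
print(first_fit([(2, 4), (3, 6), (1, 3)], 2))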
"""
Partitioned EDF using PartitionedScheduler with a decreasing worst-fit packing.
"""
from simso.core.Scheduler import SchedulerInfo
from EDF_mono import EDF_mono
from simso.utils import PartitionedScheduler
from simso.utils.PartitionedScheduler import decreasing_worst_fit
class P_EDF_WF(PartitionedScheduler):
def init(self):
PartitionedScheduler.init(
self, SchedulerInfo("EDF_mono", EDF_mono), decreasing_worst_fit)
"""
Partitioned RM using PartitionedScheduler.
"""
from simso.core.Scheduler import SchedulerInfo
from RM_mono import RM_mono
from simso.utils import PartitionedScheduler
class P_RM(PartitionedScheduler):
def init(self):
PartitionedScheduler.init(
self, SchedulerInfo("RM_mono", RM_mono))
def packer(self):
        # Worst Fit: assign each task to the least-loaded processor.
cpus = [[cpu, 0] for cpu in self.processors]
for task in self.task_list:
m = cpus[0][1]
j = 0
# Find the processor with the lowest load.
for i, c in enumerate(cpus):
if c[1] < m:
m = c[1]
j = i
            # Assign the task to that processor.
self.affect_task_to_processor(task, cpus[j][0])
# Update utilization.
cpus[j][1] += float(task.wcet) / task.period
return True
"""
Implementation of the PriD scheduler as introduced by Goossens et al. in
Priority-Driven Scheduling of Periodic Task Systems on Multiprocessors.
"""
from simso.core import Scheduler
from math import ceil
class PriD(Scheduler):
"""EDF(k) scheduler"""
def init(self):
self.ready_list = []
self.kfirst = []
tasks = sorted(self.task_list, key=lambda x: -x.wcet / x.period)
kmin = 1
mmin = None
u = sum(x.wcet / x.period for x in tasks)
for k, task in enumerate(tasks):
u -= task.wcet / task.period
m = k + ceil(u / (1 - task.wcet / task.period))
if mmin is None or mmin > m:
kmin = k + 1
mmin = m
self.kfirst = tasks[:kmin]
def on_activate(self, job):
if job.task in self.kfirst:
job.priority = 0
else:
job.priority = job.absolute_deadline
self.ready_list.append(job)
job.cpu.resched()
def on_terminated(self, job):
if job in self.ready_list:
self.ready_list.remove(job)
else:
job.cpu.resched()
def schedule(self, cpu):
if self.ready_list:
            # Key explanation:
            # - free processors come first,
            # - then the processor whose running job has the greatest
            #   priority value (i.e. the lowest priority),
            # - ties prefer the processor that requested the scheduling.
key = lambda x: (
1 if not x.running else 0,
x.running.priority if x.running else 0,
1 if x is cpu else 0
)
cpu_min = max(self.processors, key=key)
job = min([j for j in self.ready_list if j.is_active()],
key=lambda x: x.priority)
if (cpu_min.running is None or
cpu_min.running.priority > job.priority):
self.ready_list.remove(job)
if cpu_min.running:
self.ready_list.append(cpu_min.running)
return (job, cpu_min)
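The offline choice of k made in PriD.init (EDF(k)) can be replayed on bare utilizations; the sketch below mirrors the loop above, and the utilization values are made up for illustration (all assumed strictly below 1).
from math import ceil
def edf_k(utilizations):
    # Return (k, m): the k heaviest tasks get top priority, the others are
    # scheduled by EDF, and m processors are enough (Goossens et al.).
    us = sorted(utilizations, reverse=True)
    rest = sum(us)
    kmin, mmin = 1, None
    for k, u in enumerate(us):
        rest -= u
        m = k + ceil(rest / (1 - u))
        if mmin is None or m < mmin:
            kmin, mmin = k + 1, m
    return kmin, mmin
# Example: one heavy task and three light ones.
print(edf_k([0.9, 0.3, 0.3, 0.3]))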
from simso.core import Scheduler
class RM(Scheduler):
""" Rate monotonic """
def init(self):
self.ready_list = []
def on_activate(self, job):
self.ready_list.append(job)
job.cpu.resched()
def on_terminated(self, job):
if job in self.ready_list:
self.ready_list.remove(job)
else:
job.cpu.resched()
def schedule(self, cpu):
decision = None
if self.ready_list:
# Get a free processor or a processor running a low priority job.
key = lambda x: (
0 if x.running is None else 1,
-x.running.period if x.running else 0,
0 if x is cpu else 1
)
cpu_min = min(self.processors, key=key)
# Job with highest priority.
job = min(self.ready_list, key=lambda x: x.period)
if (cpu_min.running is None or
cpu_min.running.period > job.period):
self.ready_list.remove(job)
if cpu_min.running:
self.ready_list.append(cpu_min.running)
decision = (job, cpu_min)
return decision
"""
Rate Monotonic algorithm for uniprocessor architectures.
"""
from simso.core import Scheduler
class RM_mono(Scheduler):
def init(self):
self.ready_list = []
def on_activate(self, job):
self.ready_list.append(job)
job.cpu.resched()
def on_terminated(self, job):
self.ready_list.remove(job)
job.cpu.resched()
def schedule(self, cpu):
if self.ready_list:
# job with the highest priority
job = min(self.ready_list, key=lambda x: x.period)
else:
job = None
return (job, cpu)
"""
Implementation of the RUN scheduler as introduced in RUN: Optimal
Multiprocessor Real-Time Scheduling via Reduction to Uniprocessor
by Regnier et al.
RUN is a global multiprocessor scheduler for periodic, preemptive and
independent tasks with implicit deadlines.
"""
from simso.core import Scheduler, Timer
from RUNServer import EDFServer, TaskServer, DualServer, select_jobs, \
add_job, get_child_tasks
class RUN(Scheduler):
"""
    RUN scheduler. The offline part is done here, but the online part is
    mainly done in the ProperSubsystem objects. The RUN object acts as a
    proxy for the sub-systems.
"""
def init(self):
"""
Initialization of the scheduler. This function is called when the
system is ready to run.
"""
self.subsystems = [] # List of all the sub-systems.
        self.available_cpus = self.processors[:]  # Not yet assigned cpus.
self.task_to_subsystem = {} # map: Task -> SubSystem
# Create the Task Servers. Those are the leaves of the reduction tree.
list_servers = [TaskServer(task) for task in self.task_list]
# map: Task -> TaskServer. Used to quickly find the servers to update.
self.servers = dict(zip(self.task_list, list_servers))
assert (sum([s.utilization for s in list_servers])
<= len(self.processors)), "Load exceeds 100%!"
        # Instantiate the reduction tree and the various sub-systems.
self.reduce_iterations(list_servers)
def add_idle_tasks(self, servers):
"""
Create IdleTasks in order to reach 100% system utilization.
"""
from collections import namedtuple
# pylint: disable-msg=C0103
IdleTask = namedtuple('IdleTask', ['utilization'])
idle = len(self.processors) - sum([s.utilization for s in servers])
for server in servers:
if server.utilization < 1 and idle > 0:
task = IdleTask(min(1 - server.utilization, idle))
server.add_child(TaskServer(task))
idle -= task.utilization
while idle > 0:
task = IdleTask(min(1, idle))
server = EDFServer()
server.add_child(TaskServer(task))
idle -= task.utilization
servers.append(server)
def add_proper_subsystem(self, server):
"""
Create a proper sub-system from a unit server.
"""
tasks_servers = get_child_tasks(server)
subsystem_utilization = sum([t.utilization for t in tasks_servers])
cpus = []
while subsystem_utilization > 0:
cpus.append(self.available_cpus.pop())
subsystem_utilization -= 1
subsystem = ProperSubsystem(self.sim, server, cpus)
for server in tasks_servers:
self.task_to_subsystem[server.task] = subsystem
self.subsystems.append(subsystem)
def remove_unit_servers(self, servers):
"""
        Remove all the unit servers from the list and create a proper
        sub-system for each of them.
"""
for server in servers:
if server.utilization == 1:
self.add_proper_subsystem(server)
servers[:] = [s for s in servers if s.utilization < 1]
def reduce_iterations(self, servers):
"""
Offline part of the RUN Scheduler. Create the reduction tree.
"""
servers = pack(servers)
self.add_idle_tasks(servers)
self.remove_unit_servers(servers)
while servers:
servers = pack(dual(servers))
self.remove_unit_servers(servers)
def on_activate(self, job):
"""
        Deal with a (real) job activation.
"""
subsystem = self.task_to_subsystem[job.task]
subsystem.update_budget()
add_job(self.sim, job, self.servers[job.task])
subsystem.resched(job.cpu)
def on_terminated(self, job):
"""
Deal with a (real) job termination.
"""
subsystem = self.task_to_subsystem[job.task]
        subsystem.update_budget()
subsystem.resched(job.cpu)
def schedule(self, _):
"""
        This method is called by the simulator. Every sub-system that asked
        to be rescheduled gets scheduled here.
"""
decisions = []
for subsystem in self.subsystems:
if subsystem.to_reschedule:
decisions += subsystem.schedule()
return decisions
def pack(servers):
"""
    Create a list of EDF Servers by packing the given servers. Currently uses
    a First-Fit heuristic; the original article used a Worst-Fit packing
    algorithm but, according to the article, a First-Fit should also work.
"""
packed_servers = []
for server in servers:
for p_server in packed_servers:
if p_server.utilization + server.utilization <= 1:
p_server.add_child(server)
break
else:
p_server = EDFServer()
p_server.add_child(server)
packed_servers.append(p_server)
return packed_servers
def dual(servers):
"""
From a list of servers, returns a list of corresponding DualServers.
"""
return [DualServer(s) for s in servers]
class ProperSubsystem(object):
"""
    Proper sub-system. A proper sub-system is the set of tasks belonging to
    a unit server (a server with a utilization of 1) and a set of processors.
"""
def __init__(self, sim, root, processors):
self.sim = sim
self.root = root
self.processors = processors
self.virtual = []
self.last_update = 0
self.to_reschedule = False
self.timer = None
def update_budget(self):
"""
Update the budget of the servers.
"""
time_since_last_update = self.sim.now() - self.last_update
for server in self.virtual:
server.budget -= time_since_last_update
self.last_update = self.sim.now()
def resched(self, cpu):
"""
        Plan a scheduling decision on processor cpu. Ignore the request if
        one is already planned.
"""
if not self.to_reschedule:
self.to_reschedule = True
cpu.resched()
def virtual_event(self, cpu):
"""
Virtual scheduling event. Happens when a virtual job terminates.
"""
self.update_budget()
self.resched(cpu)
def schedule(self):
"""
Schedule this proper sub-system.
"""
self.to_reschedule = False
decision = []
self.virtual = []
jobs = select_jobs(self.root, self.virtual)
wakeup_delay = min(self.virtual, key=lambda s: s.budget).budget
if wakeup_delay > 0:
self.timer = Timer(self.sim, ProperSubsystem.virtual_event,
(self, self.processors[0]), wakeup_delay,
cpu=self.processors[0], in_ms=False)
self.timer.start()
cpus = []
for cpu in self.processors:
if cpu.running in jobs:
jobs.remove(cpu.running)
else:
cpus.append(cpu)
for cpu in cpus:
if jobs:
decision.append((jobs.pop(), cpu))
else:
decision.append((None, cpu))
return decision
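The offline reduction performed by RUN (pack, add idle time, then alternate dual and pack until only unit servers remain) can be followed on bare utilizations. The sketch below mirrors pack() and RUN.add_idle_tasks() above; the helper names and the utilization values are made up for illustration.
from fractions import Fraction
from math import ceil
def pack_ff(utilizations):
    # First-fit pack utilizations into EDF servers of capacity 1,
    # mirroring the pack() function above.
    bins = []
    for u in utilizations:
        for i, b in enumerate(bins):
            if b + u <= 1:
                bins[i] = b + u
                break
        else:
            bins.append(u)
    return bins
def reduction_levels(utilizations):
    # Pad with idle utilization up to a whole number of processors
    # (as add_idle_tasks does), then alternate dual (u -> 1 - u) and
    # pack until only unit servers remain.
    servers = pack_ff(utilizations)
    idle = ceil(sum(utilizations)) - sum(utilizations)
    filled = []
    for u in servers:
        extra = min(1 - u, idle)
        filled.append(u + extra)
        idle -= extra
    while idle > 0:
        filled.append(min(1, idle))
        idle -= min(1, idle)
    levels = 0
    servers = [u for u in filled if u < 1]
    while servers:
        servers = [u for u in pack_ff([1 - u for u in servers]) if u < 1]
        levels += 1
    return levels
# Example: six tasks of utilization 3/5 are padded to four processors and
# need two reduction levels.
print(reduction_levels([Fraction(3, 5)] * 6))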
"""
This module is part of the RUN implementation (see RUN.py).
"""
from fractions import Fraction
class _Server(object):
"""
Abstract class that represents a Server.
"""
next_id = 1
def __init__(self, is_dual, task=None):
self.parent = None
self.is_dual = is_dual
self.utilization = Fraction(0, 1)
self.task = task
self.job = None
self.deadlines = [0]
self.budget = 0
self.next_deadline = 0
self.identifier = _Server.next_id
_Server.next_id += 1
if task:
if hasattr(task, 'utilization'):
self.utilization += task.utilization
else:
self.utilization += Fraction(task.wcet) / Fraction(task.period)
def add_deadline(self, current_instant, deadline):
"""
Add a deadline to this server.
"""
self.deadlines.append(deadline)
self.deadlines = [d for d in self.deadlines if d > current_instant]
self.next_deadline = min(self.deadlines)
def create_job(self, current_instant):
"""
Replenish the budget.
"""
self.budget = int(self.utilization * (self.next_deadline -
current_instant))
class TaskServer(_Server):
"""
A Task Server is a Server that contains a real Task.
"""
def __init__(self, task):
super(TaskServer, self).__init__(False, task)
class EDFServer(_Server):
"""
An EDF Server is a Server with multiple children scheduled with EDF.
"""
def __init__(self):
super(EDFServer, self).__init__(False)
self.children = []
def add_child(self, server):
"""
Add a child to this EDFServer (used by the packing function).
"""
self.children.append(server)
self.utilization += server.utilization
server.parent = self
class DualServer(_Server):
"""
    A Dual server is the complement of its child: it runs exactly when its
    child does not, and its utilization is 1 minus its child's.
"""
def __init__(self, child):
super(DualServer, self).__init__(True)
self.child = child
child.parent = self
self.utilization = 1 - child.utilization
def add_job(sim, job, server):
"""
    Attach the job to its task server, then update the deadlines and budgets
    of that server and of all its ancestors.
"""
server.job = job
while server:
server.add_deadline(sim.now(), job.absolute_deadline *
sim.cycles_per_ms)
server.create_job(sim.now())
server = server.parent
def select_jobs(server, virtual, execute=True):
"""
    Select the jobs that should run according to RUN. The servers that are
    virtually running are appended to the virtual list passed as argument.
"""
jobs = []
if execute:
virtual.append(server)
if server.task:
if execute and server.budget > 0 and server.job.is_active():
jobs.append(server.job)
else:
if server.is_dual:
jobs += select_jobs(server.child, virtual, not execute)
else:
active_servers = [s for s in server.children if s.budget > 0]
if active_servers:
min_server = min(active_servers, key=lambda s: s.next_deadline)
else:
min_server = None
for child in server.children:
jobs += select_jobs(child, virtual,
execute and child is min_server)
return jobs
def get_child_tasks(server):
"""
Get the tasks scheduled by this server.
"""
if server.task:
return [server]
else:
if server.is_dual:
return get_child_tasks(server.child)
else:
tasks = []
for child in server.children:
tasks += get_child_tasks(child)
return tasks
"""
Static EDF. A DVFS variant of EDF (uniprocessor).
"""
from simso.core import Scheduler
class Static_EDF(Scheduler):
def init(self):
self.ready_list = []
# Compute processor speed
utilization = sum([t.wcet / t.period for t in self.task_list], 0.0)
self.processors[0].set_speed(utilization)
def on_activate(self, job):
self.ready_list.append(job)
job.cpu.resched()
def on_terminated(self, job):
self.ready_list.remove(job)
job.cpu.resched()
def schedule(self, cpu):
if self.ready_list:
# job with the highest priority
job = min(self.ready_list, key=lambda x: x.absolute_deadline)
else:
job = None
return (job, cpu)
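A quick standalone check of the speed computation above, with made-up (wcet, period) pairs: assuming wcet values are given for speed 1 and the total utilization U is at most 1, running uniprocessor EDF at speed U = sum(wcet / period) stretches every execution time by 1 / U, so the effective utilization stays at 1 and deadlines are still met.
tasks = [(2, 10), (3, 15), (1, 20)]  # made-up (wcet, period) pairs
speed = sum(wcet / period for wcet, period in tasks)
print(speed)  # 0.45
# Total effective utilization after scaling the execution times:
print(sum((wcet / speed) / period for wcet, period in tasks))  # ~1.0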
"""
Implementation of the U-EDF scheduler as presented by Nelissen et al. in
"U-EDF an unfair but optimal multiprocessor scheduling algorithm for sporadic
tasks"
"""
from simso.core import Scheduler, Timer
from math import ceil
class U_EDF(Scheduler):
def init(self):
self.al = {}
self.timer = None
self.toresched = False
self.running_jobs = {}
self.last_event = 0
self.newly_activated = False
self.ui = {task: task.wcet / task.period for task in self.task_list}
def reschedule(self):
if not self.toresched:
self.processors[0].resched()
self.toresched = True
def hp(self, job):
for task in self.sorted_task_list:
if task.job is job:
return
else:
yield task.job
def res(self, job, j, t1, t2):
s = self.s[job]
u = max(0, min(1, (s + self.ui[job.task] - j)))\
- max(0, min(1, (s - j)))
return (t2 - t1) * u
def bdg(self, job, j, t2):
return self.al[job][j] + self.res(
job, j, job.absolute_deadline * self.sim.cycles_per_ms, t2)
def compute_al(self):
t = self.sim.now()
cycles_per_ms = self.sim.cycles_per_ms
self.sorted_task_list = sorted(
self.task_list,
key=lambda t: (t.job.absolute_deadline, t.identifier))
self.s = {}
for task in self.task_list:
self.al[task.job] = [0] * len(self.processors)
self.s[task.job] = sum(self.ui[x.task] for x in self.hp(task.job))
for task in self.sorted_task_list:
job = task.job
if not job.is_active():
continue
for j in range(len(self.processors)):
almax = (job.absolute_deadline * cycles_per_ms - t) \
- sum(self.bdg(x, j, job.absolute_deadline * cycles_per_ms)
for x in self.hp(job)) - sum(self.al[job])
self.al[job][j] = int(ceil(min(
almax, job.ret * self.sim.cycles_per_ms
- sum(self.al[job]))))
def on_activate(self, job):
self.newly_activated = True
self.reschedule()
def update_al(self):
delta = self.sim.now() - self.last_event
for job, j in self.running_jobs.items():
self.al[job][j] -= delta
def schedule(self, cpu):
self.toresched = False
if self.newly_activated:
self.compute_al()
self.newly_activated = False
else:
self.update_al()
self.last_event = self.sim.now()
next_event = None
decisions = []
selected_jobs = {}
# Select the jobs:
for j, proc in enumerate(self.processors):
eligible = [task.job for task in self.task_list
if task.job.is_active()
and task.job not in selected_jobs
and task.job in self.al
and self.al[task.job][j] > 0]
if not eligible:
continue
job = min(eligible,
key=lambda x: (x.absolute_deadline, x.task.identifier))
if next_event is None or next_event > self.al[job][j]:
next_event = self.al[job][j]
selected_jobs[job] = j
# Set the timer for the next event:
if self.timer:
self.timer.stop()
self.timer = None
if next_event is not None:
self.timer = Timer(self.sim, U_EDF.reschedule, (self,),
next_event, self.processors[0], in_ms=False)
self.timer.start()
# Bind jobs to processors:
jobs = list(selected_jobs.keys())
available_procs = list(self.processors)
was_not_running = []
for job in jobs:
if job in self.running_jobs:
available_procs.remove(job.cpu)
else:
was_not_running.append(job)
remaining_jobs = []
for job in was_not_running:
if job.cpu in available_procs:
decisions.append((job, job.cpu))
available_procs.remove(job.cpu)
else:
remaining_jobs.append(job)
for p, job in enumerate(remaining_jobs):
decisions.append((job, available_procs[p]))
self.running_jobs = selected_jobs
return decisions
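The reservation computed by U_EDF.res above splits a task's utilization across the virtual processors according to the cumulative utilization s of the higher-priority tasks; below is a standalone sketch of that splitting, where the function name and the values of s and ui are made up for illustration.
def split_utilization(s, ui, nb_procs):
    # Share of each virtual processor j reserved for a task of utilization
    # ui when the higher-priority tasks already occupy a cumulative
    # utilization s (same expression as in U_EDF.res above).
    return [max(0, min(1, s + ui - j)) - max(0, min(1, s - j))
            for j in range(nb_procs)]
# Made-up values: with s = 1.7 and ui = 0.6, the task is split between
# processors 1 and 2 (about 0.3 on each).
print(split_utilization(1.7, 0.6, 3))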
"""
Work-Conserving version of RUN.
"""
from RUN import RUN
class WC_RUN(RUN):
def init(self):
RUN.init(self)
self.state = {}
def on_terminated(self, job):
RUN.on_terminated(self, job)
if job in self.state:
del self.state[job]
def on_abort(self, job):
RUN.on_abort(self, job)
if job in self.state:
del self.state[job]
def schedule(self, cpu):
decisions = RUN.schedule(self, cpu)
# print(".")
# print([(id(job), job.name if job else None, proc.name)
# for job, proc in self.state.items()])
#
# print("decisions :")
# print([(job.name if job else None, proc.name)
# for job, proc in decisions])
rstate = {proc: job for job, proc in self.state.items()}
for djob, dproc in decisions:
if dproc in rstate:
del self.state[rstate[dproc]]
if djob is not None:
self.state[djob] = dproc
rstate[dproc] = djob
# print([(id(job), job.name if job else None, proc.name)
# for job, proc in self.state.items()])
running_jobs = list(self.state.keys())
# Get active jobs.
jobs = sorted(
(task.job for task in self.task_list if task.job.is_active()),
key=lambda j: j.absolute_deadline)
# Bind jobs to processors:
available_procs = list(self.processors)
was_not_running = []
for job in jobs:
if job in running_jobs:
available_procs.remove(self.state[job])
else:
was_not_running.append(job)
remaining_jobs = []
for job in was_not_running:
if job.cpu in available_procs:
decisions.append((job, job.cpu))
available_procs.remove(job.cpu)
self.state[job] = job.cpu
else:
remaining_jobs.append(job)
for p, proc in enumerate(available_procs):
if p < len(remaining_jobs):
job = remaining_jobs[p]
self.state[job] = proc
else:
job = None
decisions.append((job, proc))
return decisions
"""
Work-Conserving version of U-EDF.
"""
from U_EDF import U_EDF
class WC_U_EDF(U_EDF):
# def on_terminated(self, job):
# self.reschedule()
#
# def on_aborted(self, job):
# self.reschedule()
def schedule(self, cpu):
decisions = U_EDF.schedule(self, cpu)
# Get active jobs.
jobs = sorted(
(task.job for task in self.task_list if task.job.is_active()),
key=lambda j: j.absolute_deadline)
# Bind jobs to processors:
available_procs = list(self.processors)
was_not_running = []
for job in jobs:
if job in self.running_jobs:
for djob, dproc in decisions:
if djob == job:
available_procs.remove(dproc)
break
else:
available_procs.remove(job.cpu)
else:
was_not_running.append(job)
remaining_jobs = []
for job in was_not_running:
if job.cpu in available_procs:
decisions.append((job, job.cpu))
available_procs.remove(job.cpu)
else:
remaining_jobs.append(job)
try:
for p, job in enumerate(remaining_jobs):
decisions.append((job, available_procs[p]))
except IndexError:
pass
return decisions