# Processor.py
# coding=utf-8

from collections import deque
from SimPy.Simulation import Process, Monitor, hold, waituntil
from simso.core.ProcEvent import ProcRunEvent, ProcIdleEvent, \
    ProcOverheadEvent, ProcCxtSaveEvent, ProcCxtLoadEvent


# Event codes. Processor methods queue tuples whose first element is one of
# these codes; Processor.run() pops the tuples and dispatches on the code.
RESCHED = 1    # queued by Processor.resched()
ACTIVATE = 2   # queued by Processor.activate(job)
TERMINATE = 3  # queued by Processor.terminate(job)
TIMER = 4      # queued by Processor.timer(timer)
PREEMPT = 5    # queued by Processor.preempt(job)
SPEED = 6      # queued by Processor.set_speed(speed)


class ProcInfo(object):
    """
    Static description of a processor.

    Holds the identifier, name, overhead values, speed, attached caches and
    a free-form ``data`` dictionary for one processor.
    """

    def __init__(self, identifier, name, cs_overhead=0, cl_overhead=0,
                 migration_overhead=0, speed=1.0, data=None):
        """
        Args:
            - `identifier`: Identifier of the processor.
            - `name`: Name of the processor.
            - `cs_overhead`: Context-save overhead (default 0).
            - `cl_overhead`: Context-load overhead (default 0).
            - `migration_overhead`: Migration overhead (default 0).
            - `speed`: Speed of the processor (default 1.0).
            - `data`: Optional dictionary of extra data. When omitted, a new
              empty dictionary is created for this instance.
        """
        self.identifier, self.name = identifier, name
        # The penalty always starts at 0 and no cache is attached yet.
        self.penalty, self.caches = 0, []
        self.cs_overhead, self.cl_overhead = cs_overhead, cl_overhead
        self.migration_overhead = migration_overhead
        # Use a fresh dict per instance rather than a shared mutable default.
        self.data = {} if data is None else data
        self.speed = speed

    def add_cache(self, cache):
        """Append `cache` to the list of caches of this processor."""
        self.caches.append(cache)


class Processor(Process):
    """
    A processor is responsible for deciding whether the simulated processor
    should execute a job or execute the scheduler. There is one instance of
    Processor per simulated processor. Those are responsible for calling the
    scheduler methods.

    When a scheduler needs to take a scheduling decision, it must invoke the
    :meth:`resched` method. This is typically done in the :meth:`on_activate
    <simso.core.Scheduler.Scheduler.on_activate>`, :meth:`on_terminated
    <simso.core.Scheduler.Scheduler.on_terminated>` or in a :class:`timer
    <simso.core.Timer.Timer>` handler.
    """
    # Class-wide counter used to hand out a distinct internal id per instance.
    _identifier = 0

    @classmethod
    def init(cls):
        """Reset the class-wide counter used to assign internal ids."""
        cls._identifier = 0

    def __init__(self, model, proc_info):
        """
        Args:
            - `model`: The simulation model. It is passed as the ``sim``
              argument of the SimPy process and monitors, and provides the
              ``scheduler`` (and, for :meth:`run`, the ``processors``).
            - `proc_info`: Object supplying ``name``, ``identifier``,
              ``penalty``, ``cs_overhead``, ``cl_overhead``,
              ``migration_overhead``, ``caches`` and ``speed``.
        """
        Process.__init__(self, name=proc_info.name, sim=model)
        self._model = model
        self._internal_id = Processor._identifier
        Processor._identifier += 1
        self.identifier = proc_info.identifier
        self._running = None
        self.was_running = None
        # FIFO queue of pending event tuples: (code,) or (code, argument).
        self._evts = deque()
        self.sched = model.scheduler
        self.monitor = Monitor(name="Monitor" + proc_info.name, sim=model)
        self._caches = []
        self._penalty = proc_info.penalty
        self._cs_overhead = proc_info.cs_overhead
        self._cl_overhead = proc_info.cl_overhead
        self._migration_overhead = proc_info.migration_overhead
        # Also registers this processor in each cache's ``shared_with`` list.
        self.set_caches(proc_info.caches)
        self.timer_monitor = Monitor(name="Monitor Timer" + proc_info.name,
                                     sim=model)
        self._speed = proc_info.speed

    def resched(self):
        """
        Add a resched event to the list of events to handle.
        """
        self._evts.append((RESCHED,))

    def activate(self, job):
        """
        Add an activation event for `job` to the list of events to handle.
        """
        self._evts.append((ACTIVATE, job))

    def terminate(self, job):
        """
        Add a termination event for `job` to the list of events to handle
        and mark the processor as not running any job.
        """
        self._evts.append((TERMINATE, job))
        self._running = None

    def preempt(self, job=None):
        """
        Make `job` the job to run on this processor (None for no job).

        Any preempt event still pending is dropped and a single new one is
        queued, so that at most one preempt event is pending at a time.
        """
        self._evts = deque(e for e in self._evts if e[0] != PREEMPT)
        self._evts.append((PREEMPT,))
        self._running = job

    def timer(self, timer):
        """
        Add a timer event for `timer` to the list of events to handle.
        """
        self._evts.append((TIMER, timer))

    def set_speed(self, speed):
        """
        Add a speed-change event to the list of events to handle. The new
        speed only takes effect once the event is processed by :meth:`run`.
        """
        # NOTE(review): the message says "positive" but the check admits 0;
        # confirm which one is intended before changing either.
        assert speed >= 0, "Speed must be positive."
        self._evts.append((SPEED, speed))

    @property
    def speed(self):
        """The current speed of the processor."""
        return self._speed

    def is_running(self):
        """
        Return True if a job is currently running on that processor.
        """
        return self._running is not None

    def set_caches(self, caches):
        """
        Set the caches of this processor and append this processor to the
        ``shared_with`` list of each of them.
        """
        self._caches = caches
        for cache in caches:
            cache.shared_with.append(self)

    def get_caches(self):
        """Return the caches of this processor."""
        return self._caches

    caches = property(get_caches, set_caches)

    @property
    def penalty_memaccess(self):
        """The penalty value taken from the processor description."""
        return self._penalty

    @property
    def cs_overhead(self):
        """Overhead held by :meth:`run` when saving a job's context."""
        return self._cs_overhead

    @property
    def cl_overhead(self):
        """Overhead held by :meth:`run` when loading a job's context."""
        return self._cl_overhead

    @property
    def internal_id(self):
        """A unique, internal, id."""
        return self._internal_id

    @property
    def running(self):
        """
        The job currently running on that processor. None if no job is
        currently running on the processor.
        """
        return self._running

    def run(self):
        """
        Main loop of the processor (SimPy process execution method).

        When no event is pending, the selected job (if any) is resumed after
        the context-load overhead, or the processor is reported idle; the
        processor then waits for an event and, if a job was running,
        interrupts it and pays the context-save overhead. Pending events are
        then handled one at a time, in FIFO order, except that a resched
        event is postponed until every other kind of event has been handled.
        """
        while True:
            if not self._evts:
                job = self._running
                if job:
                    # Wait until the context of the job has been saved.
                    yield waituntil, self, lambda: job.context_ok
                    self.monitor.observe(ProcCxtLoadEvent())
                    yield hold, self, self.cl_overhead  # overhead load context
                    self.monitor.observe(ProcCxtLoadEvent(terminated=True))
                    job.interruptReset()
                    self.sim.reactivate(job)
                    self.monitor.observe(ProcRunEvent(job))
                    job.context_ok = False
                else:
                    self.monitor.observe(ProcIdleEvent())

                # Wait event.
                yield waituntil, self, lambda: self._evts
                if job:
                    self.interrupt(job)
                    self.monitor.observe(ProcCxtSaveEvent())
                    yield hold, self, self.cs_overhead  # overhead save context
                    self.monitor.observe(ProcCxtSaveEvent(terminated=True))
                    job.context_ok = True

            evt = self._evts.popleft()
            if evt[0] == RESCHED:
                # Postpone the resched while other kinds of events are
                # pending, so that they are handled first.
                if any(x[0] != RESCHED for x in self._evts):
                    self._evts.append(evt)
                    continue

            # PREEMPT events have no branch below: popping one is enough,
            # the next iteration picks up the job set by preempt().
            if evt[0] == ACTIVATE:
                self.sched.on_activate(evt[1])
                self.monitor.observe(ProcOverheadEvent("JobActivation"))
                self.sched.monitor_begin_activate(self)
                yield hold, self, self.sched.overhead_activate
                self.sched.monitor_end_activate(self)
            elif evt[0] == TERMINATE:
                self.sched.on_terminated(evt[1])
                self.monitor.observe(ProcOverheadEvent("JobTermination"))
                self.sched.monitor_begin_terminate(self)
                yield hold, self, self.sched.overhead_terminate
                self.sched.monitor_end_terminate(self)
            elif evt[0] == TIMER:
                self.timer_monitor.observe(None)
                if evt[1].overhead > 0:
                    # NOTE(review): looks like a leftover debug trace on
                    # stdout; kept as-is, confirm before removing.
                    print(self.sim.now(), "hold", evt[1].overhead)
                    yield hold, self, evt[1].overhead
                evt[1].call_handler()
            elif evt[0] == SPEED:
                self._speed = evt[1]
            elif evt[0] == RESCHED:
                self.monitor.observe(ProcOverheadEvent("Scheduling"))
                self.sched.monitor_begin_schedule(self)
                yield waituntil, self, self.sched.get_lock
                decisions = self.sched.schedule(self)
                yield hold, self, self.sched.overhead  # overhead scheduling
                # A single decision may be returned instead of a list.
                if not isinstance(decisions, list):
                    decisions = [decisions]
                decisions = [d for d in decisions if d is not None]

                for job, cpu in decisions:
                    # If there is nothing to change, simply ignore:
                    if cpu.running == job:
                        continue

                    # If trying to execute a terminated job, warn and ignore:
                    if job is not None and not job.is_active():
                        print("Can't schedule a terminated job! ({})"
                              .format(job.name))
                        continue

                    # If the job was running somewhere else, stop it.
                    if job and job.cpu.running == job:
                        job.cpu.preempt()

                    # Send that job to processor cpu.
                    cpu.preempt(job)

                    if job:
                        job.task.cpu = cpu

                # Forbid to run a job simultaneously on 2 or more processors.
                running_tasks = [
                    cpu.running.name
                    for cpu in self._model.processors if cpu.running]
                assert len(set(running_tasks)) == len(running_tasks), \
                    "Try to run a job on 2 processors simultaneously!"

                self.sched.release_lock()
                self.sched.monitor_end_schedule(self)