From 41fbb79d33b72bc504893c764af6642f0648377c Mon Sep 17 00:00:00 2001 From: Michael Schmid Date: Wed, 26 Aug 2020 10:13:39 +0200 Subject: [PATCH] seems to work, added threadqueue --- src/main/java/mvd/jester/App.java | 63 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++- src/main/java/mvd/jester/model/SystemManager.java | 3 ++- src/main/java/mvd/jester/simulator/GlobalScheduler.java | 66 ++++++++++++++++++++++++++++++++++++++++++------------------------ src/main/java/mvd/jester/simulator/SubtaskQueue.java | 14 ++++++++++++++ src/main/java/mvd/jester/simulator/ThreadQueue.java | 14 ++++++++++++++ src/main/java/mvd/jester/simulator/WaitQueue.java | 14 -------------- src/main/java/mvd/jester/simulator/model/ExtendedSubtaskContext.java | 17 ++++++++++++++++- src/main/java/mvd/jester/simulator/model/JobContext.java | 93 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------- src/main/java/mvd/jester/simulator/model/ProcessorContext.java | 22 ++++++++++++++-------- src/main/java/mvd/jester/simulator/model/ThreadContext.java | 37 +++++++++++++++++++++++++++++++++++++ 10 files changed, 285 insertions(+), 58 deletions(-) create mode 100644 src/main/java/mvd/jester/simulator/SubtaskQueue.java create mode 100644 src/main/java/mvd/jester/simulator/ThreadQueue.java delete mode 100644 src/main/java/mvd/jester/simulator/WaitQueue.java create mode 100644 src/main/java/mvd/jester/simulator/model/ThreadContext.java diff --git a/src/main/java/mvd/jester/App.java b/src/main/java/mvd/jester/App.java index a4d893a..a426e77 100644 --- a/src/main/java/mvd/jester/App.java +++ b/src/main/java/mvd/jester/App.java @@ -1,12 +1,18 @@ package mvd.jester; import java.util.Arrays; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Set; +import org.jgrapht.experimental.dag.DirectedAcyclicGraph; +import org.jgrapht.graph.DefaultEdge; import mvd.jester.model.DagTask; +import mvd.jester.model.Subtask; import mvd.jester.model.SystemManager; import mvd.jester.model.SystemManager.DagTaskBuilder; import mvd.jester.priority.RateMonotonic; import mvd.jester.simulator.GlobalScheduler; +import mvd.jester.simulator.exceptions.SchedulingException; import mvd.jester.simulator.model.ExtendedSubtaskContext; import mvd.jester.simulator.model.JobContext; import mvd.jester.tests.AbstractTest; @@ -22,12 +28,67 @@ import mvd.jester.tests.TypeFunction.UnkownStructure; * */ public class App { + + private static DirectedAcyclicGraph createJobDag(long time) { + DirectedAcyclicGraph jobDag = + new DirectedAcyclicGraph<>(DefaultEdge.class); + Subtask source = new Subtask(time); + Subtask pathA = new Subtask(10); + Subtask pathB = new Subtask(15); + Subtask pathC = new Subtask(20); + Subtask pathD = new Subtask(25); + Subtask sink = new Subtask(time); + Subtask source2 = new Subtask(time); + Subtask a = new Subtask(30); + Subtask sink2 = new Subtask(35); + + try { + jobDag.addVertex(source); + jobDag.addVertex(pathA); + jobDag.addVertex(pathB); + jobDag.addVertex(pathC); + jobDag.addVertex(pathD); + jobDag.addVertex(sink); + jobDag.addDagEdge(source, pathA); + jobDag.addDagEdge(source, pathB); + jobDag.addDagEdge(source, pathC); + jobDag.addDagEdge(source, pathD); + jobDag.addDagEdge(pathA, sink); + jobDag.addDagEdge(pathB, sink); + jobDag.addDagEdge(pathC, sink); + jobDag.addDagEdge(pathD, sink); + jobDag.addDagEdge(sink, source2); + jobDag.addDagEdge(source2, a); + jobDag.addDagEdge(source2, sink2); + jobDag.addDagEdge(a, sink2); + } catch (Exception e) { + // TODO: handle exception + } + + return jobDag; + } + public static void main(String[] args) { GlobalScheduler scheduler = new GlobalScheduler(new RateMonotonic(), 4); DagTaskBuilder builder = new DagTaskBuilder().setNumberOfProcessors(4); + Set taskset = builder.generateTaskSet(3); + try { + scheduler.schedule(taskset, 10000); + } catch (SchedulingException e) { + int brk = 0; + } + scheduler.schedule(taskset, 10000); + + // DagTask t1 = new DagTask(createJobDag(15), 200, 3); + // DagTask t2 = new DagTask(createJobDag(10), 300, 3); + // DagTask t3 = new DagTask(createJobDag(5), 350, 3); - scheduler.schedule(builder.generateTaskSet(1.0), 10000); + // LinkedHashSet tasks = new LinkedHashSet<>(); + // tasks.add(t1); + // tasks.add(t2); + // tasks.add(t3); + // scheduler.schedule(tasks, 10000); // { // SystemManager manager = new SystemManager(8); diff --git a/src/main/java/mvd/jester/model/SystemManager.java b/src/main/java/mvd/jester/model/SystemManager.java index a350144..0974f76 100644 --- a/src/main/java/mvd/jester/model/SystemManager.java +++ b/src/main/java/mvd/jester/model/SystemManager.java @@ -163,7 +163,7 @@ public class SystemManager { private long maxNumberOfBranches = 3; // TODO: Change back to 5 private long depth = 1; // TODO: Change back to 2 private long p_par = 80; - private long p_add = 10; // TODO: Change back to 20 + private long p_add = 0; // TODO: Change back to 20 public DagTaskBuilder() { } @@ -233,6 +233,7 @@ public class SystemManager { final long period = Math.round(workload / utilization); + return new DagTask(jobDag, period, numberOfProcessors); } diff --git a/src/main/java/mvd/jester/simulator/GlobalScheduler.java b/src/main/java/mvd/jester/simulator/GlobalScheduler.java index 7512d1d..9d3ef69 100644 --- a/src/main/java/mvd/jester/simulator/GlobalScheduler.java +++ b/src/main/java/mvd/jester/simulator/GlobalScheduler.java @@ -1,12 +1,11 @@ package mvd.jester.simulator; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; import java.util.TreeSet; -import java.util.stream.Collectors; -import org.jgrapht.Graphs; import mvd.jester.model.DagTask; import mvd.jester.model.Subtask; import mvd.jester.priority.PriorityManager; @@ -16,6 +15,7 @@ import mvd.jester.simulator.exceptions.SchedulingException; import mvd.jester.simulator.model.ExtendedSubtaskContext; import mvd.jester.simulator.model.JobContext; import mvd.jester.simulator.model.ProcessorContext; +import mvd.jester.simulator.model.ThreadContext; import mvd.jester.simulator.model.ProcessorContext.ProcessorComparator; public class GlobalScheduler { @@ -75,16 +75,25 @@ public class GlobalScheduler { } private void handleJobCompleted(final JobCompletedEvent event, final SchedulingContext sc) { - if (event.getTime() > event.getJobContext().getDeadline()) { + JobContext job = event.getJobContext(); + if (event.getTime() > job.getDeadline()) { throw new DeadlineMissedException(event.getJobContext() + " misses its deadline"); } + // TODO: Remove threads from sc.threadqueue? + if (sc.threadQueue.stream().anyMatch(t -> t.getJobContext() == job)) { + throw new RuntimeException( + "Why is there still a thread of this completed job in the queue?"); + } } private void handleSubtaskActivated(final SubtaskActivatedEvent event, final SchedulingContext sc) { final Subtask subtask = event.getSubtask(); final JobContext job = event.getJobContext(); - sc.waitingQueue.offer(new ExtendedSubtaskContext(job, subtask)); + ExtendedSubtaskContext context = new ExtendedSubtaskContext(job, subtask); + if (job.claimThread(context)) { + sc.threadQueue.offer(context.getThread().get()); + } updateAdmission(sc, event.getTime()); } @@ -96,7 +105,8 @@ public class GlobalScheduler { processor + " should execute " + event.getSubtaskContext()); } final Optional context = processor.getSubtask(); - if (context.isPresent() && context.get().getRemainingExecutionTime() == 0) { + if (context.isPresent() && context.get() == event.getSubtaskContext() + && context.get().getRemainingExecutionTime() == 0) { sc.events.offer(new SubtaskStoppedEvent(event.getTime(), context.get(), processor)); } } @@ -106,19 +116,20 @@ public class GlobalScheduler { final ExtendedSubtaskContext subtask = event.getSubtaskContext(); final JobContext job = subtask.getJobContext(); - job.registerFinishedSubtask(subtask); - final DagTask task = subtask.getJobContext().getTask(); - final List successor = - Graphs.successorListOf(task.getJobDag(), subtask.getSubtask()); - if (successor.isEmpty()) { + boolean isSink = job.registerFinishedSubtask(subtask); + if (isSink) { sc.events.offer(new JobCompletedEvent(event.getTime(), job)); updateAdmission(sc, event.getTime()); } else { - final List finishedSubtasks = job.getFinishedSubtasks().stream() - .map(esc -> esc.getSubtask()).collect(Collectors.toList()); - for (final Subtask s : successor) { - final List predecessors = Graphs.predecessorListOf(task.getJobDag(), s); - if (finishedSubtasks.containsAll(predecessors)) { + if (job.releaseThread(subtask)) { + sc.threadQueue.offer(subtask.getThread().get()); + updateAdmission(sc, event.getTime()); + } + List successors = job.getReadySuccessors(subtask); + if (successors.isEmpty()) { + updateAdmission(sc, event.getTime()); + } else { + for (final Subtask s : successors) { sc.events.offer(new SubtaskActivatedEvent(event.getTime(), job, s)); } } @@ -129,8 +140,8 @@ public class GlobalScheduler { final SchedulingContext sc) { final ProcessorContext processor = event.getProcessor(); final long time = event.getTime(); - sc.events.offer(new SubtaskStartedEvent(time, event.getNextSubtask(), processor)); sc.events.offer(new SubtaskStoppedEvent(time, event.getSubtaskContext(), processor)); + sc.events.offer(new SubtaskStartedEvent(time, event.getNextSubtask(), processor)); } private void handleSubtaskStarted(final SubtaskStartedEvent event, final SchedulingContext sc) { @@ -140,6 +151,8 @@ public class GlobalScheduler { if (processor.accept(context, time)) { sc.events.offer(new SubtaskCheckCompletedEvent( time + context.getRemainingExecutionTime(), context, processor)); + } else { + int brk = 0; } } @@ -147,10 +160,11 @@ public class GlobalScheduler { final ExtendedSubtaskContext context = event.getSubtaskContext(); final ProcessorContext processor = event.getProcessor(); if (context.getRemainingExecutionTime() > 0) { - sc.waitingQueue.offer(context); + sc.threadQueue.offer(context.getThread().get()); } else { sc.events.offer(new SubtaskCompletedEvent(event.getTime(), context)); } + processor.free(); } @@ -169,18 +183,22 @@ public class GlobalScheduler { private void handleTaskReleased(final TaskReleasedEvent event, final SchedulingContext sc) { final DagTask task = event.getTask(); - final long release = event.getTime(); - final JobContext job = new JobContext(task, release); - sc.events.offer(new JobActivatedEvent(release, job)); + final long releaseTime = event.getTime(); + final JobContext job = new JobContext(task, releaseTime); + sc.events.offer(new JobActivatedEvent(releaseTime, job)); } private void updateAdmission(final SchedulingContext sc, final long time) { Set sortedProcessors = new TreeSet<>(new ProcessorComparator(priorityManager)); sortedProcessors.addAll(sc.processors); + if (sortedProcessors.size() != sc.processors.size()) { + throw new RuntimeException("Processors wrong!"); // TODO: delete this later on + } for (final ProcessorContext processor : sortedProcessors) { - if (processor.canAccept(sc.waitingQueue.peek(), priorityManager)) { - final ExtendedSubtaskContext context = sc.waitingQueue.poll(); + if (processor.canAccept(sc.threadQueue.peek(), priorityManager)) { + ThreadContext threadContext = sc.threadQueue.poll(); + final ExtendedSubtaskContext context = threadContext.getSubtask().get(); if (processor.isIdle()) { sc.events.offer(new SubtaskStartedEvent(time, context, processor)); } else { @@ -201,13 +219,13 @@ public class GlobalScheduler { private final EventQueue events; private final Set processors; - private final WaitQueue waitingQueue; + private final ThreadQueue threadQueue; private final long duration; private SchedulingContext(final PriorityManager priorityManager, final int nbProcessors, final long duration) { this.events = new EventQueue(priorityManager); - this.waitingQueue = new WaitQueue(priorityManager); + this.threadQueue = new ThreadQueue(priorityManager); this.processors = new HashSet<>(); for (int m = 0; m < nbProcessors; ++m) { processors.add(new ProcessorContext(m)); diff --git a/src/main/java/mvd/jester/simulator/SubtaskQueue.java b/src/main/java/mvd/jester/simulator/SubtaskQueue.java new file mode 100644 index 0000000..48e50c2 --- /dev/null +++ b/src/main/java/mvd/jester/simulator/SubtaskQueue.java @@ -0,0 +1,14 @@ +package mvd.jester.simulator; + +import java.util.PriorityQueue; +import mvd.jester.priority.PriorityManager; +import mvd.jester.simulator.model.ExtendedSubtaskContext; + +public class SubtaskQueue extends PriorityQueue { + + private static final long serialVersionUID = 2536224096155380726L; + + public SubtaskQueue(PriorityManager priorityManager) { + super((s1, s2) -> priorityManager.compare(s1.getJobContext(), s2.getJobContext())); + } +} diff --git a/src/main/java/mvd/jester/simulator/ThreadQueue.java b/src/main/java/mvd/jester/simulator/ThreadQueue.java new file mode 100644 index 0000000..079a196 --- /dev/null +++ b/src/main/java/mvd/jester/simulator/ThreadQueue.java @@ -0,0 +1,14 @@ +package mvd.jester.simulator; + +import java.util.PriorityQueue; +import mvd.jester.priority.PriorityManager; +import mvd.jester.simulator.model.ThreadContext; + +public class ThreadQueue extends PriorityQueue { + + private static final long serialVersionUID = -4968780596887106984L; + + public ThreadQueue(PriorityManager priorityManager) { + super((jc1, jc2) -> priorityManager.compare(jc1.getJobContext(), jc2.getJobContext())); + } +} diff --git a/src/main/java/mvd/jester/simulator/WaitQueue.java b/src/main/java/mvd/jester/simulator/WaitQueue.java deleted file mode 100644 index b677d08..0000000 --- a/src/main/java/mvd/jester/simulator/WaitQueue.java +++ /dev/null @@ -1,14 +0,0 @@ -package mvd.jester.simulator; - -import java.util.PriorityQueue; -import mvd.jester.priority.PriorityManager; -import mvd.jester.simulator.model.ExtendedSubtaskContext; - -public class WaitQueue extends PriorityQueue { - - private static final long serialVersionUID = -4968780596887106984L; - - public WaitQueue(PriorityManager priorityManager) { - super((jc1, jc2) -> priorityManager.compare(jc1.getJobContext(), jc2.getJobContext())); - } -} diff --git a/src/main/java/mvd/jester/simulator/model/ExtendedSubtaskContext.java b/src/main/java/mvd/jester/simulator/model/ExtendedSubtaskContext.java index 9130fb5..b94a7cd 100644 --- a/src/main/java/mvd/jester/simulator/model/ExtendedSubtaskContext.java +++ b/src/main/java/mvd/jester/simulator/model/ExtendedSubtaskContext.java @@ -1,21 +1,36 @@ package mvd.jester.simulator.model; +import java.util.Optional; import mvd.jester.model.Subtask; import mvd.jester.model.SubtaskContext; public class ExtendedSubtaskContext extends SubtaskContext { - private JobContext jobContext; + private final JobContext jobContext; + private Optional executingThread; public ExtendedSubtaskContext(JobContext jobContext, Subtask subtask) { super(subtask); this.jobContext = jobContext; + this.executingThread = Optional.empty(); } public JobContext getJobContext() { return jobContext; } + public void assignThread(ThreadContext threadContext) { + this.executingThread = Optional.of(threadContext); + } + + public boolean hasThread() { + return executingThread.isPresent(); + } + + public Optional getThread() { + return executingThread; + } + @Override public String toString() { return getSubtask() + " of task " + getJobContext().getTask(); diff --git a/src/main/java/mvd/jester/simulator/model/JobContext.java b/src/main/java/mvd/jester/simulator/model/JobContext.java index 140417f..f67cdef 100644 --- a/src/main/java/mvd/jester/simulator/model/JobContext.java +++ b/src/main/java/mvd/jester/simulator/model/JobContext.java @@ -1,7 +1,12 @@ package mvd.jester.simulator.model; import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; import java.util.Set; +import java.util.stream.Collectors; +import org.jgrapht.Graphs; import org.jgrapht.experimental.dag.DirectedAcyclicGraph; import org.jgrapht.graph.DefaultEdge; import mvd.jester.model.DagTask; @@ -13,16 +18,21 @@ public class JobContext { private final long releaseTime; private final long deadline; private final Subtask source; - private final Set finishedSubtasks; - // private long activeThreads; // TODO: Use threads + private final Set finishedContexts; + private final Queue readyContexts; + private final Queue threadPool; public JobContext(DagTask task, long releaseTime) { this.task = task; this.releaseTime = releaseTime; this.deadline = releaseTime + task.getDeadline(); this.source = findSource(task); - this.finishedSubtasks = new HashSet<>(); - // this.activeThreads = task.getNumberOfThreads(); + this.finishedContexts = new HashSet<>(); + this.readyContexts = new LinkedList<>(); + this.threadPool = new LinkedList<>(); + for (int i = 0; i < task.getNumberOfThreads(); ++i) { + threadPool.add(new ThreadContext(i, this)); + } } private Subtask findSource(DagTask task) { @@ -67,20 +77,85 @@ public class JobContext { * @return the finishedSubtasks */ public Set getFinishedSubtasks() { - return finishedSubtasks; + return finishedContexts; } - public void registerFinishedSubtask(ExtendedSubtaskContext subtaskContext) { - if (!finishedSubtasks.add(subtaskContext)) { + public boolean registerFinishedSubtask(ExtendedSubtaskContext subtaskContext) { + if (!finishedContexts.add(subtaskContext)) { throw new RuntimeException( "Subtask could not be added to finished subtask which means it was executed twice!"); } + if (task.getJobDag().outDegreeOf(subtaskContext.getSubtask()) == 0) { + return true; + } + return false; + } + + public List getReadySuccessors(ExtendedSubtaskContext finishedContext) { + if (!finishedContexts.contains(finishedContext)) { + throw new RuntimeException("Subtask is not in finished subtask list!"); + } + List successors = + Graphs.successorListOf(task.getJobDag(), finishedContext.getSubtask()); + + final List finishedSubtasks = + finishedContexts.stream().map(esc -> esc.getSubtask()).collect(Collectors.toList()); + final List readySuccessors = new LinkedList<>(); + + for (final Subtask s : successors) { + final List predecessors = Graphs.predecessorListOf(task.getJobDag(), s); + if (finishedSubtasks.containsAll(predecessors)) { + readySuccessors.add(s); + } + } + + return readySuccessors; + } + + public Queue getReadySubtasks() { + return readyContexts; + } + + public void offerSubtaskContext(ExtendedSubtaskContext context) { + readyContexts.offer(context); + } + + public ExtendedSubtaskContext getNextSubtaskContext() { + if (readyContexts.isEmpty()) { + throw new RuntimeException("There is no ready subtask context!"); + } + return readyContexts.poll(); } - public boolean checkRemainingThreads() { - return true; // TODO: implement check in future + public boolean claimThread(ExtendedSubtaskContext context) { + if (threadPool.isEmpty()) { + readyContexts.offer(context); + return false; + } + ThreadContext thread = threadPool.poll(); + thread.assignSubtask(context); + context.assignThread(thread); + return true; + } + + public boolean releaseThread(ExtendedSubtaskContext context) { + if (context.hasThread()) { + ThreadContext thread = context.getThread().get(); + if (readyContexts.isEmpty()) { + thread.assignSubtask(null); + threadPool.offer(thread); + return false; + } else { + ExtendedSubtaskContext newContext = readyContexts.poll(); + thread.assignSubtask(newContext); + newContext.assignThread(thread); + return true; + } + } + throw new RuntimeException("This subtask should be executed by a thread!"); } + @Override public String toString() { return "of task " + getTask(); } diff --git a/src/main/java/mvd/jester/simulator/model/ProcessorContext.java b/src/main/java/mvd/jester/simulator/model/ProcessorContext.java index 2b4944b..a5a6eb4 100644 --- a/src/main/java/mvd/jester/simulator/model/ProcessorContext.java +++ b/src/main/java/mvd/jester/simulator/model/ProcessorContext.java @@ -41,20 +41,16 @@ public class ProcessorContext { return currentSubtask; } - public boolean canAccept(ExtendedSubtaskContext subtaskContext, - PriorityManager priorityManager) { - if (subtaskContext == null) { + public boolean canAccept(ThreadContext thread, PriorityManager priorityManager) { + if (thread == null) { return false; } if (currentSubtask.isEmpty()) { - if (subtaskContext.getJobContext().checkRemainingThreads()) { - return true; - } - return false; + return true; } - if (priorityManager.compare(subtaskContext.getJobContext(), + if (priorityManager.compare(thread.getJobContext(), currentSubtask.get().getJobContext()) < 0) { return true; } @@ -98,6 +94,11 @@ public class ProcessorContext { } + public enum JobState { + NOT_ACCEPTED, ACCEPTED, REMOVED + } + + public static class ProcessorComparator implements Comparator { private final PriorityManager priorityManager; @@ -114,6 +115,11 @@ public class ProcessorContext { } else if (!p2.getSubtask().isPresent()) { return 1; } else { + JobContext p1Job = p1.currentSubtask.get().getJobContext(); + JobContext p2Job = p2.currentSubtask.get().getJobContext(); + if (p1Job == p2Job) { + return 1; + } // Sort in reverse return priorityManager.compare(p2.currentSubtask.get().getJobContext(), p1.currentSubtask.get().getJobContext()); diff --git a/src/main/java/mvd/jester/simulator/model/ThreadContext.java b/src/main/java/mvd/jester/simulator/model/ThreadContext.java new file mode 100644 index 0000000..08b1879 --- /dev/null +++ b/src/main/java/mvd/jester/simulator/model/ThreadContext.java @@ -0,0 +1,37 @@ +package mvd.jester.simulator.model; + +import java.util.Optional; + +public class ThreadContext { + + private final long threadId; + private final JobContext jobContext; + private Optional subtask; + + public ThreadContext(long threadId, JobContext jobContext) { + this.threadId = threadId; + this.jobContext = jobContext; + this.subtask = Optional.empty(); + } + + /** + * @return the jobContext + */ + public JobContext getJobContext() { + return jobContext; + } + + public void assignSubtask(ExtendedSubtaskContext subtask) { + this.subtask = Optional.ofNullable(subtask); + } + + public Optional getSubtask() { + return subtask; + } + + @Override + public String toString() { + return "Thread " + threadId + " of " + jobContext; + } + +} -- libgit2 0.26.0