Commit 41fbb79d by Michael Schmid

seems to work, added threadqueue

parent fe6b5585
package mvd.jester;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import org.jgrapht.experimental.dag.DirectedAcyclicGraph;
import org.jgrapht.graph.DefaultEdge;
import mvd.jester.model.DagTask;
import mvd.jester.model.Subtask;
import mvd.jester.model.SystemManager;
import mvd.jester.model.SystemManager.DagTaskBuilder;
import mvd.jester.priority.RateMonotonic;
import mvd.jester.simulator.GlobalScheduler;
import mvd.jester.simulator.exceptions.SchedulingException;
import mvd.jester.simulator.model.ExtendedSubtaskContext;
import mvd.jester.simulator.model.JobContext;
import mvd.jester.tests.AbstractTest;
......@@ -22,12 +28,67 @@ import mvd.jester.tests.TypeFunction.UnkownStructure;
*
*/
public class App {
private static DirectedAcyclicGraph<Subtask, DefaultEdge> createJobDag(long time) {
DirectedAcyclicGraph<Subtask, DefaultEdge> jobDag =
new DirectedAcyclicGraph<>(DefaultEdge.class);
Subtask source = new Subtask(time);
Subtask pathA = new Subtask(10);
Subtask pathB = new Subtask(15);
Subtask pathC = new Subtask(20);
Subtask pathD = new Subtask(25);
Subtask sink = new Subtask(time);
Subtask source2 = new Subtask(time);
Subtask a = new Subtask(30);
Subtask sink2 = new Subtask(35);
try {
jobDag.addVertex(source);
jobDag.addVertex(pathA);
jobDag.addVertex(pathB);
jobDag.addVertex(pathC);
jobDag.addVertex(pathD);
jobDag.addVertex(sink);
jobDag.addDagEdge(source, pathA);
jobDag.addDagEdge(source, pathB);
jobDag.addDagEdge(source, pathC);
jobDag.addDagEdge(source, pathD);
jobDag.addDagEdge(pathA, sink);
jobDag.addDagEdge(pathB, sink);
jobDag.addDagEdge(pathC, sink);
jobDag.addDagEdge(pathD, sink);
jobDag.addDagEdge(sink, source2);
jobDag.addDagEdge(source2, a);
jobDag.addDagEdge(source2, sink2);
jobDag.addDagEdge(a, sink2);
} catch (Exception e) {
// TODO: handle exception
}
return jobDag;
}
public static void main(String[] args) {
GlobalScheduler scheduler = new GlobalScheduler(new RateMonotonic(), 4);
DagTaskBuilder builder = new DagTaskBuilder().setNumberOfProcessors(4);
Set<DagTask> taskset = builder.generateTaskSet(3);
try {
scheduler.schedule(taskset, 10000);
} catch (SchedulingException e) {
int brk = 0;
}
scheduler.schedule(taskset, 10000);
// DagTask t1 = new DagTask(createJobDag(15), 200, 3);
// DagTask t2 = new DagTask(createJobDag(10), 300, 3);
// DagTask t3 = new DagTask(createJobDag(5), 350, 3);
scheduler.schedule(builder.generateTaskSet(1.0), 10000);
// LinkedHashSet<DagTask> tasks = new LinkedHashSet<>();
// tasks.add(t1);
// tasks.add(t2);
// tasks.add(t3);
// scheduler.schedule(tasks, 10000);
// {
// SystemManager manager = new SystemManager(8);
......
......@@ -163,7 +163,7 @@ public class SystemManager {
private long maxNumberOfBranches = 3; // TODO: Change back to 5
private long depth = 1; // TODO: Change back to 2
private long p_par = 80;
private long p_add = 10; // TODO: Change back to 20
private long p_add = 0; // TODO: Change back to 20
public DagTaskBuilder() {
}
......@@ -233,6 +233,7 @@ public class SystemManager {
final long period = Math.round(workload / utilization);
return new DagTask(jobDag, period, numberOfProcessors);
}
......
package mvd.jester.simulator;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.jgrapht.Graphs;
import mvd.jester.model.DagTask;
import mvd.jester.model.Subtask;
import mvd.jester.priority.PriorityManager;
......@@ -16,6 +15,7 @@ import mvd.jester.simulator.exceptions.SchedulingException;
import mvd.jester.simulator.model.ExtendedSubtaskContext;
import mvd.jester.simulator.model.JobContext;
import mvd.jester.simulator.model.ProcessorContext;
import mvd.jester.simulator.model.ThreadContext;
import mvd.jester.simulator.model.ProcessorContext.ProcessorComparator;
public class GlobalScheduler {
......@@ -75,16 +75,25 @@ public class GlobalScheduler {
}
private void handleJobCompleted(final JobCompletedEvent event, final SchedulingContext sc) {
if (event.getTime() > event.getJobContext().getDeadline()) {
JobContext job = event.getJobContext();
if (event.getTime() > job.getDeadline()) {
throw new DeadlineMissedException(event.getJobContext() + " misses its deadline");
}
// TODO: Remove threads from sc.threadqueue?
if (sc.threadQueue.stream().anyMatch(t -> t.getJobContext() == job)) {
throw new RuntimeException(
"Why is there still a thread of this completed job in the queue?");
}
}
private void handleSubtaskActivated(final SubtaskActivatedEvent event,
final SchedulingContext sc) {
final Subtask subtask = event.getSubtask();
final JobContext job = event.getJobContext();
sc.waitingQueue.offer(new ExtendedSubtaskContext(job, subtask));
ExtendedSubtaskContext context = new ExtendedSubtaskContext(job, subtask);
if (job.claimThread(context)) {
sc.threadQueue.offer(context.getThread().get());
}
updateAdmission(sc, event.getTime());
}
......@@ -96,7 +105,8 @@ public class GlobalScheduler {
processor + " should execute " + event.getSubtaskContext());
}
final Optional<ExtendedSubtaskContext> context = processor.getSubtask();
if (context.isPresent() && context.get().getRemainingExecutionTime() == 0) {
if (context.isPresent() && context.get() == event.getSubtaskContext()
&& context.get().getRemainingExecutionTime() == 0) {
sc.events.offer(new SubtaskStoppedEvent(event.getTime(), context.get(), processor));
}
}
......@@ -106,19 +116,20 @@ public class GlobalScheduler {
final ExtendedSubtaskContext subtask = event.getSubtaskContext();
final JobContext job = subtask.getJobContext();
job.registerFinishedSubtask(subtask);
final DagTask task = subtask.getJobContext().getTask();
final List<Subtask> successor =
Graphs.successorListOf(task.getJobDag(), subtask.getSubtask());
if (successor.isEmpty()) {
boolean isSink = job.registerFinishedSubtask(subtask);
if (isSink) {
sc.events.offer(new JobCompletedEvent(event.getTime(), job));
updateAdmission(sc, event.getTime());
} else {
final List<Subtask> finishedSubtasks = job.getFinishedSubtasks().stream()
.map(esc -> esc.getSubtask()).collect(Collectors.toList());
for (final Subtask s : successor) {
final List<Subtask> predecessors = Graphs.predecessorListOf(task.getJobDag(), s);
if (finishedSubtasks.containsAll(predecessors)) {
if (job.releaseThread(subtask)) {
sc.threadQueue.offer(subtask.getThread().get());
updateAdmission(sc, event.getTime());
}
List<Subtask> successors = job.getReadySuccessors(subtask);
if (successors.isEmpty()) {
updateAdmission(sc, event.getTime());
} else {
for (final Subtask s : successors) {
sc.events.offer(new SubtaskActivatedEvent(event.getTime(), job, s));
}
}
......@@ -129,8 +140,8 @@ public class GlobalScheduler {
final SchedulingContext sc) {
final ProcessorContext processor = event.getProcessor();
final long time = event.getTime();
sc.events.offer(new SubtaskStartedEvent(time, event.getNextSubtask(), processor));
sc.events.offer(new SubtaskStoppedEvent(time, event.getSubtaskContext(), processor));
sc.events.offer(new SubtaskStartedEvent(time, event.getNextSubtask(), processor));
}
private void handleSubtaskStarted(final SubtaskStartedEvent event, final SchedulingContext sc) {
......@@ -140,6 +151,8 @@ public class GlobalScheduler {
if (processor.accept(context, time)) {
sc.events.offer(new SubtaskCheckCompletedEvent(
time + context.getRemainingExecutionTime(), context, processor));
} else {
int brk = 0;
}
}
......@@ -147,10 +160,11 @@ public class GlobalScheduler {
final ExtendedSubtaskContext context = event.getSubtaskContext();
final ProcessorContext processor = event.getProcessor();
if (context.getRemainingExecutionTime() > 0) {
sc.waitingQueue.offer(context);
sc.threadQueue.offer(context.getThread().get());
} else {
sc.events.offer(new SubtaskCompletedEvent(event.getTime(), context));
}
processor.free();
}
......@@ -169,18 +183,22 @@ public class GlobalScheduler {
private void handleTaskReleased(final TaskReleasedEvent event, final SchedulingContext sc) {
final DagTask task = event.getTask();
final long release = event.getTime();
final JobContext job = new JobContext(task, release);
sc.events.offer(new JobActivatedEvent(release, job));
final long releaseTime = event.getTime();
final JobContext job = new JobContext(task, releaseTime);
sc.events.offer(new JobActivatedEvent(releaseTime, job));
}
private void updateAdmission(final SchedulingContext sc, final long time) {
Set<ProcessorContext> sortedProcessors =
new TreeSet<>(new ProcessorComparator(priorityManager));
sortedProcessors.addAll(sc.processors);
if (sortedProcessors.size() != sc.processors.size()) {
throw new RuntimeException("Processors wrong!"); // TODO: delete this later on
}
for (final ProcessorContext processor : sortedProcessors) {
if (processor.canAccept(sc.waitingQueue.peek(), priorityManager)) {
final ExtendedSubtaskContext context = sc.waitingQueue.poll();
if (processor.canAccept(sc.threadQueue.peek(), priorityManager)) {
ThreadContext threadContext = sc.threadQueue.poll();
final ExtendedSubtaskContext context = threadContext.getSubtask().get();
if (processor.isIdle()) {
sc.events.offer(new SubtaskStartedEvent(time, context, processor));
} else {
......@@ -201,13 +219,13 @@ public class GlobalScheduler {
private final EventQueue events;
private final Set<ProcessorContext> processors;
private final WaitQueue waitingQueue;
private final ThreadQueue threadQueue;
private final long duration;
private SchedulingContext(final PriorityManager priorityManager, final int nbProcessors,
final long duration) {
this.events = new EventQueue(priorityManager);
this.waitingQueue = new WaitQueue(priorityManager);
this.threadQueue = new ThreadQueue(priorityManager);
this.processors = new HashSet<>();
for (int m = 0; m < nbProcessors; ++m) {
processors.add(new ProcessorContext(m));
......
package mvd.jester.simulator;
import java.util.PriorityQueue;
import mvd.jester.priority.PriorityManager;
import mvd.jester.simulator.model.ExtendedSubtaskContext;
public class SubtaskQueue extends PriorityQueue<ExtendedSubtaskContext> {
private static final long serialVersionUID = 2536224096155380726L;
public SubtaskQueue(PriorityManager priorityManager) {
super((s1, s2) -> priorityManager.compare(s1.getJobContext(), s2.getJobContext()));
}
}
......@@ -2,13 +2,13 @@ package mvd.jester.simulator;
import java.util.PriorityQueue;
import mvd.jester.priority.PriorityManager;
import mvd.jester.simulator.model.ExtendedSubtaskContext;
import mvd.jester.simulator.model.ThreadContext;
public class WaitQueue extends PriorityQueue<ExtendedSubtaskContext> {
public class ThreadQueue extends PriorityQueue<ThreadContext> {
private static final long serialVersionUID = -4968780596887106984L;
public WaitQueue(PriorityManager priorityManager) {
public ThreadQueue(PriorityManager priorityManager) {
super((jc1, jc2) -> priorityManager.compare(jc1.getJobContext(), jc2.getJobContext()));
}
}
package mvd.jester.simulator.model;
import java.util.Optional;
import mvd.jester.model.Subtask;
import mvd.jester.model.SubtaskContext;
public class ExtendedSubtaskContext extends SubtaskContext {
private JobContext jobContext;
private final JobContext jobContext;
private Optional<ThreadContext> executingThread;
public ExtendedSubtaskContext(JobContext jobContext, Subtask subtask) {
super(subtask);
this.jobContext = jobContext;
this.executingThread = Optional.empty();
}
public JobContext getJobContext() {
return jobContext;
}
public void assignThread(ThreadContext threadContext) {
this.executingThread = Optional.of(threadContext);
}
public boolean hasThread() {
return executingThread.isPresent();
}
public Optional<ThreadContext> getThread() {
return executingThread;
}
@Override
public String toString() {
return getSubtask() + " of task " + getJobContext().getTask();
......
package mvd.jester.simulator.model;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.jgrapht.Graphs;
import org.jgrapht.experimental.dag.DirectedAcyclicGraph;
import org.jgrapht.graph.DefaultEdge;
import mvd.jester.model.DagTask;
......@@ -13,16 +18,21 @@ public class JobContext {
private final long releaseTime;
private final long deadline;
private final Subtask source;
private final Set<ExtendedSubtaskContext> finishedSubtasks;
// private long activeThreads; // TODO: Use threads
private final Set<ExtendedSubtaskContext> finishedContexts;
private final Queue<ExtendedSubtaskContext> readyContexts;
private final Queue<ThreadContext> threadPool;
public JobContext(DagTask task, long releaseTime) {
this.task = task;
this.releaseTime = releaseTime;
this.deadline = releaseTime + task.getDeadline();
this.source = findSource(task);
this.finishedSubtasks = new HashSet<>();
// this.activeThreads = task.getNumberOfThreads();
this.finishedContexts = new HashSet<>();
this.readyContexts = new LinkedList<>();
this.threadPool = new LinkedList<>();
for (int i = 0; i < task.getNumberOfThreads(); ++i) {
threadPool.add(new ThreadContext(i, this));
}
}
private Subtask findSource(DagTask task) {
......@@ -67,20 +77,85 @@ public class JobContext {
* @return the finishedSubtasks
*/
public Set<ExtendedSubtaskContext> getFinishedSubtasks() {
return finishedSubtasks;
return finishedContexts;
}
public void registerFinishedSubtask(ExtendedSubtaskContext subtaskContext) {
if (!finishedSubtasks.add(subtaskContext)) {
public boolean registerFinishedSubtask(ExtendedSubtaskContext subtaskContext) {
if (!finishedContexts.add(subtaskContext)) {
throw new RuntimeException(
"Subtask could not be added to finished subtask which means it was executed twice!");
}
if (task.getJobDag().outDegreeOf(subtaskContext.getSubtask()) == 0) {
return true;
}
return false;
}
public List<Subtask> getReadySuccessors(ExtendedSubtaskContext finishedContext) {
if (!finishedContexts.contains(finishedContext)) {
throw new RuntimeException("Subtask is not in finished subtask list!");
}
List<Subtask> successors =
Graphs.successorListOf(task.getJobDag(), finishedContext.getSubtask());
final List<Subtask> finishedSubtasks =
finishedContexts.stream().map(esc -> esc.getSubtask()).collect(Collectors.toList());
final List<Subtask> readySuccessors = new LinkedList<>();
for (final Subtask s : successors) {
final List<Subtask> predecessors = Graphs.predecessorListOf(task.getJobDag(), s);
if (finishedSubtasks.containsAll(predecessors)) {
readySuccessors.add(s);
}
}
return readySuccessors;
}
public Queue<ExtendedSubtaskContext> getReadySubtasks() {
return readyContexts;
}
public void offerSubtaskContext(ExtendedSubtaskContext context) {
readyContexts.offer(context);
}
public ExtendedSubtaskContext getNextSubtaskContext() {
if (readyContexts.isEmpty()) {
throw new RuntimeException("There is no ready subtask context!");
}
return readyContexts.poll();
}
public boolean checkRemainingThreads() {
return true; // TODO: implement check in future
public boolean claimThread(ExtendedSubtaskContext context) {
if (threadPool.isEmpty()) {
readyContexts.offer(context);
return false;
}
ThreadContext thread = threadPool.poll();
thread.assignSubtask(context);
context.assignThread(thread);
return true;
}
public boolean releaseThread(ExtendedSubtaskContext context) {
if (context.hasThread()) {
ThreadContext thread = context.getThread().get();
if (readyContexts.isEmpty()) {
thread.assignSubtask(null);
threadPool.offer(thread);
return false;
} else {
ExtendedSubtaskContext newContext = readyContexts.poll();
thread.assignSubtask(newContext);
newContext.assignThread(thread);
return true;
}
}
throw new RuntimeException("This subtask should be executed by a thread!");
}
@Override
public String toString() {
return "of task " + getTask();
}
......
......@@ -41,20 +41,16 @@ public class ProcessorContext {
return currentSubtask;
}
public boolean canAccept(ExtendedSubtaskContext subtaskContext,
PriorityManager priorityManager) {
if (subtaskContext == null) {
public boolean canAccept(ThreadContext thread, PriorityManager priorityManager) {
if (thread == null) {
return false;
}
if (currentSubtask.isEmpty()) {
if (subtaskContext.getJobContext().checkRemainingThreads()) {
return true;
}
return false;
}
if (priorityManager.compare(subtaskContext.getJobContext(),
if (priorityManager.compare(thread.getJobContext(),
currentSubtask.get().getJobContext()) < 0) {
return true;
}
......@@ -98,6 +94,11 @@ public class ProcessorContext {
}
public enum JobState {
NOT_ACCEPTED, ACCEPTED, REMOVED
}
public static class ProcessorComparator implements Comparator<ProcessorContext> {
private final PriorityManager priorityManager;
......@@ -114,6 +115,11 @@ public class ProcessorContext {
} else if (!p2.getSubtask().isPresent()) {
return 1;
} else {
JobContext p1Job = p1.currentSubtask.get().getJobContext();
JobContext p2Job = p2.currentSubtask.get().getJobContext();
if (p1Job == p2Job) {
return 1;
}
// Sort in reverse
return priorityManager.compare(p2.currentSubtask.get().getJobContext(),
p1.currentSubtask.get().getJobContext());
......
package mvd.jester.simulator.model;
import java.util.Optional;
public class ThreadContext {
private final long threadId;
private final JobContext jobContext;
private Optional<ExtendedSubtaskContext> subtask;
public ThreadContext(long threadId, JobContext jobContext) {
this.threadId = threadId;
this.jobContext = jobContext;
this.subtask = Optional.empty();
}
/**
* @return the jobContext
*/
public JobContext getJobContext() {
return jobContext;
}
public void assignSubtask(ExtendedSubtaskContext subtask) {
this.subtask = Optional.ofNullable(subtask);
}
public Optional<ExtendedSubtaskContext> getSubtask() {
return subtask;
}
@Override
public String toString() {
return "Thread " + threadId + " of " + jobContext;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment