package mvd.jester.simulator.model; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Queue; import java.util.Set; import java.util.stream.Collectors; import org.jgrapht.Graphs; import org.jgrapht.experimental.dag.DirectedAcyclicGraph; import org.jgrapht.graph.DefaultEdge; import mvd.jester.model.DagTask; import mvd.jester.model.Subtask; public class JobContext { private final DagTask task; private final long releaseTime; private final long deadline; private final Subtask source; private final Set finishedContexts; private final Queue readyContexts; private final Queue threadPool; public JobContext(DagTask task, long releaseTime) { this.task = task; this.releaseTime = releaseTime; this.deadline = releaseTime + task.getDeadline(); this.source = findSource(task); this.finishedContexts = new HashSet<>(); this.readyContexts = new LinkedList<>(); this.threadPool = new LinkedList<>(); for (int i = 0; i < task.getNumberOfThreads(); ++i) { threadPool.add(new ThreadContext(i, this)); } } private Subtask findSource(DagTask task) { DirectedAcyclicGraph jobDag = task.getJobDag(); for (Subtask subtask : jobDag) { if (jobDag.inDegreeOf(subtask) == 0) { return subtask; } } throw new RuntimeException("No source found in DAG. How can that be!?"); } /** * @return the task */ public DagTask getTask() { return task; } /** * @return the source */ public Subtask getSource() { return source; } /** * @return the deadline */ public long getDeadline() { return deadline; } /** * @return the releaseTime */ public long getReleaseTime() { return releaseTime; } /** * @return the finishedSubtasks */ public Set getFinishedSubtasks() { return finishedContexts; } public boolean registerFinishedSubtask(ExtendedSubtaskContext subtaskContext) { if (!finishedContexts.add(subtaskContext)) { throw new RuntimeException( "Subtask could not be added to finished subtask which means it was executed twice!"); } if (task.getJobDag().outDegreeOf(subtaskContext.getSubtask()) == 0) { return true; } return false; } public List getReadySuccessors(ExtendedSubtaskContext finishedContext) { if (!finishedContexts.contains(finishedContext)) { throw new RuntimeException("Subtask is not in finished subtask list!"); } List successors = Graphs.successorListOf(task.getJobDag(), finishedContext.getSubtask()); final List finishedSubtasks = finishedContexts.stream().map(esc -> esc.getSubtask()).collect(Collectors.toList()); final List readySuccessors = new LinkedList<>(); for (final Subtask s : successors) { final List predecessors = Graphs.predecessorListOf(task.getJobDag(), s); if (finishedSubtasks.containsAll(predecessors)) { readySuccessors.add(s); } } return readySuccessors; } public Queue getReadySubtasks() { return readyContexts; } public void offerSubtaskContext(ExtendedSubtaskContext context) { readyContexts.offer(context); } public ExtendedSubtaskContext getNextSubtaskContext() { if (readyContexts.isEmpty()) { throw new RuntimeException("There is no ready subtask context!"); } return readyContexts.poll(); } public boolean claimThread(ExtendedSubtaskContext context) { if (threadPool.isEmpty()) { readyContexts.offer(context); return false; } ThreadContext thread = threadPool.poll(); thread.assignSubtask(context); context.assignThread(thread); return true; } public boolean releaseThread(ExtendedSubtaskContext context) { if (context.hasThread()) { ThreadContext thread = context.getThread().get(); if (readyContexts.isEmpty()) { thread.assignSubtask(null); threadPool.offer(thread); return false; } else { ExtendedSubtaskContext newContext = readyContexts.poll(); thread.assignSubtask(newContext); newContext.assignThread(thread); return true; } } throw new RuntimeException("This subtask should be executed by a thread!"); } @Override public String toString() { return "of task " + getTask(); } }