JobContext.java 4.91 KB
Newer Older
1 2
package mvd.jester.simulator.model;

Michael Schmid committed
3
import java.util.HashSet;
4 5 6
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
Michael Schmid committed
7
import java.util.Set;
8 9
import java.util.stream.Collectors;
import org.jgrapht.Graphs;
10 11 12 13 14 15 16 17 18 19 20
import org.jgrapht.experimental.dag.DirectedAcyclicGraph;
import org.jgrapht.graph.DefaultEdge;
import mvd.jester.model.DagTask;
import mvd.jester.model.Subtask;

public class JobContext {

    private final DagTask task;
    private final long releaseTime;
    private final long deadline;
    private final Subtask source;
21 22 23
    private final Set<ExtendedSubtaskContext> finishedContexts;
    private final Queue<ExtendedSubtaskContext> readyContexts;
    private final Queue<ThreadContext> threadPool;
24 25 26 27 28 29

    public JobContext(DagTask task, long releaseTime) {
        this.task = task;
        this.releaseTime = releaseTime;
        this.deadline = releaseTime + task.getDeadline();
        this.source = findSource(task);
30 31 32 33 34 35
        this.finishedContexts = new HashSet<>();
        this.readyContexts = new LinkedList<>();
        this.threadPool = new LinkedList<>();
        for (int i = 0; i < task.getNumberOfThreads(); ++i) {
            threadPool.add(new ThreadContext(i, this));
        }
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
    }

    private Subtask findSource(DagTask task) {
        DirectedAcyclicGraph<Subtask, DefaultEdge> jobDag = task.getJobDag();
        for (Subtask subtask : jobDag) {
            if (jobDag.inDegreeOf(subtask) == 0) {
                return subtask;
            }
        }
        throw new RuntimeException("No source found in DAG. How can that be!?");
    }

    /**
     * @return the task
     */
    public DagTask getTask() {
        return task;
    }

    /**
     * @return the source
     */
    public Subtask getSource() {
        return source;
    }

    /**
     * @return the deadline
     */
    public long getDeadline() {
        return deadline;
    }

    /**
     * @return the releaseTime
     */
    public long getReleaseTime() {
        return releaseTime;
    }

Michael Schmid committed
76 77 78 79
    /**
     * @return the finishedSubtasks
     */
    public Set<ExtendedSubtaskContext> getFinishedSubtasks() {
80
        return finishedContexts;
Michael Schmid committed
81 82
    }

83 84
    public boolean registerFinishedSubtask(ExtendedSubtaskContext subtaskContext) {
        if (!finishedContexts.add(subtaskContext)) {
Michael Schmid committed
85 86 87
            throw new RuntimeException(
                    "Subtask could not be added to finished subtask which means it was executed twice!");
        }
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
        if (task.getJobDag().outDegreeOf(subtaskContext.getSubtask()) == 0) {
            return true;
        }
        return false;
    }

    public List<Subtask> getReadySuccessors(ExtendedSubtaskContext finishedContext) {
        if (!finishedContexts.contains(finishedContext)) {
            throw new RuntimeException("Subtask is not in finished subtask list!");
        }
        List<Subtask> successors =
                Graphs.successorListOf(task.getJobDag(), finishedContext.getSubtask());

        final List<Subtask> finishedSubtasks =
                finishedContexts.stream().map(esc -> esc.getSubtask()).collect(Collectors.toList());
        final List<Subtask> readySuccessors = new LinkedList<>();

        for (final Subtask s : successors) {
            final List<Subtask> predecessors = Graphs.predecessorListOf(task.getJobDag(), s);
            if (finishedSubtasks.containsAll(predecessors)) {
                readySuccessors.add(s);
            }
        }

        return readySuccessors;
    }

    public Queue<ExtendedSubtaskContext> getReadySubtasks() {
        return readyContexts;
    }

    public void offerSubtaskContext(ExtendedSubtaskContext context) {
        readyContexts.offer(context);
    }

    public ExtendedSubtaskContext getNextSubtaskContext() {
        if (readyContexts.isEmpty()) {
            throw new RuntimeException("There is no ready subtask context!");
        }
        return readyContexts.poll();
Michael Schmid committed
128 129
    }

130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
    public boolean claimThread(ExtendedSubtaskContext context) {
        if (threadPool.isEmpty()) {
            readyContexts.offer(context);
            return false;
        }
        ThreadContext thread = threadPool.poll();
        thread.assignSubtask(context);
        context.assignThread(thread);
        return true;
    }

    public boolean releaseThread(ExtendedSubtaskContext context) {
        if (context.hasThread()) {
            ThreadContext thread = context.getThread().get();
            if (readyContexts.isEmpty()) {
                thread.assignSubtask(null);
                threadPool.offer(thread);
                return false;
            } else {
                ExtendedSubtaskContext newContext = readyContexts.poll();
                thread.assignSubtask(newContext);
                newContext.assignThread(thread);
                return true;
            }
        }
        throw new RuntimeException("This subtask should be executed by a thread!");
Michael Schmid committed
156 157
    }

158
    @Override
Michael Schmid committed
159 160
    public String toString() {
        return "of task " + getTask();
161 162 163
    }

}