#!/usr/bin/env python3 """ Generates real time tasksets according to the method described in 'A Comparison of Global and Partitioned EDF Schedulability Tests for Multiprocessors'. Therefore tasks with random periods within the range 1 to 1000 are generated. Depending on the mode, the utilization of these tasks is either determined by * uniform distribution between 1/period and 1 * bimodal distribution heavy tasks with uniform distribution between 0.5 / 1 and light tasks uniform between 0.1 and 0.5 where the probability of being heavy is 1/3 * exponential distribution with mean 0.25 * exponential distribution with mean 0.50 """ import datetime import sys import os import random import argparse import json from enum import Enum class Timebase(Enum): seconds = 1 milliseconds = 2 microseconds = 3 nanoseconds = 4 class Distribution(Enum): uniform = 1 bimodal = 2 exp_quart = 3 exp_half = 4 class Task: """ Representation of a generated task. """ def __init__(self, wcet, period, deadline): self.wcet = int(wcet) self.period = int(period) self.deadline = int(deadline) def check_taskset(procs, tasklist): sum = 0 for task in tasklist: sum += task.wcet / task.period return sum def get_timebase_string(base): if base is Timebase.seconds: return 'std::chrono::seconds', 'embb::base::DurationSeconds' elif base is Timebase.milliseconds: return 'std::chrono::milliseconds', 'embb::base::DurationMilliseconds' elif base is Timebase.microseconds: return 'std::chrono::microseconds', 'embb::base::DurationMicroseconds' elif base is Timebase.nanoseconds: return 'std::chrono::nanoseconds', 'embb::base::DurationNanoseconds' def create_task(distribution): period = random.uniform(1, 1000) if distribution is Distribution.uniform: util = random.uniform(1.0 / period, 1.0) elif distribution is Distribution.bimodal: check = random.uniform(0.0, 1.0) if check < 1 / 3: util = random.uniform(0.5, 1.0) else: util = random.uniform(1.0 / period, 1.0) elif distribution is Distribution.exp_quart: util = random.expovariant(1.0 / 0.25) elif distribution is Distribution.exp_half: util = random.expovariant(1.0 / 0.5) util = min(1.0, max(0.0, util)) wcet = period * util deadline = random.uniform(wcet, period) return Task(wcet, period, deadline) def main(): parser = argparse.ArgumentParser(description='Generate tasksets.') parser.add_argument('cores', type=int, help='Number of cores the taskset' 'should be generated for.') parser.add_argument('tasksetcount', type=int, help='Number of tasksets that' 'should be generated.') parser.add_argument('target', type=str, help='Output directory.', nargs='?', default='.') parser.add_argument('--baseclock', type=str, default='system_clock', nargs='?', help='The clock which is to be used for the execution.') timebase_grp = parser.add_mutually_exclusive_group(required=True) timebase_grp.add_argument('--seconds', action='store_const', const=Timebase.seconds) timebase_grp.add_argument('--milliseconds', action='store_const', const=Timebase.milliseconds) timebase_grp.add_argument('--microseconds', action='store_const', const=Timebase.microseconds) timebase_grp.add_argument('--nanoseconds', action='store_const', const=Timebase.nanoseconds) distr_grp = parser.add_mutually_exclusive_group() distr_grp.add_argument('--uniform', action='store_const', const=Distribution.uniform) distr_grp.add_argument('--bimodal', action='store_const', const=Distribution.bimodal) distr_grp.add_argument('--exp_quart', action='store_const', const=Distribution.exp_quart) distr_grp.add_argument('--exp_half', action='store_const', const=Distribution.exp_half) args = parser.parse_args() if args.seconds is not None: timebase = args.seconds elif args.milliseconds is not None: timebase = args.milliseconds elif args.microseconds is not None: timebase = args.microseconds elif args.nanoseconds is not None: timebase = args.nanoseconds if args.uniform is not None: distribution = args.uniform elif args.bimodal is not None: distribution = args.bimodal elif args.exp_quart is not None: distribution = args.exp_quart elif args.exp_half is not None: distribution = args.exp_half else: distribution = Distribution.uniform print('Generating tasks…', file=sys.stderr) tasksets = [] while len(tasksets) < args.tasksetcount: taskset = [] while len(taskset) < args.cores + 1 or \ check_taskset(args.cores, taskset) <= 1: taskset.append(create_task(distribution)) if len(taskset) >= args.cores + 1: tasksets.append(taskset) now = datetime.datetime.now() cpp_base, embb_base = get_timebase_string(timebase) print('Writing tasks…', file=sys.stderr) for taskset in tasksets: task_out = [] for task in taskset: out = {} out['wcet'] = task.wcet out['period'] = task.period out['deadline'] = task.deadline task_out.append(out) utilization = str(round(check_taskset(args.cores, taskset), 2)) utilization.replace('.', '_') name = 'taskset_{}_{}_{}'.format(now.strftime('%Y_%m_%d'), len(taskset), utilization) out = {} out['name'] = name out['template'] = 'templates/normal/' out['includes'] = [ { 'name' : '' } ] out['cpp_time_base'] = cpp_base out['embb_time_base'] = embb_base out['base_clock'] = 'std::chrono::system_clock' out['data_description'] = [ { 'name' : 'task', 'fields' : [ { 'type' : 'int', 'name' : 'wcet' }, { 'type' : 'int', 'name' : 'period' }, { 'type' : 'int', 'name' : 'deadline' }, { 'type' : 'int', 'name' : 'count' } ] } ] out['data'] = [ { 'type' : 'task', 'name' : 'taskset', 'elem' : task_out } ] outname = os.path.join(args.target, name + '.json') with open(outname, 'w') as outfile: outfile.write(json.dumps(out)) if __name__ == '__main__': main()