test-scheduler.py 10.4 KB
Newer Older
1 2 3 4
#!/usr/bin/env python3


import os
5
import sys
6
import time
7
import signal
8 9 10
import datetime
import threading
import subprocess
Enrico Pozzobon committed
11
import json
12
from flask import Flask, request, Response
13 14
from flask_restful import Resource, Api
from flask_jsonpify import jsonify
15
from test_common import pack_results
16 17 18 19 20 21 22 23 24 25 26


app = Flask(__name__, static_folder='.')
api = Api(app)


schedule = []
runners = []


class ScheduledTest:
27 28 29 30 31 32 33 34 35 36 37
    __slots__ = [
        'id', 'state', 'added', 'lock', 'time_started',
        'path', 'template', 'template_commit', 'template_timestamp',
        'cipher', 'cipher_timestamp'
    ]
    _unserialized_slots = [
        'lock',
    ]
    _next_id = 1

    def __init__(self, **kwargs):
Enrico Pozzobon committed
38 39 40 41 42 43 44
        if len(kwargs) > 0:
            self.path = kwargs['build_dir']
            self.cipher = kwargs['cipher']
            self.cipher_timestamp = kwargs['cipher_timestamp']
            self.template_timestamp = kwargs['template_timestamp']
            self.template = kwargs['template']
            self.template_commit = kwargs['template_commit']
45 46 47 48

        self.id = str(ScheduledTest._next_id)
        ScheduledTest._next_id += 1

49 50
        self.state = 'SCHEDULED'
        self.added = datetime.datetime.now()
51
        self.time_started = None
52 53 54
        self.lock = threading.Lock()

    def to_dict(self):
55 56 57 58 59
        res = {}
        for k in ScheduledTest.__slots__:
            if k not in ScheduledTest._unserialized_slots:
                res[k] = getattr(self, k)
        return res
60

61 62 63 64
    def from_dict(dict):
        a = ScheduledTest()
        for k in ScheduledTest.__slots__:
            if k not in ScheduledTest._unserialized_slots:
Enrico Pozzobon committed
65 66
                if k != 'id':
                    setattr(a, k, dict[k])
67 68
        if a.state == 'RUNNING':
            a.state = 'SCHEDULED'
69 70
        return a

71 72

class Runner(threading.Thread):
73 74 75
    _next_id = 1

    def __init__(self, template, platform, program=None):
76
        if program is None:
Enrico Pozzobon committed
77
            program = ['python3', './templates/%s/test.py' % template]
78

79 80
        self.id = str(Runner._next_id)
        Runner._next_id += 1
81
        self.template = template
82
        self.platform = platform
83 84 85
        self.program = program
        self.process = None
        self.job = None
86 87

        # Event used for stopping the runner
88 89
        self.stop_event = threading.Event()

90 91 92
        # Event used to abort only the current job
        self.abort_event = threading.Event()

93
        threading.Thread.__init__(self)
94
        self.name += "-%s" % template
95 96 97 98
        self.start()

    def to_dict(self):
        return {
99
            'id': self.id,
100 101
            'template': self.template,
            'program': ' '.join(self.program),
102
            'job': self.job.id if self.job is not None else None
103 104
        }

105
    def stop(self):
106
        self.abort_event.set()
107 108
        self.stop_event.set()

109 110 111
    def abort_current_job(self):
        self.abort_event.set()

112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
    def lock_new_job(self):
        my_queue = [
            s for s in schedule
            if s.state == 'SCHEDULED'
            and s.template == self.template
        ]
        my_queue.sort(key=lambda s: s.added)
        if len(my_queue) == 0:
            # print("Runner %s has no jobs in queue" % self.template)
            return None

        job = my_queue[0]

        with job.lock:
            # Check if we were the first thread to choose this job
            if job.state == 'SCHEDULED':
                job.state = 'RUNNING'
                self.job = job
                return job
            else:
                # Some other thread is running this test
                # print("Runner %s could not lock a job" % self.template)
                return None

    def do_job(self):
        self.job.time_started = int(time.time())
138
        self.abort_event.clear()
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153

        cmd = []
        cmd += self.program
        cmd += [self.job.path]
        print("Executing ``%s´´" % ' '.join(cmd))
        out_path = os.path.join(self.job.path, 'test.stdout.log')
        err_path = os.path.join(self.job.path, 'test.stderr.log')
        with open(out_path, 'w') as out_fd, \
                open(err_path, 'w') as err_fd:
            self.process = subprocess.Popen(
                cmd,
                stdout=out_fd,
                stderr=err_fd
            )
            while self.process.poll() is None:
154
                if self.abort_event.wait(timeout=1):
155 156 157 158 159 160
                    self.process.send_signal(signal.SIGINT)
                    try:
                        self.process.wait(timeout=1)
                    except subprocess.TimeoutExpired:
                        pass

161
        returncode = self.process.returncode
162 163
        self.process = None

164 165 166 167
        if returncode != 0:
            raise Exception(
                "Test has failed with return code", returncode)

168 169 170 171
        pack_results(
            self.job,
            self.platform)

172
    def run(self):
173
        while not self.stop_event.is_set():
174 175
            self.lock_new_job()
            if self.job is None:
176
                # No tasks for this thread, go to sleep
177
                self.stop_event.wait(timeout=5)
178 179
                continue

180 181
            try:
                self.do_job()
182
                self.job.state = 'SUCCESSFUL'
183
            except Exception as ex:
184
                self.job.state = 'FAILED'
185
                print(ex)
186

187
            print("Job %s has finished" % self.job.id)
188
            self.job = None
189
        print("Thread %s has finished" % self.name)
190 191 192 193 194 195 196 197 198 199 200


class Status(Resource):
    def get(self):
        print(request.data)
        return jsonify({
            'schedule': [t.to_dict() for t in schedule],
            'runners': [r.to_dict() for r in runners]
        })


201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
class DeleteJob(Resource):
    def get(self, job_id):
        job = [job for job in schedule if job.id == job_id]
        job = job[0] if len(job) > 0 else None
        if job is None:
            return 'Job not found', 404

        with job.lock:
            if job.state != 'RUNNING':
                schedule.remove(job)
                return jsonify({'success': True})
            else:
                return 'Job is already running', 400


216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236
class CancelJob(Resource):
    def get(self, job_id):
        job = [job for job in schedule if job.id == job_id]
        job = job[0] if len(job) > 0 else None
        if job is None:
            return 'Job not found', 404

        with job.lock:
            if job.state == 'RUNNING':
                r = [r for r in runners if r if r.job == job]
                if len(r) == 0:
                    return 'Job runner not found', 404
                process = r[0].process
                if process is None:
                    return 'Job runner process not found', 404
                process.send_signal(signal.SIGINT)
                return jsonify({'success': True})
            else:
                return 'Job is not running', 400


237 238
class RestartJob(Resource):
    def get(self, job_id):
239
        job = [job for job in schedule if job.id == job_id]
240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257
        job = job[0] if len(job) > 0 else None
        if job is None:
            return 'Job not found', 404

        with job.lock:
            if job.state != 'RUNNING':
                job.state = 'SCHEDULED'
                return jsonify({'success': True})
            else:
                return 'Job is already running', 400


class ScheduleJob(Resource):
    def post(self):
        if not request.is_json:
            return 'Please send me JSON', 400
        data = request.get_json()
        print(data)
258 259 260 261 262 263 264
        mandatory_fields = [
            'build_dir', 'template', 'template_commit', 'template_timestamp',
            'cipher', 'cipher_timestamp'
        ]
        for k in mandatory_fields:
            if k not in data:
                return 'field "%s" expected' % k, 400
265

266
        schedule.append(ScheduledTest(**data))
267 268 269 270 271 272 273

        result = {'success': True}
        return jsonify(result)


api.add_resource(Status, '/status')
api.add_resource(ScheduleJob, '/schedule_test')
274
api.add_resource(CancelJob, '/cancel_test/<string:job_id>')
275
api.add_resource(RestartJob, '/restart_test/<string:job_id>')
276
api.add_resource(DeleteJob, '/delete_test/<string:job_id>')
277 278 279 280 281 282 283 284 285


@app.route('/')
def root():
    return app.send_static_file('index.html')


@app.route('/view_log/<string:job_id>/<int:log_id>')
def view_log(job_id, log_id):
286
    job = [job for job in schedule if job.id == job_id]
287 288 289 290 291 292 293 294 295 296 297 298
    job = job[0] if len(job) > 0 else None
    if job is None:
        return 'Job not found', 404

    log_name = [
        'make.stdout.log', 'make.stderr.log',
        'test.stdout.log', 'test.stderr.log',
    ][log_id]
    log_path = os.path.join(job.path, log_name)
    if not os.path.isfile(log_path):
        return 'Log not found', 404
    with open(log_path, 'r') as f:
299 300 301 302 303
        return Response(f.read(), mimetype='text/plain')


@app.route('/results/<string:job_id>/results.zip')
def get_results_zip(job_id):
304
    job = next(filter(lambda job: job.id == job_id, schedule), None)
305 306 307
    if job is None:
        return 'Job not found', 404
    zip_path = os.path.join(job.path, 'results.zip')
308 309
    if not os.path.isfile(zip_path):
        return 'File not found', 404
310 311
    with open(zip_path, 'rb') as zip:
        return Response(zip.read(), mimetype='application/zip')
312 313


314 315 316 317 318 319 320 321 322 323 324 325
@app.route('/results/<string:job_id>/results.json')
def get_results_json(job_id):
    job = next(filter(lambda job: job.id == job_id, schedule), None)
    if job is None:
        return 'Job not found', 404
    path = os.path.join(job.path, 'results.json')
    if not os.path.isfile(path):
        return 'File not found', 404
    with open(path, 'rb') as zip:
        return Response(zip.read(), mimetype='application/json')


326
if __name__ == '__main__':
327 328 329 330 331
    runners.append(Runner('maixduino', 'Maixduino'))
    runners.append(Runner('f7', 'NUCLEO-F746ZG'))
    runners.append(Runner('uno', 'Arduino Uno'))
    runners.append(Runner('esp32', 'ESP32'))
    runners.append(Runner('bluepill', 'BluePill'))
332
    runners.append(Runner('f1-libopencm3', 'STM32F1 libopencm3'))
333 334 335 336 337 338 339 340 341 342 343

    def signal_handler(signal, frame):
        print("Process interrupted!", file=sys.stderr)
        for r in runners:
            print("Stopping runner %s" % r.name, file=sys.stderr)
            r.stop()
            r.join()
            print("Runner %s stopped" % r.name, file=sys.stderr)
        sys.exit(0)
    signal.signal(signal.SIGINT, signal_handler)

Enrico Pozzobon committed
344 345 346 347 348 349
    if len(sys.argv) > 1:
        with open(sys.argv[1], 'r') as fin:
            saved = json.load(fin)
            for job in saved['schedule']:
                schedule.append(ScheduledTest.from_dict(job))

350
    app.run(port='5002')