#!/usr/bin/env python3 import os import sys import signal import datetime import threading import subprocess from flask import Flask, request, Response from flask_restful import Resource, Api from flask_jsonpify import jsonify app = Flask(__name__, static_folder='.') api = Api(app) schedule = [] runners = [] class ScheduledTest: def __init__(self, template, path): self.template = template self.path = path self.state = 'SCHEDULED' self.added = datetime.datetime.now() self.lock = threading.Lock() def to_dict(self): return { 'id': str(id(self)), 'template': self.template, 'state': self.state, 'path': self.path, 'added': self.added, } class Runner(threading.Thread): def __init__(self, template, program=None): if program is None: program = ['python3', './templates/%s/test' % template] self.template = template self.program = program self.process = None self.job = None self.stop_event = threading.Event() threading.Thread.__init__(self) self.name += "-%s" % template self.start() def to_dict(self): return { 'id': str(id(self)), 'template': self.template, 'program': ' '.join(self.program), 'job': str(id(self.job)) if self.job is not None else None } def stop(self): self.stop_event.set() def run(self): while not self.stop_event.is_set(): my_queue = [ s for s in schedule if s.state == 'SCHEDULED' and s.template == self.template ] my_queue.sort(key=lambda s: s.added) if len(my_queue) == 0: # No tasks for this thread, go to sleep self.stop_event.wait(timeout=5) continue job = my_queue[0] with job.lock: # Check if we were the first thread to choose this job if job.state == 'SCHEDULED': job.state = 'RUNNING' self.job = job else: # Some other thread is running this test continue cmd = [] cmd += self.program cmd += [self.job.path] print("Executing ``%s´´" % ' '.join(cmd)) out_path = os.path.join(self.job.path, 'test.stdout.log') err_path = os.path.join(self.job.path, 'test.stderr.log') with open(out_path, 'w') as out_fd, \ open(err_path, 'w') as err_fd: self.process = subprocess.Popen( cmd, stdout=out_fd, stderr=err_fd ) while self.process.poll() is None: if self.stop_event.wait(timeout=1): self.process.send_signal(signal.SIGINT) try: self.process.wait(timeout=1) except subprocess.TimeoutExpired: pass if self.process.returncode == 0: self.job.state = 'SUCCESSFUL' else: self.job.state = 'FAILED' self.process = None subprocess.check_call( ['zip', '-r', 'results.zip', '.'], cwd=self.job.path) print("Job %d has finished" % id(self.job)) self.job = None print("Thread %s has finished" % self.name) class Status(Resource): def get(self): print(request.data) return jsonify({ 'schedule': [t.to_dict() for t in schedule], 'runners': [r.to_dict() for r in runners] }) class RestartJob(Resource): def get(self, job_id): job = [job for job in schedule if str(id(job)) == job_id] job = job[0] if len(job) > 0 else None if job is None: return 'Job not found', 404 with job.lock: if job.state != 'RUNNING': job.state = 'SCHEDULED' return jsonify({'success': True}) else: return 'Job is already running', 400 class ScheduleJob(Resource): def post(self): if not request.is_json: return 'Please send me JSON', 400 data = request.get_json() print(data) if 'path' not in data: return 'path expected', 400 if 'template' not in data: return 'template expected', 400 schedule.append(ScheduledTest(data['template'], data['path'])) result = {'success': True} return jsonify(result) api.add_resource(Status, '/status') api.add_resource(ScheduleJob, '/schedule_test') api.add_resource(RestartJob, '/restart_test/') @app.route('/') def root(): return app.send_static_file('index.html') @app.route('/view_log//') def view_log(job_id, log_id): job = [job for job in schedule if str(id(job)) == job_id] job = job[0] if len(job) > 0 else None if job is None: return 'Job not found', 404 log_name = [ 'make.stdout.log', 'make.stderr.log', 'test.stdout.log', 'test.stderr.log', ][log_id] log_path = os.path.join(job.path, log_name) if not os.path.isfile(log_path): return 'Log not found', 404 with open(log_path, 'r') as f: return Response(f.read(), mimetype='text/plain') @app.route('/results//results.zip') def get_results_zip(job_id): job = next(filter(lambda job: str(id(job)) == job_id, schedule), None) if job is None: return 'Job not found', 404 zip_path = os.path.join(job.path, 'results.zip') with open(zip_path, 'rb') as zip: return Response(zip.read(), mimetype='application/zip') if __name__ == '__main__': runners.append(Runner('maixduino')) runners.append(Runner('f7')) runners.append(Runner('uno')) runners.append(Runner('esp32')) runners.append(Runner('bluepill')) def signal_handler(signal, frame): print("Process interrupted!", file=sys.stderr) for r in runners: print("Stopping runner %s" % r.name, file=sys.stderr) r.stop() r.join() print("Runner %s stopped" % r.name, file=sys.stderr) sys.exit(0) signal.signal(signal.SIGINT, signal_handler) app.run(port='5002')