#!/usr/bin/env python3 import os import datetime import threading import subprocess from flask import Flask, request from flask_restful import Resource, Api from flask_jsonpify import jsonify app = Flask(__name__, static_folder='.') api = Api(app) schedule = [] runners = [] class ScheduledTest: def __init__(self, template, path): self.template = template self.path = path self.state = 'SCHEDULED' self.added = datetime.datetime.now() self.lock = threading.Lock() def to_dict(self): return { 'id': str(id(self)), 'template': self.template, 'state': self.state, 'path': self.path, 'added': self.added, } class Runner(threading.Thread): def __init__(self, template, program=None): if program is None: program = ['python3', './templates/%s/test' % template] self.template = template self.program = program self.process = None self.job = None self.event = threading.Event() threading.Thread.__init__(self) self.start() def to_dict(self): return { 'id': str(id(self)), 'template': self.template, 'program': ' '.join(self.program), 'job': str(id(self.job)) if self.job is not None else None } def run(self): while 1: self.event.clear() my_queue = [ s for s in schedule if s.state == 'SCHEDULED' and s.template == self.template ] my_queue.sort(key=lambda s: s.added) if len(my_queue) == 0: # No tasks for this thread, go to sleep self.event.wait(timeout=5) continue job = my_queue[0] with job.lock: # Check if we were the first thread to choose this job if job.state == 'SCHEDULED': job.state = 'RUNNING' self.job = job else: # Some other thread is running this test continue cmd = [] cmd += self.program cmd += [self.job.path] print("Executing ``%s´´" % ' '.join(cmd)) out_fd = open(os.path.join(self.job.path, 'test.stdout.log'), 'w') err_fd = open(os.path.join(self.job.path, 'test.stderr.log'), 'w') self.process = subprocess.Popen( cmd, stdout=out_fd, stderr=err_fd ) self.process.wait() if self.process.returncode == 0: self.job.state = 'SUCCESSFUL' else: self.job.state = 'FAILED' self.process = None self.job = None class Status(Resource): def get(self): print(request.data) return jsonify({ 'schedule': [t.to_dict() for t in schedule], 'runners': [r.to_dict() for r in runners] }) class RestartJob(Resource): def get(self, job_id): job = [job for job in schedule if str(id(job)) == job_id] job = job[0] if len(job) > 0 else None if job is None: return 'Job not found', 404 with job.lock: if job.state != 'RUNNING': job.state = 'SCHEDULED' return jsonify({'success': True}) else: return 'Job is already running', 400 class ScheduleJob(Resource): def post(self): if not request.is_json: return 'Please send me JSON', 400 data = request.get_json() print(data) if 'path' not in data: return 'path expected', 400 if 'template' not in data: return 'template expected', 400 schedule.append(ScheduledTest(data['template'], data['path'])) result = {'success': True} return jsonify(result) api.add_resource(Status, '/status') api.add_resource(ScheduleJob, '/schedule_test') api.add_resource(RestartJob, '/restart_test/') @app.route('/') def root(): return app.send_static_file('index.html') @app.route('/view_log//') def view_log(job_id, log_id): job = [job for job in schedule if str(id(job)) == job_id] job = job[0] if len(job) > 0 else None if job is None: return 'Job not found', 404 log_name = [ 'make.stdout.log', 'make.stderr.log', 'test.stdout.log', 'test.stderr.log', ][log_id] log_path = os.path.join(job.path, log_name) if not os.path.isfile(log_path): return 'Log not found', 404 with open(log_path, 'r') as f: return f.read() if __name__ == '__main__': runners.append(Runner('maixduino')) runners.append(Runner('f7')) runners.append(Runner('uno')) runners.append(Runner('esp32')) runners.append(Runner('bluepill')) app.run(port='5002')