#!/usr/bin/env python3 import os import sys import time import signal import datetime import threading import subprocess from flask import Flask, request, Response from flask_restful import Resource, Api from flask_jsonpify import jsonify from test_common import pack_results app = Flask(__name__, static_folder='.') api = Api(app) schedule = [] runners = [] class ScheduledTest: __slots__ = [ 'id', 'state', 'added', 'lock', 'time_started', 'path', 'template', 'template_commit', 'template_timestamp', 'cipher', 'cipher_timestamp' ] _unserialized_slots = [ 'lock', ] _next_id = 1 def __init__(self, **kwargs): self.path = kwargs['build_dir'] self.cipher = kwargs['cipher'] self.cipher_timestamp = kwargs['cipher_timestamp'] self.template_timestamp = kwargs['template_timestamp'] self.template = kwargs['template'] self.template_commit = kwargs['template_commit'] self.id = str(ScheduledTest._next_id) ScheduledTest._next_id += 1 self.state = 'SCHEDULED' self.added = datetime.datetime.now() self.time_started = None self.lock = threading.Lock() def to_dict(self): res = {} for k in ScheduledTest.__slots__: if k not in ScheduledTest._unserialized_slots: res[k] = getattr(self, k) return res class Runner(threading.Thread): _next_id = 1 def __init__(self, template, platform, program=None): if program is None: program = ['python3', './templates/%s/test' % template] self.id = str(Runner._next_id) Runner._next_id += 1 self.template = template self.platform = platform self.program = program self.process = None self.job = None self.stop_event = threading.Event() threading.Thread.__init__(self) self.name += "-%s" % template self.start() def to_dict(self): return { 'id': self.id, 'template': self.template, 'program': ' '.join(self.program), 'job': self.job.id if self.job is not None else None } def stop(self): self.stop_event.set() def lock_new_job(self): my_queue = [ s for s in schedule if s.state == 'SCHEDULED' and s.template == self.template ] my_queue.sort(key=lambda s: s.added) if len(my_queue) == 0: # print("Runner %s has no jobs in queue" % self.template) return None job = my_queue[0] with job.lock: # Check if we were the first thread to choose this job if job.state == 'SCHEDULED': job.state = 'RUNNING' self.job = job return job else: # Some other thread is running this test # print("Runner %s could not lock a job" % self.template) return None def do_job(self): self.job.time_started = int(time.time()) cmd = [] cmd += self.program cmd += [self.job.path] print("Executing ``%s´´" % ' '.join(cmd)) out_path = os.path.join(self.job.path, 'test.stdout.log') err_path = os.path.join(self.job.path, 'test.stderr.log') with open(out_path, 'w') as out_fd, \ open(err_path, 'w') as err_fd: self.process = subprocess.Popen( cmd, stdout=out_fd, stderr=err_fd ) while self.process.poll() is None: if self.stop_event.wait(timeout=1): self.process.send_signal(signal.SIGINT) try: self.process.wait(timeout=1) except subprocess.TimeoutExpired: pass if self.process.returncode == 0: self.job.state = 'SUCCESSFUL' else: self.job.state = 'FAILED' self.process = None pack_results( self.job, self.platform) def run(self): while not self.stop_event.is_set(): self.lock_new_job() if self.job is None: # No tasks for this thread, go to sleep self.stop_event.wait(timeout=5) continue try: self.do_job() except Exception as ex: print(ex) print("Job %s has finished" % self.job.id) self.job = None print("Thread %s has finished" % self.name) class Status(Resource): def get(self): print(request.data) return jsonify({ 'schedule': [t.to_dict() for t in schedule], 'runners': [r.to_dict() for r in runners] }) class RestartJob(Resource): def get(self, job_id): job = [job for job in schedule if job.id == job_id] job = job[0] if len(job) > 0 else None if job is None: return 'Job not found', 404 with job.lock: if job.state != 'RUNNING': job.state = 'SCHEDULED' return jsonify({'success': True}) else: return 'Job is already running', 400 class ScheduleJob(Resource): def post(self): if not request.is_json: return 'Please send me JSON', 400 data = request.get_json() print(data) mandatory_fields = [ 'build_dir', 'template', 'template_commit', 'template_timestamp', 'cipher', 'cipher_timestamp' ] for k in mandatory_fields: if k not in data: return 'field "%s" expected' % k, 400 schedule.append(ScheduledTest(**data)) result = {'success': True} return jsonify(result) api.add_resource(Status, '/status') api.add_resource(ScheduleJob, '/schedule_test') api.add_resource(RestartJob, '/restart_test/') @app.route('/') def root(): return app.send_static_file('index.html') @app.route('/view_log//') def view_log(job_id, log_id): job = [job for job in schedule if job.id == job_id] job = job[0] if len(job) > 0 else None if job is None: return 'Job not found', 404 log_name = [ 'make.stdout.log', 'make.stderr.log', 'test.stdout.log', 'test.stderr.log', ][log_id] log_path = os.path.join(job.path, log_name) if not os.path.isfile(log_path): return 'Log not found', 404 with open(log_path, 'r') as f: return Response(f.read(), mimetype='text/plain') @app.route('/results//results.zip') def get_results_zip(job_id): job = next(filter(lambda job: job.id == job_id, schedule), None) if job is None: return 'Job not found', 404 zip_path = os.path.join(job.path, 'results.zip') if not os.path.isfile(zip_path): return 'File not found', 404 with open(zip_path, 'rb') as zip: return Response(zip.read(), mimetype='application/zip') @app.route('/results//results.json') def get_results_json(job_id): job = next(filter(lambda job: job.id == job_id, schedule), None) if job is None: return 'Job not found', 404 path = os.path.join(job.path, 'results.json') if not os.path.isfile(path): return 'File not found', 404 with open(path, 'rb') as zip: return Response(zip.read(), mimetype='application/json') if __name__ == '__main__': runners.append(Runner('maixduino', 'Maixduino')) runners.append(Runner('f7', 'NUCLEO-F746ZG')) runners.append(Runner('uno', 'Arduino Uno')) runners.append(Runner('esp32', 'ESP32')) runners.append(Runner('bluepill', 'BluePill')) def signal_handler(signal, frame): print("Process interrupted!", file=sys.stderr) for r in runners: print("Stopping runner %s" % r.name, file=sys.stderr) r.stop() r.join() print("Runner %s stopped" % r.name, file=sys.stderr) sys.exit(0) signal.signal(signal.SIGINT, signal_handler) app.run(port='5002')