#!/usr/bin/env python3 import os import sys import time import struct import serial import subprocess def eprint(*args, **kargs): print(*args, file=sys.stderr, **kargs) def flash(tty=None): pipe = subprocess.PIPE cmd = ['platformio', 'run', '-e', 'uno', '--target', 'upload'] if tty is not None: cmd.extend(['--upload-port', tty]) p = subprocess.Popen(cmd, stdout=sys.stderr, stdin=pipe) stdout, stderr = p.communicate("") def get_serial(): import serial.tools.list_ports ports = serial.tools.list_ports.comports() devices = [ p.device for p in ports ] devices.sort() return devices[-1] class UARTP: def __init__(self, ser): UARTP.SYN = 0xf9 UARTP.FIN = 0xf3 self.ser = ser def uart_read(self): r = self.ser.read(1) if len(r) != 1: raise Exception("Serial read error") return r[0] def uart_write(self, c): b = struct.pack("B", c) r = self.ser.write(b) if r != len(b): raise Exception("Serial write error") return r def send(self, buf): self.uart_write(UARTP.SYN) len_ind_0 = 0xff & len(buf) len_ind_1 = 0xff & (len(buf) >> 7) if len(buf) < 128: self.uart_write(len_ind_0) else: self.uart_write(len_ind_0 | 0x80) self.uart_write(len_ind_1) fcs = 0 for i in range(len(buf)): info = buf[i] fcs = (fcs + info) & 0xff self.uart_write(buf[i]) fcs = (0xff - fcs) & 0xff self.uart_write(fcs) self.uart_write(UARTP.FIN) eprint("sent frame '%s'" % buf.hex()) def recv(self): tag_old = UARTP.FIN while 1: tag = tag_old while 1: if tag_old == UARTP.FIN: if tag == UARTP.SYN: break tag_old = tag tag = self.uart_read() tag_old = tag l = self.uart_read() if l & 0x80: l &= 0x7f l |= self.uart_read() << 7 fcs = 0 buf = [] for i in range(l): info = self.uart_read() buf.append(info) fcs = (fcs + info) & 0xff fcs = (fcs + self.uart_read()) & 0xff tag = self.uart_read() if fcs == 0xff: if tag == UARTP.FIN: buf = bytes(buf) eprint("rcvd frame '%s'" % buf.hex()) if len(buf) >= 1 and buf[0] == 0xde: sys.stderr.buffer.write(buf[1:]) sys.stderr.flush() else: return buf def main(argv): eprint(argv[0]) script_dir = os.path.split(argv[0])[0] if len(script_dir) > 0: os.chdir(script_dir) dev = get_serial() flash(dev) eprint("Flashed") time.sleep(1) ser = serial.Serial(dev, baudrate=115200, timeout=5) uartp = UARTP(ser) def stdin_read(n): b = sys.stdin.buffer.read(n) if len(b) != n: sys.exit(1) return b def stdin_readvar(): l = stdin_read(4) (l, ) = struct.unpack("