test 4.89 KB
Newer Older
1 2 3 4
#!/usr/bin/env python3

import os
import sys
5
import time
6 7
import struct
import serial
8 9
import subprocess

10 11


12 13 14
def eprint(*args, **kargs):
    print(*args, file=sys.stderr, **kargs)

15

16
def flash(tty=None):
17
    pipe = subprocess.PIPE
18 19 20 21
    cmd = ['platformio', 'run', '-e', 'uno', '--target', 'upload']
    if tty is not None:
        cmd.extend(['--upload-port', tty])
    p = subprocess.Popen(cmd,
22 23 24
            stdout=sys.stderr, stdin=pipe)
    stdout, stderr = p.communicate("") 

25 26 27 28 29 30 31 32
def get_serial():
    import serial.tools.list_ports
    ports = serial.tools.list_ports.comports()
    devices = [ p.device for p in ports ]
    devices.sort()
    return devices[-1]


33 34 35 36 37
class UARTP:
    def __init__(self, ser):
        UARTP.SYN = 0xf9
        UARTP.FIN = 0xf3
        self.ser = ser
38

39 40
    def uart_read(self):
        r = self.ser.read(1)
41 42 43 44
        if len(r) != 1:
            raise Exception("Serial read error")
        return r[0]

45
    def uart_write(self, c):
46
        b = struct.pack("B", c)
47
        r = self.ser.write(b)
48 49 50 51
        if r != len(b):
            raise Exception("Serial write error")
        return r

52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
    def send(self, buf):
        self.uart_write(UARTP.SYN)
        len_ind_0 = 0xff & len(buf)
        len_ind_1 = 0xff & (len(buf) >> 7)
        if len(buf) < 128:
            self.uart_write(len_ind_0)
        else:
            self.uart_write(len_ind_0 | 0x80)
            self.uart_write(len_ind_1)
        fcs = 0
        for i in range(len(buf)):
            info = buf[i]
            fcs = (fcs + info) & 0xff
            self.uart_write(buf[i])
        fcs = (0xff - fcs) & 0xff
        self.uart_write(fcs)
        self.uart_write(UARTP.FIN)
        eprint("sent frame '%s'" % buf.hex())

    def recv(self):
        tag_old = UARTP.FIN
73
        while 1:
74 75 76 77 78
            tag = tag_old
            while 1:
                if tag_old == UARTP.FIN:
                    if tag == UARTP.SYN:
                        break
79
                tag_old = tag
80
                tag = self.uart_read()
81 82
            tag_old = tag
            
83
            l = self.uart_read()
84 85
            if l & 0x80:
                l &= 0x7f
86
                l |= self.uart_read() << 7
87 88 89 90

            fcs = 0
            buf = []
            for i in range(l):
91
                info = self.uart_read()
92 93
                buf.append(info)
                fcs = (fcs + info) & 0xff
94
            fcs = (fcs + self.uart_read()) & 0xff
95

96
            tag = self.uart_read()
97
            if fcs == 0xff:
98
                if tag == UARTP.FIN:
99
                    buf = bytes(buf)
100
                    eprint("rcvd frame '%s'" % buf.hex())
101 102 103 104 105 106
                    if len(buf) >= 1 and buf[0] == 0xde:
                        sys.stderr.buffer.write(buf[1:])
                        sys.stderr.flush()
                    else:
                        return buf

107 108 109 110 111 112 113
def main(argv):
    eprint(argv[0])
    script_dir = os.path.split(argv[0])[0]
    if len(script_dir) > 0:
        os.chdir(script_dir)

    dev = get_serial()
114 115 116 117
    flash(dev)
    eprint("Flashed")
    time.sleep(1)

118 119
    ser = serial.Serial(dev, baudrate=115200, timeout=5)
    uartp = UARTP(ser)
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147

    def stdin_read(n):
        b = sys.stdin.buffer.read(n)
        if len(b) != n:
            sys.exit(1)
        return b

    def stdin_readvar():
        l = stdin_read(4)
        (l, ) = struct.unpack("<I", l)
        v = stdin_read(l)
        return v

    exp_hello = b"Hello, World!"
    hello = ser.read(len(exp_hello))
    if hello != exp_hello:
        eprint("Improper board initialization message: ")
        return 1
    eprint("Board initialized properly")
    sys.stdout.write("Hello, World!\n")
    sys.stdout.flush()

    while 1:
        action = stdin_read(1)[0]
        eprint("Command %c from stdin" % action)

        if action in b"ackmps":
            v = stdin_readvar()
148 149
            uartp.send(struct.pack("B", action) + v)
            ack = uartp.recv()
150 151 152 153 154 155
            if len(ack) != 1 or ack[0] != action:
                raise Exception("Unacknowledged variable transfer")
            eprint("Var %c successfully sent to board" % action)

        elif action in b"ACKMPS":
            c = struct.pack("B", action)
156 157
            uartp.send(c)
            v = uartp.recv()
158 159 160 161 162 163 164 165 166 167
            if len(v) < 1 or v[0] != action:
                raise Exception("Could not obtain variable from board")
            v = v[1:]
            eprint("Var %c received from board: %s" % (action, v.hex()))
            l = struct.pack("<I", len(v))
            sys.stdout.buffer.write(l + v)
            sys.stdout.flush()

        elif action in b"ed":
            c = struct.pack("B", action)
168 169
            uartp.send(c)
            ack = uartp.recv()
170 171 172 173 174 175 176 177 178
            if len(ack) != 1 or ack[0] != action:
                raise Exception("Unacknowledged variable transfer")
            eprint("Operation %c completed successfully" % action)

        else:
            raise Exception("Unknown action %c" % action)
        
    
    return 0
179 180 181 182


if __name__ == "__main__":
    sys.exit(main(sys.argv))